Build::Graph › Source › Build::Graph::Walker

class Walker

A walker walks over a graph and applies a task to each node.

Definitions

def self.for(task_class, *args, **options)

Create a walker that instantiates tasks of the given class for each visited node.

Signature

parameter task_class Class

the task class to instantiate; its constructor must accept (walker, node, *args) and its instances must respond to `visit` and `update`.

returns Walker

a new walker bound to the given task class.

Implementation

# Build a walker whose per-node callback instantiates `task_class` with
# (walker, node, *args) and runs `update` inside the task's `visit`.
# NOTE(review): the visible #initialize accepts only a block, so non-empty
# `options` would raise ArgumentError here — confirm against the real class.
def self.for(task_class, *args, **options)
	new(**options) do |walker, node, _parent_task = nil|
		instance = task_class.new(walker, node, *args)
		instance.visit { instance.update }
	end
end

def initialize(&block)

Create a walker with a block that is called for each node to create and run a task.

Signature

parameter block Proc

called with (walker, node, parent_task) for each visited node.

Implementation

# Set up empty bookkeeping; `block` is later invoked as
# block.call(walker, node, parent_task) to create and run a node's task.
def initialize(&block)
	@update = block
	
	# Each visited node mapped to its task:
	@tasks = {}
	
	# Each output path currently being generated, mapped to the edges waiting on it:
	@outputs = {}
	
	# Each child node mapped to the edges of parent tasks waiting for it:
	@parents = Hash.new { |hash, key| hash[key] = [] }
	
	# Tasks that failed, and the (stringified) output paths they failed to produce:
	@failed_tasks = []
	@failed_outputs = Set.new
	
	# Completed tasks are added to this monitor (presumably to watch for changes — see #run):
	@monitor = Files::Monitor.new
end

attr :tasks

A Hash mapping each visited node to its instantiated task.

attr :outputs

A Hash mapping each output path currently being generated to the list of edges waiting on it.

def update(nodes)

Invoke the walker for each node in the given collection.

Signature

parameter nodes Enumerable

the nodes to update.

Implementation

# Visit every node in `nodes`. A single node or nil is coerced with Array().
def update(nodes)
	Array(nodes).each { |entry| call(entry) }
end

def call(node, parent_task = nil)

Invoke the walker for a single node, reusing an existing task if the node has already been visited.

Signature

parameter node Node

the node to update.

parameter parent_task Task | nil

the parent task, if any.

returns Task

the task associated with the node.

Implementation

# Return the task for `node`, creating it via the update block on first visit.
def call(node, parent_task = nil)
	# Already visited: reuse the existing task.
	return @tasks[node] if @tasks.key?(node)
	
	Console.debug(self){"Update: #{node} #{parent_task.class}"}
	
	# The update block is expected to register the task (via #enter):
	@update.call(self, node, parent_task)
	
	@tasks[node]
end

def failed?

Signature

returns Boolean

whether any tasks have failed during this walk.

Implementation

# True once at least one task has been recorded as failed.
def failed?
	!@failed_tasks.empty?
end

def wait_on_paths(task, paths)

Block the current fiber until all tasks generating the given paths have completed.

Signature

parameter task Task

the task waiting on these paths.

parameter paths Build::Files::List

the paths to wait on.

returns Boolean

true if all paths are available, false if any failed.

Implementation

# Wait until every path in `paths` is available.
# Returns true when all are ready, false if any of them was produced by a
# failed task. Raises RuntimeError for a path that does not exist, is not
# being generated, and is not a known failed output.
def wait_on_paths(task, paths)
	# If there are no paths, we are done:
	return true if paths.count == 0
	
	# We create a new directed hyper-graph edge which waits for all paths to be ready (or failed):
	edge = Edge.new
	
	paths = paths.collect(&:to_s)
	
	# Outputs of tasks that already failed were removed from @outputs by #exit and recorded here:
	failed = paths.any?{|path| @failed_outputs.include?(path)}
	
	paths.each do |path|
		# Is there a task generating this output?
		if outputs = @outputs[path]
			Console.debug(self){"Task #{task} is waiting on path #{path}"}
			
			# When the output is ready, trigger this edge:
			outputs << edge
			edge.increment!
		elsif @failed_outputs.include?(path)
			# The producing task failed, so the file is typically missing; report failure (below) rather than raising.
		elsif !File.exist?(path)
			# Paths which are not registered outputs but exist are treated as static inputs of the build graph.
			# A path which neither exists nor is being generated cannot be waited on:
			Console.warn(self){"Task #{task} is waiting on paths which don't exist and are not being generated!"}
			raise RuntimeError, "File #{path} is not being generated by any active task!"
		end
	end
	
	# Always wait for the pending outputs, even if we already know something failed:
	edge.wait && !failed
end

def wait_for_children(parent, children)

A parent task only completes once all its children are complete.

Implementation

# Block `parent` until every incomplete child has finished; returns the
# result of the tracking edge's #wait (or true if nothing is pending).
def wait_for_children(parent, children)
	# Children that have already completed need no tracking:
	pending = children.reject(&:complete?)
	return true if pending.empty?
	
	Console.debug(self){"Task #{parent} is waiting on #{pending.count} children"}
	
	edge = Edge.new
	
	pending.each do |child|
		# A failed child is recorded on the edge instead of being waited on:
		next edge.skip!(child) if child.failed?
		
		edge.increment!
		# #exit traverses these edges when the child's node finishes:
		@parents[child.node] << edge
	end
	
	edge.wait
end

def enter(task)

Register a task as active and record its declared output paths.

Signature

parameter task Task

the task that is beginning execution.

Implementation

# Register `task` under its node and announce the output paths it will produce.
def enter(task)
	@tasks[task.node] = task
	
	# Outputs must be registered before the task runs: a waiter can only wait on a path that is already known to be in
	# progress. As a consequence, tasks must be ordered so that output -> input chaining is sequential. This is by design
	# and is not a problem in practice.
	outputs = task.outputs
	return unless outputs
	
	Console.debug(self) do |buffer|
		buffer.puts "Task will generate outputs:"
		Array(outputs).each { |output| buffer.puts(output.inspect) }
	end
	
	# Tasks with child tasks may list the same output twice; ||= keeps any edges already waiting on it.
	outputs.each { |path| @outputs[path.to_s] ||= [] }
end

def exit(task)

Mark a task as finished, resolve its output paths and notify any waiting tasks.

Signature

parameter task Task

the task that has finished execution.

Implementation

# Mark `task` as finished: record failures, release its output paths (waking
# any edges waiting on them), notify waiting parents, and hand the task to
# the monitor.
def exit(task)
	# #enter allows tasks with no outputs (nil), so every use below must tolerate that:
	outputs = task.outputs
	
	if task.failed?
		@failed_tasks << task
		
		# Remember failed paths so that #wait_on_paths reports failure for them:
		@failed_outputs.merge(outputs.collect(&:to_s)) if outputs
	end
	
	# Clean the node's outputs:
	outputs&.each do |output|
		path = output.to_s
		
		Console.debug(self){"File #{task.failed? ? 'failed' : 'available'}: #{path}"}
		
		# Wake every edge that was waiting on this path:
		if edges = @outputs.delete(path)
			edges.each{|edge| edge.traverse(task)}
		end
	end
	
	# Notify the parent nodes that the child is done:
	if parents = @parents.delete(task.node)
		parents.each{|edge| edge.traverse(task)}
	end
	
	@monitor.add(task)
end

def delete(node)

Remove a node and its associated task from the walker, e.g. after it has been invalidated.

Signature

parameter node Node

the node to remove.

Implementation

# Forget `node`'s task (if any) and stop monitoring it.
def delete(node)
	removed = @tasks.delete(node)
	@monitor.delete(removed) if removed
end

def clear_failed

Remove all failed tasks from the walker so they can be retried on the next update.

Implementation

# Drop every failed task's node so it is rebuilt next time, then reset the failure records.
def clear_failed
	Array(@failed_tasks).each { |failed_task| delete(failed_task.node) }
	
	@failed_tasks = []
	@failed_outputs = Set.new
end

def run(**options)

Run an update loop, re-executing the given block whenever the monitor detects filesystem changes.

Implementation

# Run the given block once, then again every time the monitor's run loop
# fires. `options` are forwarded to the monitor unchanged.
def run(**options)
	# Perform the initial update immediately:
	yield
	
	# Use the ivar directly (as #exit and #delete do) rather than relying on a `monitor` reader:
	@monitor.run(**options) do
		yield
	end
end

def inspect

Signature

returns String

a human-readable summary of the walker state.

Implementation

def inspect
	"\#<#{self.class}:0x#{self.object_id.to_s(16)} #{@tasks.count} tasks, #{@failed_tasks.count} failed>"
end