class Walker
A walker walks over a graph and applies a task to each node.
Definitions
def self.for(task_class, *args, **options)
Create a walker that instantiates tasks of the given class for each visited node.
Signature
-
parameter
task_class Class the task class to instantiate; must accept
(walker, node, *args).
-
returns
Walker a new walker bound to the given task class.
Implementation
# Build a walker whose per-node callback instantiates +task_class+ and runs it.
#
# @param task_class [Class] constructed as task_class.new(walker, node, *args).
# @param args [Array] extra positional arguments forwarded to every task.
# @param options [Hash] keyword arguments forwarded to Walker.new.
# @return [Walker] a new walker bound to +task_class+.
def self.for(task_class, *args, **options)
  new(**options) do |walker, node, _parent_task = nil|
    # Walker#call supplies a parent task, but it is not forwarded to the task constructor.
    instance = task_class.new(walker, node, *args)
    instance.visit { instance.update }
  end
end
def initialize(&block)
Create a walker with a block that is called for each node to create and run a task.
Signature
-
parameter
block Proc called with
(walker, node, parent_task) for each visited node.
Implementation
# Set up an empty walker.
#
# @param block [Proc] invoked as block.call(walker, node, parent_task) by #call
#   whenever a node without a registered task is visited.
def initialize(&block)
  # Node => Task for every task that has entered this walker.
  @tasks = {}
  @update = block

  # Output path (String) => edges waiting for that path to be produced.
  @outputs = {}
  # Child node => edges of parent tasks waiting for that child to finish.
  @parents = Hash.new { |hash, key| hash[key] = [] }

  # Tasks, and the string paths of their outputs, which failed during this walk.
  @failed_tasks = []
  @failed_outputs = Set.new

  @monitor = Files::Monitor.new
end
# Node => Task mapping for every task that has entered the walker.
attr_reader :tasks
A Hash mapping each visited node to its task.
# Output path (String) => edges waiting on it; only paths still being generated are present.
attr_reader :outputs
A Hash mapping each output path that is currently being generated to the edges waiting on it.
def update(nodes)
Invoke the walker for each node in the given collection.
Signature
-
parameter
nodes Enumerable the nodes to update.
Implementation
# Visit every node in +nodes+, which is first coerced with Kernel#Array.
#
# @param nodes [Enumerable] the nodes to update.
# @return [Array] the coerced list of nodes.
def update(nodes)
  list = Array(nodes)
  list.each { |each_node| call(each_node) }
end
def call(node, parent_task = nil)
Invoke the walker for a single node, reusing an existing task if the node has already been visited.
Signature
-
parameter
node Node the node to update.
-
parameter
parent_task Task | nil the parent task, if any.
-
returns
Task the task associated with the node.
Implementation
# Return the task for +node+, running the update callback first if the node is new.
#
# @param node [Node] the node to update.
# @param parent_task [Task, nil] the parent task, if any.
# @return [Task, nil] whatever task is registered for +node+ afterwards.
def call(node, parent_task = nil)
  # Already visited: reuse the existing task.
  return @tasks[node] if @tasks.key?(node)

  Console.debug(self) { "Update: #{node} #{parent_task.class}" }

  # The callback is expected to register a task for the node (presumably via #enter).
  @update.call(self, node, parent_task)

  @tasks[node]
end
def failed?
Signature
-
returns
Boolean whether any tasks have failed during this walk.
Implementation
# Whether any task has failed since the walker was created or last cleared.
#
# @return [Boolean]
def failed?
  !@failed_tasks.empty?
end
def wait_on_paths(task, paths)
Block the current fiber until all tasks generating the given paths have completed.
Signature
-
parameter
task Task the task waiting on these paths.
-
parameter
paths Build::Files::List the paths to wait on.
-
returns
Boolean true if all paths are available, false if any failed.
Implementation
# Block until every task generating one of +paths+ has finished.
#
# @param task [Task] the task waiting on these paths (used for logging).
# @param paths [Enumerable] the paths to wait on; compared as strings.
# @return [Boolean] true if all paths are available, false if any of them failed.
# @raise [RuntimeError] if a path is not being generated, has not failed, and does not exist.
def wait_on_paths(task, paths)
  # If there are no paths, we are done:
  return true if paths.count == 0

  # One directed hyper-graph edge, incremented once per output that is still pending:
  edge = Edge.new
  paths = paths.collect(&:to_s)

  paths.each do |path|
    if outputs = @outputs[path]
      # Another task is still generating this output; register to be notified when it finishes:
      Console.debug(self) { "Task #{task} is waiting on path #{path}" }
      outputs << edge
      edge.increment!
    elsif @failed_outputs.include?(path)
      # Its producer already failed (see #exit). That is reported through the return value
      # below; it must not be mistaken for a path nobody is generating.
      next
    elsif !File.exist?(path)
      Console.warn(self) { "Task #{task} is waiting on paths which don't exist and are not being generated!" }
      raise RuntimeError, "File #{path} is not being generated by any active task!"
    end
    # Otherwise the path already exists and is not being generated, e.g. a static input of the build graph.
  end

  # Outputs that failed before we started waiting are no longer in @outputs, so check them explicitly.
  # NOTE(review): edge.wait presumably blocks until all pending producers finish and reports whether
  # they succeeded — confirm against Edge.
  failed = paths.any? { |path| @failed_outputs.include?(path) }
  edge.wait && !failed
end
def wait_for_children(parent, children)
A parent task only completes once all its children are complete.
Implementation
# Block +parent+ until each of its not-yet-complete +children+ has finished.
#
# Children that have already failed are handed to the edge via +skip!+ rather than waited on.
#
# @param parent [Task] the waiting task (used for logging).
# @param children [Enumerable<Task>] the child tasks.
# @return true when nothing is pending, otherwise whatever +Edge#wait+ returns.
def wait_for_children(parent, children)
  pending = children.reject(&:complete?)
  return true if pending.empty?

  Console.debug(self) { "Task #{parent} is waiting on #{pending.count} children" }

  # Track state changes of the pending children with a single edge:
  edge = Edge.new
  pending.each do |child|
    if child.failed?
      edge.skip!(child)
      next
    end

    # #exit traverses this edge once the child's node finishes.
    edge.increment!
    @parents[child.node] << edge
  end

  edge.wait
end
def enter(task)
Register a task as active and record its declared output paths.
Signature
-
parameter
task Task the task that is beginning execution.
Implementation
# Record +task+ as the task for its node and announce the paths it will produce.
#
# Outputs have to be registered here, before the task runs. A task can only wait on a path
# (see #wait_on_paths) if the producing task has already declared it, so any output -> input
# chaining between tasks must be sequential. This is by design and is not a problem in practice.
#
# @param task [Task] the task that is beginning execution.
def enter(task)
  @tasks[task.node] = task

  outputs = task.outputs
  return unless outputs

  Console.debug(self) do |buffer|
    buffer.puts "Task will generate outputs:"
    Array(outputs).each { |output| buffer.puts(output.inspect) }
  end

  # A task and its child tasks may declare the same output; keep any edges already registered for it.
  outputs.each { |path| @outputs[path.to_s] ||= [] }
end
def exit(task)
Mark a task as finished, resolve its output paths and notify any waiting tasks.
Signature
-
parameter
task Task the task that has finished execution.
Implementation
# Mark +task+ as finished: record a failure, release its outputs, and notify waiters.
#
# Every edge waiting on one of the task's outputs, and every parent edge waiting on the
# task's node, is traversed with +task+. The task is then added to the monitor.
#
# @param task [Task] the task that has finished execution. Its +outputs+ may be nil,
#   just as #enter allows.
def exit(task)
  outputs = task.outputs

  # Fail outputs if the node failed, so that #wait_on_paths can report them:
  if task.failed?
    @failed_tasks << task
    @failed_outputs += outputs.collect(&:to_s) if outputs
  end

  # Release the node's outputs. Tasks without declared outputs have nothing to release;
  # calling +each+ on nil here previously raised NoMethodError.
  outputs&.each do |path|
    path = path.to_s
    Console.debug(self) { "File #{task.failed? ? 'failed' : 'available'}: #{path}" }

    if edges = @outputs.delete(path)
      edges.each { |edge| edge.traverse(task) }
    end
  end

  # Notify the parent nodes that the child is done:
  if parents = @parents.delete(task.node)
    parents.each { |edge| edge.traverse(task) }
  end

  @monitor.add(task)
end
def delete(node)
Remove a node and its associated task from the walker, e.g. after it has been invalidated.
Signature
-
parameter
node Node the node to remove.
Implementation
# Forget the task registered for +node+, if any, and remove it from the monitor.
#
# @param node [Node] the node to remove.
# @return the monitor's result when a task was removed, otherwise nil.
def delete(node)
  removed = @tasks.delete(node)
  @monitor.delete(removed) if removed
end
def clear_failed
Remove all failed tasks from the walker so they can be retried on the next update.
Implementation
# Drop every failed task so that #call invokes the update callback for its node again,
# then reset the failure bookkeeping.
def clear_failed
  (@failed_tasks || []).each { |failed_task| delete(failed_task.node) }
  @failed_tasks = []
  @failed_outputs = Set.new
end
def run(**options)
Run an update loop, re-executing the given block whenever the monitor detects filesystem changes.
Implementation
# Run the given block once immediately, then again each time the monitor invokes its block.
#
# @param options [Hash] forwarded to the monitor's +run+ method.
# @yield the update to (re-)execute.
def run(**options)
  yield

  # Use the instance variable directly. No +monitor+ reader is declared alongside this
  # method, so a bare +monitor+ call risks a NameError.
  @monitor.run(**options) do
    yield
  end
end
def inspect
Signature
-
returns
String a human-readable summary of the walker state.
Implementation
# @return [String] a compact summary: class, hex object id, task count and failure count.
def inspect
  format(
    '#<%s:0x%s %d tasks, %d failed>',
    self.class, object_id.to_s(16), @tasks.count, @failed_tasks.count
  )
end