DAG Executor
Kahn-BFS tiers, memoization, and null propagation.
The DAG executor is the core runtime engine that turns a published workflow definition into ordered, parallel-safe execution. It is a pure computation module - no database or Redis dependencies - that accepts an ExecutionGraph and a StepExecutorFn injected by the host, and drives execution tier-by-tier.
Graph construction
DAGExecutor.buildGraph(definition) converts a WorkflowDefinitionV2 spec into an ExecutionGraph:
- Each spec.steps[] entry becomes an ExecutionNode keyed by step.id.
- Edges are derived from step.next[] (unlabeled) and step.branches{} (labeled, for condition/gate routing).
- Branch labels on branches edges double as sourceHandle identifiers so the W4 null-propagation check can pull the correct per-handle output type.
- Group definitions (spec.groups[]) are threaded through so loop nodes can compile their body sub-graphs at run time.
const graph = DAGExecutor.buildGraph(definition)
// graph.nodes - Map<nodeId, ExecutionNode>
// graph.edges - ExecutionEdge[] (source, target, label?, sourceHandle?)
// graph.groups - SerializedGroupDefinition[] | undefined
Kahn-BFS tier computation
computeTiers(graph, opts?) implements Kahn's algorithm to produce parallel execution tiers:
- Build an in-degree map for every node in the active subgraph.
- Collect all zero-in-degree nodes into Tier 0.
- "Remove" those nodes - decrement the in-degree of each successor.
- New zero-in-degree nodes form the next tier.
- Repeat until all nodes are assigned. A remaining unassigned node indicates a cycle and throws.
Nodes in the same tier have no dependency relationship and run concurrently via Promise.allSettled. This means a ten-step linear chain runs as ten sequential tiers of one, while a fan-out of ten parallel branches runs as a single tier of ten.
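The tiering steps above can be sketched in a few lines. This is a minimal illustration, not the executor's actual code: SketchGraph, SketchEdge, and tiersOf are hypothetical names, and node ids stand in for full ExecutionNode values.

```typescript
// Hypothetical, simplified graph shape used only for this sketch.
interface SketchEdge { source: string; target: string }
interface SketchGraph { nodes: string[]; edges: SketchEdge[] }

function tiersOf(graph: SketchGraph): string[][] {
  // Step 1: in-degree map and successor lists for every node.
  const inDegree = new Map<string, number>()
  const successors = new Map<string, string[]>()
  for (const id of graph.nodes) {
    inDegree.set(id, 0)
    successors.set(id, [])
  }
  for (const { source, target } of graph.edges) {
    inDegree.set(target, (inDegree.get(target) ?? 0) + 1)
    successors.get(source)?.push(target)
  }

  // Step 2: zero-in-degree nodes form tier 0.
  const tiers: string[][] = []
  let current = graph.nodes.filter((id) => inDegree.get(id) === 0)
  let assigned = 0

  // Steps 3-5: "remove" the tier, collect newly freed nodes, repeat.
  while (current.length > 0) {
    tiers.push(current)
    assigned += current.length
    const next: string[] = []
    for (const id of current) {
      for (const succ of successors.get(id) ?? []) {
        const remaining = (inDegree.get(succ) ?? 0) - 1
        inDegree.set(succ, remaining)
        if (remaining === 0) next.push(succ)
      }
    }
    current = next
  }

  // Any node left unassigned sits on a cycle.
  if (assigned !== graph.nodes.length) {
    throw new Error('Cycle detected: some nodes could not be assigned to a tier')
  }
  return tiers
}

// A linear chain yields one node per tier.
const chainTiers = tiersOf({
  nodes: ['a', 'b', 'c'],
  edges: [{ source: 'a', target: 'b' }, { source: 'b', target: 'c' }],
})
// → [['a'], ['b'], ['c']]

// A fan-out yields one tier holding all parallel branches.
const fanOutTiers = tiersOf({
  nodes: ['trigger', 'x', 'y', 'z'],
  edges: [
    { source: 'trigger', target: 'x' },
    { source: 'trigger', target: 'y' },
    { source: 'trigger', target: 'z' },
  ],
})
// → [['trigger'], ['x', 'y', 'z']]
```

Each inner array is a candidate for a single Promise.allSettled call, since none of its members depends on another.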
Multi-trigger subgraph pruning
When a workflow declares multiple trigger nodes (e.g. one for linear.agent_session.created and another for linear.issue.status_changed), only the firing trigger's downstream subgraph should execute on any given instance. The dispatcher records subscription.triggerId on the instance's triggerEvent.triggerId field; the tier computation reads this as opts.firingTriggerId and performs a forward BFS from the matching trigger node to compute the active subgraph before Kahn's pass runs.
const tiers = computeTiers(graph, { firingTriggerId: 'linear.agent_session.created' })
// → only nodes reachable from the matching trigger node are included
When firingTriggerId is absent or doesn't match any trigger node, the algorithm falls back to legacy "all roots in tier 0" semantics - preserving single-trigger and manual-dispatch workflows unchanged.
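As a rough illustration of the pruning step, the forward BFS might look like the sketch below. PruneEdge and activeSubgraph are hypothetical names, and the sketch assumes the firing trigger has already been resolved to a node id; how the executor performs that lookup is not shown here.

```typescript
// Hypothetical edge shape for this sketch.
interface PruneEdge { source: string; target: string }

// Returns the ids reachable from the firing trigger node (inclusive).
function activeSubgraph(triggerNodeId: string, edges: PruneEdge[]): Set<string> {
  const active = new Set<string>([triggerNodeId])
  const queue: string[] = [triggerNodeId]
  for (let i = 0; i < queue.length; i++) {
    const id = queue[i]
    for (const edge of edges) {
      if (edge.source === id && !active.has(edge.target)) {
        active.add(edge.target)
        queue.push(edge.target)
      }
    }
  }
  return active
}

// Two triggers that share one downstream node.
const pruneEdges: PruneEdge[] = [
  { source: 'triggerA', target: 'a1' },
  { source: 'a1', target: 'shared' },
  { source: 'triggerB', target: 'b1' },
  { source: 'b1', target: 'shared' },
]

const activeIds = activeSubgraph('triggerA', pruneEdges)
// → Set { 'triggerA', 'a1', 'shared' }

// Assumption: only edges with both endpoints in the active set count toward in-degrees.
const activeEdges = pruneEdges.filter((e) => activeIds.has(e.source) && activeIds.has(e.target))
```

Dropping the edge from b1 to shared is what lets shared reach in-degree zero once a1 completes. If that edge stayed in, shared would wait on a node that never runs.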
Condition and gate branching
After a condition node completes, applyConditionBranching marks non-taken downstream branches as skipped. The branch label returned by resolveBranchLabel(result.output) is matched against outgoing edge labels, with aliases for true/yes and false/no. Switch-mode conditions return a string label; if no edge matches, the executor routes through a default edge when one is present.
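The label matching described above could be sketched as follows. LabeledEdge and selectTakenEdges are hypothetical names, and the sketch assumes the fallback edge carries the literal label 'default'; the real executor may identify the default edge differently.

```typescript
// Hypothetical outgoing-edge shape for this sketch.
interface LabeledEdge { target: string; label?: string }

// true/yes and false/no are treated as interchangeable labels.
const LABEL_ALIASES = new Map<string, string[]>([
  ['true', ['true', 'yes']],
  ['yes', ['true', 'yes']],
  ['false', ['false', 'no']],
  ['no', ['false', 'no']],
])

function selectTakenEdges(branchLabel: string, outgoing: LabeledEdge[]): LabeledEdge[] {
  const accepted = LABEL_ALIASES.get(branchLabel) ?? [branchLabel]
  const matched = outgoing.filter((e) => e.label !== undefined && accepted.includes(e.label))
  if (matched.length > 0) return matched
  // Assumption: the fallback edge is labeled 'default'.
  return outgoing.filter((e) => e.label === 'default')
}

// Boolean condition: 'yes' matches the edge labeled 'true'.
const booleanTaken = selectTakenEdges('yes', [
  { target: 'open-pr', label: 'true' },
  { target: 'close-issue', label: 'false' },
])
// → [{ target: 'open-pr', label: 'true' }]

// Switch-mode condition with no matching label falls back to the default edge.
const switchTaken = selectTakenEdges('urgent', [
  { target: 'low-priority', label: 'low' },
  { target: 'triage', label: 'default' },
])
// → [{ target: 'triage', label: 'default' }]
```

Every outgoing edge not returned here corresponds to a non-taken branch whose downstream nodes would be marked skipped.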
Every non-taken edge is recorded in an internal deadEdgeKeys set. The checkUpstreamSkipCascade guard consults this set so a downstream node with multiple inbound condition sources - none of which routed to it - is correctly skipped even when one of those sources completed normally (SUP-1825 fix).
Gate nodes (type 'gate') return status: 'waiting' instead of completing, which causes the executor to stop after the current tier and return { status: 'waiting' } to the caller. See Workflow Gates for the full suspension and resume lifecycle.
Step memoization and retry
The StepExecutor wraps every action through a dual-persist memoization layer:
- In-flight: An in-memory memoization cache keyed by (instanceId, stepId) prevents a step from re-executing if a concurrent tier happens to dispatch it twice.
- Crash recovery: A step_executions PostgreSQL row is written before the node starts and updated when it completes. On a gate resume, loadCompletedStepResults hydrates the results map from this table so already-completed tiers are skipped.
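The in-flight half of this layer can be illustrated with a promise cache. This is a sketch under assumptions: runOnce and the string key format are hypothetical, and the PostgreSQL persistence side is omitted entirely.

```typescript
// In-memory cache of pending or settled step promises, keyed by instance and step.
const inFlightSteps = new Map<string, Promise<unknown>>()

function runOnce<T>(instanceId: string, stepId: string, run: () => Promise<T>): Promise<T> {
  const key = `${instanceId}:${stepId}`
  const existing = inFlightSteps.get(key)
  if (existing) {
    // A second dispatch of the same step reuses the first promise.
    return existing as Promise<T>
  }
  const pending = run()
  inFlightSteps.set(key, pending)
  return pending
}

// Dispatching the same step twice only invokes the action once.
let actionCalls = 0
const firstDispatch = runOnce('instance-1', 'call-external-api', async () => {
  actionCalls += 1
  return 'created'
})
const secondDispatch = runOnce('instance-1', 'call-external-api', async () => {
  actionCalls += 1
  return 'created-again'
})
```

Caching the promise rather than the resolved value means a duplicate dispatch that arrives while the first call is still running also waits on the original work.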
The default retry policy, applied to every node that doesn't declare its own, is:
| Field | Default |
|---|---|
| maxAttempts | 3 |
| backoff | 'exponential' |
| initialDelayMs | 1000 ms |
| maxDelayMs | 30000 ms |
You can override per node in the YAML definition:
- id: call-external-api
  type: action
  config:
    action: github.pr.create
  retryPolicy:
    maxAttempts: 5
    backoff: exponential
    initialDelayMs: 2000
    maxDelayMs: 60000
Backoff formula for exponential: initialDelayMs × 2^(attempt-1), capped at maxDelayMs.
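To make the formula concrete, here is a small sketch that evaluates it against the default policy. BackoffSettings and backoffDelayMs are hypothetical names, and attempt is assumed to be 1-based.

```typescript
// Only the two fields the formula needs; names mirror the policy table.
interface BackoffSettings { initialDelayMs: number; maxDelayMs: number }

// initialDelayMs × 2^(attempt-1), capped at maxDelayMs. Assumes attempt starts at 1.
function backoffDelayMs(settings: BackoffSettings, attempt: number): number {
  return Math.min(settings.initialDelayMs * 2 ** (attempt - 1), settings.maxDelayMs)
}

const defaultSettings: BackoffSettings = { initialDelayMs: 1000, maxDelayMs: 30000 }

const delays = [1, 2, 3, 4, 5, 6].map((attempt) => backoffDelayMs(defaultSettings, attempt))
// → [1000, 2000, 4000, 8000, 16000, 30000]
```

With the defaults, the cap only takes effect from the sixth attempt onward, so it matters mainly for nodes that raise maxAttempts.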
Null propagation (W4)
When typed port declarations are present on a node, the executor checks whether required input ports would receive null before scheduling the node. A node whose required port would receive null is skipped with skipReason: { kind: 'null_propagation' } - it is not failed, so the workflow continues along other branches.
The skip cascades: a skipped node's downstream targets accumulate skips transitively until a node with other live incoming edges is reached.
Optional ports accept null and allow execution to proceed. Nodes without any declared ports use legacy "untyped" semantics - all inputs are treated as optional.
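The per-node check could be sketched roughly as below. PortDecl, IncomingValue, and checkNullPropagation are hypothetical names, and the sketch assumes only an explicit null counts; how the executor treats missing or undefined inputs is not covered by this page.

```typescript
// Hypothetical shapes for this sketch.
interface PortDecl { id: string; required: boolean }
interface IncomingValue { portId: string; sourceNodeId: string; value: unknown }

type NullCheck =
  | { run: true }
  | { run: false; skipReason: { kind: 'null_propagation'; sourceNodeId: string; portId: string } }

function checkNullPropagation(ports: PortDecl[] | undefined, inputs: IncomingValue[]): NullCheck {
  // Legacy "untyped" nodes declare no ports: every input is treated as optional.
  if (!ports || ports.length === 0) return { run: true }

  for (const port of ports) {
    if (!port.required) continue // optional ports accept null
    const input = inputs.find((i) => i.portId === port.id)
    if (input && input.value === null) {
      return {
        run: false,
        skipReason: { kind: 'null_propagation', sourceNodeId: input.sourceNodeId, portId: port.id },
      }
    }
  }
  return { run: true }
}

const nullInput: IncomingValue[] = [{ portId: 'body', sourceNodeId: 'classify', value: null }]

const requiredCheck = checkNullPropagation([{ id: 'body', required: true }], nullInput)
// → skipped with kind 'null_propagation', sourceNodeId 'classify', portId 'body'

const optionalCheck = checkNullPropagation([{ id: 'body', required: false }], nullInput)
// → { run: true }

const untypedCheck = checkNullPropagation(undefined, nullInput)
// → { run: true }
```

The result is a skip decision rather than a thrown error, which matches the point above that a null-propagated node is skipped, not failed.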
// SkipReason union (inspector + memoization surface)
type SkipReason =
| { kind: 'annotation' }
| { kind: 'when_guard'; expression: string }
| { kind: 'condition_branch'; conditionNodeId: string }
| { kind: 'null_propagation'; sourceNodeId: string; portId: string }
| { kind: 'upstream_skipped'; sourceNodeId: string }
When guards
Any node may carry a when expression field. This is evaluated as a boolean against the current ExecutionContext using the expression evaluator's $nodes / $trigger / $config namespaces. A guard that evaluates to false skips the node. Parse errors and unresolvable references default to allow - a malformed guard never silently skips a node.
- id: maybe-open-pr
  type: action
  config:
    action: github.pr.create
  when: "{{ nodes.classify.output.needsPr }}"
Executor event stream
The DAG executor emits ExecutorEvent values via the onEvent callback on every state transition. In production, createRedisEventHandler(instanceId) publishes these to the wf:<instanceId>:events Redis Pub/Sub channel, which the canvas's SSE endpoint consumes for live overlay updates. See Execution Streaming for the full streaming architecture.
type ExecutorEvent =
| { type: 'tier_start'; tier: number; nodeIds: string[] }
| { type: 'node_start'; nodeId: string; tier: number }
| { type: 'node_complete'; nodeId: string; tier: number; result: StepResult }
| { type: 'node_retry'; nodeId: string; tier: number; attempt: number; error: string }
| { type: 'node_waiting'; nodeId: string; tier: number }
| { type: 'node_skipped'; nodeId: string; tier: number; reason: SkipReason }
| { type: 'node_mocked'; nodeId: string; tier: number }
| { type: 'tier_complete'; tier: number }
| { type: 'execution_complete'; status: 'completed' | 'failed' | 'waiting' }
Test mode
To run the workflow with nodes auto-mocked, pass opts.test = { runMode: 'test' } to execute. Nodes listed in unmockNodeIds run for real; nodes without a declared mock fall through to real execution and are flagged with fallthrough: true. Gate nodes without a mock auto-complete in test mode so the workflow proceeds past them without blocking.
The executor returns a testSummary on the ExecutionResult with per-node outcomes, assertion pass/fail counts, and the list of fallthrough (uncovered) nodes. See Testing Workflows for the full test-run surface.
Instance lifecycle states
The full workflow instance state machine managed by InstanceLifecycle:
| Status | Description |
|---|---|
| pending | Created, not yet started |
| running | DAG executor is active |
| suspended | Paused at a gate, awaiting signal |
| completed | All tiers finished successfully |
| failed | One or more nodes failed after retries |
| cancelled | Operator-cancelled via API |
Terminal states (completed, failed, cancelled) can never transition back to running. Concurrent-instance limits are enforced by InstanceLifecycle before run-instance.ts starts the DAG.
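The terminal-state rule can be expressed as a small guard. This sketch covers only the rule stated above; InstanceStatus, TERMINAL_STATUSES, and canTransitionToRunning are hypothetical names, and the full transition table enforced by InstanceLifecycle is not reproduced here.

```typescript
type InstanceStatus = 'pending' | 'running' | 'suspended' | 'completed' | 'failed' | 'cancelled'

// The three statuses the table above treats as terminal.
const TERMINAL_STATUSES: ReadonlySet<InstanceStatus> = new Set<InstanceStatus>([
  'completed',
  'failed',
  'cancelled',
])

// Terminal instances may never go back to running.
function canTransitionToRunning(from: InstanceStatus): boolean {
  return !TERMINAL_STATUSES.has(from)
}

const resumable = canTransitionToRunning('suspended') // true: a gate resume
const restartable = canTransitionToRunning('failed')  // false: terminal
```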
Related pages
- Workflow Gates - suspension, signal matching, and timer-driven resume
- Loop Executor - iteration semantics inside the DAG
- Execution Streaming - Redis Pub/Sub → SSE canvas overlay
- Workflow Expressions - {{ }} template syntax for when guards and node configs