
DAG Executor

Kahn-BFS tiers, memoization, and null propagation.

The DAG executor is the core runtime engine that turns a published workflow definition into ordered, parallel-safe execution. It is a pure computation module with no database or Redis dependencies: it accepts an ExecutionGraph and a host-injected StepExecutorFn, and drives execution tier by tier.

Graph construction

DAGExecutor.buildGraph(definition) converts a WorkflowDefinitionV2 spec into an ExecutionGraph:

  • Each spec.steps[] entry becomes an ExecutionNode keyed by step.id.
  • Edges are derived from step.next[] (unlabeled) and step.branches{} (labeled, for condition/gate routing).
  • Branch labels on branches edges double as sourceHandle identifiers so the W4 null-propagation check can pull the correct per-handle output type.
  • Group definitions (spec.groups[]) are threaded through so loop nodes can compile their body sub-graphs at run time.
const graph = DAGExecutor.buildGraph(definition)
// graph.nodes - Map<nodeId, ExecutionNode>
// graph.edges - ExecutionEdge[]  (source, target, label?, sourceHandle?)
// graph.groups - SerializedGroupDefinition[] | undefined
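The edge derivation described in the list above can be sketched roughly as follows. This is a simplified illustration, not the real buildGraph: the StepSpec shape (in particular branches as a label-to-single-target map) and the buildGraphSketch name are assumptions, and group threading is omitted.

```typescript
// Hypothetical, simplified step shape; the real WorkflowDefinitionV2 types are richer.
interface StepSpec {
  id: string;
  type: string;
  next?: string[];                   // unlabeled successors
  branches?: Record<string, string>; // assumed shape: branch label -> target step id
}

interface ExecutionEdge {
  source: string;
  target: string;
  label?: string;
  sourceHandle?: string;
}

interface ExecutionGraphSketch {
  nodes: Map<string, StepSpec>;
  edges: ExecutionEdge[];
}

function buildGraphSketch(steps: StepSpec[]): ExecutionGraphSketch {
  const nodes = new Map<string, StepSpec>();
  const edges: ExecutionEdge[] = [];
  for (const step of steps) {
    // Each step becomes a node keyed by its id.
    nodes.set(step.id, step);
    // next[] produces plain, unlabeled edges.
    for (const target of step.next ?? []) {
      edges.push({ source: step.id, target });
    }
    // branches{} produces labeled edges; the label doubles as the sourceHandle.
    for (const [label, target] of Object.entries(step.branches ?? {})) {
      edges.push({ source: step.id, target, label, sourceHandle: label });
    }
  }
  return { nodes, edges };
}
```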

Kahn-BFS tier computation

computeTiers(graph, opts?) implements Kahn's algorithm to produce parallel execution tiers:

  1. Build an in-degree map for every node in the active subgraph.
  2. Collect all zero-in-degree nodes into Tier 0.
  3. "Remove" those nodes by decrementing the in-degree of each successor.
  4. New zero-in-degree nodes form the next tier.
  5. Repeat until no new zero-in-degree nodes appear. Any node still unassigned at that point sits on or behind a cycle, and computeTiers throws.
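The five steps above can be sketched as a standalone function. It takes the active node ids and plain [source, target] pairs rather than an ExecutionGraph; that input shape and the computeTiersSketch name are assumptions for illustration.

```typescript
// Kahn-BFS tiering over an adjacency list (simplified input shape).
function computeTiersSketch(nodeIds: string[], edges: Array<[string, string]>): string[][] {
  const inDegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const id of nodeIds) {
    inDegree.set(id, 0);
    successors.set(id, []);
  }
  // Step 1: in-degree map, counting only edges inside the active node set.
  for (const [source, target] of edges) {
    if (!inDegree.has(source) || !inDegree.has(target)) continue;
    successors.get(source)!.push(target);
    inDegree.set(target, inDegree.get(target)! + 1);
  }

  const tiers: string[][] = [];
  // Step 2: every zero-in-degree node forms tier 0.
  let current = nodeIds.filter((id) => inDegree.get(id) === 0);
  let assigned = 0;
  while (current.length > 0) {
    tiers.push(current);
    assigned += current.length;
    const next: string[] = [];
    // Steps 3-4: "remove" the tier and collect nodes whose in-degree drops to zero.
    for (const id of current) {
      for (const succ of successors.get(id)!) {
        const d = inDegree.get(succ)! - 1;
        inDegree.set(succ, d);
        if (d === 0) next.push(succ);
      }
    }
    current = next;
  }
  // Step 5: leftover nodes can only be explained by a cycle.
  if (assigned !== nodeIds.length) {
    throw new Error("cycle detected: some nodes were never assigned a tier");
  }
  return tiers;
}
```

For a diamond a→b, a→c, b→d, c→d this yields three tiers: a, then b and c together, then d.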

Nodes in the same tier have no dependency relationship and run concurrently via Promise.allSettled. This means a ten-step linear chain runs as ten sequential tiers of one, while a fan-out of ten parallel branches runs as a single tier of ten.
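The tier-by-tier loop with Promise.allSettled can be sketched like this. It is a minimal illustration under assumptions: the step function signature is simplified, and the sketch keeps going after a rejection because this section does not describe how the real executor reacts to failures.

```typescript
// Simplified stand-in for the host-injected StepExecutorFn.
type StepExecutorFnSketch = (nodeId: string) => Promise<unknown>;

async function runTiersSketch(
  tiers: string[][],
  exec: StepExecutorFnSketch,
): Promise<Map<string, PromiseSettledResult<unknown>>> {
  const results = new Map<string, PromiseSettledResult<unknown>>();
  for (const tier of tiers) {
    // All nodes in the tier start together; allSettled waits for every one,
    // so a rejected sibling does not cut the others short.
    const settled = await Promise.allSettled(tier.map((id) => exec(id)));
    tier.forEach((id, i) => results.set(id, settled[i]));
    // The next tier only starts once this whole tier has settled.
  }
  return results;
}
```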

Multi-trigger subgraph pruning

When a workflow declares multiple trigger nodes (e.g. one for linear.agent_session.created and another for linear.issue.status_changed), only the firing trigger's downstream subgraph should execute on any given instance. The dispatcher records subscription.triggerId on the instance's triggerEvent.triggerId field; the tier computation reads this as opts.firingTriggerId and performs a forward BFS from the matching trigger node to compute the active subgraph before Kahn's pass runs.

const tiers = computeTiers(graph, { firingTriggerId: 'linear.agent_session.created' })
// → only nodes reachable from the matching trigger node are included

When firingTriggerId is absent or doesn't match any trigger node, the algorithm falls back to legacy "all roots in tier 0" semantics, which leaves single-trigger and manual-dispatch workflows unchanged.
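The pruning pass and its fallback can be sketched as a forward BFS. How a trigger node is matched against firingTriggerId is not spelled out here, so this sketch assumes each trigger node carries a hypothetical triggerId field; activeSubgraphSketch is likewise an illustrative name.

```typescript
// Hypothetical node shape: trigger nodes carry the subscription's triggerId.
interface NodeSketch {
  id: string;
  triggerId?: string;
}

function activeSubgraphSketch(
  nodes: NodeSketch[],
  edges: Array<[string, string]>,
  firingTriggerId?: string,
): Set<string> {
  const all = new Set(nodes.map((n) => n.id));
  const start =
    firingTriggerId === undefined ? undefined : nodes.find((n) => n.triggerId === firingTriggerId);
  // Legacy fallback: no firing trigger, or no match, means every node stays active.
  if (!start) return all;

  const successors = new Map<string, string[]>();
  for (const [s, t] of edges) {
    if (!successors.has(s)) successors.set(s, []);
    successors.get(s)!.push(t);
  }
  // Forward BFS from the firing trigger collects everything reachable from it.
  const active = new Set<string>([start.id]);
  const queue = [start.id];
  while (queue.length > 0) {
    const id = queue.shift()!;
    for (const succ of successors.get(id) ?? []) {
      if (!active.has(succ)) {
        active.add(succ);
        queue.push(succ);
      }
    }
  }
  return active;
}
```

Kahn's pass would then run only over the returned set.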

Condition and gate branching

After a condition node completes, applyConditionBranching marks non-taken downstream branches as skipped. The branch label returned by resolveBranchLabel(result.output) is matched against the labels of outgoing edges, with true and yes treated as aliases, as are false and no. Switch-mode conditions return an arbitrary string label; if no outgoing edge matches it, the executor routes through a default edge when one is present.
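The edge-matching rule can be sketched as follows, starting from a label that resolveBranchLabel has already produced. The alias table, the literal "default" edge label, and the selectTakenEdges name are assumptions for illustration; the real executor may normalize labels differently.

```typescript
interface LabeledEdge {
  source: string;
  target: string;
  label?: string;
}

// Assumed alias table: yes/true and no/false collapse to one canonical form.
// A Map avoids accidental hits on Object.prototype keys such as "constructor".
const ALIASES = new Map<string, string>([
  ["true", "true"], ["yes", "true"],
  ["false", "false"], ["no", "false"],
]);
const normalize = (label: string): string => ALIASES.get(label) ?? label;

function selectTakenEdges(
  outgoing: LabeledEdge[],
  branchLabel: string,
): { taken: LabeledEdge[]; dead: LabeledEdge[] } {
  const wanted = normalize(branchLabel);
  let taken = outgoing.filter((e) => e.label !== undefined && normalize(e.label) === wanted);
  if (taken.length === 0) {
    // No match (e.g. an unexpected switch label): fall back to a "default" edge if any.
    taken = outgoing.filter((e) => e.label === "default");
  }
  // Every other outgoing edge is non-taken; its target branch gets skipped.
  const dead = outgoing.filter((e) => !taken.includes(e));
  return { taken, dead };
}
```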

Every non-taken edge is recorded in an internal deadEdgeKeys set. The checkUpstreamSkipCascade guard consults this set so that a downstream node with multiple inbound condition sources, none of which routed to it, is correctly skipped even when those sources completed normally (SUP-1825 fix).
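The rule this guard enforces can be sketched as "skip a node only when none of its inbound edges is live". The edge-key format and the shouldSkipSketch name are assumptions; the point is that a completed source does not keep its edge alive if that edge was marked dead by branching.

```typescript
interface InboundEdge {
  source: string;
  target: string;
  label?: string;
}

// Assumed key format for entries in deadEdgeKeys.
const edgeKey = (e: InboundEdge): string => `${e.source}->${e.target}:${e.label ?? ""}`;

function shouldSkipSketch(
  nodeId: string,
  edges: InboundEdge[],
  deadEdgeKeys: Set<string>,
  skipped: Set<string>,
): boolean {
  const inbound = edges.filter((e) => e.target === nodeId);
  if (inbound.length === 0) return false; // roots are never cascade-skipped
  // An edge is live unless branching marked it dead or its source was itself skipped.
  return inbound.every((e) => deadEdgeKeys.has(edgeKey(e)) || skipped.has(e.source));
}
```

Checking only whether sources were skipped would miss the case where both conditions completed and routed elsewhere; consulting deadEdgeKeys closes that gap.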

Gate nodes (type 'gate') return status: 'waiting' instead of completing, which causes the executor to stop after the current tier and return { status: 'waiting' } to the caller. See Workflow Gates for the full suspension and resume lifecycle.
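The "stop after the current tier" behavior can be sketched as below. The NodeOutcome shape and runUntilGateSketch name are hypothetical, and failures are not modeled; the sketch only shows that a gate's siblings still finish before the executor returns waiting.

```typescript
// Hypothetical per-node outcome; real StepResult values carry more fields.
type NodeOutcome = { status: "completed" | "waiting" };

async function runUntilGateSketch(
  tiers: string[][],
  exec: (id: string) => Promise<NodeOutcome>,
): Promise<{ status: "completed" | "waiting"; completedTiers: number }> {
  for (let i = 0; i < tiers.length; i++) {
    // Every node in the tier still runs, even if a sibling is a gate.
    const outcomes = await Promise.all(tiers[i].map((id) => exec(id)));
    if (outcomes.some((o) => o.status === "waiting")) {
      // Stop after this tier; later tiers run only after the gate is resumed.
      return { status: "waiting", completedTiers: i + 1 };
    }
  }
  return { status: "completed", completedTiers: tiers.length };
}
```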

Step memoization and retry

The StepExecutor wraps every action through a dual-persist memoization layer:

  • In-flight: An in-memory memoization cache keyed by (instanceId, stepId) prevents a step from re-executing if a concurrent tier happens to dispatch it twice.
  • Crash recovery: A step_executions PostgreSQL row is written before the node starts and updated when it completes. On a gate resume, loadCompletedStepResults hydrates the results map from this table so already-completed tiers are skipped.
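The in-flight half of this layer can be sketched as a promise cache keyed by (instanceId, stepId). This is an assumption-laden illustration: the class name and key encoding are hypothetical, the step_executions persistence and loadCompletedStepResults hydration are not modeled, and evicting rejected promises (so a retry can run) is a design choice made for the sketch.

```typescript
class InFlightMemoSketch<T> {
  private readonly inFlight = new Map<string, Promise<T>>();

  run(instanceId: string, stepId: string, fn: () => Promise<T>): Promise<T> {
    // JSON-encoded tuple avoids collisions when ids contain separator characters.
    const key = JSON.stringify([instanceId, stepId]);
    const existing = this.inFlight.get(key);
    // A second dispatch of the same step reuses the first execution's promise.
    if (existing) return existing;
    const p = fn().catch((err) => {
      // Evict failures so a later retry actually re-executes the step.
      this.inFlight.delete(key);
      throw err;
    });
    this.inFlight.set(key, p);
    return p;
  }
}
```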

The default retry policy applied to every node that doesn't declare its own:

| Field | Default |
| --- | --- |
| maxAttempts | 3 |
| backoff | 'exponential' |
| initialDelayMs | 1000 ms |
| maxDelayMs | 30000 ms |

You can override the policy per node in the YAML definition:

- id: call-external-api
  type: action
  config:
    action: github.pr.create
    retryPolicy:
      maxAttempts: 5
      backoff: exponential
      initialDelayMs: 2000
      maxDelayMs: 60000

Backoff formula for exponential: initialDelayMs × 2^(attempt-1), capped at maxDelayMs.
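The formula and the retry loop around it can be sketched as follows. This assumes attempt is the 1-based number of the attempt that just failed and that no delay follows the final attempt; only the 'exponential' mode is modeled, and the function names are hypothetical.

```typescript
interface RetryPolicySketch {
  maxAttempts: number;
  backoff: "exponential";
  initialDelayMs: number;
  maxDelayMs: number;
}

const DEFAULT_RETRY: RetryPolicySketch = {
  maxAttempts: 3,
  backoff: "exponential",
  initialDelayMs: 1000,
  maxDelayMs: 30000,
};

// initialDelayMs × 2^(attempt-1), capped at maxDelayMs.
function backoffDelayMs(policy: RetryPolicySketch, attempt: number): number {
  return Math.min(policy.initialDelayMs * 2 ** (attempt - 1), policy.maxDelayMs);
}

async function withRetrySketch<T>(
  fn: (attempt: number) => Promise<T>,
  policy: RetryPolicySketch = DEFAULT_RETRY,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((r) => setTimeout(r, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
    try {
      return await fn(attempt);
    } catch (err) {
      lastError = err;
      // Wait before the next try; no wait after the last allowed attempt.
      if (attempt < policy.maxAttempts) await sleep(backoffDelayMs(policy, attempt));
    }
  }
  throw lastError;
}
```

Under these assumptions, the defaults give a persistently failing step three tries with waits of 1000 ms and 2000 ms, while the call-external-api override above reaches its 60000 ms cap on the delay after the sixth attempt (never actually used with maxAttempts: 5, whose last wait is 16000 ms).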

Null propagation (W4)

When typed port declarations are present on a node, the executor checks, before scheduling it, whether any required input port would receive null. A node whose required port would receive null is skipped with skipReason: { kind: 'null_propagation' }. It is not failed, so the workflow continues along other branches.

The skip cascades: a skipped node's downstream targets accumulate skips transitively until a node with other live incoming edges is reached.

Optional ports accept null and allow execution to proceed. Nodes without any declared ports use legacy "untyped" semantics - all inputs are treated as optional.
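The per-node check described above can be sketched as a function returning either a null_propagation SkipReason or null. The PortDecl and input-map shapes are assumptions, and so is the choice to treat only an explicit null (not a missing input) as triggering the skip.

```typescript
// Hypothetical port declaration and wired-input shapes.
interface PortDecl {
  id: string;
  required: boolean;
}
type NullSkip = { kind: "null_propagation"; sourceNodeId: string; portId: string };

function checkNullPropagation(
  ports: PortDecl[] | undefined,
  inputs: Map<string, { value: unknown; sourceNodeId: string }>,
): NullSkip | null {
  // No declared ports: legacy untyped semantics, every input is optional.
  if (!ports || ports.length === 0) return null;
  for (const port of ports) {
    if (!port.required) continue; // optional ports accept null
    const input = inputs.get(port.id);
    if (input !== undefined && input.value === null) {
      // Skip (not fail), recording which upstream node and port produced the null.
      return { kind: "null_propagation", sourceNodeId: input.sourceNodeId, portId: port.id };
    }
  }
  return null;
}
```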

// SkipReason union (inspector + memoization surface)
type SkipReason =
  | { kind: 'annotation' }
  | { kind: 'when_guard'; expression: string }
  | { kind: 'condition_branch'; conditionNodeId: string }
  | { kind: 'null_propagation'; sourceNodeId: string; portId: string }
  | { kind: 'upstream_skipped'; sourceNodeId: string }

When guards

Any node may carry an optional when field containing an expression. It is evaluated as a boolean against the current ExecutionContext using the expression evaluator's $nodes / $trigger / $config namespaces, and a guard that evaluates to false skips the node. Parse errors and unresolvable references default to allow, so a malformed guard never silently skips a node.

- id: maybe-open-pr
  type: action
  config:
    action: github.pr.create
  when: "{{ nodes.classify.output.needsPr }}"
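The fail-open rule can be sketched with the evaluator passed in as a plain function. That Evaluate signature is hypothetical, as are two modeling choices: an unresolvable reference is represented as undefined, and other values are coerced to boolean by truthiness.

```typescript
// Stand-in for the expression evaluator: throws on parse errors,
// returns undefined for unresolvable references (assumed convention).
type Evaluate = (expression: string) => unknown;

function passesWhenGuard(expression: string | undefined, evaluate: Evaluate): boolean {
  if (expression === undefined) return true; // no guard declared
  try {
    const value = evaluate(expression);
    // Unresolvable reference: allow rather than silently skip.
    if (value === undefined) return true;
    // Truthiness coercion is an assumption about "evaluated as a boolean".
    return Boolean(value);
  } catch {
    return true; // parse error: allow
  }
}
```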

Executor event stream

The DAG executor emits ExecutorEvent values via the onEvent callback on every state transition. In production, createRedisEventHandler(instanceId) publishes these to the wf:<instanceId>:events Redis Pub/Sub channel, which the canvas's SSE endpoint consumes for live overlay updates. See Execution Streaming for the full streaming architecture.

type ExecutorEvent =
  | { type: 'tier_start'; tier: number; nodeIds: string[] }
  | { type: 'node_start'; nodeId: string; tier: number }
  | { type: 'node_complete'; nodeId: string; tier: number; result: StepResult }
  | { type: 'node_retry'; nodeId: string; tier: number; attempt: number; error: string }
  | { type: 'node_waiting'; nodeId: string; tier: number }
  | { type: 'node_skipped'; nodeId: string; tier: number; reason: SkipReason }
  | { type: 'node_mocked'; nodeId: string; tier: number }
  | { type: 'tier_complete'; tier: number }
  | { type: 'execution_complete'; status: 'completed' | 'failed' | 'waiting' }
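A consumer of this stream, such as a live overlay, might fold events into per-node display state. The sketch below mirrors the union with result typed as unknown and a loose reason shape, since StepResult is not defined on this page; the OverlayStatus values and applyEvent name are hypothetical, not part of the real canvas.

```typescript
type EventSketch =
  | { type: "tier_start"; tier: number; nodeIds: string[] }
  | { type: "node_start"; nodeId: string; tier: number }
  | { type: "node_complete"; nodeId: string; tier: number; result: unknown }
  | { type: "node_retry"; nodeId: string; tier: number; attempt: number; error: string }
  | { type: "node_waiting"; nodeId: string; tier: number }
  | { type: "node_skipped"; nodeId: string; tier: number; reason: { kind: string } }
  | { type: "node_mocked"; nodeId: string; tier: number }
  | { type: "tier_complete"; tier: number }
  | { type: "execution_complete"; status: "completed" | "failed" | "waiting" };

// Hypothetical overlay states for display purposes.
type OverlayStatus = "running" | "retrying" | "done" | "waiting" | "skipped" | "mocked";

function applyEvent(overlay: Map<string, OverlayStatus>, ev: EventSketch): Map<string, OverlayStatus> {
  const next = new Map(overlay); // immutable update, convenient for UI state
  switch (ev.type) {
    case "node_start": next.set(ev.nodeId, "running"); break;
    case "node_retry": next.set(ev.nodeId, "retrying"); break;
    case "node_complete": next.set(ev.nodeId, "done"); break;
    case "node_waiting": next.set(ev.nodeId, "waiting"); break;
    case "node_skipped": next.set(ev.nodeId, "skipped"); break;
    case "node_mocked": next.set(ev.nodeId, "mocked"); break;
    default: break; // tier_* and execution_complete carry no per-node state here
  }
  return next;
}
```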

Test mode

Passing opts.test = { runMode: 'test' } to execute() runs the workflow with all nodes auto-mocked. Nodes listed in unmockNodeIds run for real; nodes without a declared mock fall through to real execution and are flagged with fallthrough: true. Gate nodes without a mock auto-complete in test mode, so the workflow proceeds past them without blocking.
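The per-node routing rules can be sketched as a single decision function. This is an illustration under assumptions: unmockNodeIds is assumed to sit on the test options, the node shape and TestRouting labels are hypothetical, and the order of the checks (an unmocked gate runs for real) is a guess the section does not confirm.

```typescript
interface TestOptsSketch {
  runMode: "test";
  unmockNodeIds?: string[];
}

type TestRouting = "real" | "mocked" | "fallthrough" | "gate_auto_complete";

function routeNodeInTestMode(
  node: { id: string; type: string; hasMock: boolean },
  opts: TestOptsSketch,
): TestRouting {
  // Explicitly unmocked nodes always execute for real.
  if (opts.unmockNodeIds?.includes(node.id)) return "real";
  if (node.hasMock) return "mocked";
  // Gates without a mock auto-complete so the test run never blocks.
  if (node.type === "gate") return "gate_auto_complete";
  // No mock declared: execute for real and flag fallthrough: true (uncovered).
  return "fallthrough";
}
```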

The executor returns a testSummary on the ExecutionResult with per-node outcomes, assertion pass/fail counts, and the list of fallthrough (uncovered) nodes. See Testing Workflows for the full test-run surface.

Instance lifecycle states

The full workflow instance state machine managed by InstanceLifecycle:

| Status | Description |
| --- | --- |
| pending | Created, not yet started |
| running | DAG executor is active |
| suspended | Paused at a gate, awaiting signal |
| completed | All tiers finished successfully |
| failed | One or more nodes failed after retries |
| cancelled | Operator-cancelled via API |

Terminal states (completed, failed, cancelled) can never transition back to running. Concurrent-instance limits are enforced by InstanceLifecycle before run-instance.ts starts the DAG.
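The terminal-state rule can be expressed as a transition table. Only the statuses and the "terminal states never return to running" rule come from this page; the specific allowed edges below (for example, pending being cancellable directly) are assumptions, and canTransition is a hypothetical helper, not the InstanceLifecycle API.

```typescript
type InstanceStatus = "pending" | "running" | "suspended" | "completed" | "failed" | "cancelled";

// Assumed transition edges; terminal states have none.
const TRANSITIONS: Record<InstanceStatus, readonly InstanceStatus[]> = {
  pending: ["running", "cancelled"],
  running: ["suspended", "completed", "failed", "cancelled"],
  suspended: ["running", "cancelled"], // resume from a gate goes back to running
  completed: [],
  failed: [],
  cancelled: [],
};

function canTransition(from: InstanceStatus, to: InstanceStatus): boolean {
  return TRANSITIONS[from].includes(to);
}
```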
