Execution Streaming
Redis Pub/Sub to SSE streaming that delivers real-time execution events to the canvas.
When a workflow instance runs, the DAG executor emits a stream of ExecutorEvent values for every node state transition. These are published to a Redis Pub/Sub channel and forwarded to connected browser clients over Server-Sent Events (SSE), enabling the execution canvas to display live per-node status overlays without polling.
Architecture
Publisher: Redis Pub/Sub
The createRedisEventHandler(instanceId) factory creates an ExecutorEventHandler that publishes each event to the Redis channel wf:<instanceId>:events as JSON. This handler is passed to DAGExecutor as the onEvent callback at run time:
import { createRedisEventHandler } from '@/lib/workflow/executor/execution-events'
import { DAGExecutor } from '@/lib/workflow/executor/dag-executor'
const eventHandler = createRedisEventHandler(instanceId)
const dag = new DAGExecutor({ onEvent: eventHandler, /* ... */ })
Publishing is fire-and-forget: errors are logged to console.error but never thrown, so a Redis publish failure never interrupts workflow execution.
The channel name follows the pattern wf:<instanceId>:events. There is a single publisher per instance run. The channel is never explicitly deleted: Redis Pub/Sub channels are not stored keys, so the channel simply stops existing once its last subscriber unsubscribes.
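The fire-and-forget publishing described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the source of createRedisEventHandler: the Publisher interface, channelFor, and createFireAndForgetHandler are hypothetical names, the event type is simplified, and the publisher is injected so the sketch does not depend on a specific Redis client.

```typescript
// Hypothetical, simplified event shape; the real ExecutorEvent union is richer.
type ExecutorEvent = { type: string; [key: string]: unknown }

// Assumed minimal publisher surface, modelled on a publish(channel, message) call
// that resolves to the number of subscribers that received the message.
interface Publisher {
  publish(channel: string, message: string): Promise<number>
}

// Channel naming taken from this page: wf:<instanceId>:events
function channelFor(instanceId: string): string {
  return `wf:${instanceId}:events`
}

function createFireAndForgetHandler(
  publisher: Publisher,
  instanceId: string,
): (event: ExecutorEvent) => void {
  const channel = channelFor(instanceId)
  return (event) => {
    try {
      // Not awaited: the executor never blocks on Redis.
      publisher.publish(channel, JSON.stringify(event)).catch((err) => {
        console.error('[execution-events] publish failed', err)
      })
    } catch (err) {
      // Synchronous failures (for example a serialization error) are logged too.
      console.error('[execution-events] publish failed', err)
    }
  }
}
```

Because the returned handler neither awaits nor rethrows, a caller such as the executor can invoke it inline on every state transition without adding error handling of its own.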
Subscriber: AsyncIterable
subscribeToExecutionEvents(instanceId) creates a dedicated Redis connection via redis.duplicate() (required by ioredis for subscriber mode) and returns an AsyncIterable<ExecutorEvent> plus a cleanup() function. Incoming messages are buffered and resolved through a promise-based queue, so the SSE route can consume them with a standard for await loop:
const { events, cleanup } = await subscribeToExecutionEvents(instanceId)
try {
  for await (const event of events) {
    if (event.type === 'execution_complete') break
    // forward to SSE
  }
} finally {
  await cleanup()
}
The cleanup() function unsubscribes and quits the subscriber connection. It is always called - via finally blocks and the request AbortSignal - to prevent leaked connections.
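A promise-based queue of the kind described above might look like the sketch below. It is not the actual subscribeToExecutionEvents implementation: createEventQueue and its push/close methods are hypothetical names, and the sketch assumes a single consumer. In a real subscriber, the Redis message handler would presumably call push(JSON.parse(message)) and cleanup() would call close().

```typescript
// Hypothetical helper: exposes a push-based source as an AsyncIterable.
interface EventQueue<T> {
  push(item: T): void
  close(): void
  events: AsyncIterable<T>
}

function createEventQueue<T>(): EventQueue<T> {
  const buffer: T[] = [] // items that arrived before the consumer asked for them
  let waiting: ((result: IteratorResult<T>) => void) | null = null // consumer parked in next()
  let closed = false

  return {
    push(item) {
      if (closed) return
      if (waiting) {
        // A consumer is already waiting: hand the item over directly.
        const resolve = waiting
        waiting = null
        resolve({ value: item, done: false })
      } else {
        buffer.push(item)
      }
    },
    close() {
      closed = true
      if (waiting) {
        const resolve = waiting
        waiting = null
        resolve({ value: undefined, done: true })
      }
    },
    events: {
      [Symbol.asyncIterator](): AsyncIterator<T> {
        return {
          next(): Promise<IteratorResult<T>> {
            // Drain buffered items first, even after close().
            if (buffer.length > 0) {
              return Promise.resolve({ value: buffer.shift() as T, done: false })
            }
            if (closed) return Promise.resolve({ value: undefined, done: true })
            // Nothing buffered: park until push() or close() resolves us.
            return new Promise((resolve) => { waiting = resolve })
          },
          return(): Promise<IteratorResult<T>> {
            // Called when the consumer breaks out of a for await loop.
            closed = true
            return Promise.resolve({ value: undefined, done: true })
          },
        }
      },
    },
  }
}
```

The buffer covers the case where messages arrive faster than the consumer reads them, while the parked resolver covers the opposite case, so neither side needs to poll.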
SSE endpoint
GET /api/workflows/{workflowId}/instances/{instanceId}/stream
Authorization: Bearer rsk_live_... | session cookie
The route:
- Verifies workflow and instance ownership (workspace-scoped).
- Returns a terminal complete event immediately if the instance is already in a terminal state (completed or failed) - no subscription needed.
- Creates a ReadableStream backed by the AsyncIterable.
- Sends a heartbeat event every 15 seconds to keep the connection alive through proxies and load balancers.
- Listens for the abort event on request.signal to clean up on client disconnect.
- Forwards each executor event as an SSE message. execution_complete events emit as event type complete and close the stream; all other events emit as event type step_state.
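The event-name mapping and message framing in the last step can be illustrated with a small sketch. The function names (sseEventName, formatSseMessage, heartbeatMessage) are hypothetical and the empty heartbeat payload is an assumption; only the three event names and the 15-second interval come from this page.

```typescript
type SseEventName = 'step_state' | 'complete' | 'heartbeat'

// execution_complete maps to "complete"; every other executor event is a "step_state".
function sseEventName(event: { type: string }): SseEventName {
  return event.type === 'execution_complete' ? 'complete' : 'step_state'
}

// One SSE message: an "event:" line, a "data:" line, then a blank line as terminator.
// JSON.stringify never emits raw newlines, so the payload fits on one data line.
function formatSseMessage(name: SseEventName, data: unknown): string {
  return `event: ${name}\ndata: ${JSON.stringify(data)}\n\n`
}

// Interval stated on this page; the constant name is illustrative.
const HEARTBEAT_INTERVAL_MS = 15000

// Assumption: the heartbeat carries an empty JSON object as its payload.
function heartbeatMessage(): string {
  return formatSseMessage('heartbeat', {})
}
```

A route handler would typically encode these strings with a TextEncoder and enqueue them on the ReadableStream controller, scheduling heartbeatMessage() with setInterval and clearing the timer during cleanup.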
Response headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Event types
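Every event published to the Redis channel is a member of the ExecutorEvent discriminated union, discriminated by its type field. The event types referenced on this page are tier_start, node_start, node_complete, node_waiting, and execution_complete.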
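To show how a consumer can narrow on the type discriminant, here is a partial sketch. It covers only the three event shapes visible in the sample stream output on this page; the type name SampleExecutorEvent, the NodeResult shape, and describeEvent are hypothetical, and the real ExecutorEvent union has further variants and may carry additional fields.

```typescript
// Shapes inferred from the sample stream output on this page; other variants omitted.
type NodeResult = { status: string; output?: unknown }

type SampleExecutorEvent =
  | { type: 'tier_start'; tier: number; nodeIds: string[] }
  | { type: 'node_start'; nodeId: string; tier: number }
  | { type: 'node_complete'; nodeId: string; tier: number; result: NodeResult }

// Switching on `type` narrows the event to one variant, so each branch
// can access only the fields that variant actually has.
function describeEvent(event: SampleExecutorEvent): string {
  switch (event.type) {
    case 'tier_start':
      return `tier ${event.tier} started with ${event.nodeIds.length} node(s)`
    case 'node_start':
      return `node ${event.nodeId} started`
    case 'node_complete':
      return `node ${event.nodeId} finished: ${event.result.status}`
  }
}
```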
Consuming the SSE stream
From a browser (EventSource)
const es = new EventSource(
  `/api/workflows/${workflowId}/instances/${instanceId}/stream`,
  { withCredentials: true }
)
es.addEventListener('step_state', (e) => {
  const event = JSON.parse(e.data)
  console.log('executor event', event.type, event.nodeId)
})
es.addEventListener('complete', (e) => {
  const { status } = JSON.parse(e.data)
  console.log('execution finished', status)
  es.close()
})
es.addEventListener('heartbeat', () => {
  // keep-alive - no action needed
})
es.onerror = () => es.close()
From a curl session
curl -N \
-H "Authorization: Bearer rsk_live_..." \
"https://app.rensei.ai/api/workflows/<wfId>/instances/<instanceId>/stream"
# Sample output:
# event: step_state
# data: {"type":"tier_start","tier":0,"nodeIds":["trigger-1"]}
#
# event: step_state
# data: {"type":"node_start","nodeId":"trigger-1","tier":0}
#
# event: step_state
# data: {"type":"node_complete","nodeId":"trigger-1","tier":0,"result":{"status":"completed","output":{"issueId":"LIN-123"}}}
#
# event: complete
# data: {"status":"completed"}
Execution detail panel
In the workflow editor's Execution Mode, the ExecutionDetailPanel component subscribes to this SSE endpoint and overlays per-node status badges on the canvas in real time:
- Running - pulsing spinner on the node card.
- Completed - green check badge.
- Failed - red X badge with expandable error summary.
- Skipped - grey skip badge, expandable to show skipReason.
- Waiting - clock/pause badge (gate suspended).
- Mocked - yellow star badge (test mode).
The panel also surfaces the execution error summary (from node_complete events with result.status === 'failed') inline below the canvas.
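One way a client could fold incoming step_state events into per-node badge state is sketched below. This is not the ExecutionDetailPanel implementation: NodeStatus, StepEvent, and applyEvent are hypothetical names, and the sketch assumes that node_waiting events carry a nodeId and that a skipped node is reported through result.status, neither of which is confirmed on this page. The Mocked badge is left out because its triggering event is not described here.

```typescript
type NodeStatus = 'running' | 'completed' | 'failed' | 'skipped' | 'waiting'

// Loose, hypothetical event shape: only the fields this reducer reads.
type StepEvent = { type: string; nodeId?: string; result?: { status?: string } }

const RESULT_STATUSES: readonly NodeStatus[] = ['completed', 'failed', 'skipped']

// Returns a new map so UI frameworks that compare by reference see the change.
function applyEvent(
  statuses: ReadonlyMap<string, NodeStatus>,
  event: StepEvent,
): Map<string, NodeStatus> {
  const next = new Map(statuses)
  const { nodeId } = event
  if (!nodeId) return next // e.g. tier_start carries nodeIds, not a single nodeId

  if (event.type === 'node_start') {
    next.set(nodeId, 'running')
  } else if (event.type === 'node_waiting') {
    next.set(nodeId, 'waiting')
  } else if (event.type === 'node_complete') {
    // Only accept result statuses that have a badge; ignore anything unknown.
    const status = RESULT_STATUSES.find((s) => s === event.result?.status)
    if (status) next.set(nodeId, status)
  }
  return next
}
```

In a step_state listener, each parsed event would be passed through applyEvent and the resulting map used to choose the badge for every node card.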
Operational notes
Each connected browser tab holds a dedicated Redis subscriber connection. For high-concurrency scenarios (many simultaneous canvas viewers on the same instance), consider the connection budget of your Redis deployment. Each subscription is cleaned up when the browser disconnects or the stream closes.
- The SSE endpoint uses the same getCliOrSessionAuth auth path as other platform API routes - both session cookies and bearer rsk_* API keys are accepted.
- There is no SSE replay: if a client connects after the execution has already completed, the endpoint returns a single complete event immediately (reading from the workflow_instances.status column) rather than replaying historical events.
- Execution events are not written to the audit log - they are ephemeral observability signals. The durable record of execution is in step_executions rows and the workflow_instances status/results columns.
Related pages
- DAG Executor - the event emitter; tier computation and memoization
- Workflow Gates - node_waiting events and suspended instance resume
- Loop Executor - loop iterations produce node_start/node_complete pairs per iteration