Execution Streaming

Real-time execution events, delivered from Redis Pub/Sub over SSE to the canvas.

When a workflow instance runs, the DAG executor emits a stream of ExecutorEvent values for every node state transition. These are published to a Redis Pub/Sub channel and forwarded to connected browser clients over Server-Sent Events (SSE), enabling the execution canvas to display live per-node status overlays without polling.

Architecture

Publisher: Redis Pub/Sub

The createRedisEventHandler(instanceId) factory creates an ExecutorEventHandler that publishes each event to the Redis channel wf:<instanceId>:events as JSON. This handler is passed to DAGExecutor as the onEvent callback at run time:

import { createRedisEventHandler } from '@/lib/workflow/executor/execution-events'
import { DAGExecutor } from '@/lib/workflow/executor/dag-executor'

const eventHandler = createRedisEventHandler(instanceId)
const dag = new DAGExecutor({ onEvent: eventHandler, /* ... */ })

Publishing is fire-and-forget: errors are logged to console.error but never thrown, so a Redis publish failure never interrupts workflow execution.
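The fire-and-forget behaviour can be sketched roughly as follows. This is a minimal illustration, not the actual body of createRedisEventHandler: the Publisher interface, ExecutorEventLike type, and createFireAndForgetHandler name are hypothetical, chosen so the example runs without a Redis connection (an ioredis client's publish(channel, message) method has the same shape).

```typescript
// Hypothetical stand-in for a Redis client; only the publish() shape matters here.
interface Publisher {
  publish(channel: string, message: string): Promise<number>
}

// Hypothetical loose event type; the real ExecutorEvent union is stricter.
type ExecutorEventLike = { type: string; [key: string]: unknown }

function createFireAndForgetHandler(publisher: Publisher, instanceId: string) {
  const channel = `wf:${instanceId}:events`
  return (event: ExecutorEventLike): void => {
    try {
      // Deliberately not awaited: the executor never blocks on Redis.
      publisher.publish(channel, JSON.stringify(event)).catch((err: unknown) => {
        console.error('[execution-events] publish failed', err)
      })
    } catch (err) {
      // Covers synchronous failures, e.g. JSON.stringify on a cyclic value.
      console.error('[execution-events] publish failed', err)
    }
  }
}
```

Because the handler returns void and catches both synchronous and asynchronous failures, the caller has nothing to await and nothing to handle.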

The channel name follows the pattern wf:<instanceId>:events, with a single publisher per instance run. The channel is never explicitly deleted: Redis Pub/Sub channels are not keys, so nothing is stored or evicted, and a channel simply ceases to exist once its last subscriber unsubscribes or disconnects.

Subscriber: AsyncIterable

subscribeToExecutionEvents(instanceId) creates a dedicated Redis connection via redis.duplicate() (required by ioredis for subscriber mode) and returns an AsyncIterable<ExecutorEvent> plus a cleanup() function. Incoming messages are buffered and resolved through a promise-based queue, so the SSE route can consume them with a standard for await loop:

const { events, cleanup } = await subscribeToExecutionEvents(instanceId)

try {
  for await (const event of events) {
    if (event.type === 'execution_complete') break
    // forward to SSE
  }
} finally {
  await cleanup()
}

The cleanup() function unsubscribes and quits the subscriber connection. It is always called - via finally blocks and the request AbortSignal - to prevent leaked connections.
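The promise-based queue mentioned above can be pictured with a sketch like the one below. It is an assumption about the general technique, not the module's actual code; the AsyncEventQueue class and its push/close methods are hypothetical names, and the sketch supports a single consumer.

```typescript
// Hypothetical push-to-pull bridge: producers call push(), one consumer uses `for await`.
class AsyncEventQueue<T> implements AsyncIterable<T> {
  private buffer: T[] = []
  private waiter: ((result: IteratorResult<T>) => void) | null = null
  private closed = false

  // Hand the item to a waiting consumer, or buffer it until next() is called.
  push(item: T): void {
    if (this.closed) return
    if (this.waiter) {
      const resolve = this.waiter
      this.waiter = null
      resolve({ value: item, done: false })
    } else {
      this.buffer.push(item)
    }
  }

  // Stop accepting items and release a consumer that is currently waiting.
  close(): void {
    this.closed = true
    if (this.waiter) {
      const resolve = this.waiter
      this.waiter = null
      resolve({ value: undefined, done: true })
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        // Drain buffered items first, even after close().
        if (this.buffer.length > 0) {
          return Promise.resolve({ value: this.buffer.shift() as T, done: false })
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true })
        }
        return new Promise((resolve) => { this.waiter = resolve })
      },
      // Called when the consumer exits the loop early (break or throw).
      return: (): Promise<IteratorResult<T>> => {
        this.close()
        return Promise.resolve({ value: undefined, done: true })
      },
    }
  }
}
```

In a real subscriber, the Redis message listener would presumably call push() with each parsed payload, and cleanup() would call close() after unsubscribing so that a pending for await loop terminates.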

SSE endpoint

GET /api/workflows/{workflowId}/instances/{instanceId}/stream
Authorization: Bearer rsk_live_... | session cookie

The route:

  1. Verifies workflow and instance ownership (workspace-scoped).
  2. Returns a terminal complete event immediately if the instance is already in a terminal state (completed or failed) - no subscription needed.
  3. Creates the ReadableStream backed by the AsyncIterable.
  4. Sends a heartbeat event every 15 seconds to keep the connection alive through proxies and load balancers.
  5. Listens for the abort event on request.signal to clean up on client disconnect.
  6. Forwards each executor event as an SSE message. execution_complete events emit as event type complete and close the stream; all other events emit as event type step_state.
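Step 6 can be sketched as a small mapping function. The helper names formatSseMessage and toSseMessage are hypothetical, and the sketch assumes the terminal status is carried on a status field of the execution_complete event, which this page does not confirm.

```typescript
// Hypothetical loose event type; the real ExecutorEvent union is stricter.
type ExecutorEventLike = { type: string; [key: string]: unknown }

// One SSE message: an `event:` line, a `data:` line, and a blank-line terminator.
// JSON.stringify never emits raw newlines, so a single data line is sufficient.
function formatSseMessage(eventName: string, data: unknown): string {
  return `event: ${eventName}\ndata: ${JSON.stringify(data)}\n\n`
}

// Map an executor event to its SSE message and report whether the stream should close.
function toSseMessage(event: ExecutorEventLike): { message: string; done: boolean } {
  if (event.type === 'execution_complete') {
    // Assumption: the event exposes the final status as `status`.
    return { message: formatSseMessage('complete', { status: event.status }), done: true }
  }
  return { message: formatSseMessage('step_state', event), done: false }
}
```

The done flag lets the route close the stream right after the complete message is written.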

Response headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
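A stream body with a heartbeat and abort-driven cleanup might look roughly like the sketch below. It is an illustration under stated assumptions rather than the route's actual code: createSseStream is a hypothetical helper, it takes already-formatted SSE messages, and the empty {} heartbeat payload is a guess. It relies on the standard ReadableStream, TextEncoder, and AbortSignal globals.

```typescript
// Hypothetical helper: wraps pre-formatted SSE messages in a byte stream.
function createSseStream(
  frames: AsyncIterable<string>,
  cleanup: () => Promise<void>,
  signal: AbortSignal,
  heartbeatMs = 15_000,
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder()
  let finished = false
  let heartbeat: ReturnType<typeof setInterval> | undefined
  let ctrl: ReadableStreamDefaultController<Uint8Array> | undefined

  // Idempotent teardown: stop the heartbeat, release the subscriber, close the stream.
  const finish = async (): Promise<void> => {
    if (finished) return
    finished = true
    clearInterval(heartbeat)
    await cleanup()
    try { ctrl?.close() } catch { /* stream was already cancelled by the client */ }
  }

  return new ReadableStream<Uint8Array>({
    start(controller) {
      ctrl = controller
      const send = (frame: string): void => {
        if (!finished) controller.enqueue(encoder.encode(frame))
      }
      // Assumption: the heartbeat carries an empty JSON object as data.
      heartbeat = setInterval(() => send('event: heartbeat\ndata: {}\n\n'), heartbeatMs)
      signal.addEventListener('abort', () => { void finish() }, { once: true })
      // Pump frames in the background; `finally` guarantees cleanup on any exit path.
      void (async () => {
        try {
          for await (const frame of frames) {
            if (finished) break
            send(frame)
          }
        } finally {
          await finish()
        }
      })()
    },
    // Runs when the consumer cancels the stream without an abort signal.
    cancel() {
      return finish()
    },
  })
}
```

A route handler could then return this stream in a Response together with the three headers listed above.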

Event types

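Every event published to the Redis channel is a member of the ExecutorEvent discriminated union, distinguished by its type field. The variants referenced on this page are tier_start, node_start, node_complete, node_waiting, and execution_complete.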
Consuming the SSE stream

From a browser (EventSource)

const es = new EventSource(
  `/api/workflows/${workflowId}/instances/${instanceId}/stream`,
  { withCredentials: true }
)

es.addEventListener('step_state', (e) => {
  const event = JSON.parse(e.data)
  console.log('executor event', event.type, event.nodeId)
})

es.addEventListener('complete', (e) => {
  const { status } = JSON.parse(e.data)
  console.log('execution finished', status)
  es.close()
})

es.addEventListener('heartbeat', () => {
  // keep-alive - no action needed
})

// Closing on error disables EventSource's automatic reconnection.
es.onerror = () => es.close()

From a curl session

curl -N \
  -H "Authorization: Bearer rsk_live_..." \
  "https://app.rensei.ai/api/workflows/<wfId>/instances/<instanceId>/stream"

# Sample output:
# event: step_state
# data: {"type":"tier_start","tier":0,"nodeIds":["trigger-1"]}
#
# event: step_state
# data: {"type":"node_start","nodeId":"trigger-1","tier":0}
#
# event: step_state
# data: {"type":"node_complete","nodeId":"trigger-1","tier":0,"result":{"status":"completed","output":{"issueId":"LIN-123"}}}
#
# event: complete
# data: {"status":"completed"}
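For non-browser consumers that read the raw response text, output like the sample above can be split into messages with a small parser. This is a minimal sketch: parseSseText is a hypothetical helper, it handles only the event and data fields, and it assumes the input contains whole messages (a production client would also need to buffer partial chunks).

```typescript
interface SseMessage {
  event: string
  data: string
}

// Parse complete SSE messages out of a block of text.
function parseSseText(text: string): SseMessage[] {
  const messages: SseMessage[] = []
  // Messages are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = 'message' // default event type when no `event:` field is present
    const dataLines: string[] = []
    for (const line of block.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue // skip blanks and comments
      const idx = line.indexOf(':')
      const field = idx === -1 ? line : line.slice(0, idx)
      let value = idx === -1 ? '' : line.slice(idx + 1)
      if (value.startsWith(' ')) value = value.slice(1) // strip one leading space
      if (field === 'event') event = value
      else if (field === 'data') dataLines.push(value)
    }
    // Blocks without any data line are not dispatched.
    if (dataLines.length > 0) messages.push({ event, data: dataLines.join('\n') })
  }
  return messages
}
```

Each data string can then be passed to JSON.parse, as in the EventSource example.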

Execution detail panel

In the workflow editor's Execution Mode, the ExecutionDetailPanel component subscribes to this SSE endpoint and overlays per-node status badges on the canvas in real time:

  • Running - pulsing spinner on the node card.
  • Completed - green check badge.
  • Failed - red X badge with expandable error summary.
  • Skipped - grey skip badge, expandable to show skipReason.
  • Waiting - clock/pause badge (gate suspended).
  • Mocked - yellow star badge (test mode).

The panel also surfaces the execution error summary (from node_complete events with result.status === 'failed') inline below the canvas.
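One way to picture how a stream of step_state events becomes per-node badges is a small reducer over a status map. This is a hypothetical sketch, not the ExecutionDetailPanel implementation: applyEvent and the type names are invented for illustration, and it assumes that skipped and mocked outcomes arrive as result.status values on node_complete events, which this page does not confirm.

```typescript
type NodeStatus = 'running' | 'completed' | 'failed' | 'skipped' | 'waiting' | 'mocked'

// Loose shape of a parsed step_state payload (assumed).
type StepStateEvent = { type: string; nodeId?: string; result?: { status?: string } }

const NODE_STATUSES: readonly NodeStatus[] = [
  'running', 'completed', 'failed', 'skipped', 'waiting', 'mocked',
]

function isNodeStatus(value: unknown): value is NodeStatus {
  return typeof value === 'string' && (NODE_STATUSES as readonly string[]).includes(value)
}

// Return a new map with the event applied; the input map is not mutated.
function applyEvent(
  statuses: ReadonlyMap<string, NodeStatus>,
  event: StepStateEvent,
): Map<string, NodeStatus> {
  const next = new Map(statuses)
  if (!event.nodeId) return next // e.g. tier_start carries no single nodeId
  if (event.type === 'node_start') {
    next.set(event.nodeId, 'running')
  } else if (event.type === 'node_waiting') {
    next.set(event.nodeId, 'waiting')
  } else if (event.type === 'node_complete') {
    // Assumption: result.status is one of the badge states listed above.
    const status = event.result?.status
    if (isNodeStatus(status)) next.set(event.nodeId, status)
  }
  return next
}
```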

Operational notes

Each connected browser tab holds a dedicated Redis subscriber connection. For high-concurrency scenarios (many simultaneous canvas viewers on the same instance), consider the connection budget of your Redis deployment. Each subscription is cleaned up when the browser disconnects or the stream closes.

  • The SSE endpoint uses the same getCliOrSessionAuth auth path as other platform API routes - both session cookies and bearer rsk_* API keys are accepted.
  • There is no SSE replay: if a client connects after the execution has already completed, the endpoint returns a single complete event immediately (reading from the workflow_instances.status column) rather than replaying historical events.
  • Execution events are not written to the audit log - they are ephemeral observability signals. The durable record of execution is in step_executions rows and the workflow_instances status/results columns.

Related

  • DAG Executor - the event emitter; tier computation and memoization
  • Workflow Gates - node_waiting events and suspended instance resume
  • Loop Executor - loop iterations produce node_start / node_complete pairs per iteration
