Extraction Pipeline
Strategy selection, operator-selected model, and the per-project-gated, cost-capped extraction cron.
The extraction pipeline converts raw observations from the observations table into graph nodes and edges in graph_nodes / graph_edges. Each observation goes through strategy selection, three-stage entity resolution, and a store upsert - all in a single extractAndStore call.
The pipeline is idempotent: re-running on the same observation always produces the same node IDs (UUID5 dedup) and the same edge set.
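The idempotency guarantee rests on deterministic IDs. The following is a minimal sketch of a name-based UUID (version 5) built on Node's `crypto` module. The namespace constant, the `nodeIdFor` helper, and the key layout (`orgId:projectId:type:name`) are assumptions for illustration; the source does not state which fields the pipeline actually hashes.

```typescript
import { createHash } from "node:crypto";

// Hypothetical namespace: the standard DNS namespace UUID is reused here for illustration.
const GRAPH_NAMESPACE = "6ba7b810-9dad-11d1-80b4-00c04fd430c8";

function uuidToBytes(uuid: string): Buffer {
  const hex = uuid.replace(/-/g, "");
  if (!/^[0-9a-fA-F]{32}$/.test(hex)) {
    throw new Error("invalid namespace UUID");
  }
  return Buffer.from(hex, "hex");
}

// Version 5 UUID: SHA-1 over namespace bytes followed by the name, truncated to 16 bytes.
function uuidV5(name: string, namespace: string = GRAPH_NAMESPACE): string {
  const digest = createHash("sha1")
    .update(uuidToBytes(namespace))
    .update(name, "utf8")
    .digest();
  digest[6] = (digest[6] & 0x0f) | 0x50; // set version nibble to 5
  digest[8] = (digest[8] & 0x3f) | 0x80; // set the RFC 4122 variant bits
  const hex = digest.toString("hex").slice(0, 32);
  return [
    hex.slice(0, 8),
    hex.slice(8, 12),
    hex.slice(12, 16),
    hex.slice(16, 20),
    hex.slice(20),
  ].join("-");
}

// Hypothetical key layout: scope + node type + normalized name.
function nodeIdFor(
  scope: { orgId: string; projectId: string },
  type: string,
  entityName: string,
): string {
  const key = [scope.orgId, scope.projectId, type, entityName.trim().toLowerCase()].join(":");
  return uuidV5(key);
}
```

Because the ID is a pure function of its inputs, extracting the same entity twice yields the same ID, so a store upsert keyed on that ID does not create a second row.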
Beta - dormant in production. The pipeline is built and merged, but no tenant has knowledge graph memory enabled yet, so the extraction cron currently enumerates an empty set and produces no triplets. Extraction only runs for projects where the enablement gate is satisfied and a platform operator has selected an extraction model. Treat the mechanics below as proven in isolation but unexercised at production scale.
Pipeline flow
Observation types and strategies
| observation.type | Strategy | LLM required | Notes |
|---|---|---|---|
| file_operation | AST-driven import scan | No | Uses TypeScript compiler API; produces Module nodes + depends_on edges. Handles static, dynamic, type-only, namespace, side-effect imports and require() calls. |
| decision | LLM extraction | Yes | Produces a Decision node with rationale + alternatives edges |
| error | LLM extraction | Yes | Produces a Pattern (anti-pattern) node + workaround edge |
| session_summary | LLM extraction | Yes | Extracts key entities from free-form summary text |
| explicit | LLM extraction | Yes | User-determined entity set |
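The table above amounts to a dispatch on `observation.type`. A minimal sketch of that dispatch follows; the `StrategyKind` labels and both function names are hypothetical, and returning `null` for an unknown type is an assumption, since the source does not say how unrecognized types are handled.

```typescript
// Hypothetical labels for the two strategy families in the table.
type StrategyKind = "ast_import_scan" | "llm_extraction";

function selectStrategy(observationType: string): StrategyKind | null {
  switch (observationType) {
    case "file_operation":
      return "ast_import_scan"; // no LLM call needed
    case "decision":
    case "error":
    case "session_summary":
    case "explicit":
      return "llm_extraction";
    default:
      // Assumption: unknown types are skipped rather than sent to the model.
      return null;
  }
}

function requiresLlm(observationType: string): boolean {
  return selectStrategy(observationType) === "llm_extraction";
}
```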
LLM output schema
All LLM-backed strategies constrain the model to return a JSON object matching:
{
"nodes": [
{
"id": "<slug>",
"name": "<name>",
"type": "Service|Module|API|Database|Decision|Pattern|Person|Config|Dependency",
"description": "<text>"
}
],
"edges": [
{
"sourceNodeId": "<id>",
"targetNodeId": "<id>",
"relationshipName": "<snake_case>"
}
]
}
The strategy layer validates this output against the KnowledgeGraphSchema Zod schema. Validation failures return an empty graph and log a warning - the pipeline never throws to the caller.
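The "validate or return an empty graph" behaviour can be sketched without any dependency. The hand-written type guards below stand in for the KnowledgeGraphSchema Zod schema, whose exact definition is not shown in this page; `parseLlmGraph` is a hypothetical name, and enforcing snake_case on `relationshipName` is an assumption drawn from the placeholder in the schema.

```typescript
const NODE_TYPES = [
  "Service", "Module", "API", "Database", "Decision",
  "Pattern", "Person", "Config", "Dependency",
] as const;

interface ExtractedNode { id: string; name: string; type: string; description: string }
interface ExtractedEdge { sourceNodeId: string; targetNodeId: string; relationshipName: string }
interface ExtractedGraph { nodes: ExtractedNode[]; edges: ExtractedEdge[] }

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

function isNode(v: unknown): v is ExtractedNode {
  return isRecord(v)
    && typeof v.id === "string"
    && typeof v.name === "string"
    && typeof v.description === "string"
    && typeof v.type === "string"
    && (NODE_TYPES as readonly string[]).indexOf(v.type) !== -1;
}

function isEdge(v: unknown): v is ExtractedEdge {
  return isRecord(v)
    && typeof v.sourceNodeId === "string"
    && typeof v.targetNodeId === "string"
    && typeof v.relationshipName === "string"
    // Assumption: relationship names must be snake_case.
    && /^[a-z][a-z0-9]*(_[a-z0-9]+)*$/.test(v.relationshipName);
}

// Never throws: malformed JSON or a schema mismatch yields an empty graph and a warning.
function parseLlmGraph(raw: string): ExtractedGraph {
  try {
    const data: unknown = JSON.parse(raw);
    if (
      isRecord(data)
      && Array.isArray(data.nodes) && data.nodes.every(isNode)
      && Array.isArray(data.edges) && data.edges.every(isEdge)
    ) {
      return { nodes: data.nodes, edges: data.edges };
    }
    console.warn("LLM output failed schema validation; returning empty graph");
  } catch {
    console.warn("LLM output was not valid JSON; returning empty graph");
  }
  return { nodes: [], edges: [] };
}
```

Returning an empty graph rather than throwing keeps a single malformed model response from failing the surrounding extractAndStore call.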
The extraction model
The language model that backs every LLM strategy is not a hardcoded environment key. It is an operator-selected model profile, chosen by a platform operator and resolved at extraction time. There is no *_API_KEY env var driving extraction and no OpenRouter route.
| Aspect | Behavior |
|---|---|
| Selection | An operator picks a backend-scoped model profile in the platform admin console. Selection logic lives in kg-model-selection.ts; the resolved profile is loaded by backend-profile.ts when a strategy needs the model. |
| Default | When nothing is selected, extraction uses native Claude Haiku 4.5 via Anthropic's OpenAI-compatible endpoint (https://api.anthropic.com/v1) - the cost-appropriate default for high-volume triplet extraction. |
| Credential | The Anthropic API key is attached to the selected profile (encrypted at rest, write-only), not read from the process environment. Rotation happens on the profile, with no redeploy. |
| Visibility | The extraction profile is backend-only: invisible to tenant users and non-referenceable from any tenant-facing model selector. |
The model, its credential, and the extraction cadence/cost controls are all configured on the operator side. Tenants never see or select the extraction model. Operators configure it in the knowledge graph memory console.
extractAndStore API
import { extractAndStore, type ExtractionPipelineDeps } from '@/lib/graph/extraction/pipeline'
const deps: ExtractionPipelineDeps = {
store: pgGraphStore.withScope({ orgId, projectId }),
llm: llmExtractor,
embedder: async (text) => vectorize(text), // optional
audit: async (event) => writeAuditLog(event), // optional
}
const result = await extractAndStore(observation, deps)
// result: { nodesCreated: number, edgesCreated: number, mergedCount: number }
The embedder is optional but strongly recommended - without it, Stage 2 of entity resolution (fuzzy vector dedup) is skipped and you will accumulate near-duplicate nodes for similar entities described with slightly different text.
Batch extraction worker
The cron-triggered extraction worker (extraction-cron.ts) iterates over unprocessed observations, calls extractAndStore on each, and isolates per-observation failures so a single bad record does not halt the batch.
The worker checks that both the database and the LLM credential are available before starting. When either is unavailable, it logs a skip and exits gracefully - this is the reason the graph is dormant in production without explicit enablement.
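The preflight check and per-observation failure isolation described above can be sketched as follows. This is not the contents of extraction-cron.ts: `WorkerDeps`, `runExtractionBatch`, and every field on them are hypothetical names introduced for illustration, and the default limit of 50 simply mirrors the example request in this section.

```typescript
interface Observation { id: string; type: string }
interface BatchResult { processed: number; failed: string[]; skipped: boolean }

// Hypothetical dependency bundle; the real worker's wiring is not shown in this page.
interface WorkerDeps {
  hasDb: () => Promise<boolean>;
  hasLlmCredential: () => Promise<boolean>;
  fetchUnprocessed: (limit: number) => Promise<Observation[]>;
  extractAndStore: (obs: Observation) => Promise<unknown>;
  log: (message: string) => void;
}

async function runExtractionBatch(deps: WorkerDeps, limit = 50): Promise<BatchResult> {
  // Preflight: skip the whole run if either prerequisite is missing.
  if (!(await deps.hasDb()) || !(await deps.hasLlmCredential())) {
    deps.log("extraction skipped: database or LLM credential unavailable");
    return { processed: 0, failed: [], skipped: true };
  }

  const observations = await deps.fetchUnprocessed(limit);
  const failed: string[] = [];
  let processed = 0;

  for (const obs of observations) {
    try {
      await deps.extractAndStore(obs);
      processed += 1;
    } catch (err) {
      // Isolate the failure: record it and continue with the next observation.
      failed.push(obs.id);
      const reason = err instanceof Error ? err.message : String(err);
      deps.log("extraction failed for " + obs.id + ": " + reason);
    }
  }
  return { processed, failed, skipped: false };
}
```

Processing observations sequentially keeps the sketch simple; whether the real worker runs them concurrently is not stated in the source.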
# Trigger via the admin API route
POST /api/graph/extraction/run
Content-Type: application/json
Authorization: Bearer rsk_...
{ "limit": 50 }
The route is guarded by operator-admin authorization and is the manual override when the cron hasn't run yet or you want to process a specific batch immediately.
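For callers that trigger the route from a script, the request above can be assembled as plain data. The helper name `buildExtractionRunRequest`, the `ExtractionRunRequest` shape, and the base URL are assumptions; only the path, method, headers, and `limit` field come from the example above, and the response body is not documented here.

```typescript
interface ExtractionRunRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Builds the request description; it performs no network I/O itself.
function buildExtractionRunRequest(
  baseUrl: string,
  token: string,
  limit: number,
): ExtractionRunRequest {
  if (!isFinite(limit) || Math.floor(limit) !== limit || limit <= 0) {
    throw new RangeError("limit must be a positive integer");
  }
  return {
    url: baseUrl.replace(/\/+$/, "") + "/api/graph/extraction/run",
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: "Bearer " + token,
    },
    body: JSON.stringify({ limit }),
  };
}
```

The result can be handed to an HTTP client, for example `fetch(req.url, { method: req.method, headers: req.headers, body: req.body })`.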
Audit events
Each extractAndStore call emits one ExtractionAuditEvent:
interface ExtractionAuditEvent {
observationId: string
observationType: string
nodesCreated: number
edgesCreated: number
mergedCount: number
durationMs: number
}
The mergedCount field counts nodes that were resolved to an existing node ID rather than inserted fresh. A consistently high merge rate is a healthy sign - it means the resolver is correctly deduplicating across observation sources.
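To track the merge rate over a batch of audit events, one option is the aggregate below. The formula (merged nodes divided by merged plus newly created nodes) is an assumption; the source does not define how the rate is computed, and `mergeRate` is a hypothetical helper.

```typescript
interface ExtractionAuditEvent {
  observationId: string;
  observationType: string;
  nodesCreated: number;
  edgesCreated: number;
  mergedCount: number;
  durationMs: number;
}

// Assumed definition: merged / (merged + created), aggregated over all events.
// Returns null when no nodes were resolved at all, to avoid dividing by zero.
function mergeRate(events: ExtractionAuditEvent[]): number | null {
  let merged = 0;
  let created = 0;
  for (const e of events) {
    merged += e.mergedCount;
    created += e.nodesCreated;
  }
  const total = merged + created;
  return total === 0 ? null : merged / total;
}
```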
Edge validity filter
Before entity resolution, the pipeline drops any edge whose sourceNodeId or targetNodeId does not appear in the current extraction's node set. This guards against LLM hallucinations that reference node IDs not present in the nodes array.
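A minimal sketch of such a filter follows. It is named `filterInvalidEdgesSketch` to make clear that it is an illustration of the rule above and not the exported implementation; the interfaces are trimmed-down assumptions carrying only the fields the filter needs.

```typescript
interface ExtractedNode { id: string }
interface ExtractedEdge { sourceNodeId: string; targetNodeId: string; relationshipName: string }
interface ExtractedGraph { nodes: ExtractedNode[]; edges: ExtractedEdge[] }

// Keeps only edges whose both endpoints exist in graph.nodes; the input is not mutated.
function filterInvalidEdgesSketch(graph: ExtractedGraph): ExtractedGraph {
  const knownIds = new Set(graph.nodes.map((n) => n.id));
  return {
    nodes: graph.nodes,
    edges: graph.edges.filter(
      (e) => knownIds.has(e.sourceNodeId) && knownIds.has(e.targetNodeId),
    ),
  };
}
```

Building the ID set once keeps the filter linear in the number of nodes plus edges.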
// Exported for testing
export function filterInvalidEdges(graph: ExtractedGraph): ExtractedGraph
Related pages
- Entity Resolution - UUID5 + fuzzy + LLM dedup
- Knowledge Graph Store - PgGraphStore upsert API
- Auto-Ingest - session-terminal extraction trigger
- PR Ingest - GitHub PR merge trigger