The Pipeline IR is the new source of truth for ETL pipelines starting in
KaireonAI Flow Phase 1. It replaces the legacy PipelineNode + PipelineEdge
relational format with a single typed JSON document that the runtime
interprets directly. A pipeline in IR mode is authored once and
read by:
- The visual canvas (renders the IR)
- The AI assistant (proposes diffs against the IR)
- The MCP server (accepts IR via tool calls)
- The runtime interpreter (executes the IR)
A transform node carries per-row column ops from a fixed vocabulary. The IR schema accepts 19 op
types; the visual editor exposes the 14 most-used ones in its toolbar
(complex ops like summarize, vector_embed, geo_resolve,
sentiment_score, language_detect are authored via the JSON IR tab
or the AI assistant).
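As a rough illustration, a transform node's op list might look like the fragment below. The sentiment_score op name comes from the list above; the trim and rename op names and the field layout (id, kind, ops, column, output) are assumptions for illustration, not the canonical schema.

```json
{
  "id": "tx1",
  "kind": "transform",
  "ops": [
    { "op": "trim", "column": "email" },
    { "op": "rename", "column": "e-mail", "output": "email" },
    { "op": "sentiment_score", "column": "review_text", "output": "sentiment" }
  ]
}
```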
Why IR-first
Generative AI is reliable when it produces structured data, not code. Constrained JSON output validated against a Zod schema cannot drift from the contract. This is what makes “AI builds the pipeline” actually work — every proposal is schema-valid by construction.
Top-level shape
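At the top level, an IR document is a single JSON object carrying a version plus a node/edge graph. The TypeScript sketch below is a plausible reconstruction inferred from this page; the exact field names (nodes, edges, from, to, config) are assumptions, not the canonical schema.

```typescript
// Illustrative sketch of a Pipeline IR document — field names are assumptions.
type NodeKind =
  | "source" | "transform" | "validate" | "target"
  | "branch" | "join" | "enrich";

interface IrNode {
  id: string;
  kind: NodeKind;
  config: Record<string, unknown>; // kind-specific settings
}

interface IrEdge {
  from: string; // upstream node id
  to: string;   // downstream node id
}

interface PipelineIr {
  irVersion: "1.0";
  nodes: IrNode[];
  edges: IrEdge[];
}

// A minimal linear pipeline: source -> transform -> target.
const example: PipelineIr = {
  irVersion: "1.0",
  nodes: [
    { id: "src", kind: "source", config: { connectorId: "conn_123" } },
    { id: "tx", kind: "transform", config: { ops: [] } },
    { id: "tgt", kind: "target", config: { mode: "append" } },
  ],
  edges: [
    { from: "src", to: "tx" },
    { from: "tx", to: "tgt" },
  ],
};

// Structural check of the kind the validator performs:
// every edge must reference declared node ids.
const ids = new Set(example.nodes.map((n) => n.id));
const edgesValid = example.edges.every((e) => ids.has(e.from) && ids.has(e.to));
console.log(edgesValid); // prints true
```

Because the whole pipeline is one JSON value, a diff from the AI assistant is just a patch against this object, and every layer (canvas, assistant, MCP server, interpreter) reads the same structure.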
Node kinds (Phase 1)
| Kind | Purpose |
|---|---|
source | Read from external system (file/DB/API/stream). Built-in atomic staging moves the source file through .processing/ → .archive/{YYYY}/{MM}/{DD}/ on success or .failed/ on parse failure |
transform | Per-row column ops — 19 op types (Transforms) |
validate | Row-level + dataset-level validation |
target | Write to internal schema table |
branch | Conditional routing |
join | Multi-input join |
enrich | Per-row external call (LLM tag, geocode, ML score). Beta — node is validated and recorded but provider calls only fire when tenant LLM/geocoder credentials are configured. Without them, rows pass through with the configured output field set to null |
Archival is handled exclusively by the source node’s
atomicity config.
There is no separate archive node kind — it was removed entirely from
the IR, the executor registry, the structural validator, the visual
editor, and the AI prompt. The source moves the source file to
successFolder after parsing succeeds and to failureFolder after parse
errors. Date tokens {YYYY}, {MM}, {DD}, {YYYY-MM-DD}, {YYYYMMDD},
{HH}, {mm}, {ss} are expanded at run time, so a nested folder
layout like .archive/{YYYY}/{MM}/{DD}/ produces .archive/2026/05/12/.
Authoring an IR with a node of kind: "archive" will now fail schema
validation at save time — file-level atomicity covers the common case
and the previous row-level archive executor only ever recorded intent
without actually moving data.
Source node example
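A sketch of a source node with the atomicity config described above, plus a minimal implementation of the documented date-token expansion. The successFolder/failureFolder names and the token list come from this page; the surrounding field layout (id, kind, config, processingFolder) and the expansion function are illustrative assumptions.

```typescript
// Sketch of a source node — field layout is an assumption.
const sourceNode = {
  id: "src",
  kind: "source",
  config: {
    connectorId: "conn_123", // references a saved Connector record
    sourceKind: "local_fs",
    path: "/data/inbound/orders.csv",
    atomicity: {
      processingFolder: ".processing/",
      successFolder: ".archive/{YYYY}/{MM}/{DD}/",
      failureFolder: ".failed/",
    },
  },
};

// Minimal date-token expansion matching the documented token set.
// Compound tokens are replaced before their single-field components.
function expandDateTokens(template: string, d: Date): string {
  const pad = (n: number) => String(n).padStart(2, "0");
  const YYYY = String(d.getUTCFullYear());
  const MM = pad(d.getUTCMonth() + 1);
  const DD = pad(d.getUTCDate());
  return template
    .replace(/\{YYYY-MM-DD\}/g, `${YYYY}-${MM}-${DD}`)
    .replace(/\{YYYYMMDD\}/g, `${YYYY}${MM}${DD}`)
    .replace(/\{YYYY\}/g, YYYY)
    .replace(/\{MM\}/g, MM)
    .replace(/\{DD\}/g, DD)
    .replace(/\{HH\}/g, pad(d.getUTCHours()))
    .replace(/\{mm\}/g, pad(d.getUTCMinutes()))
    .replace(/\{ss\}/g, pad(d.getUTCSeconds()));
}

const when = new Date(Date.UTC(2026, 4, 12)); // 2026-05-12
console.log(expandDateTokens(sourceNode.config.atomicity.successFolder, when));
// prints .archive/2026/05/12/
```

On a successful parse, the file would land under the expanded successFolder path; on a parse failure it moves to failureFolder instead.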
The IR schema's
source.kind field accepts a fixed set of cloud-store
and filesystem kinds: s3, gcs, azure_blob, sftp, ftp,
local_fs, http_pull. Runtime executors for local_fs, s3,
gcs, azure_blob, sftp, and http_pull are all live. ftp is
documented as deprecated (plaintext) and never reaches a runtime
executor. Streaming kinds (kafka, kinesis, pulsar) are gated by
FLOW_STREAMING_ENABLED and only land when self-hosters provide their
own broker. Note that source.kind is a separate concept from the full
connector registry (85 types); pipelines reference a saved Connector
record by connectorId, not by kind directly.
Target node load modes
| Mode | Status | Notes |
|---|---|---|
append | Live | Explicit-column INSERT with NULLIF + per-column CAST |
truncate | Live | TRUNCATE + INSERT wrapped in prisma.$transaction. Empty-source guard (failOnEmptySource: true default) aborts before the destructive statement when upstream produced 0 rows |
upsert | Live | Requires upsertKey; throws if every projected column is in upsertKey (no updatable columns) |
blue_green | Live | Loads to <table>_new, atomic 3-step rename, drops <table>_old. Empty-source guard applies |
incremental_watermark | Live | High-water persisted in pipeline_watermarks; INSERT + watermark-upsert run in a single transaction. Requires watermarkColumn |
cdc_mirror | Gated | Requires FLOW_STREAMING_ENABLED=true env + a configured Debezium connector. Runtime returns a clear “streaming disabled” error otherwise |
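As an illustration, a target node using upsert might look like the fragment below. The mode value and upsertKey requirement come from the table above; the surrounding field layout (id, kind, config, table) is an assumption.

```json
{
  "id": "tgt",
  "kind": "target",
  "config": {
    "schemaId": "sch_456",
    "table": "orders",
    "mode": "upsert",
    "upsertKey": ["order_id"]
  }
}
```

Per the table, this node would fail validation if every projected column appeared in upsertKey, since there would be nothing left to update.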
Optional safety fields on target nodes
| Field | Type | Behavior |
|---|---|---|
failOnEmptySource | boolean (default true) | Aborts truncate / blue_green BEFORE the destructive statement when upstream produced 0 rows. Set false only when “empty file means clear the table” is the genuine intent |
expectedRowCountDelta | { minPct?, maxPct?, windowRuns? } | Emits a warning System Health alert when today’s rowsLoaded is outside the band vs. the running mean over the last N successful runs (default 7). Run still completes |
backupBeforeLoad | { enabled, retainCount? } | Snapshots the target via CREATE TABLE … AS TABLE … before destructive load. Only acts on truncate and blue_green. Backups older than retainCount (default 3) are pruned post-success. On failure, the backup is left in place and the run record’s meta.backupTable carries the snapshot table name |
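The safety fields can be combined on a single destructive-mode target node. A hedged sketch, with field names taken from the table above and the surrounding layout assumed:

```json
{
  "id": "tgt",
  "kind": "target",
  "config": {
    "table": "orders",
    "mode": "truncate",
    "failOnEmptySource": true,
    "expectedRowCountDelta": { "minPct": -20, "maxPct": 20, "windowRuns": 7 },
    "backupBeforeLoad": { "enabled": true, "retainCount": 3 }
  }
}
```

With this configuration, an empty upstream aborts before the TRUNCATE, a row count more than 20% off the 7-run mean raises a warning alert without failing the run, and a pre-load snapshot survives any failed load.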
Tenant gate
Pipeline IR is on by default — tenant_settings.flowIrEnabled
defaults to true. New pipelines created via POST /api/v1/pipelines
with irVersion: "1.0" and an ir body field are stored in the
pipeline_ir_versions table.
The flag remains in the schema as an explicit kill-switch: setting
flowIrEnabled = false makes pipeline create/run/AI-author endpoints
return 403 flow_ir_disabled. The legacy ETL editor and runtime were
removed in the 2026-04-28 cleanup, so disabling the flag effectively
disables pipelines entirely.
API
POST /api/v1/pipelines — send { name, connectorId, schemaId, irVersion: "1.0", ir: { ... } }
to create an IR-native pipeline. See Pipelines API
for the full shape.
POST /api/v1/pipelines/:id/run — when the pipeline has irVersion
set, the request is handed to the in-process batch interpreter and
returns the per-node result synchronously. Legacy pipelines continue
going to the BullMQ worker queue.
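A sketch of the create call from TypeScript. The endpoint path and the top-level payload fields come from this page; the base URL, bearer-token auth scheme, and the minimal IR body are placeholder assumptions.

```typescript
// Payload for POST /api/v1/pipelines, per the shape documented above.
const payload = {
  name: "orders_daily",
  connectorId: "conn_123",
  schemaId: "sch_456",
  irVersion: "1.0" as const,
  ir: {
    nodes: [
      { id: "src", kind: "source", config: { connectorId: "conn_123" } },
      { id: "tgt", kind: "target", config: { mode: "append" } },
    ],
    edges: [{ from: "src", to: "tgt" }],
  },
};

// Hypothetical helper — base URL and auth header are assumptions.
async function createPipeline(baseUrl: string, token: string): Promise<unknown> {
  const res = await fetch(`${baseUrl}/api/v1/pipelines`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
    body: JSON.stringify(payload),
  });
  if (!res.ok) throw new Error(`create failed: ${res.status}`);
  return res.json();
}

console.log(payload.irVersion); // prints 1.0
```

Because irVersion is set, a subsequent POST /api/v1/pipelines/:id/run would go to the in-process batch interpreter and return per-node results synchronously.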
Validation layers
The IR is enforced at three layers:
- Authoring time — Zod schemas in the UI / AI prompt
- Save time — server re-validates on POST and stores a pipeline_ir_versions row
- Runtime — interpreter re-checks before executing