Pipeline Engine

A generic, composable step runner for ordered and parallel data processing.


What is the Pipeline Engine?

The Pipeline Engine is a lightweight, domain-agnostic framework for composing processing steps into pipelines. It provides contract validation, immutable context passing, and built-in concurrency control — all in ~300 lines of pure Python with no external dependencies beyond the standard library.

Everything composes from three primitives:

Sequential — steps run one after another:
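A minimal self-contained sketch of what sequential composition means. The names here (`run_sequential`, dict-based steps) are illustrative only, not the engine's real API:

```python
from functools import reduce

# Illustrative stand-ins: each "step" maps a context dict to a new context dict.
def parse(ctx):
    return {**ctx, "tokens": ctx["text"].split()}

def count(ctx):
    return {**ctx, "n_tokens": len(ctx["tokens"])}

def run_sequential(steps, ctx):
    # Thread the context through each step, in order.
    return reduce(lambda c, step: step(c), steps, ctx)

result = run_sequential([parse, count], {"text": "hello pipeline world"})
print(result["n_tokens"])  # 3
```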

Branch — fork, run in parallel, join:
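A self-contained sketch of fork/join semantics using stdlib threads. `run_branch` and the merge-by-update policy are illustrative assumptions, not the engine's `Branch` class:

```python
from concurrent.futures import ThreadPoolExecutor

def word_count(ctx):
    return {"n_words": len(ctx["text"].split())}

def char_count(ctx):
    return {"n_chars": len(ctx["text"])}

def run_branch(branches, ctx):
    # Fork: each branch gets its own copy of the context and runs in a thread.
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        outputs = list(pool.map(lambda b: b(dict(ctx)), branches))
    # Join: merge branch outputs back into a single context.
    merged = dict(ctx)
    for out in outputs:
        merged.update(out)
    return merged

ctx = run_branch([word_count, char_count], {"text": "fork and join"})
print(ctx["n_words"], ctx["n_chars"])  # 3 13
```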

Nesting — a pipeline used as a step:
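A sketch of nesting: because a pipeline has the same context-in, context-out shape as a step, it can slot into another pipeline's step list. `run_pipeline` is a hypothetical stand-in:

```python
def run_pipeline(steps):
    # A pipeline is itself just a context -> context callable,
    # so it can appear anywhere a single step can.
    def run(ctx):
        for step in steps:
            ctx = step(ctx)
        return ctx
    return run

strip = lambda ctx: {**ctx, "text": ctx["text"].strip()}
lower = lambda ctx: {**ctx, "text": ctx["text"].lower()}
tokenize = lambda ctx: {**ctx, "tokens": ctx["text"].split()}

normalize = run_pipeline([strip, lower])      # inner pipeline
outer = run_pipeline([normalize, tokenize])   # inner pipeline used as a step

print(outer({"text": "  Hello World "})["tokens"])  # ['hello', 'world']
```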

Steps declare what data they read and write. The pipeline validates ordering at construction time — before any data flows — so wiring errors surface immediately, not at runtime.
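The construction-time check can be sketched as a single forward pass over the step list: each step may only require fields already provided by earlier steps (or the initial input). The `validate` function and dict-based step records below are illustrative, not the engine's implementation:

```python
class ContractError(Exception):
    pass

def validate(steps, initial_fields):
    # Walk steps in declaration order, tracking which fields exist so far.
    available = set(initial_fields)
    for step in steps:
        missing = set(step["requires"]) - available
        if missing:
            raise ContractError(f"{step['name']} is missing {sorted(missing)}")
        available |= set(step["provides"])

steps = [
    {"name": "parse", "requires": ["text"], "provides": ["tokens"]},
    {"name": "count", "requires": ["tokens"], "provides": ["n_tokens"]},
]
validate(steps, initial_fields=["text"])       # passes
try:
    validate(list(reversed(steps)), ["text"])  # count before parse
except ContractError as e:
    print(e)  # count is missing ['tokens']
```

Because the check runs before any data flows, a mis-ordered pipeline fails at construction, not mid-run.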


Core Principles

  • Three primitives — Sequential steps, parallel branches, and nested pipelines cover every composition pattern
  • Contracts — Steps declare requires and provides fields; the pipeline validates ordering at construction time
  • Immutable context — Steps receive a frozen context and return a new one via .replace(), making concurrent execution safe by default
  • Declared concurrency — Parallelism is configured on the step (max_workers, async_boundary), not the pipeline
  • Per-sample error isolation — One failing sample never blocks others; every sample produces a result
  • Observation hooks — PipelineHook lets external code observe step transitions without modifying data flow (progress streaming, metrics, logging)
  • Cancellation — CancellationToken stops a running pipeline between steps; cancel_token_var makes the token readable inside steps for intra-step cancellation
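The immutable-context principle can be illustrated with a frozen dataclass; here `dataclasses.replace` stands in for the engine's `.replace()`, and the `Ctx` fields are hypothetical:

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Ctx:
    text: str
    tokens: tuple = ()

def tokenize(ctx: Ctx) -> Ctx:
    # ctx is frozen: assigning ctx.tokens would raise FrozenInstanceError,
    # so the step must return a *new* context instead of mutating the old one.
    return replace(ctx, tokens=tuple(ctx.text.split()))

old = Ctx(text="a b c")
new = tokenize(old)
print(old.tokens, new.tokens)  # () ('a', 'b', 'c')
```

Since no step can mutate a context another thread is reading, concurrent execution needs no locks around the context itself.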

Architecture at a Glance

Pipeline and Branch both satisfy StepProtocol through structural typing — no inheritance required. This means a Pipeline can be used as a step inside another pipeline, and a Branch slots into any step position.
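Structural typing can be sketched with `typing.Protocol`; the `run()` shape below is an assumption about what `StepProtocol` looks like, not its actual definition:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class StepProtocol(Protocol):
    # Hypothetical shape: anything with a compatible run() counts as a step.
    def run(self, ctx: dict) -> dict: ...

class Upper:
    def run(self, ctx: dict) -> dict:
        return {**ctx, "text": ctx["text"].upper()}

class MiniPipeline:
    def __init__(self, steps):
        self.steps = steps
    def run(self, ctx: dict) -> dict:
        for s in self.steps:
            ctx = s.run(ctx)
        return ctx

# No inheritance anywhere, yet both satisfy the protocol structurally —
# which is why a pipeline can slot into any step position.
print(isinstance(Upper(), StepProtocol), isinstance(MiniPipeline([]), StepProtocol))
```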

Concept          | What it is               | Threading                 | Data flow
Step             | Single unit of work      | Sync internally           | Receives and returns StepContext
Pipeline         | Ordered chain of steps   | workers=N across samples  | Passes StepContext step-to-step
Branch           | Parallel fork/join       | One thread per branch     | Copies context in, merges outputs
Nested Pipeline  | Pipeline used as a step  | Inherits parent threading | Same StepContext flow

Async Boundary — Background Processing

One of the engine's key features is the async boundary: a way to split a pipeline into foreground (fast return) and background (fire-and-forget) stages.

Mark any step with async_boundary = True — the pipeline returns results immediately after the foreground steps, while everything from the boundary onward continues in background threads. Use pipe.wait_for_background() when you need the final results.

This is critical for pipelines where early steps produce user-facing output quickly but later steps (analysis, logging, scoring) are slow and don't need to block the caller. See Execution Model for full details.
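The mechanics can be sketched with a stdlib thread: foreground steps run synchronously, then everything past the boundary is handed to a background thread. `MiniPipe` and its method names mirror the documented `wait_for_background()` but are otherwise illustrative:

```python
import threading
import time

class MiniPipe:
    def __init__(self, foreground, background):
        self.foreground, self.background = foreground, background
        self._bg_thread = None
        self.result = None

    def run(self, ctx):
        for step in self.foreground:   # fast, user-facing steps
            ctx = step(ctx)
        # Fire-and-forget: everything past the boundary runs in a thread.
        self._bg_thread = threading.Thread(target=self._run_bg, args=(ctx,))
        self._bg_thread.start()
        return ctx                     # caller gets the foreground result now

    def _run_bg(self, ctx):
        for step in self.background:
            ctx = step(ctx)
        self.result = ctx

    def wait_for_background(self):
        self._bg_thread.join()
        return self.result

fast = lambda ctx: {**ctx, "answer": ctx["x"] * 2}
slow = lambda ctx: (time.sleep(0.1), {**ctx, "score": ctx["answer"] + 1})[1]

pipe = MiniPipe(foreground=[fast], background=[slow])
print(pipe.run({"x": 21})["answer"])        # 42, returned immediately
print(pipe.wait_for_background()["score"])  # 43, after background finishes
```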


When to Use

Good fit

  • Ordered multi-step processing with explicit data dependencies
  • Parallel fork/join patterns (multiple independent operations on the same data)
  • Fire-and-forget background processing with async_boundary
  • Any pipeline where you want construction-time contract validation

Not designed for

  • DAG scheduling with complex dependency graphs
  • Distributed computing across multiple machines
  • Stream processing with backpressure
  • ETL pipelines requiring a data catalog

Installation

The pipeline engine is included in the project with no extra dependencies:

from pipeline import Pipeline, Branch, StepContext, MergeStrategy, PipelineHook, CancellationToken

Using the Pipeline Engine with ACE

If you're building ACE pipelines, see Composing Pipelines for ACE-specific steps and patterns. All pipeline classes are also importable from ace directly: from ace import Pipeline, Branch, ...


What's Next