API Reference

Complete reference for all public classes, methods, and enums in the pipeline engine.


pipeline.context

StepContext

Frozen dataclass passed from step to step. The pipeline engine only reads sample and metadata — domain-specific fields are added by subclassing.

@dataclass(frozen=True)
class StepContext:
    sample: Any = None
    metadata: MappingProxyType = field(
        default_factory=lambda: MappingProxyType({})
    )

Method   Signature                        Description
replace  (**changes: Any) -> StepContext  Return a new context with the given fields replaced. Uses dataclasses.replace internally.

Behavior:

  • metadata is auto-coerced from dict to MappingProxyType in __post_init__
  • Subclasses inherit .replace() — it works on all fields including subclass-defined ones
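The behavior above can be illustrated with a standalone sketch. It re-declares StepContext from the documented fields so that it runs without the engine installed; the __post_init__ body is an assumption about how the coercion might be done, and TextContext with its language field is a hypothetical subclass.

```python
from dataclasses import dataclass, field, replace as dc_replace
from types import MappingProxyType
from typing import Any


@dataclass(frozen=True)
class StepContext:
    """Stand-in for the documented class, not the engine's source."""
    sample: Any = None
    metadata: MappingProxyType = field(
        default_factory=lambda: MappingProxyType({})
    )

    def __post_init__(self) -> None:
        # Assumed coercion: frozen dataclasses block normal assignment,
        # so object.__setattr__ is used to swap in a read-only view.
        if not isinstance(self.metadata, MappingProxyType):
            object.__setattr__(
                self, "metadata", MappingProxyType(dict(self.metadata))
            )

    def replace(self, **changes: Any) -> "StepContext":
        # dataclasses.replace builds a new instance of type(self).
        return dc_replace(self, **changes)


@dataclass(frozen=True)
class TextContext(StepContext):
    language: str = "en"  # hypothetical domain-specific field


ctx = TextContext(sample="hello", metadata={"source": "demo"})
updated = ctx.replace(language="fr", metadata={**ctx.metadata, "tokens": 1})
```

Because dataclasses.replace re-runs __init__ and __post_init__, the plain dict passed to replace() is coerced again and the original ctx is left unchanged.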

pipeline.protocol

StepProtocol

Structural protocol that every step (and Pipeline/Branch) must satisfy.

@runtime_checkable
class StepProtocol(Protocol):
    requires: AbstractSet[str]
    provides: AbstractSet[str]

    def __call__(self, ctx: StepContext) -> StepContext: ...

Attribute  Type                          Description
requires   AbstractSet[str]              Metadata keys the step reads
provides   AbstractSet[str]              Metadata keys the step writes
__call__   (StepContext) -> StepContext  Execute the step

Notes:

  • AbstractSet[str] accepts both set and frozenset
  • @runtime_checkable enables isinstance(step, StepProtocol) checks
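A short sketch of what the isinstance check does and does not verify. The protocol is re-declared here (with Any in place of StepContext) so the snippet runs standalone; Tokenize and NotAStep are hypothetical classes.

```python
from typing import AbstractSet, Any, Protocol, runtime_checkable


@runtime_checkable
class StepProtocol(Protocol):
    """Stand-in mirroring the documented protocol."""
    requires: AbstractSet[str]
    provides: AbstractSet[str]

    def __call__(self, ctx: Any) -> Any: ...


class Tokenize:
    requires = frozenset()   # frozenset is accepted
    provides = {"tokens"}    # so is a plain set

    def __call__(self, ctx: Any) -> Any:
        return ctx


class NotAStep:
    # Callable, but declares neither requires nor provides.
    def __call__(self, ctx: Any) -> Any:
        return ctx


is_step = isinstance(Tokenize(), StepProtocol)
is_not_step = isinstance(NotAStep(), StepProtocol)
```

Note that a runtime protocol check only confirms that the members exist. It does not validate their types or the call signature, so a static type checker is still useful for that.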

SampleResult

Outcome for one sample after the pipeline has run.

@dataclass
class SampleResult:
    sample: Any
    output: StepContext | None
    error: Exception | None
    failed_at: str | None
    cause: Exception | None = None

Field      Type                Description
sample     Any                 The original input sample
output     StepContext | None  Final context (None if any step failed)
error      Exception | None    The exception (None if succeeded)
failed_at  str | None          Class name of the step that raised (None if succeeded)
cause      Exception | None    Inner exception for BranchError failures (default None)

Notes:

  • Mutable — background threads update it in-place when background steps complete
  • For background steps, output/error may be None until wait_for_background() completes
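As a rough illustration of how a caller might sort results, the sketch below buckets them by outcome. SampleResult and PipelineCancelled are stand-ins re-declared from this reference, triage is a hypothetical helper, and treating "output and error both None" as still pending follows the note above about background steps.

```python
from dataclasses import dataclass
from typing import Any, Optional


class PipelineCancelled(Exception):
    """Stand-in for pipeline.errors.PipelineCancelled."""


@dataclass
class SampleResult:
    """Stand-in with the documented fields."""
    sample: Any
    output: Optional[Any]
    error: Optional[Exception]
    failed_at: Optional[str]
    cause: Optional[Exception] = None


def triage(results):
    """Hypothetical helper: group samples by outcome."""
    buckets = {"ok": [], "cancelled": [], "failed": [], "pending": []}
    for r in results:
        if isinstance(r.error, PipelineCancelled):
            buckets["cancelled"].append(r.sample)
        elif r.error is not None:
            buckets["failed"].append((r.sample, r.failed_at))
        elif r.output is None:
            # Background work may not have filled in the result yet.
            buckets["pending"].append(r.sample)
        else:
            buckets["ok"].append(r.sample)
    return buckets


results = [
    SampleResult("a", {"tokens": 3}, None, None),
    SampleResult("b", None, ValueError("bad input"), "Tokenize"),
    SampleResult("c", None, PipelineCancelled(), None),
    SampleResult("d", None, None, None),
]
buckets = triage(results)
```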

pipeline.pipeline

Pipeline

Ordered sequence of steps. Satisfies StepProtocol — can be nested inside other pipelines.

Constructor

Pipeline(steps: list | None = None, hooks: list[PipelineHook] | None = None)

Parameter  Type                       Default  Description
steps      list | None                None     Optional initial list of steps
hooks      list[PipelineHook] | None  None     Observation-only hooks fired around each foreground step

Validates step ordering and infers contracts at construction time.

Attributes

Attribute  Type            Description
requires   frozenset[str]  Fields the pipeline needs from external context (auto-inferred)
provides   frozenset[str]  Fields the pipeline writes (auto-inferred, union of all steps)
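One plausible way to infer these two sets and check ordering in a single pass is sketched below. This is not the engine's implementation: infer_contract is a hypothetical function, and Load and Tokenize are hypothetical steps. PipelineOrderError is re-declared so the snippet runs standalone.

```python
class PipelineOrderError(Exception):
    """Stand-in for pipeline.errors.PipelineOrderError."""


def infer_contract(steps):
    """Return (requires, provides) for an ordered list of steps."""
    available = set()   # keys written by steps seen so far
    external = set()    # keys that must come from the caller's context
    for i, step in enumerate(steps):
        later = set().union(*(s.provides for s in steps[i + 1:]))
        for key in sorted(set(step.requires) - available):
            if key in later:
                # Nobody earlier provides it, but a later step does.
                raise PipelineOrderError(
                    f"{type(step).__name__} requires {key!r} "
                    "before it is provided"
                )
            external.add(key)
        available |= set(step.provides)
    return frozenset(external), frozenset(available)


class Load:
    requires = frozenset({"path"})
    provides = frozenset({"text"})


class Tokenize:
    requires = frozenset({"text"})
    provides = frozenset({"tokens"})


requires, provides = infer_contract([Load(), Tokenize()])

try:
    infer_contract([Tokenize(), Load()])
except PipelineOrderError as err:
    order_error = str(err)
else:
    order_error = None
```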

Methods

then
def then(self, step: object) -> Pipeline

Append a step and return self for chaining. Validates ordering immediately.

Parameter  Type    Description
step       object  Any object satisfying StepProtocol

Returns: self (for method chaining)

Raises: PipelineOrderError if the resulting order contains a step that requires a field provided only by a later step


branch
def branch(
    self,
    *pipelines: object,
    merge: MergeStrategy | Callable = MergeStrategy.RAISE_ON_CONFLICT,
) -> Pipeline

Append a Branch step and return self for chaining. Shorthand for .then(Branch(*pipelines, merge=merge)).

Returns: self (for method chaining)


run
def run(
    self,
    contexts: Iterable[StepContext],
    workers: int = 1,
    on_sample_done: Callable[[SampleResult], None] | None = None,
    cancel_token: CancellationToken | None = None,
) -> list[SampleResult]

Process contexts through the pipeline (sync entry point).

Parameter       Type                      Default  Description
contexts        Iterable[StepContext]     -        Input contexts to process
workers         int                       1        Max concurrent samples in foreground steps
on_sample_done  Callable | None           None     Callback after each sample's foreground steps complete (or fail). Must not block.
cancel_token    CancellationToken | None  None     Cancellation signal. Checked before each step and each new sample. Pass a fresh token per invocation.

Returns: list[SampleResult] — one result per input context

Notes: Calls asyncio.run(self.run_async(...)) internally, so it cannot be called from inside a running event loop; use run_async() there. If the pipeline has background steps, call wait_for_background() after run() returns. When cancel_token is provided, it is also published through cancel_token_var so code inside steps (e.g. LLM clients) can read it.


run_async
async def run_async(
    self,
    contexts: Iterable[StepContext],
    workers: int = 1,
    on_sample_done: Callable[[SampleResult], None] | None = None,
    cancel_token: CancellationToken | None = None,
) -> list[SampleResult]

Async entry point. Use await pipe.run_async(contexts) from coroutine contexts.

Same parameters and return type as run().
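The reason for having two entry points can be shown with a toy class. MiniPipeline is a hypothetical stand-in that only mirrors the documented run()/run_async() relationship; the asyncio behavior itself is standard: asyncio.run raises RuntimeError when an event loop is already running in the current thread.

```python
import asyncio


class MiniPipeline:
    """Hypothetical stand-in: run() wraps run_async() with asyncio.run."""

    async def run_async(self, items):
        await asyncio.sleep(0)  # pretend to do async work
        return [x * 2 for x in items]

    def run(self, items):
        coro = self.run_async(items)
        try:
            return asyncio.run(coro)
        except RuntimeError:
            coro.close()  # avoid a "coroutine was never awaited" warning
            raise


async def main():
    pipe = MiniPipeline()
    ok = await pipe.run_async([1, 2])   # correct inside a coroutine
    try:
        pipe.run([1, 2])                # sync entry point inside a loop
    except RuntimeError:
        nested_failed = True
    else:
        nested_failed = False
    return ok, nested_failed


sync_result = MiniPipeline().run([1, 2])          # fine: no loop running
async_result, nested_failed = asyncio.run(main())
```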


__call__
def __call__(self, ctx: StepContext) -> StepContext

Run all steps sequentially on a single context. Used when the pipeline is nested as a step inside another pipeline.

Notes: async_boundary markers are ignored in this mode — all steps run to completion.


wait_for_background
def wait_for_background(self, timeout: float | None = None) -> None

Block until all background tasks complete.

Parameter  Type          Default  Description
timeout    float | None  None     Max seconds to wait. None = wait indefinitely.

Raises: TimeoutError if timeout elapses before completion


background_stats
def background_stats(self) -> dict[str, int]

Return a snapshot of background task progress. Thread-safe.

Returns: {"active": int, "completed": int}
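To make the semantics of wait_for_background() and background_stats() concrete, here is a minimal sketch of a tracker with the same observable behavior. BackgroundTracker and all of its internals are assumptions for illustration, not the engine's code; only the returned dict shape and the TimeoutError come from this reference.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait


class BackgroundTracker:
    """Hypothetical stand-in for the engine's background bookkeeping."""

    def __init__(self, max_workers=2):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._futures = []
        self._active = 0
        self._completed = 0

    def submit(self, fn, *args):
        with self._lock:
            self._active += 1

        def tracked():
            try:
                return fn(*args)
            finally:
                # Counters change under the lock, so stats() is consistent.
                with self._lock:
                    self._active -= 1
                    self._completed += 1

        future = self._pool.submit(tracked)
        with self._lock:
            self._futures.append(future)
        return future

    def stats(self):
        with self._lock:
            return {"active": self._active, "completed": self._completed}

    def wait(self, timeout=None):
        with self._lock:
            pending = list(self._futures)
        _, not_done = wait(pending, timeout=timeout)
        if not_done:
            raise TimeoutError(f"{len(not_done)} task(s) still running")

    def close(self):
        self._pool.shutdown(wait=True)


tracker = BackgroundTracker()
for _ in range(3):
    tracker.submit(time.sleep, 0.01)
tracker.wait(timeout=5.0)
stats = tracker.stats()
tracker.close()
```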


pipeline.branch

MergeStrategy

Enum of built-in merge strategies for Branch outputs.

class MergeStrategy(Enum):
    RAISE_ON_CONFLICT = "raise_on_conflict"
    LAST_WRITE_WINS = "last_write_wins"
    NAMESPACED = "namespaced"

Value              Behavior
RAISE_ON_CONFLICT  Raises ValueError if two branches write different values to the same named field. Metadata merges with last-writer-wins.
LAST_WRITE_WINS    Last branch's value wins for every conflicting field.
NAMESPACED         Each branch's output stored at metadata["branch_N"]. No conflict possible.
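The three strategies can be approximated on plain dicts as follows. This is a simplified sketch: it ignores the distinction between named fields and metadata, merge_outputs is a hypothetical helper, and the zero-based numbering of branch_N is an assumption.

```python
from enum import Enum


class MergeStrategy(Enum):
    RAISE_ON_CONFLICT = "raise_on_conflict"
    LAST_WRITE_WINS = "last_write_wins"
    NAMESPACED = "namespaced"


def merge_outputs(outputs, strategy):
    """Hypothetical helper: merge one dict per branch into a single dict."""
    if strategy is MergeStrategy.NAMESPACED:
        # Assumption: N counts branches from zero.
        return {f"branch_{i}": dict(out) for i, out in enumerate(outputs)}

    merged = {}
    for out in outputs:
        for key, value in out.items():
            conflict = key in merged and merged[key] != value
            if conflict and strategy is MergeStrategy.RAISE_ON_CONFLICT:
                raise ValueError(f"conflicting values for {key!r}")
            merged[key] = value  # later branches overwrite earlier ones
    return merged


a = {"lang": "en", "score": 1}
b = {"lang": "en", "score": 2}

last = merge_outputs([a, b], MergeStrategy.LAST_WRITE_WINS)
spaced = merge_outputs([a, b], MergeStrategy.NAMESPACED)
try:
    merge_outputs([a, b], MergeStrategy.RAISE_ON_CONFLICT)
    conflict_raised = False
except ValueError:
    conflict_raised = True
```

Identical values written by two branches (lang above) are not treated as a conflict, matching the "different values" wording in the table.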

Branch

Runs multiple pipelines in parallel, then merges their outputs. Satisfies StepProtocol.

Constructor

Branch(
    *pipelines: object,
    merge: MergeStrategy | Callable = MergeStrategy.RAISE_ON_CONFLICT,
)

Parameter   Type                      Default            Description
*pipelines  object                    -                  Child pipelines to run in parallel (at least one required)
merge       MergeStrategy | Callable  RAISE_ON_CONFLICT  Merge strategy or custom fn(list[StepContext]) -> StepContext

Raises: ValueError if no pipelines are provided
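A custom merge callable only needs the documented shape fn(list[StepContext]) -> StepContext. The sketch below uses a simplified stand-in for StepContext; merge_all and the branch_count key are hypothetical, and the function is called directly rather than through Branch so the snippet runs standalone.

```python
from dataclasses import dataclass, field, replace
from types import MappingProxyType
from typing import Any


@dataclass(frozen=True)
class StepContext:
    """Simplified stand-in for pipeline.context.StepContext."""
    sample: Any = None
    metadata: Any = field(default_factory=lambda: MappingProxyType({}))


def merge_all(contexts):
    """Hypothetical merge: combine metadata and record the branch count."""
    merged = {}
    for ctx in contexts:
        merged.update(ctx.metadata)  # later branches win on shared keys
    merged["branch_count"] = len(contexts)
    return replace(contexts[0], metadata=MappingProxyType(merged))


left = StepContext("doc", MappingProxyType({"tokens": 3}))
right = StepContext("doc", MappingProxyType({"sentiment": "pos"}))
combined = merge_all([left, right])
```

With the real engine, such a function would presumably be passed as Branch(..., merge=merge_all).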

Attributes

Attribute  Type            Description
requires   frozenset[str]  Union of all children's requires
provides   frozenset[str]  Union of all children's provides
pipelines  list            The child pipelines

Methods

__call__
def __call__(self, ctx: StepContext) -> StepContext

Sync fan-out via ThreadPoolExecutor. All branches run to completion before any failure is raised.

Raises: BranchError if any branch fails
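The "run everything, then raise" behavior could be sketched with ThreadPoolExecutor like this. fan_out and the three branch functions are hypothetical, BranchError is re-declared with an assumed constructor, and merging of the outputs is left out.

```python
from concurrent.futures import ThreadPoolExecutor


class BranchError(Exception):
    """Stand-in; the constructor signature is an assumption."""

    def __init__(self, failures):
        super().__init__(f"{len(failures)} branch(es) failed")
        self.failures = failures


def fan_out(branches, ctx):
    """Run every branch on ctx in its own thread, then report failures."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = [pool.submit(branch, ctx) for branch in branches]
    # Leaving the with-block waits for all futures, so every branch
    # has finished before any failure is inspected.
    outputs, failures = [], []
    for future in futures:
        exc = future.exception()
        if exc is not None:
            failures.append(exc)
        else:
            outputs.append(future.result())
    if failures:
        raise BranchError(failures)
    return outputs


ran = []

def add_one(ctx):
    ran.append("add_one")
    return ctx + 1

def broken(ctx):
    ran.append("broken")
    raise ValueError("bad branch")

def double(ctx):
    ran.append("double")
    return ctx * 2


outputs = fan_out([add_one, double], 3)
try:
    fan_out([add_one, broken, double], 3)
except BranchError as err:
    failures = err.failures
```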


__call_async__
async def __call_async__(self, ctx: StepContext) -> StepContext

Async fan-out via asyncio.gather. Sync children are wrapped with asyncio.to_thread.

Raises: BranchError if any branch fails
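An async counterpart might look like the sketch below. fan_out_async is hypothetical, and detecting async children with inspect.iscoroutinefunction is an assumption made for the example; the engine may instead look for __call_async__. Passing return_exceptions=True to asyncio.gather is what lets every branch finish before failures are reported.

```python
import asyncio
import inspect


class BranchError(Exception):
    """Stand-in; the constructor signature is an assumption."""

    def __init__(self, failures):
        super().__init__(f"{len(failures)} branch(es) failed")
        self.failures = failures


async def fan_out_async(branches, ctx):
    async def run_one(branch):
        if inspect.iscoroutinefunction(branch):
            return await branch(ctx)
        # Sync children run in a worker thread so they do not block the loop.
        return await asyncio.to_thread(branch, ctx)

    results = await asyncio.gather(
        *(run_one(b) for b in branches), return_exceptions=True
    )
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        raise BranchError(failures)
    return list(results)


async def double(ctx):
    await asyncio.sleep(0)
    return ctx * 2

def add_one(ctx):
    return ctx + 1

def broken(ctx):
    raise ValueError("bad branch")


outputs = asyncio.run(fan_out_async([double, add_one], 3))
try:
    asyncio.run(fan_out_async([double, broken], 3))
except BranchError as err:
    failures = err.failures
```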


pipeline.protocol — Hooks

PipelineHook

Observation-only protocol fired around each foreground step. Hooks cannot modify context — both methods return None.

@runtime_checkable
class PipelineHook(Protocol):
    def before_step(self, step_name: str, ctx: StepContext) -> None: ...
    def after_step(self, step_name: str, ctx: StepContext) -> None: ...

Method       Parameters                        Description
before_step  step_name: str, ctx: StepContext  Called before each foreground step executes
after_step   step_name: str, ctx: StepContext  Called after each foreground step completes

Notes:

  • step_name is type(step).__name__ — hooks know what ran but cannot inspect or mutate the step instance
  • Hooks fire for foreground steps only — background steps (after async_boundary) do not trigger hooks
  • If a hook raises, the pipeline logs the error and continues — a broken hook never kills the pipeline
  • For Branch steps, hooks fire once for "Branch" as a whole, not for inner steps
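A hook only needs the two documented methods. The sketch below pairs a hypothetical TimingHook with an assumed dispatch loop (run_step_with_hooks) that logs and swallows hook errors, as the notes describe. The logger name, and the choice to pass the step's output context to after_step, are assumptions.

```python
import logging
import time

logger = logging.getLogger("pipeline.hooks")  # hypothetical logger name


class TimingHook:
    """Hypothetical hook recording wall-clock time per step name.

    Keyed by step name only, so it is not suited to workers > 1.
    """

    def __init__(self):
        self.durations = {}
        self._started = {}

    def before_step(self, step_name, ctx):
        self._started[step_name] = time.perf_counter()

    def after_step(self, step_name, ctx):
        started = self._started.pop(step_name)
        self.durations[step_name] = time.perf_counter() - started


class BrokenHook:
    def before_step(self, step_name, ctx):
        raise RuntimeError("boom")

    def after_step(self, step_name, ctx):
        pass


def run_step_with_hooks(step, ctx, hooks):
    """Assumed dispatch: a failing hook is logged, never re-raised."""
    name = type(step).__name__
    for hook in hooks:
        try:
            hook.before_step(name, ctx)
        except Exception:
            logger.exception("before_step failed in %r", hook)
    result = step(ctx)
    for hook in hooks:
        try:
            hook.after_step(name, result)
        except Exception:
            logger.exception("after_step failed in %r", hook)
    return result


class Upper:
    def __call__(self, ctx):
        return ctx.upper()


timing = TimingHook()
out = run_step_with_hooks(Upper(), "hi", [BrokenHook(), timing])
```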

pipeline.errors

CancellationToken

Thread-safe cancellation signal. Create a fresh token per run() invocation.

class CancellationToken:
    def cancel(self) -> None: ...

    @property
    def is_cancelled(self) -> bool: ...

Method / Property  Description
cancel()           Signal cancellation. Thread-safe, idempotent.
is_cancelled       True after cancel() has been called.
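A token with this interface can be built on threading.Event, which is already thread-safe and idempotent. The class below is a minimal stand-in under that assumption, not the engine's source; process is a hypothetical loop showing cooperative checks.

```python
import threading


class CancellationToken:
    """Minimal stand-in built on threading.Event (an assumption)."""

    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        # Event.set() may be called from any thread, any number of times.
        self._event.set()

    @property
    def is_cancelled(self):
        return self._event.is_set()


def process(items, token):
    """Hypothetical loop that checks the token before each item."""
    done = []
    for item in items:
        if token.is_cancelled:
            break
        done.append(item)
        if item == 2:
            token.cancel()  # simulate a cancel arriving mid-run
    return done


token = CancellationToken()
before = token.is_cancelled
processed = process([1, 2, 3, 4], token)
token.cancel()  # second call is harmless
after = token.is_cancelled
```

Because a cancelled token stays cancelled, reusing it would stop the next run immediately, which is why a fresh token per run() is recommended.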

cancel_token_var

ContextVar set by Pipeline.run_async() so code inside steps (e.g. LLM clients) can read the current cancel token without parameter changes.

cancel_token_var: ContextVar[CancellationToken | None]  # default: None

Notes:

  • Set before steps run, reset after run_async() completes
  • asyncio.to_thread() copies contextvars automatically — visible in sync steps too
  • Read with cancel_token_var.get(None) — returns None when no pipeline is running
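The sketch below shows the pattern these notes describe: a ContextVar is set around the run, and a blocking helper executed through asyncio.to_thread can still read it. The variable and token are re-declared as stand-ins, and blocking_client_call and run_with_token are hypothetical names.

```python
import asyncio
import threading
from contextvars import ContextVar


class CancellationToken:
    """Minimal stand-in for the documented token."""

    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        self._event.set()

    @property
    def is_cancelled(self):
        return self._event.is_set()


# Stand-in for pipeline.errors.cancel_token_var.
cancel_token_var: ContextVar = ContextVar("cancel_token_var", default=None)


def blocking_client_call():
    """Hypothetical sync helper deep inside a step."""
    token = cancel_token_var.get(None)
    if token is not None and token.is_cancelled:
        return "skipped"
    return "called"


async def run_with_token(token):
    reset = cancel_token_var.set(token)
    try:
        # to_thread runs the function in a copy of the current context,
        # so the worker thread sees the token set above.
        return await asyncio.to_thread(blocking_client_call)
    finally:
        cancel_token_var.reset(reset)


fresh = CancellationToken()
cancelled = CancellationToken()
cancelled.cancel()

with_fresh = asyncio.run(run_with_token(fresh))
with_cancelled = asyncio.run(run_with_token(cancelled))
outside = blocking_client_call()  # no pipeline running: token is None
```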

PipelineCancelled

class PipelineCancelled(Exception): ...

A cancel_token was triggered. Surfaces in SampleResult.error — never propagated to the caller of run(). Callers check isinstance(result.error, PipelineCancelled) to distinguish cancellation from step failures.


PipelineOrderError

class PipelineOrderError(Exception): ...

A step requires a field that no earlier step provides (but a later step does). Raised at construction time.


PipelineConfigError

class PipelineConfigError(Exception): ...

Invalid pipeline wiring. Raised at construction time. Examples:

  • More than one async_boundary = True step in the same pipeline
  • An async_boundary = True step inside a Branch child

BranchError

class BranchError(Exception):
    failures: list[BaseException]

One or more branch pipelines failed. All branches always run to completion before this is raised. Raised at runtime.

Attribute  Type                 Description
failures   list[BaseException]  One exception per failed branch

Step class attributes

Attributes a step class declares to control pipeline behavior. requires and provides are mandatory; async_boundary and max_workers are optional and fall back to the defaults shown.

Attribute       Type                       Default     Description
requires        set[str] | frozenset[str]  (required)  Metadata keys the step reads
provides        set[str] | frozenset[str]  (required)  Metadata keys the step writes
async_boundary  bool                       False       Marks the foreground/background split point
max_workers     int                        1           Max concurrent background threads for this step class
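Put together, a step class declaring all four attributes might look like this. CountTokens is a hypothetical step, and StepContext is a simplified stand-in so the snippet runs without the engine.

```python
from dataclasses import dataclass, field, replace
from types import MappingProxyType
from typing import Any


@dataclass(frozen=True)
class StepContext:
    """Simplified stand-in for pipeline.context.StepContext."""
    sample: Any = None
    metadata: Any = field(default_factory=lambda: MappingProxyType({}))


class CountTokens:
    """Hypothetical step: reads "tokens", writes "token_count"."""

    requires = frozenset({"tokens"})
    provides = frozenset({"token_count"})
    async_boundary = False  # stays in the foreground
    max_workers = 1         # only relevant for background steps

    def __call__(self, ctx):
        count = len(ctx.metadata["tokens"])
        # Contexts are frozen, so build a new metadata mapping.
        new_meta = MappingProxyType({**ctx.metadata, "token_count": count})
        return replace(ctx, metadata=new_meta)


ctx = StepContext("a b", MappingProxyType({"tokens": ["a", "b"]}))
out = CountTokens()(ctx)
```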