Skip to content

Building Pipelines

A pipeline is an ordered sequence of steps where each step’s input can be bound to work item fields, pipeline-level additional inputs, prior step outputs, or signals extracted from prior results.

Basic pipeline

The recommended way to define a pipeline is the fluent pipeline() builder. Each .step() mapper receives a typed input accessor that always includes workItemTitle and workItemDescription.

import { pipeline } from "@boboddy/sdk/definitions/pipelines";
export default pipeline({
key: "code-quality-pipeline",
name: "Code Quality Pipeline",
status: "active",
})
.step(reviewCodeStep, ({ input }) => ({
title: input.workItemTitle,
code: input.workItemDescription,
}))
.advance(() => ({ default: "continue" }))
.build();

Scaffold pipeline definitions

Run this command to fetch your existing step and pipeline definitions from the server and write them as editable TypeScript files:

Terminal window
boboddy pipelines pull

This creates (or overwrites) the following files inside .boboddy/pipeline-builder/:

FileDescription
steps.tsOne defineStep() export per step (latest version of each key)
<pipeline-key>.tsOne pipeline export per pipeline
package.jsonSDK and zod dependencies (written once, never overwritten)
tsconfig.jsonTypeScript config for the package (written once, never overwritten)

For a brand-new project with no definitions on the server yet, use boboddy pipelines init instead to get a starter template.

After pulling, install dependencies inside the directory:

Terminal window
cd .boboddy/pipeline-builder && npm install

When you’re ready to publish changes back:

Terminal window
boboddy pipelines push

This pushes steps first, then pipelines, in a single command.

pipeline() options

FieldTypeRequiredDescription
keystringYesUnique identifier for this pipeline
namestringYesHuman-readable display name
versionnumberNoVersion number (defaults to 1)
descriptionstringNoBrief description
status"draft" | "active"NoDraft pipelines are not executed
additionalPipelineInputobjectNoCustom input fields beyond the built-in work item fields

Call .step(...), then .advance(...) (required before the next step or .build()), and finally .build() to produce the wire-format pipeline spec. Timeouts are set via the optional configFn third argument to .step().

Input binding

Inside a .step() mapper, four context helpers cover every binding source:

input.<path> — built-in and additional pipeline input

The input accessor always exposes workItemTitle (string) and workItemDescription (string | null), plus any custom fields defined in additionalPipelineInput.schema. Drill into the shape; each property access returns a typed binding.

.step(reviewCodeStep, ({ input }) => ({
title: input.workItemTitle,
body: input.workItemDescription,
// custom field from additionalPipelineInput:
code: input.code,
}))

Nested fields work as you’d expect — input.ticket.title binds to the dotted path "ticket.title". Do not spread or coerce the accessor (${input.code}, { ...input.metadata }): it will throw at build time. Drill into specific fields instead.

signal(step, signalKey) — bind to a prior step’s signal

.step(refactorStep, ({ input, signal }) => ({
code: input.code,
previousScore: signal(reviewCodeStep, "clarity_score"),
}))

signalKey is constrained to the prior step’s declared signal keys, so typos are compile errors.

output(step) — bind to a prior step’s whole output

.step(refactorStep, ({ input, output }) => ({
code: input.code,
reviewResult: output(reviewCodeStep),
}))

literal(value) — a hardcoded constant

.step(myStep, ({ literal }) => ({
model: literal("gpt-4o"),
}))

Additional pipeline input

When a step needs data beyond the built-in work item fields, define it with additionalPipelineInput. Both schema and bindings are required when the object is provided.

import { z } from "zod";
export default pipeline({
key: "ticket-analyzer",
name: "Ticket Analyzer",
additionalPipelineInput: {
schema: z.object({
storyPoints: z.number().nullable(),
team: z.string(),
}),
bindings: ({ workItem, literal }) => ({
storyPoints: workItem.field("Story Points"),
team: literal("platform"),
}),
},
})
.step(analyzeStep, ({ input }) => ({
title: input.workItemTitle,
storyPoints: input.storyPoints,
team: input.team,
}))
.advance(() => ({ default: "continue" }))
.build();

The bindings callback receives { workItem, literal }:

  • workItem.title / workItem.description — the work item’s title or description
  • workItem.field(name) — a named custom field on the work item (e.g. workItem.field("Story Points"))
  • literal(value) — a hardcoded constant

Pipeline-level bindings are defaults applied to every step automatically. Explicit bindings in a .step() mapper override them for that step.

additionalStepInput on pipeline(...) compiles into default bindings applied to every step in the pipeline. Explicit .step() bindings override these defaults.

Advancement policies

.advance(callback) attaches a policy to the most recently added step. The callback receives a context with signal, all, any, route, and every computed-signal factory. The signal keys are typed against the just-added step.

.step(reviewCodeStep, ({ input }) => ({ code: input.code }))
.advance(({ signal }) => ({
default: "block",
rules: [signal("clarity_score").gt(7).then("continue")],
}))

If the default outcome is "block" and no rule fires, the pipeline halts at that step and marks the execution as needing review.

Comparators on SignalRef

MethodWire operator
.eq(value)equal
.ne(value)notEqual
.gt(n)greaterThan
.gte(n)greaterThanInclusive
.lt(n)lessThan
.lte(n)lessThanInclusive
.in(values)in
.notIn(values)notIn
.contains(value)contains
.doesNotContain(v)doesNotContain

Each returns a RuleLeaf. Call .then(outcome) to finalize as a rule, or pass it into all(...) / any(...) to nest.

Grouping with all and any

.advance(({ signal, all, any }) => ({
default: "block",
rules: [
all(
signal("clarity_score").gte(7),
any(
signal("reviewer_approved").eq(true),
signal("auto_approved").eq(true),
),
).then("continue"),
],
}))

Groups are nestable arbitrarily. Each .then(outcome) closes the rule.

Routing to another pipeline

.advance(({ signal, route }) => ({
default: "complete",
rules: [
signal("flagged").eq(true).then(route("triage-pipeline", { reason: "flagged" })),
],
}))

Computed signals

Computed signals aggregate multiple raw signals into a derived value inline. The factories live on the same .advance() context — no separate declaration on the step required.

.advance(({ avg, signal, stepSignals }) => ({
default: "block",
rules: [
avg(stepSignals.quality_score, stepSignals.security_score).gte(7).then("continue"),
signal("flagged").eq(true).then("block"),
],
}))

Available factories

MethodDescriptionInput signal types
avg(...keys)Arithmetic mean of the input signalsnumber
weightedAvg(...keys)Weighted mean (pass weights via configJson later)number
sum(...keys)Sum of the input signalsnumber
min(...keys)Minimum value across the input signalsnumber
max(...keys)Maximum value across the input signalsnumber
count(...keys)Count of truthy or present signal valuesany
booleanAny(...keys)true if any input signal is truthyboolean
booleanAll(...keys)true only if all input signals are truthyboolean

Each factory requires at least two signal keys. The same call across multiple rules is deduplicated into a single computed-signal definition at build time.

Multi-step pipeline example

import { pipeline } from "@boboddy/sdk/definitions/pipelines";
import { z } from "zod";
import { reviewCodeStep, refactorStep, verifyStep } from "./steps";
export default pipeline({
key: "full-review",
name: "Full Code Review Pipeline",
status: "active",
additionalPipelineInput: {
schema: z.object({ code: z.string() }),
bindings: ({ workItem }) => ({
code: workItem.field("Code"),
}),
},
})
.step(reviewCodeStep, ({ input }) => ({
code: input.code,
}))
.advance(({ signal }) => ({
default: "block",
rules: [signal("clarity_score").gt(6).then("continue")],
}))
.step(
refactorStep,
({ input, output }) => ({
code: input.code,
suggestions: output(reviewCodeStep),
}),
(cfg) => { cfg.timeout = 60; },
)
.advance(() => ({ default: "continue" }))
.step(verifyStep, ({ input, signal }) => ({
original: input.code,
refactoredScore: signal(reviewCodeStep, "clarity_score"),
}))
.advance(() => ({ default: "continue" }))
.build();

Timeouts

Pass a third configFn argument to .step() to cap how long a worker can spend on that step. Set cfg.timeout in seconds:

.step(
heavyAnalysisStep,
({ input }) => ({ payload: input.payload }),
(cfg) => { cfg.timeout = 120; },
)

Legacy definePipeline form

The original object-based API is still supported and produces identical wire output. New pipelines should prefer the builder.

import {
definePipeline,
fromPipelineInput,
Rule,
} from "@boboddy/sdk/definitions/pipelines";
import { z } from "zod";
import { reviewCodeStep } from "./steps";
const inputSchema = z.object({ code: z.string() });
export default definePipeline({
key: "code-quality-pipeline",
name: "Code Quality Pipeline",
status: "active",
steps: [
{
step: reviewCodeStep,
input: {
code: fromPipelineInput(inputSchema, "code"),
},
advancement: {
defaultOutcome: "block",
rules: [Rule.when("clarity_score", "greaterThan", 7, "continue")],
},
},
],
});

fromPipelineInput, fromSignal, stepOutput, Rule, and Computed remain exported from @boboddy/sdk/definitions/pipelines.