catbird

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2026 License: Apache-2.0 Imports: 28 Imported by: 4

README

Go Reference License Go Version Go Report Card

CatBird

Catbird

A PostgreSQL-powered message queue and task execution engine. Catbird brings reliability and simplicity to background job processing by using your database as the single source of truth—no extra services to manage, just your database coordinating everything.

Why Catbird?

  • Transactional by default: enqueue messages in the same DB transaction as your app writes; rollback means no message.
  • Exactly-once within a visibility window: safe retries after crashes, no duplicate processing.
  • Database as coordinator: horizontal workers, PostgreSQL handles distribution and state.
  • Workflows as DAGs: dependencies, branching, and data passing between steps.
  • Definition separate from implementation: define and start tasks and flows in one place, implement them elsewhere.
  • Persistence and auditability: queues, runs, and results live in PostgreSQL.
  • Resiliency baked in: retries, backoff, optional circuit breakers.
  • Operational UX: web dashboard and tui for runs, queues, and workers.
  • Optional real-time layer: opt-in pub/sub with SSE support for pushing events to browsers.

Flow Visualization

Quick Start

client := catbird.New(conn)
ctx := context.Background()

// Queues
err := client.CreateQueue(ctx, "my-queue")
err = client.Send(ctx, "my-queue", map[string]any{"user_id": 123}, catbird.SendOpts{
    ConcurrencyKey: "user-123",
})
messages, err := client.Read(ctx, "my-queue", 10, 30*time.Second)
for _, msg := range messages {
    err = client.Delete(ctx, "my-queue", msg.ID)
}

// Continuous reader: loops ReadPoll, ack on nil, nack on error
go client.Reader(ctx, "my-queue", 10, 30*time.Second,
    func(ctx context.Context, msg catbird.Message) error {
        return nil // ack (deletes message)
    },
)

// Delayed send
client.Send(ctx, "my_queue", map[string]any{"job": "cleanup"}, catbird.SendOpts{VisibleAt: time.Now().Add(30 * time.Minute)})

// Tasks and flows
task := catbird.NewTask("send-email").
    WithDescription("Send a transactional email to a user").Do(func(ctx context.Context, input string) (string, error) {
        return "sent", nil
    })

flow := catbird.NewFlow("double-add")
flow.AddStep(catbird.NewStep("double").Do(func(ctx context.Context, input int) (int, error) {
    return input * 2, nil
}))
flow.AddStep(catbird.NewStep("add").
    DependsOn("double").Do(func(ctx context.Context, input int, doubled int) (int, error) {
    return doubled + 1, nil
}))

worker := catbird.NewWorker(pool).
    AddTask(task).
    AddFlow(flow)
go worker.Start(ctx)

taskHandle, err := client.RunTask(ctx, "send-email", "hello")
var taskOut string
err = taskHandle.WaitForOutput(ctx, &taskOut)

flowHandle, err := client.RunFlow(ctx, "double-add", 10)
var flowOut int
err = flowHandle.WaitForOutput(ctx, &flowOut)

// Delayed execution
client.RunTask(ctx, "process-user", userID, catbird.RunTaskOpts{VisibleAt: time.Now().Add(5 * time.Minute)})
client.RunFlow(ctx, "order_processing", map[string]any{"order_id": 123}, catbird.RunFlowOpts{VisibleAt: time.Now().Add(30 * time.Second)})

// Priority (higher = claimed first, default 0)
client.RunTask(ctx, "send-email", email, catbird.RunTaskOpts{Priority: 10})

// Ensure definitions exist before usage; this is not necessary if you
// just want to run a worker, definitions will be created for you.
err := client.CreateTask(ctx, taskA)
err = client.CreateTask(ctx, taskB)
err = client.CreateFlow(ctx, flowA)
err = client.CreateFlow(ctx, flowB)

// Direct package-level usage (no Client), for example in a transaction:
taskHandle, err := catbird.RunTask(ctx, tx, "send-email", "hello")

Deduplication

Catbird uses a ConcurrencyKey to prevent overlapping runs of tasks and flows, and overlapping messages in queues.

// ConcurrencyKey: prevent overlap
_, err := client.RunTask(ctx, "process-user", userID, catbird.RunTaskOpts{
    ConcurrencyKey: fmt.Sprintf("user-%d", userID),
})
Behavior
  • Deduplicates queued and started runs: a new run with the same key is rejected while one is active
  • After completion or failure: the same key can be used again
  • Return value on duplicate: RunTask()/RunFlow() return a handle to the existing run ID
  • Failure retries: allows retries on failed runs
  • No key = no deduplication: if you don't provide a key, duplicates are allowed
  • Queue messages: Use ConcurrencyKey in SendOpts for message deduplication

Topic-Based Routing

err := client.CreateQueue(ctx, "user-events")
err = client.CreateQueue(ctx, "audit-log")

err = client.Bind(ctx, "user-events", "events.user.created")
err = client.Bind(ctx, "user-events", "events.*.updated")
err = client.Bind(ctx, "audit-log", "events.#")

_, err = client.Publish(ctx, "events.user.created", map[string]any{
    "user_id": 123,
    "email":   "user@example.com",
})
_, err = client.Unbind(ctx, "user-events", "events.*.updated")

Wildcard rules:

  • * matches a single token (e.g., events.*.created matches events.user.created)
  • # matches zero or more tokens at the end (e.g., events.user.# matches events.user and events.user.created.v1)
  • # must appear as .# at the end of the pattern, or as # by itself
  • Tokens are separated by . and can contain a-z, A-Z, 0-9, _, -
Event-Triggered Runs

Bind tasks or flows to topic patterns so that publishing a message automatically creates a run.

// Bind a task to a topic pattern
err = client.BindTask(ctx, "send_email", "events.email.*")

// Bind a flow to a topic pattern
err = client.BindFlow(ctx, "order_processing", "events.order.#")

// Publishing now also triggers task/flow runs
_, err = client.Publish(ctx, "events.email.welcome", map[string]any{
    "user_id": 123,
    "email":   "user@example.com",
}, catbird.PublishOpts{
    ConcurrencyKey: "user-123",
})

// Unbind when done
_, err = client.UnbindTask(ctx, "send_email", "events.email.*")
_, err = client.UnbindFlow(ctx, "order_processing", "events.order.#")

Task Execution

// Define task (scheduling is separate)
task := catbird.NewTask("send-email").Do(func(ctx context.Context, input EmailRequest) (EmailResponse, error) {
        return EmailResponse{SentAt: time.Now()}, nil
    },
        catbird.WithConcurrency(5),
        catbird.WithMaxRetries(3),
        catbird.WithFullJitterBackoff(500*time.Millisecond, 10*time.Second),
        catbird.WithCircuitBreaker(5, 30*time.Second),
    )

// Define a task with a condition (skipped when condition is false)
conditionalTask := catbird.NewTask("premium-processing").
    WithCondition("input.is_premium"). // Skipped if is_premium = false
    Do(func(ctx context.Context, input ProcessRequest) (string, error) {
        return "processed", nil
    })

// Create worker (requires *pgxpool.Pool)
worker := catbird.NewWorker(pool).
    WithLogger(slog.Default()).
    WithShutdownTimeout(10 * time.Second).
    AddTask(task).
    AddTask(conditionalTask)
go worker.Start(ctx)

// Run the task
handle, err := client.RunTask(ctx, "send-email", EmailRequest{
    To:      "user@example.com",
    Subject: "Hello",
})

// Get result
var result EmailResponse
err = handle.WaitForOutput(ctx, &result)
OnFail Handlers

On-fail handlers run after a task reaches a failed state (after its own retries). They execute with their own HandlerOpt retry and backoff settings, and receive the original input plus rich failure context.

OnFail semantics (tasks and flows):

  • OnFail runs only after the main task/flow run reaches failed (after normal handler retries are exhausted).
  • OnFail has independent retry/backoff via its own HandlerOpt values.
  • A successful OnFail marks on-fail handling complete, but the original run remains failed.
  • If OnFail retries are exhausted, on-fail handling remains failed and no further retries are scheduled.
task := catbird.NewTask("charge-payment").Do(func(ctx context.Context, input ChargeRequest) (ChargeResult, error) {
        return ChargeResult{}, fmt.Errorf("gateway timeout")
    }).
    OnFail(func(ctx context.Context, input ChargeRequest, failure catbird.TaskFailure) error {
        // Send alert, enqueue compensation, or record audit log.
        return nil
    },
        catbird.WithMaxRetries(3),
        catbird.WithFullJitterBackoff(200*time.Millisecond, 5*time.Second),
    )

Flow Execution

A flow is a directed acyclic graph (DAG) of steps that execute when their dependencies are satisfied.

Summary
  • Steps with no dependencies start immediately; independent branches run in parallel.
  • Flow output is selected by output priority (configured with OutputPriority(...) or inferred from terminal steps).
  • Conditions can skip steps; downstream handlers must accept Optional[T] for any conditional dependency.
  • A step with a signal waits for both its dependencies and the signal input.
  • WaitForOutput() returns the selected flow output once the flow completes.
Examples: Workflows
flow := catbird.NewFlow("order-processing")
flow.AddStep(catbird.NewStep("validate").Do(func(ctx context.Context, order Order) (ValidationResult, error) {
    if order.Amount <= 0 {
        return ValidationResult{Valid: false, Reason: "Invalid amount"}, nil
    }
    return ValidationResult{Valid: true}, nil
}))
flow.AddStep(catbird.NewStep("charge").
    DependsOn("validate").Do(func(ctx context.Context, order Order, validated ValidationResult) (ChargeResult, error) {
    if !validated.Valid {
        return ChargeResult{}, fmt.Errorf("cannot charge invalid order")
    }
    return ChargeResult{
        TransactionID: "txn-" + order.ID,
        Amount:        order.Amount,
    }, nil
}))
flow.AddStep(catbird.NewStep("check-inventory").
    DependsOn("validate").Do(func(ctx context.Context, order Order, validated ValidationResult) (InventoryCheck, error) {
    return InventoryCheck{
        InStock: true,
        Qty:     order.Amount,
    }, nil
}))
flow.AddStep(catbird.NewStep("ship").
    DependsOn("charge", "check-inventory").Do(func(ctx context.Context, order Order, chargeResult ChargeResult, inventory InventoryCheck) (ShipmentResult, error) {
    if !inventory.InStock {
        return ShipmentResult{}, fmt.Errorf("out of stock")
    }
    return ShipmentResult{
        TrackingNumber: "TRK-" + chargeResult.TransactionID,
        EstimatedDays:  3,
    }, nil
}))

// Create worker
worker := catbird.NewWorker(pool).
    AddFlow(flow)
go worker.Start(ctx)
Example: Multi-branch Output Ownership

Flows can have multiple terminal steps.

flow := catbird.NewFlow("approval-or-escalation")
flow.OutputPriority("approve", "escalate")

flow.AddStep(catbird.NewStep("validate").Do(func(ctx context.Context, req Request) (Validation, error) {
    return Validation{Score: req.Score}, nil
}))
flow.AddStep(catbird.NewStep("approve").
    DependsOn("validate").
    WithCondition("validate.score gte 80").Do(func(ctx context.Context, req Request, v Validation) (Decision, error) {
    return Decision{Status: "approved"}, nil
}))
flow.AddStep(catbird.NewStep("escalate").
    DependsOn("validate").
    WithCondition("validate.score lt 80").Do(func(ctx context.Context, req Request, v Validation) (Decision, error) {
    return Decision{Status: "escalated"}, nil
}))

If you omit OutputPriority(...), Catbird uses terminal steps in definition order as the default priority.

flow := catbird.NewFlow("default-terminal-priority")
flow.AddStep(catbird.NewStep("a").Do(func(ctx context.Context, in int) (int, error) { return in, nil }))
flow.AddStep(catbird.NewStep("left").DependsOn("a").Do(func(ctx context.Context, in int, a int) (int, error) { return a + 1, nil }))
flow.AddStep(catbird.NewStep("right").DependsOn("a").Do(func(ctx context.Context, in int, a int) (int, error) { return a + 2, nil }))

// Effective priority: left, then right.
OnFail Handlers

On-fail handlers run after a flow reaches a failed state (after its own retries). They execute with their own HandlerOpt retry and backoff settings, and receive the original input plus rich failure context.

flow := catbird.NewFlow("order-processing")
flow.AddStep(catbird.NewStep("charge").Do(func(ctx context.Context, order Order) (string, error) {
    return "", fmt.Errorf("charge failed")
}))
flow.OnFail(func(ctx context.Context, order Order, failure catbird.FlowFailure) error {
    var failedInput Order
    if err := failure.FailedStepInputAs(&failedInput); err == nil {
        // failedInput has the step input that caused the error
    }

    var chargeResult ChargeResult
    if err := failure.OutputAs("charge", &chargeResult); err == nil {
        // access completed step output when available
    }

    return nil
})
Example: Signals & Human-in-the-Loop

Signals enable workflows that wait for external input before proceeding, such as approval workflows or webhooks.

flow := catbird.NewFlow("document_approval")
flow.AddStep(catbird.NewStep("submit").Do(func(ctx context.Context, doc Document) (string, error) {
    return doc.ID, nil
}))
flow.AddStep(catbird.NewStep("approve").
    DependsOn("submit").
    WithSignal().Do(func(ctx context.Context, doc Document, approval ApprovalInput, docID string) (ApprovalResult, error) {
    if !approval.Approved {
        return ApprovalResult{}, fmt.Errorf("approval denied by %s: %s", approval.ApproverID, approval.Notes)
    }
    return ApprovalResult{
        Status:     "approved",
        ApprovedBy: approval.ApproverID,
        Timestamp:  time.Now().Format(time.RFC3339),
    }, nil
}))
flow.AddStep(catbird.NewStep("publish").
    DependsOn("approve").Do(func(ctx context.Context, doc Document, approval ApprovalResult) (PublishResult, error) {
    return PublishResult{
        PublishedAt: time.Now().Format(time.RFC3339),
        URL:         "https://example.com/docs/" + approval.ApprovedBy,
    }, nil
}))

A step with both dependencies and a signal waits for both conditions: all dependencies must complete and the signal must be delivered before the step executes.

Example: Early Completion

Use CompleteEarly(ctx, output, reason) inside a flow step handler when you already have the final business output and want to stop remaining branches.

flow := catbird.NewFlow("fraud-check")
flow.AddStep(catbird.NewStep("quick_guard").Do(func(ctx context.Context, in Order) (string, error) {
    if in.IsKnownSafe {
        return "", catbird.CompleteEarly(ctx, Decision{Approved: true}, "known-safe fast path")
    }
    return "continue", nil
}))
flow.AddStep(catbird.NewStep("slow_analysis").Do(func(ctx context.Context, in Order) (string, error) {
    time.Sleep(2 * time.Second)
    return "done", nil
}))
flow.AddStep(catbird.NewStep("final").
    DependsOn("quick_guard", "slow_analysis").Do(func(ctx context.Context, in Order, guard string, analysis string) (Decision, error) {
    return Decision{Approved: guard == "continue" && analysis == "done"}, nil
}))

When early completion wins the race, the flow run becomes completed with the provided output, and in-flight sibling work is stopped cooperatively.

Map Steps

Map steps fan out array processing into per-item SQL-coordinated work and aggregate results back in item order.

  • Define map steps with AddStep(NewStep("name").MapFlowInput()...) or AddStep(NewStep("name").MapStepOutput("source")...)
  • Use MapFlowInput() to map over flow input (flow input must be a JSON array)
  • Use MapStepOutput("step_name") to map over a dependency step output array
  • Each mapped item runs as its own task, so retries happen per item instead of rerunning the whole step.
  • To fold mapped item outputs without materializing a full []Out, add an explicit reducer step with AddStep(NewStep("name").ReduceStep("mapped_step").Do(fn)).
Map flow input
flow := catbird.NewFlow("double-input")
flow.AddStep(catbird.NewStep("double").
    MapFlowInput().
    Do(func(ctx context.Context, n int) (int, error) {
    return n * 2, nil
}))

handle, _ := client.RunFlow(ctx, "double-input", []int{1, 2, 3})
var out []int
_ = handle.WaitForOutput(ctx, &out)
// out == []int{2, 4, 6}
Map dependency output
flow := catbird.NewFlow("double-numbers")
flow.AddStep(catbird.NewStep("numbers").Do(func(ctx context.Context, _ string) ([]int, error) {
    return []int{1, 2, 3}, nil
}))
flow.AddStep(catbird.NewStep("double").
    MapStepOutput("numbers").
    Do(func(ctx context.Context, _ string, n int) (int, error) {
    return n * 2, nil
}))

// Reduce mapped outputs with an explicit reducer step
flow = catbird.NewFlow("double-numbers-reduced")
flow.AddStep(catbird.NewStep("numbers").Do(func(ctx context.Context, _ string) ([]int, error) {
    return []int{1, 2, 3}, nil
}))
flow.AddStep(catbird.NewStep("double").
    MapStepOutput("numbers").
    Do(func(ctx context.Context, _ string, n int) (int, error) {
    return n * 2, nil
}))
flow.AddStep(catbird.NewStep("sum").
    ReduceStep("double").
    Do(func(ctx context.Context, acc int, out int) (int, error) {
    return acc + out, nil
}))
Ignore Output

Use IgnoreOutput("step") when a step must wait for a dependency but doesn't need its output. The dependency's output is never aggregated or transmitted — important for map steps with large fan-outs. The handler omits the parameter for that dependency.

flow.AddStep(catbird.NewStep("finish").
    DependsOn("expensive-map").
    IgnoreOutput("expensive-map").
    Do(func(ctx context.Context, in Input) (string, error) {
    return "done", nil
}))
Generator Steps

Generator steps act like normal flow steps with an extra trailing yield callback for streaming items; yielded items are processed by a per-item handler.

  • Define the step with flow.AddStep(NewStep("name").Generate(...).Do(...))
  • Optionally add DependsOn(...) and/or WithSignal() like a normal step
  • Provide a generator with signature func(context.Context, In[, Signal][, Dep1, Dep2, ...], func(ItemType) error) error
  • Provide an item handler with signature func(context.Context, ItemType) (OutType, error)
  • To fold yielded item outputs, add an explicit reducer step with AddStep(NewStep(...).ReduceStep("generator_step").Do(fn))
  • Generator steps do not support MapFlowInput() or MapStepOutput()
flow := catbird.NewFlow("generate-double-sum")
flow.AddStep(catbird.NewStep("seed").Do(func(ctx context.Context, in int) (int, error) {
    return in, nil
}))
flow.AddStep(catbird.NewStep("generate").
    DependsOn("seed").
    Generate(func(ctx context.Context, in int, seed int, yield func(int) error) error {
    for i := 0; i < seed; i++ {
        if err := yield(i); err != nil {
            return err
        }
    }
    return nil
}).
    Do(func(ctx context.Context, item int) (int, error) {
    return item * 2, nil
}))
flow.AddStep(catbird.NewStep("sum").
    DependsOn("generate").Do(func(ctx context.Context, in int, generated []int) (int, error) {
    total := 0
    for _, v := range generated {
        total += v
    }
    return total, nil
}))

handle, _ := client.RunFlow(ctx, "generate-double-sum", 5)
var out int
_ = handle.WaitForOutput(ctx, &out)
// out == 20

Use an explicit reducer step when you want bounded generator output instead of storing all item outputs as []Out:

flow := catbird.NewFlow("generate-double-sum-reduced")
flow.AddStep(catbird.NewStep("generate").
    Generate(func(ctx context.Context, input int, yield func(int) error) error {
    for i := 0; i < input; i++ {
        if err := yield(i); err != nil {
            return err
        }
    }
    return nil
}).
    Do(func(ctx context.Context, item int) (int, error) {
    return item * 2, nil
}))
flow.AddStep(catbird.NewStep("sum").
    ReduceStep("generate").
    Do(func(ctx context.Context, acc int, out int) (int, error) {
    return acc + out, nil
}))

handle, _ := client.RunFlow(ctx, "generate-double-sum-reduced", 5)
var out int
_ = handle.WaitForOutput(ctx, &out)
// out == 20
Status Values
Status Meaning Used by
queued Runnable and never picked up by a worker Task runs, flow step runs, map item runs
waiting_for_dependencies Not runnable yet because one or more dependencies are still incomplete Flow step runs
waiting_for_signal Dependencies are resolved, but required signal input has not been delivered yet Flow step runs
waiting_for_map_tasks Parent map/reducer step is waiting for spawned map item runs to finish Flow step runs
started Picked up by a worker at least once (including retries) Task runs, flow runs, flow step runs, map item runs
completed Finished successfully and output is available Task runs, flow runs, flow step runs, map item runs
failed Finished with an error Task runs, flow runs, flow step runs, map item runs
skipped Intentionally skipped (typically due to a condition evaluating false) Task runs, flow step runs
canceling Cancellation requested; run is transitioning to canceled Task runs, flow runs
canceled Run was canceled before completing normally Task runs, flow runs
Advanced: Step Communication at Runtime
flow := catbird.NewFlow("parallel_watch_flow")
flow.AddStep(catbird.NewStep("long_job").Do(func(ctx context.Context, in Order) (string, error) {
    time.Sleep(500 * time.Millisecond)
    return "job-finished", nil
}))
flow.AddStep(catbird.NewStep("watch_job").Do(func(ctx context.Context, in Order) (string, error) {
    step, err := catbird.WaitForStep(ctx, "long_job", catbird.WaitOpts{PollInterval: 25 * time.Millisecond})
    if err != nil {
        return "", err
    }
    if !step.IsCompleted() {
        return "", fmt.Errorf("long_job ended with status=%s", step.Status)
    }
    return "watcher-confirmed:" + step.Status, nil
}))

This runs long_job and watch_job in parallel. watch_job blocks on WaitForStep(...) until long_job reaches a terminal state, then exits immediately.

flow := catbird.NewFlow("loop_until_peer_done")
flow.AddStep(catbird.NewStep("controller").Do(func(ctx context.Context, in string) (string, error) {
    time.Sleep(2 * time.Second)
    return "stop-now", nil
}))
flow.AddStep(catbird.NewStep("worker_loop").Do(func(ctx context.Context, in string) (string, error) {
    for {
        controller, err := catbird.GetStep(ctx, "controller")
        if err != nil {
            return "", err
        }
        if controller.IsDone() {
            return "loop-stopped:" + controller.Status, nil
        }

        select {
        case <-ctx.Done():
            return "", ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }
    }
}))

This pattern keeps worker_loop alive until controller reaches any terminal state, then exits cleanly.

Scheduling

Tasks and flows can be scheduled with cron expressions using CreateTaskSchedule and CreateFlowSchedule. Schedules are stored in PostgreSQL and polled by workers — no external cron daemon needed.

// Schedule a task
client.CreateTaskSchedule(ctx, "send-email", "@hourly")

// Schedule with static input
client.CreateTaskSchedule(ctx, "send-report", "*/15 * * * *",
    catbird.WithInput(EmailRequest{To: "ops@example.com", Subject: "Report"}),
)

// Schedule a flow
client.CreateFlowSchedule(ctx, "order-processing", "0 2 * * *")

// Skip all missed ticks on recovery (no catch-up runs)
client.CreateTaskSchedule(ctx, "stats", "@hourly", catbird.WithSkipCatchUp())

// Replay every missed tick on recovery
client.CreateTaskSchedule(ctx, "billing", "0 * * * *", catbird.WithCatchUpAll())
Cron Syntax

Standard 5-field cron format: minute hour day-of-month month day-of-week

Field Values Wildcards
Minute 0-59 *, */N, N-M, N-M/S, comma-separated
Hour 0-23 same
Day of month 1-31 same
Month 1-12 same
Day of week 0-6 (0 = Sunday, 7 also accepted as Sunday) same

When both day-of-month and day-of-week are restricted (not *), the date matches if either field matches (standard cron OR semantics).

Shorthand descriptors: @yearly / @annually, @monthly, @weekly, @daily / @midnight, @hourly.

All cron evaluation is in UTC.

Catch-Up Policies

When a worker restarts after downtime, the catch-up policy controls how missed ticks are handled:

Policy Option On recovery (5 missed ticks) Enqueues
skip WithSkipCatchUp() Skip all missed ticks, jump to future 0 runs
one (default) Enqueue one catch-up run (oldest), jump to future 1 run
all WithCatchUpAll() Replay every missed tick, one at a time All runs

Conditional Execution

Both tasks and flow steps support conditional execution via WithCondition on the builder methods. If the condition evaluates to false (or a referenced field is missing), the task/step is marked skipped and its handler does not run.

Rules at a Glance
  • Prefixes: tasks use input.*; flow steps use input.*, step_name.*, or signal.*.
  • Operators: eq, ne, gt, gte, lt, lte, in, exists, contains, plus not <expr>.
  • Optional outputs: if a step can be skipped, downstream handlers must accept Optional[T] for that dependency.
  • Map steps: define with AddStep(NewStep("name")...), then use MapFlowInput() or MapStepOutput("step_name"); map source values must be arrays.
  • No AND/OR: only one expression per task/step; compute a derived field upstream if needed.
Tasks with Conditions

Tasks can use conditions to skip execution based on input fields.

type ProcessRequest struct {
    UserID     int    `json:"user_id"`
    IsPremium  bool   `json:"is_premium"`
    Amount     int    `json:"amount"`
    Environment string `json:"environment"`
}

// Only process premium users
premiumTask := catbird.NewTask("premium_processing").
    WithCondition("input.is_premium"). // Skipped if is_premium = false
    Do(func(ctx context.Context, req ProcessRequest) (string, error) {
        return fmt.Sprintf("Processed premium user %d", req.UserID), nil
    })

// Run task - may be skipped based on input
client.RunTask(ctx, "premium_processing", ProcessRequest{UserID: 123, IsPremium: false})
// This task run will be skipped (is_premium = false)
Flows with Conditions

Flow steps can branch based on prior outputs. Use Optional[T] to handle skipped dependencies.

flow := catbird.NewFlow("payment_processing")
flow.OutputPriority("charge", "free_order")

flow.AddStep(catbird.NewStep("validate").Do(func(ctx context.Context, order Order) (ValidationResult, error) {
    return ValidationResult{Valid: order.Amount > 0}, nil
}))
flow.AddStep(catbird.NewStep("charge").
    DependsOn("validate").
    WithCondition("validate.valid").Do(func(ctx context.Context, order Order, validation ValidationResult) (FinalResult, error) {
    return FinalResult{Status: "charged", TxnID: "txn-123"}, nil
}))
flow.AddStep(catbird.NewStep("free_order").
    DependsOn("validate").
    WithCondition("not validate.valid").Do(func(ctx context.Context, order Order, validation ValidationResult) (FinalResult, error) {
    return FinalResult{Status: "free_order", TxnID: ""}, nil
}))

Resiliency

Catbird includes multiple resiliency layers for runtime failures. Handler-level retries are configured with HandlerOpt values such as WithMaxRetries(...) and WithFullJitterBackoff(...), and external calls can be protected with WithCircuitBreaker(failureThreshold, openTimeout) to avoid cascading outages. In worker database paths, PostgreSQL reads/writes are retried with bounded attempts and full-jitter backoff; retries stop immediately on context cancellation or deadline expiry.

For reducer-step workflows, retries are two-phase: item handlers retry first per item, then reducer-step finalization retries at the reducer step. If retries are exhausted in either phase, the parent step fails and task/flow OnFail handlers run with the same terminal failure semantics as non-reduced steps.

Be aware of side effects

Catbird deduplication (ConcurrencyKey) controls duplicate run creation, while handler retries can still re-attempt the same run after transient failures. For non-repeatable side effects (payments, email, webhooks), use idempotent write patterns or upstream idempotency keys so retry attempts remain safe.

Cancellation

Cancellation semantics:

  • Cancellation is a distinct terminal outcome (canceled), separate from failed and completed.
  • Task cancellation moves queued/started runs directly to canceled; already-terminal task runs are unchanged.
  • Flow cancellation is two-phase: flow goes to canceling, queued child work is canceled, then flow becomes canceled after in-flight started work drains.
  • Cancellation requests are idempotent: repeated requests are successful no-ops.
  • A cancel request does not rewrite an already-terminal run.

External cancellation:

taskHandle, _ := client.RunTask(ctx, "send-email", "hello")
_, _ = client.CancelTaskRun(ctx, "send-email", taskHandle.ID, catbird.CancelOpts{Reason: "operator requested stop"})

flowHandle, _ := client.RunFlow(ctx, "order-processing", map[string]any{"order_id": 123})
_, _ = client.CancelFlowRun(ctx, "order-processing", flowHandle.ID, catbird.CancelOpts{Reason: "customer canceled order"})

Internal cancellation from handlers:

task := catbird.NewTask("validate-order").Do(func(ctx context.Context, input Order) (string, error) {
        if input.Amount <= 0 {
            if err := catbird.Cancel(ctx, catbird.CancelOpts{Reason: "invalid amount"}); err != nil {
                return "", err
            }
            return "", nil
        }
        return "ok", nil
    })

flow := catbird.NewFlow("order-processing")
flow.AddStep(catbird.NewStep("guard").Do(func(ctx context.Context, input Order) (string, error) {
    if input.Amount <= 0 {
        if err := catbird.Cancel(ctx, catbird.CancelOpts{Reason: "invalid amount"}); err != nil {
            return "", err
        }
        return "", nil
    }
    return "proceed", nil
}))

Wire: Real-Time Notifications

Wire is an optional real-time pub/sub layer — use it when you need to push events to browsers or react to notifications server-side. If you're only using queues, tasks, and flows, you can ignore Wire entirely.

Wire provides topic-matched event dispatch with SSE support and presence tracking. Notifications are ephemeral (at-most-once, no storage).

wire := catbird.NewWire(pool, secret)

// Listen: server-side callbacks on topic patterns
wire.Listen("order.*", func(ctx context.Context, topic, message string) {
    log.Println(topic, message)
})

// RenderSSE: transform events for SSE clients (acts as allowlist)
wire.RenderSSE("task.*.completed", func(r *http.Request, topic, message string) (catbird.SSEEvent, error) {
    return catbird.SSEEvent{Event: "task-done", Data: "<div>Task done</div>"}, nil
})

go wire.Start(ctx)

// Notify: local dispatch + pg_notify for cross-node delivery
wire.Notify(ctx, "order.created", `{"id": 123}`)

// SSE: app controls token retrieval
http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    token := r.URL.Query().Get("token")
    wire.ServeSSE(w, r, token)
})
Listen vs RenderSSE
Method Runs on Purpose
wire.Listen Every node Server-side side effects (logging, webhooks)
wire.RenderSSE Node with SSE client Transform for SSE push (e.g., JSON → HTML)

Topics without a renderer pass through as-is. Multiple renderers matching the same topic each produce an SSE event. Render handlers receive the SSE client's *http.Request for access to user context (auth, language, etc).

Notify
// Package-level: explicit Conn, works inside transactions (fires on commit)
catbird.Notify(ctx, tx, "order.progress", `{"step": 1}`)

// Client method: uses client's Conn
client.Notify(ctx, "order.created", `{"id": 123}`)

// Wire method: local dispatch + pg_notify for cross-node
wire.Notify(ctx, "order.created", `{"id": 123}`)
SSEEvent

SSEEvent controls the full SSE output: event name, data, and optional ID. It implements io.Writer, so templates can write directly to it:

// Typed helper — unmarshals JSON, gives full SSEEvent control
catbird.RenderSSE[TaskEvent](wire, "task.*", func(r *http.Request, topic string, data TaskEvent) (catbird.SSEEvent, error) {
    ev := catbird.SSEEvent{Event: "task-update", ID: topic}
    err := views.TaskCompleted(r, data).Render(r.Context(), &ev)
    return ev, err
})
Tokens & SSE

SSE connections are authorized via encrypted tokens that specify which topics the client can subscribe to:

token := wire.Token([]string{"order.*", "task.invoice.#"}, catbird.TokenOpts{
    Identity: "user-123",
    ValidFor: time.Hour,
})
Presence

Track which identities are connected to a topic (across all nodes):

identities, err := wire.Presence(ctx, "dashboard")
Handler Access to Conn

Handlers can access the database connection from context for transactional work:

task := catbird.NewTask("process-order").Do(func(ctx context.Context, input Order) (Result, error) {
    conn, _ := catbird.GetConn(ctx)
    tx, _ := conn.Begin(ctx)
    defer tx.Rollback(ctx)

    catbird.Send(ctx, tx, "audit", map[string]any{"order_id": input.ID})
    catbird.Notify(ctx, tx, "order.progress", `{"step": 1}`)  // fires on commit

    tx.Commit(ctx)
    return Result{}, nil
})

Naming Rules

  • Queue, task, flow, and step names: Lowercase letters, digits, and underscores only (a-z, 0-9, _). Max 58 characters. Step names must be unique within a flow. Reserved step names: input, signal.
  • Topics/Patterns: Letters (upper/lower), digits, dots, underscores, and hyphens (a-z, A-Z, 0-9, ., _, -, plus wildcards *, #).

Query Helpers

Use query builders when you want SQL + args directly (for pgx.Batch or custom execution):

  • SendQuery(queue, body, opts)
  • PublishQuery(topic, body, opts)
  • RunTaskQuery(name, input, opts)
  • RunFlowQuery(name, input, opts)
// Queue into a batch
var batch pgx.Batch
q1, args1, err := catbird.SendQuery("my-queue", map[string]any{"user_id": 123})
if err != nil {
    return err
}
batch.Queue(q1, args1...)

PostgreSQL API Reference

Catbird is built on PostgreSQL functions, so you can use the API directly from any language or tool with PostgreSQL support (psql, Python, Node.js, Ruby, etc.).

For the full SQL function reference and practical SQL examples (queues, tasks, workflows, and run monitoring), see the SQL API reference.

Dashboard

The dashboard provides a web UI for monitoring queues, tasks, flows, and workers. You can run it standalone with the cb CLI or embed it as an http.Handler.

go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"
cb dashboard

The dashboard is a standard http.Handler and can be embedded in any Go web application:

import (
    "log/slog"
    "net/http"

    "github.com/ugent-library/catbird"
    "github.com/ugent-library/catbird/dashboard"
)

func main() {
    client := catbird.New(conn)
    dash := dashboard.New(dashboard.Config{
        Client:     client,
        Log:        slog.Default(), // Optional: provide custom logger
        PathPrefix: "",              // Optional: mount at a subpath (e.g., "/admin")
    })
    http.ListenAndServe(":8080", dash.Handler())
}

Terminal UI

The terminal UI provides an interactive dashboard-like view in your terminal.

go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"
cb ui

You can also start it from the root command using interactive mode:

cb -i

Data Retention

Set a retention period on a task or flow definition to have cb_gc() automatically delete terminal runs older than that duration. GC runs opportunistically from the worker heartbeat and can also be triggered manually via client.GC(ctx) or the standalone purge helpers below.

gcInfo, err := client.GC(ctx)
if err != nil {
    // handle error
}

_ = gcInfo.ExpiredQueuesDeleted
_ = gcInfo.StaleWorkersDeleted
_ = gcInfo.TaskRunsPurged
_ = gcInfo.FlowRunsPurged
task := catbird.NewTask("send-email").
    RetentionPeriod(7 * 24 * time.Hour). // NULL by default = no cleanup
    Do(func(ctx context.Context, in EmailInput) (string, error) {
        return "sent", nil
    })

flow := catbird.NewFlow("order-processing")
flow.RetentionPeriod(90 * 24 * time.Hour)
flow.AddStep(catbird.NewStep("step1").Do(func(ctx context.Context, in OrderInput) (string, error) {
    return "done", nil
}))
  • Task runs cleaned up: completed, failed, skipped, canceled older than the retention period
  • Flow runs cleaned up: completed, failed, canceled older than the retention period; associated step runs and map tasks are removed automatically via cascade
  • Non-terminal rows are never touched: queued, started, waiting_*, canceling are left alone
Purge helpers

For targeted or ad-hoc cleanup independent of the retention period:

// Delete task runs older than 30 days
taskPurged, err := client.PurgeTaskRuns(ctx, "send-email", 30*24*time.Hour)

// Delete flow runs older than 90 days
flowPurged, err := client.PurgeFlowRuns(ctx, "order-processing", 90*24*time.Hour)

_ = taskPurged
_ = flowPurged

External archiving

For SQL-based archiving patterns and example queries, see the External archiving section in the SQL API reference.

Migrations

Standalone CLI

When Catbird owns its own database (or you want to manage its schema outside your application's migration flow), the cb CLI applies the schema directly:

go install github.com/ugent-library/catbird/cmd/cb@latest
export CB_CONN="postgres://user:pass@localhost:5432/mydb?sslmode=disable"

cb migrate up              # apply all pending migrations
cb migrate status          # list applied and pending migrations
cb migrate down --to 1     # roll back down to a target version
Embedding in your own migrations

If your application already uses Goose migrations, you can register Catbird's schema migration as a normal app migration:

Create a migration file such as 00003_add_catbird.go:

package migrations

import (
    "context"
    "database/sql"

    "github.com/pressly/goose/v3"
    "github.com/ugent-library/catbird"
)

func init() {
    goose.AddMigrationNoTxContext(addCatbirdUp, addCatbirdDown)
}

func addCatbirdUp(ctx context.Context, db *sql.DB) error {
    return catbird.MigrateUpTo(ctx, db, 13)
}

func addCatbirdDown(ctx context.Context, db *sql.DB) error {
    return catbird.MigrateDownTo(ctx, db, 0)
}

Keep an explicit pinned version (for example 13) in this migration file. Do not use catbird.SchemaVersion.

Documentation

Acknowledgments

SQL code is taken from or inspired by the excellent pgmq and pgflow projects.

Documentation

Overview

Package catbird provides a PostgreSQL-based message queue with task and workflow execution engine.

Index

Constants

View Source
const (
	StatusWaitingForDependencies = "waiting_for_dependencies"
	StatusWaitingForSignal       = "waiting_for_signal"
	StatusWaitingForMapTasks     = "waiting_for_map_tasks"
	StatusQueued                 = "queued"
	StatusStarted                = "started"
	StatusCanceling              = "canceling"
	StatusCompleted              = "completed"
	StatusFailed                 = "failed"
	StatusSkipped                = "skipped"
	StatusCanceled               = "canceled"
	StatusExpired                = "expired"
)
View Source
const (
	CatchUpSkip = "skip" // Skip all missed ticks, jump to future
	CatchUpOne  = "one"  // Enqueue one catch-up run (oldest), jump to future (default)
	CatchUpAll  = "all"  // Replay every missed tick, one at a time
)

Catch-up policy constants control how the scheduler handles missed ticks after downtime.

View Source
const SchemaVersion = 2

Variables

View Source
var (
	// ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run
	ErrRunFailed = fmt.Errorf("catbird: run failed")
	// ErrRunCanceled is returned when you try to wait for output from a canceled task or flow run
	ErrRunCanceled = fmt.Errorf("catbird: run canceled")
	// ErrNotFound is returned when a requested run or resource cannot be found
	ErrNotFound = fmt.Errorf("catbird: not found")
	// ErrNoRunContext is returned when cancellation helpers are called outside handler run context
	ErrNoRunContext = fmt.Errorf("catbird: no run context")
	// ErrUnknownStepOutput is returned when a requested step output is not present in completed outputs.
	ErrUnknownStepOutput = fmt.Errorf("catbird: unknown step output")
	// ErrNoFailedStepInput is returned when failed step input is not available.
	ErrNoFailedStepInput = fmt.Errorf("catbird: failed step input not available")
	// ErrNoFailedStepSignal is returned when failed step signal input is not available.
	ErrNoFailedStepSignal = fmt.Errorf("catbird: failed step signal input not available")
	// ErrNoOutputCandidate is returned when a flow completes without any configured output candidate producing output.
	ErrNoOutputCandidate = fmt.Errorf("catbird: no output candidate produced output")
	// ErrNotDefined is returned when an operation references a queue, task, or flow that has not been created.
	ErrNotDefined = fmt.Errorf("catbird: not defined")
	// ErrRunSkipped is returned when waiting for output from a skipped task or flow run.
	ErrRunSkipped = fmt.Errorf("catbird: run skipped")
	// ErrRunNotCompleted is returned when waiting for output from a run that is still in progress.
	ErrRunNotCompleted = fmt.Errorf("catbird: run not completed")
	// ErrSignalNotDelivered is returned when a signal could not be delivered to a flow step.
	ErrSignalNotDelivered = fmt.Errorf("catbird: signal not delivered")
)

Functions

func Bind

func Bind(ctx context.Context, conn Conn, queueName string, pattern string) error

Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail). Examples: "foo.bar", "foo.*.bar", "foo.bar.#"

func BindFlow added in v0.1.0

func BindFlow(ctx context.Context, conn Conn, flowName string, pattern string) error

BindFlow subscribes a flow to a topic pattern. When a message is published to a matching topic, a flow run is created with the message body as input. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail).

func BindTask added in v0.1.0

func BindTask(ctx context.Context, conn Conn, taskName string, pattern string) error

BindTask subscribes a task to a topic pattern. When a message is published to a matching topic, a task run is created with the message body as input. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail).

func Cancel added in v0.0.8

func Cancel(ctx context.Context, opts ...CancelOpts) error

Cancel requests cancellation for the current run from inside a task or flow handler.

func CancelFlowRun added in v0.0.8

func CancelFlowRun(ctx context.Context, conn Conn, flowName string, runID int64, opts ...CancelOpts) (bool, error)

CancelFlowRun cancels a flow run. Returns true when the run exists (including idempotent no-op), false when it does not exist.

func CancelTaskRun added in v0.0.8

func CancelTaskRun(ctx context.Context, conn Conn, taskName string, runID int64, opts ...CancelOpts) (bool, error)

CancelTaskRun cancels a task run. Returns true when the run exists (including idempotent no-op), false when it does not exist.

func ClearFlowRuns added in v0.0.15

func ClearFlowRuns(ctx context.Context, conn Conn, flowName string) (int, error)

ClearFlowRuns deletes all runs for the given flow regardless of status, including in-progress runs. Step runs and map tasks are deleted via cascade. Use with caution — in-flight work will be lost.

func ClearTaskRuns added in v0.0.15

func ClearTaskRuns(ctx context.Context, conn Conn, taskName string) (int, error)

ClearTaskRuns deletes all runs for the given task regardless of status, including in-progress runs. Use with caution — in-flight work will be lost.

func CompleteEarly added in v0.0.8

func CompleteEarly(ctx context.Context, output any, reason string) error

CompleteEarly requests early completion for the current flow run.

Return this from a flow step handler to signal that the flow should complete early with the provided output and optional reason.

func CreateFlow

func CreateFlow(ctx context.Context, conn Conn, flow *Flow) error

CreateFlow creates a flow definition.

func CreateFlowSchedule added in v0.0.5

func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpt) error

CreateFlowSchedule creates a cron-based schedule for a flow.

func CreateQueue

func CreateQueue(ctx context.Context, conn Conn, queueName string, opts ...QueueOpts) error

CreateQueue creates a queue with the given name and optional options. Use Bind() separately to create topic bindings.

func CreateTask

func CreateTask(ctx context.Context, conn Conn, task *Task) error

CreateTask creates a task definition.

func CreateTaskSchedule added in v0.0.5

func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpt) error

CreateTaskSchedule creates a cron-based schedule for a task.

func Delete

func Delete(ctx context.Context, conn Conn, queueName string, id int64) (bool, error)

Delete deletes a single message from the queue. Returns true if the message existed.

func DeleteFlowSchedule added in v0.1.2

func DeleteFlowSchedule(ctx context.Context, conn Conn, flowName string) (bool, error)

DeleteFlowSchedule removes the cron schedule for a flow. It reports whether a schedule existed; deleting a missing schedule is a no-op.

func DeleteMany

func DeleteMany(ctx context.Context, conn Conn, queueName string, ids []int64) ([]int64, error)

DeleteMany deletes multiple messages from the queue. Returns the IDs that were actually deleted.

func DeleteQueue

func DeleteQueue(ctx context.Context, conn Conn, queueName string) (bool, error)

DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.

func DeleteTaskSchedule added in v0.1.2

func DeleteTaskSchedule(ctx context.Context, conn Conn, taskName string) (bool, error)

DeleteTaskSchedule removes the cron schedule for a task. It reports whether a schedule existed; deleting a missing schedule is a no-op.

func GetFlowRunID added in v0.1.0

func GetFlowRunID(ctx context.Context) (int64, error)

GetFlowRunID returns the flow run ID from inside a flow step handler. Returns ErrNoRunContext if called outside a flow step handler.

func GetTaskRunID added in v0.1.0

func GetTaskRunID(ctx context.Context) (int64, error)

GetTaskRunID returns the task run ID from inside a task handler. Returns ErrNoRunContext if called outside a task handler.

func Hide

func Hide(ctx context.Context, conn Conn, queueName string, id int64, hideFor time.Duration) (bool, error)

Hide hides a single message from being read for the specified duration. Returns true if the message existed.

func HideMany

func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)

HideMany hides multiple messages from being read for the specified duration. Returns the IDs that were actually hidden.

func MigrateDownTo

func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error

func MigrateUpTo

func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error

func Notify added in v0.1.0

func Notify(ctx context.Context, conn Conn, topic, message string, opts ...NotifyOpts) error

Notify sends an ephemeral notification via pg NOTIFY. Every Wire instance (on any node) picks it up and delivers to its local subscribers and Listen handlers. Set NotifyOpts.SentBy to skip delivery to the sender.

func Publish added in v0.0.3

func Publish(ctx context.Context, conn Conn, topic string, body any, opts ...PublishOpts) (int, error)

Publish sends a message to topic-subscribed queues with options. Pass no opts to use defaults.

func PublishMany added in v0.0.8

func PublishMany(ctx context.Context, conn Conn, topic string, bodies []any, opts ...PublishManyOpts) (int, error)

PublishMany sends multiple messages to topic-subscribed queues with options. Pass no opts to use defaults.

func PublishManyQuery added in v0.0.8

func PublishManyQuery(topic string, bodies []any, opts ...PublishManyOpts) (string, []any, error)

PublishManyQuery builds the SQL query and args for a PublishMany operation. Pass no opts to use defaults.

func PublishQuery added in v0.0.4

func PublishQuery(topic string, body any, opts ...PublishOpts) (string, []any, error)

PublishQuery builds the SQL query and args for a Publish operation. Pass no opts to use defaults.

func PurgeFlowRuns added in v0.0.8

func PurgeFlowRuns(ctx context.Context, conn Conn, flowName string, olderThan time.Duration) (int, error)

PurgeFlowRuns deletes terminal flow runs (completed, failed, canceled) older than the given duration. Step runs and map tasks are deleted via cascade. Useful for manual cleanup or targeted removal independent of the configured retention period.

func PurgeTaskRuns added in v0.0.8

func PurgeTaskRuns(ctx context.Context, conn Conn, taskName string, olderThan time.Duration) (int, error)

PurgeTaskRuns deletes terminal task runs (completed, failed, skipped, canceled) older than the given duration. Useful for manual cleanup or targeted removal independent of the configured retention period.

func Reader added in v0.1.0

func Reader(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, handler ReaderHandler, opts ...ReadPollOpts) error

Reader continuously reads messages from a queue and processes them with the given handler. Blocks until ctx is cancelled. Mirrors the ReadPoll signature: quantity and hideFor are required, polling behavior is optional.

func RenderSSE added in v0.1.0

func RenderSSE[T any](w *Wire, pattern string, fn func(r *http.Request, topic string, data T) (SSEEvent, error))

RenderSSE registers a typed SSE render handler that unmarshals JSON messages into type T and passes them to fn for full SSEEvent control.

func RunFlowQuery added in v0.0.4

func RunFlowQuery(flowName string, input any, opts ...RunFlowOpts) (string, []any, error)

RunFlowQuery builds the SQL query and args for a RunFlow operation. Pass no opts to use defaults.

func RunTaskQuery added in v0.0.4

func RunTaskQuery(taskName string, input any, opts ...RunTaskOpts) (string, []any, error)

RunTaskQuery builds the SQL query and args for a RunTask operation. Pass no opts to use defaults.

func Send

func Send(ctx context.Context, conn Conn, queueName string, body any, opts ...SendOpts) error

Send enqueues a message to the specified queue. Pass no opts to use defaults.

func SendMany added in v0.0.8

func SendMany(ctx context.Context, conn Conn, queueName string, bodies []any, opts ...SendManyOpts) error

SendMany enqueues multiple messages to the specified queue. Pass no opts to use defaults.

func SendManyQuery added in v0.0.8

func SendManyQuery(queueName string, bodies []any, opts ...SendManyOpts) (string, []any, error)

SendManyQuery builds the SQL query and args for a SendMany operation. Pass no opts to use defaults.

func SendQuery added in v0.0.4

func SendQuery(queueName string, body any, opts ...SendOpts) (string, []any, error)

SendQuery builds the SQL query and args for a Send operation. Pass no opts to use defaults.

func SignalFlow

func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error

SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with `.WithSignal()`. Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.

func Unbind

func Unbind(ctx context.Context, conn Conn, queueName string, pattern string) (bool, error)

Unbind unsubscribes a queue from a topic pattern. Returns true if a binding was removed, false if it was already absent.

func UnbindFlow added in v0.1.0

func UnbindFlow(ctx context.Context, conn Conn, flowName string, pattern string) (bool, error)

UnbindFlow removes a flow trigger binding. Returns true if a binding was removed, false if it was already absent.

func UnbindTask added in v0.1.0

func UnbindTask(ctx context.Context, conn Conn, taskName string, pattern string) (bool, error)

UnbindTask removes a task trigger binding. Returns true if a binding was removed, false if it was already absent.

Types

type BackoffStrategy added in v0.0.3

type BackoffStrategy interface {
	// Validate returns an error if configuration is invalid.
	Validate() error
	// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
	// Implementations should always return a positive duration.
	NextDelay(deliveryCount int) time.Duration
}

BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.

type CancelOpts added in v0.0.8

type CancelOpts struct {
	Reason string
}

CancelOpts configures cancellation behavior and metadata.

type CircuitBreaker added in v0.0.3

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

func NewCircuitBreaker added in v0.0.3

func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker

func (*CircuitBreaker) Allow added in v0.0.3

func (c *CircuitBreaker) Allow(now time.Time) (bool, time.Duration)

func (*CircuitBreaker) RecordFailure added in v0.0.3

func (c *CircuitBreaker) RecordFailure(now time.Time)

func (*CircuitBreaker) RecordSuccess added in v0.0.3

func (c *CircuitBreaker) RecordSuccess()

func (*CircuitBreaker) Validate added in v0.0.3

func (c *CircuitBreaker) Validate() error

type CircuitBreakerStrategy added in v0.0.3

type CircuitBreakerStrategy interface {
	// Validate returns an error if configuration is invalid.
	Validate() error
	// Allow returns whether a call is permitted and how long to wait if not.
	Allow(now time.Time) (bool, time.Duration)
	// RecordSuccess updates breaker state after a successful call.
	RecordSuccess()
	// RecordFailure updates breaker state after a failed call.
	RecordFailure(now time.Time)
}

CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.

type Client

type Client struct {
	Conn Conn
}

Client is a facade for interacting with Catbird

func New

func New(conn Conn) *Client

New creates a new Client with the given database connection.

The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.

func (*Client) Bind

func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error

Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail). Examples: "foo.bar", "foo.*.bar", "foo.bar.#"

func (*Client) BindFlow added in v0.1.0

func (c *Client) BindFlow(ctx context.Context, flowName string, pattern string) error

BindFlow subscribes a flow to a topic pattern. When a message is published to a matching topic, a flow run is created with the message body as input.

func (*Client) BindTask added in v0.1.0

func (c *Client) BindTask(ctx context.Context, taskName string, pattern string) error

BindTask subscribes a task to a topic pattern. When a message is published to a matching topic, a task run is created with the message body as input.

func (*Client) CancelFlowRun added in v0.0.8

func (c *Client) CancelFlowRun(ctx context.Context, flowName string, runID int64, opts ...CancelOpts) (bool, error)

CancelFlowRun cancels a flow run.

func (*Client) CancelTaskRun added in v0.0.8

func (c *Client) CancelTaskRun(ctx context.Context, taskName string, runID int64, opts ...CancelOpts) (bool, error)

CancelTaskRun cancels a task run.

func (*Client) ClearFlowRuns added in v0.0.15

func (c *Client) ClearFlowRuns(ctx context.Context, flowName string) (int, error)

ClearFlowRuns deletes all runs for the given flow regardless of status. See ClearFlowRuns for details.

func (*Client) ClearTaskRuns added in v0.0.15

func (c *Client) ClearTaskRuns(ctx context.Context, taskName string) (int, error)

ClearTaskRuns deletes all runs for the given task regardless of status. See ClearTaskRuns for details.

func (*Client) CreateFlow

func (c *Client) CreateFlow(ctx context.Context, flow *Flow) error

CreateFlow creates a flow definition.

func (*Client) CreateFlowSchedule added in v0.0.5

func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpt) error

CreateFlowSchedule creates a cron-based schedule for a flow. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts are optional ScheduleOpt values configuring the schedule.

func (*Client) CreateQueue

func (c *Client) CreateQueue(ctx context.Context, queueName string, opts ...QueueOpts) error

CreateQueue creates a queue with the given name and optional options.

func (*Client) CreateTask

func (c *Client) CreateTask(ctx context.Context, task *Task) error

CreateTask creates a task definition.

func (*Client) CreateTaskSchedule added in v0.0.5

func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpt) error

CreateTaskSchedule creates a cron-based schedule for a task. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts are optional ScheduleOpt values configuring the schedule.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error)

Delete deletes a single message from the queue. Returns true if the message existed.

func (*Client) DeleteFlowSchedule added in v0.1.2

func (c *Client) DeleteFlowSchedule(ctx context.Context, flowName string) (bool, error)

DeleteFlowSchedule removes the cron schedule for a flow. It reports whether a schedule existed; deleting a missing schedule is a no-op.

func (*Client) DeleteMany

func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) ([]int64, error)

DeleteMany deletes multiple messages from the queue.

func (*Client) DeleteQueue

func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error)

DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.

func (*Client) DeleteTaskSchedule added in v0.1.2

func (c *Client) DeleteTaskSchedule(ctx context.Context, taskName string) (bool, error)

DeleteTaskSchedule removes the cron schedule for a task. It reports whether a schedule existed; deleting a missing schedule is a no-op.

func (*Client) GC

func (c *Client) GC(ctx context.Context) (*GCInfo, error)

GC runs garbage collection and returns a summary report.

func (*Client) GetFlow

func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error)

GetFlow retrieves flow metadata by name.

func (*Client) GetFlowRun

func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)

GetFlowRun retrieves a specific flow run result by ID.

func (*Client) GetFlowRunSteps

func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)

GetFlowRunSteps retrieves all step runs for a specific flow run.

func (*Client) GetQueue

func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)

GetQueue retrieves queue metadata by name.

func (*Client) GetTask

func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error)

GetTask retrieves task metadata by name.

func (*Client) GetTaskRun

func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)

GetTaskRun retrieves a specific task run result by ID.

func (*Client) Hide

func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)

Hide hides a single message from being read for the specified duration. Returns true if the message existed.

func (*Client) HideMany

func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)

HideMany hides multiple messages from being read for the specified duration.

func (*Client) ListFlowRuns

func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*FlowRunInfo, error)

ListFlowRuns returns recent flow runs for the specified flow.

func (*Client) ListFlowSchedules added in v0.0.5

func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)

ListFlowSchedules returns all flow schedules ordered by next_run_at.

func (*Client) ListFlows

func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)

ListFlows returns all flows

func (*Client) ListQueues

func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)

ListQueues returns all queues

func (*Client) ListTaskRuns

func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*TaskRunInfo, error)

ListTaskRuns returns recent task runs for the specified task.

func (*Client) ListTaskSchedules added in v0.0.5

func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)

ListTaskSchedules returns all task schedules ordered by next_run_at.

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)

ListTasks returns all tasks

func (*Client) ListWorkers

func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)

ListWorkers returns all registered workers.

func (*Client) Notify added in v0.1.0

func (c *Client) Notify(ctx context.Context, topic, message string, opts ...NotifyOpts) error

Notify sends an ephemeral notification via pg NOTIFY.

func (*Client) Publish added in v0.0.3

func (c *Client) Publish(ctx context.Context, topic string, body any, opts ...PublishOpts) (int, error)

Publish sends a message to all queues subscribed to the specified topic.

func (*Client) PublishMany added in v0.0.8

func (c *Client) PublishMany(ctx context.Context, topic string, bodies []any, opts ...PublishManyOpts) (int, error)

PublishMany sends multiple messages to all queues subscribed to the specified topic.

func (*Client) PurgeFlowRuns added in v0.0.8

func (c *Client) PurgeFlowRuns(ctx context.Context, flowName string, olderThan time.Duration) (int, error)

PurgeFlowRuns deletes terminal flow runs older than the given duration. See PurgeFlowRuns for details.

func (*Client) PurgeTaskRuns added in v0.0.8

func (c *Client) PurgeTaskRuns(ctx context.Context, taskName string, olderThan time.Duration) (int, error)

PurgeTaskRuns deletes terminal task runs older than the given duration. See PurgeTaskRuns for details.

func (*Client) Read

func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)

Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.

func (*Client) ReadPoll

func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)

ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.

func (*Client) Reader added in v0.1.0

func (c *Client) Reader(ctx context.Context, queueName string, quantity int, hideFor time.Duration, handler ReaderHandler, opts ...ReadPollOpts) error

Reader continuously reads messages from a queue and processes them. Blocks until ctx is cancelled.

func (*Client) RunFlow

func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)

RunFlow enqueues a flow execution and returns a handle for monitoring.

func (*Client) RunTask

func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)

RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.

func (*Client) Send

func (c *Client) Send(ctx context.Context, queueName string, body any, opts ...SendOpts) error

Send enqueues a message to the specified queue.

func (*Client) SendMany added in v0.0.8

func (c *Client) SendMany(ctx context.Context, queueName string, bodies []any, opts ...SendManyOpts) error

SendMany enqueues multiple messages to the specified queue.

func (*Client) SignalFlow

func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error

SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.

func (*Client) Unbind

func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) (bool, error)

Unbind unsubscribes a queue from a topic pattern.

func (*Client) UnbindFlow added in v0.1.0

func (c *Client) UnbindFlow(ctx context.Context, flowName string, pattern string) (bool, error)

UnbindFlow removes a flow trigger binding.

func (*Client) UnbindTask added in v0.1.0

func (c *Client) UnbindTask(ctx context.Context, taskName string, pattern string) (bool, error)

UnbindTask removes a task trigger binding.

type Conn

type Conn interface {
	Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
	Query(context.Context, string, ...any) (pgx.Rows, error)
	QueryRow(context.Context, string, ...any) pgx.Row
}

Conn is an interface for database connections compatible with pgx.Conn and pgx.Pool

func GetConn added in v0.1.0

func GetConn(ctx context.Context) (Conn, error)

GetConn returns the database connection from inside a task or flow handler. This gives handlers access to the full Conn interface, including transactions. Returns ErrNoRunContext if called outside a handler.

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func NewFlow

func NewFlow(name string) *Flow

func (*Flow) AddStep added in v0.0.3

func (f *Flow) AddStep(step *Step) *Flow

func (*Flow) OnFail added in v0.0.8

func (f *Flow) OnFail(fn any, opts ...HandlerOpt) *Flow

OnFail sets a flow failure handler and execution options. fn must have signature (context.Context, In, FlowFailure) error. If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).

func (*Flow) Output added in v0.0.8

func (f *Flow) Output(stepName string) *Flow

func (*Flow) OutputPriority added in v0.0.8

func (f *Flow) OutputPriority(stepNames ...string) *Flow

func (*Flow) RetentionPeriod added in v0.0.8

func (f *Flow) RetentionPeriod(d time.Duration) *Flow

RetentionPeriod sets how long terminal runs (completed, failed, canceled) are retained before being deleted by the garbage collector. A zero duration disables cleanup.

func (*Flow) WithDescription added in v0.0.8

func (f *Flow) WithDescription(description string) *Flow

type FlowFailure added in v0.0.8

type FlowFailure struct {
	FlowName              string          `json:"flow_name"`
	FlowRunID             int64           `json:"flow_run_id"`
	FailedStepName        string          `json:"failed_step_name,omitempty"`
	ErrorMessage          string          `json:"error_message"`
	Attempts              int             `json:"attempts"`
	OnFailAttempts        int             `json:"on_fail_attempts"`
	StartedAt             time.Time       `json:"started_at,omitzero"`
	FailedAt              time.Time       `json:"failed_at,omitzero"`
	ConcurrencyKey        string          `json:"concurrency_key,omitempty"`
	FailedStepInput       json.RawMessage `json:"failed_step_input,omitempty"`
	FailedStepSignalInput json.RawMessage `json:"failed_step_signal_input,omitempty"`
	// contains filtered or unexported fields
}

func (FlowFailure) FailedStepInputAs added in v0.0.8

func (f FlowFailure) FailedStepInputAs(out any) error

func (FlowFailure) FailedStepSignalAs added in v0.0.8

func (f FlowFailure) FailedStepSignalAs(out any) error

func (FlowFailure) Output added in v0.0.8

func (f FlowFailure) Output(ctx context.Context, step string) (json.RawMessage, error)

func (FlowFailure) OutputAs added in v0.0.8

func (f FlowFailure) OutputAs(ctx context.Context, step string, out any) error

type FlowHandle added in v0.0.6

type FlowHandle struct {
	Name string
	ID   int64
	// contains filtered or unexported fields
}

FlowHandle is a handle to a flow execution.

func RunFlow

func RunFlow(ctx context.Context, conn Conn, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)

RunFlow enqueues a flow execution and returns a handle for monitoring.

func (*FlowHandle) WaitForOutput added in v0.0.6

func (h *FlowHandle) WaitForOutput(ctx context.Context, out any, opts ...WaitOpts) error

WaitForOutput blocks until the flow execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.

type FlowInfo

type FlowInfo struct {
	Name            string        `json:"name"`
	Description     string        `json:"description,omitempty"`
	Steps           []StepInfo    `json:"steps"`
	OutputPriority  []string      `json:"output_priority,omitempty"`
	RetentionPeriod time.Duration `json:"retention_period,omitzero"`
	CreatedAt       time.Time     `json:"created_at"`
}

func GetFlow

func GetFlow(ctx context.Context, conn Conn, flowName string) (*FlowInfo, error)

GetFlow retrieves flow metadata by name.

func ListFlows

func ListFlows(ctx context.Context, conn Conn) ([]*FlowInfo, error)

ListFlows returns all flows

type FlowRunInfo added in v0.0.6

type FlowRunInfo struct {
	ID                int64           `json:"id"`
	Priority          int             `json:"priority"`
	ConcurrencyKey    string          `json:"concurrency_key,omitempty"`
	Status            string          `json:"status"`
	Input             json.RawMessage `json:"input,omitempty"`
	Headers           json.RawMessage `json:"headers,omitempty"`
	Output            json.RawMessage `json:"output,omitempty"`
	ErrorMessage      string          `json:"error_message,omitempty"`
	CancelReason      string          `json:"cancel_reason,omitempty"`
	CancelRequestedAt time.Time       `json:"cancel_requested_at,omitzero"`
	CanceledAt        time.Time       `json:"canceled_at,omitzero"`
	StartedAt         time.Time       `json:"started_at,omitzero"`
	CompletedAt       time.Time       `json:"completed_at,omitzero"`
	FailedAt          time.Time       `json:"failed_at,omitzero"`
}

FlowRunInfo represents the details of a flow execution.

func GetFlowRun

func GetFlowRun(ctx context.Context, conn Conn, flowName string, flowRunID int64) (*FlowRunInfo, error)

GetFlowRun retrieves a specific flow run result by ID.

func ListFlowRuns

func ListFlowRuns(ctx context.Context, conn Conn, flowName string) ([]*FlowRunInfo, error)

ListFlowRuns returns recent flow runs for the specified flow.

func (*FlowRunInfo) IsCompleted added in v0.0.8

func (r *FlowRunInfo) IsCompleted() bool

IsCompleted reports whether the flow run completed successfully.

func (*FlowRunInfo) IsDone added in v0.0.8

func (r *FlowRunInfo) IsDone() bool

IsDone reports whether the flow run reached a terminal state.

func (*FlowRunInfo) OutputAs added in v0.0.6

func (r *FlowRunInfo) OutputAs(out any) error

OutputAs unmarshals the output of a completed flow run. Returns an error if the flow run has failed or is not completed yet.

type FlowScheduleInfo added in v0.0.5

type FlowScheduleInfo struct {
	FlowName       string    `json:"flow_name"`
	CronSpec       string    `json:"cron_spec"`
	NextRunAt      time.Time `json:"next_run_at"`
	LastRunAt      time.Time `json:"last_run_at,omitzero"`
	LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
	Enabled        bool      `json:"enabled"`
	CatchUp        string    `json:"catch_up"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

FlowScheduleInfo contains metadata about a scheduled flow.

func ListFlowSchedules added in v0.0.5

func ListFlowSchedules(ctx context.Context, conn Conn) ([]*FlowScheduleInfo, error)

ListFlowSchedules returns all flow schedules ordered by next_run_at.

type FullJitterBackoff added in v0.0.3

type FullJitterBackoff struct {
	MinDelay time.Duration
	MaxDelay time.Duration
}

FullJitterBackoff implements exponential backoff with full jitter.

func NewFullJitterBackoff added in v0.0.3

func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff

NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.

func (*FullJitterBackoff) NextDelay added in v0.0.3

func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration

NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.

func (*FullJitterBackoff) Validate added in v0.0.3

func (b *FullJitterBackoff) Validate() error

Validate checks the backoff configuration for consistency.

type GCInfo added in v0.0.8

type GCInfo struct {
	ExpiredQueuesDeleted   int `json:"expired_queues_deleted"`
	ExpiredMessagesDeleted int `json:"expired_messages_deleted"`
	ExpiredTaskRuns        int `json:"expired_task_runs"`
	ExpiredFlowRuns        int `json:"expired_flow_runs"`
	StaleWorkersDeleted    int `json:"stale_workers_deleted"`
	StaleWireNodesDeleted  int `json:"stale_wire_nodes_deleted"`
	TaskRunsPurged         int `json:"task_runs_purged"`
	FlowRunsPurged         int `json:"flow_runs_purged"`
}

GCInfo is the garbage collection report returned by cb_gc().

func GC

func GC(ctx context.Context, conn Conn) (*GCInfo, error)

GC runs garbage collection to clean up expired queues and stale workers. Note: Worker heartbeats automatically perform cleanup, so this is mainly useful for deployments without workers or for manual control.

type HandlerOpt

type HandlerOpt func(*handlerOpts)

func WithBatchSize

func WithBatchSize(batchSize int) HandlerOpt

WithBatchSize sets how many claims are fetched per poll.

func WithCircuitBreaker

func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt

WithCircuitBreaker sets optional circuit breaker strategy.

func WithConcurrency

func WithConcurrency(concurrency int) HandlerOpt

WithConcurrency sets maximum concurrent handler executions.

func WithFullJitterBackoff added in v0.0.8

func WithFullJitterBackoff(minDelay, maxDelay time.Duration) HandlerOpt

WithFullJitterBackoff sets full-jitter retry backoff strategy.

func WithMaxRetries

func WithMaxRetries(maxRetries int) HandlerOpt

WithMaxRetries sets retry attempts for handler failures.

func WithTimeout added in v0.0.8

func WithTimeout(timeout time.Duration) HandlerOpt

WithTimeout sets per-handler execution timeout.

type ListenHandler added in v0.1.0

type ListenHandler = func(ctx context.Context, topic, message string)

ListenHandler is called when a notification matches a registered pattern. Handlers run synchronously in the dispatch goroutine — don't block.

type Message

type Message struct {
	ID             int64           `json:"id"`
	ConcurrencyKey string          `json:"concurrency_key,omitempty"`
	Topic          string          `json:"topic"`
	Body           json.RawMessage `json:"body"`
	Headers        json.RawMessage `json:"headers,omitempty"`
	Priority       int             `json:"priority"`
	Deliveries     int             `json:"deliveries"`
	CreatedAt      time.Time       `json:"created_at"`
	VisibleAt      time.Time       `json:"visible_at"`
	ExpiresAt      time.Time       `json:"expires_at,omitzero"`
}

Message represents a message in a queue

func Read

func Read(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration) ([]Message, error)

Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.

func ReadPoll

func ReadPoll(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)

ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached. Pass optional ReadPollOpts to configure polling behavior; defaults are used when omitted.

type MigrationInfo added in v0.1.3

type MigrationInfo struct {
	Version int
	Name    string
	Applied bool
}

MigrationInfo reports a single migration's applied state.

func MigrationStatus added in v0.1.3

func MigrationStatus(ctx context.Context, db *sql.DB) ([]MigrationInfo, error)

MigrationStatus returns the state of every known migration, ordered by version.

type NotifyOpts added in v0.1.0

type NotifyOpts struct {
	// SentBy identifies the sender. Wire instances that match this ID
	// will skip delivery, avoiding echo. Use wire.ID().
	SentBy string
}

NotifyOpts configures notification delivery.

type Optional

type Optional[T any] struct {
	IsSet bool
	Value T
}

Optional wraps a dependency output that may be absent.

type PublishManyOpts added in v0.0.8

type PublishManyOpts struct {
	ConcurrencyKeys []string
	Headers         []map[string]any
	VisibleAt       time.Time
	ExpiresAt       time.Time
	Priority        int
}

type PublishOpts added in v0.0.3

type PublishOpts struct {
	ConcurrencyKey string
	Headers        map[string]any
	VisibleAt      *time.Time
	ExpiresAt      time.Time
	Priority       int
}

type QueueInfo

type QueueInfo struct {
	Name        string    `json:"name"`
	Description string    `json:"description,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
	ExpiresAt   time.Time `json:"expires_at,omitzero"`
}

func GetQueue

func GetQueue(ctx context.Context, conn Conn, queueName string) (*QueueInfo, error)

GetQueue retrieves queue metadata by name.

func ListQueues

func ListQueues(ctx context.Context, conn Conn) ([]*QueueInfo, error)

ListQueues returns all queues

type QueueOpts

type QueueOpts struct {
	ExpiresAt   time.Time
	Description string
}

type ReadPollOpts added in v0.0.6

type ReadPollOpts struct {
	PollFor      time.Duration
	PollInterval time.Duration
}

ReadPollOpts configures ReadPoll polling behavior. Zero values use defaults.

type ReaderHandler added in v0.1.0

type ReaderHandler = func(ctx context.Context, msg Message) error

ReaderHandler processes a queue message. Return nil to ack (delete). Return an error to nack (message becomes visible again after hideFor).

type RunFlowOpts

type RunFlowOpts struct {
	ConcurrencyKey string // Prevents overlapping runs; allows reruns after completion
	Headers        map[string]any
	VisibleAt      time.Time
	ExpiresAt      time.Time
	Priority       int
}

type RunTaskOpts added in v0.0.6

type RunTaskOpts struct {
	ConcurrencyKey string // Prevents overlapping runs; allows reruns after completion
	Headers        map[string]any
	VisibleAt      time.Time
	ExpiresAt      time.Time
	Priority       int
}

type SSEEvent added in v0.1.0

type SSEEvent struct {
	Event string // SSE event name; empty = use original topic
	Data  string // SSE data field
	ID    string // SSE id field; empty = omit
}

SSEEvent represents a fully rendered SSE event ready for client delivery.

func (*SSEEvent) Write added in v0.1.0

func (e *SSEEvent) Write(p []byte) (int, error)

Write implements io.Writer, appending p to the Data field. This allows SSEEvent to be used as a target for io.WriterTo (e.g. Templ components).

type SSERenderHandler added in v0.1.0

type SSERenderHandler = func(r *http.Request, topic, message string) (SSEEvent, error)

SSERenderHandler transforms a Wire event into an SSE event for client delivery. It receives the SSE client's HTTP request for access to user context (auth, language, etc). Only topics with a registered renderer are delivered to SSE clients — the renderer acts as an allowlist.

type ScheduleOpt

type ScheduleOpt func(*scheduleOpts)

ScheduleOpt configures scheduled task/flow behavior.

func WithCatchUpAll added in v0.0.13

func WithCatchUpAll() ScheduleOpt

WithCatchUpAll configures the schedule to replay every missed tick on recovery.

func WithInput

func WithInput(input any) ScheduleOpt

WithInput sets static input body for scheduled task/flow runs.

func WithSkipCatchUp added in v0.0.13

func WithSkipCatchUp() ScheduleOpt

WithSkipCatchUp configures the schedule to skip all missed ticks on recovery.

type SendManyOpts added in v0.0.8

type SendManyOpts struct {
	Topic           string
	ConcurrencyKeys []string
	Headers         []map[string]any
	VisibleAt       time.Time
	ExpiresAt       time.Time
	Priority        int
}

type SendOpts

type SendOpts struct {
	Topic          string
	ConcurrencyKey string
	Headers        map[string]any
	VisibleAt      time.Time
	ExpiresAt      time.Time
	Priority       int
}

type Step

type Step struct {
	// contains filtered or unexported fields
}

func NewStep added in v0.0.3

func NewStep(name string) *Step

func (*Step) DependsOn

func (s *Step) DependsOn(deps ...string) *Step

func (*Step) Do added in v0.0.8

func (s *Step) Do(fn any, opts ...HandlerOpt) *Step

func (*Step) Generate added in v0.0.8

func (s *Step) Generate(fn any) *Step

func (*Step) IgnoreOutput added in v0.0.16

func (s *Step) IgnoreOutput(deps ...string) *Step

func (*Step) MapFlowInput added in v0.0.8

func (s *Step) MapFlowInput() *Step

func (*Step) MapStepOutput added in v0.0.8

func (s *Step) MapStepOutput(stepName string) *Step

func (*Step) ReduceStep added in v0.0.8

func (s *Step) ReduceStep(stepName string) *Step

func (*Step) WithCondition added in v0.0.8

func (s *Step) WithCondition(condition string) *Step

func (*Step) WithDescription added in v0.0.8

func (s *Step) WithDescription(description string) *Step

func (*Step) WithSignal added in v0.0.8

func (s *Step) WithSignal() *Step

type StepDependencyInfo

type StepDependencyInfo struct {
	Name string `json:"name"`
}

type StepHandlerInfo

type StepHandlerInfo struct {
	FlowName string `json:"flow_name"`
	StepName string `json:"step_name"`
}

type StepInfo

type StepInfo struct {
	Name                 string               `json:"name"`
	Description          string               `json:"description,omitempty"`
	StepType             StepType             `json:"step_type,omitempty"`
	MapSourceStepName    string               `json:"map_source_step_name,omitempty"`
	ReduceSourceStepName string               `json:"reduce_source_step_name,omitempty"`
	Signal               bool                 `json:"signal,omitempty"`
	DependsOn            []StepDependencyInfo `json:"depends_on,omitempty"`
}

type StepRunInfo

type StepRunInfo struct {
	ID           int64           `json:"id"`
	Priority     int             `json:"priority"`
	StepName     string          `json:"step_name"`
	Status       string          `json:"status"`
	Attempts     int             `json:"attempts"`
	Output       json.RawMessage `json:"output,omitempty"`
	ErrorMessage string          `json:"error_message,omitempty"`
	CreatedAt    time.Time       `json:"created_at,omitzero"`
	VisibleAt    time.Time       `json:"visible_at,omitzero"`
	StartedAt    time.Time       `json:"started_at,omitzero"`
	CompletedAt  time.Time       `json:"completed_at,omitzero"`
	FailedAt     time.Time       `json:"failed_at,omitzero"`
	SkippedAt    time.Time       `json:"skipped_at,omitzero"`
	CanceledAt   time.Time       `json:"canceled_at,omitzero"`
}

StepRunInfo represents the execution state of a single step within a flow run.

func GetFlowRunSteps

func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)

GetFlowRunSteps retrieves all step runs for a specific flow run.

func GetStep added in v0.0.8

func GetStep(ctx context.Context, stepName string) (*StepRunInfo, error)

GetStep retrieves status details for a step in the current flow run. Intended for use inside flow step handlers.

func WaitForStep added in v0.0.8

func WaitForStep(ctx context.Context, stepName string, opts ...WaitOpts) (*StepRunInfo, error)

WaitForStep blocks until the given step reaches a terminal state in the current flow run. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.

func (*StepRunInfo) IsCompleted added in v0.0.8

func (r *StepRunInfo) IsCompleted() bool

IsCompleted reports whether the step run completed successfully.

func (*StepRunInfo) IsDone added in v0.0.8

func (r *StepRunInfo) IsDone() bool

IsDone reports whether the step run reached a terminal state.

type StepType added in v0.0.8

type StepType string
const (
	StepTypeNormal    StepType = "normal"
	StepTypeMapper    StepType = "mapper"
	StepTypeGenerator StepType = "generator"
	StepTypeReducer   StepType = "reducer"
)

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a reflection-based task with optional handler. Use NewTask().Do(fn, opts) for tasks with handlers. Use NewTask() for definition-only tasks.

func NewTask

func NewTask(name string) *Task

NewTask creates a new task definition with the given name. Chain .Do() to add a handler, otherwise returns a definition-only task.

func (*Task) Do added in v0.0.8

func (t *Task) Do(fn any, opts ...HandlerOpt) *Task

Do sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).

func (*Task) OnFail added in v0.0.8

func (t *Task) OnFail(fn any, opts ...HandlerOpt) *Task

OnFail sets a task failure handler and execution options. fn must have signature (context.Context, In, TaskFailure) error. If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).

func (*Task) RetentionPeriod added in v0.0.8

func (t *Task) RetentionPeriod(d time.Duration) *Task

RetentionPeriod sets how long terminal runs (completed, failed, skipped, canceled) are retained before being deleted by the garbage collector. A zero duration disables cleanup.

func (*Task) WithCondition added in v0.0.8

func (t *Task) WithCondition(condition string) *Task

WithCondition sets the condition expression for the task.

func (*Task) WithDescription added in v0.0.8

func (t *Task) WithDescription(description string) *Task

WithDescription sets the description for the task definition.

type TaskFailure added in v0.0.8

type TaskFailure struct {
	TaskName       string    `json:"task_name"`
	TaskRunID      int64     `json:"task_run_id"`
	ErrorMessage   string    `json:"error_message"`
	Attempts       int       `json:"attempts"`
	OnFailAttempts int       `json:"on_fail_attempts"`
	StartedAt      time.Time `json:"started_at,omitzero"`
	FailedAt       time.Time `json:"failed_at,omitzero"`
	ConcurrencyKey string    `json:"concurrency_key,omitempty"`
}

type TaskHandle added in v0.0.6

type TaskHandle struct {
	Name string
	ID   int64
	// contains filtered or unexported fields
}

TaskHandle is a handle to a task execution.

func RunTask

func RunTask(ctx context.Context, conn Conn, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)

RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.

func (*TaskHandle) WaitForOutput added in v0.0.6

func (h *TaskHandle) WaitForOutput(ctx context.Context, out any, opts ...WaitOpts) error

WaitForOutput blocks until the task execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.

type TaskHandlerInfo

type TaskHandlerInfo struct {
	TaskName string `json:"task_name"`
}

type TaskInfo

type TaskInfo struct {
	Name            string        `json:"name"`
	Description     string        `json:"description,omitempty"`
	RetentionPeriod time.Duration `json:"retention_period,omitzero"`
	CreatedAt       time.Time     `json:"created_at"`
}

func GetTask

func GetTask(ctx context.Context, conn Conn, taskName string) (*TaskInfo, error)

GetTask retrieves task metadata by name.

func ListTasks

func ListTasks(ctx context.Context, conn Conn) ([]*TaskInfo, error)

ListTasks returns all tasks

type TaskRunInfo added in v0.0.6

type TaskRunInfo struct {
	ID                int64           `json:"id"`
	Priority          int             `json:"priority"`
	ConcurrencyKey    string          `json:"concurrency_key,omitempty"`
	Status            string          `json:"status"`
	Input             json.RawMessage `json:"input,omitempty"`
	Headers           json.RawMessage `json:"headers,omitempty"`
	Output            json.RawMessage `json:"output,omitempty"`
	ErrorMessage      string          `json:"error_message,omitempty"`
	CancelReason      string          `json:"cancel_reason,omitempty"`
	CancelRequestedAt time.Time       `json:"cancel_requested_at,omitzero"`
	CanceledAt        time.Time       `json:"canceled_at,omitzero"`
	StartedAt         time.Time       `json:"started_at,omitzero"`
	CompletedAt       time.Time       `json:"completed_at,omitzero"`
	FailedAt          time.Time       `json:"failed_at,omitzero"`
	SkippedAt         time.Time       `json:"skipped_at,omitzero"`
}

TaskRunInfo represents the details of a task execution.

func GetTaskRun

func GetTaskRun(ctx context.Context, conn Conn, taskName string, taskRunID int64) (*TaskRunInfo, error)

GetTaskRun retrieves a specific task run result by ID.

func ListTaskRuns

func ListTaskRuns(ctx context.Context, conn Conn, taskName string) ([]*TaskRunInfo, error)

ListTaskRuns returns recent task runs for the specified task.

func (*TaskRunInfo) IsCompleted added in v0.0.8

func (r *TaskRunInfo) IsCompleted() bool

IsCompleted reports whether the task run completed successfully.

func (*TaskRunInfo) IsDone added in v0.0.8

func (r *TaskRunInfo) IsDone() bool

IsDone reports whether the task run reached a terminal state.

func (*TaskRunInfo) OutputAs added in v0.0.6

func (r *TaskRunInfo) OutputAs(out any) error

OutputAs unmarshals the output of a completed task run. Returns an error if the task run has failed or is not completed yet.

type TaskScheduleInfo added in v0.0.5

type TaskScheduleInfo struct {
	TaskName       string    `json:"task_name"`
	CronSpec       string    `json:"cron_spec"`
	NextRunAt      time.Time `json:"next_run_at"`
	LastRunAt      time.Time `json:"last_run_at,omitzero"`
	LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
	Enabled        bool      `json:"enabled"`
	CatchUp        string    `json:"catch_up"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

TaskScheduleInfo contains metadata about a scheduled task.

func ListTaskSchedules added in v0.0.5

func ListTaskSchedules(ctx context.Context, conn Conn) ([]*TaskScheduleInfo, error)

ListTaskSchedules returns all task schedules ordered by next_run_at.

type TokenOpts added in v0.1.0

type TokenOpts struct {
	Identity string
	ValidFor time.Duration
}

TokenOpts configures token minting.

type WaitOpts added in v0.0.6

type WaitOpts struct {
	PollFor      time.Duration
	PollInterval time.Duration
}

WaitOpts configures WaitForOutput polling behavior. Zero values use defaults.

type Wire added in v0.1.0

type Wire struct {
	// contains filtered or unexported fields
}

Wire is a real-time pub/sub layer with SSE support and presence tracking. It absorbs Listener's topic-matched dispatch, adds local delivery to Notify, and serves SSE connections with per-transport rendering. Create with NewWire, configure with builder methods, then call Start.

func NewWire added in v0.1.0

func NewWire(pool *pgxpool.Pool, secret []byte) *Wire

NewWire creates a new Wire instance. The secret must be exactly 32 bytes (AES-256).

func (*Wire) ID added in v0.1.0

func (w *Wire) ID() string

ID returns the unique identifier for this Wire instance. Pass to NotifyOpts.From to skip delivery to this Wire.

func (*Wire) Listen added in v0.1.0

func (w *Wire) Listen(pattern string, fn ListenHandler) *Wire

Listen registers a handler for the given topic pattern. Handlers fire on every node that receives the event (local or cross-node). They're for server-side side effects — logging, webhooks, triggering work. Patterns use the same syntax as Bind: "." separates tokens, "*" matches one token, "#" matches zero or more trailing tokens. Must be called before Start.

func (*Wire) Notify added in v0.1.0

func (w *Wire) Notify(ctx context.Context, topic, message string) error

Notify delivers a notification to Listen handlers and SSE subscribers locally, then fires pg NOTIFY for cross-node delivery. The Wire's own LISTEN loop skips the echo (SentBy is set automatically).

func (*Wire) Presence added in v0.1.0

func (w *Wire) Presence(ctx context.Context, topic string) ([]string, error)

Presence returns the distinct identities connected to a topic across all nodes.

func (*Wire) RenderSSE added in v0.1.0

func (w *Wire) RenderSSE(pattern string, fn SSERenderHandler) *Wire

RenderSSE registers an SSE render handler for the given topic pattern. Multiple renderers matching the same topic each produce an SSE event. Topics without a renderer pass through as-is. Must be called before Start.

func (*Wire) ServeSSE added in v0.1.0

func (w *Wire) ServeSSE(rw http.ResponseWriter, r *http.Request, token string)

ServeSSE serves an SSE connection for the given token string. Invalid or expired tokens result in a 401 response.

func (*Wire) Start added in v0.1.0

func (w *Wire) Start(ctx context.Context) error

Start runs the Wire's background loops: LISTEN relay, heartbeat, and forward bridges. Blocks until ctx is cancelled.

func (*Wire) Token added in v0.1.0

func (w *Wire) Token(topics []string, opts ...TokenOpts) string

Token mints a signed, URL-safe SSE connection token for the given topics. The token is encrypted with AES-256-GCM and encoded as base64url (no padding). Panics if the secret is invalid or the system's random source fails.

func (*Wire) WithLogger added in v0.1.0

func (w *Wire) WithLogger(logger *slog.Logger) *Wire

WithLogger sets the Wire logger.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker processes tasks and flows from the queue

func NewWorker

func NewWorker(pool *pgxpool.Pool) *Worker

NewWorker creates a new worker with the given connection pool. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.

func (*Worker) AddFlow added in v0.0.3

func (w *Worker) AddFlow(f *Flow) *Worker

AddFlow registers a flow with the worker.

func (*Worker) AddTask added in v0.0.3

func (w *Worker) AddTask(t *Task) *Worker

AddTask registers a task with the worker.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start begins processing tasks and flows.

The worker will:

  • poll for new work and execute task and flow step handlers while ctx is active
  • run any configured cron-style task and flow schedules
  • send periodic heartbeats while it is running
  • register built-in garbage collection task running every 5 minutes

Shutdown behaviour:

  • when ctx is cancelled the worker immediately stops reading new work and begins shutting down
  • if WithShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
  • if WithShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish

func (*Worker) WithLogger added in v0.0.10

func (w *Worker) WithLogger(logger *slog.Logger) *Worker

WithLogger sets the worker logger.

func (*Worker) WithShutdownTimeout added in v0.0.10

func (w *Worker) WithShutdownTimeout(timeout time.Duration) *Worker

WithShutdownTimeout sets the graceful shutdown timeout. A value <= 0 disables graceful waiting and cancels handlers immediately.

type WorkerInfo

type WorkerInfo struct {
	ID              string             `json:"id"`
	TaskHandlers    []*TaskHandlerInfo `json:"task_handlers"`
	StepHandlers    []*StepHandlerInfo `json:"step_handlers"`
	StartedAt       time.Time          `json:"started_at"`
	LastHeartbeatAt time.Time          `json:"last_heartbeat_at"`
}

func ListWorkers

func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)

ListWorkers returns all registered workers.

Directories

Path Synopsis
cmd
cb command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL