Documentation
¶
Overview ¶
Package catbird provides a PostgreSQL-based message queue with task and workflow execution engine.
Index ¶
- Constants
- Variables
- func Bind(ctx context.Context, conn Conn, queueName string, pattern string) error
- func BindFlow(ctx context.Context, conn Conn, flowName string, pattern string) error
- func BindTask(ctx context.Context, conn Conn, taskName string, pattern string) error
- func Cancel(ctx context.Context, opts ...CancelOpts) error
- func CancelFlowRun(ctx context.Context, conn Conn, flowName string, runID int64, ...) (bool, error)
- func CancelTaskRun(ctx context.Context, conn Conn, taskName string, runID int64, ...) (bool, error)
- func ClearFlowRuns(ctx context.Context, conn Conn, flowName string) (int, error)
- func ClearTaskRuns(ctx context.Context, conn Conn, taskName string) (int, error)
- func CompleteEarly(ctx context.Context, output any, reason string) error
- func CreateFlow(ctx context.Context, conn Conn, flow *Flow) error
- func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpt) error
- func CreateQueue(ctx context.Context, conn Conn, queueName string, opts ...QueueOpts) error
- func CreateTask(ctx context.Context, conn Conn, task *Task) error
- func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpt) error
- func Delete(ctx context.Context, conn Conn, queueName string, id int64) (bool, error)
- func DeleteFlowSchedule(ctx context.Context, conn Conn, flowName string) (bool, error)
- func DeleteMany(ctx context.Context, conn Conn, queueName string, ids []int64) ([]int64, error)
- func DeleteQueue(ctx context.Context, conn Conn, queueName string) (bool, error)
- func DeleteTaskSchedule(ctx context.Context, conn Conn, taskName string) (bool, error)
- func GetFlowRunID(ctx context.Context) (int64, error)
- func GetTaskRunID(ctx context.Context) (int64, error)
- func Hide(ctx context.Context, conn Conn, queueName string, id int64, ...) (bool, error)
- func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, ...) ([]int64, error)
- func MigrateDownTo(ctx context.Context, db *sql.DB, version int) error
- func MigrateUpTo(ctx context.Context, db *sql.DB, version int) error
- func Notify(ctx context.Context, conn Conn, topic, message string, opts ...NotifyOpts) error
- func Publish(ctx context.Context, conn Conn, topic string, body any, opts ...PublishOpts) (int, error)
- func PublishMany(ctx context.Context, conn Conn, topic string, bodies []any, ...) (int, error)
- func PublishManyQuery(topic string, bodies []any, opts ...PublishManyOpts) (string, []any, error)
- func PublishQuery(topic string, body any, opts ...PublishOpts) (string, []any, error)
- func PurgeFlowRuns(ctx context.Context, conn Conn, flowName string, olderThan time.Duration) (int, error)
- func PurgeTaskRuns(ctx context.Context, conn Conn, taskName string, olderThan time.Duration) (int, error)
- func Reader(ctx context.Context, conn Conn, queueName string, quantity int, ...) error
- func RenderSSE[T any](w *Wire, pattern string, ...)
- func RunFlowQuery(flowName string, input any, opts ...RunFlowOpts) (string, []any, error)
- func RunTaskQuery(taskName string, input any, opts ...RunTaskOpts) (string, []any, error)
- func Send(ctx context.Context, conn Conn, queueName string, body any, opts ...SendOpts) error
- func SendMany(ctx context.Context, conn Conn, queueName string, bodies []any, ...) error
- func SendManyQuery(queueName string, bodies []any, opts ...SendManyOpts) (string, []any, error)
- func SendQuery(queueName string, body any, opts ...SendOpts) (string, []any, error)
- func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, ...) error
- func Unbind(ctx context.Context, conn Conn, queueName string, pattern string) (bool, error)
- func UnbindFlow(ctx context.Context, conn Conn, flowName string, pattern string) (bool, error)
- func UnbindTask(ctx context.Context, conn Conn, taskName string, pattern string) (bool, error)
- type BackoffStrategy
- type CancelOpts
- type CircuitBreaker
- type CircuitBreakerStrategy
- type Client
- func (c *Client) Bind(ctx context.Context, queueName string, pattern string) error
- func (c *Client) BindFlow(ctx context.Context, flowName string, pattern string) error
- func (c *Client) BindTask(ctx context.Context, taskName string, pattern string) error
- func (c *Client) CancelFlowRun(ctx context.Context, flowName string, runID int64, opts ...CancelOpts) (bool, error)
- func (c *Client) CancelTaskRun(ctx context.Context, taskName string, runID int64, opts ...CancelOpts) (bool, error)
- func (c *Client) ClearFlowRuns(ctx context.Context, flowName string) (int, error)
- func (c *Client) ClearTaskRuns(ctx context.Context, taskName string) (int, error)
- func (c *Client) CreateFlow(ctx context.Context, flow *Flow) error
- func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpt) error
- func (c *Client) CreateQueue(ctx context.Context, queueName string, opts ...QueueOpts) error
- func (c *Client) CreateTask(ctx context.Context, task *Task) error
- func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpt) error
- func (c *Client) Delete(ctx context.Context, queueName string, id int64) (bool, error)
- func (c *Client) DeleteFlowSchedule(ctx context.Context, flowName string) (bool, error)
- func (c *Client) DeleteMany(ctx context.Context, queueName string, ids []int64) ([]int64, error)
- func (c *Client) DeleteQueue(ctx context.Context, queueName string) (bool, error)
- func (c *Client) DeleteTaskSchedule(ctx context.Context, taskName string) (bool, error)
- func (c *Client) GC(ctx context.Context) (*GCInfo, error)
- func (c *Client) GetFlow(ctx context.Context, flowName string) (*FlowInfo, error)
- func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)
- func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
- func (c *Client) GetQueue(ctx context.Context, queueName string) (*QueueInfo, error)
- func (c *Client) GetTask(ctx context.Context, taskName string) (*TaskInfo, error)
- func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)
- func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
- func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)
- func (c *Client) ListFlowRuns(ctx context.Context, flowName string) ([]*FlowRunInfo, error)
- func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
- func (c *Client) ListFlows(ctx context.Context) ([]*FlowInfo, error)
- func (c *Client) ListQueues(ctx context.Context) ([]*QueueInfo, error)
- func (c *Client) ListTaskRuns(ctx context.Context, taskName string) ([]*TaskRunInfo, error)
- func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
- func (c *Client) ListTasks(ctx context.Context) ([]*TaskInfo, error)
- func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
- func (c *Client) Notify(ctx context.Context, topic, message string, opts ...NotifyOpts) error
- func (c *Client) Publish(ctx context.Context, topic string, body any, opts ...PublishOpts) (int, error)
- func (c *Client) PublishMany(ctx context.Context, topic string, bodies []any, opts ...PublishManyOpts) (int, error)
- func (c *Client) PurgeFlowRuns(ctx context.Context, flowName string, olderThan time.Duration) (int, error)
- func (c *Client) PurgeTaskRuns(ctx context.Context, taskName string, olderThan time.Duration) (int, error)
- func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
- func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, ...) ([]Message, error)
- func (c *Client) Reader(ctx context.Context, queueName string, quantity int, hideFor time.Duration, ...) error
- func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
- func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
- func (c *Client) Send(ctx context.Context, queueName string, body any, opts ...SendOpts) error
- func (c *Client) SendMany(ctx context.Context, queueName string, bodies []any, opts ...SendManyOpts) error
- func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, ...) error
- func (c *Client) Unbind(ctx context.Context, queueName string, pattern string) (bool, error)
- func (c *Client) UnbindFlow(ctx context.Context, flowName string, pattern string) (bool, error)
- func (c *Client) UnbindTask(ctx context.Context, taskName string, pattern string) (bool, error)
- type Conn
- type Flow
- func (f *Flow) AddStep(step *Step) *Flow
- func (f *Flow) OnFail(fn any, opts ...HandlerOpt) *Flow
- func (f *Flow) Output(stepName string) *Flow
- func (f *Flow) OutputPriority(stepNames ...string) *Flow
- func (f *Flow) RetentionPeriod(d time.Duration) *Flow
- func (f *Flow) WithDescription(description string) *Flow
- type FlowFailure
- type FlowHandle
- type FlowInfo
- type FlowRunInfo
- type FlowScheduleInfo
- type FullJitterBackoff
- type GCInfo
- type HandlerOpt
- func WithBatchSize(batchSize int) HandlerOpt
- func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt
- func WithConcurrency(concurrency int) HandlerOpt
- func WithFullJitterBackoff(minDelay, maxDelay time.Duration) HandlerOpt
- func WithMaxRetries(maxRetries int) HandlerOpt
- func WithTimeout(timeout time.Duration) HandlerOpt
- type ListenHandler
- type Message
- type MigrationInfo
- type NotifyOpts
- type Optional
- type PublishManyOpts
- type PublishOpts
- type QueueInfo
- type QueueOpts
- type ReadPollOpts
- type ReaderHandler
- type RunFlowOpts
- type RunTaskOpts
- type SSEEvent
- type SSERenderHandler
- type ScheduleOpt
- type SendManyOpts
- type SendOpts
- type Step
- func (s *Step) DependsOn(deps ...string) *Step
- func (s *Step) Do(fn any, opts ...HandlerOpt) *Step
- func (s *Step) Generate(fn any) *Step
- func (s *Step) IgnoreOutput(deps ...string) *Step
- func (s *Step) MapFlowInput() *Step
- func (s *Step) MapStepOutput(stepName string) *Step
- func (s *Step) ReduceStep(stepName string) *Step
- func (s *Step) WithCondition(condition string) *Step
- func (s *Step) WithDescription(description string) *Step
- func (s *Step) WithSignal() *Step
- type StepDependencyInfo
- type StepHandlerInfo
- type StepInfo
- type StepRunInfo
- type StepType
- type Task
- type TaskFailure
- type TaskHandle
- type TaskHandlerInfo
- type TaskInfo
- type TaskRunInfo
- type TaskScheduleInfo
- type TokenOpts
- type WaitOpts
- type Wire
- func (w *Wire) ID() string
- func (w *Wire) Listen(pattern string, fn ListenHandler) *Wire
- func (w *Wire) Notify(ctx context.Context, topic, message string) error
- func (w *Wire) Presence(ctx context.Context, topic string) ([]string, error)
- func (w *Wire) RenderSSE(pattern string, fn SSERenderHandler) *Wire
- func (w *Wire) ServeSSE(rw http.ResponseWriter, r *http.Request, token string)
- func (w *Wire) Start(ctx context.Context) error
- func (w *Wire) Token(topics []string, opts ...TokenOpts) string
- func (w *Wire) WithLogger(logger *slog.Logger) *Wire
- type Worker
- type WorkerInfo
Constants ¶
const ( StatusWaitingForDependencies = "waiting_for_dependencies" StatusWaitingForSignal = "waiting_for_signal" StatusWaitingForMapTasks = "waiting_for_map_tasks" StatusQueued = "queued" StatusStarted = "started" StatusCanceling = "canceling" StatusCompleted = "completed" StatusFailed = "failed" StatusSkipped = "skipped" StatusCanceled = "canceled" StatusExpired = "expired" )
const ( CatchUpSkip = "skip" // Skip all missed ticks, jump to future CatchUpOne = "one" // Enqueue one catch-up run (oldest), jump to future (default) CatchUpAll = "all" // Replay every missed tick, one at a time )
Catch-up policy constants control how the scheduler handles missed ticks after downtime.
const SchemaVersion = 2
Variables ¶
var ( // ErrRunFailed is returned when you try to unmarshal the output of a failed task or flow run ErrRunFailed = fmt.Errorf("catbird: run failed") // ErrRunCanceled is returned when you try to wait for output from a canceled task or flow run ErrRunCanceled = fmt.Errorf("catbird: run canceled") // ErrNotFound is returned when a requested run or resource cannot be found ErrNotFound = fmt.Errorf("catbird: not found") // ErrNoRunContext is returned when cancellation helpers are called outside handler run context ErrNoRunContext = fmt.Errorf("catbird: no run context") // ErrUnknownStepOutput is returned when a requested step output is not present in completed outputs. ErrUnknownStepOutput = fmt.Errorf("catbird: unknown step output") // ErrNoFailedStepInput is returned when failed step input is not available. ErrNoFailedStepInput = fmt.Errorf("catbird: failed step input not available") // ErrNoFailedStepSignal is returned when failed step signal input is not available. ErrNoFailedStepSignal = fmt.Errorf("catbird: failed step signal input not available") // ErrNoOutputCandidate is returned when a flow completes without any configured output candidate producing output. ErrNoOutputCandidate = fmt.Errorf("catbird: no output candidate produced output") // ErrNotDefined is returned when an operation references a queue, task, or flow that has not been created. ErrNotDefined = fmt.Errorf("catbird: not defined") // ErrRunSkipped is returned when waiting for output from a skipped task or flow run. ErrRunSkipped = fmt.Errorf("catbird: run skipped") // ErrRunNotCompleted is returned when waiting for output from a run that is still in progress. ErrRunNotCompleted = fmt.Errorf("catbird: run not completed") // ErrSignalNotDelivered is returned when a signal could not be delivered to a flow step. ErrSignalNotDelivered = fmt.Errorf("catbird: signal not delivered") )
Functions ¶
func Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail). Examples: "foo.bar", "foo.*.bar", "foo.bar.#"
func BindFlow ¶ added in v0.1.0
BindFlow subscribes a flow to a topic pattern. When a message is published to a matching topic, a flow run is created with the message body as input. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail).
func BindTask ¶ added in v0.1.0
BindTask subscribes a task to a topic pattern. When a message is published to a matching topic, a task run is created with the message body as input. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail).
func Cancel ¶ added in v0.0.8
func Cancel(ctx context.Context, opts ...CancelOpts) error
Cancel requests cancellation for the current run from inside a task or flow handler.
func CancelFlowRun ¶ added in v0.0.8
func CancelFlowRun(ctx context.Context, conn Conn, flowName string, runID int64, opts ...CancelOpts) (bool, error)
CancelFlowRun cancels a flow run. Returns true when the run exists (including idempotent no-op), false when it does not exist.
func CancelTaskRun ¶ added in v0.0.8
func CancelTaskRun(ctx context.Context, conn Conn, taskName string, runID int64, opts ...CancelOpts) (bool, error)
CancelTaskRun cancels a task run. Returns true when the run exists (including idempotent no-op), false when it does not exist.
func ClearFlowRuns ¶ added in v0.0.15
ClearFlowRuns deletes all runs for the given flow regardless of status, including in-progress runs. Step runs and map tasks are deleted via cascade. Use with caution — in-flight work will be lost.
func ClearTaskRuns ¶ added in v0.0.15
ClearTaskRuns deletes all runs for the given task regardless of status, including in-progress runs. Use with caution — in-flight work will be lost.
func CompleteEarly ¶ added in v0.0.8
CompleteEarly requests early completion for the current flow run.
Return this from a flow step handler to signal that the flow should complete early with the provided output and optional reason.
func CreateFlow ¶
CreateFlow creates a flow definition.
func CreateFlowSchedule ¶ added in v0.0.5
func CreateFlowSchedule(ctx context.Context, conn Conn, flowName, cronSpec string, opts ...ScheduleOpt) error
CreateFlowSchedule creates a cron-based schedule for a flow.
func CreateQueue ¶
CreateQueue creates a queue with the given name and optional options. Use Bind() separately to create topic bindings.
func CreateTask ¶
CreateTask creates a task definition.
func CreateTaskSchedule ¶ added in v0.0.5
func CreateTaskSchedule(ctx context.Context, conn Conn, taskName, cronSpec string, opts ...ScheduleOpt) error
CreateTaskSchedule creates a cron-based schedule for a task.
func DeleteFlowSchedule ¶ added in v0.1.2
DeleteFlowSchedule removes the cron schedule for a flow. It reports whether a schedule existed; deleting a missing schedule is a no-op.
func DeleteMany ¶
DeleteMany deletes multiple messages from the queue. Returns the IDs that were actually deleted.
func DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func DeleteTaskSchedule ¶ added in v0.1.2
DeleteTaskSchedule removes the cron schedule for a task. It reports whether a schedule existed; deleting a missing schedule is a no-op.
func GetFlowRunID ¶ added in v0.1.0
GetFlowRunID returns the flow run ID from inside a flow step handler. Returns ErrNoRunContext if called outside a flow step handler.
func GetTaskRunID ¶ added in v0.1.0
GetTaskRunID returns the task run ID from inside a task handler. Returns ErrNoRunContext if called outside a task handler.
func Hide ¶
func Hide(ctx context.Context, conn Conn, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func HideMany ¶
func HideMany(ctx context.Context, conn Conn, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)
HideMany hides multiple messages from being read for the specified duration. Returns the IDs that were actually hidden.
func Notify ¶ added in v0.1.0
Notify sends an ephemeral notification via pg NOTIFY. Every Wire instance (on any node) picks it up and delivers to its local subscribers and Listen handlers. Set NotifyOpts.SentBy to skip delivery to the sender.
func Publish ¶ added in v0.0.3
func Publish(ctx context.Context, conn Conn, topic string, body any, opts ...PublishOpts) (int, error)
Publish sends a message to topic-subscribed queues with options. Pass no opts to use defaults.
func PublishMany ¶ added in v0.0.8
func PublishMany(ctx context.Context, conn Conn, topic string, bodies []any, opts ...PublishManyOpts) (int, error)
PublishMany sends multiple messages to topic-subscribed queues with options. Pass no opts to use defaults.
func PublishManyQuery ¶ added in v0.0.8
PublishManyQuery builds the SQL query and args for a PublishMany operation. Pass no opts to use defaults.
func PublishQuery ¶ added in v0.0.4
PublishQuery builds the SQL query and args for a Publish operation. Pass no opts to use defaults.
func PurgeFlowRuns ¶ added in v0.0.8
func PurgeFlowRuns(ctx context.Context, conn Conn, flowName string, olderThan time.Duration) (int, error)
PurgeFlowRuns deletes terminal flow runs (completed, failed, canceled) older than the given duration. Step runs and map tasks are deleted via cascade. Useful for manual cleanup or targeted removal independent of the configured retention period.
func PurgeTaskRuns ¶ added in v0.0.8
func PurgeTaskRuns(ctx context.Context, conn Conn, taskName string, olderThan time.Duration) (int, error)
PurgeTaskRuns deletes terminal task runs (completed, failed, skipped, canceled) older than the given duration. Useful for manual cleanup or targeted removal independent of the configured retention period.
func Reader ¶ added in v0.1.0
func Reader(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, handler ReaderHandler, opts ...ReadPollOpts) error
Reader continuously reads messages from a queue and processes them with the given handler. Blocks until ctx is cancelled. Mirrors the ReadPoll signature: quantity and hideFor are required, polling behavior is optional.
func RenderSSE ¶ added in v0.1.0
func RenderSSE[T any](w *Wire, pattern string, fn func(r *http.Request, topic string, data T) (SSEEvent, error))
RenderSSE registers a typed SSE render handler that unmarshals JSON messages into type T and passes them to fn for full SSEEvent control.
func RunFlowQuery ¶ added in v0.0.4
RunFlowQuery builds the SQL query and args for a RunFlow operation. Pass no opts to use defaults.
func RunTaskQuery ¶ added in v0.0.4
RunTaskQuery builds the SQL query and args for a RunTask operation. Pass no opts to use defaults.
func SendMany ¶ added in v0.0.8
func SendMany(ctx context.Context, conn Conn, queueName string, bodies []any, opts ...SendManyOpts) error
SendMany enqueues multiple messages to the specified queue. Pass no opts to use defaults.
func SendManyQuery ¶ added in v0.0.8
SendManyQuery builds the SQL query and args for a SendMany operation. Pass no opts to use defaults.
func SendQuery ¶ added in v0.0.4
SendQuery builds the SQL query and args for a Send operation. Pass no opts to use defaults.
func SignalFlow ¶
func SignalFlow(ctx context.Context, conn Conn, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with `.WithSignal()`. Signals enable human-in-the-loop workflows where a step waits for external input before executing. Returns an error if the signal was already delivered or the step doesn't require a signal.
func Unbind ¶
Unbind unsubscribes a queue from a topic pattern. Returns true if a binding was removed, false if it was already absent.
func UnbindFlow ¶ added in v0.1.0
UnbindFlow removes a flow trigger binding. Returns true if a binding was removed, false if it was already absent.
Types ¶
type BackoffStrategy ¶ added in v0.0.3
type BackoffStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// NextDelay returns a delay for a zero-based delivery count (first retry = 0).
// Implementations should always return a positive duration.
NextDelay(deliveryCount int) time.Duration
}
BackoffStrategy defines how retry delays are calculated based on delivery count. Implementations must be safe for concurrent use.
type CancelOpts ¶ added in v0.0.8
type CancelOpts struct {
Reason string
}
CancelOpts configures cancellation behavior and metadata.
type CircuitBreaker ¶ added in v0.0.3
type CircuitBreaker struct {
// contains filtered or unexported fields
}
func NewCircuitBreaker ¶ added in v0.0.3
func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker
func (*CircuitBreaker) RecordFailure ¶ added in v0.0.3
func (c *CircuitBreaker) RecordFailure(now time.Time)
func (*CircuitBreaker) RecordSuccess ¶ added in v0.0.3
func (c *CircuitBreaker) RecordSuccess()
func (*CircuitBreaker) Validate ¶ added in v0.0.3
func (c *CircuitBreaker) Validate() error
type CircuitBreakerStrategy ¶ added in v0.0.3
type CircuitBreakerStrategy interface {
// Validate returns an error if configuration is invalid.
Validate() error
// Allow returns whether a call is permitted and how long to wait if not.
Allow(now time.Time) (bool, time.Duration)
// RecordSuccess updates breaker state after a successful call.
RecordSuccess()
// RecordFailure updates breaker state after a failed call.
RecordFailure(now time.Time)
}
CircuitBreakerStrategy defines the interface for circuit breaker behavior. Implementations must be safe for concurrent use.
type Client ¶
type Client struct {
Conn Conn
}
Client is a facade for interacting with Catbird
func New ¶
New creates a new Client with the given database connection.
The connection can be a *pgxpool.Pool, *pgx.Conn, or pgx.Tx.
func (*Client) Bind ¶
Bind subscribes a queue to a topic pattern. Pattern supports exact topics and wildcards: * (single token), # (multi-token tail). Examples: "foo.bar", "foo.*.bar", "foo.bar.#"
func (*Client) BindFlow ¶ added in v0.1.0
BindFlow subscribes a flow to a topic pattern. When a message is published to a matching topic, a flow run is created with the message body as input.
func (*Client) BindTask ¶ added in v0.1.0
BindTask subscribes a task to a topic pattern. When a message is published to a matching topic, a task run is created with the message body as input.
func (*Client) CancelFlowRun ¶ added in v0.0.8
func (c *Client) CancelFlowRun(ctx context.Context, flowName string, runID int64, opts ...CancelOpts) (bool, error)
CancelFlowRun cancels a flow run.
func (*Client) CancelTaskRun ¶ added in v0.0.8
func (c *Client) CancelTaskRun(ctx context.Context, taskName string, runID int64, opts ...CancelOpts) (bool, error)
CancelTaskRun cancels a task run.
func (*Client) ClearFlowRuns ¶ added in v0.0.15
ClearFlowRuns deletes all runs for the given flow regardless of status. See ClearFlowRuns for details.
func (*Client) ClearTaskRuns ¶ added in v0.0.15
ClearTaskRuns deletes all runs for the given task regardless of status. See ClearTaskRuns for details.
func (*Client) CreateFlow ¶
CreateFlow creates a flow definition.
func (*Client) CreateFlowSchedule ¶ added in v0.0.5
func (c *Client) CreateFlowSchedule(ctx context.Context, flowName, cronSpec string, opts ...ScheduleOpt) error
CreateFlowSchedule creates a cron-based schedule for a flow. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts are optional ScheduleOpt values configuring the schedule.
func (*Client) CreateQueue ¶
CreateQueue creates a queue with the given name and optional options.
func (*Client) CreateTask ¶
CreateTask creates a task definition.
func (*Client) CreateTaskSchedule ¶ added in v0.0.5
func (c *Client) CreateTaskSchedule(ctx context.Context, taskName, cronSpec string, opts ...ScheduleOpt) error
CreateTaskSchedule creates a cron-based schedule for a task. cronSpec should be in 5-field format (min hour day month dow) or descriptors like @hourly, @daily. opts are optional ScheduleOpt values configuring the schedule.
func (*Client) Delete ¶
Delete deletes a single message from the queue. Returns true if the message existed.
func (*Client) DeleteFlowSchedule ¶ added in v0.1.2
DeleteFlowSchedule removes the cron schedule for a flow. It reports whether a schedule existed; deleting a missing schedule is a no-op.
func (*Client) DeleteMany ¶
DeleteMany deletes multiple messages from the queue.
func (*Client) DeleteQueue ¶
DeleteQueue deletes a queue and all its messages. Returns true if the queue existed.
func (*Client) DeleteTaskSchedule ¶ added in v0.1.2
DeleteTaskSchedule removes the cron schedule for a task. It reports whether a schedule existed; deleting a missing schedule is a no-op.
func (*Client) GetFlowRun ¶
func (c *Client) GetFlowRun(ctx context.Context, flowName string, flowRunID int64) (*FlowRunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func (*Client) GetFlowRunSteps ¶
func (c *Client) GetFlowRunSteps(ctx context.Context, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func (*Client) GetTaskRun ¶
func (c *Client) GetTaskRun(ctx context.Context, taskName string, taskRunID int64) (*TaskRunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func (*Client) Hide ¶
func (c *Client) Hide(ctx context.Context, queueName string, id int64, hideFor time.Duration) (bool, error)
Hide hides a single message from being read for the specified duration. Returns true if the message existed.
func (*Client) HideMany ¶
func (c *Client) HideMany(ctx context.Context, queueName string, ids []int64, hideFor time.Duration) ([]int64, error)
HideMany hides multiple messages from being read for the specified duration.
func (*Client) ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*Client) ListFlowSchedules ¶ added in v0.0.5
func (c *Client) ListFlowSchedules(ctx context.Context) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
func (*Client) ListQueues ¶
ListQueues returns all queues
func (*Client) ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*Client) ListTaskSchedules ¶ added in v0.0.5
func (c *Client) ListTaskSchedules(ctx context.Context) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
func (*Client) ListWorkers ¶
func (c *Client) ListWorkers(ctx context.Context) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.
func (*Client) Publish ¶ added in v0.0.3
func (c *Client) Publish(ctx context.Context, topic string, body any, opts ...PublishOpts) (int, error)
Publish sends a message to all queues subscribed to the specified topic.
func (*Client) PublishMany ¶ added in v0.0.8
func (c *Client) PublishMany(ctx context.Context, topic string, bodies []any, opts ...PublishManyOpts) (int, error)
PublishMany sends multiple messages to all queues subscribed to the specified topic.
func (*Client) PurgeFlowRuns ¶ added in v0.0.8
func (c *Client) PurgeFlowRuns(ctx context.Context, flowName string, olderThan time.Duration) (int, error)
PurgeFlowRuns deletes terminal flow runs older than the given duration. See PurgeFlowRuns for details.
func (*Client) PurgeTaskRuns ¶ added in v0.0.8
func (c *Client) PurgeTaskRuns(ctx context.Context, taskName string, olderThan time.Duration) (int, error)
PurgeTaskRuns deletes terminal task runs older than the given duration. See PurgeTaskRuns for details.
func (*Client) Read ¶
func (c *Client) Read(ctx context.Context, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func (*Client) ReadPoll ¶
func (c *Client) ReadPoll(ctx context.Context, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached.
func (*Client) Reader ¶ added in v0.1.0
func (c *Client) Reader(ctx context.Context, queueName string, quantity int, hideFor time.Duration, handler ReaderHandler, opts ...ReadPollOpts) error
Reader continuously reads messages from a queue and processes them. Blocks until ctx is cancelled.
func (*Client) RunFlow ¶
func (c *Client) RunFlow(ctx context.Context, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*Client) RunTask ¶
func (c *Client) RunTask(ctx context.Context, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*Client) SendMany ¶ added in v0.0.8
func (c *Client) SendMany(ctx context.Context, queueName string, bodies []any, opts ...SendManyOpts) error
SendMany enqueues multiple messages to the specified queue.
func (*Client) SignalFlow ¶
func (c *Client) SignalFlow(ctx context.Context, flowName string, flowRunID int64, stepName string, input any) error
SignalFlow delivers a signal to a waiting step in a flow run. The step must have been defined with a signal variant (e.g., NewStepWithSignal). Returns an error if the signal was already delivered or the step doesn't require a signal.
func (*Client) UnbindFlow ¶ added in v0.1.0
UnbindFlow removes a flow trigger binding.
type Conn ¶
type Conn interface {
Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
Query(context.Context, string, ...any) (pgx.Rows, error)
QueryRow(context.Context, string, ...any) pgx.Row
}
Conn is an interface for database connections compatible with pgx.Conn and pgx.Pool
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
func (*Flow) OnFail ¶ added in v0.0.8
func (f *Flow) OnFail(fn any, opts ...HandlerOpt) *Flow
OnFail sets a flow failure handler and execution options. fn must have signature (context.Context, In, FlowFailure) error. If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).
func (*Flow) OutputPriority ¶ added in v0.0.8
func (*Flow) RetentionPeriod ¶ added in v0.0.8
RetentionPeriod sets how long terminal runs (completed, failed, canceled) are retained before being deleted by the garbage collector. A zero duration disables cleanup.
func (*Flow) WithDescription ¶ added in v0.0.8
type FlowFailure ¶ added in v0.0.8
type FlowFailure struct {
FlowName string `json:"flow_name"`
FlowRunID int64 `json:"flow_run_id"`
FailedStepName string `json:"failed_step_name,omitempty"`
ErrorMessage string `json:"error_message"`
Attempts int `json:"attempts"`
OnFailAttempts int `json:"on_fail_attempts"`
StartedAt time.Time `json:"started_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
FailedStepInput json.RawMessage `json:"failed_step_input,omitempty"`
FailedStepSignalInput json.RawMessage `json:"failed_step_signal_input,omitempty"`
// contains filtered or unexported fields
}
func (FlowFailure) FailedStepInputAs ¶ added in v0.0.8
func (f FlowFailure) FailedStepInputAs(out any) error
func (FlowFailure) FailedStepSignalAs ¶ added in v0.0.8
func (f FlowFailure) FailedStepSignalAs(out any) error
func (FlowFailure) Output ¶ added in v0.0.8
func (f FlowFailure) Output(ctx context.Context, step string) (json.RawMessage, error)
type FlowHandle ¶ added in v0.0.6
FlowHandle is a handle to a flow execution.
func RunFlow ¶
func RunFlow(ctx context.Context, conn Conn, flowName string, input any, opts ...RunFlowOpts) (*FlowHandle, error)
RunFlow enqueues a flow execution and returns a handle for monitoring.
func (*FlowHandle) WaitForOutput ¶ added in v0.0.6
WaitForOutput blocks until the flow execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
type FlowInfo ¶
type FlowInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Steps []StepInfo `json:"steps"`
OutputPriority []string `json:"output_priority,omitempty"`
RetentionPeriod time.Duration `json:"retention_period,omitzero"`
CreatedAt time.Time `json:"created_at"`
}
type FlowRunInfo ¶ added in v0.0.6
type FlowRunInfo struct {
ID int64 `json:"id"`
Priority int `json:"priority"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Headers json.RawMessage `json:"headers,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CancelReason string `json:"cancel_reason,omitempty"`
CancelRequestedAt time.Time `json:"cancel_requested_at,omitzero"`
CanceledAt time.Time `json:"canceled_at,omitzero"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
}
FlowRunInfo represents the details of a flow execution.
func GetFlowRun ¶
func GetFlowRun(ctx context.Context, conn Conn, flowName string, flowRunID int64) (*FlowRunInfo, error)
GetFlowRun retrieves a specific flow run result by ID.
func ListFlowRuns ¶
ListFlowRuns returns recent flow runs for the specified flow.
func (*FlowRunInfo) IsCompleted ¶ added in v0.0.8
func (r *FlowRunInfo) IsCompleted() bool
IsCompleted reports whether the flow run completed successfully.
func (*FlowRunInfo) IsDone ¶ added in v0.0.8
func (r *FlowRunInfo) IsDone() bool
IsDone reports whether the flow run reached a terminal state.
func (*FlowRunInfo) OutputAs ¶ added in v0.0.6
func (r *FlowRunInfo) OutputAs(out any) error
OutputAs unmarshals the output of a completed flow run. Returns an error if the flow run has failed or is not completed yet.
type FlowScheduleInfo ¶ added in v0.0.5
type FlowScheduleInfo struct {
FlowName string `json:"flow_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt time.Time `json:"last_run_at,omitzero"`
LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
Enabled bool `json:"enabled"`
CatchUp string `json:"catch_up"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
FlowScheduleInfo contains metadata about a scheduled flow.
func ListFlowSchedules ¶ added in v0.0.5
func ListFlowSchedules(ctx context.Context, conn Conn) ([]*FlowScheduleInfo, error)
ListFlowSchedules returns all flow schedules ordered by next_run_at.
type FullJitterBackoff ¶ added in v0.0.3
FullJitterBackoff implements exponential backoff with full jitter.
func NewFullJitterBackoff ¶ added in v0.0.3
func NewFullJitterBackoff(minDelay, maxDelay time.Duration) *FullJitterBackoff
NewFullJitterBackoff creates a FullJitterBackoff with the provided bounds.
func (*FullJitterBackoff) NextDelay ¶ added in v0.0.3
func (b *FullJitterBackoff) NextDelay(deliveryCount int) time.Duration
NextDelay returns the jittered delay for the given delivery count. deliveryCount is expected to be zero-based for the first retry.
func (*FullJitterBackoff) Validate ¶ added in v0.0.3
func (b *FullJitterBackoff) Validate() error
Validate checks the backoff configuration for consistency.
type GCInfo ¶ added in v0.0.8
type GCInfo struct {
ExpiredQueuesDeleted int `json:"expired_queues_deleted"`
ExpiredMessagesDeleted int `json:"expired_messages_deleted"`
ExpiredTaskRuns int `json:"expired_task_runs"`
ExpiredFlowRuns int `json:"expired_flow_runs"`
StaleWorkersDeleted int `json:"stale_workers_deleted"`
StaleWireNodesDeleted int `json:"stale_wire_nodes_deleted"`
TaskRunsPurged int `json:"task_runs_purged"`
FlowRunsPurged int `json:"flow_runs_purged"`
}
GCInfo is the garbage collection report returned by cb_gc().
type HandlerOpt ¶
type HandlerOpt func(*handlerOpts)
func WithBatchSize ¶
func WithBatchSize(batchSize int) HandlerOpt
WithBatchSize sets how many claims are fetched per poll.
func WithCircuitBreaker ¶
func WithCircuitBreaker(failureThreshold int, openTimeout time.Duration) HandlerOpt
WithCircuitBreaker sets optional circuit breaker strategy.
func WithConcurrency ¶
func WithConcurrency(concurrency int) HandlerOpt
WithConcurrency sets maximum concurrent handler executions.
func WithFullJitterBackoff ¶ added in v0.0.8
func WithFullJitterBackoff(minDelay, maxDelay time.Duration) HandlerOpt
WithFullJitterBackoff sets full-jitter retry backoff strategy.
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) HandlerOpt
WithMaxRetries sets retry attempts for handler failures.
func WithTimeout ¶ added in v0.0.8
func WithTimeout(timeout time.Duration) HandlerOpt
WithTimeout sets per-handler execution timeout.
type ListenHandler ¶ added in v0.1.0
ListenHandler is called when a notification matches a registered pattern. Handlers run synchronously in the dispatch goroutine — don't block.
type Message ¶
type Message struct {
ID int64 `json:"id"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
Topic string `json:"topic"`
Body json.RawMessage `json:"body"`
Headers json.RawMessage `json:"headers,omitempty"`
Priority int `json:"priority"`
Deliveries int `json:"deliveries"`
CreatedAt time.Time `json:"created_at"`
VisibleAt time.Time `json:"visible_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
Message represents a message in a queue
func Read ¶
func Read(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration) ([]Message, error)
Read reads up to quantity messages from the queue, hiding them from other readers for the specified duration.
func ReadPoll ¶
func ReadPoll(ctx context.Context, conn Conn, queueName string, quantity int, hideFor time.Duration, opts ...ReadPollOpts) ([]Message, error)
ReadPoll reads messages from a queue with polling support. It polls repeatedly at the specified interval until messages are available or the pollFor timeout is reached. Pass optional ReadPollOpts to configure polling behavior; defaults are used when omitted.
type MigrationInfo ¶ added in v0.1.3
MigrationInfo reports a single migration's applied state.
func MigrationStatus ¶ added in v0.1.3
MigrationStatus returns the state of every known migration, ordered by version.
type NotifyOpts ¶ added in v0.1.0
type NotifyOpts struct {
// SentBy identifies the sender. Wire instances that match this ID
// will skip delivery, avoiding echo. Use wire.ID().
SentBy string
}
NotifyOpts configures notification delivery.
type PublishManyOpts ¶ added in v0.0.8
type PublishOpts ¶ added in v0.0.3
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at,omitzero"`
}
type ReadPollOpts ¶ added in v0.0.6
ReadPollOpts configures ReadPoll polling behavior. Zero values use defaults.
type ReaderHandler ¶ added in v0.1.0
ReaderHandler processes a queue message. Return nil to ack (delete). Return an error to nack (message becomes visible again after hideFor).
type RunFlowOpts ¶
type RunTaskOpts ¶ added in v0.0.6
type SSEEvent ¶ added in v0.1.0
type SSEEvent struct {
Event string // SSE event name; empty = use original topic
Data string // SSE data field
ID string // SSE id field; empty = omit
}
SSEEvent represents a fully rendered SSE event ready for client delivery.
type SSERenderHandler ¶ added in v0.1.0
SSERenderHandler transforms a Wire event into an SSE event for client delivery. It receives the SSE client's HTTP request for access to user context (auth, language, etc). Only topics with a registered renderer are delivered to SSE clients — the renderer acts as an allowlist.
type ScheduleOpt ¶
type ScheduleOpt func(*scheduleOpts)
ScheduleOpt configures scheduled task/flow behavior.
func WithCatchUpAll ¶ added in v0.0.13
func WithCatchUpAll() ScheduleOpt
WithCatchUpAll configures the schedule to replay every missed tick on recovery.
func WithInput ¶
func WithInput(input any) ScheduleOpt
WithInput sets static input body for scheduled task/flow runs.
func WithSkipCatchUp ¶ added in v0.0.13
func WithSkipCatchUp() ScheduleOpt
WithSkipCatchUp configures the schedule to skip all missed ticks on recovery.
type SendManyOpts ¶ added in v0.0.8
type Step ¶
type Step struct {
// contains filtered or unexported fields
}
func (*Step) IgnoreOutput ¶ added in v0.0.16
func (*Step) MapFlowInput ¶ added in v0.0.8
func (*Step) MapStepOutput ¶ added in v0.0.8
func (*Step) ReduceStep ¶ added in v0.0.8
func (*Step) WithCondition ¶ added in v0.0.8
func (*Step) WithDescription ¶ added in v0.0.8
func (*Step) WithSignal ¶ added in v0.0.8
type StepDependencyInfo ¶
type StepDependencyInfo struct {
Name string `json:"name"`
}
type StepHandlerInfo ¶
type StepInfo ¶
type StepInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
StepType StepType `json:"step_type,omitempty"`
MapSourceStepName string `json:"map_source_step_name,omitempty"`
ReduceSourceStepName string `json:"reduce_source_step_name,omitempty"`
Signal bool `json:"signal,omitempty"`
DependsOn []StepDependencyInfo `json:"depends_on,omitempty"`
}
type StepRunInfo ¶
type StepRunInfo struct {
ID int64 `json:"id"`
Priority int `json:"priority"`
StepName string `json:"step_name"`
Status string `json:"status"`
Attempts int `json:"attempts"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CreatedAt time.Time `json:"created_at,omitzero"`
VisibleAt time.Time `json:"visible_at,omitzero"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
CanceledAt time.Time `json:"canceled_at,omitzero"`
}
StepRunInfo represents the execution state of a single step within a flow run.
func GetFlowRunSteps ¶
func GetFlowRunSteps(ctx context.Context, conn Conn, flowName string, flowRunID int64) ([]*StepRunInfo, error)
GetFlowRunSteps retrieves all step runs for a specific flow run.
func GetStep ¶ added in v0.0.8
func GetStep(ctx context.Context, stepName string) (*StepRunInfo, error)
GetStep retrieves status details for a step in the current flow run. Intended for use inside flow step handlers.
func WaitForStep ¶ added in v0.0.8
WaitForStep blocks until the given step reaches a terminal state in the current flow run. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
func (*StepRunInfo) IsCompleted ¶ added in v0.0.8
func (r *StepRunInfo) IsCompleted() bool
IsCompleted reports whether the step run completed successfully.
func (*StepRunInfo) IsDone ¶ added in v0.0.8
func (r *StepRunInfo) IsDone() bool
IsDone reports whether the step run reached a terminal state.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a reflection-based task with optional handler. Use NewTask().Do(fn, opts) for tasks with handlers. Use NewTask() for definition-only tasks.
func NewTask ¶
NewTask creates a new task definition with the given name. Chain .Do() to add a handler, otherwise returns a definition-only task.
func (*Task) Do ¶ added in v0.0.8
func (t *Task) Do(fn any, opts ...HandlerOpt) *Task
Do sets the task handler function and execution options. fn must have signature (context.Context, In) (Out, error). If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).
func (*Task) OnFail ¶ added in v0.0.8
func (t *Task) OnFail(fn any, opts ...HandlerOpt) *Task
OnFail sets a task failure handler and execution options. fn must have signature (context.Context, In, TaskFailure) error. If opts is omitted, defaults are used (concurrency: 4, batchSize: 64, timeout: 30s, maxRetries: 2 with full-jitter backoff 100ms-2s).
func (*Task) RetentionPeriod ¶ added in v0.0.8
RetentionPeriod sets how long terminal runs (completed, failed, skipped, canceled) are retained before being deleted by the garbage collector. A zero duration disables cleanup.
func (*Task) WithCondition ¶ added in v0.0.8
WithCondition sets the condition expression for the task.
func (*Task) WithDescription ¶ added in v0.0.8
WithDescription sets the description for the task definition.
type TaskFailure ¶ added in v0.0.8
type TaskFailure struct {
TaskName string `json:"task_name"`
TaskRunID int64 `json:"task_run_id"`
ErrorMessage string `json:"error_message"`
Attempts int `json:"attempts"`
OnFailAttempts int `json:"on_fail_attempts"`
StartedAt time.Time `json:"started_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
}
type TaskHandle ¶ added in v0.0.6
TaskHandle is a handle to a task execution.
func RunTask ¶
func RunTask(ctx context.Context, conn Conn, taskName string, input any, opts ...RunTaskOpts) (*TaskHandle, error)
RunTask enqueues a task execution and returns a handle for monitoring progress and retrieving output.
func (*TaskHandle) WaitForOutput ¶ added in v0.0.6
WaitForOutput blocks until the task execution completes and unmarshals the output. Pass optional WaitOpts to customize polling behavior; defaults are used when omitted.
type TaskHandlerInfo ¶
type TaskHandlerInfo struct {
TaskName string `json:"task_name"`
}
type TaskInfo ¶
type TaskInfo struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
RetentionPeriod time.Duration `json:"retention_period,omitzero"`
CreatedAt time.Time `json:"created_at"`
}
type TaskRunInfo ¶ added in v0.0.6
type TaskRunInfo struct {
ID int64 `json:"id"`
Priority int `json:"priority"`
ConcurrencyKey string `json:"concurrency_key,omitempty"`
Status string `json:"status"`
Input json.RawMessage `json:"input,omitempty"`
Headers json.RawMessage `json:"headers,omitempty"`
Output json.RawMessage `json:"output,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
CancelReason string `json:"cancel_reason,omitempty"`
CancelRequestedAt time.Time `json:"cancel_requested_at,omitzero"`
CanceledAt time.Time `json:"canceled_at,omitzero"`
StartedAt time.Time `json:"started_at,omitzero"`
CompletedAt time.Time `json:"completed_at,omitzero"`
FailedAt time.Time `json:"failed_at,omitzero"`
SkippedAt time.Time `json:"skipped_at,omitzero"`
}
TaskRunInfo represents the details of a task execution.
func GetTaskRun ¶
func GetTaskRun(ctx context.Context, conn Conn, taskName string, taskRunID int64) (*TaskRunInfo, error)
GetTaskRun retrieves a specific task run result by ID.
func ListTaskRuns ¶
ListTaskRuns returns recent task runs for the specified task.
func (*TaskRunInfo) IsCompleted ¶ added in v0.0.8
func (r *TaskRunInfo) IsCompleted() bool
IsCompleted reports whether the task run completed successfully.
func (*TaskRunInfo) IsDone ¶ added in v0.0.8
func (r *TaskRunInfo) IsDone() bool
IsDone reports whether the task run reached a terminal state.
func (*TaskRunInfo) OutputAs ¶ added in v0.0.6
func (r *TaskRunInfo) OutputAs(out any) error
OutputAs unmarshals the output of a completed task run. Returns an error if the task run has failed or is not completed yet.
type TaskScheduleInfo ¶ added in v0.0.5
type TaskScheduleInfo struct {
TaskName string `json:"task_name"`
CronSpec string `json:"cron_spec"`
NextRunAt time.Time `json:"next_run_at"`
LastRunAt time.Time `json:"last_run_at,omitzero"`
LastEnqueuedAt time.Time `json:"last_enqueued_at,omitzero"`
Enabled bool `json:"enabled"`
CatchUp string `json:"catch_up"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskScheduleInfo contains metadata about a scheduled task.
func ListTaskSchedules ¶ added in v0.0.5
func ListTaskSchedules(ctx context.Context, conn Conn) ([]*TaskScheduleInfo, error)
ListTaskSchedules returns all task schedules ordered by next_run_at.
type WaitOpts ¶ added in v0.0.6
WaitOpts configures WaitForOutput polling behavior. Zero values use defaults.
type Wire ¶ added in v0.1.0
type Wire struct {
// contains filtered or unexported fields
}
Wire is a real-time pub/sub layer with SSE support and presence tracking. It absorbs Listener's topic-matched dispatch, adds local delivery to Notify, and serves SSE connections with per-transport rendering. Create with NewWire, configure with builder methods, then call Start.
func NewWire ¶ added in v0.1.0
NewWire creates a new Wire instance. The secret must be exactly 32 bytes (AES-256).
func (*Wire) ID ¶ added in v0.1.0
ID returns the unique identifier for this Wire instance. Pass to NotifyOpts.From to skip delivery to this Wire.
func (*Wire) Listen ¶ added in v0.1.0
func (w *Wire) Listen(pattern string, fn ListenHandler) *Wire
Listen registers a handler for the given topic pattern. Handlers fire on every node that receives the event (local or cross-node). They're for server-side side effects — logging, webhooks, triggering work. Patterns use the same syntax as Bind: "." separates tokens, "*" matches one token, "#" matches zero or more trailing tokens. Must be called before Start.
func (*Wire) Notify ¶ added in v0.1.0
Notify delivers a notification to Listen handlers and SSE subscribers locally, then fires pg NOTIFY for cross-node delivery. The Wire's own LISTEN loop skips the echo (SentBy is set automatically).
func (*Wire) Presence ¶ added in v0.1.0
Presence returns the distinct identities connected to a topic across all nodes.
func (*Wire) RenderSSE ¶ added in v0.1.0
func (w *Wire) RenderSSE(pattern string, fn SSERenderHandler) *Wire
RenderSSE registers an SSE render handler for the given topic pattern. Multiple renderers matching the same topic each produce an SSE event. Topics without a renderer pass through as-is. Must be called before Start.
func (*Wire) ServeSSE ¶ added in v0.1.0
ServeSSE serves an SSE connection for the given token string. Invalid or expired tokens result in a 401 response.
func (*Wire) Start ¶ added in v0.1.0
Start runs the Wire's background loops: LISTEN relay, heartbeat, and forward bridges. Blocks until ctx is cancelled.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker processes tasks and flows from the queue
func NewWorker ¶
NewWorker creates a new worker with the given connection pool. Use builder methods (AddTask, AddFlow, etc.) to configure the worker. Call Start(ctx) to begin processing tasks and flows.
func (*Worker) Start ¶
Start begins processing tasks and flows.
The worker will:
- poll for new work and execute task and flow step handlers while ctx is active
- run any configured cron-style task and flow schedules
- send periodic heartbeats while it is running
- register built-in garbage collection task running every 5 minutes
Shutdown behaviour:
- when ctx is cancelled the worker immediately stops reading new work and begins shutting down
- if WithShutdownTimeout is set to a value > 0, that duration is used as a grace period for in‑flight handlers after ctx is cancelled; once the grace period expires the handler context is cancelled and remaining handlers are asked to stop. The default graceful shutdown timeout is 5 seconds.
- if WithShutdownTimeout is not set or set to 0, there is no grace period: the handler context is cancelled immediately once ctx is cancelled and Start returns after all goroutines finish
func (*Worker) WithLogger ¶ added in v0.0.10
WithLogger sets the worker logger.
type WorkerInfo ¶
type WorkerInfo struct {
ID string `json:"id"`
TaskHandlers []*TaskHandlerInfo `json:"task_handlers"`
StepHandlers []*StepHandlerInfo `json:"step_handlers"`
StartedAt time.Time `json:"started_at"`
LastHeartbeatAt time.Time `json:"last_heartbeat_at"`
}
func ListWorkers ¶
func ListWorkers(ctx context.Context, conn Conn) ([]*WorkerInfo, error)
ListWorkers returns all registered workers.