Documentation
Overview ¶
Package dbos provides lightweight durable workflow orchestration with Postgres.
DBOS Transact enables developers to write resilient distributed applications using workflows and steps backed by PostgreSQL. All application state is automatically persisted, providing exactly-once execution guarantees and automatic recovery from failures.
Getting Started ¶
Create a DBOS context to start building durable applications:
dbosContext, err := dbos.NewDBOSContext(context.Background(), dbos.Config{
	AppName:     "my-app",
	DatabaseURL: os.Getenv("DBOS_SYSTEM_DATABASE_URL"),
})
if err != nil {
	log.Fatal(err)
}
defer dbos.Shutdown(dbosContext, 5*time.Second)

// Register workflows before launching
dbos.RegisterWorkflow(dbosContext, myWorkflow)

// Launch the context to start processing
if err := dbos.Launch(dbosContext); err != nil {
	log.Fatal(err)
}
Workflows ¶
Workflows provide durable execution, automatically resuming from the last completed step after any failure. Write workflows as normal Go functions that take a DBOSContext and return serializable values:
func myWorkflow(ctx dbos.DBOSContext, input string) (string, error) {
	// Workflow logic here
	result, err := dbos.RunAsStep(ctx, someOperation)
	if err != nil {
		return "", err
	}
	return result, nil
}
Key workflow features:
- Automatic recovery: Workflows resume from the last completed step after crashes
- Idempotency: Assign workflow IDs to ensure operations run exactly once
- Determinism: Workflow functions must be deterministic; use steps for non-deterministic operations
- Timeouts: Set durable timeouts that persist across restarts
- Events & messaging: Workflows can emit events and receive messages for coordination
Steps ¶
Steps wrap non-deterministic operations (API calls, random numbers, current time) within workflows. If a workflow is interrupted, it resumes from the last completed step:
func fetchData(ctx context.Context) (string, error) {
	resp, err := http.Get("https://api.example.com/data")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// Read the response body; it becomes the step's recorded result
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func workflow(ctx dbos.DBOSContext, input string) (string, error) {
	data, err := dbos.RunAsStep(ctx, fetchData,
		dbos.WithStepName("fetchData"),
		dbos.WithStepMaxRetries(3))
	if err != nil {
		return "", err
	}
	return data, nil
}
Steps support configurable retries with exponential backoff for handling transient failures.
Queues ¶
Queues manage workflow concurrency and rate limiting:
queue := dbos.NewWorkflowQueue(dbosContext, "task_queue",
	dbos.WithWorkerConcurrency(5), // Max 5 concurrent workflows per process
	dbos.WithRateLimiter(&dbos.RateLimiter{
		Limit:  100,
		Period: 60 * time.Second, // 100 workflows per 60 seconds
	}))

// Enqueue workflows with optional deduplication and priority
handle, err := dbos.RunWorkflow(ctx, taskWorkflow, input,
	dbos.WithQueue(queue.Name),
	dbos.WithDeduplicationID("unique-id"),
	dbos.WithPriority(10))
Workflow Management ¶
DBOS can list, cancel, resume, and fork workflows:
// List workflows
workflows, err := dbos.ListWorkflows(ctx)

// Cancel a running workflow
err = dbos.CancelWorkflow(ctx, workflowID)

// Resume a cancelled workflow (string is the assumed result type here)
resumed, err := dbos.ResumeWorkflow[string](ctx, workflowID)

// Fork a workflow from a specific step
forked, err := dbos.ForkWorkflow[string](ctx, dbos.ForkWorkflowInput{
	OriginalWorkflowID: originalID,
	StartStep:          stepNumber,
})
Workflows can also be visualized and managed through the DBOS Console web UI.
Testing ¶
DBOSContext is fully mockable for unit testing:
func TestWorkflow(t *testing.T) {
	mockCtx := mocks.NewMockDBOSContext(t)
	mockCtx.On("RunAsStep", mockCtx, mock.Anything, mock.Anything).Return("result", nil)

	result, err := myWorkflow(mockCtx, "input")
	assert.NoError(t, err)
	assert.Equal(t, "result", result) // myWorkflow returns the mocked step output
}
For detailed documentation and examples, see https://docs.dbos.dev/golang/programming-guide
Index ¶
- func CancelWorkflow(ctx DBOSContext, workflowID string) error
- func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error)
- func GetStepID(ctx DBOSContext) (int, error)
- func GetWorkflowID(ctx DBOSContext) (string, error)
- func Launch(ctx DBOSContext) error
- func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error)
- func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...WorkflowRegistrationOption)
- func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error)
- func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error
- func SetEvent[P any](ctx DBOSContext, key string, message P) error
- func Shutdown(ctx DBOSContext, timeout time.Duration)
- func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error)
- type Client
- type ClientConfig
- type Config
- type DBOSContext
- type DBOSError
- type DBOSErrorCode
- type EnqueueOption
- type ForkWorkflowInput
- type ListWorkflowsOption
- func WithAppVersion(appVersion string) ListWorkflowsOption
- func WithEndTime(endTime time.Time) ListWorkflowsOption
- func WithExecutorIDs(executorIDs []string) ListWorkflowsOption
- func WithLimit(limit int) ListWorkflowsOption
- func WithLoadInput(loadInput bool) ListWorkflowsOption
- func WithLoadOutput(loadOutput bool) ListWorkflowsOption
- func WithName(name string) ListWorkflowsOption
- func WithOffset(offset int) ListWorkflowsOption
- func WithQueueName(queueName string) ListWorkflowsOption
- func WithQueuesOnly() ListWorkflowsOption
- func WithSortDesc() ListWorkflowsOption
- func WithStartTime(startTime time.Time) ListWorkflowsOption
- func WithStatus(status []WorkflowStatusType) ListWorkflowsOption
- func WithUser(user string) ListWorkflowsOption
- func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption
- func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption
- type QueueOption
- type RateLimiter
- type Step
- type StepFunc
- type StepInfo
- type StepOption
- type Workflow
- type WorkflowFunc
- type WorkflowHandle
- func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, opts ...EnqueueOption) (WorkflowHandle[R], error)
- func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHandle[R], error)
- func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error)
- func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error)
- func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error)
- type WorkflowOption
- func WithApplicationVersion(version string) WorkflowOption
- func WithAssumedRole(role string) WorkflowOption
- func WithAuthenticatedRoles(roles []string) WorkflowOption
- func WithAuthenticatedUser(user string) WorkflowOption
- func WithDeduplicationID(id string) WorkflowOption
- func WithPriority(priority uint) WorkflowOption
- func WithQueue(queueName string) WorkflowOption
- func WithWorkflowID(id string) WorkflowOption
- type WorkflowQueue
- type WorkflowRegistrationOption
- type WorkflowSendInput
- type WorkflowSetEventInput
- type WorkflowStatus
- type WorkflowStatusType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CancelWorkflow ¶
func CancelWorkflow(ctx DBOSContext, workflowID string) error
CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED and removing it from the queue. Once cancelled, the workflow will stop executing at the start of the next step. Executing steps will not be interrupted.
Parameters:
- ctx: DBOS context for the operation
- workflowID: The unique identifier of the workflow to cancel
Returns an error if the workflow does not exist or if the cancellation operation fails.
Example:
err := dbos.CancelWorkflow(ctx, "workflow-to-cancel")
if err != nil {
	log.Printf("Failed to cancel workflow: %v", err)
}
func GetEvent ¶
func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error)
GetEvent retrieves a key-value event from a target workflow with type safety. This function blocks until the event is set or the timeout is reached.
When called within a workflow, the get operation becomes part of the workflow's durable state. The returned value is of type R and will be type-checked at runtime.
Example:
status, err := dbos.GetEvent[string](ctx, "target-workflow-id", "status", 30*time.Second)
if err != nil {
	// Handle timeout or error
	return err
}
log.Printf("Status: %s", status)
func GetStepID ¶
func GetStepID(ctx DBOSContext) (int, error)
GetStepID retrieves the current step ID from the context if called within a DBOS workflow. Returns -1 and an error if not called from within a workflow context.
Example:
stepID, err := dbos.GetStepID(ctx)
if err != nil {
	log.Printf("Not within a workflow context")
} else {
	log.Printf("Current step ID: %d", stepID)
}
func GetWorkflowID ¶
func GetWorkflowID(ctx DBOSContext) (string, error)
GetWorkflowID retrieves the workflow ID from the context if called within a DBOS workflow. Returns an error if not called from within a workflow context.
Example:
workflowID, err := dbos.GetWorkflowID(ctx)
if err != nil {
	log.Printf("Not within a workflow context")
} else {
	log.Printf("Current workflow ID: %s", workflowID)
}
func Launch ¶ added in v0.7.0
func Launch(ctx DBOSContext) error
Launch launches the DBOS runtime using the provided DBOSContext. This is a package-level wrapper for the DBOSContext.Launch() method.
Example:
ctx, err := dbos.NewDBOSContext(context.Background(), config)
if err != nil {
	log.Fatal(err)
}
if err := dbos.Launch(ctx); err != nil {
	log.Fatal(err)
}
func Recv ¶
func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error)
Recv receives a message sent to this workflow with type safety. This function blocks until a message is received or the timeout is reached. Messages are consumed in FIFO order and each message is delivered exactly once.
Recv can only be called from within a workflow and becomes part of the workflow's durable state.
Example:
message, err := dbos.Recv[string](ctx, "notifications", 30*time.Second)
if err != nil {
	// Handle timeout or error
	return err
}
log.Printf("Received: %s", message)
func RegisterWorkflow ¶
func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...WorkflowRegistrationOption)
RegisterWorkflow registers a function as a durable workflow that can be executed and recovered. The function is registered with type safety - P represents the input type and R the return type. Types are automatically registered with gob encoding for serialization.
Registration options include:
- WithMaxRetries: Set maximum retry attempts for workflow recovery
- WithSchedule: Register as a scheduled workflow with cron syntax
- WithWorkflowName: Set a custom name for the workflow
Scheduled workflows receive a time.Time as input representing the scheduled execution time.
Example:
func MyWorkflow(ctx dbos.DBOSContext, input string) (int, error) {
	// workflow implementation
	return len(input), nil
}

dbos.RegisterWorkflow(ctx, MyWorkflow)

// With options:
dbos.RegisterWorkflow(ctx, MyWorkflow,
	dbos.WithMaxRetries(5),
	dbos.WithSchedule("0 0 * * * *"), // six-field cron schedule
	dbos.WithWorkflowName("MyCustomWorkflowName")) // Custom name for the workflow
func RunAsStep ¶
func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error)
RunAsStep executes a function as a durable step within a workflow. Steps provide at-least-once execution guarantees and automatic retry capabilities. If a step has already been executed (e.g., during workflow recovery), its recorded result is returned instead of re-executing the function.
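The replay behaviour described above (a recorded result is returned instead of re-executing the function) can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the library's implementation: `stepLog`, `runStep`, and `workflow` are hypothetical names, results are kept in a map rather than in Postgres, and steps are identified by a counter that restarts on every execution.

```go
package main

import "fmt"

// stepLog is a hypothetical in-memory stand-in for recorded step results.
type stepLog struct {
	results map[int]string // step ID -> recorded output
	next    int            // ID assigned to the next step
}

// runStep returns the recorded output for the current step ID if one exists;
// otherwise it calls fn and records the output.
func (l *stepLog) runStep(fn func() (string, error)) (string, error) {
	id := l.next
	l.next++
	if out, ok := l.results[id]; ok {
		return out, nil // replayed from the log, fn is not called
	}
	out, err := fn()
	if err != nil {
		return "", err // errors are not recorded in this sketch
	}
	l.results[id] = out
	return out, nil
}

// workflow runs two steps and counts how often a step function really executes.
func workflow(l *stepLog, executions *int) (string, error) {
	l.next = 0 // step IDs restart on every (re-)execution
	a, err := l.runStep(func() (string, error) { *executions++; return "fetched", nil })
	if err != nil {
		return "", err
	}
	return l.runStep(func() (string, error) { *executions++; return a + "+stored", nil })
}

func main() {
	log := &stepLog{results: map[int]string{}}
	executions := 0
	first, _ := workflow(log, &executions)
	second, _ := workflow(log, &executions) // simulated recovery: both steps replay
	fmt.Println(first, second, executions)
}
```

The model also shows why workflow code has to be deterministic: the step counter only lines up with the recorded results if the workflow issues the same steps in the same order on every run.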
Steps can be configured with functional options:
data, err := dbos.RunAsStep(ctx, func(ctx context.Context) ([]byte, error) {
	return MyStep(ctx, "https://api.example.com/data")
}, dbos.WithStepMaxRetries(3), dbos.WithBaseInterval(500*time.Millisecond))
Available options:
- WithStepName: Custom name for the step (only sets if not already set)
- WithStepMaxRetries: Maximum retry attempts (default: 0)
- WithBackoffFactor: Exponential backoff multiplier (default: 2.0)
- WithBaseInterval: Initial delay between retries (default: 100ms)
- WithMaxInterval: Maximum delay between retries (default: 5s)
Example:
func MyStep(ctx context.Context, url string) ([]byte, error) {
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

// Within a workflow:
data, err := dbos.RunAsStep(ctx, func(ctx context.Context) ([]byte, error) {
	return MyStep(ctx, "https://api.example.com/data")
}, dbos.WithStepName("FetchData"), dbos.WithStepMaxRetries(3))
if err != nil {
	return nil, err
}
Note that the function passed to RunAsStep must accept a context.Context as its first parameter. Inside the function, you *must* use that parameter (the context named in the function's signature), not the context that was passed to RunAsStep. Under the hood, DBOS uses the provided context to manage durable execution.
func Send ¶
func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error
Send sends a message to another workflow with type safety. The message type P is automatically registered for gob encoding.
Send can be called from within a workflow (as a durable step) or from outside workflows. When called within a workflow, the send operation becomes part of the workflow's durable state.
Example:
err := dbos.Send(ctx, "target-workflow-id", "Hello from sender", "notifications")
func SetEvent ¶
func SetEvent[P any](ctx DBOSContext, key string, message P) error
SetEvent sets a key-value event for the current workflow with type safety. Events are persistent and can be retrieved by other workflows using GetEvent. The event type P is automatically registered for gob encoding.
SetEvent can only be called from within a workflow and becomes part of the workflow's durable state. Setting an event with the same key will overwrite the previous value.
Example:
err := dbos.SetEvent(ctx, "status", "processing-complete")
func Shutdown ¶ added in v0.7.0
func Shutdown(ctx DBOSContext, timeout time.Duration)
Shutdown gracefully shuts down the DBOS runtime using the provided DBOSContext and timeout. This is a package-level wrapper for the DBOSContext.Shutdown() method.
Example:
ctx, err := dbos.NewDBOSContext(context.Background(), config)
if err != nil {
	log.Fatal(err)
}
defer dbos.Shutdown(ctx, 30*time.Second)
func Sleep ¶
func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error)
Sleep pauses workflow execution for the specified duration. This is a durable sleep - if the workflow is recovered during the sleep period, it will continue sleeping for the remaining time. Returns the actual duration slept.
Example:
actualDuration, err := dbos.Sleep(ctx, 5*time.Second)
if err != nil {
	return err
}
Types ¶
type Client ¶
type Client interface {
	Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error)
	ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
	Send(destinationID string, message any, topic string) error
	GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
	RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error)
	CancelWorkflow(workflowID string) error
	ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
	ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
	Shutdown(timeout time.Duration) // Simply close the system DB connection pool
}
Client provides a programmatic way to interact with your DBOS application from external code. It manages the underlying DBOSContext and provides methods for workflow operations without requiring direct management of the context lifecycle.
func NewClient ¶
func NewClient(ctx context.Context, config ClientConfig) (Client, error)
NewClient creates a new DBOS client with the provided configuration. The client manages its own DBOSContext internally.
Example:
config := dbos.ClientConfig{
	DatabaseURL: "postgres://user:pass@localhost:5432/dbname",
}
client, err := dbos.NewClient(context.Background(), config)
if err != nil {
	log.Fatal(err)
}
type ClientConfig ¶
type ClientConfig struct {
	DatabaseURL    string        // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
	SystemDBPool   *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
	DatabaseSchema string        // Database schema name (defaults to "dbos")
	Logger         *slog.Logger  // Optional custom logger
}
type Config ¶
type Config struct {
	AppName            string        // Application name for identification (required)
	DatabaseURL        string        // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
	SystemDBPool       *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
	DatabaseSchema     string        // Database schema name (defaults to "dbos")
	Logger             *slog.Logger  // Custom logger instance (defaults to a new slog logger)
	AdminServer        bool          // Enable Transact admin HTTP server (disabled by default)
	AdminServerPort    int           // Port for the admin HTTP server (default: 3001)
	ConductorURL       string        // DBOS conductor service URL (optional)
	ConductorAPIKey    string        // DBOS conductor API key (optional)
	ApplicationVersion string        // Application version (optional, overridden by DBOS__APPVERSION env var)
	ExecutorID         string        // Executor ID (optional, overridden by DBOS__VMID env var)
}
Config holds configuration parameters for initializing a DBOS context. AppName is required, along with either DatabaseURL or SystemDBPool.
type DBOSContext ¶
type DBOSContext interface {
	context.Context

	// Context Lifecycle
	Launch() error                  // Launch the DBOS runtime including system database, queues, and perform a workflow recovery for the local executor
	Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS resources

	// Workflow operations
	RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
	RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
	Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
	Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
	SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
	GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
	Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
	GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
	GetStepID() (int, error) // Get the current step ID (only available within workflows)

	// Workflow management
	RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
	CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
	ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
	ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
	ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
	GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow

	// Accessors
	GetApplicationVersion() string // Get the application version for this context
	GetExecutorID() string // Get the executor ID for this context
	GetApplicationID() string // Get the application ID for this context
}
DBOSContext represents a DBOS execution context that provides workflow orchestration capabilities. It extends the standard Go context.Context and adds methods for running workflows and steps, inter-workflow communication, and state management.
The context manages the lifecycle of workflows, provides durability guarantees, and enables recovery of interrupted workflows.
func NewDBOSContext ¶
func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error)
NewDBOSContext creates a new DBOS context with the provided configuration. The context must be launched with Launch() for workflow execution and should be shut down with Shutdown(). This function initializes the DBOS system database, sets up the queue sub-system, and prepares the workflow registry.
Example:
config := dbos.Config{
	DatabaseURL: "postgres://user:pass@localhost:5432/dbname",
	AppName:     "my-app",
}
ctx, err := dbos.NewDBOSContext(context.Background(), config)
if err != nil {
	log.Fatal(err)
}
defer ctx.Shutdown(30*time.Second)

if err := ctx.Launch(); err != nil {
	log.Fatal(err)
}
func WithTimeout ¶
func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.CancelFunc)
WithTimeout returns a copy of the DBOS context with a timeout. The returned context will be canceled after the specified duration. No-op if the provided context is not a concrete dbos.dbosContext.
func WithValue ¶
func WithValue(ctx DBOSContext, key, val any) DBOSContext
WithValue returns a copy of the DBOS context with the given key-value pair. This is similar to context.WithValue but maintains DBOS context capabilities. No-op if the provided context is not a concrete dbos.dbosContext.
func WithoutCancel ¶
func WithoutCancel(ctx DBOSContext) DBOSContext
WithoutCancel returns a copy of the DBOS context that is not canceled when the parent context is canceled. This can be used to detach a child workflow. No-op if the provided context is not a concrete dbos.dbosContext.
type DBOSError ¶
type DBOSError struct {
	Message string        // Human-readable error message
	Code    DBOSErrorCode // Error type code for programmatic handling

	// Optional context fields - only set when relevant to the error
	WorkflowID      string // Associated workflow identifier
	DestinationID   string // Target workflow identifier (for communication errors)
	StepName        string // Step function name (for step errors)
	QueueName       string // Queue name (for queue-related errors)
	DeduplicationID string // Deduplication identifier
	StepID          int    // Step sequence number
	ExpectedName    string // Expected function name (for determinism errors)
	RecordedName    string // Actually recorded function name (for determinism errors)
	MaxRetries      int    // Maximum retry limit (for retry-related errors)
	// contains filtered or unexported fields
}
DBOSError is the unified error type for all DBOS operations. It provides structured error information with context-specific fields and error codes for programmatic handling.
type DBOSErrorCode ¶
type DBOSErrorCode int
DBOSErrorCode represents the different types of errors that can occur in DBOS operations.
const (
	ConflictingIDError           DBOSErrorCode = iota + 1 // Workflow ID conflicts or duplicate operations
	InitializationError                                   // DBOS context initialization failures
	NonExistentWorkflowError                              // Referenced workflow does not exist
	ConflictingWorkflowError                              // Workflow with same ID already exists with different parameters
	WorkflowCancelled                                     // Workflow was cancelled during execution
	UnexpectedStep                                        // Step function mismatch during recovery (non-deterministic workflow)
	AwaitedWorkflowCancelled                              // A workflow being awaited was cancelled
	ConflictingRegistrationError                          // Attempting to register a workflow/queue that already exists
	WorkflowUnexpectedTypeError                           // Type mismatch in workflow input/output
	WorkflowExecutionError                                // General workflow execution error
	StepExecutionError                                    // General step execution error
	DeadLetterQueueError                                  // Workflow moved to dead letter queue after max retries
	MaxStepRetriesExceeded                                // Step exceeded maximum retry attempts
	QueueDeduplicated                                     // Workflow was deduplicated in the queue
)
type EnqueueOption ¶
type EnqueueOption func(*enqueueOptions)
EnqueueOption is a functional option for configuring workflow enqueue parameters.
func WithEnqueueApplicationVersion ¶
func WithEnqueueApplicationVersion(version string) EnqueueOption
WithEnqueueApplicationVersion overrides the application version for the enqueued workflow.
func WithEnqueueDeduplicationID ¶
func WithEnqueueDeduplicationID(id string) EnqueueOption
WithEnqueueDeduplicationID sets a deduplication ID for the enqueued workflow.
func WithEnqueuePriority ¶
func WithEnqueuePriority(priority uint) EnqueueOption
WithEnqueuePriority sets the execution priority for the enqueued workflow.
func WithEnqueueTimeout ¶
func WithEnqueueTimeout(timeout time.Duration) EnqueueOption
WithEnqueueTimeout sets the maximum execution time for the enqueued workflow.
func WithEnqueueWorkflowID ¶
func WithEnqueueWorkflowID(id string) EnqueueOption
WithEnqueueWorkflowID sets a custom workflow ID instead of generating one automatically.
type ForkWorkflowInput ¶
type ForkWorkflowInput struct {
	OriginalWorkflowID string // Required: The UUID of the original workflow to fork from
	ForkedWorkflowID   string // Optional: Custom workflow ID for the forked workflow (auto-generated if empty)
	StartStep          uint   // Optional: Step to start the forked workflow from (default: 0)
	ApplicationVersion string // Optional: Application version for the forked workflow (inherits from original if empty)
}
ForkWorkflowInput holds configuration parameters for forking workflows. OriginalWorkflowID is required. Other fields are optional.
type ListWorkflowsOption ¶
type ListWorkflowsOption func(*listWorkflowsOptions)
ListWorkflowsOption is a functional option for configuring workflow listing parameters.
func WithAppVersion ¶
func WithAppVersion(appVersion string) ListWorkflowsOption
WithAppVersion filters workflows by the specified application version.
func WithEndTime ¶
func WithEndTime(endTime time.Time) ListWorkflowsOption
WithEndTime filters workflows created before the specified time.
func WithExecutorIDs ¶
func WithExecutorIDs(executorIDs []string) ListWorkflowsOption
WithExecutorIDs filters workflows by the specified executor IDs.
func WithLimit ¶
func WithLimit(limit int) ListWorkflowsOption
WithLimit limits the number of workflows returned.
func WithLoadInput ¶
func WithLoadInput(loadInput bool) ListWorkflowsOption
WithLoadInput controls whether to load workflow input data (default: true).
func WithLoadOutput ¶
func WithLoadOutput(loadOutput bool) ListWorkflowsOption
WithLoadOutput controls whether to load workflow output data (default: true).
func WithName ¶
func WithName(name string) ListWorkflowsOption
WithName filters workflows by the specified workflow function name.
func WithOffset ¶
func WithOffset(offset int) ListWorkflowsOption
WithOffset sets the offset for pagination.
func WithQueueName ¶
func WithQueueName(queueName string) ListWorkflowsOption
WithQueueName filters workflows by the specified queue name. This is typically used when listing queued workflows.
func WithQueuesOnly ¶
func WithQueuesOnly() ListWorkflowsOption
WithQueuesOnly filters to only return workflows that are in a queue.
func WithSortDesc ¶
func WithSortDesc() ListWorkflowsOption
WithSortDesc enables descending sort by creation time (default is ascending).
func WithStartTime ¶
func WithStartTime(startTime time.Time) ListWorkflowsOption
WithStartTime filters workflows created after the specified time.
func WithStatus ¶
func WithStatus(status []WorkflowStatusType) ListWorkflowsOption
WithStatus filters workflows by the specified list of statuses.
func WithUser ¶
func WithUser(user string) ListWorkflowsOption
WithUser filters workflows by the specified authenticated user.
func WithWorkflowIDPrefix ¶
func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption
WithWorkflowIDPrefix filters workflows by workflow ID prefix.
func WithWorkflowIDs ¶
func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption
WithWorkflowIDs filters workflows by the specified workflow IDs.
type QueueOption ¶
type QueueOption func(*WorkflowQueue)
QueueOption is a functional option for configuring a workflow queue.
func WithGlobalConcurrency ¶
func WithGlobalConcurrency(concurrency int) QueueOption
WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue across all executors. This provides global concurrency control.
func WithMaxTasksPerIteration ¶
func WithMaxTasksPerIteration(maxTasks int) QueueOption
WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration. This controls batch sizes for queue processing.
func WithPriorityEnabled ¶
func WithPriorityEnabled() QueueOption
WithPriorityEnabled enables priority-based scheduling for the queue. When enabled, workflows with lower priority numbers are executed first.
func WithRateLimiter ¶
func WithRateLimiter(limiter *RateLimiter) QueueOption
WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services. The rate limiter enforces a maximum number of workflow starts within a time period.
func WithWorkerConcurrency ¶
func WithWorkerConcurrency(concurrency int) QueueOption
WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue. This provides per-executor concurrency control.
type RateLimiter ¶
type RateLimiter struct {
	Limit  int           // Maximum number of workflows to start within the period
	Period time.Duration // Time period for the rate limit
}
RateLimiter configures rate limiting for workflow queue execution. Rate limits prevent overwhelming external services and provide backpressure.
type StepInfo ¶
type StepInfo struct {
	StepID          int    // The sequential ID of the step within the workflow
	StepName        string // The name of the step function
	Output          any    // The output returned by the step (if any)
	Error           error  // The error returned by the step (if any)
	ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
}
StepInfo contains information about a workflow step execution.
func GetWorkflowSteps ¶
func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error)
GetWorkflowSteps retrieves the execution steps of a workflow. Returns a list of step information including step IDs, names, outputs, errors, and child workflow IDs. The list is sorted by step ID in ascending order.
Parameters:
- ctx: DBOS context for the operation
- workflowID: The unique identifier of the workflow
Returns a slice of StepInfo structs containing information about each executed step.
Example:
steps, err := dbos.GetWorkflowSteps(ctx, "workflow-id")
if err != nil {
	log.Fatal(err)
}
for _, step := range steps {
	log.Printf("Step %d: %s", step.StepID, step.StepName)
}
type StepOption ¶
type StepOption func(*stepOptions)
StepOption is a functional option for configuring step execution parameters.
func WithBackoffFactor ¶
func WithBackoffFactor(factor float64) StepOption
WithBackoffFactor sets the exponential backoff multiplier between retries. The delay between retries is calculated as BaseInterval * BackoffFactor^(retry-1). The default value is 2.0.
func WithBaseInterval ¶
func WithBaseInterval(interval time.Duration) StepOption
WithBaseInterval sets the initial delay between retries. Default value is 100ms.
func WithMaxInterval ¶
func WithMaxInterval(interval time.Duration) StepOption
WithMaxInterval sets the maximum delay between retries. Default value is 5s.
func WithStepMaxRetries ¶
func WithStepMaxRetries(maxRetries int) StepOption
WithStepMaxRetries sets the maximum number of retry attempts for the step. A value of 0 means no retries (default behavior).
func WithStepName ¶
func WithStepName(name string) StepOption
WithStepName sets a custom name for the step. If the step name has already been set by a previous call to WithStepName, this option is ignored.
type Workflow ¶
type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error)
Workflow represents a type-safe workflow function with specific input and output types. P is the input parameter type and R is the return type. All workflow functions must accept a DBOSContext as their first parameter.
type WorkflowFunc ¶
type WorkflowFunc func(ctx DBOSContext, input any) (any, error)
WorkflowFunc represents a type-erased workflow function used internally.
type WorkflowHandle ¶
type WorkflowHandle[R any] interface {
    GetResult() (R, error)              // Wait for workflow completion and return the result
    GetStatus() (WorkflowStatus, error) // Get current workflow status without waiting
    GetWorkflowID() string              // Get the unique workflow identifier
}
WorkflowHandle provides methods to interact with a running or completed workflow. The type parameter R represents the expected return type of the workflow. Handles can be used to wait for workflow completion, check status, and retrieve results.
func Enqueue ¶
func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, opts ...EnqueueOption) (WorkflowHandle[R], error)
Enqueue adds a workflow to a named queue for later execution with type safety. The workflow will be persisted with ENQUEUED status until picked up by a DBOS process. This provides asynchronous workflow execution with durability guarantees.
Parameters:
- c: Client instance for the operation
- queueName: Name of the queue to enqueue the workflow to
- workflowName: Name of the registered workflow function to execute
- input: Input parameters to pass to the workflow (type P)
- opts: Optional configuration options
Available options:
- WithEnqueueWorkflowID: Custom workflow ID (auto-generated if not provided)
- WithEnqueueApplicationVersion: Application version override
- WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing
- WithEnqueuePriority: Execution priority
- WithEnqueueTimeout: Maximum execution time for the workflow
Returns a typed workflow handle that can be used to check status and retrieve results. The handle uses polling to check workflow completion since the execution is asynchronous.
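The polling behavior mentioned above can be pictured with a generic loop. This is a sketch of the general technique only, not the handle's actual implementation: waitForCompletion and its fetchStatus callback are hypothetical, and treating PENDING and ENQUEUED as the only unfinished statuses is an assumption based on the status constants documented on this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForCompletion polls fetchStatus until it reports a status other than
// PENDING or ENQUEUED, or until the timeout elapses. Both the function and
// its callback are hypothetical illustrations.
func waitForCompletion(fetchStatus func() (string, error), interval, timeout time.Duration) (string, error) {
	deadline := time.Now().Add(timeout)
	for {
		status, err := fetchStatus()
		if err != nil {
			return "", err
		}
		if status != "PENDING" && status != "ENQUEUED" {
			return status, nil
		}
		if time.Now().After(deadline) {
			return status, errors.New("timed out waiting for workflow")
		}
		time.Sleep(interval)
	}
}

func main() {
	// A fake status source that finishes on the third poll.
	calls := 0
	fake := func() (string, error) {
		calls++
		if calls < 3 {
			return "ENQUEUED", nil
		}
		return "SUCCESS", nil
	}
	status, err := waitForCompletion(fake, 10*time.Millisecond, time.Second)
	fmt.Println(status, err, calls)
}
```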
Example usage:
// Enqueue a workflow with string input and int output
handle, err := dbos.Enqueue[string, int](client, "data-processing", "ProcessDataWorkflow", "input data",
    dbos.WithEnqueueTimeout(30 * time.Minute))
if err != nil {
    log.Fatal(err)
}

// Check status
status, err := handle.GetStatus()
if err != nil {
    log.Printf("Failed to get status: %v", err)
} else {
    log.Printf("Status: %s", status.Status)
}

// Wait for completion and get result
result, err := handle.GetResult()
if err != nil {
    log.Printf("Workflow failed: %v", err)
} else {
    log.Printf("Result: %d", result)
}

// Enqueue with deduplication and custom workflow ID
dedupHandle, err := dbos.Enqueue[MyInputType, MyOutputType](client, "my-queue", "MyWorkflow",
    MyInputType{Field: "value"},
    dbos.WithEnqueueWorkflowID("custom-workflow-id"),
    dbos.WithEnqueueDeduplicationID("unique-operation-id"))
if err != nil {
    log.Fatal(err)
}
log.Printf("Enqueued workflow %s", dedupHandle.GetWorkflowID())
func ForkWorkflow ¶
func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHandle[R], error)
ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step. The forked workflow will have a new UUID and will execute from the specified StartStep. If StartStep > 0, the forked workflow will reuse the operation outputs from steps 0 to StartStep-1 copied from the original workflow.
Parameters:
- ctx: DBOS context for the operation
- input: Configuration parameters for the forked workflow
Returns a typed workflow handle for the newly created forked workflow.
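The step-reuse rule described above can be modeled with a plain slice. This sketch only illustrates which recorded outputs a fork would copy under that rule; stepOutput and reusedOutputs are hypothetical names, and nothing here reflects how the package stores or copies step results internally.

```go
package main

import "fmt"

// stepOutput is a hypothetical record of one recorded step result.
type stepOutput struct {
	StepID int
	Output string
}

// reusedOutputs returns the outputs a fork starting at startStep would copy
// from the original workflow: steps 0 through startStep-1.
func reusedOutputs(original []stepOutput, startStep int) []stepOutput {
	var reused []stepOutput
	for _, s := range original {
		if s.StepID < startStep {
			reused = append(reused, s)
		}
	}
	return reused
}

func main() {
	original := []stepOutput{
		{StepID: 0, Output: "a"},
		{StepID: 1, Output: "b"},
		{StepID: 2, Output: "c"},
		{StepID: 3, Output: "d"},
	}
	// Forking at step 2 reuses steps 0 and 1; steps 2 and 3 run again.
	fmt.Println(reusedOutputs(original, 2))
	// Forking at step 0 reuses nothing; the whole workflow runs again.
	fmt.Println(len(reusedOutputs(original, 0)))
}
```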
Example usage:
// Basic fork from step 5
handle, err := dbos.ForkWorkflow[MyResultType](ctx, dbos.ForkWorkflowInput{
    OriginalWorkflowID: "original-workflow-id",
    StartStep:          5,
})
if err != nil {
    log.Fatal(err)
}
log.Printf("Forked workflow %s", handle.GetWorkflowID())

// Fork with custom workflow ID and application version
customHandle, err := dbos.ForkWorkflow[MyResultType](ctx, dbos.ForkWorkflowInput{
    OriginalWorkflowID: "original-workflow-id",
    ForkedWorkflowID:   "my-custom-fork-id",
    StartStep:          3,
    ApplicationVersion: "v2.0.0",
})
if err != nil {
    log.Fatal(err)
}
log.Printf("Forked workflow %s", customHandle.GetWorkflowID())
func ResumeWorkflow ¶
func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error)
ResumeWorkflow resumes a workflow by starting it from its last completed step. You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts. You can also use this to start an enqueued workflow immediately, bypassing its queue. If the workflow is already completed, this is a no-op. Returns a handle that can be used to wait for completion and retrieve results. Returns an error if the workflow does not exist or if the operation fails.
Example:
handle, err := dbos.ResumeWorkflow[int](ctx, "workflow-id")
if err != nil {
    log.Printf("Failed to resume workflow: %v", err)
} else {
    result, err := handle.GetResult()
    if err != nil {
        log.Printf("Workflow failed: %v", err)
    } else {
        log.Printf("Result: %d", result)
    }
}
func RetrieveWorkflow ¶
func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error)
RetrieveWorkflow returns a typed handle to an existing workflow. The handle can be used to check status and wait for results. The type parameter R must match the workflow's actual return type.
Example:
handle, err := dbos.RetrieveWorkflow[int](ctx, "workflow-id")
if err != nil {
    log.Fatal(err)
}
result, err := handle.GetResult()
if err != nil {
    log.Printf("Workflow failed: %v", err)
} else {
    log.Printf("Result: %d", result)
}
func RunWorkflow ¶
func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error)
RunWorkflow executes a workflow function with type safety and durability guarantees. The workflow can be executed immediately or enqueued for later execution based on options. Returns a typed handle that can be used to wait for completion and retrieve results.
The workflow will be automatically recovered if the process crashes or is interrupted. All workflow state is persisted to ensure exactly-once execution semantics.
Example:
handle, err := dbos.RunWorkflow(ctx, MyWorkflow, "input string", dbos.WithWorkflowID("my-custom-id"))
if err != nil {
    log.Fatal(err)
}
result, err := handle.GetResult()
if err != nil {
    log.Printf("Workflow failed: %v", err)
} else {
    log.Printf("Result: %v", result)
}
type WorkflowOption ¶
type WorkflowOption func(*workflowOptions)
WorkflowOption is a functional option for configuring workflow execution parameters.
func WithApplicationVersion ¶
func WithApplicationVersion(version string) WorkflowOption
WithApplicationVersion overrides the DBOS Context application version for this workflow. This affects workflow recovery.
func WithAssumedRole ¶ added in v0.7.0
func WithAssumedRole(role string) WorkflowOption
WithAssumedRole sets the assumed role for the workflow.
func WithAuthenticatedRoles ¶ added in v0.7.0
func WithAuthenticatedRoles(roles []string) WorkflowOption
WithAuthenticatedRoles sets the authenticated roles for the workflow.
func WithAuthenticatedUser ¶ added in v0.7.0
func WithAuthenticatedUser(user string) WorkflowOption
WithAuthenticatedUser sets the authenticated user for the workflow.
func WithDeduplicationID ¶
func WithDeduplicationID(id string) WorkflowOption
WithDeduplicationID sets a deduplication ID for a queue workflow.
func WithPriority ¶
func WithPriority(priority uint) WorkflowOption
WithPriority sets the execution priority for a queued workflow. Lower numbers have higher priority.
func WithQueue ¶
func WithQueue(queueName string) WorkflowOption
WithQueue enqueues the workflow to the specified queue instead of executing immediately. Queued workflows will be processed by the queue runner according to the queue's configuration.
func WithWorkflowID ¶
func WithWorkflowID(id string) WorkflowOption
WithWorkflowID sets a custom workflow ID instead of generating one automatically.
type WorkflowQueue ¶
type WorkflowQueue struct {
    Name                 string       `json:"name"`                        // Unique queue name
    WorkerConcurrency    *int         `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
    GlobalConcurrency    *int         `json:"concurrency,omitempty"`       // Max concurrent workflows across all executors
    PriorityEnabled      bool         `json:"priorityEnabled,omitempty"`   // Enable priority-based scheduling
    RateLimit            *RateLimiter `json:"rateLimit,omitempty"`         // Rate limiting configuration
    MaxTasksPerIteration int          `json:"maxTasksPerIteration"`        // Max workflows to dequeue per iteration
}
WorkflowQueue defines a named queue for workflow execution. Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.
func NewWorkflowQueue ¶
func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) WorkflowQueue
NewWorkflowQueue creates a new workflow queue with the specified name and configuration options. The queue must be created before workflows can be enqueued to it using the WithQueue option in RunWorkflow. Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting.
Example:
queue := dbos.NewWorkflowQueue(ctx, "email-queue",
    dbos.WithWorkerConcurrency(5),
    dbos.WithRateLimiter(&dbos.RateLimiter{
        Limit:  100,
        Period: 60 * time.Second, // 100 workflows per minute
    }),
    dbos.WithPriorityEnabled(),
)

// Enqueue workflows to this queue:
handle, err := dbos.RunWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))
type WorkflowRegistrationOption ¶
type WorkflowRegistrationOption func(*workflowRegistrationOptions)
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) WorkflowRegistrationOption
WithMaxRetries sets the maximum number of retry attempts for workflow recovery. If a workflow fails or is interrupted, it will be retried up to this many times. After exceeding max retries, the workflow status becomes MAX_RECOVERY_ATTEMPTS_EXCEEDED.
func WithSchedule ¶
func WithSchedule(schedule string) WorkflowRegistrationOption
WithSchedule registers the workflow as a scheduled workflow using cron syntax. The schedule string follows standard cron format with second precision. Scheduled workflows automatically receive a time.Time input parameter.
func WithWorkflowName ¶
func WithWorkflowName(name string) WorkflowRegistrationOption
type WorkflowSendInput ¶
type WorkflowSetEventInput ¶
type WorkflowStatus ¶
type WorkflowStatus struct {
    ID                 string             `json:"workflow_uuid"`                 // Unique identifier for the workflow
    Status             WorkflowStatusType `json:"status"`                        // Current execution status
    Name               string             `json:"name"`                          // Function name of the workflow
    AuthenticatedUser  string             `json:"authenticated_user,omitempty"`  // User who initiated the workflow (if applicable)
    AssumedRole        string             `json:"assumed_role,omitempty"`        // Role assumed during execution (if applicable)
    AuthenticatedRoles []string           `json:"authenticated_roles,omitempty"` // Roles available to the user (if applicable)
    Output             any                `json:"output,omitempty"`              // Workflow output (available after completion)
    Error              error              `json:"error,omitempty"`               // Error information (if status is ERROR)
    ExecutorID         string             `json:"executor_id"`                   // ID of the executor running this workflow
    CreatedAt          time.Time          `json:"created_at"`                    // When the workflow was created
    UpdatedAt          time.Time          `json:"updated_at"`                    // When the workflow status was last updated
    ApplicationVersion string             `json:"application_version"`           // Version of the application that created this workflow
    ApplicationID      string             `json:"application_id,omitempty"`      // Application identifier
    Attempts           int                `json:"attempts"`                      // Number of execution attempts
    QueueName          string             `json:"queue_name,omitempty"`          // Queue name (if workflow was enqueued)
    Timeout            time.Duration      `json:"timeout,omitempty"`             // Workflow timeout duration
    Deadline           time.Time          `json:"deadline"`                      // Absolute deadline for workflow completion
    StartedAt          time.Time          `json:"started_at"`                    // When the workflow execution actually started
    DeduplicationID    string             `json:"deduplication_id,omitempty"`    // Queue deduplication identifier
    Input              any                `json:"input,omitempty"`               // Input parameters passed to the workflow
    Priority           int                `json:"priority,omitempty"`            // Queue execution priority (lower numbers have higher priority)
}
WorkflowStatus contains comprehensive information about a workflow's current state and execution history.
func ListWorkflows ¶
func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
ListWorkflows retrieves a list of workflows based on the provided filters.
The function supports filtering by workflow IDs, status, time ranges, names, application versions, workflow ID prefixes, and more. It also supports pagination through limit/offset parameters and sorting control (ascending by default, or descending with WithSortDesc).
By default, both input and output data are loaded for each workflow. This can be controlled using WithLoadInput(false) and WithLoadOutput(false) options for better performance when the data is not needed.
Parameters:
- opts: Functional options to configure the query filters and parameters
Returns a slice of WorkflowStatus structs containing the workflow information.
Example usage:
// List all successful workflows from the last 24 hours
workflows, err := dbos.ListWorkflows(ctx,
    dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}),
    dbos.WithStartTime(time.Now().Add(-24*time.Hour)),
    dbos.WithLimit(100))
if err != nil {
    log.Fatal(err)
}

// List workflows by specific IDs without loading input/output data
workflows, err = dbos.ListWorkflows(ctx,
    dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}),
    dbos.WithLoadInput(false),
    dbos.WithLoadOutput(false))
if err != nil {
    log.Fatal(err)
}

// List workflows with pagination
workflows, err = dbos.ListWorkflows(ctx,
    dbos.WithUser("john.doe"),
    dbos.WithOffset(50),
    dbos.WithLimit(25),
    dbos.WithSortDesc())
if err != nil {
    log.Fatal(err)
}
type WorkflowStatusType ¶
type WorkflowStatusType string
WorkflowStatusType represents the current execution state of a workflow.
const (
    WorkflowStatusPending                     WorkflowStatusType = "PENDING"                        // Workflow is running or ready to run
    WorkflowStatusEnqueued                    WorkflowStatusType = "ENQUEUED"                       // Workflow is queued and waiting for execution
    WorkflowStatusSuccess                     WorkflowStatusType = "SUCCESS"                        // Workflow completed successfully
    WorkflowStatusError                       WorkflowStatusType = "ERROR"                          // Workflow completed with an error
    WorkflowStatusCancelled                   WorkflowStatusType = "CANCELLED"                      // Workflow was cancelled (manually or due to timeout)
    WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts
)
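When inspecting statuses it is often useful to separate workflows that are still running or waiting from those that have stopped. The sketch below redeclares the status constants locally so that it runs on its own; isActive is a hypothetical helper, not part of the package, and it simply treats PENDING and ENQUEUED as the active states per the comments above.

```go
package main

import "fmt"

// Local copies of the documented status type and values, so the sketch is
// self-contained. In real code these come from the dbos package.
type WorkflowStatusType string

const (
	WorkflowStatusPending                     WorkflowStatusType = "PENDING"
	WorkflowStatusEnqueued                    WorkflowStatusType = "ENQUEUED"
	WorkflowStatusSuccess                     WorkflowStatusType = "SUCCESS"
	WorkflowStatusError                       WorkflowStatusType = "ERROR"
	WorkflowStatusCancelled                   WorkflowStatusType = "CANCELLED"
	WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED"
)

// isActive is a hypothetical helper that reports whether a workflow is
// running, ready to run, or waiting in a queue.
func isActive(s WorkflowStatusType) bool {
	switch s {
	case WorkflowStatusPending, WorkflowStatusEnqueued:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []WorkflowStatusType{
		WorkflowStatusPending,
		WorkflowStatusEnqueued,
		WorkflowStatusSuccess,
		WorkflowStatusCancelled,
	} {
		fmt.Printf("%s active=%t\n", s, isActive(s))
	}
}
```

Note that a workflow that is not active is not necessarily final: as described under ResumeWorkflow, cancelled workflows and those that exceeded their recovery attempts can be resumed.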