dbos

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2025 License: MIT Imports: 34 Imported by: 0

Documentation

Overview

Package dbos provides lightweight durable workflow orchestration with Postgres.

DBOS Transact enables developers to write resilient distributed applications using workflows and steps backed by PostgreSQL. All application state is automatically persisted, providing exactly-once execution guarantees and automatic recovery from failures.

Getting Started

Create a DBOS context to start building durable applications:

dbosContext, err := dbos.NewDBOSContext(context.Background(), dbos.Config{
    AppName:     "my-app",
    DatabaseURL: os.Getenv("DBOS_SYSTEM_DATABASE_URL"),
})
defer dbos.Shutdown(dbosContext, 5 * time.Second)

// Register workflows before launching
dbos.RegisterWorkflow(dbosContext, myWorkflow)

// Launch the context to start processing
err = dbos.Launch(dbosContext)

Workflows

Workflows provide durable execution, automatically resuming from the last completed step after any failure. Write workflows as normal Go functions that take a DBOSContext and return serializable values:

func myWorkflow(ctx dbos.DBOSContext, input string) (string, error) {
    // Workflow logic here
    result, err := dbos.RunAsStep(ctx, someOperation)
    if err != nil {
        return "", err
    }
    return result, nil
}

Key workflow features:

  • Automatic recovery: Workflows resume from the last completed step after crashes
  • Idempotency: Assign workflow IDs to ensure operations run exactly once
  • Determinism: Workflow functions must be deterministic; use steps for non-deterministic operations
  • Timeouts: Set durable timeouts that persist across restarts
  • Events & messaging: Workflows can emit events and receive messages for coordination

Steps

Steps wrap non-deterministic operations (API calls, random numbers, current time) within workflows. If a workflow is interrupted, it resumes from the last completed step:

func fetchData(ctx context.Context) (string, error) {
    resp, err := http.Get("https://api.example.com/data")
    // Handle response...
    return data, nil
}

func workflow(ctx dbos.DBOSContext, input string) (string, error) {
    data, err := dbos.RunAsStep(ctx, fetchData,
        dbos.WithStepName("fetchData"),
        dbos.WithStepMaxRetries(3))
    if err != nil {
        return "", err
    }
    return data, nil
}

Steps support configurable retries with exponential backoff for handling transient failures.

Queues

Queues manage workflow concurrency and rate limiting:

queue := dbos.NewWorkflowQueue(dbosContext, "task_queue",
    dbos.WithWorkerConcurrency(5),    // Max 5 concurrent workflows per process
    dbos.WithRateLimiter(&dbos.RateLimiter{
        Limit:  100,
        Period: 60 * time.Second,  // 100 workflows per 60 seconds
    }))

// Enqueue workflows with optional deduplication and priority
handle, err := dbos.RunWorkflow(ctx, taskWorkflow, input,
    dbos.WithQueue(queue.Name),
    dbos.WithDeduplicationID("unique-id"),
    dbos.WithPriority(10))

Workflow Management

DBOS provides comprehensive workflow management capabilities:

// List workflows
workflows, err := dbos.ListWorkflows(ctx)

// Cancel a running workflow
err = dbos.CancelWorkflow(ctx, workflowID)

// Resume a cancelled workflow
err = dbos.ResumeWorkflow(ctx, workflowID)

// Fork a workflow from a specific step
newID, err := dbos.ForkWorkflow(ctx, originalID, stepNumber)

Workflows can also be visualized and managed through the DBOS Console web UI.

Testing

DBOSContext is fully mockable for unit testing:

func TestWorkflow(t *testing.T) {
    mockCtx := mocks.NewMockDBOSContext(t)
    mockCtx.On("RunAsStep", mockCtx, mock.Anything, mock.Anything).Return("result", nil)

    result, err := myWorkflow(mockCtx, "input")
    assert.NoError(t, err)
    assert.Equal(t, "expected", result)
}

For detailed documentation and examples, see https://docs.dbos.dev/golang/programming-guide

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CancelWorkflow

func CancelWorkflow(ctx DBOSContext, workflowID string) error

CancelWorkflow cancels a running or enqueued workflow by setting its status to CANCELLED and removing it from the queue. Once cancelled, the workflow will stop executing at the start of the next step. Executing steps will not be interrupted.

Parameters:

  • ctx: DBOS context for the operation
  • workflowID: The unique identifier of the workflow to cancel

Returns an error if the workflow does not exist or if the cancellation operation fails.

Example:

err := dbos.CancelWorkflow(ctx, "workflow-to-cancel")
if err != nil {
    log.Printf("Failed to cancel workflow: %v", err)
}

func GetEvent

func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error)

GetEvent retrieves a key-value event from a target workflow with type safety. This function blocks until the event is set or the timeout is reached.

When called within a workflow, the get operation becomes part of the workflow's durable state. The returned value is of type R and will be type-checked at runtime.

Example:

status, err := dbos.GetEvent[string](ctx, "target-workflow-id", "status", 30 * time.Second)
if err != nil {
    // Handle timeout or error
    return err
}
log.Printf("Status: %s", status)

func GetStepID

func GetStepID(ctx DBOSContext) (int, error)

GetStepID retrieves the current step ID from the context if called within a DBOS workflow. Returns -1 and an error if not called from within a workflow context.

Example:

stepID, err := dbos.GetStepID(ctx)
if err != nil {
    log.Printf("Not within a workflow context")
} else {
    log.Printf("Current step ID: %d", stepID)
}

func GetWorkflowID

func GetWorkflowID(ctx DBOSContext) (string, error)

GetWorkflowID retrieves the workflow ID from the context if called within a DBOS workflow. Returns an error if not called from within a workflow context.

Example:

workflowID, err := dbos.GetWorkflowID(ctx)
if err != nil {
    log.Printf("Not within a workflow context")
} else {
    log.Printf("Current workflow ID: %s", workflowID)
}

func Launch added in v0.7.0

func Launch(ctx DBOSContext) error

Launch launches the DBOS runtime using the provided DBOSContext. This is a package-level wrapper for the DBOSContext.Launch() method.

Example:

ctx, err := dbos.NewDBOSContext(context.Background(), config)
if err != nil {
    log.Fatal(err)
}

if err := dbos.Launch(ctx); err != nil {
    log.Fatal(err)
}

func Recv

func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error)

Recv receives a message sent to this workflow with type safety. This function blocks until a message is received or the timeout is reached. Messages are consumed in FIFO order and each message is delivered exactly once.

Recv can only be called from within a workflow and becomes part of the workflow's durable state.

Example:

message, err := dbos.Recv[string](ctx, "notifications", 30 * time.Second)
if err != nil {
    // Handle timeout or error
    return err
}
log.Printf("Received: %s", message)

func RegisterWorkflow

func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...WorkflowRegistrationOption)

RegisterWorkflow registers a function as a durable workflow that can be executed and recovered. The function is registered with type safety - P represents the input type and R the return type. Types are automatically registered with gob encoding for serialization.

Registration options include:

  • WithMaxRetries: Set maximum retry attempts for workflow recovery
  • WithSchedule: Register as a scheduled workflow with cron syntax
  • WithWorkflowName:: Set a custom name for the workflow

Scheduled workflows receive a time.Time as input representing the scheduled execution time.

Example:

func MyWorkflow(ctx dbos.DBOSContext, input string) (int, error) {
    // workflow implementation
    return len(input), nil
}

dbos.RegisterWorkflow(ctx, MyWorkflow)

// With options:
dbos.RegisterWorkflow(ctx, MyWorkflow,
    dbos.WithMaxRetries(5),
    dbos.WithSchedule("0 0 * * * *")) // daily at midnight
	dbos.WithWorkflowName("MyCustomWorkflowName") // Custom name for the workflow

func RunAsStep

func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error)

RunAsStep executes a function as a durable step within a workflow. Steps provide at-least-once execution guarantees and automatic retry capabilities. If a step has already been executed (e.g., during workflow recovery), its recorded result is returned instead of re-executing the function.

Steps can be configured with functional options:

data, err := dbos.RunAsStep(ctx, func(ctx context.Context) ([]byte, error) {
    return MyStep(ctx, "https://api.example.com/data")
}, dbos.WithStepMaxRetries(3), dbos.WithBaseInterval(500*time.Millisecond))

Available options:

  • WithStepName: Custom name for the step (only sets if not already set)
  • WithStepMaxRetries: Maximum retry attempts (default: 0)
  • WithBackoffFactor: Exponential backoff multiplier (default: 2.0)
  • WithBaseInterval: Initial delay between retries (default: 100ms)
  • WithMaxInterval: Maximum delay between retries (default: 5s)

Example:

func MyStep(ctx context.Context, url string) ([]byte, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return io.ReadAll(resp.Body)
}

// Within a workflow:
data, err := dbos.RunAsStep(ctx, func(ctx context.Context) ([]byte, error) {
    return MyStep(ctx, "https://api.example.com/data")
}, dbos.WithStepName("FetchData"), dbos.WithStepMaxRetries(3))
if err != nil {
    return nil, err
}

Note that the function passed to RunAsStep must accept a context.Context as its first parameter and this context *must* be the one specified in the function's signature (not the context passed to RunAsStep). Under the hood, DBOS uses the provided context to manage durable execution.

func Send

func Send[P any](ctx DBOSContext, destinationID string, message P, topic string) error

Send sends a message to another workflow with type safety. The message type P is automatically registered for gob encoding.

Send can be called from within a workflow (as a durable step) or from outside workflows. When called within a workflow, the send operation becomes part of the workflow's durable state.

Example:

err := dbos.Send(ctx, "target-workflow-id", "Hello from sender", "notifications")

func SetEvent

func SetEvent[P any](ctx DBOSContext, key string, message P) error

SetEvent sets a key-value event for the current workflow with type safety. Events are persistent and can be retrieved by other workflows using GetEvent. The event type P is automatically registered for gob encoding.

SetEvent can only be called from within a workflow and becomes part of the workflow's durable state. Setting an event with the same key will overwrite the previous value.

Example:

err := dbos.SetEvent(ctx, "status", "processing-complete")

func Shutdown added in v0.7.0

func Shutdown(ctx DBOSContext, timeout time.Duration)

Shutdown gracefully shuts down the DBOS runtime using the provided DBOSContext and timeout. This is a package-level wrapper for the DBOSContext.Shutdown() method.

Example:

ctx, err := dbos.NewDBOSContext(context.Background(), config)
if err != nil {
    log.Fatal(err)
}
defer dbos.Shutdown(ctx, 30*time.Second)

func Sleep

func Sleep(ctx DBOSContext, duration time.Duration) (time.Duration, error)

Sleep pauses workflow execution for the specified duration. This is a durable sleep - if the workflow is recovered during the sleep period, it will continue sleeping for the remaining time. Returns the actual duration slept.

Example:

actualDuration, err := dbos.Sleep(ctx, 5*time.Second)
if err != nil {
    return err
}

Types

type Client

type Client interface {
	Enqueue(queueName, workflowName string, input any, opts ...EnqueueOption) (WorkflowHandle[any], error)
	ListWorkflows(opts ...ListWorkflowsOption) ([]WorkflowStatus, error)
	Send(destinationID string, message any, topic string) error
	GetEvent(targetWorkflowID, key string, timeout time.Duration) (any, error)
	RetrieveWorkflow(workflowID string) (WorkflowHandle[any], error)
	CancelWorkflow(workflowID string) error
	ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
	ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
	Shutdown(timeout time.Duration) // Simply close the system DB connection pool
}

Client provides a programmatic way to interact with your DBOS application from external code. It manages the underlying DBOSContext and provides methods for workflow operations without requiring direct management of the context lifecycle.

func NewClient

func NewClient(ctx context.Context, config ClientConfig) (Client, error)

NewClient creates a new DBOS client with the provided configuration. The client manages its own DBOSContext internally.

Example:

config := dbos.ClientConfig{
    DatabaseURL: "postgres://user:pass@localhost:5432/dbname",
}
client, err := dbos.NewClient(context.Background(), config)
if err != nil {
    log.Fatal(err)
}

type ClientConfig

type ClientConfig struct {
	DatabaseURL    string        // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
	SystemDBPool   *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
	DatabaseSchema string        // Database schema name (defaults to "dbos")
	Logger         *slog.Logger  // Optional custom logger
}

type Config

type Config struct {
	AppName            string        // Application name for identification (required)
	DatabaseURL        string        // DatabaseURL is a PostgreSQL connection string. Either this or SystemDBPool is required.
	SystemDBPool       *pgxpool.Pool // SystemDBPool is a custom System Database Pool. It's optional and takes precedence over DatabaseURL if both are provided.
	DatabaseSchema     string        // Database schema name (defaults to "dbos")
	Logger             *slog.Logger  // Custom logger instance (defaults to a new slog logger)
	AdminServer        bool          // Enable Transact admin HTTP server (disabled by default)
	AdminServerPort    int           // Port for the admin HTTP server (default: 3001)
	ConductorURL       string        // DBOS conductor service URL (optional)
	ConductorAPIKey    string        // DBOS conductor API key (optional)
	ApplicationVersion string        // Application version (optional, overridden by DBOS__APPVERSION env var)
	ExecutorID         string        // Executor ID (optional, overridden by DBOS__VMID env var)
}

Config holds configuration parameters for initializing a DBOS context. DatabaseURL and AppName are required.

type DBOSContext

type DBOSContext interface {
	context.Context

	// Context Lifecycle
	Launch() error                  // Launch the DBOS runtime including system database, queues, and perform a workflow recovery for the local executor
	Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS resources

	// Workflow operations
	RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error)                                      // Execute a function as a durable step within a workflow
	RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
	Send(_ DBOSContext, destinationID string, message any, topic string) error                                  // Send a message to another workflow
	Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error)                                       // Receive a message sent to this workflow
	SetEvent(_ DBOSContext, key string, message any) error                                                      // Set a key-value event for this workflow
	GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error)            // Get a key-value event from a target workflow
	Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error)                                         // Durable sleep that survives workflow recovery
	GetWorkflowID() (string, error)                                                                             // Get the current workflow ID (only available within workflows)
	GetStepID() (int, error)                                                                                    // Get the current step ID (only available within workflows)

	// Workflow management
	RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error)     // Get a handle to an existing workflow
	CancelWorkflow(_ DBOSContext, workflowID string) error                              // Cancel a workflow by setting its status to CANCELLED
	ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error)       // Resume a cancelled workflow
	ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error)   // Fork a workflow from a specific step
	ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
	GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error)              // Get the execution steps of a workflow

	// Accessors
	GetApplicationVersion() string // Get the application version for this context
	GetExecutorID() string         // Get the executor ID for this context
	GetApplicationID() string      // Get the application ID for this context
}

DBOSContext represents a DBOS execution context that provides workflow orchestration capabilities. It extends the standard Go context.Context and adds methods for running workflows and steps, inter-workflow communication, and state management.

The context manages the lifecycle of workflows, provides durability guarantees, and enables recovery of interrupted workflows.

func NewDBOSContext

func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error)

NewDBOSContext creates a new DBOS context with the provided configuration. The context must be launched with Launch() for workflow execution and should be shut down with Shutdown(). This function initializes the DBOS system database, sets up the queue sub-system, and prepares the workflow registry.

Example:

config := dbos.Config{
    DatabaseURL: "postgres://user:pass@localhost:5432/dbname",
    AppName:     "my-app",
}
ctx, err := dbos.NewDBOSContext(context.Background(), config)
if err != nil {
    log.Fatal(err)
}
defer ctx.Shutdown(30*time.Second)

if err := ctx.Launch(); err != nil {
    log.Fatal(err)
}

func WithTimeout

func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.CancelFunc)

WithTimeout returns a copy of the DBOS context with a timeout. The returned context will be canceled after the specified duration. No-op if the provided context is not a concrete dbos.dbosContext.

func WithValue

func WithValue(ctx DBOSContext, key, val any) DBOSContext

WithValue returns a copy of the DBOS context with the given key-value pair. This is similar to context.WithValue but maintains DBOS context capabilities. No-op if the provided context is not a concrete dbos.dbosContext.

func WithoutCancel

func WithoutCancel(ctx DBOSContext) DBOSContext

WithoutCancel returns a copy of the DBOS context that is not canceled when the parent context is canceled. This can be used to detach a child workflow. No-op if the provided context is not a concrete dbos.dbosContext.

type DBOSError

type DBOSError struct {
	Message string        // Human-readable error message
	Code    DBOSErrorCode // Error type code for programmatic handling

	// Optional context fields - only set when relevant to the error
	WorkflowID      string // Associated workflow identifier
	DestinationID   string // Target workflow identifier (for communication errors)
	StepName        string // Step function name (for step errors)
	QueueName       string // Queue name (for queue-related errors)
	DeduplicationID string // Deduplication identifier
	StepID          int    // Step sequence number
	ExpectedName    string // Expected function name (for determinism errors)
	RecordedName    string // Actually recorded function name (for determinism errors)
	MaxRetries      int    // Maximum retry limit (for retry-related errors)
	// contains filtered or unexported fields
}

DBOSError is the unified error type for all DBOS operations. It provides structured error information with context-specific fields and error codes for programmatic handling.

func (*DBOSError) Error

func (e *DBOSError) Error() string

Error returns a formatted error message including the error code. This implements the standard Go error interface.

func (*DBOSError) Unwrap added in v0.7.0

func (e *DBOSError) Unwrap() error

Unwrap returns the underlying error, if any. This enables Go's error unwrapping functionality with errors.Is and errors.As.

type DBOSErrorCode

type DBOSErrorCode int

DBOSErrorCode represents the different types of errors that can occur in DBOS operations.

const (
	ConflictingIDError           DBOSErrorCode = iota + 1 // Workflow ID conflicts or duplicate operations
	InitializationError                                   // DBOS context initialization failures
	NonExistentWorkflowError                              // Referenced workflow does not exist
	ConflictingWorkflowError                              // Workflow with same ID already exists with different parameters
	WorkflowCancelled                                     // Workflow was cancelled during execution
	UnexpectedStep                                        // Step function mismatch during recovery (non-deterministic workflow)
	AwaitedWorkflowCancelled                              // A workflow being awaited was cancelled
	ConflictingRegistrationError                          // Attempting to register a workflow/queue that already exists
	WorkflowUnexpectedTypeError                           // Type mismatch in workflow input/output
	WorkflowExecutionError                                // General workflow execution error
	StepExecutionError                                    // General step execution error
	DeadLetterQueueError                                  // Workflow moved to dead letter queue after max retries
	MaxStepRetriesExceeded                                // Step exceeded maximum retry attempts
	QueueDeduplicated                                     // Workflow was deduplicated in the queue
)

type EnqueueOption

type EnqueueOption func(*enqueueOptions)

EnqueueOption is a functional option for configuring workflow enqueue parameters.

func WithEnqueueApplicationVersion

func WithEnqueueApplicationVersion(version string) EnqueueOption

WithEnqueueApplicationVersion overrides the application version for the enqueued workflow.

func WithEnqueueDeduplicationID

func WithEnqueueDeduplicationID(id string) EnqueueOption

WithEnqueueDeduplicationID sets a deduplication ID for the enqueued workflow.

func WithEnqueuePriority

func WithEnqueuePriority(priority uint) EnqueueOption

WithEnqueuePriority sets the execution priority for the enqueued workflow.

func WithEnqueueTimeout

func WithEnqueueTimeout(timeout time.Duration) EnqueueOption

WithEnqueueTimeout sets the maximum execution time for the enqueued workflow.

func WithEnqueueWorkflowID

func WithEnqueueWorkflowID(id string) EnqueueOption

WithEnqueueWorkflowID sets a custom workflow ID instead of generating one automatically.

type ForkWorkflowInput

type ForkWorkflowInput struct {
	OriginalWorkflowID string // Required: The UUID of the original workflow to fork from
	ForkedWorkflowID   string // Optional: Custom workflow ID for the forked workflow (auto-generated if empty)
	StartStep          uint   // Optional: Step to start the forked workflow from (default: 0)
	ApplicationVersion string // Optional: Application version for the forked workflow (inherits from original if empty)
}

ForkWorkflowInput holds configuration parameters for forking workflows. OriginalWorkflowID is required. Other fields are optional.

type ListWorkflowsOption

type ListWorkflowsOption func(*listWorkflowsOptions)

ListWorkflowsOption is a functional option for configuring workflow listing parameters.

func WithAppVersion

func WithAppVersion(appVersion string) ListWorkflowsOption

WithAppVersion filters workflows by the specified application version.

func WithEndTime

func WithEndTime(endTime time.Time) ListWorkflowsOption

WithEndTime filters workflows created before the specified time.

func WithExecutorIDs

func WithExecutorIDs(executorIDs []string) ListWorkflowsOption

WithExecutorIDs filters workflows by the specified executor IDs.

func WithLimit

func WithLimit(limit int) ListWorkflowsOption

WithLimit limits the number of workflows returned.

func WithLoadInput

func WithLoadInput(loadInput bool) ListWorkflowsOption

WithLoadInput controls whether to load workflow input data (default: true).

func WithLoadOutput

func WithLoadOutput(loadOutput bool) ListWorkflowsOption

WithLoadOutput controls whether to load workflow output data (default: true).

func WithName

func WithName(name string) ListWorkflowsOption

WithName filters workflows by the specified workflow function name.

func WithOffset

func WithOffset(offset int) ListWorkflowsOption

WithOffset sets the offset for pagination.

func WithQueueName

func WithQueueName(queueName string) ListWorkflowsOption

WithQueueName filters workflows by the specified queue name. This is typically used when listing queued workflows.

func WithQueuesOnly

func WithQueuesOnly() ListWorkflowsOption

WithQueuesOnly filters to only return workflows that are in a queue.

func WithSortDesc

func WithSortDesc() ListWorkflowsOption

WithSortDesc enables descending sort by creation time (default is ascending).

func WithStartTime

func WithStartTime(startTime time.Time) ListWorkflowsOption

WithStartTime filters workflows created after the specified time.

func WithStatus

func WithStatus(status []WorkflowStatusType) ListWorkflowsOption

WithStatus filters workflows by the specified list of statuses.

func WithUser

func WithUser(user string) ListWorkflowsOption

WithUser filters workflows by the specified authenticated user.

func WithWorkflowIDPrefix

func WithWorkflowIDPrefix(prefix string) ListWorkflowsOption

WithWorkflowIDPrefix filters workflows by workflow ID prefix.

func WithWorkflowIDs

func WithWorkflowIDs(workflowIDs []string) ListWorkflowsOption

WithWorkflowIDs filters workflows by the specified workflow IDs.

type QueueOption

type QueueOption func(*WorkflowQueue)

QueueOption is a functional option for configuring a workflow queue

func WithGlobalConcurrency

func WithGlobalConcurrency(concurrency int) QueueOption

WithGlobalConcurrency limits the total number of workflows that can run concurrently from the queue across all executors. This provides global concurrency control.

func WithMaxTasksPerIteration

func WithMaxTasksPerIteration(maxTasks int) QueueOption

WithMaxTasksPerIteration sets the maximum number of workflows to dequeue in a single iteration. This controls batch sizes for queue processing.

func WithPriorityEnabled

func WithPriorityEnabled() QueueOption

WithPriorityEnabled enables priority-based scheduling for the queue. When enabled, workflows with lower priority numbers are executed first.

func WithRateLimiter

func WithRateLimiter(limiter *RateLimiter) QueueOption

WithRateLimiter configures rate limiting for the queue to prevent overwhelming external services. The rate limiter enforces a maximum number of workflow starts within a time period.

func WithWorkerConcurrency

func WithWorkerConcurrency(concurrency int) QueueOption

WithWorkerConcurrency limits the number of workflows this executor can run concurrently from the queue. This provides per-executor concurrency control.

type RateLimiter

type RateLimiter struct {
	Limit  int           // Maximum number of workflows to start within the period
	Period time.Duration // Time period for the rate limit
}

RateLimiter configures rate limiting for workflow queue execution. Rate limits prevent overwhelming external services and provide backpressure.

type Step

type Step[R any] func(ctx context.Context) (R, error)

Step represents a type-safe step function with a specific output type R.

type StepFunc

type StepFunc func(ctx context.Context) (any, error)

StepFunc represents a type-erased step function used internally.

type StepInfo

type StepInfo struct {
	StepID          int    // The sequential ID of the step within the workflow
	StepName        string // The name of the step function
	Output          any    // The output returned by the step (if any)
	Error           error  // The error returned by the step (if any)
	ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
}

StepInfo contains information about a workflow step execution.

func GetWorkflowSteps

func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error)

GetWorkflowSteps retrieves the execution steps of a workflow. Returns a list of step information including step IDs, names, outputs, errors, and child workflow IDs. The list is sorted by step ID in ascending order.

Parameters:

  • ctx: DBOS context for the operation
  • workflowID: The unique identifier of the workflow

Returns a slice of StepInfo structs containing information about each executed step.

Example:

steps, err := dbos.GetWorkflowSteps(ctx, "workflow-id")
if err != nil {
    log.Fatal(err)
}
for _, step := range steps {
    log.Printf("Step %d: %s", step.StepID, step.StepName)
}

type StepOption

type StepOption func(*stepOptions)

StepOption is a functional option for configuring step execution parameters.

func WithBackoffFactor

func WithBackoffFactor(factor float64) StepOption

WithBackoffFactor sets the exponential backoff multiplier between retries. The delay between retries is calculated as: BaseInterval * (BackoffFactor^(retry-1)) Default value is 2.0.

func WithBaseInterval

func WithBaseInterval(interval time.Duration) StepOption

WithBaseInterval sets the initial delay between retries. Default value is 100ms.

func WithMaxInterval

func WithMaxInterval(interval time.Duration) StepOption

WithMaxInterval sets the maximum delay between retries. Default value is 5s.

func WithStepMaxRetries

func WithStepMaxRetries(maxRetries int) StepOption

WithStepMaxRetries sets the maximum number of retry attempts for the step. A value of 0 means no retries (default behavior).

func WithStepName

func WithStepName(name string) StepOption

WithStepName sets a custom name for the step. If the step name has already been set by a previous call to WithStepName, this option will be ignored

type Workflow

type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error)

Workflow represents a type-safe workflow function with specific input and output types. P is the input parameter type and R is the return type. All workflow functions must accept a DBOSContext as their first parameter.

type WorkflowFunc

type WorkflowFunc func(ctx DBOSContext, input any) (any, error)

WorkflowFunc represents a type-erased workflow function used internally.

type WorkflowHandle

type WorkflowHandle[R any] interface {
	GetResult() (R, error)              // Wait for workflow completion and return the result
	GetStatus() (WorkflowStatus, error) // Get current workflow status without waiting
	GetWorkflowID() string              // Get the unique workflow identifier
}

WorkflowHandle provides methods to interact with a running or completed workflow. The type parameter R represents the expected return type of the workflow. Handles can be used to wait for workflow completion, check status, and retrieve results.

func Enqueue

func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, opts ...EnqueueOption) (WorkflowHandle[R], error)

Enqueue adds a workflow to a named queue for later execution with type safety. The workflow will be persisted with ENQUEUED status until picked up by a DBOS process. This provides asynchronous workflow execution with durability guarantees.

Parameters:

  • c: Client instance for the operation
  • queueName: Name of the queue to enqueue the workflow to
  • workflowName: Name of the registered workflow function to execute
  • input: Input parameters to pass to the workflow (type P)
  • opts: Optional configuration options

Available options:

  • WithEnqueueWorkflowID: Custom workflow ID (auto-generated if not provided)
  • WithEnqueueApplicationVersion: Application version override
  • WithEnqueueDeduplicationID: Deduplication identifier for idempotent enqueuing
  • WithEnqueuePriority: Execution priority
  • WithEnqueueTimeout: Maximum execution time for the workflow

Returns a typed workflow handle that can be used to check status and retrieve results. The handle uses polling to check workflow completion since the execution is asynchronous.

Example usage:

// Enqueue a workflow with string input and int output
handle, err := dbos.Enqueue[string, int](client, "data-processing", "ProcessDataWorkflow", "input data",
    dbos.WithEnqueueTimeout(30 * time.Minute))
if err != nil {
    log.Fatal(err)
}

// Check status
status, err := handle.GetStatus()
if err != nil {
    log.Printf("Failed to get status: %v", err)
}

// Wait for completion and get result
result, err := handle.GetResult()
if err != nil {
    log.Printf("Workflow failed: %v", err)
} else {
    log.Printf("Result: %d", result)
}

// Enqueue with deduplication and custom workflow ID
handle, err := dbos.Enqueue[MyInputType, MyOutputType](client, "my-queue", "MyWorkflow", MyInputType{Field: "value"},
    dbos.WithEnqueueWorkflowID("custom-workflow-id"),
    dbos.WithEnqueueDeduplicationID("unique-operation-id"))

func ForkWorkflow

func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHandle[R], error)

ForkWorkflow creates a new workflow instance by copying an existing workflow from a specific step. The forked workflow will have a new UUID and will execute from the specified StartStep. If StartStep > 0, the forked workflow will reuse the operation outputs from steps 0 to StartStep-1 copied from the original workflow.

Parameters:

  • ctx: DBOS context for the operation
  • input: Configuration parameters for the forked workflow

Returns a typed workflow handle for the newly created forked workflow.

Example usage:

// Basic fork from step 5
handle, err := dbos.ForkWorkflow[MyResultType](ctx, dbos.ForkWorkflowInput{
    OriginalWorkflowID: "original-workflow-id",
    StartStep:          5,
})
if err != nil {
    log.Fatal(err)
}

// Fork with custom workflow ID and application version
handle, err := dbos.ForkWorkflow[MyResultType](ctx, dbos.ForkWorkflowInput{
    OriginalWorkflowID: "original-workflow-id",
    ForkedWorkflowID:   "my-custom-fork-id",
    StartStep:          3,
    ApplicationVersion: "v2.0.0",
})
if err != nil {
    log.Fatal(err)
}

func ResumeWorkflow

func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error)

ResumeWorkflow resumes a workflow by starting it from its last completed step. You can use this to resume workflows that are cancelled or have exceeded their maximum recovery attempts. You can also use this to start an enqueued workflow immediately, bypassing its queue. If the workflow is already completed, this is a no-op. Returns a handle that can be used to wait for completion and retrieve results. Returns an error if the workflow does not exist or if the operation fails.

Example:

handle, err := dbos.ResumeWorkflow[int](ctx, "workflow-id")
if err != nil {
    log.Printf("Failed to resume workflow: %v", err)
} else {
    result, err := handle.GetResult()
    if err != nil {
        log.Printf("Workflow failed: %v", err)
    } else {
        log.Printf("Result: %d", result)
    }
}

func RetrieveWorkflow

func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R], error)

RetrieveWorkflow returns a typed handle to an existing workflow. The handle can be used to check status and wait for results. The type parameter R must match the workflow's actual return type.

Example:

handle, err := dbos.RetrieveWorkflow[int](ctx, "workflow-id")
if err != nil {
    log.Fatal(err)
}

result, err := handle.GetResult()
if err != nil {
    log.Printf("Workflow failed: %v", err)
} else {
    log.Printf("Result: %d", result)
}

func RunWorkflow

func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error)

RunWorkflow executes a workflow function with type safety and durability guarantees. The workflow can be executed immediately or enqueued for later execution based on options. Returns a typed handle that can be used to wait for completion and retrieve results.

The workflow will be automatically recovered if the process crashes or is interrupted. All workflow state is persisted to ensure exactly-once execution semantics.

Example:

handle, err := dbos.RunWorkflow(ctx, MyWorkflow, "input string", dbos.WithWorkflowID("my-custom-id"))
if err != nil {
    log.Fatal(err)
}

result, err := handle.GetResult()
if err != nil {
    log.Printf("Workflow failed: %v", err)
} else {
    log.Printf("Result: %v", result)
}

type WorkflowOption

type WorkflowOption func(*workflowOptions)

WorkflowOption is a functional option for configuring workflow execution parameters.

func WithApplicationVersion

func WithApplicationVersion(version string) WorkflowOption

WithApplicationVersion overrides the DBOS Context application version for this workflow. This affects workflow recovery.

func WithAssumedRole added in v0.7.0

func WithAssumedRole(role string) WorkflowOption

Sets the assumed role for the workflow

func WithAuthenticatedRoles added in v0.7.0

func WithAuthenticatedRoles(roles []string) WorkflowOption

Sets the authenticated role for the workflow

func WithAuthenticatedUser added in v0.7.0

func WithAuthenticatedUser(user string) WorkflowOption

Sets the authenticated user for the workflow

func WithDeduplicationID

func WithDeduplicationID(id string) WorkflowOption

WithDeduplicationID sets a deduplication ID for a queue workflow.

func WithPriority

func WithPriority(priority uint) WorkflowOption

WithPriority sets the execution priority for a queue workflow.

func WithQueue

func WithQueue(queueName string) WorkflowOption

WithQueue enqueues the workflow to the specified queue instead of executing immediately. Queued workflows will be processed by the queue runner according to the queue's configuration.

func WithWorkflowID

func WithWorkflowID(id string) WorkflowOption

WithWorkflowID sets a custom workflow ID instead of generating one automatically.

type WorkflowQueue

type WorkflowQueue struct {
	Name                 string       `json:"name"`                        // Unique queue name
	WorkerConcurrency    *int         `json:"workerConcurrency,omitempty"` // Max concurrent workflows per executor
	GlobalConcurrency    *int         `json:"concurrency,omitempty"`       // Max concurrent workflows across all executors
	PriorityEnabled      bool         `json:"priorityEnabled,omitempty"`   // Enable priority-based scheduling
	RateLimit            *RateLimiter `json:"rateLimit,omitempty"`         // Rate limiting configuration
	MaxTasksPerIteration int          `json:"maxTasksPerIteration"`        // Max workflows to dequeue per iteration
}

WorkflowQueue defines a named queue for workflow execution. Queues provide controlled workflow execution with concurrency limits, priority scheduling, and rate limiting.

func NewWorkflowQueue

func NewWorkflowQueue(dbosCtx DBOSContext, name string, options ...QueueOption) WorkflowQueue

NewWorkflowQueue creates a new workflow queue with the specified name and configuration options. The queue must be created before workflows can be enqueued to it using the WithQueue option in RunWorkflow. Queues provide controlled execution with support for concurrency limits, priority scheduling, and rate limiting.

Example:

queue := dbos.NewWorkflowQueue(ctx, "email-queue",
    dbos.WithWorkerConcurrency(5),
    dbos.WithRateLimiter(&dbos.RateLimiter{
        Limit:  100,
        Period: 60 * time.Second, // 100 workflows per minute
    }),
    dbos.WithPriorityEnabled(),
)

// Enqueue workflows to this queue:
handle, err := dbos.RunWorkflow(ctx, SendEmailWorkflow, emailData, dbos.WithQueue("email-queue"))

type WorkflowRegistrationOption

type WorkflowRegistrationOption func(*workflowRegistrationOptions)

func WithMaxRetries

func WithMaxRetries(maxRetries int) WorkflowRegistrationOption

WithMaxRetries sets the maximum number of retry attempts for workflow recovery. If a workflow fails or is interrupted, it will be retried up to this many times. After exceeding max retries, the workflow status becomes MAX_RECOVERY_ATTEMPTS_EXCEEDED.

func WithSchedule

func WithSchedule(schedule string) WorkflowRegistrationOption

WithSchedule registers the workflow as a scheduled workflow using cron syntax. The schedule string follows standard cron format with second precision. Scheduled workflows automatically receive a time.Time input parameter.

func WithWorkflowName

func WithWorkflowName(name string) WorkflowRegistrationOption

type WorkflowSendInput

type WorkflowSendInput struct {
	DestinationID string
	Message       any
	Topic         string
}

type WorkflowSetEventInput

type WorkflowSetEventInput struct {
	Key     string
	Message any
}

type WorkflowStatus

type WorkflowStatus struct {
	ID                 string             `json:"workflow_uuid"`                 // Unique identifier for the workflow
	Status             WorkflowStatusType `json:"status"`                        // Current execution status
	Name               string             `json:"name"`                          // Function name of the workflow
	AuthenticatedUser  string             `json:"authenticated_user,omitempty"`  // User who initiated the workflow (if applicable)
	AssumedRole        string             `json:"assumed_role,omitempty"`        // Role assumed during execution (if applicable)
	AuthenticatedRoles []string           `json:"authenticated_roles,omitempty"` // Roles available to the user (if applicable)
	Output             any                `json:"output,omitempty"`              // Workflow output (available after completion)
	Error              error              `json:"error,omitempty"`               // Error information (if status is ERROR)
	ExecutorID         string             `json:"executor_id"`                   // ID of the executor running this workflow
	CreatedAt          time.Time          `json:"created_at"`                    // When the workflow was created
	UpdatedAt          time.Time          `json:"updated_at"`                    // When the workflow status was last updated
	ApplicationVersion string             `json:"application_version"`           // Version of the application that created this workflow
	ApplicationID      string             `json:"application_id,omitempty"`      // Application identifier
	Attempts           int                `json:"attempts"`                      // Number of execution attempts
	QueueName          string             `json:"queue_name,omitempty"`          // Queue name (if workflow was enqueued)
	Timeout            time.Duration      `json:"timeout,omitempty"`             // Workflow timeout duration
	Deadline           time.Time          `json:"deadline"`                      // Absolute deadline for workflow completion
	StartedAt          time.Time          `json:"started_at"`                    // When the workflow execution actually started
	DeduplicationID    string             `json:"deduplication_id,omitempty"`    // Queue deduplication identifier
	Input              any                `json:"input,omitempty"`               // Input parameters passed to the workflow
	Priority           int                `json:"priority,omitempty"`            // Queue execution priority (lower numbers have higher priority)
}

WorkflowStatus contains comprehensive information about a workflow's current state and execution history.

func ListWorkflows

func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error)

ListWorkflows retrieves a list of workflows based on the provided filters.

The function supports filtering by workflow IDs, status, time ranges, names, application versions, workflow ID prefixes, and more. It also supports pagination through limit/offset parameters and sorting control (ascending by default, or descending with WithSortDesc).

By default, both input and output data are loaded for each workflow. This can be controlled using WithLoadInput(false) and WithLoadOutput(false) options for better performance when the data is not needed.

Parameters:

  • opts: Functional options to configure the query filters and parameters

Returns a slice of WorkflowStatus structs containing the workflow information.

Example usage:

// List all successful workflows from the last 24 hours
workflows, err := dbos.ListWorkflows(
    dbos.WithStatus([]dbos.WorkflowStatusType{dbos.WorkflowStatusSuccess}),
    dbos.WithStartTime(time.Now().Add(-24*time.Hour)),
    dbos.WithLimit(100))
if err != nil {
    log.Fatal(err)
}

// List workflows by specific IDs without loading input/output data
workflows, err := dbos.ListWorkflows(
    dbos.WithWorkflowIDs([]string{"workflow1", "workflow2"}),
    dbos.WithLoadInput(false),
    dbos.WithLoadOutput(false))
if err != nil {
    log.Fatal(err)
}

// List workflows with pagination
workflows, err := dbos.ListWorkflows(
    dbos.WithUser("john.doe"),
    dbos.WithOffset(50),
    dbos.WithLimit(25),
    dbos.WithSortDesc()
if err != nil {
    log.Fatal(err)
}

type WorkflowStatusType

type WorkflowStatusType string

WorkflowStatusType represents the current execution state of a workflow.

const (
	WorkflowStatusPending                     WorkflowStatusType = "PENDING"                        // Workflow is running or ready to run
	WorkflowStatusEnqueued                    WorkflowStatusType = "ENQUEUED"                       // Workflow is queued and waiting for execution
	WorkflowStatusSuccess                     WorkflowStatusType = "SUCCESS"                        // Workflow completed successfully
	WorkflowStatusError                       WorkflowStatusType = "ERROR"                          // Workflow completed with an error
	WorkflowStatusCancelled                   WorkflowStatusType = "CANCELLED"                      // Workflow was cancelled (manually or due to timeout)
	WorkflowStatusMaxRecoveryAttemptsExceeded WorkflowStatusType = "MAX_RECOVERY_ATTEMPTS_EXCEEDED" // Workflow exceeded maximum retry attempts
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL