worker

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2025 License: MIT Imports: 13 Imported by: 23

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultOptions is the default worker configuration: two pollers per task
// type, a 200ms polling interval, unlimited parallel tasks, a 25 second
// heartbeat interval, and a 128-entry workflow executor cache with a 10 second
// TTL. Single worker mode is disabled.
var DefaultOptions = Options{
	WorkflowWorkerOptions: WorkflowWorkerOptions{
		WorkflowPollers:           2,
		WorkflowPollingInterval:   200 * time.Millisecond,
		MaxParallelWorkflowTasks:  0, // 0 means no limit
		WorkflowHeartbeatInterval: 25 * time.Second,

		WorkflowExecutorCacheSize: 128,
		WorkflowExecutorCacheTTL:  time.Second * 10,
		WorkflowExecutorCache:     nil, // nil means a default cache implementation is used
	},

	ActivityWorkerOptions: ActivityWorkerOptions{
		ActivityPollers:           2,
		ActivityPollingInterval:   200 * time.Millisecond,
		MaxParallelActivityTasks:  0, // 0 means no limit
		ActivityHeartbeatInterval: 25 * time.Second,
	},

	// Workflows and activities are not registered automatically by default.
	SingleWorkerMode: false,
}

Functions

This section is empty.

Types

type ActivityWorkerOptions added in v0.19.0

// ActivityWorkerOptions configures the activity-processing side of a worker.
// It is accepted directly by NewActivityWorker and is embedded in Options.
type ActivityWorkerOptions struct {
	// ActivityPollers is the number of pollers to start. Defaults to 2.
	ActivityPollers int

	// MaxParallelActivityTasks determines the maximum number of concurrent activity tasks processed
	// by the worker. The default is 0, which means no limit.
	MaxParallelActivityTasks int

	// ActivityHeartbeatInterval is the interval between heartbeat attempts for activity tasks.
	// Defaults to 25 seconds.
	ActivityHeartbeatInterval time.Duration

	// ActivityPollingInterval is the interval between polling for new activity tasks.
	// Note that if you use a backend that can wait for tasks to be available (e.g. redis),
	// this field has no effect. Defaults to 200ms.
	ActivityPollingInterval time.Duration

	// ActivityQueues are the queues the worker listens to.
	ActivityQueues []workflow.Queue
}

type Options

// Options is the combined configuration accepted by New and
// NewWorkflowOrchestrator. It embeds the workflow and activity worker options,
// so their fields are accessible directly on Options.
type Options struct {
	// WorkflowWorkerOptions holds the settings for workflow task processing.
	WorkflowWorkerOptions
	// ActivityWorkerOptions holds the settings for activity task processing.
	ActivityWorkerOptions

	// SingleWorkerMode enables automatic registration of workflows and activities
	// when they are used in workflows. This is useful for simple scenarios where
	// you don't want to explicitly register each workflow and activity.
	SingleWorkerMode bool
}

type Worker

// Worker processes workflow and/or activity tasks. Create one with New (both
// workflows and activities), NewWorkflowWorker (workflows only) or
// NewActivityWorker (activities only).
type Worker struct {
	// contains filtered or unexported fields
}

func New

func New(backend backend.Backend, options *Options) *Worker

New creates a worker that processes workflows and activities.

func NewActivityWorker added in v0.19.0

func NewActivityWorker(backend backend.Backend, options *ActivityWorkerOptions) *Worker

NewActivityWorker creates a worker that only processes activities.

func NewWorkflowWorker added in v0.19.0

func NewWorkflowWorker(backend backend.Backend, options *WorkflowWorkerOptions) *Worker

NewWorkflowWorker creates a worker that only processes workflows.

func (*Worker) RegisterActivity added in v0.17.0

func (w *Worker) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error

RegisterActivity registers an activity with the worker's registry.

func (*Worker) RegisterWorkflow added in v0.17.0

func (w *Worker) RegisterWorkflow(wf workflow.Workflow, opts ...registry.RegisterOption) error

RegisterWorkflow registers a workflow with the worker's registry.

func (*Worker) Start

func (w *Worker) Start(ctx context.Context) error

Start starts the worker.

To stop the worker, cancel the context passed to Start. To wait for completion of the active tasks, call `WaitForCompletion`.

func (*Worker) WaitForCompletion added in v0.2.0

func (w *Worker) WaitForCompletion() error

WaitForCompletion waits for all active tasks to complete.

type WorkflowOrchestrator added in v1.0.0

// WorkflowOrchestrator combines a worker and client into a single entity. It
// orchestrates the entire workflow lifecycle, from creation to execution.
type WorkflowOrchestrator struct {
	// Client is exposed for direct access to GetWorkflowResult.
	Client *client.Client
	// contains filtered or unexported fields
}

WorkflowOrchestrator combines a worker and client into a single entity. It orchestrates the entire workflow lifecycle, from creation to execution.

func NewWorkflowOrchestrator added in v1.0.0

func NewWorkflowOrchestrator(backend backend.Backend, options *Options) *WorkflowOrchestrator

NewWorkflowOrchestrator creates a new orchestrator with client capabilities and optional registration.

func (*WorkflowOrchestrator) CreateWorkflowInstance added in v1.0.0

func (o *WorkflowOrchestrator) CreateWorkflowInstance(ctx context.Context, options client.WorkflowInstanceOptions, wf workflow.Workflow, args ...any) (*workflow.Instance, error)

CreateWorkflowInstance creates a new workflow instance using the client. Automatically registers the workflow if it's not already registered.

func (*WorkflowOrchestrator) RemoveWorkflowInstance added in v1.0.0

func (o *WorkflowOrchestrator) RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error

RemoveWorkflowInstance removes a workflow instance.

func (*WorkflowOrchestrator) SignalWorkflow added in v1.0.0

func (o *WorkflowOrchestrator) SignalWorkflow(ctx context.Context, instanceID string, name string, arg any) error

SignalWorkflow signals a workflow instance.

func (*WorkflowOrchestrator) Start added in v1.0.0

func (o *WorkflowOrchestrator) Start(ctx context.Context) error

Start starts the worker with single worker mode enabled.

func (*WorkflowOrchestrator) WaitForCompletion added in v1.0.0

func (o *WorkflowOrchestrator) WaitForCompletion() error

WaitForCompletion waits for the worker to complete processing.

func (*WorkflowOrchestrator) WaitForWorkflowInstance added in v1.0.0

func (o *WorkflowOrchestrator) WaitForWorkflowInstance(ctx context.Context, instance *workflow.Instance, timeout time.Duration) error

WaitForWorkflowInstance waits for a workflow instance to complete.

type WorkflowWorkerOptions added in v0.19.0

// WorkflowWorkerOptions configures the workflow-processing side of a worker.
// It is accepted directly by NewWorkflowWorker and is embedded in Options.
type WorkflowWorkerOptions struct {
	// WorkflowPollers is the number of pollers to start. Defaults to 2.
	WorkflowPollers int

	// MaxParallelWorkflowTasks determines the maximum number of concurrent workflow tasks processed
	// by the worker. The default is 0, which means no limit.
	MaxParallelWorkflowTasks int

	// WorkflowHeartbeatInterval is the interval between heartbeat attempts on workflow tasks.
	// Defaults to 25 seconds.
	WorkflowHeartbeatInterval time.Duration

	// WorkflowPollingInterval is the interval between polling for new workflow tasks.
	// Note that if you use a backend that can wait for tasks to be available (e.g. redis),
	// this field has no effect. Defaults to 200ms.
	WorkflowPollingInterval time.Duration

	// WorkflowExecutorCacheSize is the max size of the workflow executor cache. Defaults to 128.
	WorkflowExecutorCacheSize int

	// WorkflowExecutorCacheTTL is the max TTL of the workflow executor cache. Defaults to 10 seconds.
	WorkflowExecutorCacheTTL time.Duration

	// WorkflowExecutorCache is the cache to use for workflow executors. If nil, a default cache
	// implementation will be used.
	WorkflowExecutorCache executor.Cache

	// WorkflowQueues are the queues the worker listens to.
	WorkflowQueues []workflow.Queue
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL