Documentation
¶
Index ¶
- Variables
- func RegisterTypedActivity0[TResult any](w *Worker, activity workflow.ActivityFunc0[TResult], ...) error
- func RegisterTypedActivity1[TInput, TResult any](w *Worker, activity workflow.ActivityFunc1[TInput, TResult], ...) error
- func RegisterTypedActivity2[TInput1, TInput2, TResult any](w *Worker, activity workflow.ActivityFunc2[TInput1, TInput2, TResult], ...) error
- func RegisterTypedActivity3[TInput1, TInput2, TInput3, TResult any](w *Worker, activity workflow.ActivityFunc3[TInput1, TInput2, TInput3, TResult], ...) error
- func RegisterTypedActivityNoResult0(w *Worker, activity workflow.ActivityFuncNoResult0, ...) error
- func RegisterTypedActivityNoResult1[TInput any](w *Worker, activity workflow.ActivityFuncNoResult1[TInput], ...) error
- func RegisterTypedActivityNoResult2[TInput1, TInput2 any](w *Worker, activity workflow.ActivityFuncNoResult2[TInput1, TInput2], ...) error
- func RegisterTypedActivityNoResult3[TInput1, TInput2, TInput3 any](w *Worker, activity workflow.ActivityFuncNoResult3[TInput1, TInput2, TInput3], ...) error
- func RegisterTypedWorkflow0[TResult any](w *Worker, wf workflow.WorkflowFunc0[TResult], opts ...registry.RegisterOption) error
- func RegisterTypedWorkflow1[TInput, TResult any](w *Worker, wf workflow.WorkflowFunc1[TInput, TResult], ...) error
- func RegisterTypedWorkflow2[TInput1, TInput2, TResult any](w *Worker, wf workflow.WorkflowFunc2[TInput1, TInput2, TResult], ...) error
- func RegisterTypedWorkflow3[TInput1, TInput2, TInput3, TResult any](w *Worker, wf workflow.WorkflowFunc3[TInput1, TInput2, TInput3, TResult], ...) error
- func RegisterTypedWorkflowNoResult0(w *Worker, wf workflow.WorkflowFuncNoResult0, opts ...registry.RegisterOption) error
- func RegisterTypedWorkflowNoResult1[TInput any](w *Worker, wf workflow.WorkflowFuncNoResult1[TInput], ...) error
- func RegisterTypedWorkflowNoResult2[TInput1, TInput2 any](w *Worker, wf workflow.WorkflowFuncNoResult2[TInput1, TInput2], ...) error
- func RegisterTypedWorkflowNoResult3[TInput1, TInput2, TInput3 any](w *Worker, wf workflow.WorkflowFuncNoResult3[TInput1, TInput2, TInput3], ...) error
- type ActivityWorkerOptions
- type Options
- type Worker
- type WorkflowOrchestrator
- func (o *WorkflowOrchestrator) CreateWorkflowInstance(ctx context.Context, options client.WorkflowInstanceOptions, ...) (*workflow.Instance, error)
- func (o *WorkflowOrchestrator) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error
- func (o *WorkflowOrchestrator) RegisterWorkflow(wf workflow.Workflow, opts ...registry.RegisterOption) error
- func (o *WorkflowOrchestrator) RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
- func (o *WorkflowOrchestrator) SignalWorkflow(ctx context.Context, instanceID string, name string, arg any) error
- func (o *WorkflowOrchestrator) Start(ctx context.Context) error
- func (o *WorkflowOrchestrator) WaitForCompletion() error
- func (o *WorkflowOrchestrator) WaitForWorkflowInstance(ctx context.Context, instance *workflow.Instance, timeout time.Duration) error
- type WorkflowWorkerOptions
Constants ¶
This section is empty.
Variables ¶
var DefaultOptions = Options{ WorkflowWorkerOptions: WorkflowWorkerOptions{ WorkflowPollers: 2, WorkflowPollingInterval: 200 * time.Millisecond, MaxParallelWorkflowTasks: 0, WorkflowHeartbeatInterval: 25 * time.Second, WorkflowExecutorCacheSize: 128, WorkflowExecutorCacheTTL: time.Second * 10, WorkflowExecutorCache: nil, }, ActivityWorkerOptions: ActivityWorkerOptions{ ActivityPollers: 2, ActivityPollingInterval: 200 * time.Millisecond, MaxParallelActivityTasks: 0, ActivityHeartbeatInterval: 25 * time.Second, }, SingleWorkerMode: false, }
Functions ¶
func RegisterTypedActivity0 ¶
func RegisterTypedActivity0[TResult any](w *Worker, activity workflow.ActivityFunc0[TResult], opts ...registry.RegisterOption) error
RegisterTypedActivity0 registers an activity function with no input parameters (type-safe).
func RegisterTypedActivity1 ¶
func RegisterTypedActivity1[TInput, TResult any](w *Worker, activity workflow.ActivityFunc1[TInput, TResult], opts ...registry.RegisterOption) error
RegisterTypedActivity1 registers an activity function with one input parameter (type-safe).
func RegisterTypedActivity2 ¶
func RegisterTypedActivity2[TInput1, TInput2, TResult any](w *Worker, activity workflow.ActivityFunc2[TInput1, TInput2, TResult], opts ...registry.RegisterOption) error
RegisterTypedActivity2 registers an activity function with two input parameters (type-safe).
func RegisterTypedActivity3 ¶
func RegisterTypedActivity3[TInput1, TInput2, TInput3, TResult any](w *Worker, activity workflow.ActivityFunc3[TInput1, TInput2, TInput3, TResult], opts ...registry.RegisterOption) error
RegisterTypedActivity3 registers an activity function with three input parameters (type-safe).
func RegisterTypedActivityNoResult0 ¶
func RegisterTypedActivityNoResult0(w *Worker, activity workflow.ActivityFuncNoResult0, opts ...registry.RegisterOption) error
RegisterTypedActivityNoResult0 registers an activity function with no input parameters that returns only error (type-safe).
func RegisterTypedActivityNoResult1 ¶
func RegisterTypedActivityNoResult1[TInput any](w *Worker, activity workflow.ActivityFuncNoResult1[TInput], opts ...registry.RegisterOption) error
RegisterTypedActivityNoResult1 registers an activity function with one input parameter that returns only error (type-safe).
func RegisterTypedActivityNoResult2 ¶
func RegisterTypedActivityNoResult2[TInput1, TInput2 any](w *Worker, activity workflow.ActivityFuncNoResult2[TInput1, TInput2], opts ...registry.RegisterOption) error
RegisterTypedActivityNoResult2 registers an activity function with two input parameters that returns only error (type-safe).
func RegisterTypedActivityNoResult3 ¶
func RegisterTypedActivityNoResult3[TInput1, TInput2, TInput3 any](w *Worker, activity workflow.ActivityFuncNoResult3[TInput1, TInput2, TInput3], opts ...registry.RegisterOption) error
RegisterTypedActivityNoResult3 registers an activity function with three input parameters that returns only error (type-safe).
func RegisterTypedWorkflow0 ¶
func RegisterTypedWorkflow0[TResult any](w *Worker, wf workflow.WorkflowFunc0[TResult], opts ...registry.RegisterOption) error
RegisterTypedWorkflow0 registers a workflow function with no input parameters (type-safe).
func RegisterTypedWorkflow1 ¶
func RegisterTypedWorkflow1[TInput, TResult any](w *Worker, wf workflow.WorkflowFunc1[TInput, TResult], opts ...registry.RegisterOption) error
RegisterTypedWorkflow1 registers a workflow function with one input parameter (type-safe).
func RegisterTypedWorkflow2 ¶
func RegisterTypedWorkflow2[TInput1, TInput2, TResult any](w *Worker, wf workflow.WorkflowFunc2[TInput1, TInput2, TResult], opts ...registry.RegisterOption) error
RegisterTypedWorkflow2 registers a workflow function with two input parameters (type-safe).
func RegisterTypedWorkflow3 ¶
func RegisterTypedWorkflow3[TInput1, TInput2, TInput3, TResult any](w *Worker, wf workflow.WorkflowFunc3[TInput1, TInput2, TInput3, TResult], opts ...registry.RegisterOption) error
RegisterTypedWorkflow3 registers a workflow function with three input parameters (type-safe).
func RegisterTypedWorkflowNoResult0 ¶
func RegisterTypedWorkflowNoResult0(w *Worker, wf workflow.WorkflowFuncNoResult0, opts ...registry.RegisterOption) error
RegisterTypedWorkflowNoResult0 registers a workflow function with no input parameters that returns only error (type-safe).
func RegisterTypedWorkflowNoResult1 ¶
func RegisterTypedWorkflowNoResult1[TInput any](w *Worker, wf workflow.WorkflowFuncNoResult1[TInput], opts ...registry.RegisterOption) error
RegisterTypedWorkflowNoResult1 registers a workflow function with one input parameter that returns only error (type-safe).
func RegisterTypedWorkflowNoResult2 ¶
func RegisterTypedWorkflowNoResult2[TInput1, TInput2 any](w *Worker, wf workflow.WorkflowFuncNoResult2[TInput1, TInput2], opts ...registry.RegisterOption) error
RegisterTypedWorkflowNoResult2 registers a workflow function with two input parameters that returns only error (type-safe).
func RegisterTypedWorkflowNoResult3 ¶
func RegisterTypedWorkflowNoResult3[TInput1, TInput2, TInput3 any](w *Worker, wf workflow.WorkflowFuncNoResult3[TInput1, TInput2, TInput3], opts ...registry.RegisterOption) error
RegisterTypedWorkflowNoResult3 registers a workflow function with three input parameters that returns only error (type-safe).
Types ¶
type ActivityWorkerOptions ¶
type ActivityWorkerOptions struct {
// ActivityPollers is the number of pollers to start. Defaults to 2.
ActivityPollers int
// MaxParallelActivityTasks determines the maximum number of concurrent activity tasks processed
// by the worker. The default is 0 which is no limit.
MaxParallelActivityTasks int
// ActivityHeartbeatInterval is the interval between heartbeat attempts for activity tasks. Defaults
// to 25 seconds
ActivityHeartbeatInterval time.Duration
// ActivityPollingInterval is the interval between polling for new activity tasks.
// Note that if you use a backend that can wait for tasks to be available (e.g. redis) this field has no effect.
// Defaults to 200ms.
ActivityPollingInterval time.Duration
// ActivityQueues are the queues the worker listens to
ActivityQueues []workflow.Queue
}
type Options ¶
type Options struct {
WorkflowWorkerOptions
ActivityWorkerOptions
// SingleWorkerMode enables optimizations for scenarios where only a single worker
// is processing tasks. This should only be enabled when you have exactly one worker
// instance running.
SingleWorkerMode bool
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func NewActivityWorker ¶
func NewActivityWorker(backend backend.Backend, options *ActivityWorkerOptions) *Worker
NewActivityWorker creates a worker that only processes activities.
func NewWorkflowWorker ¶
func NewWorkflowWorker(backend backend.Backend, options *WorkflowWorkerOptions) *Worker
NewWorkflowWorker creates a worker that only processes workflows.
func (*Worker) RegisterActivity ¶
RegisterActivity registers an activity with the worker's registry.
func (*Worker) RegisterWorkflow ¶
RegisterWorkflow registers a workflow with the worker's registry.
func (*Worker) Start ¶
Start starts the worker.
To stop the worker, cancel the context passed to Start. To wait for completion of the active tasks, call `WaitForCompletion`.
func (*Worker) WaitForCompletion ¶
WaitForCompletion waits for all active tasks to complete.
type WorkflowOrchestrator ¶
type WorkflowOrchestrator struct {
Client *client.Client // Exposed for direct access to GetWorkflowResult
// contains filtered or unexported fields
}
WorkflowOrchestrator combines a worker and client into a single entity. It orchestrates the entire workflow lifecycle, from creation to execution.
func NewWorkflowOrchestrator ¶
func NewWorkflowOrchestrator(backend backend.Backend, options *Options) *WorkflowOrchestrator
NewWorkflowOrchestrator creates a new orchestrator with client capabilities and optional registration.
func (*WorkflowOrchestrator) CreateWorkflowInstance ¶
func (o *WorkflowOrchestrator) CreateWorkflowInstance(ctx context.Context, options client.WorkflowInstanceOptions, wf workflow.Workflow, args ...any) (*workflow.Instance, error)
CreateWorkflowInstance creates a new workflow instance using the client.
func (*WorkflowOrchestrator) RegisterActivity ¶
func (o *WorkflowOrchestrator) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error
RegisterActivity registers an activity with the orchestrator's registry.
func (*WorkflowOrchestrator) RegisterWorkflow ¶
func (o *WorkflowOrchestrator) RegisterWorkflow(wf workflow.Workflow, opts ...registry.RegisterOption) error
RegisterWorkflow registers a workflow with the orchestrator's registry.
func (*WorkflowOrchestrator) RemoveWorkflowInstance ¶
func (o *WorkflowOrchestrator) RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
RemoveWorkflowInstance removes a workflow instance.
func (*WorkflowOrchestrator) SignalWorkflow ¶
func (o *WorkflowOrchestrator) SignalWorkflow(ctx context.Context, instanceID string, name string, arg any) error
SignalWorkflow signals a workflow instance.
func (*WorkflowOrchestrator) Start ¶
func (o *WorkflowOrchestrator) Start(ctx context.Context) error
Start starts the worker.
func (*WorkflowOrchestrator) WaitForCompletion ¶
func (o *WorkflowOrchestrator) WaitForCompletion() error
WaitForCompletion waits for the worker to complete processing.
func (*WorkflowOrchestrator) WaitForWorkflowInstance ¶
func (o *WorkflowOrchestrator) WaitForWorkflowInstance(ctx context.Context, instance *workflow.Instance, timeout time.Duration) error
WaitForWorkflowInstance waits for a workflow instance to complete.
type WorkflowWorkerOptions ¶
type WorkflowWorkerOptions struct {
// WorkflowsPollers is the number of pollers to start. Defaults to 2.
WorkflowPollers int
// MaxParallelWorkflowTasks determines the maximum number of concurrent workflow tasks processed
// by the worker. The default is 0 which is no limit.
MaxParallelWorkflowTasks int
// WorkflowHeartbeatInterval is the interval between heartbeat attempts on workflow tasks. Defaults
// to 25 seconds
WorkflowHeartbeatInterval time.Duration
// WorkflowPollingInterval is the interval between polling for new workflow tasks.
// Note that if you use a backend that can wait for tasks to be available (e.g. redis) this field has no effect.
// Defaults to 200ms.
WorkflowPollingInterval time.Duration
// WorkflowExecutorCache is the max size of the workflow executor cache. Defaults to 128
WorkflowExecutorCacheSize int
// WorkflowExecutorCache is the max TTL of the workflow executor cache. Defaults to 10 seconds
WorkflowExecutorCacheTTL time.Duration
// WorkflowExecutorCache is the cache to use for workflow executors. If nil, a default cache implementation
// will be used.
WorkflowExecutorCache executor.Cache
// WorkflowQueues are the queue the worker listens to
WorkflowQueues []workflow.Queue
}