Documentation
¶
Index ¶
- Constants
- Variables
- type ActivityTask
- type Backend
- type BackendOption
- func WithContextPropagator(prop workflow.ContextPropagator) BackendOption
- func WithConverter(converter converter.Converter) BackendOption
- func WithLogger(logger *slog.Logger) BackendOption
- func WithMaxHistorySize(size int64) BackendOption
- func WithMetrics(client metrics.Client) BackendOption
- func WithRemoveContinuedAsNewInstances() BackendOption
- func WithStickyTimeout(timeout time.Duration) BackendOption
- func WithTracerProvider(tp trace.TracerProvider) BackendOption
- type ErrNotSupported
- type Feature
- type MockBackend
- func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, ...) error
- func (_m *MockBackend) Close() error
- func (_m *MockBackend) CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error
- func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, ...) error
- func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error
- func (_m *MockBackend) ExtendActivityTask(ctx context.Context, task *ActivityTask) error
- func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error
- func (_m *MockBackend) FeatureSupported(feature Feature) bool
- func (_m *MockBackend) GetActivityTask(ctx context.Context, queues []core.Queue) (*ActivityTask, error)
- func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)
- func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)
- func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)
- func (_m *MockBackend) GetWorkflowTask(ctx context.Context, queues []core.Queue) (*WorkflowTask, error)
- func (_m *MockBackend) Metrics() metrics.Client
- func (_m *MockBackend) Options() *Options
- func (_m *MockBackend) PrepareActivityQueues(ctx context.Context, queues []core.Queue) error
- func (_m *MockBackend) PrepareWorkflowQueues(ctx context.Context, queues []core.Queue) error
- func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error
- func (_m *MockBackend) RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error
- func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
- func (_m *MockBackend) Tracer() trace.Tracer
- type Options
- type RemovalOption
- type RemovalOptions
- type Stats
- type WorkflowTask
Constants ¶
const TracerName = "go-workflow"
Variables ¶
var DefaultRemovalOptions = RemovalOptions{
BatchSize: 100,
}
var ErrInstanceAlreadyExists = errors.New("workflow instance already exists")
var ErrInstanceNotFinished = errors.New("workflow instance is not finished")
var ErrInstanceNotFound = errors.New("workflow instance not found")
Functions ¶
This section is empty.
Types ¶
type ActivityTask ¶
type ActivityTask struct {
ID string
// ActivityID is the ID of the activity event
ActivityID string
Queue workflow.Queue
WorkflowInstance *core.WorkflowInstance
Event *history.Event
}
ActivityTask represents one activity execution.
type Backend ¶
type Backend interface {
// CreateWorkflowInstance creates a new workflow instance
CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error
// CancelWorkflowInstance cancels a running workflow instance
CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance, cancelEvent *history.Event) error
// RemoveWorkflowInstance removes a workflow instance
RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error
// RemoveWorkflowInstances removes multiple workflow instances
RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error
// GetWorkflowInstanceState returns the state of the given workflow instance
GetWorkflowInstanceState(ctx context.Context, instance *workflow.Instance) (core.WorkflowInstanceState, error)
// GetWorkflowInstanceHistory returns the workflow history for the given instance. When lastSequenceID
// is given, only events after that event are returned. Otherwise the full history is returned.
GetWorkflowInstanceHistory(ctx context.Context, instance *workflow.Instance, lastSequenceID *int64) ([]*history.Event, error)
// SignalWorkflow signals a running workflow instance
//
// If the given instance does not exist, it will return an error
SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
// PrepareWorkflowQueues prepares workflow queues for later consumption using this backend instane
PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error
// PrepareActivityQueues prepares activity queues for later consumption using this backend instance
PrepareActivityQueues(ctx context.Context, queues []workflow.Queue) error
// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*WorkflowTask, error)
// ExtendWorkflowTask extends the lock of a workflow task
ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error
// CompleteWorkflowTask checkpoints a workflow task retrieved using GetWorkflowTask
//
// This checkpoints the execution. events are new events from the last workflow execution
// which will be added to the workflow instance history. workflowEvents are new events for the
// completed or other workflow instances.
CompleteWorkflowTask(
ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState,
executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error
// GetActivityTask returns a pending activity task or nil if there are no pending activities
GetActivityTask(ctx context.Context, queues []workflow.Queue) (*ActivityTask, error)
// ExtendActivityTask extends the lock of an activity task
ExtendActivityTask(ctx context.Context, task *ActivityTask) error
// CompleteActivityTask completes an activity task retrieved using GetActivityTask
CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error
// GetStats returns stats about the backend
GetStats(ctx context.Context) (*Stats, error)
// Tracer returns the configured trace provider for the backend
Tracer() trace.Tracer
// Metrics returns the configured metrics client for the backend
Metrics() metrics.Client
// Options returns the configured options for the backend
Options() *Options
// Close closes any underlying resources
Close() error
// FeatureSupported returns true if the given feature is supported by the backend
FeatureSupported(feature Feature) bool
}
type BackendOption ¶
type BackendOption func(*Options)
func WithContextPropagator ¶
func WithContextPropagator(prop workflow.ContextPropagator) BackendOption
func WithConverter ¶
func WithConverter(converter converter.Converter) BackendOption
func WithLogger ¶
func WithLogger(logger *slog.Logger) BackendOption
func WithMaxHistorySize ¶
func WithMaxHistorySize(size int64) BackendOption
func WithMetrics ¶
func WithMetrics(client metrics.Client) BackendOption
func WithRemoveContinuedAsNewInstances ¶
func WithRemoveContinuedAsNewInstances() BackendOption
func WithStickyTimeout ¶
func WithStickyTimeout(timeout time.Duration) BackendOption
func WithTracerProvider ¶
func WithTracerProvider(tp trace.TracerProvider) BackendOption
type ErrNotSupported ¶
type ErrNotSupported struct {
Message string
}
func (ErrNotSupported) Error ¶
func (e ErrNotSupported) Error() string
type MockBackend ¶
MockBackend is an autogenerated mock type for the Backend type
func NewMockBackend ¶
func NewMockBackend(t mockConstructorTestingTNewMockBackend) *MockBackend
NewMockBackend creates a new instance of MockBackend. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func (*MockBackend) CancelWorkflowInstance ¶
func (_m *MockBackend) CancelWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, cancelEvent *history.Event) error
CancelWorkflowInstance provides a mock function with given fields: ctx, instance, cancelEvent
func (*MockBackend) Close ¶
func (_m *MockBackend) Close() error
Close provides a mock function with given fields:
func (*MockBackend) CompleteActivityTask ¶
func (_m *MockBackend) CompleteActivityTask(ctx context.Context, task *ActivityTask, result *history.Event) error
CompleteActivityTask provides a mock function with given fields: ctx, task, result
func (*MockBackend) CompleteWorkflowTask ¶
func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error
CompleteWorkflowTask provides a mock function with given fields: ctx, task, state, executedEvents, activityEvents, timerEvents, workflowEvents
func (*MockBackend) CreateWorkflowInstance ¶
func (_m *MockBackend) CreateWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance, event *history.Event) error
CreateWorkflowInstance provides a mock function with given fields: ctx, instance, event
func (*MockBackend) ExtendActivityTask ¶
func (_m *MockBackend) ExtendActivityTask(ctx context.Context, task *ActivityTask) error
ExtendActivityTask provides a mock function with given fields: ctx, task
func (*MockBackend) ExtendWorkflowTask ¶
func (_m *MockBackend) ExtendWorkflowTask(ctx context.Context, task *WorkflowTask) error
ExtendWorkflowTask provides a mock function with given fields: ctx, task
func (*MockBackend) FeatureSupported ¶
func (_m *MockBackend) FeatureSupported(feature Feature) bool
FeatureSupported provides a mock function with given fields: feature
func (*MockBackend) GetActivityTask ¶
func (_m *MockBackend) GetActivityTask(ctx context.Context, queues []core.Queue) (*ActivityTask, error)
GetActivityTask provides a mock function with given fields: ctx, queues
func (*MockBackend) GetStats ¶
func (_m *MockBackend) GetStats(ctx context.Context) (*Stats, error)
GetStats provides a mock function with given fields: ctx
func (*MockBackend) GetWorkflowInstanceHistory ¶
func (_m *MockBackend) GetWorkflowInstanceHistory(ctx context.Context, instance *core.WorkflowInstance, lastSequenceID *int64) ([]*history.Event, error)
GetWorkflowInstanceHistory provides a mock function with given fields: ctx, instance, lastSequenceID
func (*MockBackend) GetWorkflowInstanceState ¶
func (_m *MockBackend) GetWorkflowInstanceState(ctx context.Context, instance *core.WorkflowInstance) (core.WorkflowInstanceState, error)
GetWorkflowInstanceState provides a mock function with given fields: ctx, instance
func (*MockBackend) GetWorkflowTask ¶
func (_m *MockBackend) GetWorkflowTask(ctx context.Context, queues []core.Queue) (*WorkflowTask, error)
GetWorkflowTask provides a mock function with given fields: ctx, queues
func (*MockBackend) Metrics ¶
func (_m *MockBackend) Metrics() metrics.Client
Metrics provides a mock function with given fields:
func (*MockBackend) Options ¶
func (_m *MockBackend) Options() *Options
Options provides a mock function with given fields:
func (*MockBackend) PrepareActivityQueues ¶
PrepareActivityQueues provides a mock function with given fields: ctx, queues
func (*MockBackend) PrepareWorkflowQueues ¶
PrepareWorkflowQueues provides a mock function with given fields: ctx, queues
func (*MockBackend) RemoveWorkflowInstance ¶
func (_m *MockBackend) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error
RemoveWorkflowInstance provides a mock function with given fields: ctx, instance
func (*MockBackend) RemoveWorkflowInstances ¶
func (_m *MockBackend) RemoveWorkflowInstances(ctx context.Context, options ...RemovalOption) error
RemoveWorkflowInstances provides a mock function with given fields: ctx, options
func (*MockBackend) SignalWorkflow ¶
func (_m *MockBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
SignalWorkflow provides a mock function with given fields: ctx, instanceID, event
func (*MockBackend) Tracer ¶
func (_m *MockBackend) Tracer() trace.Tracer
Tracer provides a mock function with given fields:
type Options ¶
type Options struct {
Logger *slog.Logger
Metrics metrics.Client
TracerProvider trace.TracerProvider
// Converter is the converter to use for serializing and deserializing inputs and results. If not explicitly set
// converter.DefaultConverter is used.
Converter converter.Converter
// ContextPropagators is a list of context propagators to use for passing context into workflows and activities.
ContextPropagators []workflow.ContextPropagator
StickyTimeout time.Duration
// WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed
// by that timeframe, it's considered abandoned and another worker might pick it up.
//
// For long running workflow tasks, combine this with heartbearts.
WorkflowLockTimeout time.Duration
// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
// by that timeframe, it's considered abandoned and another worker might pick it up
ActivityLockTimeout time.Duration
// RemoveContinuedAsNewInstances determines whether instances that were completed using ContinueAsNew should be
// removed immediately, including their history. If set to false, the instance will be removed after the configured
// retention period or never.
RemoveContinuedAsNewInstances bool
// MaxHistorySize is the maximum size of a workflow history. If a workflow exceeds this size, it will be failed.
MaxHistorySize int64
}
var DefaultOptions Options = Options{ StickyTimeout: 30 * time.Second, WorkflowLockTimeout: time.Minute, ActivityLockTimeout: time.Minute * 2, Logger: slog.Default(), Metrics: mi.NewNoopMetricsClient(), TracerProvider: noop.NewTracerProvider(), Converter: converter.DefaultConverter, ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}}, RemoveContinuedAsNewInstances: false, MaxHistorySize: 10_000, }
func ApplyOptions ¶
func ApplyOptions(opts ...BackendOption) *Options
type RemovalOption ¶
type RemovalOption func(o *RemovalOptions)
func RemoveFinishedBatchSize ¶
func RemoveFinishedBatchSize(size int) RemovalOption
func RemoveFinishedBefore ¶
func RemoveFinishedBefore(t time.Time) RemovalOption
type RemovalOptions ¶
type Stats ¶
type Stats struct {
ActiveWorkflowInstances int64
// PendingActivities are the number of activities that are currently in the queue,
// waiting to be processed by a worker
PendingActivityTasks map[workflow.Queue]int64
// PendingWorkflowTasks are the number of workflow tasks that are currently in the queue,
// waiting to be processed by a worker
PendingWorkflowTasks map[workflow.Queue]int64
}
type WorkflowTask ¶
type WorkflowTask struct {
// ID is an identifier for this task. It's set by the backend
ID string
// Queue is the queue of the workflow instance
Queue workflow.Queue
// WorkflowInstance is the workflow instance that this task is for
WorkflowInstance *core.WorkflowInstance
// WorkflowInstanceState is the state of the workflow instance when the task was dequeued
WorkflowInstanceState core.WorkflowInstanceState
// Metadata is the metadata of the workflow instance
Metadata *metadata.WorkflowMetadata
// LastSequenceID is the sequence ID of the newest event in the workflow instances's history
LastSequenceID int64
// NewEvents are new events since the last task execution
NewEvents []*history.Event
// Backend specific data, only the producer of the task should rely on this.
CustomData any
}
WorkflowTask represents work for one workflow execution slice.