Documentation
¶
Overview ¶
Package processor hosts the workers that execute individual task executions. Every worker consumes items from the queue owned by the allocator and updates the execution state so that the allocator can decide what to schedule next.
Index ¶
- type Config
- type Option
- func WithConfig(config Config) Option
- func WithExecutor(executor executor.Service) Option
- func WithMessageQueue(queue messaging.Queue[execution.Execution]) Option
- func WithProcessDAO(processDAO dao.Service[string, execution.Process]) Option
- func WithResultQueue(q messaging.Queue[execution.Execution]) Option
- func WithSessionListeners(fns ...execution.StateListener) Option
- func WithTaskExecutionDAO(taskExecutionDao dao.Service[string, execution.Execution]) Option
- func WithTaskExecutor(executor executor.Service) Option
- func WithWhenListeners(fns ...execution.WhenListener) Option
- func WithWorkers(count int) Option
- type Service
- func (s *Service) CancelProcess(ctx context.Context, processID string, reason string) error
- func (s *Service) GetProcess(ctx context.Context, processID string) (*execution.Process, error)
- func (s *Service) Listeners() []execution.StateListener
- func (s *Service) NewProcess(ctx context.Context, processID string, workflow *model.Workflow, ...) (*execution.Process, error)
- func (s *Service) PauseProcess(ctx context.Context, processID string) error
- func (s *Service) ResumeFailedProcess(ctx context.Context, processID string) error
- func (s *Service) ResumeProcess(ctx context.Context, processID string) error
- func (s *Service) Shutdown()
- func (s *Service) Start(ctx context.Context) error
- func (s *Service) StartProcess(ctx context.Context, workflow *model.Workflow, init map[string]interface{}, ...) (aProcess *execution.Process, err error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// WorkerCount is the number of workers processing tasks
WorkerCount int
// MaxTaskRetries is the maximum number of retries for a task
MaxTaskRetries int
// RetryDelay is the delay between task retry attempts
RetryDelay time.Duration
}
Config represents executor service configuration
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns the default executor configuration
type Option ¶
type Option func(*Service)
Package executor provides a service executor.
func WithConfig ¶
WithConfig sets the configuration for the service
func WithExecutor ¶
WithExecutor sets the task executor for the service
func WithMessageQueue ¶
WithMessageQueue sets the message queue implementation
func WithProcessDAO ¶
WithProcessDAO sets the process store implementation
func WithResultQueue ¶ added in v0.1.24
WithResultQueue sets the queue used by processor to publish completion events back to allocator.
func WithSessionListeners ¶ added in v0.1.2
func WithSessionListeners(fns ...execution.StateListener) Option
WithSessionListeners registers immutable state listeners that will be copied to every Session created during task execution.
func WithTaskExecutionDAO ¶
func WithTaskExecutor ¶
WithTaskExecutor sets a custom task executor function
func WithWhenListeners ¶ added in v0.1.2
func WithWhenListeners(fns ...execution.WhenListener) Option
WithWhenListeners registers callbacks invoked after every when-condition evaluation.
func WithWorkers ¶
WithWorkers sets the number of worker goroutines
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service handles workflow execution
func (*Service) CancelProcess ¶ added in v0.1.2
CancelProcess requests cancellation of a running or paused process. It marks the process as cancelRequested and propagates context cancellation so that any in-flight task can terminate early. The allocator will move the process to the final "cancelled" state once the current stack is drained.
func (*Service) GetProcess ¶
GetProcess retrieves a process by ID
func (*Service) Listeners ¶ added in v0.1.2
func (s *Service) Listeners() []execution.StateListener
func (*Service) NewProcess ¶ added in v0.1.7
func (*Service) PauseProcess ¶
PauseProcess pauses a running process
func (*Service) ResumeFailedProcess ¶ added in v0.1.2
ResumeFailedProcess resets a failed or cancelled workflow so that execution can continue. It rewinds every execution that ended with failed/cancelled state back to pending and switches the parent process to running. The allocator will then pick up the work in its next iteration.
func (*Service) ResumeProcess ¶
ResumeProcess resumes a paused process