processor

package
v0.1.10
Published: Jun 9, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package processor hosts the workers that execute individual task executions. Every worker consumes items from the queue owned by the allocator and updates the execution state so that the allocator can decide what to schedule next.
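
As a rough illustration of the worker model described above, the sketch below runs a fixed number of goroutines that drain a shared queue and record a state for every item. The task type, the channel-based queue, the "completed" state name, and the runWorkers helper are all hypothetical stand-ins chosen so the example compiles on its own; they are not the package's actual types, and the real queue is owned by the allocator.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a queued task execution.
type task struct {
	ID string
}

// runWorkers starts workerCount goroutines that consume tasks from queue
// until it is closed, then returns the state recorded for each task ID.
func runWorkers(workerCount int, queue <-chan task) map[string]string {
	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		states = make(map[string]string)
	)
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				// A real worker would execute the task here; this sketch
				// only records an assumed "completed" state.
				mu.Lock()
				states[t.ID] = "completed"
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return states
}

func main() {
	queue := make(chan task, 3)
	for _, id := range []string{"a", "b", "c"} {
		queue <- task{ID: id}
	}
	close(queue)

	states := runWorkers(2, queue)
	fmt.Println(len(states), states["a"])
}
```

The mutex guards the shared state map because several workers may finish at the same time; the actual package may use a different mechanism to publish execution state.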

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// WorkerCount is the number of workers processing tasks
	WorkerCount int

	// MaxTaskRetries is the maximum number of retries for a task
	MaxTaskRetries int

	// RetryDelay is the delay between task retry attempts
	RetryDelay time.Duration
}

Config holds the service configuration: the worker count and the task retry policy.
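
To make the retry fields concrete, here is a minimal sketch of how MaxTaskRetries and RetryDelay could drive a retry loop. The Config type is redeclared locally so the example compiles on its own, and runWithRetry and errTransient are hypothetical names. Whether the package counts the first attempt separately from the retries is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the documented fields; it is redeclared here only so the
// sketch is self-contained.
type Config struct {
	WorkerCount    int
	MaxTaskRetries int
	RetryDelay     time.Duration
}

// errTransient is a hypothetical error used to simulate a failing task.
var errTransient = errors.New("transient failure")

// runWithRetry calls fn once and, on failure, retries it up to
// cfg.MaxTaskRetries more times, sleeping cfg.RetryDelay between attempts.
// It returns the number of attempts made and the last error, if any.
func runWithRetry(cfg Config, fn func() error) (attempts int, err error) {
	for attempts = 1; ; attempts++ {
		if err = fn(); err == nil {
			return attempts, nil
		}
		if attempts > cfg.MaxTaskRetries {
			return attempts, err
		}
		time.Sleep(cfg.RetryDelay)
	}
}

func main() {
	cfg := Config{WorkerCount: 1, MaxTaskRetries: 2, RetryDelay: time.Millisecond}
	calls := 0
	attempts, err := runWithRetry(cfg, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```

With MaxTaskRetries set to 2, the function under this interpretation runs at most three times: one initial attempt plus two retries.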

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default service configuration.

type Option

type Option func(*Service)

Option is a functional option that configures a Service. Options are passed to New.
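
The Option type follows the functional options pattern. The sketch below shows how such options are typically applied inside a constructor. Service and Config here are simplified local stand-ins, and the default worker count and the validation in New are assumptions for illustration rather than the package's documented behavior.

```go
package main

import "fmt"

// Config and Service are simplified stand-ins for the package's types.
type Config struct {
	WorkerCount int
}

type Service struct {
	config Config
}

// Option mutates a Service while it is being constructed.
type Option func(*Service)

// WithConfig replaces the whole configuration.
func WithConfig(config Config) Option {
	return func(s *Service) { s.config = config }
}

// WithWorkers overrides only the worker count.
func WithWorkers(count int) Option {
	return func(s *Service) { s.config.WorkerCount = count }
}

// New applies the options in order, so a later option overrides an earlier one.
// The default of one worker and the positivity check are assumptions.
func New(options ...Option) (*Service, error) {
	s := &Service{config: Config{WorkerCount: 1}}
	for _, opt := range options {
		opt(s)
	}
	if s.config.WorkerCount <= 0 {
		return nil, fmt.Errorf("worker count must be positive, got %d", s.config.WorkerCount)
	}
	return s, nil
}

func main() {
	s, err := New(WithConfig(Config{WorkerCount: 4}), WithWorkers(8))
	fmt.Println(s.config.WorkerCount, err)
}
```

Because options run in the order given, passing WithWorkers after WithConfig changes only the worker count and leaves the rest of the supplied configuration in place.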

func WithConfig

func WithConfig(config Config) Option

WithConfig sets the configuration for the service

func WithExecutor

func WithExecutor(executor executor.Service) Option

WithExecutor sets the task executor for the service

func WithMessageQueue

func WithMessageQueue(queue messaging.Queue[execution.Execution]) Option

WithMessageQueue sets the message queue implementation

func WithProcessDAO

func WithProcessDAO(processDAO dao.Service[string, execution.Process]) Option

WithProcessDAO sets the process store implementation

func WithSessionListeners added in v0.1.2

func WithSessionListeners(fns ...execution.StateListener) Option

WithSessionListeners registers immutable state listeners that will be copied to every Session created during task execution.
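
One way to read "copied to every Session" is that each session receives its own copy of the listener slice, so later changes to the shared slice do not affect sessions that already exist. The sketch below illustrates that idea. The StateListener signature, the Session type, and newSession are hypothetical; the real execution.StateListener may look different.

```go
package main

import "fmt"

// StateListener is a hypothetical callback invoked on every state change.
type StateListener func(key string, value interface{})

// Session is a simplified stand-in holding per-session state and listeners.
type Session struct {
	listeners []StateListener
	state     map[string]interface{}
}

// newSession copies the shared listeners so the session owns its own slice.
func newSession(shared []StateListener) *Session {
	own := make([]StateListener, len(shared))
	copy(own, shared)
	return &Session{listeners: own, state: make(map[string]interface{})}
}

// Set stores a value and notifies every listener attached to the session.
func (s *Session) Set(key string, value interface{}) {
	s.state[key] = value
	for _, fn := range s.listeners {
		fn(key, value)
	}
}

func main() {
	calls := 0
	shared := []StateListener{
		func(key string, value interface{}) { calls++ },
	}

	a := newSession(shared)
	b := newSession(shared)
	a.Set("status", "running")
	b.Set("status", "running")

	fmt.Println(calls)
}
```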

func WithTaskExecutionDAO

func WithTaskExecutionDAO(taskExecutionDao dao.Service[string, execution.Execution]) Option

WithTaskExecutionDAO sets the task execution store implementation.

func WithTaskExecutor

func WithTaskExecutor(executor executor.Service) Option

WithTaskExecutor sets a custom task executor. It takes the same executor.Service argument as WithExecutor.

func WithWhenListeners added in v0.1.2

func WithWhenListeners(fns ...execution.WhenListener) Option

WithWhenListeners registers callbacks invoked after every when-condition evaluation.

func WithWorkers

func WithWorkers(count int) Option

WithWorkers sets the number of worker goroutines

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service handles workflow execution

func New

func New(options ...Option) (*Service, error)

New creates a new Service configured with the given options.

func (*Service) CancelProcess added in v0.1.2

func (s *Service) CancelProcess(ctx context.Context, processID string, reason string) error

CancelProcess requests cancellation of a running or paused process. It marks the process as cancelRequested and propagates context cancellation so that any in-flight task can terminate early. The allocator will move the process to the final "cancelled" state once the current stack is drained.
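
The context propagation mentioned above relies on in-flight tasks watching their context. The following sketch shows the general mechanism with the standard context package only. runTask and cancelInFlight are hypothetical helpers, and the sketch does not model the cancelRequested flag or the allocator's transition to the "cancelled" state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runTask simulates an in-flight task that takes `work` to finish but
// returns early with the context's error if ctx is cancelled first.
func runTask(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// cancelInFlight starts a long task, cancels its context, and returns the
// error the task reports.
func cancelInFlight() error {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- runTask(ctx, time.Minute) }()

	cancel() // stands in for the effect of a cancellation request
	return <-done
}

func main() {
	err := cancelInFlight()
	fmt.Println(errors.Is(err, context.Canceled)) // prints: true
}
```

A task that never checks ctx.Done() cannot terminate early, which is why cooperative cancellation depends on the task implementation as much as on the service.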

func (*Service) GetProcess

func (s *Service) GetProcess(ctx context.Context, processID string) (*execution.Process, error)

GetProcess retrieves a process by ID

func (*Service) Listeners added in v0.1.2

func (s *Service) Listeners() []execution.StateListener

Listeners returns the service's state listeners, presumably those registered through WithSessionListeners.

func (*Service) NewProcess added in v0.1.7

func (s *Service) NewProcess(ctx context.Context, processID string, workflow *model.Workflow, init map[string]interface{}) (*execution.Process, error)

func (*Service) PauseProcess

func (s *Service) PauseProcess(ctx context.Context, processID string) error

PauseProcess pauses a running process

func (*Service) ResumeFailedProcess added in v0.1.2

func (s *Service) ResumeFailedProcess(ctx context.Context, processID string) error

ResumeFailedProcess resets a failed or cancelled workflow so that execution can continue. It rewinds every execution that ended with failed/cancelled state back to pending and switches the parent process to running. The allocator will then pick up the work in its next iteration.
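
The rewind step described above can be sketched as a pass over the process's executions. Process, Execution, and resumeFailed are simplified local stand-ins. The state names failed, cancelled, pending, and running come from the description, while "completed" and the error returned for a process in any other state are assumptions.

```go
package main

import "fmt"

// Execution and Process are simplified stand-ins for the execution package types.
type Execution struct {
	ID    string
	State string
}

type Process struct {
	State      string
	Executions []*Execution
}

// resumeFailed rewinds failed or cancelled executions to pending and marks
// the process as running. It returns how many executions were rewound.
func resumeFailed(p *Process) (int, error) {
	if p.State != "failed" && p.State != "cancelled" {
		return 0, fmt.Errorf("process is %q, expected failed or cancelled", p.State)
	}
	rewound := 0
	for _, e := range p.Executions {
		if e.State == "failed" || e.State == "cancelled" {
			e.State = "pending"
			rewound++
		}
	}
	p.State = "running"
	return rewound, nil
}

func main() {
	p := &Process{
		State: "failed",
		Executions: []*Execution{
			{ID: "1", State: "completed"},
			{ID: "2", State: "failed"},
			{ID: "3", State: "cancelled"},
		},
	}
	n, err := resumeFailed(p)
	fmt.Println(n, p.State, err)
}
```

Executions that already finished successfully are left untouched, so only the failed or cancelled work is picked up again.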

func (*Service) ResumeProcess

func (s *Service) ResumeProcess(ctx context.Context, processID string) error

ResumeProcess resumes a paused process

func (*Service) Shutdown

func (s *Service) Shutdown()

Shutdown stops the service.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start begins the task execution service

func (*Service) StartProcess

func (s *Service) StartProcess(ctx context.Context, workflow *model.Workflow, init map[string]interface{}, customTasks ...string) (aProcess *execution.Process, err error)
