Documentation ¶
Overview ¶
Package fluxor provides a generic, extensible workflow engine.
The engine executes workflows defined declaratively (for example in YAML or JSON) and comes with pluggable service layers such as:
- runtime – orchestration of workflow execution
- allocator – task allocation and state management
- executor – task execution through custom actions
- approval – optional human-in-the-loop task approval
Fluxor is designed to be embedded in host applications. End-users typically interact with the engine via the high-level Service façade exposed by the root package:
    srv := fluxor.New()
    rt := srv.Runtime()
    wf, _ := rt.LoadWorkflow(ctx, "workflow.yaml")
    _, wait, _ := rt.StartProcess(ctx, wf, nil)
    out, _ := wait(ctx, time.Minute)
For more details see the README and individual sub-packages.
Index ¶
- Variables
- type Config
- type Option
- func WithApprovalService(svc approval.Service) Option
- func WithEventService(service *event.Service) Option
- func WithExecutorOptions(opts ...executor.Option) Option
- func WithExtensionServices(services ...types.Service) Option
- func WithExtensionTypes(types ...*x.Type) Option
- func WithMetaBaseURL(url string) Option
- func WithMetaFsOptions(options ...storage.Option) Option
- func WithMetaService(service *meta.Service) Option
- func WithProcessDAO(dao dao.Service[string, execution.Process]) Option
- func WithProcessorWorkers(count int) Option
- func WithQueue(queue messaging.Queue[execution.Execution]) Option
- func WithRootTaskNodeName(name string) Option
- func WithStateListeners(listeners ...execution.StateListener) Option
- func WithTaskExecutionDAO(dao dao.Service[string, execution.Execution]) Option
- func WithTracing(serviceName, serviceVersion, outputFile string) Option
- func WithTracingExporter(serviceName, serviceVersion string, exporter sdktrace.SpanExporter) Option
- func WithWhenListeners(listeners ...execution.WhenListener) Option
- type ProcessorConfig
- type Runtime
- func (r *Runtime) AwaitGroup(ctx context.Context, id string, timeout time.Duration) ([]interface{}, error)
- func (r *Runtime) DecodeYAMLWorkflow(data []byte) (*model.Workflow, error)
- func (r *Runtime) EmitExecutions(ctx context.Context, parent *execution.Execution, ...) (string, error)
- func (r *Runtime) Execution(ctx context.Context, id string) (*execution.Execution, error)
- func (r *Runtime) LoadWorkflow(ctx context.Context, location string) (*model.Workflow, error)
- func (r *Runtime) Process(ctx context.Context, id string) (*execution.Process, error)
- func (r *Runtime) ProcessFromContext(ctx context.Context) *execution.Process
- func (r *Runtime) Processes(ctx context.Context, parameter ...*dao.Parameter) ([]*execution.Process, error)
- func (r *Runtime) RefreshWorkflow(location string) error
- func (r *Runtime) RunTaskOnce(ctx context.Context, wf *model.Workflow, taskID string, input interface{}) (interface{}, error)
- func (r *Runtime) ScheduleExecution(ctx context.Context, exec *execution.Execution) (func(duration time.Duration) (*execution.Execution, error), error)
- func (r *Runtime) Shutdown(ctx context.Context) error
- func (r *Runtime) Start(ctx context.Context) error
- func (r *Runtime) StartProcess(ctx context.Context, aWorkflow *model.Workflow, ...) (*execution.Process, execution.Wait, error)
- func (r *Runtime) UpsertDefinition(location string, data []byte) error
- func (r *Runtime) WaitForUnblock(ctx context.Context, execID string, timeout time.Duration) (*execution.Execution, error)
- type Service
- func (s *Service) Actions() *extension.Actions
- func (s *Service) ApprovalService() approval.Service
- func (s *Service) EventService() *event.Service
- func (s *Service) NewContext(ctx context.Context) context.Context
- func (s *Service) RegisterExtensionServices(services ...types.Service)
- func (s *Service) RegisterExtensionType(aType *x.Type)
- func (s *Service) RegisterExtensionTypes(types ...*x.Type)
- func (s *Service) Runtime() *Runtime
Constants ¶
This section is empty.
Variables ¶
var (
    ErrTaskNotFound   = errors.New("task not found in workflow")
    ErrMethodNotFound = errors.New("method not found in service")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.1.1
type Config struct {
Processor ProcessorConfig `json:"processor" yaml:"processor"`
}
func DefaultConfig ¶ added in v0.1.1
func DefaultConfig() *Config
DefaultConfig returns a Config populated with exactly the same default values that were previously hard-coded in the constructors. Callers may modify the returned struct before passing it to NewFromConfig.
type Option ¶
type Option func(s *Service)
Option is a functional option used to configure a Service.
func WithApprovalService ¶ added in v0.1.1
WithApprovalService sets the approval service
func WithEventService ¶
func WithExecutorOptions ¶ added in v0.1.1
WithExecutorOptions lets the caller supply additional options passed to executor.NewService (e.g. disabling the default StdoutListener).
func WithExtensionServices ¶
WithExtensionServices sets the extension services
func WithExtensionTypes ¶
WithExtensionTypes sets the extension types
func WithMetaBaseURL ¶
WithMetaBaseURL sets the meta base URL
func WithMetaFsOptions ¶
WithMetaFsOptions sets the meta file system options
func WithMetaService ¶
WithMetaService sets the meta service
func WithProcessDAO ¶
WithProcessDAO sets the process DAO
func WithProcessorWorkers ¶
WithProcessorWorkers sets the number of processor workers
func WithRootTaskNodeName ¶
WithRootTaskNodeName sets the root task node name
func WithStateListeners ¶ added in v0.1.2
func WithStateListeners(listeners ...execution.StateListener) Option
func WithTaskExecutionDAO ¶
WithTaskExecutionDAO sets the task execution DAO
func WithTracing ¶
WithTracing configures OpenTelemetry tracing for the service. If outputFile is empty the stdout exporter is used; otherwise traces are written to the supplied file path. The function is safe to call multiple times – the first successful initialisation wins.
func WithTracingExporter ¶
func WithTracingExporter(serviceName, serviceVersion string, exporter sdktrace.SpanExporter) Option
WithTracingExporter configures OpenTelemetry tracing using a custom SpanExporter. This enables integrations with exporters other than the built-in stdout exporter, for example OTLP, Jaeger or Zipkin. The function is safe to call multiple times – the first successful initialisation wins.
func WithWhenListeners ¶ added in v0.1.2
func WithWhenListeners(listeners ...execution.WhenListener) Option
WithWhenListeners registers callbacks invoked after every when-condition evaluation.
type ProcessorConfig ¶ added in v0.1.1
type ProcessorConfig struct {
WorkerCount int `json:"workers" yaml:"workers"`
}
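The struct tags on Config and ProcessorConfig imply a document shape of processor.workers. As a minimal sketch, the example below mirrors the two structs locally and decodes a JSON document with the standard library; parseConfig is a hypothetical helper, and the real package may load configuration differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the ProcessorConfig and Config structs shown above.
type ProcessorConfig struct {
	WorkerCount int `json:"workers" yaml:"workers"`
}

type Config struct {
	Processor ProcessorConfig `json:"processor" yaml:"processor"`
}

// parseConfig is a hypothetical helper that decodes JSON into a Config.
func parseConfig(data []byte) (*Config, error) {
	cfg := &Config{}
	if err := json.Unmarshal(data, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]byte(`{"processor": {"workers": 8}}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Processor.WorkerCount) // prints 8
}
```

The equivalent YAML document, going by the yaml tags, would nest a workers key under processor.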
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
Runtime represents a workflow engine runtime
func (*Runtime) AwaitGroup ¶ added in v0.1.24
func (r *Runtime) AwaitGroup(ctx context.Context, id string, timeout time.Duration) ([]interface{}, error)
AwaitGroup blocks until the correlation group with the given id completes or the timeout elapses. It returns the aggregated child outputs as recorded by the allocator when unblocking the parent. The method observes ctx.Done.
func (*Runtime) DecodeYAMLWorkflow ¶
DecodeYAMLWorkflow decodes a workflow from YAML-encoded data
func (*Runtime) EmitExecutions ¶ added in v0.1.24
func (r *Runtime) EmitExecutions(ctx context.Context, parent *execution.Execution, children []*execution.Execution) (string, error)
EmitExecutions fans out child executions and marks the parent as waiting. It returns immediately; the caller may ignore the returned correlation id if the task uses await:true and lets the engine resume automatically.
func (*Runtime) LoadWorkflow ¶
LoadWorkflow loads a workflow
func (*Runtime) ProcessFromContext ¶ added in v0.1.7
ProcessFromContext returns the process stored in the context
func (*Runtime) Processes ¶
func (r *Runtime) Processes(ctx context.Context, parameter ...*dao.Parameter) ([]*execution.Process, error)
Processes returns a list of processes
func (*Runtime) RefreshWorkflow ¶ added in v0.1.19
RefreshWorkflow discards any cached copy of the workflow definition located at the given URL/location. The next LoadWorkflow call will reload the file via the configured meta-service (i.e. one extra disk/cloud round-trip).
func (*Runtime) RunTaskOnce ¶ added in v0.1.9
func (r *Runtime) RunTaskOnce(ctx context.Context, wf *model.Workflow, taskID string, input interface{}) (interface{}, error)
RunTaskOnce is a convenience helper that executes a *single* task from the supplied workflow and waits for its completion. It is intended for quick ad-hoc jobs, debugging and unit tests where launching the entire workflow would be unnecessary overhead.
The helper works by submitting an ad-hoc execution to the shared allocator/processor queue; semantics (retries, policies, tracing etc.) are therefore identical to regular executions. The returned value is whatever the task's action populates as its output.
func (*Runtime) ScheduleExecution ¶ added in v0.1.7
func (*Runtime) StartProcess ¶
func (r *Runtime) StartProcess(ctx context.Context, aWorkflow *model.Workflow, initialState map[string]interface{}, tasks ...string) (*execution.Process, execution.Wait, error)
StartProcess starts a new process
func (*Runtime) UpsertDefinition ¶ added in v0.1.19
UpsertDefinition parses the supplied YAML bytes and stores the resulting workflow definition in the in-memory cache under the specified location. When data is nil the call falls back to RefreshWorkflow, causing a lazy reload on next use.
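The interplay between LoadWorkflow, RefreshWorkflow and UpsertDefinition can be pictured as a small read-through cache. The sketch below is an illustration under assumptions: definitionCache and its methods are hypothetical, it stores raw bytes instead of parsed workflows, and the load function stands in for the meta service.

```go
package main

import (
	"fmt"
	"sync"
)

// definitionCache is a hypothetical read-through cache keyed by location.
type definitionCache struct {
	mu    sync.Mutex
	items map[string][]byte
	load  func(location string) ([]byte, error) // stand-in for the meta service
}

func newDefinitionCache(load func(string) ([]byte, error)) *definitionCache {
	return &definitionCache{items: map[string][]byte{}, load: load}
}

// Refresh discards the cached copy so the next Load goes back to the source.
func (c *definitionCache) Refresh(location string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.items, location)
}

// Upsert stores data under location; nil data falls back to Refresh.
func (c *definitionCache) Upsert(location string, data []byte) {
	if data == nil {
		c.Refresh(location)
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[location] = data
}

// Load returns the cached copy, fetching and caching it on a miss.
func (c *definitionCache) Load(location string) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if data, ok := c.items[location]; ok {
		return data, nil
	}
	data, err := c.load(location)
	if err != nil {
		return nil, err
	}
	c.items[location] = data
	return data, nil
}

func main() {
	cache := newDefinitionCache(func(string) ([]byte, error) { return []byte("from-source"), nil })
	cache.Upsert("workflow.yaml", []byte("in-memory"))
	a, _ := cache.Load("workflow.yaml")
	cache.Upsert("workflow.yaml", nil) // nil data: lazy reload on next use
	b, _ := cache.Load("workflow.yaml")
	fmt.Println(string(a), string(b))
}
```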
func (*Runtime) WaitForUnblock ¶ added in v0.1.24
func (r *Runtime) WaitForUnblock(ctx context.Context, execID string, timeout time.Duration) (*execution.Execution, error)
WaitForUnblock blocks until the specified execution transitions out of the waitAsync state (i.e. allocator unblocks the parent after async children are completed). It returns the refreshed execution.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewFromConfig ¶ added in v0.1.1
NewFromConfig constructs the Fluxor engine using a declarative configuration that can be further customised by functional options. The precedence order is:
- package defaults (via DefaultConfig)
- values present in cfg (may be nil – treated as empty)
- values set by Option functions (highest priority)
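The precedence order above can be sketched with local stand-in types. Everything here is illustrative: the default of 4 workers is an arbitrary placeholder, treating a zero worker count as "unset" is an assumption, and newFromConfig is a simplified stand-in rather than the package's NewFromConfig.

```go
package main

import "fmt"

type ProcessorConfig struct{ WorkerCount int }
type Config struct{ Processor ProcessorConfig }

// Service is a stand-in for the engine; only the worker count is modelled.
type Service struct{ workers int }

type Option func(s *Service)

func WithProcessorWorkers(count int) Option {
	return func(s *Service) { s.workers = count }
}

// defaultConfig uses a placeholder value, not fluxor's actual default.
func defaultConfig() *Config {
	return &Config{Processor: ProcessorConfig{WorkerCount: 4}}
}

// newFromConfig applies defaults, then cfg, then options.
func newFromConfig(cfg *Config, opts ...Option) *Service {
	merged := defaultConfig() // 1. package defaults
	if cfg != nil && cfg.Processor.WorkerCount > 0 {
		// 2. values present in cfg (zero is assumed to mean "unset")
		merged.Processor.WorkerCount = cfg.Processor.WorkerCount
	}
	s := &Service{workers: merged.Processor.WorkerCount}
	for _, opt := range opts {
		opt(s) // 3. options have the highest priority
	}
	return s
}

func main() {
	cfg := &Config{Processor: ProcessorConfig{WorkerCount: 8}}
	fmt.Println(newFromConfig(nil).workers)                           // prints 4
	fmt.Println(newFromConfig(cfg).workers)                           // prints 8
	fmt.Println(newFromConfig(cfg, WithProcessorWorkers(16)).workers) // prints 16
}
```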
func (*Service) ApprovalService ¶ added in v0.1.1
func (*Service) EventService ¶
EventService returns the event service
func (*Service) RegisterExtensionServices ¶
func (*Service) RegisterExtensionType ¶
func (*Service) RegisterExtensionTypes ¶
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| examples/workflow (command) | |
| extension | Package extension provides run-time registries that allow Fluxor to work with user-defined Go types (for example custom action inputs or outputs). |
| internal | |
| internal/clock | Package clock provides test-friendly wrappers around the standard library time utilities. |
| internal/idgen | Package idgen wraps the UUID generator so that it can be stubbed in tests. |
| model | Package model contains the in-memory representation of workflow definitions, runtime state and supporting types used by the Fluxor engine. |
| policy | Package policy provides optional declarative rules that can be applied on top of a running Fluxor engine – for example to require human approval for selected tasks or to enforce execution constraints. |
| progress | Package progress defines primitives for reporting and aggregating the progress of long-running tasks executed by the Fluxor runtime. |
| runtime | |
| runtime/evaluator | Package evaluator is responsible for evaluating (interpolating) dynamic expressions found in workflow definitions at run-time. |
| runtime/execution | Package execution contains the core entities representing workflow execution – processes, sessions and individual task executions. |
| runtime/expander | Package expander converts concise workflow definitions that use the "matrix" shortcut notation into an explicit representation understood by the executor. |
| runtime/orchestrator | Package orchestrator exposes a lightweight programmatic emit/await API for task actions. |
| service | |
| service/allocator | Package allocator owns the execution queue and is the only service allowed to mutate `Process` instances according to the project guidelines. |
| service/approval | Package approval implements the optional human-in-the-loop approval layer. |
| service/event | Package event defines Fluxor's publish-subscribe mechanism that allows observers to react to execution lifecycle events. |
| service/executor | Package executor defines the interface that bridges tasks enqueued by the processor with the backing implementation of actions. |
| service/messaging | Package messaging contains queue implementations used to decouple the allocator, processor and approval services. |
| service/processor | Package processor hosts the workers that execute individual task executions. |
| tracing | Package tracing integrates observability back-ends with the Fluxor engine to provide distributed tracing information. |