Documentation
¶
Overview ¶
Package fluxor provides a generic, extensible workflow engine.
The engine executes workflows defined declaratively (for example in YAML or JSON) and comes with pluggable service layers such as:
- runtime – orchestration of workflow execution
- allocator – task allocation and state management
- executor – task execution through custom actions
- approval – optional human-in-the-loop task approval
Fluxor is designed to be embedded in host applications. End-users typically interact with the engine via the high-level Service façade exposed by the root package:
srv := fluxor.New() rt := srv.Runtime() wf, _ := rt.LoadWorkflow(ctx, "workflow.yaml") _, wait, _ := rt.StartProcess(ctx, wf, nil) out, _ := wait(ctx, time.Minute)
For more details see the README and individual sub-packages.
Index ¶
- Variables
- type Config
- type Option
- func WithApprovalService(svc approval.Service) Option
- func WithEventService(service *event.Service) Option
- func WithExecutorOptions(opts ...executor.Option) Option
- func WithExtensionServices(services ...types.Service) Option
- func WithExtensionTypes(types ...*x.Type) Option
- func WithMetaBaseURL(url string) Option
- func WithMetaFsOptions(options ...storage.Option) Option
- func WithMetaService(service *meta.Service) Option
- func WithProcessDAO(dao dao.Service[string, execution.Process]) Option
- func WithProcessorWorkers(count int) Option
- func WithQueue(queue messaging.Queue[execution.Execution]) Option
- func WithRootTaskNodeName(name string) Option
- func WithStateListeners(listeners ...execution.StateListener) Option
- func WithTaskExecutionDAO(dao dao.Service[string, execution.Execution]) Option
- func WithTracing(serviceName, serviceVersion, outputFile string) Option
- func WithTracingExporter(serviceName, serviceVersion string, exporter sdktrace.SpanExporter) Option
- func WithWhenListeners(listeners ...execution.WhenListener) Option
- type ProcessorConfig
- type Runtime
- func (r *Runtime) DecodeYAMLWorkflow(data []byte) (*model.Workflow, error)
- func (r *Runtime) Execution(ctx context.Context, id string) (*execution.Execution, error)
- func (r *Runtime) LoadWorkflow(ctx context.Context, location string) (*model.Workflow, error)
- func (r *Runtime) Process(ctx context.Context, id string) (*execution.Process, error)
- func (r *Runtime) ProcessFromContext(ctx context.Context) *execution.Process
- func (r *Runtime) Processes(ctx context.Context, parameter ...*dao.Parameter) ([]*execution.Process, error)
- func (r *Runtime) RefreshWorkflow(location string) error
- func (r *Runtime) RunTaskOnce(ctx context.Context, wf *model.Workflow, taskID string, input interface{}) (interface{}, error)
- func (r *Runtime) ScheduleExecution(ctx context.Context, exec *execution.Execution) (func(duration time.Duration) (*execution.Execution, error), error)
- func (r *Runtime) Shutdown(ctx context.Context) error
- func (r *Runtime) Start(ctx context.Context) error
- func (r *Runtime) StartProcess(ctx context.Context, aWorkflow *model.Workflow, ...) (*execution.Process, execution.Wait, error)
- func (r *Runtime) UpsertDefinition(location string, data []byte) error
- type Service
- func (s *Service) Actions() *extension.Actions
- func (s *Service) ApprovalService() approval.Service
- func (s *Service) EventService() *event.Service
- func (s *Service) NewContext(ctx context.Context) context.Context
- func (s *Service) RegisterExtensionServices(services ...types.Service)
- func (s *Service) RegisterExtensionType(aType *x.Type)
- func (s *Service) RegisterExtensionTypes(types ...*x.Type)
- func (s *Service) Runtime() *Runtime
Constants ¶
This section is empty.
Variables ¶
var ( ErrTaskNotFound = errors.New("task not found in workflow") ErrMethodNotFound = errors.New("method not found in service") )
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.1.1
type Config struct {
Processor ProcessorConfig `json:"processor" yaml:"processor"`
}
func DefaultConfig ¶ added in v0.1.1
func DefaultConfig() *Config
DefaultConfig returns a Config populated with exactly the same default values that were previously hard-coded in the constructors. Callers may modify the returned struct before passing it to NewFromConfig.
type Option ¶
type Option func(s *Service)
Service represents fluxor service
func WithApprovalService ¶ added in v0.1.1
WithApprovalService sets the approvalService service
func WithEventService ¶
func WithExecutorOptions ¶ added in v0.1.1
WithExecutorOptions lets the caller supply additional options passed to executor.NewService (e.g. disabling the default StdoutListener).
func WithExtensionServices ¶
WithExtensionServices sets the extension services
func WithExtensionTypes ¶
WithExtensionTypes sets the extension types
func WithMetaBaseURL ¶
WithMetaBaseURL sets the meta base URL
func WithMetaFsOptions ¶
WithMetaFsOptions with meta file system options
func WithMetaService ¶
WithMetaService sets the meta service
func WithProcessDAO ¶
WithProcessDAO sets the processor DAO
func WithProcessorWorkers ¶
WithProcessorWorkers sets the processor workers
func WithRootTaskNodeName ¶
WithRootTaskNodeName sets the root task node name
func WithStateListeners ¶ added in v0.1.2
func WithStateListeners(listeners ...execution.StateListener) Option
func WithTaskExecutionDAO ¶
WithTaskExecutionDAO sets the task execution DAO
func WithTracing ¶
WithTracing configures OpenTelemetry tracing for the service. If outputFile is empty the stdout exporter is used; otherwise traces are written to the supplied file path. The function is safe to call multiple times – the first successful initialisation wins.
func WithTracingExporter ¶
func WithTracingExporter(serviceName, serviceVersion string, exporter sdktrace.SpanExporter) Option
WithTracingExporter configures OpenTelemetry tracing using a custom SpanExporter. This enables integrations with exporters other than the built-in stdout exporter, for example OTLP, Jaeger or Zipkin. The function is safe to call multiple times – the first successful initialisation wins.
func WithWhenListeners ¶ added in v0.1.2
func WithWhenListeners(listeners ...execution.WhenListener) Option
WithWhenListeners registers callbacks invoked after every when-condition evaluation.
type ProcessorConfig ¶ added in v0.1.1
type ProcessorConfig struct {
WorkerCount int `json:"workers" yaml:"workers"`
}
type Runtime ¶
type Runtime struct {
// contains filtered or unexported fields
}
Runtime represents a workflow engine runtime
func (*Runtime) DecodeYAMLWorkflow ¶
DecodeYAMLWorkflow loads a workflow
func (*Runtime) LoadWorkflow ¶
LoadWorkflow loads a workflow
func (*Runtime) ProcessFromContext ¶ added in v0.1.7
ProcessFromContext return process from context
func (*Runtime) Processes ¶
func (r *Runtime) Processes(ctx context.Context, parameter ...*dao.Parameter) ([]*execution.Process, error)
Processes returns a list of processes
func (*Runtime) RefreshWorkflow ¶ added in v0.1.19
RefreshWorkflow discards any cached copy of the workflow definition located at the given URL/location. The next LoadWorkflow call will reload the file via the configured meta-service (i.e. one extra disk/cloud round-trip).
func (*Runtime) RunTaskOnce ¶ added in v0.1.9
func (r *Runtime) RunTaskOnce(ctx context.Context, wf *model.Workflow, taskID string, input interface{}) (interface{}, error)
RunTaskOnce is a convenience helper that executes a *single* task from the supplied workflow and waits for its completion. It is intended for quick ad-hoc jobs, debugging and unit tests where launching the entire workflow would be unnecessary overhead.
The helper works by submitting an "at-hoc" execution to the shared allocator/processor queue, therefore semantics (retries, policies, tracing etc.) are identical to regular executions. The returned value is whatever the task's action populates as its output.
func (*Runtime) ScheduleExecution ¶ added in v0.1.7
func (*Runtime) StartProcess ¶
func (r *Runtime) StartProcess(ctx context.Context, aWorkflow *model.Workflow, initialState map[string]interface{}, tasks ...string) (*execution.Process, execution.Wait, error)
StartProcess starts a new process
func (*Runtime) UpsertDefinition ¶ added in v0.1.19
UpsertDefinition parses the supplied YAML bytes and stores the resulting workflow definition in the in-memory cache under the specified location. When data is nil the call falls back to RefreshWorkflow, causing a lazy reload on next use.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
func NewFromConfig ¶ added in v0.1.1
NewFromConfig constructs the Fluxor engine using a declarative configuration that can be further customised by functional options. The precedence order is:
- package defaults (via DefaultConfig)
- values present in cfg (may be nil – treated as empty)
- values set by Option functions (highest priority)
func (*Service) ApprovalService ¶ added in v0.1.1
func (*Service) EventService ¶
EventService returns event service
func (*Service) RegisterExtensionServices ¶
func (*Service) RegisterExtensionType ¶
func (*Service) RegisterExtensionTypes ¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
workflow
command
|
|
|
Package extension provides run-time registries that allow Fluxor to work with user-defined Go types (for example custom action inputs or outputs).
|
Package extension provides run-time registries that allow Fluxor to work with user-defined Go types (for example custom action inputs or outputs). |
|
internal
|
|
|
clock
Package clock provides test-friendly wrappers around the standard library time utilities.
|
Package clock provides test-friendly wrappers around the standard library time utilities. |
|
idgen
Package idgen wraps the UUID generator so that it can be stubbed in tests.
|
Package idgen wraps the UUID generator so that it can be stubbed in tests. |
|
Package model contains the in-memory representation of workflow definitions, runtime state and supporting types used by the Fluxor engine.
|
Package model contains the in-memory representation of workflow definitions, runtime state and supporting types used by the Fluxor engine. |
|
Package policy provides optional declarative rules that can be applied on top of a running Fluxor engine – for example to require human approval for selected tasks or to enforce execution constraints.
|
Package policy provides optional declarative rules that can be applied on top of a running Fluxor engine – for example to require human approval for selected tasks or to enforce execution constraints. |
|
Package progress defines primitives for reporting and aggregating the progress of long-running tasks executed by the Fluxor runtime.
|
Package progress defines primitives for reporting and aggregating the progress of long-running tasks executed by the Fluxor runtime. |
|
runtime
|
|
|
evaluator
Package evaluator is responsible for evaluating (interpolating) dynamic expressions found in workflow definitions at run-time.
|
Package evaluator is responsible for evaluating (interpolating) dynamic expressions found in workflow definitions at run-time. |
|
execution
Package execution contains the core entities representing workflow execution – processes, sessions and individual task executions.
|
Package execution contains the core entities representing workflow execution – processes, sessions and individual task executions. |
|
expander
Package expander converts concise workflow definitions that use the "matrix" shortcut notation into an explicit representation understood by the executor.
|
Package expander converts concise workflow definitions that use the "matrix" shortcut notation into an explicit representation understood by the executor. |
|
service
|
|
|
allocator
Package allocator owns the execution queue and is the only service allowed to mutate `Process` instances according to the project guidelines.
|
Package allocator owns the execution queue and is the only service allowed to mutate `Process` instances according to the project guidelines. |
|
approval
Package approval implements the optional human-in-the-loop approval layer.
|
Package approval implements the optional human-in-the-loop approval layer. |
|
event
Package event defines Fluxor's publish-subscribe mechanism that allows observers to react to execution lifecycle events.
|
Package event defines Fluxor's publish-subscribe mechanism that allows observers to react to execution lifecycle events. |
|
executor
Package executor defines the interface that bridges tasks enqueued by the processor with the backing implementation of actions.
|
Package executor defines the interface that bridges tasks enqueued by the processor with the backing implementation of actions. |
|
messaging
Package messaging contains queue implementations used to decouple the allocator, processor and approval services.
|
Package messaging contains queue implementations used to decouple the allocator, processor and approval services. |
|
processor
Package processor hosts the workers that execute individual task executions.
|
Package processor hosts the workers that execute individual task executions. |
|
Package tracing integrates observability back-ends with the Fluxor engine to provide distributed tracing information.
|
Package tracing integrates observability back-ends with the Fluxor engine to provide distributed tracing information. |