Documentation
¶
Index ¶
- Variables
- func ApplyTaskDataOutput(element schema.BaseElementInterface, dataOutputs map[string]any) map[string]data.IItem
- func ApplyTaskResult(element schema.BaseElementInterface, results map[string]any) map[string]data.IItem
- func FetchTaskDataInput(locator data.IFlowDataLocator, element schema.BaseElementInterface) (headers map[string]string, properties, dataObjects map[string]any)
- func FetchTaskTimeout(element schema.BaseElementInterface) time.Duration
- type ActionTransformer
- type ActiveBoundaryTrace
- type ActiveListeningTrace
- type Activity
- type ActivityType
- type CancellationFlowNodeTrace
- type CancellationFlowTrace
- type CatchEvent
- type CeaseFlowTrace
- type CompleteAction
- type CompletionTrace
- type Constructor
- type DeterminationMadeTrace
- type DoOption
- type DoResponse
- type EndEvent
- type Engine
- type EngineOption
- type EngineOptions
- type ErrHandleMode
- type ErrHandler
- type ErrorTrace
- type EventBasedGateway
- type EventObservedTrace
- type ExclusiveGateway
- type ExclusiveNoEffectiveSequenceFlows
- type Flow
- type FlowAction
- type FlowActionResponse
- type FlowNodeMapping
- type FlowTrace
- type Harness
- func (node *Harness) Activity() Activity
- func (node *Harness) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)
- func (node *Harness) Element() schema.FlowNodeInterface
- func (node *Harness) NextAction(ctx context.Context, flow Flow) chan IAction
- func (node *Harness) RegisterEventConsumer(consumer event.IConsumer) (err error)
- type IAction
- type IFlowNode
- type IOutgoing
- type InclusiveGateway
- type InclusiveNoEffectiveSequenceFlows
- type IncomingFlowProcessedTrace
- type InstanceTrace
- type InstantiationTrace
- type LeaveTrace
- type NewFlowNodeTrace
- type NewFlowTrace
- type NoAction
- type Option
- func WithContext(ctx context.Context) Option
- func WithDataObjects(dataObjects map[string]any) Option
- func WithEventEgress(source event.ISource) Option
- func WithEventIngress(consumer event.IConsumer) Option
- func WithIdGenerator(idGenerator id.IGenerator) Option
- func WithLocator(locator data.IFlowDataLocator) Option
- func WithProcessEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) Option
- func WithTracer(tracer tracing.ITracer) Option
- func WithVariables(variables map[string]any) Option
- type Options
- type ParallelGateway
- type ProbeAction
- type Process
- func (p *Process) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)
- func (p *Process) Element() *schema.Process
- func (p *Process) FlowNodeMapping() *FlowNodeMapping
- func (p *Process) Id() id.Id
- func (p *Process) Locator() data.IFlowDataLocator
- func (p *Process) RegisterEventConsumer(ev event.IConsumer) (err error)
- func (p *Process) StartAll(ctx context.Context) (err error)
- func (p *Process) StartWith(ctx context.Context, startEvent schema.StartEventInterface) (err error)
- func (p *Process) Tracer() tracing.ITracer
- func (p *Process) WaitUntilComplete(ctx context.Context) (complete bool)
- type ProcessLandMarkTrace
- type ProcessTrace
- type Retry
- type SequenceFlow
- type Snapshot
- type StartEvent
- type SubProcess
- func (p *SubProcess) Cancel() <-chan bool
- func (p *SubProcess) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)
- func (p *SubProcess) Element() schema.FlowNodeInterface
- func (p *SubProcess) NextAction(ctx context.Context, flow Flow) chan IAction
- func (p *SubProcess) RegisterEventConsumer(ev event.IConsumer) (err error)
- func (p *SubProcess) Type() ActivityType
- type TaskTrace
- type Terminate
- type TerminationTrace
- type VisitTrace
- type Wiring
Constants ¶
This section is empty.
Variables ¶
var (
DefaultTaskExecTimeout = time.Second * 30
)
Functions ¶
func ApplyTaskDataOutput ¶
func ApplyTaskResult ¶ added in v2.2.0
func FetchTaskDataInput ¶
func FetchTaskDataInput(locator data.IFlowDataLocator, element schema.BaseElementInterface) (headers map[string]string, properties, dataObjects map[string]any)
func FetchTaskTimeout ¶ added in v2.2.4
func FetchTaskTimeout(element schema.BaseElementInterface) time.Duration
FetchTaskTimeout returns timeout by schema.BaseElementInterface
Types ¶
type ActionTransformer ¶
type ActiveBoundaryTrace ¶
type ActiveBoundaryTrace struct {
Start bool
Node schema.FlowNodeInterface
}
func (ActiveBoundaryTrace) Unpack ¶ added in v2.4.2
func (b ActiveBoundaryTrace) Unpack() any
type ActiveListeningTrace ¶
type ActiveListeningTrace struct {
Node *schema.CatchEvent
}
func (ActiveListeningTrace) Unpack ¶ added in v2.4.2
func (t ActiveListeningTrace) Unpack() any
type Activity ¶
type Activity interface {
IFlowNode
Type() ActivityType
// Cancel initiates a cancellation of activity and returns a channel
// that will signal a boolean (`true` if cancellation was successful,
// `false` otherwise)
Cancel() <-chan bool
}
Activity is a generic interface to flow nodes that are activities
type ActivityType ¶ added in v2.3.0
type ActivityType string
const ( TaskActivity ActivityType = "Task" ServiceTaskActivity ActivityType = "ServiceTask" ScriptTaskActivity ActivityType = "ScriptTask" UserTaskActivity ActivityType = "UserTask" ManualTaskActivity ActivityType = "ManualTask" CallActivity ActivityType = "CallActivity" BusinessRuleActivity ActivityType = "BusinessRuleTask" SendTaskActivity ActivityType = "SendTask" ReceiveTaskActivity ActivityType = "ReceiveTask" SubprocessActivity ActivityType = "Subprocess" )
type CancellationFlowNodeTrace ¶
type CancellationFlowNodeTrace struct {
Node schema.FlowNodeInterface
}
func (CancellationFlowNodeTrace) Unpack ¶ added in v2.4.2
func (t CancellationFlowNodeTrace) Unpack() any
type CancellationFlowTrace ¶
func (CancellationFlowTrace) Unpack ¶ added in v2.4.2
func (t CancellationFlowTrace) Unpack() any
type CatchEvent ¶
type CatchEvent struct {
*Wiring
// contains filtered or unexported fields
}
func NewCatchEvent ¶
func NewCatchEvent(wiring *Wiring, catchEvent *schema.CatchEvent) (evt *CatchEvent, err error)
func (*CatchEvent) ConsumeEvent ¶
func (evt *CatchEvent) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)
func (*CatchEvent) Element ¶
func (evt *CatchEvent) Element() schema.FlowNodeInterface
func (*CatchEvent) NextAction ¶
func (evt *CatchEvent) NextAction(ctx context.Context, flow Flow) chan IAction
type CeaseFlowTrace ¶
func (CeaseFlowTrace) Unpack ¶ added in v2.4.2
func (t CeaseFlowTrace) Unpack() any
type CompleteAction ¶
type CompleteAction struct{}
type CompletionTrace ¶
type CompletionTrace struct {
Node schema.FlowNodeInterface
}
func (CompletionTrace) Unpack ¶ added in v2.4.2
func (t CompletionTrace) Unpack() any
type Constructor ¶
func NewSubProcess ¶
func NewSubProcess( eventDefinitionBuilder event.IDefinitionInstanceBuilder, idGenerator id.IGenerator, subProcess *schema.SubProcess) Constructor
func NewTask ¶
func NewTask(element schema.FlowNodeInterface, activityType ActivityType) Constructor
type DeterminationMadeTrace ¶
type DeterminationMadeTrace struct {
Node schema.FlowNodeInterface
}
func (DeterminationMadeTrace) Unpack ¶ added in v2.4.2
func (t DeterminationMadeTrace) Unpack() any
type DoOption ¶
type DoOption func(*DoResponse)
func DoWithErrHandle ¶ added in v2.2.0
func DoWithErrHandle(err error, ch <-chan ErrHandler) DoOption
func DoWithObjects ¶ added in v2.2.0
func DoWithResults ¶ added in v2.2.0
func DoWithValue ¶ added in v2.2.0
type DoResponse ¶
type EndEvent ¶
type EndEvent struct {
*Wiring
// contains filtered or unexported fields
}
func NewEndEvent ¶
func (*EndEvent) Element ¶
func (evt *EndEvent) Element() schema.FlowNodeInterface
type Engine ¶ added in v2.5.1
type Engine struct {
*EngineOptions
}
func NewEngine ¶ added in v2.5.1
func NewEngine(opts ...EngineOption) *Engine
func (*Engine) NewProcess ¶ added in v2.5.1
type EngineOption ¶ added in v2.5.1
type EngineOption func(*EngineOptions)
func WithEventDefinitionInstanceBuilder ¶
func WithEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) EngineOption
func WithIdGeneratorBuilder ¶ added in v2.5.1
func WithIdGeneratorBuilder(idGeneratorBuilder id.IGeneratorBuilder) EngineOption
type EngineOptions ¶ added in v2.5.1
type EngineOptions struct {
// contains filtered or unexported fields
}
func NewEngineOptions ¶ added in v2.5.1
func NewEngineOptions(opts ...EngineOption) *EngineOptions
type ErrHandleMode ¶
type ErrHandleMode int
const ( RetryMode ErrHandleMode = iota + 1 SkipMode ExitMode )
type ErrHandler ¶
type ErrHandler struct {
Mode ErrHandleMode
Retries int32
}
type ErrorTrace ¶
type ErrorTrace struct {
Error error
}
func (ErrorTrace) Unpack ¶ added in v2.4.2
func (t ErrorTrace) Unpack() any
type EventBasedGateway ¶
type EventBasedGateway struct {
*Wiring
// contains filtered or unexported fields
}
func NewEventBasedGateway ¶
func NewEventBasedGateway(wiring *Wiring, eventBasedGateway *schema.EventBasedGateway) (gw *EventBasedGateway, err error)
func (*EventBasedGateway) Element ¶
func (gw *EventBasedGateway) Element() schema.FlowNodeInterface
func (*EventBasedGateway) NextAction ¶
func (gw *EventBasedGateway) NextAction(ctx context.Context, flow Flow) chan IAction
type EventObservedTrace ¶
type EventObservedTrace struct {
Node *schema.CatchEvent
Event event.IEvent
}
EventObservedTrace signals the fact that a particular event has in fact observed by the node
func (EventObservedTrace) Unpack ¶ added in v2.4.2
func (t EventObservedTrace) Unpack() any
type ExclusiveGateway ¶
type ExclusiveGateway struct {
*Wiring
// contains filtered or unexported fields
}
func NewExclusiveGateway ¶
func NewExclusiveGateway(wiring *Wiring, exclusiveGateway *schema.ExclusiveGateway) (gw *ExclusiveGateway, err error)
func (*ExclusiveGateway) Element ¶
func (gw *ExclusiveGateway) Element() schema.FlowNodeInterface
func (*ExclusiveGateway) NextAction ¶
func (gw *ExclusiveGateway) NextAction(ctx context.Context, flow Flow) chan IAction
type ExclusiveNoEffectiveSequenceFlows ¶
type ExclusiveNoEffectiveSequenceFlows struct {
*schema.ExclusiveGateway
}
func (ExclusiveNoEffectiveSequenceFlows) Error ¶
func (e ExclusiveNoEffectiveSequenceFlows) Error() string
type Flow ¶
type Flow interface {
// Id returns flow's unique identifier
Id() id.Id
// SequenceFlow returns an inbound sequence flow this flow
// is currently at.
SequenceFlow() *SequenceFlow
}
Flow specifies an interface for BPMN flows
type FlowAction ¶
type FlowAction struct {
Response *FlowActionResponse
SequenceFlows []*SequenceFlow
// Index of sequence flows that should flow without
// conditionExpression being evaluated
UnconditionalFlows []int
// The actions produced by the targets should be produced by
// this function
ActionTransformer ActionTransformer
// If a supplied channel sends a function that returns true, the flow action
// is to be terminated if it wasn't already
Terminate Terminate
}
type FlowActionResponse ¶
type FlowNodeMapping ¶
type FlowNodeMapping struct {
// contains filtered or unexported fields
}
func NewLockedFlowNodeMapping ¶
func NewLockedFlowNodeMapping() *FlowNodeMapping
func (*FlowNodeMapping) Finalize ¶
func (mapping *FlowNodeMapping) Finalize()
func (*FlowNodeMapping) RegisterElementToFlowNode ¶
func (mapping *FlowNodeMapping) RegisterElementToFlowNode(element schema.FlowNodeInterface, flowNode IFlowNode) error
func (*FlowNodeMapping) ResolveElementToFlowNode ¶
func (mapping *FlowNodeMapping) ResolveElementToFlowNode(element schema.FlowNodeInterface) (flowNode IFlowNode, found bool)
type FlowTrace ¶
type FlowTrace struct {
Source schema.FlowNodeInterface
Flows []Snapshot
}
type Harness ¶
type Harness struct {
*Wiring
// contains filtered or unexported fields
}
func NewHarness ¶
func NewHarness(wiring *Wiring, idGenerator id.IGenerator, constructor Constructor) (node *Harness, err error)
func (*Harness) ConsumeEvent ¶
func (*Harness) Element ¶
func (node *Harness) Element() schema.FlowNodeInterface
func (*Harness) NextAction ¶
type IFlowNode ¶
type IFlowNode interface {
IOutgoing
Element() schema.FlowNodeInterface
}
type InclusiveGateway ¶
type InclusiveGateway struct {
*Wiring
// contains filtered or unexported fields
}
func NewInclusiveGateway ¶
func NewInclusiveGateway(wiring *Wiring, inclusiveGateway *schema.InclusiveGateway) (gw *InclusiveGateway, err error)
func (*InclusiveGateway) Element ¶
func (gw *InclusiveGateway) Element() schema.FlowNodeInterface
func (*InclusiveGateway) NextAction ¶
func (gw *InclusiveGateway) NextAction(ctx context.Context, flow Flow) chan IAction
type InclusiveNoEffectiveSequenceFlows ¶
type InclusiveNoEffectiveSequenceFlows struct {
*schema.InclusiveGateway
}
func (InclusiveNoEffectiveSequenceFlows) Error ¶
func (e InclusiveNoEffectiveSequenceFlows) Error() string
type IncomingFlowProcessedTrace ¶
type IncomingFlowProcessedTrace struct {
Node *schema.ParallelGateway
Flow Flow
}
IncomingFlowProcessedTrace signals that a particular flow has been processed. If any action has been taken, it has already happened
func (IncomingFlowProcessedTrace) Unpack ¶ added in v2.4.2
func (t IncomingFlowProcessedTrace) Unpack() any
type InstanceTrace ¶
InstanceTrace wraps any trace with process instance id
func (InstanceTrace) Unpack ¶ added in v2.4.2
func (t InstanceTrace) Unpack() any
func (InstanceTrace) Unwrap ¶
func (t InstanceTrace) Unwrap() tracing.ITrace
type InstantiationTrace ¶
InstantiationTrace denotes instantiation of a given process
func (InstantiationTrace) Unpack ¶ added in v2.4.2
func (i InstantiationTrace) Unpack() any
type LeaveTrace ¶
type LeaveTrace struct {
Node schema.FlowNodeInterface
}
func (LeaveTrace) Unpack ¶ added in v2.4.2
func (t LeaveTrace) Unpack() any
type NewFlowNodeTrace ¶
type NewFlowNodeTrace struct {
Node schema.FlowNodeInterface
}
func (NewFlowNodeTrace) Unpack ¶ added in v2.4.2
func (t NewFlowNodeTrace) Unpack() any
type NewFlowTrace ¶
func (NewFlowTrace) Unpack ¶ added in v2.4.2
func (t NewFlowTrace) Unpack() any
type Option ¶
type Option func(*Options)
Option allows to modify configuration of an instance in a flexible fashion (as it's just a modification function)
func WithContext ¶
WithContext will pass a given context to a new instance instead of implicitly generated one
func WithDataObjects ¶
func WithEventEgress ¶
func WithEventIngress ¶
func WithIdGenerator ¶
func WithIdGenerator(idGenerator id.IGenerator) Option
func WithLocator ¶
func WithLocator(locator data.IFlowDataLocator) Option
func WithProcessEventDefinitionInstanceBuilder ¶ added in v2.5.1
func WithProcessEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) Option
func WithTracer ¶
WithTracer overrides instance's tracer
func WithVariables ¶
type ParallelGateway ¶
type ParallelGateway struct {
*Wiring
// contains filtered or unexported fields
}
func NewParallelGateway ¶
func NewParallelGateway(wiring *Wiring, parallelGateway *schema.ParallelGateway) (gateway *ParallelGateway, err error)
func (*ParallelGateway) Element ¶
func (gw *ParallelGateway) Element() schema.FlowNodeInterface
func (*ParallelGateway) NextAction ¶
func (gw *ParallelGateway) NextAction(ctx context.Context, flow Flow) chan IAction
type ProbeAction ¶
type ProbeAction struct {
SequenceFlows []*SequenceFlow
// ProbeReport is a function that needs to be called
// wth sequence flow indices that have successful
// condition expressions (or none)
ProbeReport func([]int)
}
type Process ¶
type Process struct {
*Options
// contains filtered or unexported fields
}
func NewProcess ¶
func (*Process) ConsumeEvent ¶ added in v2.5.1
func (*Process) FlowNodeMapping ¶ added in v2.5.1
func (p *Process) FlowNodeMapping() *FlowNodeMapping
func (*Process) Locator ¶ added in v2.5.1
func (p *Process) Locator() data.IFlowDataLocator
func (*Process) RegisterEventConsumer ¶ added in v2.5.1
func (*Process) StartAll ¶ added in v2.5.1
StartAll explicitly starts the instance by triggering all start events, if any
type ProcessLandMarkTrace ¶
type ProcessLandMarkTrace struct {
Node schema.FlowNodeInterface
}
ProcessLandMarkTrace denotes instantiation of a given sub process
func (ProcessLandMarkTrace) Unpack ¶ added in v2.4.2
func (t ProcessLandMarkTrace) Unpack() any
type ProcessTrace ¶
ProcessTrace wraps any trace within a given process
func (ProcessTrace) Unpack ¶ added in v2.4.2
func (t ProcessTrace) Unpack() any
func (ProcessTrace) Unwrap ¶
func (t ProcessTrace) Unwrap() tracing.ITrace
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Retry describes the retry handler for flow
func (*Retry) IsContinue ¶
type SequenceFlow ¶
type SequenceFlow struct {
*schema.SequenceFlow
// contains filtered or unexported fields
}
func AllSequenceFlows ¶
func AllSequenceFlows(sequenceFlows *[]SequenceFlow, exclusion ...func(*SequenceFlow) bool) (result []*SequenceFlow)
func MakeSequenceFlow ¶
func MakeSequenceFlow(sf *schema.SequenceFlow, process schema.Element) SequenceFlow
func NewSequenceFlow ¶
func NewSequenceFlow(sf *schema.SequenceFlow, process schema.Element) *SequenceFlow
func (*SequenceFlow) Source ¶
func (sf *SequenceFlow) Source() (schema.FlowNodeInterface, error)
func (*SequenceFlow) Target ¶
func (sf *SequenceFlow) Target() (schema.FlowNodeInterface, error)
func (*SequenceFlow) TargetIndex ¶
func (sf *SequenceFlow) TargetIndex() (index int, err error)
type Snapshot ¶
type Snapshot struct {
// contains filtered or unexported fields
}
func (*Snapshot) SequenceFlow ¶
func (s *Snapshot) SequenceFlow() *SequenceFlow
type StartEvent ¶
type StartEvent struct {
*Wiring
// contains filtered or unexported fields
}
func NewStartEvent ¶
func NewStartEvent(wiring *Wiring, startEvent *schema.StartEvent, idGenerator id.IGenerator, ) (evt *StartEvent, err error)
func (*StartEvent) ConsumeEvent ¶
func (evt *StartEvent) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)
func (*StartEvent) Element ¶
func (evt *StartEvent) Element() schema.FlowNodeInterface
func (*StartEvent) NextAction ¶
func (evt *StartEvent) NextAction(ctx context.Context, flow Flow) chan IAction
func (*StartEvent) Trigger ¶
func (evt *StartEvent) Trigger(ctx context.Context)
type SubProcess ¶
type SubProcess struct {
*Wiring
// contains filtered or unexported fields
}
func (*SubProcess) Cancel ¶
func (p *SubProcess) Cancel() <-chan bool
func (*SubProcess) ConsumeEvent ¶
func (p *SubProcess) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)
func (*SubProcess) Element ¶
func (p *SubProcess) Element() schema.FlowNodeInterface
func (*SubProcess) NextAction ¶
func (p *SubProcess) NextAction(ctx context.Context, flow Flow) chan IAction
func (*SubProcess) RegisterEventConsumer ¶
func (p *SubProcess) RegisterEventConsumer(ev event.IConsumer) (err error)
func (*SubProcess) Type ¶
func (p *SubProcess) Type() ActivityType
type TaskTrace ¶
type TaskTrace interface {
Unpack() any
Context() context.Context
GetActivity() Activity
GetDataObjects() map[string]any
GetHeaders() map[string]string
GetProperties() map[string]any
Do(options ...DoOption)
}
TaskTrace describes a common channel handler for all tasks
type TerminationTrace ¶
type TerminationTrace struct {
FlowId id.Id
Source schema.FlowNodeInterface
}
func (TerminationTrace) Unpack ¶ added in v2.4.2
func (t TerminationTrace) Unpack() any
type VisitTrace ¶
type VisitTrace struct {
Node schema.FlowNodeInterface
}
func (VisitTrace) Unpack ¶ added in v2.4.2
func (t VisitTrace) Unpack() any
type Wiring ¶
type Wiring struct {
ProcessInstanceId id.Id
FlowNodeId schema.Id
Definitions *schema.Definitions
Incoming []SequenceFlow
Outgoing []SequenceFlow
EventIngress event.IConsumer
EventEgress event.ISource
Tracer tracing.ITracer
Process schema.Element
FlowNodeMapping *FlowNodeMapping
FlowWaitGroup *sync.WaitGroup
EventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder
Locator data.IFlowDataLocator
}
Wiring holds all necessary "wiring" for functioning of flow nodes: definitions, process, sequence flow, event management, tracer, flow node mapping and a flow wait group
func NewWiring ¶
func NewWiring( processInstanceId id.Id, process schema.Element, definitions *schema.Definitions, flowNode *schema.FlowNode, eventIngress event.IConsumer, eventEgress event.ISource, tracer tracing.ITracer, flowNodeMapping *FlowNodeMapping, flowWaitGroup *sync.WaitGroup, eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder, locator data.IFlowDataLocator, ) (node *Wiring, err error)
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
basic
command
|
|
|
gateway
command
|
|
|
properties
command
|
|
|
subprocess
command
|
|
|
user_task
command
|
|
|
pkg
|
|
|
logic
Package logic provides commonly shared "logic units" (or algorithms)
|
Package logic provides commonly shared "logic units" (or algorithms) |
|
tracing
Package tracing is a capability to get an ordered stream of structured records of what has happened.
|
Package tracing is a capability to get an ordered stream of structured records of what has happened. |