bpmn

package module
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2025 License: LGPL-2.1 Imports: 17 Imported by: 0

README

Introduce | 中文

a lightweight bpmn engine

Download

go get -u github.com/olive-io/bpmn/schema
go get -u github.com/olive-io/bpmn/v2

Usage

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	GitCommit = ""
	GitTag    = ""
	BuildDate = ""
)
View Source
var (
	DefaultTaskExecTimeout = time.Second * 30
)

Functions

func ApplyTaskDataOutput

func ApplyTaskDataOutput(element schema.BaseElementInterface, dataOutputs map[string]any) map[string]data.IItem

func ApplyTaskResult added in v2.2.0

func ApplyTaskResult(element schema.BaseElementInterface, results map[string]any) map[string]data.IItem

func FetchTaskDataInput

func FetchTaskDataInput(locator data.IFlowDataLocator, element schema.BaseElementInterface) (headers map[string]string, properties, dataObjects map[string]any)

func FetchTaskTimeout added in v2.2.4

func FetchTaskTimeout(element schema.BaseElementInterface) time.Duration

FetchTaskTimeout returns timeout by schema.BaseElementInterface

func GoV

func GoV() string

func Version

func Version() string

Types

type ActionTransformer

type ActionTransformer func(sequenceFlowId *schema.IdRef, action IAction) IAction

type ActiveBoundaryTrace

type ActiveBoundaryTrace struct {
	Start bool
	Node  schema.FlowNodeInterface
}

func (ActiveBoundaryTrace) Element

func (b ActiveBoundaryTrace) Element() any

type ActiveListeningTrace

type ActiveListeningTrace struct {
	Node *schema.CatchEvent
}

func (ActiveListeningTrace) Element

func (t ActiveListeningTrace) Element() any

type Activity

type Activity interface {
	IFlowNode
	Type() ActivityType
	// Cancel initiates a cancellation of activity and returns a channel
	// that will signal a boolean (`true` if cancellation was successful,
	// `false` otherwise)
	Cancel() <-chan bool
}

Activity is a generic interface to flow nodes that are activities

type ActivityType added in v2.3.0

type ActivityType string
const (
	TaskActivity         ActivityType = "Task"
	ServiceTaskActivity  ActivityType = "ServiceTask"
	ScriptTaskActivity   ActivityType = "ScriptTask"
	UserTaskActivity     ActivityType = "UserTask"
	ManualTaskActivity   ActivityType = "ManualTask"
	CallActivity         ActivityType = "CallActivity"
	BusinessRuleActivity ActivityType = "BusinessRuleTask"
	SendTaskActivity     ActivityType = "SendTask"
	ReceiveTaskActivity  ActivityType = "ReceiveTask"
	SubprocessActivity   ActivityType = "Subprocess"
)

type CancellationFlowNodeTrace

type CancellationFlowNodeTrace struct {
	Node schema.FlowNodeInterface
}

func (CancellationFlowNodeTrace) Element

func (t CancellationFlowNodeTrace) Element() any

type CancellationFlowTrace

type CancellationFlowTrace struct {
	FlowId id.Id
}

func (CancellationFlowTrace) Element

func (t CancellationFlowTrace) Element() any

type CatchEvent

type CatchEvent struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewCatchEvent

func NewCatchEvent(ctx context.Context, wiring *Wiring, catchEvent *schema.CatchEvent) (evt *CatchEvent, err error)

func (*CatchEvent) ConsumeEvent

func (evt *CatchEvent) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*CatchEvent) Element

func (evt *CatchEvent) Element() schema.FlowNodeInterface

func (*CatchEvent) NextAction

func (evt *CatchEvent) NextAction(flow Flow) chan IAction

type CeaseFlowTrace

type CeaseFlowTrace struct {
	Process schema.Element
}

func (CeaseFlowTrace) Element

func (t CeaseFlowTrace) Element() any

type CompleteAction

type CompleteAction struct{}

type CompletionTrace

type CompletionTrace struct {
	Node schema.FlowNodeInterface
}

func (CompletionTrace) Element

func (t CompletionTrace) Element() any

type Constructor

type Constructor = func(*Wiring) (node Activity, err error)

func NewSubProcess

func NewSubProcess(ctx context.Context,
	eventDefinitionBuilder event.IDefinitionInstanceBuilder,
	idGenerator id.IGenerator,
	tracer tracing.ITracer,
	subProcess *schema.SubProcess) Constructor

func NewTask

func NewTask(ctx context.Context, element schema.FlowNodeInterface, activityType ActivityType) Constructor

type DeterminationMadeTrace

type DeterminationMadeTrace struct {
	Node schema.FlowNodeInterface
}

func (DeterminationMadeTrace) Element

func (t DeterminationMadeTrace) Element() any

type DoOption

type DoOption func(*DoResponse)

func DoWithErr added in v2.2.0

func DoWithErr(err error) DoOption

func DoWithErrHandle added in v2.2.0

func DoWithErrHandle(err error, ch <-chan ErrHandler) DoOption

func DoWithObjects added in v2.2.0

func DoWithObjects(dataObjects map[string]any) DoOption

func DoWithResults added in v2.2.0

func DoWithResults(results map[string]any) DoOption

func DoWithValue added in v2.2.0

func DoWithValue(key, value any) DoOption

type DoResponse

type DoResponse struct {
	Context     context.Context
	DataObjects map[string]any
	Results     map[string]any
	Err         error
	HandlerCh   <-chan ErrHandler
}

type EndEvent

type EndEvent struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewEndEvent

func NewEndEvent(ctx context.Context, wiring *Wiring, endEvent *schema.EndEvent) (evt *EndEvent, err error)

func (*EndEvent) Element

func (evt *EndEvent) Element() schema.FlowNodeInterface

func (*EndEvent) NextAction

func (evt *EndEvent) NextAction(Flow) chan IAction

type ErrHandleMode

type ErrHandleMode int
const (
	RetryMode ErrHandleMode = iota + 1
	SkipMode
	ExitMode
)

type ErrHandler

type ErrHandler struct {
	Mode    ErrHandleMode
	Retries int32
}

type ErrorTrace

type ErrorTrace struct {
	Error error
}

func (ErrorTrace) Element

func (t ErrorTrace) Element() any

type EventBasedGateway

type EventBasedGateway struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewEventBasedGateway

func NewEventBasedGateway(ctx context.Context, wiring *Wiring, eventBasedGateway *schema.EventBasedGateway) (gw *EventBasedGateway, err error)

func (*EventBasedGateway) Element

func (*EventBasedGateway) NextAction

func (gw *EventBasedGateway) NextAction(flow Flow) chan IAction

type EventObservedTrace

type EventObservedTrace struct {
	Node  *schema.CatchEvent
	Event event.IEvent
}

EventObservedTrace signals the fact that a particular event has in fact observed by the node

func (EventObservedTrace) Element

func (t EventObservedTrace) Element() any

type ExclusiveGateway

type ExclusiveGateway struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewExclusiveGateway

func NewExclusiveGateway(ctx context.Context, wiring *Wiring, exclusiveGateway *schema.ExclusiveGateway) (gw *ExclusiveGateway, err error)

func (*ExclusiveGateway) Element

func (*ExclusiveGateway) NextAction

func (gw *ExclusiveGateway) NextAction(flow Flow) chan IAction

type ExclusiveNoEffectiveSequenceFlows

type ExclusiveNoEffectiveSequenceFlows struct {
	*schema.ExclusiveGateway
}

func (ExclusiveNoEffectiveSequenceFlows) Error

type Flow

type Flow interface {
	// Id returns flow's unique identifier
	Id() id.Id
	// SequenceFlow returns an inbound sequence flow this flow
	// is currently at.
	SequenceFlow() *SequenceFlow
}

Flow specifies an interface for BPMN flows

type FlowAction

type FlowAction struct {
	Response *FlowActionResponse

	SequenceFlows []*SequenceFlow
	// Index of sequence flows that should flow without
	// conditionExpression being evaluated
	UnconditionalFlows []int
	// The actions produced by the targets should be produced by
	// this function
	ActionTransformer ActionTransformer
	// If a supplied channel sends a function that returns true, the flow action
	// is to be terminated if it wasn't already
	Terminate Terminate
}

type FlowActionResponse

type FlowActionResponse struct {
	DataObjects map[string]data.IItem
	Variables   map[string]data.IItem
	Err         error
	Handler     <-chan ErrHandler
}

type FlowNodeMapping

type FlowNodeMapping struct {
	// contains filtered or unexported fields
}

func NewLockedFlowNodeMapping

func NewLockedFlowNodeMapping() *FlowNodeMapping

func (*FlowNodeMapping) Finalize

func (mapping *FlowNodeMapping) Finalize()

func (*FlowNodeMapping) RegisterElementToFlowNode

func (mapping *FlowNodeMapping) RegisterElementToFlowNode(element schema.FlowNodeInterface, flowNode IFlowNode) error

func (*FlowNodeMapping) ResolveElementToFlowNode

func (mapping *FlowNodeMapping) ResolveElementToFlowNode(element schema.FlowNodeInterface) (flowNode IFlowNode, found bool)

type FlowTrace

type FlowTrace struct {
	Source schema.FlowNodeInterface
	Flows  []Snapshot
}

func (FlowTrace) Element

func (t FlowTrace) Element() any

type Harness

type Harness struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewHarness

func NewHarness(ctx context.Context, wiring *Wiring, idGenerator id.IGenerator, constructor Constructor) (node *Harness, err error)

func (*Harness) Activity

func (node *Harness) Activity() Activity

func (*Harness) ConsumeEvent

func (node *Harness) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*Harness) Element

func (node *Harness) Element() schema.FlowNodeInterface

func (*Harness) NextAction

func (node *Harness) NextAction(flow Flow) chan IAction

func (*Harness) RegisterEventConsumer

func (node *Harness) RegisterEventConsumer(consumer event.IConsumer) (err error)

type IAction

type IAction interface {
	// contains filtered or unexported methods
}

type IFlowNode

type IFlowNode interface {
	IOutgoing
	Element() schema.FlowNodeInterface
}

type IOutgoing

type IOutgoing interface {
	NextAction(flow Flow) chan IAction
}

type InclusiveGateway

type InclusiveGateway struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewInclusiveGateway

func NewInclusiveGateway(ctx context.Context, wiring *Wiring, inclusiveGateway *schema.InclusiveGateway) (gw *InclusiveGateway, err error)

func (*InclusiveGateway) Element

func (*InclusiveGateway) NextAction

func (gw *InclusiveGateway) NextAction(flow Flow) chan IAction

type InclusiveNoEffectiveSequenceFlows

type InclusiveNoEffectiveSequenceFlows struct {
	*schema.InclusiveGateway
}

func (InclusiveNoEffectiveSequenceFlows) Error

type IncomingFlowProcessedTrace

type IncomingFlowProcessedTrace struct {
	Node *schema.ParallelGateway
	Flow Flow
}

IncomingFlowProcessedTrace signals that a particular flow has been processed. If any action has been taken, it has already happened

func (IncomingFlowProcessedTrace) Element

func (t IncomingFlowProcessedTrace) Element() any

type Instance

type Instance struct {
	*Options
	// contains filtered or unexported fields
}

func NewInstance

func NewInstance(process *schema.Process, definitions *schema.Definitions, options *Options) (instance *Instance, err error)

func (*Instance) ConsumeEvent

func (ins *Instance) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*Instance) FlowNodeMapping

func (ins *Instance) FlowNodeMapping() *FlowNodeMapping

func (*Instance) Id

func (ins *Instance) Id() id.Id

func (*Instance) Locator

func (ins *Instance) Locator() data.IFlowDataLocator

func (*Instance) Process

func (ins *Instance) Process() *schema.Process

func (*Instance) RegisterEventConsumer

func (ins *Instance) RegisterEventConsumer(ev event.IConsumer) (err error)

func (*Instance) StartAll

func (ins *Instance) StartAll() (err error)

StartAll explicitly starts the instance by triggering all start events, if any

func (*Instance) StartWith

func (ins *Instance) StartWith(ctx context.Context, startEvent schema.StartEventInterface) (err error)

StartWith explicitly starts the instance by triggering a given start event

func (*Instance) Tracer

func (ins *Instance) Tracer() tracing.ITracer

func (*Instance) WaitUntilComplete

func (ins *Instance) WaitUntilComplete(ctx context.Context) (complete bool)

WaitUntilComplete waits until the instance is complete. Returns true if the instance was complete, false if the context signaled `Done`

type InstanceTrace

type InstanceTrace struct {
	InstanceId id.Id
	Trace      tracing.ITrace
}

InstanceTrace wraps any trace with process instance id

func (InstanceTrace) Element

func (t InstanceTrace) Element() any

func (InstanceTrace) Unwrap

func (t InstanceTrace) Unwrap() tracing.ITrace

type InstantiationTrace

type InstantiationTrace struct {
	InstanceId id.Id
}

InstantiationTrace denotes instantiation of a given process

func (InstantiationTrace) Element

func (i InstantiationTrace) Element() any

type LeaveTrace

type LeaveTrace struct {
	Node schema.FlowNodeInterface
}

func (LeaveTrace) Element

func (t LeaveTrace) Element() any

type NewFlowNodeTrace

type NewFlowNodeTrace struct {
	Node schema.FlowNodeInterface
}

func (NewFlowNodeTrace) Element

func (t NewFlowNodeTrace) Element() any

type NewFlowTrace

type NewFlowTrace struct {
	FlowId id.Id
}

func (NewFlowTrace) Element

func (t NewFlowTrace) Element() any

type NoAction

type NoAction struct{}

type Option

type Option func(*Options)

Option allows to modify configuration of an instance in a flexible fashion (as it's just a modification function)

func WithContext

func WithContext(ctx context.Context) Option

WithContext will pass a given context to a new instance instead of implicitly generated one

func WithDataObjects

func WithDataObjects(dataObjects map[string]any) Option

func WithEventDefinitionInstanceBuilder

func WithEventDefinitionInstanceBuilder(builder event.IDefinitionInstanceBuilder) Option

func WithEventEgress

func WithEventEgress(source event.ISource) Option

func WithEventIngress

func WithEventIngress(consumer event.IConsumer) Option

func WithIdGenerator

func WithIdGenerator(builder id.IGeneratorBuilder) Option

func WithLocator

func WithLocator(locator data.IFlowDataLocator) Option

func WithTracer

func WithTracer(tracer tracing.ITracer) Option

WithTracer overrides instance's tracer

func WithVariables

func WithVariables(variables map[string]any) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions(opts ...Option) *Options

type ParallelGateway

type ParallelGateway struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewParallelGateway

func NewParallelGateway(ctx context.Context, wiring *Wiring, parallelGateway *schema.ParallelGateway) (gateway *ParallelGateway, err error)

func (*ParallelGateway) Element

func (*ParallelGateway) NextAction

func (gw *ParallelGateway) NextAction(flow Flow) chan IAction

type ProbeAction

type ProbeAction struct {
	SequenceFlows []*SequenceFlow
	// ProbeReport is a function that needs to be called
	// wth sequence flow indices that have successful
	// condition expressions (or none)
	ProbeReport func([]int)
}

type Process

type Process struct {
	*Options

	Element     *schema.Process
	Definitions *schema.Definitions
	// contains filtered or unexported fields
}

func MakeProcess

func MakeProcess(element *schema.Process, definitions *schema.Definitions, opts ...Option) Process

func NewProcess

func NewProcess(element *schema.Process, definitions *schema.Definitions, opts ...Option) *Process

func (*Process) Instantiate

func (p *Process) Instantiate(opts ...Option) (inst *Instance, err error)

func (*Process) Process

func (p *Process) Process() *schema.Process

func (*Process) Tracer

func (p *Process) Tracer() tracing.ITracer

type ProcessLandMarkTrace

type ProcessLandMarkTrace struct {
	Node schema.FlowNodeInterface
}

ProcessLandMarkTrace denotes instantiation of a given sub process

func (ProcessLandMarkTrace) Element

func (t ProcessLandMarkTrace) Element() any

type ProcessTrace

type ProcessTrace struct {
	Process *schema.Process
	Trace   tracing.ITrace
}

ProcessTrace wraps any trace within a given process

func (ProcessTrace) Element

func (t ProcessTrace) Element() any

func (ProcessTrace) Unwrap

func (t ProcessTrace) Unwrap() tracing.ITrace

type Retry

type Retry struct {
	// contains filtered or unexported fields
}

Retry describes the retry handler for flow

func (*Retry) IsContinue

func (r *Retry) IsContinue() bool

func (*Retry) Reset

func (r *Retry) Reset(retries int32)

func (*Retry) Step

func (r *Retry) Step()

Step Attempts += 1

type SequenceFlow

type SequenceFlow struct {
	*schema.SequenceFlow
	// contains filtered or unexported fields
}

func AllSequenceFlows

func AllSequenceFlows(sequenceFlows *[]SequenceFlow, exclusion ...func(*SequenceFlow) bool) (result []*SequenceFlow)

func MakeSequenceFlow

func MakeSequenceFlow(sf *schema.SequenceFlow, process schema.Element) SequenceFlow

func NewSequenceFlow

func NewSequenceFlow(sf *schema.SequenceFlow, process schema.Element) *SequenceFlow

func (*SequenceFlow) Source

func (sf *SequenceFlow) Source() (schema.FlowNodeInterface, error)

func (*SequenceFlow) Target

func (sf *SequenceFlow) Target() (schema.FlowNodeInterface, error)

func (*SequenceFlow) TargetIndex

func (sf *SequenceFlow) TargetIndex() (index int, err error)

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

func (*Snapshot) Id

func (s *Snapshot) Id() id.Id

func (*Snapshot) SequenceFlow

func (s *Snapshot) SequenceFlow() *SequenceFlow

type StartEvent

type StartEvent struct {
	*Wiring
	// contains filtered or unexported fields
}

func NewStartEvent

func NewStartEvent(ctx context.Context, wiring *Wiring,
	startEvent *schema.StartEvent, idGenerator id.IGenerator,
) (evt *StartEvent, err error)

func (*StartEvent) ConsumeEvent

func (evt *StartEvent) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*StartEvent) Element

func (evt *StartEvent) Element() schema.FlowNodeInterface

func (*StartEvent) NextAction

func (evt *StartEvent) NextAction(flow Flow) chan IAction

func (*StartEvent) Trigger

func (evt *StartEvent) Trigger()

type SubProcess

type SubProcess struct {
	*Wiring
	// contains filtered or unexported fields
}

func (*SubProcess) Cancel

func (p *SubProcess) Cancel() <-chan bool

func (*SubProcess) ConsumeEvent

func (p *SubProcess) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*SubProcess) Element

func (p *SubProcess) Element() schema.FlowNodeInterface

func (*SubProcess) NextAction

func (p *SubProcess) NextAction(Flow) chan IAction

func (*SubProcess) RegisterEventConsumer

func (p *SubProcess) RegisterEventConsumer(ev event.IConsumer) (err error)

func (*SubProcess) Type

func (p *SubProcess) Type() ActivityType

type Task

type Task struct {
	*Wiring
	// contains filtered or unexported fields
}

func (*Task) Cancel

func (task *Task) Cancel() <-chan bool

func (*Task) Element

func (task *Task) Element() schema.FlowNodeInterface

func (*Task) NextAction

func (task *Task) NextAction(Flow) chan IAction

func (*Task) Type

func (task *Task) Type() ActivityType

type TaskTrace

type TaskTrace interface {
	Element() any
	Context() context.Context
	GetActivity() Activity
	GetDataObjects() map[string]any
	GetHeaders() map[string]string
	GetProperties() map[string]any
	Do(options ...DoOption)
}

TaskTrace describes a common channel handler for all tasks

type Terminate

type Terminate func(sequenceFlowId *schema.IdRef) chan bool

type TerminationTrace

type TerminationTrace struct {
	FlowId id.Id
	Source schema.FlowNodeInterface
}

func (TerminationTrace) Element

func (t TerminationTrace) Element() any

type VisitTrace

type VisitTrace struct {
	Node schema.FlowNodeInterface
}

func (VisitTrace) Element

func (t VisitTrace) Element() any

type Wiring

type Wiring struct {
	ProcessInstanceId              id.Id
	FlowNodeId                     schema.Id
	Definitions                    *schema.Definitions
	Incoming                       []SequenceFlow
	Outgoing                       []SequenceFlow
	EventIngress                   event.IConsumer
	EventEgress                    event.ISource
	Tracer                         tracing.ITracer
	Process                        schema.Element
	FlowNodeMapping                *FlowNodeMapping
	FlowWaitGroup                  *sync.WaitGroup
	EventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder
	Locator                        data.IFlowDataLocator
}

Wiring holds all necessary "wiring" for functioning of flow nodes: definitions, process, sequence flow, event management, tracer, flow node mapping and a flow wait group

func NewWiring

func NewWiring(
	processInstanceId id.Id,
	process schema.Element,
	definitions *schema.Definitions,
	flowNode *schema.FlowNode,
	eventIngress event.IConsumer,
	eventEgress event.ISource,
	tracer tracing.ITracer,
	flowNodeMapping *FlowNodeMapping,
	flowWaitGroup *sync.WaitGroup,
	eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder,
	locator data.IFlowDataLocator,
) (node *Wiring, err error)

func (*Wiring) CloneFor

func (wr *Wiring) CloneFor(node *schema.FlowNode) (result *Wiring, err error)

CloneFor copies receiver, overriding FlowNodeId, Incoming, Outgoing for a given flowNode

Directories

Path Synopsis
examples
basic command
gateway command
properties command
subprocess command
user_task command
pkg
id
logic
Package logic provides commonly shared "logic units" (or algorithms)
Package logic provides commonly shared "logic units" (or algorithms)
tracing
Package tracing is a capability to get an ordered stream of structured records of what has happened.
Package tracing is a capability to get an ordered stream of structured records of what has happened.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL