bpmn

package module
v2.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

README

Introduce | 中文

github.com/olive-io/bpmn

Lightweight Business Process Model and Notation (BPMN) 2.0 workflow engine implemented purely in Go

Go Reference Build Status Last Commit


Introduction

github.com/olive-io/bpmn is a lightweight Business Process Model and Notation (BPMN) 2.0 workflow engine implemented in Go, designed for modeling and executing business processes embedded in Go applications.
It supports core BPMN 2.0 elements including Activities (User Task, Service Task, Script Task), Events (Start Event, End Event, Intermediate Catch Event), Gateways (Exclusive Gateway, Inclusive Gateway, Parallel Gateway, Event-based Gateway), Sub-processes, Sequence Flows, and extensible process attributes.


Key Features

  • Standard BPMN 2.0 Compliance – Build process models using standardized Business Process Model and Notation elements, fully compliant with the OMG specification.
  • Lightweight & Embeddable – Minimal dependencies; easily embedded into business systems with zero external service requirements.
  • Complete Activity Support – User Tasks, Service Tasks, Script Tasks, Manual Tasks, Business Rule Tasks, and custom Activity implementations.
  • Comprehensive Flow Control – Sub-processes, Parallel Gateways, Exclusive Gateways, Inclusive Gateways, and Event-based Gateways for complex decision logic.
  • Event-Driven Processing – Start Events, End Events, Intermediate Catch Events, Timer Events, and Boundary Events.
  • Extensible Process Attributes – Custom properties and data objects to meet diverse business process requirements.
  • Comprehensive Test Coverage – Examples and modules come with unit tests to ensure reliable execution.

Getting Started

Installation
go get -u github.com/olive-io/bpmn/schema
go get -u github.com/olive-io/bpmn/v2
Quick Start Example
package main

import (
	"context"
	"log"
	"os"

	"github.com/olive-io/bpmn/schema"
	"github.com/olive-io/bpmn/v2"
	"github.com/olive-io/bpmn/v2/pkg/tracing"
)

// main is the quick-start example: it parses task.bpmn, instantiates a
// process with initial variables and data objects, starts it from all of its
// start events, answers task traces from a background goroutine, and finally
// logs the process variables.
func main() {
	data, err := os.ReadFile("task.bpmn")
	if err != nil {
		log.Fatalf("Can't read bpmn: %v", err)
	}
	definitions, err := schema.Parse(data)
	if err != nil {
		log.Fatalf("XML unmarshalling error: %v", err)
	}

	engine := bpmn.NewEngine()
	options := []bpmn.Option{
		bpmn.WithVariables(map[string]any{
			"c": map[string]string{"name": "cc"},
		}),
		bpmn.WithDataObjects(map[string]any{
			"a": struct{}{},
		}),
	}
	ctx := context.Background()
	ins, err := engine.NewProcess(&definitions, options...)
	if err != nil {
		// log.Fatalf exits the program, so no explicit return is needed.
		log.Fatalf("failed to instantiate the process: %s", err)
	}

	// Subscribe before starting so that no trace emitted by the process is
	// missed by the consumer goroutine below.
	traces := ins.Tracer().Subscribe()
	defer ins.Tracer().Unsubscribe(traces)
	if err = ins.StartAll(ctx); err != nil {
		log.Fatalf("failed to run the instance: %s", err)
	}

	go func() {
		for {
			// Block until the next trace arrives, then strip any wrapping
			// traces to reach the underlying one.
			var trace tracing.ITrace = <-traces
			trace = tracing.Unwrap(trace)

			switch trace := trace.(type) {
			case bpmn.FlowTrace:
				// Flow traces need no handling in this example.
			case bpmn.TaskTrace:
				// Answer the task with a result set.
				trace.Do(bpmn.DoWithResults(
					map[string]any{
						"c": map[string]string{"name": "cc1"},
						"a": 2,
					}),
				)
			case bpmn.ErrorTrace:
				// Terminates the whole program, not just this goroutine.
				log.Fatalf("%#v", trace)
			case bpmn.CeaseFlowTrace:
				// Stop consuming traces once this trace is observed.
				return
			default:
				log.Printf("%#v", trace)
			}
		}
	}()
	ins.WaitUntilComplete(ctx)

	pros := ins.Locator().CloneVariables()
	log.Printf("%#v", pros)
}

More Examples

License

This project is licensed under the Apache-2.0 License. Commercial and derivative works are welcome.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultTaskExecTimeout = time.Second * 3
)

Functions

func ApplyTaskDataOutput

func ApplyTaskDataOutput(element schema.BaseElementInterface, dataOutputs map[string]any) map[string]data.IItem

func ApplyTaskResult added in v2.2.0

func ApplyTaskResult(element schema.BaseElementInterface, results map[string]any) map[string]data.IItem

func FetchTaskDataInput

func FetchTaskDataInput(locator data.IFlowDataLocator, element schema.BaseElementInterface) (headers map[string]string, properties, dataObjects map[string]any)

func FetchTaskTimeout added in v2.2.4

func FetchTaskTimeout(element schema.BaseElementInterface) time.Duration

FetchTaskTimeout returns the timeout for the given schema.BaseElementInterface element.

Types

type ActionTransformer

type ActionTransformer func(sequenceFlowId *schema.IdRef, action IAction) IAction

type ActiveBoundaryTrace

type ActiveBoundaryTrace struct {
	Start bool
	Node  schema.FlowNodeInterface
}

func (ActiveBoundaryTrace) Unpack added in v2.4.2

func (b ActiveBoundaryTrace) Unpack() any

type ActiveListeningTrace

type ActiveListeningTrace struct {
	Node *schema.CatchEvent
}

func (ActiveListeningTrace) Unpack added in v2.4.2

func (t ActiveListeningTrace) Unpack() any

type Activity

type Activity interface {
	IFlowNode
	Type() ActivityType
	// Cancel initiates a cancellation of activity and returns a channel
	// that will signal a boolean (`true` if cancellation was successful,
	// `false` otherwise)
	Cancel() <-chan bool
}

Activity is a generic interface to flow nodes that are activities

type ActivityType added in v2.3.0

type ActivityType string
const (
	TaskActivity         ActivityType = "Task"
	ServiceTaskActivity  ActivityType = "ServiceTask"
	ScriptTaskActivity   ActivityType = "ScriptTask"
	UserTaskActivity     ActivityType = "UserTask"
	ManualTaskActivity   ActivityType = "ManualTask"
	CallActivity         ActivityType = "CallActivity"
	BusinessRuleActivity ActivityType = "BusinessRuleTask"
	SendTaskActivity     ActivityType = "SendTask"
	ReceiveTaskActivity  ActivityType = "ReceiveTask"
	SubprocessActivity   ActivityType = "Subprocess"
)

type CancellationFlowNodeTrace

type CancellationFlowNodeTrace struct {
	Node schema.FlowNodeInterface
}

func (CancellationFlowNodeTrace) Unpack added in v2.4.2

func (t CancellationFlowNodeTrace) Unpack() any

type CancellationFlowTrace

type CancellationFlowTrace struct {
	FlowId id.Id
	Node   schema.FlowNodeInterface
}

func (CancellationFlowTrace) Unpack added in v2.4.2

func (t CancellationFlowTrace) Unpack() any

type CeaseFlowTrace

type CeaseFlowTrace struct {
	Process schema.Element
}

func (CeaseFlowTrace) Unpack added in v2.4.2

func (t CeaseFlowTrace) Unpack() any

type CeaseProcessSetTrace added in v2.6.0

type CeaseProcessSetTrace struct {
	Definitions *schema.Definitions
}

func (CeaseProcessSetTrace) Unpack added in v2.6.0

func (t CeaseProcessSetTrace) Unpack() any

type CompletionTrace

type CompletionTrace struct {
	Node schema.FlowNodeInterface
}

func (CompletionTrace) Unpack added in v2.4.2

func (t CompletionTrace) Unpack() any

type DeterminationMadeTrace

type DeterminationMadeTrace struct {
	Node schema.FlowNodeInterface
}

func (DeterminationMadeTrace) Unpack added in v2.4.2

func (t DeterminationMadeTrace) Unpack() any

type DoOption

type DoOption func(*DoResponse)

func DoWithErr added in v2.2.0

func DoWithErr(err error) DoOption

func DoWithErrHandle added in v2.2.0

func DoWithErrHandle(err error, ch <-chan ErrHandler) DoOption

func DoWithObjects added in v2.2.0

func DoWithObjects(dataObjects map[string]any) DoOption

func DoWithResults added in v2.2.0

func DoWithResults(results map[string]any) DoOption

func DoWithValue added in v2.2.0

func DoWithValue(key, value any) DoOption

type DoResponse

type DoResponse struct {
	Context     context.Context
	DataObjects map[string]any
	Results     map[string]any
	Err         error
	HandlerCh   <-chan ErrHandler
}

type Engine added in v2.5.1

type Engine struct {
	*EngineOptions
}

func NewEngine added in v2.5.1

func NewEngine(opts ...EngineOption) *Engine

func (*Engine) NewProcess added in v2.5.1

func (engine *Engine) NewProcess(definitions *schema.Definitions, opts ...Option) (process *Process, err error)

func (*Engine) NewProcessSet added in v2.6.0

func (engine *Engine) NewProcessSet(definitions *schema.Definitions, opts ...Option) (*ProcessSet, error)

type EngineOption added in v2.5.1

type EngineOption func(*EngineOptions)

func WithEngineContext added in v2.5.5

func WithEngineContext(ctx context.Context) EngineOption

func WithEventDefinitionInstanceBuilder

func WithEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) EngineOption

func WithIdGeneratorBuilder added in v2.5.1

func WithIdGeneratorBuilder(idGeneratorBuilder id.IGeneratorBuilder) EngineOption

type EngineOptions added in v2.5.1

type EngineOptions struct {
	// contains filtered or unexported fields
}

func NewEngineOptions added in v2.5.1

func NewEngineOptions(opts ...EngineOption) *EngineOptions

type ErrHandleMode

type ErrHandleMode int
const (
	RetryMode ErrHandleMode = iota + 1
	SkipMode
	ExitMode
)

type ErrHandler

type ErrHandler struct {
	Mode    ErrHandleMode
	Retries int32
}

type ErrorTrace

type ErrorTrace struct {
	Error error
}

func (ErrorTrace) Unpack added in v2.4.2

func (t ErrorTrace) Unpack() any

type EventObservedTrace

type EventObservedTrace struct {
	Node  *schema.CatchEvent
	Event event.IEvent
}

EventObservedTrace signals the fact that a particular event has in fact been observed by the node

func (EventObservedTrace) Unpack added in v2.4.2

func (t EventObservedTrace) Unpack() any

type ExclusiveNoEffectiveSequenceFlows

type ExclusiveNoEffectiveSequenceFlows struct {
	*schema.ExclusiveGateway
}

func (ExclusiveNoEffectiveSequenceFlows) Error

type Flow

type Flow interface {
	// Id returns flow's unique identifier
	Id() id.Id
	// SequenceFlow returns an inbound sequence flow this flow
	// is currently at.
	SequenceFlow() *SequenceFlow
}

Flow specifies an interface for BPMN flows

type FlowActionResponse

type FlowActionResponse struct {
	// contains filtered or unexported fields
}

type FlowNodeMapping

type FlowNodeMapping struct {
	// contains filtered or unexported fields
}

func NewLockedFlowNodeMapping

func NewLockedFlowNodeMapping() *FlowNodeMapping

func (*FlowNodeMapping) Finalize

func (mapping *FlowNodeMapping) Finalize()

func (*FlowNodeMapping) RegisterElementToFlowNode

func (mapping *FlowNodeMapping) RegisterElementToFlowNode(element schema.FlowNodeInterface, flowNode IFlowNode) error

func (*FlowNodeMapping) ResolveElementToFlowNode

func (mapping *FlowNodeMapping) ResolveElementToFlowNode(element schema.FlowNodeInterface) (flowNode IFlowNode, found bool)

type FlowTrace

type FlowTrace struct {
	Source schema.FlowNodeInterface
	Flows  []Snapshot
}

func (FlowTrace) Unpack added in v2.4.2

func (t FlowTrace) Unpack() any

type IAction

type IAction interface {
	// contains filtered or unexported methods
}

type IFlowNode

type IFlowNode interface {
	IOutgoing
	Element() schema.FlowNodeInterface
}

type IOutgoing

type IOutgoing interface {
	NextAction(ctx context.Context, flow Flow) chan IAction
}

type InclusiveNoEffectiveSequenceFlows

type InclusiveNoEffectiveSequenceFlows struct {
	*schema.InclusiveGateway
}

func (InclusiveNoEffectiveSequenceFlows) Error

type IncomingFlowProcessedTrace

type IncomingFlowProcessedTrace struct {
	Node *schema.ParallelGateway
	Flow Flow
}

IncomingFlowProcessedTrace signals that a particular flow has been processed. If any action has been taken, it has already happened

func (IncomingFlowProcessedTrace) Unpack added in v2.4.2

func (t IncomingFlowProcessedTrace) Unpack() any

type InstanceTrace

type InstanceTrace struct {
	InstanceId id.Id
	Trace      tracing.ITrace
}

InstanceTrace wraps any trace with process instance id

func (InstanceTrace) Unpack added in v2.4.2

func (t InstanceTrace) Unpack() any

func (InstanceTrace) Unwrap

func (t InstanceTrace) Unwrap() tracing.ITrace

type InstantiationTrace

type InstantiationTrace struct {
	InstanceId id.Id
}

InstantiationTrace denotes instantiation of a given process

func (InstantiationTrace) Unpack added in v2.4.2

func (i InstantiationTrace) Unpack() any

type LeaveTrace

type LeaveTrace struct {
	Node schema.FlowNodeInterface
}

func (LeaveTrace) Unpack added in v2.4.2

func (t LeaveTrace) Unpack() any

type NewFlowNodeTrace

type NewFlowNodeTrace struct {
	Node schema.FlowNodeInterface
}

func (NewFlowNodeTrace) Unpack added in v2.4.2

func (t NewFlowNodeTrace) Unpack() any

type NewFlowTrace

type NewFlowTrace struct {
	FlowId id.Id
}

func (NewFlowTrace) Unpack added in v2.4.2

func (t NewFlowTrace) Unpack() any

type Option

type Option func(*Options)

Option allows modifying the configuration of an instance in a flexible fashion (as it's just a modification function)

func WithContext

func WithContext(ctx context.Context) Option

WithContext will pass a given context to a new instance instead of an implicitly generated one

func WithDataObjects

func WithDataObjects(dataObjects map[string]any) Option

func WithEventEgress

func WithEventEgress(source event.ISource) Option

func WithEventIngress

func WithEventIngress(consumer event.IConsumer) Option

func WithIdGenerator

func WithIdGenerator(idGenerator id.IGenerator) Option

func WithLocator

func WithLocator(locator data.IFlowDataLocator) Option

func WithProcessEventDefinitionInstanceBuilder added in v2.5.1

func WithProcessEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) Option

func WithTracer

func WithTracer(tracer tracing.ITracer) Option

WithTracer overrides instance's tracer

func WithVariables

func WithVariables(variables map[string]any) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions(opts ...Option) *Options

type Process

type Process struct {
	*Options
	// contains filtered or unexported fields
}

func NewProcess

func NewProcess(processElem *schema.Process, definitions *schema.Definitions, opts ...Option) (process *Process, err error)

func (*Process) ConsumeEvent added in v2.5.1

func (p *Process) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*Process) Element

func (p *Process) Element() *schema.Process

func (*Process) FlowNodeMapping added in v2.5.1

func (p *Process) FlowNodeMapping() *FlowNodeMapping

func (*Process) Id added in v2.5.1

func (p *Process) Id() id.Id

func (*Process) Locator added in v2.5.1

func (p *Process) Locator() data.IFlowDataLocator

func (*Process) RegisterEventConsumer added in v2.5.1

func (p *Process) RegisterEventConsumer(ev event.IConsumer) (err error)

func (*Process) StartAll added in v2.5.1

func (p *Process) StartAll(ctx context.Context) error

StartAll explicitly starts the instance by triggering all start events

func (*Process) StartWith added in v2.5.1

func (p *Process) StartWith(ctx context.Context, element schema.FlowNodeInterface) (err error)

StartWith explicitly starts the instance by triggering a given start event or throw event

func (*Process) ThrowAll added in v2.6.0

func (p *Process) ThrowAll(ctx context.Context) error

ThrowAll explicitly starts the instance by triggering all intermediate throw events.

func (*Process) Tracer

func (p *Process) Tracer() tracing.ITracer

func (*Process) WaitUntilComplete added in v2.5.1

func (p *Process) WaitUntilComplete(ctx context.Context) (complete bool)

WaitUntilComplete waits until the instance is complete. Returns true if the instance was complete, false if the context signaled `Done`

type ProcessLandMarkTrace

type ProcessLandMarkTrace struct {
	Node schema.FlowNodeInterface
}

ProcessLandMarkTrace denotes instantiation of a given sub process

func (ProcessLandMarkTrace) Unpack added in v2.4.2

func (t ProcessLandMarkTrace) Unpack() any

type ProcessSet added in v2.6.0

type ProcessSet struct {
	*Options
	// contains filtered or unexported fields
}

func NewProcessSet added in v2.6.0

func NewProcessSet(executeProcesses, waitingProcesses []*schema.Process, definitions *schema.Definitions, opts ...Option) (*ProcessSet, error)

func (*ProcessSet) Locator added in v2.6.0

func (ps *ProcessSet) Locator() data.IFlowDataLocator

func (*ProcessSet) StartAll added in v2.6.0

func (ps *ProcessSet) StartAll(ctx context.Context) error

func (*ProcessSet) Tracer added in v2.6.0

func (ps *ProcessSet) Tracer() tracing.ITracer

func (*ProcessSet) WaitUntilComplete added in v2.6.0

func (ps *ProcessSet) WaitUntilComplete(ctx context.Context) (complete bool)

WaitUntilComplete waits until the instance is complete. Returns true if the instance was complete, false if the context signaled `Done`

type ProcessTrace

type ProcessTrace struct {
	Process *schema.Process
	Trace   tracing.ITrace
}

ProcessTrace wraps any trace within a given process

func (ProcessTrace) Unpack added in v2.4.2

func (t ProcessTrace) Unpack() any

func (ProcessTrace) Unwrap

func (t ProcessTrace) Unwrap() tracing.ITrace

type Retry

type Retry struct {
	// contains filtered or unexported fields
}

Retry describes the retry handler for flow

func (*Retry) IsContinue

func (r *Retry) IsContinue() bool

func (*Retry) Reset

func (r *Retry) Reset(retries int32)

func (*Retry) Step

func (r *Retry) Step()

Step increments the retry's attempt counter by one (Attempts += 1).

type SequenceFlow

type SequenceFlow struct {
	*schema.SequenceFlow
	// contains filtered or unexported fields
}

func MakeSequenceFlow

func MakeSequenceFlow(sf *schema.SequenceFlow, process schema.Element) SequenceFlow

func NewSequenceFlow

func NewSequenceFlow(sf *schema.SequenceFlow, process schema.Element) *SequenceFlow

func (*SequenceFlow) Source

func (sf *SequenceFlow) Source() (schema.FlowNodeInterface, error)

func (*SequenceFlow) Target

func (sf *SequenceFlow) Target() (schema.FlowNodeInterface, error)

func (*SequenceFlow) TargetIndex

func (sf *SequenceFlow) TargetIndex() (index int, err error)

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

func (*Snapshot) Id

func (s *Snapshot) Id() id.Id

func (*Snapshot) SequenceFlow

func (s *Snapshot) SequenceFlow() *SequenceFlow

type TaskTrace

type TaskTrace interface {
	Unpack() any
	Context() context.Context
	GetActivity() Activity
	GetDataObjects() map[string]any
	GetHeaders() map[string]string
	GetProperties() map[string]any
	Do(options ...DoOption)
}

TaskTrace describes a common channel handler for all tasks

type Terminate

type Terminate func(sequenceFlowId *schema.IdRef) chan bool

type TerminationTrace

type TerminationTrace struct {
	FlowId id.Id
	Source schema.FlowNodeInterface
}

func (TerminationTrace) Unpack added in v2.4.2

func (t TerminationTrace) Unpack() any

type VisitTrace

type VisitTrace struct {
	Node schema.FlowNodeInterface
}

func (VisitTrace) Unpack added in v2.4.2

func (t VisitTrace) Unpack() any

Directories

Path Synopsis
examples
basic command
catch_event command
collaboration command
gateway command
gateway_expr command
multiprocess command
properties command
subprocess command
user_task command
pkg
id
logic
Package logic provides commonly shared "logic units" (or algorithms)
Package logic provides commonly shared "logic units" (or algorithms)
tracing
Package tracing is a capability to get an ordered stream of structured records of what has happened.
Package tracing is a capability to get an ordered stream of structured records of what has happened.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL