bpmn

package module
v2.7.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 26, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

README

Lightweight BPMN 2.0 workflow engine written in Go


What Is This?

github.com/olive-io/bpmn is an embeddable BPMN 2.0 runtime for Go applications.

Use it when you want to:

  • Execute BPMN process definitions inside your Go service.
  • Handle user/service/script tasks with your own business logic.
  • Track process runtime via trace streams for debugging and observability.

It includes two modules:

  • Runtime module: github.com/olive-io/bpmn/v2
  • BPMN schema module: github.com/olive-io/bpmn/schema

Installation

go get -u github.com/olive-io/bpmn/schema
go get -u github.com/olive-io/bpmn/v2

Quick Start

This example loads a BPMN file, starts a process, handles task traces, and waits for completion.

package main

import (
	"context"
	"log"
	"os"

	"github.com/olive-io/bpmn/schema"
	"github.com/olive-io/bpmn/v2"
	"github.com/olive-io/bpmn/v2/pkg/tracing"
)

func main() {
	data, err := os.ReadFile("task.bpmn")
	if err != nil {
		log.Fatalf("read bpmn file: %v", err)
	}

	definitions, err := schema.Parse(data)
	if err != nil {
		log.Fatalf("parse bpmn xml: %v", err)
	}

	engine := bpmn.NewEngine()
	ctx := context.Background()

	proc, err := engine.NewProcess(definitions,
		bpmn.WithVariables(map[string]any{"customer": "alice"}),
	)
	if err != nil {
		log.Fatalf("create process: %v", err)
	}

	traces := proc.Tracer().Subscribe()
	defer proc.Tracer().Unsubscribe(traces)

	if err := proc.StartAll(ctx); err != nil {
		log.Fatalf("start process: %v", err)
	}

	go func() {
		for trace := range traces {
			trace = tracing.Unwrap(trace)
			switch t := trace.(type) {
			case bpmn.TaskTrace:
				// Complete user/service/script task with results.
				t.Do(bpmn.DoWithResults(map[string]any{"approved": true}))
			case bpmn.ErrorTrace:
				log.Printf("process error: %v", t.Error)
			}
		}
	}()

	if ok := proc.WaitUntilComplete(ctx); !ok {
		log.Printf("process cancelled")
	}

	log.Printf("variables: %#v", proc.Locator().CloneVariables())
}

Core Concepts

  • Engine: creates process instances from BPMN definitions.
  • Process: executes one executable process.
  • ProcessSet: executes multiple executable processes in one definition.
  • Tracer: runtime event stream; all traces are accessible via subscribe/unsubscribe.
  • TaskTrace: callback point to provide task result, data objects, headers, and errors.

Single Process vs Process Set

Use NewProcess when your BPMN definitions contain exactly one executable process.

Use NewProcessSet when definitions contain multiple executable processes.

proc, err := engine.NewProcess(definitions)
// Returns an error if multiple executable processes are present.

set, err := engine.NewProcessSet(definitions)
// Runs all executable processes in the same definitions.
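
To decide between the two constructors before building anything, you can count the executable processes in the BPMN XML. The sketch below uses only the standard library's encoding/xml and the BPMN 2.0 isExecutable attribute. The names definitionsDoc, countExecutable, and sample are illustrative assumptions and are not part of this package.

```go
package main

import (
	"encoding/xml"
	"fmt"
)

// definitionsDoc is a local stand-in that decodes only what this sketch
// needs; it is not schema.Definitions.
type definitionsDoc struct {
	Processes []struct {
		ID           string `xml:"id,attr"`
		IsExecutable bool   `xml:"isExecutable,attr"`
	} `xml:"process"`
}

// countExecutable reports how many <process> elements set isExecutable="true".
func countExecutable(data []byte) (int, error) {
	var doc definitionsDoc
	if err := xml.Unmarshal(data, &doc); err != nil {
		return 0, err
	}
	n := 0
	for _, p := range doc.Processes {
		if p.IsExecutable {
			n++
		}
	}
	return n, nil
}

// sample is a hypothetical definitions document with one executable process.
const sample = `<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
  <process id="order" isExecutable="true"/>
  <process id="audit" isExecutable="false"/>
</definitions>`

func main() {
	n, err := countExecutable([]byte(sample))
	if err != nil {
		fmt.Println("parse:", err)
		return
	}
	if n == 1 {
		fmt.Println("one executable process: NewProcess fits")
	} else {
		fmt.Println("executable processes:", n, "(consider NewProcessSet)")
	}
}
```

A pre-check like this is optional, since NewProcess already returns an error when multiple executable processes are present.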

Observability and Runtime Errors

Subscribe to tracer events to inspect runtime behavior:

  • FlowTrace: a sequence flow transition.
  • TaskTrace: a task is waiting for an external decision or result.
  • ErrorTrace: a runtime error occurred.
  • CeaseFlowTrace / CeaseProcessSetTrace: the process or process set has completed.
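
Traces may arrive wrapped (InstanceTrace and ProcessTrace both expose Unwrap), so a consumer usually peels the wrappers before switching on the concrete type. The sketch below shows that pattern with local stand-in types only. The names trace, unwrapper, unwrapAll, and describe are assumptions for illustration, and whether tracing.Unwrap removes one layer or all of them is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// trace and unwrapper are local stand-ins for the tracing interfaces.
type trace interface{}

type unwrapper interface {
	Unwrap() trace
}

// errorTrace mimics the shape of ErrorTrace (a single error field).
type errorTrace struct{ Err error }

// instanceTrace mimics a wrapper that tags an inner trace with an instance ID.
type instanceTrace struct {
	InstanceID string
	Inner      trace
}

func (t instanceTrace) Unwrap() trace { return t.Inner }

// unwrapAll peels wrappers until the innermost trace is reached.
func unwrapAll(t trace) trace {
	for {
		w, ok := t.(unwrapper)
		if !ok {
			return t
		}
		t = w.Unwrap()
	}
}

// describe dispatches on the innermost trace type.
func describe(t trace) string {
	switch v := unwrapAll(t).(type) {
	case errorTrace:
		return "error: " + v.Err.Error()
	default:
		return fmt.Sprintf("other: %T", v)
	}
}

var errBoom = errors.New("boom")

func main() {
	wrapped := instanceTrace{InstanceID: "p-1", Inner: errorTrace{Err: errBoom}}
	fmt.Println(describe(wrapped))
}
```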

ID generator fallback behavior:

  • Runtime attempts to initialize the default SNO-based ID generator.
  • If initialization fails, runtime falls back to a local generator.
  • A warning trace is emitted to make this visible in observability pipelines.
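
The fallback described above can be pictured as a try-then-degrade constructor. This is a minimal sketch with stand-in types, not the runtime's implementation: generator, localGenerator, newWithFallback, and the warn callback (which stands in for the warning trace) are all hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// generator is a stand-in for an ID generator.
type generator interface {
	New() string
}

// localGenerator hands out process-local sequential IDs.
type localGenerator struct{ n uint64 }

func (g *localGenerator) New() string {
	return fmt.Sprintf("local-%d", atomic.AddUint64(&g.n, 1))
}

// newWithFallback tries the primary builder. If it fails, the failure is
// reported through warn and a local generator is returned instead.
func newWithFallback(primary func() (generator, error), warn func(string)) generator {
	g, err := primary()
	if err == nil {
		return g
	}
	warn("id generator fallback: " + err.Error())
	return &localGenerator{}
}

var errUnavailable = errors.New("primary generator unavailable")

// failingBuilder simulates a default generator that cannot initialize.
func failingBuilder() (generator, error) { return nil, errUnavailable }

func main() {
	g := newWithFallback(failingBuilder, func(msg string) {
		fmt.Println("warning:", msg)
	})
	fmt.Println(g.New())
}
```

Routing the warning through a callback keeps the fallback visible to whatever observability pipeline is attached, which is the point of emitting a trace rather than failing silently.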

BPMN Coverage

This engine supports core BPMN elements used by most service workflows, including:

  • Tasks: User, Service, Script, Manual, Business Rule, Call Activity.
  • Events: Start, End, Intermediate Catch/Throw, Timer-related flows.
  • Gateways: Exclusive, Inclusive, Parallel, Event-based.
  • Sub-processes and sequence flows.

For concrete usage patterns, see examples and tests in this repository.


Development Commands

From repository root:

# all runtime tests
go test -v ./...

# schema module tests
go test -v ./schema

# static analysis
go vet ./...

# run one test by name
go test -v ./... -run 'TestUserTask'

# run one test in a package, bypass cache
go test -v ./pkg/data -run '^TestContainer$' -count=1

Contributing

PRs are welcome. For contributor/agent workflow guidance, see AGENTS.md.


License

Apache-2.0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyTaskDataOutput

func ApplyTaskDataOutput(element schema.BaseElementInterface, dataOutputs map[string]any) map[string]data.IItem

func ApplyTaskResult added in v2.2.0

func ApplyTaskResult(element schema.BaseElementInterface, results map[string]any) map[string]data.IItem

func FetchTaskDataInput

func FetchTaskDataInput(locator data.IFlowDataLocator, element schema.BaseElementInterface) (headers map[string]string, properties, dataObjects map[string]data.IItem)

func FetchTaskTimeout added in v2.2.4

func FetchTaskTimeout(element schema.BaseElementInterface) time.Duration

FetchTaskTimeout returns the timeout for the given schema.BaseElementInterface.

Types

type ActionTransformer

type ActionTransformer func(sequenceFlowId *schema.IdRef, action IAction) IAction

type ActiveBoundaryTrace

type ActiveBoundaryTrace struct {
	Start bool
	Node  schema.FlowNodeInterface
}

func (ActiveBoundaryTrace) Unpack added in v2.4.2

func (b ActiveBoundaryTrace) Unpack() any

type ActiveListeningTrace

type ActiveListeningTrace struct {
	Node *schema.CatchEvent
}

func (ActiveListeningTrace) Unpack added in v2.4.2

func (t ActiveListeningTrace) Unpack() any

type Activity

type Activity interface {
	IFlowNode
	Type() ActivityType
	// Cancel initiates a cancellation of activity and returns a channel
	// that will signal a boolean (`true` if cancellation was successful,
	// `false` otherwise)
	Cancel() <-chan bool
}

Activity is a generic interface to flow nodes that are activities
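
Because Cancel returns a channel rather than a value, callers typically wait on it with a bound. The sketch below is one way to do that under stated assumptions: canceller, fakeActivity, silentActivity, and cancelWithin are hypothetical names, and nothing here says when or whether a real activity answers.

```go
package main

import (
	"fmt"
	"time"
)

// canceller is a local stand-in exposing only the Cancel method shown above.
type canceller interface {
	Cancel() <-chan bool
}

// fakeActivity answers immediately with a fixed outcome (illustrative only).
type fakeActivity struct{ ok bool }

func (a fakeActivity) Cancel() <-chan bool {
	ch := make(chan bool, 1)
	ch <- a.ok
	return ch
}

// silentActivity never answers, which exercises the timeout path.
type silentActivity struct{}

func (silentActivity) Cancel() <-chan bool { return make(chan bool) }

// cancelWithin requests cancellation and waits up to d for the outcome.
// answered is false when no outcome arrived in time.
func cancelWithin(c canceller, d time.Duration) (cancelled, answered bool) {
	select {
	case ok := <-c.Cancel():
		return ok, true
	case <-time.After(d):
		return false, false
	}
}

func main() {
	fmt.Println(cancelWithin(fakeActivity{ok: true}, time.Second))
	fmt.Println(cancelWithin(silentActivity{}, 10*time.Millisecond))
}
```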

type ActivityType added in v2.3.0

type ActivityType string
const (
	TaskActivity         ActivityType = "Task"
	ServiceTaskActivity  ActivityType = "ServiceTask"
	ScriptTaskActivity   ActivityType = "ScriptTask"
	UserTaskActivity     ActivityType = "UserTask"
	ManualTaskActivity   ActivityType = "ManualTask"
	CallActivity         ActivityType = "CallActivity"
	BusinessRuleActivity ActivityType = "BusinessRuleTask"
	SendTaskActivity     ActivityType = "SendTask"
	ReceiveTaskActivity  ActivityType = "ReceiveTask"
	SubprocessActivity   ActivityType = "Subprocess"
)

type CancellationFlowNodeTrace

type CancellationFlowNodeTrace struct {
	Node schema.FlowNodeInterface
}

func (CancellationFlowNodeTrace) Unpack added in v2.4.2

func (t CancellationFlowNodeTrace) Unpack() any

type CancellationFlowTrace

type CancellationFlowTrace struct {
	FlowId id.Id
	Node   schema.FlowNodeInterface
}

func (CancellationFlowTrace) Unpack added in v2.4.2

func (t CancellationFlowTrace) Unpack() any

type CeaseFlowTrace

type CeaseFlowTrace struct {
	Process schema.Element
}

func (CeaseFlowTrace) Unpack added in v2.4.2

func (t CeaseFlowTrace) Unpack() any

type CeaseProcessSetTrace added in v2.6.0

type CeaseProcessSetTrace struct {
	Definitions *schema.Definitions
}

func (CeaseProcessSetTrace) Unpack added in v2.6.0

func (t CeaseProcessSetTrace) Unpack() any

type CompletionTrace

type CompletionTrace struct {
	Node schema.FlowNodeInterface
}

func (CompletionTrace) Unpack added in v2.4.2

func (t CompletionTrace) Unpack() any

type DeterminationMadeTrace

type DeterminationMadeTrace struct {
	Node schema.FlowNodeInterface
}

func (DeterminationMadeTrace) Unpack added in v2.4.2

func (t DeterminationMadeTrace) Unpack() any

type DoOption

type DoOption func(*DoResponse)

func DoWithErr added in v2.2.0

func DoWithErr(err error) DoOption

func DoWithErrHandle added in v2.2.0

func DoWithErrHandle(err error, ch <-chan ErrHandler) DoOption

func DoWithObjects added in v2.2.0

func DoWithObjects(dataObjects map[string]any) DoOption

func DoWithResults added in v2.2.0

func DoWithResults(results map[string]any) DoOption

func DoWithValue added in v2.2.0

func DoWithValue(key, value any) DoOption

type DoResponse

type DoResponse struct {
	Context     context.Context
	DataObjects map[string]any
	Results     map[string]any
	Err         error
	HandlerCh   <-chan ErrHandler
}
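
DoOption follows Go's functional-options pattern: each option is a function that mutates a DoResponse. The sketch below reproduces the pattern with local stand-in types (doResponse, doOption, withResults, withErr, apply), all of which are illustrative. It does not show how TaskTrace.Do consumes the options internally.

```go
package main

import "fmt"

// doResponse and doOption are local stand-ins shaped like DoResponse and
// DoOption above; only two fields are reproduced.
type doResponse struct {
	Results map[string]any
	Err     error
}

type doOption func(*doResponse)

// withResults sets the task results on the response.
func withResults(results map[string]any) doOption {
	return func(r *doResponse) { r.Results = results }
}

// withErr records a task error on the response.
func withErr(err error) doOption {
	return func(r *doResponse) { r.Err = err }
}

// apply builds a response by running each option in order.
func apply(opts ...doOption) *doResponse {
	r := &doResponse{}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := apply(withResults(map[string]any{"approved": true}))
	fmt.Println(r.Results["approved"], r.Err)
}
```

Options are applied in order, so when two options touch the same field the later one wins.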

type Engine added in v2.5.1

type Engine struct {
	*EngineOptions
}

func NewEngine added in v2.5.1

func NewEngine(opts ...EngineOption) *Engine

func (*Engine) NewProcess added in v2.5.1

func (engine *Engine) NewProcess(definitions *schema.Definitions, opts ...Option) (process *Process, err error)

func (*Engine) NewProcessSet added in v2.6.0

func (engine *Engine) NewProcessSet(definitions *schema.Definitions, opts ...Option) (*ProcessSet, error)

type EngineOption added in v2.5.1

type EngineOption func(*EngineOptions)

func WithEngineContext added in v2.5.5

func WithEngineContext(ctx context.Context) EngineOption

func WithEventDefinitionInstanceBuilder

func WithEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) EngineOption

func WithIdGeneratorBuilder added in v2.5.1

func WithIdGeneratorBuilder(idGeneratorBuilder id.IGeneratorBuilder) EngineOption

type EngineOptions added in v2.5.1

type EngineOptions struct {
	// contains filtered or unexported fields
}

func NewEngineOptions added in v2.5.1

func NewEngineOptions(opts ...EngineOption) *EngineOptions

type ErrHandleMode

type ErrHandleMode int
const (
	RetryMode ErrHandleMode = iota + 1
	SkipMode
	ExitMode
)

type ErrHandler

type ErrHandler struct {
	Mode    ErrHandleMode
	Retries int32
}
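
DoWithErrHandle takes a receive-only channel of ErrHandler, so the caller decides how a failed task should proceed and sends that decision back. The sketch below uses stand-in types with the same shape. The decide policy is hypothetical, and the meaning of each mode (retry, skip, exit) is inferred from its name rather than documented here.

```go
package main

import "fmt"

// errHandleMode and errHandler are local stand-ins shaped like
// ErrHandleMode and ErrHandler above.
type errHandleMode int

const (
	retryMode errHandleMode = iota + 1
	skipMode
	exitMode
)

type errHandler struct {
	Mode    errHandleMode
	Retries int32
}

// decide is a hypothetical policy: retry transient failures up to three
// times and exit on anything else.
func decide(transient bool) errHandler {
	if transient {
		return errHandler{Mode: retryMode, Retries: 3}
	}
	return errHandler{Mode: exitMode}
}

// reply delivers the decision on a buffered channel and returns it as a
// receive-only channel, the shape that DoWithErrHandle accepts.
func reply(h errHandler) <-chan errHandler {
	ch := make(chan errHandler, 1)
	ch <- h
	return ch
}

func main() {
	h := <-reply(decide(true))
	fmt.Println(h.Mode == retryMode, h.Retries)
}
```

The buffered channel lets the caller publish its decision without blocking on the receiver.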

type ErrorTrace

type ErrorTrace struct {
	Error error
}

func (ErrorTrace) Unpack added in v2.4.2

func (t ErrorTrace) Unpack() any

type EventObservedTrace

type EventObservedTrace struct {
	Node  *schema.CatchEvent
	Event event.IEvent
}

EventObservedTrace signals that a particular event has been observed by the node

func (EventObservedTrace) Unpack added in v2.4.2

func (t EventObservedTrace) Unpack() any

type ExclusiveNoEffectiveSequenceFlows

type ExclusiveNoEffectiveSequenceFlows struct {
	*schema.ExclusiveGateway
}

func (ExclusiveNoEffectiveSequenceFlows) Error

type Flow

type Flow interface {
	// Id returns flow's unique identifier
	Id() id.Id
	// SequenceFlow returns an inbound sequence flow this flow
	// is currently at.
	SequenceFlow() *SequenceFlow
}

Flow specifies an interface for BPMN flows

type FlowActionResponse

type FlowActionResponse struct {
	// contains filtered or unexported fields
}

type FlowNodeMapping

type FlowNodeMapping struct {
	// contains filtered or unexported fields
}

func NewLockedFlowNodeMapping

func NewLockedFlowNodeMapping() *FlowNodeMapping

func (*FlowNodeMapping) Finalize

func (mapping *FlowNodeMapping) Finalize()

func (*FlowNodeMapping) RegisterElementToFlowNode

func (mapping *FlowNodeMapping) RegisterElementToFlowNode(element schema.FlowNodeInterface, flowNode IFlowNode) error

func (*FlowNodeMapping) ResolveElementToFlowNode

func (mapping *FlowNodeMapping) ResolveElementToFlowNode(element schema.FlowNodeInterface) (flowNode IFlowNode, found bool)

type FlowTrace

type FlowTrace struct {
	Source schema.FlowNodeInterface
	Flows  []Snapshot
}

func (FlowTrace) Unpack added in v2.4.2

func (t FlowTrace) Unpack() any

type IAction

type IAction interface {
	// contains filtered or unexported methods
}

type IFlowNode

type IFlowNode interface {
	IOutgoing
	Element() schema.FlowNodeInterface
}

type IOutgoing

type IOutgoing interface {
	NextAction(ctx context.Context, flow Flow) chan IAction
}

type InclusiveNoEffectiveSequenceFlows

type InclusiveNoEffectiveSequenceFlows struct {
	*schema.InclusiveGateway
}

func (InclusiveNoEffectiveSequenceFlows) Error

type IncomingFlowProcessedTrace

type IncomingFlowProcessedTrace struct {
	Node *schema.ParallelGateway
	Flow Flow
}

IncomingFlowProcessedTrace signals that a particular flow has been processed. If any action has been taken, it has already happened

func (IncomingFlowProcessedTrace) Unpack added in v2.4.2

func (t IncomingFlowProcessedTrace) Unpack() any

type InstanceTrace

type InstanceTrace struct {
	InstanceId id.Id
	Trace      tracing.ITrace
}

InstanceTrace wraps any trace with process instance id

func (InstanceTrace) Unpack added in v2.4.2

func (t InstanceTrace) Unpack() any

func (InstanceTrace) Unwrap

func (t InstanceTrace) Unwrap() tracing.ITrace

type InstantiationTrace

type InstantiationTrace struct {
	InstanceId id.Id
}

InstantiationTrace denotes instantiation of a given process

func (InstantiationTrace) Unpack added in v2.4.2

func (i InstantiationTrace) Unpack() any

type LeaveTrace

type LeaveTrace struct {
	Node schema.FlowNodeInterface
}

func (LeaveTrace) Unpack added in v2.4.2

func (t LeaveTrace) Unpack() any

type NewFlowNodeTrace

type NewFlowNodeTrace struct {
	Node schema.FlowNodeInterface
}

func (NewFlowNodeTrace) Unpack added in v2.4.2

func (t NewFlowNodeTrace) Unpack() any

type NewFlowTrace

type NewFlowTrace struct {
	FlowId id.Id
}

func (NewFlowTrace) Unpack added in v2.4.2

func (t NewFlowTrace) Unpack() any

type Option

type Option func(*Options)

Option modifies the configuration of an instance; it is a plain modification function, which keeps configuration flexible

func WithContext

func WithContext(ctx context.Context) Option

WithContext passes the given context to a new instance instead of an implicitly generated one

func WithDataObjects

func WithDataObjects(dataObjects map[string]any) Option

func WithEventEgress

func WithEventEgress(source event.ISource) Option

func WithEventIngress

func WithEventIngress(consumer event.IConsumer) Option

func WithIdGenerator

func WithIdGenerator(idGenerator id.IGenerator) Option

func WithLocator

func WithLocator(locator data.IFlowDataLocator) Option

func WithProcessEventDefinitionInstanceBuilder added in v2.5.1

func WithProcessEventDefinitionInstanceBuilder(eventDefinitionInstanceBuilder event.IDefinitionInstanceBuilder) Option

func WithTracer

func WithTracer(tracer tracing.ITracer) Option

WithTracer overrides instance's tracer

func WithVariables

func WithVariables(variables map[string]any) Option

type Options

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions(opts ...Option) *Options

type Process

type Process struct {
	*Options
	// contains filtered or unexported fields
}

func NewProcess

func NewProcess(processElem *schema.Process, definitions *schema.Definitions, opts ...Option) (process *Process, err error)

func (*Process) ConsumeEvent added in v2.5.1

func (p *Process) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error)

func (*Process) Element

func (p *Process) Element() *schema.Process

func (*Process) FlowNodeMapping added in v2.5.1

func (p *Process) FlowNodeMapping() *FlowNodeMapping

func (*Process) Id added in v2.5.1

func (p *Process) Id() id.Id

func (*Process) Locator added in v2.5.1

func (p *Process) Locator() data.IFlowDataLocator

func (*Process) RegisterEventConsumer added in v2.5.1

func (p *Process) RegisterEventConsumer(ev event.IConsumer) (err error)

func (*Process) StartAll added in v2.5.1

func (p *Process) StartAll(ctx context.Context) error

StartAll explicitly starts the instance by triggering all start events

func (*Process) StartWith added in v2.5.1

func (p *Process) StartWith(ctx context.Context, element schema.FlowNodeInterface) (err error)

StartWith explicitly starts the instance by triggering a given start event or throw event

func (*Process) ThrowAll added in v2.6.0

func (p *Process) ThrowAll(ctx context.Context) error

ThrowAll explicitly starts the instance by triggering all intermediate throw events.

func (*Process) Tracer

func (p *Process) Tracer() tracing.ITracer

func (*Process) WaitUntilComplete added in v2.5.1

func (p *Process) WaitUntilComplete(ctx context.Context) (complete bool)

WaitUntilComplete waits until the instance is complete. Returns true if the instance completed, false if the context signaled `Done`
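
The documented contract (true on completion, false when the context is done first) is the usual select-on-two-channels idiom. The sketch below reproduces it with a plain channel so the effect of a deadline is easy to see. The names waitUntil and runWithDeadline are illustrative stand-ins, not the method's implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil mirrors the documented contract with a plain channel:
// true when done is closed, false when ctx is done first.
func waitUntil(ctx context.Context, done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	case <-ctx.Done():
		return false
	}
}

// runWithDeadline simulates a process that finishes after work and waits
// for it with a context that expires after deadline.
func runWithDeadline(work, deadline time.Duration) bool {
	done := make(chan struct{})
	go func() {
		time.Sleep(work)
		close(done)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()
	return waitUntil(ctx, done)
}

func main() {
	fmt.Println("fast process:", runWithDeadline(0, time.Second))
	fmt.Println("slow process:", runWithDeadline(time.Second, 10*time.Millisecond))
}
```

With context.Background() there is no deadline, so a call of this shape blocks until completion. Passing a context with a timeout bounds the wait.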

type ProcessLandMarkTrace

type ProcessLandMarkTrace struct {
	Node schema.FlowNodeInterface
}

ProcessLandMarkTrace denotes instantiation of a given sub-process

func (ProcessLandMarkTrace) Unpack added in v2.4.2

func (t ProcessLandMarkTrace) Unpack() any

type ProcessSet added in v2.6.0

type ProcessSet struct {
	*Options
	// contains filtered or unexported fields
}

func NewProcessSet added in v2.6.0

func NewProcessSet(executeProcesses, waitingProcesses []*schema.Process, definitions *schema.Definitions, opts ...Option) (*ProcessSet, error)

func (*ProcessSet) Locator added in v2.6.0

func (ps *ProcessSet) Locator() data.IFlowDataLocator

func (*ProcessSet) StartAll added in v2.6.0

func (ps *ProcessSet) StartAll(ctx context.Context) error

func (*ProcessSet) Tracer added in v2.6.0

func (ps *ProcessSet) Tracer() tracing.ITracer

func (*ProcessSet) WaitUntilComplete added in v2.6.0

func (ps *ProcessSet) WaitUntilComplete(ctx context.Context) (complete bool)

WaitUntilComplete waits until the instance is complete. Returns true if the instance completed, false if the context signaled `Done`

type ProcessTrace

type ProcessTrace struct {
	Process *schema.Process
	Trace   tracing.ITrace
}

ProcessTrace wraps any trace within a given process

func (ProcessTrace) Unpack added in v2.4.2

func (t ProcessTrace) Unpack() any

func (ProcessTrace) Unwrap

func (t ProcessTrace) Unwrap() tracing.ITrace

type Retry

type Retry struct {
	// contains filtered or unexported fields
}

Retry describes the retry handler for flow

func (*Retry) IsContinue

func (r *Retry) IsContinue() bool

func (*Retry) Reset

func (r *Retry) Reset(retries int32)

func (*Retry) Step

func (r *Retry) Step()

Step increments the attempt counter by one.
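
The three methods suggest a simple retry budget: Reset sets the budget, Step records an attempt, and IsContinue reports whether another attempt is allowed. Retry's fields are unexported, so the sketch below is an assumption about those semantics (continue while attempts are below the budget), built on a local stand-in type. The runWithRetry helper is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// retry is a local stand-in; its fields and the "attempts < retries" rule
// are assumptions, not the package's implementation.
type retry struct {
	retries  int32
	attempts int32
}

func (r *retry) Reset(retries int32) { r.retries, r.attempts = retries, 0 }
func (r *retry) Step()               { r.attempts++ }
func (r *retry) IsContinue() bool    { return r.attempts < r.retries }

// runWithRetry calls op until it succeeds or the budget is used up, and
// returns the number of calls made along with the last error.
func runWithRetry(r *retry, op func() error) (calls int, err error) {
	for {
		calls++
		if err = op(); err == nil || !r.IsContinue() {
			return calls, err
		}
		r.Step()
	}
}

var errFail = errors.New("fail")

func main() {
	r := &retry{}
	r.Reset(2)
	calls, err := runWithRetry(r, func() error { return errFail })
	fmt.Println(calls, err)
}
```

Under this reading, a budget of 2 allows the initial call plus two retries.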

type SequenceFlow

type SequenceFlow struct {
	*schema.SequenceFlow
	// contains filtered or unexported fields
}

func MakeSequenceFlow

func MakeSequenceFlow(sf *schema.SequenceFlow, process schema.Element) SequenceFlow

func NewSequenceFlow

func NewSequenceFlow(sf *schema.SequenceFlow, process schema.Element) *SequenceFlow

func (*SequenceFlow) Source

func (sf *SequenceFlow) Source() (schema.FlowNodeInterface, error)

func (*SequenceFlow) Target

func (sf *SequenceFlow) Target() (schema.FlowNodeInterface, error)

func (*SequenceFlow) TargetIndex

func (sf *SequenceFlow) TargetIndex() (index int, err error)

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

func (*Snapshot) Id

func (s *Snapshot) Id() id.Id

func (*Snapshot) SequenceFlow

func (s *Snapshot) SequenceFlow() *SequenceFlow

type TaskTrace

type TaskTrace interface {
	Unpack() any
	Context() context.Context
	GetActivity() Activity
	GetDataObjects() map[string]data.IItem
	GetHeaders() map[string]string
	GetProperties() map[string]data.IItem
	Do(options ...DoOption)
}

TaskTrace describes a common channel handler for all tasks

type Terminate

type Terminate func(sequenceFlowId *schema.IdRef) chan bool

type TerminationTrace

type TerminationTrace struct {
	FlowId id.Id
	Source schema.FlowNodeInterface
}

func (TerminationTrace) Unpack added in v2.4.2

func (t TerminationTrace) Unpack() any

type VisitTrace

type VisitTrace struct {
	Node schema.FlowNodeInterface
}

func (VisitTrace) Unpack added in v2.4.2

func (t VisitTrace) Unpack() any

Directories

Path               Synopsis
examples
  basic            command
  catch_event      command
  collaboration    command
  gateway          command
  gateway_expr     command
  multiprocess     command
  properties       command
  subprocess       command
  user_task        command
pkg
  id
  logic            Package logic provides commonly shared "logic units" (or algorithms)
  tracing          Package tracing is a capability to get an ordered stream of structured records of what has happened.
