flowstate

package module
v0.0.0-...-48ed108 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2025 License: MIT Imports: 15 Imported by: 2

README

Flowstate

Flowstate is a lightweight Go library for building durable, long-running workflows with built-in state persistence, crash recovery, and flow orchestration. Whether you're managing retries, coordinating external services, or executing multi-step business logic, Flowstate ensures your progress is persisted and never lost, even in the face of panics, crashes, or OOMs.

You can even use Flowstate to power real-time applications like games. Watch it in action.

When to Use

Flowstate is a great fit when you need:

  • Durable, crash-resistant business workflows
  • Retry logic that survives restarts
  • Flow coordination without introducing heavyweight workflow engines
  • A minimal, embeddable orchestration layer for Go microservices
  • Distributed orchestration with persistence

Examples

Install

go get github.com/makasim/flowstate

Drivers

Flowstate supports multiple persistence backends via a simple driver interface:

  • In-memory
  • PostgreSQL
  • BadgerDB
  • Network driver

You can plug in your own driver by implementing the minimal flowstate.Driver interface.

Usage Modes

As a Library

Use Flowstate as an embedded Go library in your applications:

package main

import (
	"log"
	"log/slog"

	"github.com/makasim/flowstate"
	"github.com/makasim/flowstate/memdriver"
)

// main wires the memdriver driver and a default flow registry into a new
// Flowstate engine. It exits the process if the engine cannot be created.
func main() {
	d := memdriver.New(slog.Default())
	fr := &flowstate.DefaultFlowRegistry{}
	e, err := flowstate.NewEngine(d, fr, slog.Default())
	if err != nil {
		log.Fatalf("cannot create engine: %v", err)
	}

	// Your workflow logic here, e.g. e.Do(...) or e.Execute(...).
	// Call e.Shutdown(ctx) before the program exits.
	//
	// The blank assignment keeps this skeleton compiling until e is used;
	// Go rejects local variables that are declared and never used.
	_ = e
}
As a Server with Built-in UI

Run Flowstate as a standalone server with a web UI for monitoring and managing workflows:

go run app/flowstate.go

# Or using Docker
docker run -p 8080:8080 makasim/flowstate

The server listens on :8080 and provides:

  • netdriver API
  • netflow API
  • UI

The server can be used as a driver for your applications:

package main

import (
	"log"
	"log/slog"

	"github.com/makasim/flowstate"
	"github.com/makasim/flowstate/netdriver"
)

// main points the netdriver driver at a Flowstate server on
// http://localhost:8080 and builds an engine on top of it. It exits the
// process if the engine cannot be created.
func main() {
	d := netdriver.New(`http://localhost:8080`)
	fr := &flowstate.DefaultFlowRegistry{}
	e, err := flowstate.NewEngine(d, fr, slog.Default())
	if err != nil {
		log.Fatalf("cannot create engine: %v", err)
	}

	// Your workflow logic here, e.g. e.Do(...) or e.Execute(...).
	// Call e.Shutdown(ctx) before the program exits.
	//
	// The blank assignment keeps this skeleton compiling until e is used;
	// Go rejects local variables that are declared and never used.
	_ = e
}

Contributing

Issues, feedback, and PRs are welcome!

License

MIT

Documentation

Index

Constants

View Source
const GetDelayedStatesDefaultLimit = 500
View Source
const GetStatesDefaultLimit = 50

Variables

View Source
var DefaultMaxRecoveryAttempts = 3
View Source
var DefaultRetryAfter = time.Minute * 2
View Source
var DelayCommitAnnotation = `flowstate.delay.commit`
View Source
var DelayUntilAnnotation = `flowstate.delay.until`
View Source
var ErrFlowNotFound = errors.New("flow not found")
View Source
var ErrNotFound = errors.New("state not found")
View Source
var MaxRecoveryAttemptsAnnotation = `flowstate.recovery.max_attempts`
View Source
var MaxRetryAfter = time.Minute * 5
View Source
var MinRetryAfter = time.Minute
View Source
var RecoveryAttemptAnnotation = `flowstate.recovery.attempt`
View Source
var RecoveryEnabledAnnotation = `flowstate.recovery.enabled`
View Source
var RetryAfterAnnotation = `flowstate.recovery.retry_after`

Functions

func Delayed

func Delayed(state State) bool

func DelayedUntil

func DelayedUntil(state State) time.Time

func DisableRecovery

func DisableRecovery(stateCtx *StateCtx)

func DoCommitSubCommand

func DoCommitSubCommand(d Driver, cmd0 Command) error

func IsErrRevMismatch

func IsErrRevMismatch(err error) bool

func IsErrRevMismatchContains

func IsErrRevMismatchContains(err error, sID StateID) bool

func LogCommand

func LogCommand(msg string, cmd0 Command, l *slog.Logger)

func MarshalCommand

func MarshalCommand(cmd Command, dst []byte) []byte

func MarshalData

func MarshalData(d *Data, dst []byte) []byte

func MarshalDelayedState

func MarshalDelayedState(ds DelayedState, dst []byte) []byte

func MarshalJSONCommand

func MarshalJSONCommand(cmd0 Command) ([]byte, error)

func MarshalJSONData

func MarshalJSONData(d *Data) ([]byte, error)

func MarshalJSONDelayedState

func MarshalJSONDelayedState(ds DelayedState) ([]byte, error)

func MarshalJSONState

func MarshalJSONState(s State) ([]byte, error)

func MarshalJSONStateCtx

func MarshalJSONStateCtx(s *StateCtx) ([]byte, error)

func MarshalJSONTransition

func MarshalJSONTransition(ts Transition) ([]byte, error)

func MarshalState

func MarshalState(s State, dst []byte) []byte

func MarshalStateCtx

func MarshalStateCtx(stateCtx *StateCtx, dst []byte) []byte

func MarshalTransition

func MarshalTransition(ts Transition, dst []byte) []byte

func MaxRecoveryAttempts

func MaxRecoveryAttempts(state State) int

func Parked

func Parked(state State) bool

func RecoveryAttempt

func RecoveryAttempt(state State) int

func SetMaxRecoveryAttempts

func SetMaxRecoveryAttempts(stateCtx *StateCtx, attempts int)

func SetRetryAfter

func SetRetryAfter(stateCtx *StateCtx, retryAfter time.Duration)

func UnmarshalData

func UnmarshalData(src []byte, d *Data) (err error)

func UnmarshalDelayedState

func UnmarshalDelayedState(src []byte, ds *DelayedState) (err error)

func UnmarshalJSONData

func UnmarshalJSONData(data []byte, d *Data) error

func UnmarshalJSONDelayedState

func UnmarshalJSONDelayedState(data []byte, ds *DelayedState) error

func UnmarshalJSONState

func UnmarshalJSONState(data []byte, s *State) error

func UnmarshalJSONStateCtx

func UnmarshalJSONStateCtx(data []byte, s *StateCtx) error

func UnmarshalJSONTransition

func UnmarshalJSONTransition(data []byte, ts *Transition) error

func UnmarshalState

func UnmarshalState(src []byte, s *State) (err error)

func UnmarshalStateCtx

func UnmarshalStateCtx(src []byte, stateCtx *StateCtx) (err error)

func UnmarshalTransition

func UnmarshalTransition(src []byte, ts *Transition) (err error)

Types

type Command

type Command interface {
	// contains filtered or unexported methods
}

func UnmarshalCommand

func UnmarshalCommand(src0 []byte) (Command, error)

func UnmarshalJSONCommand

func UnmarshalJSONCommand(data []byte) (Command, error)

type CommitCommand

type CommitCommand struct {
	Commands []Command
	// contains filtered or unexported fields
}

func Commit

func Commit(cmds ...Command) *CommitCommand

type CommittableCommand

type CommittableCommand interface {
	CommittableStateCtx() *StateCtx
}

type Data

type Data struct {
	Rev         int64             `json:"-"`
	Annotations map[string]string `json:"-"`
	Blob        []byte            `json:"-"`
	// contains filtered or unexported fields
}

func (*Data) CopyTo

func (d *Data) CopyTo(to *Data) *Data

func (*Data) IsBinary

func (d *Data) IsBinary() bool

func (*Data) Lock

func (*Data) Lock()

func (*Data) MarshalJSON

func (d *Data) MarshalJSON() ([]byte, error)

func (*Data) SetAnnotation

func (d *Data) SetAnnotation(name, value string)

func (*Data) SetBinary

func (d *Data) SetBinary(v bool)

func (*Data) Unlock

func (*Data) Unlock()

func (*Data) UnmarshalJSON

func (d *Data) UnmarshalJSON(data []byte) error

type DefaultFlowRegistry

type DefaultFlowRegistry struct {
	// contains filtered or unexported fields
}

func (*DefaultFlowRegistry) Flow

func (fr *DefaultFlowRegistry) Flow(id FlowID) (Flow, error)

func (*DefaultFlowRegistry) SetFlow

func (fr *DefaultFlowRegistry) SetFlow(id FlowID, flow Flow) error

func (*DefaultFlowRegistry) UnsetFlow

func (fr *DefaultFlowRegistry) UnsetFlow(id FlowID) error

type DelayCommand

type DelayCommand struct {
	StateCtx    *StateCtx
	ExecuteAt   time.Time
	Commit      bool
	To          FlowID
	Annotations map[string]string

	Result *DelayedState
	// contains filtered or unexported fields
}

func Delay

func Delay(stateCtx *StateCtx, to FlowID, dur time.Duration) *DelayCommand

func DelayUntil

func DelayUntil(stateCtx *StateCtx, to FlowID, executeAt time.Time) *DelayCommand

func (*DelayCommand) MustResult

func (cmd *DelayCommand) MustResult() DelayedState

func (*DelayCommand) Prepare

func (cmd *DelayCommand) Prepare() error

func (*DelayCommand) WithAnnotation

func (cmd *DelayCommand) WithAnnotation(name, value string) *DelayCommand

func (*DelayCommand) WithCommit

func (cmd *DelayCommand) WithCommit(commit bool) *DelayCommand

func (*DelayCommand) WithTransit

func (cmd *DelayCommand) WithTransit(to FlowID) *DelayCommand

type DelayedState

type DelayedState struct {
	State     State
	Offset    int64
	ExecuteAt time.Time
}

func (DelayedState) MarshalJSON

func (ds DelayedState) MarshalJSON() ([]byte, error)

func (*DelayedState) UnmarshalJSON

func (ds *DelayedState) UnmarshalJSON(data []byte) error

type Delayer

type Delayer struct {
	// contains filtered or unexported fields
}

func NewDelayer

func NewDelayer(e *Engine, l *slog.Logger) (*Delayer, error)

func (*Delayer) Shutdown

func (d *Delayer) Shutdown(ctx context.Context) error

type Driver

type Driver interface {
	// Init must be called by NewEngine only.
	Init(e *Engine) error

	GetStateByID(cmd *GetStateByIDCommand) error
	GetStateByLabels(cmd *GetStateByLabelsCommand) error
	GetStates(cmd *GetStatesCommand) error
	GetDelayedStates(cmd *GetDelayedStatesCommand) error
	Delay(cmd *DelayCommand) error
	Commit(cmd *CommitCommand) error
	GetData(cmd *GetDataCommand) error
	StoreData(cmd *StoreDataCommand) error
}

func NewCacheDriver

func NewCacheDriver(d Driver, maxSize int, l *slog.Logger) Driver

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(d Driver, fr FlowRegistry, l *slog.Logger) (*Engine, error)

func (*Engine) Do

func (e *Engine) Do(cmds ...Command) error

func (*Engine) Execute

func (e *Engine) Execute(stateCtx *StateCtx) error

func (*Engine) Iter

func (e *Engine) Iter(cmd *GetStatesCommand) *Iter

func (*Engine) Shutdown

func (e *Engine) Shutdown(ctx context.Context) error

type ErrRevMismatch

type ErrRevMismatch struct {
	IDS []StateID
}

ErrRevMismatch is an error that indicates a revision mismatch during a commit operation.

func (*ErrRevMismatch) Add

func (err *ErrRevMismatch) Add(id StateID)

func (*ErrRevMismatch) All

func (err *ErrRevMismatch) All() []StateID

func (ErrRevMismatch) As

func (err ErrRevMismatch) As(target interface{}) bool

func (*ErrRevMismatch) Contains

func (err *ErrRevMismatch) Contains(id StateID) bool

func (ErrRevMismatch) Error

func (err ErrRevMismatch) Error() string

type ExecuteCommand

type ExecuteCommand struct {
	StateCtx *StateCtx
	// contains filtered or unexported fields
}

func Execute

func Execute(stateCtx *StateCtx) *ExecuteCommand

type Flow

type Flow interface {
	Execute(stateCtx *StateCtx, e *Engine) (Command, error)
}

type FlowFunc

type FlowFunc func(stateCtx *StateCtx, e *Engine) (Command, error)

func (FlowFunc) Execute

func (f FlowFunc) Execute(stateCtx *StateCtx, e *Engine) (Command, error)

type FlowID

type FlowID string

type FlowRegistry

type FlowRegistry interface {
	Flow(id FlowID) (Flow, error)
	SetFlow(id FlowID, flow Flow) error
	UnsetFlow(id FlowID) error
}

type GetDataCommand

type GetDataCommand struct {
	StateCtx *StateCtx
	Alias    string
	// contains filtered or unexported fields
}

func GetData

func GetData(stateCtx *StateCtx, alias string) *GetDataCommand

func (*GetDataCommand) Prepare

func (cmd *GetDataCommand) Prepare() (bool, error)

type GetDelayedStatesCommand

type GetDelayedStatesCommand struct {
	Since time.Time
	Until time.Time
	// Offset is valid inside the since-until range.
	// Use it to paginate results.
	Offset int64
	Limit  int

	Result *GetDelayedStatesResult
	// contains filtered or unexported fields
}

func GetDelayedStates

func GetDelayedStates(since, until time.Time, offset int64) *GetDelayedStatesCommand

func (*GetDelayedStatesCommand) MustResult

func (*GetDelayedStatesCommand) Prepare

func (cmd *GetDelayedStatesCommand) Prepare()

type GetDelayedStatesResult

type GetDelayedStatesResult struct {
	States []DelayedState
	More   bool
}

type GetStateByIDCommand

type GetStateByIDCommand struct {
	ID  StateID
	Rev int64

	StateCtx *StateCtx
	// contains filtered or unexported fields
}

func GetStateByID

func GetStateByID(stateCtx *StateCtx, id StateID, rev int64) *GetStateByIDCommand

func (*GetStateByIDCommand) Prepare

func (cmd *GetStateByIDCommand) Prepare() error

func (*GetStateByIDCommand) Result

func (cmd *GetStateByIDCommand) Result() (*StateCtx, error)

type GetStateByLabelsCommand

type GetStateByLabelsCommand struct {
	Labels map[string]string

	StateCtx *StateCtx
	// contains filtered or unexported fields
}

func GetStateByLabels

func GetStateByLabels(stateCtx *StateCtx, labels map[string]string) *GetStateByLabelsCommand

func (*GetStateByLabelsCommand) Result

func (cmd *GetStateByLabelsCommand) Result() (*StateCtx, error)

type GetStatesCommand

type GetStatesCommand struct {
	SinceRev   int64
	SinceTime  time.Time
	Labels     []map[string]string
	LatestOnly bool
	Limit      int

	Result *GetStatesResult
	// contains filtered or unexported fields
}

func GetStatesByLabels

func GetStatesByLabels(labels map[string]string) *GetStatesCommand

func (*GetStatesCommand) MustResult

func (cmd *GetStatesCommand) MustResult() *GetStatesResult

func (*GetStatesCommand) Prepare

func (cmd *GetStatesCommand) Prepare()

func (*GetStatesCommand) WithLatestOnly

func (cmd *GetStatesCommand) WithLatestOnly() *GetStatesCommand

func (*GetStatesCommand) WithLimit

func (cmd *GetStatesCommand) WithLimit(limit int) *GetStatesCommand

func (*GetStatesCommand) WithORLabels

func (cmd *GetStatesCommand) WithORLabels(labels map[string]string) *GetStatesCommand

func (*GetStatesCommand) WithSinceLatest

func (cmd *GetStatesCommand) WithSinceLatest() *GetStatesCommand

func (*GetStatesCommand) WithSinceRev

func (cmd *GetStatesCommand) WithSinceRev(rev int64) *GetStatesCommand

WithSinceRev sets SinceRev filter for the command. States with revision greater than SinceRev will be returned.

func (*GetStatesCommand) WithSinceTime

func (cmd *GetStatesCommand) WithSinceTime(since time.Time) *GetStatesCommand

type GetStatesResult

type GetStatesResult struct {
	States []State
	More   bool
}

type Iter

type Iter struct {
	Cmd *GetStatesCommand
	// contains filtered or unexported fields
}

func NewIter

func NewIter(d Driver, cmd *GetStatesCommand) *Iter

func (*Iter) Err

func (it *Iter) Err() error

Err returns the error encountered during iteration, if any. Call Err only after Next() has returned false. The iterator cannot be used once Err returns a non-nil error; create a new iterator using Iter.Cmd.

func (*Iter) Next

func (it *Iter) Next() bool

Next advances the iterator to the next state. It returns true if there is a next state, false otherwise. Call Next repeatedly until it returns false.

func (*Iter) State

func (it *Iter) State() State

State returns the current state in the iteration. Call State only after Next() has returned true.

func (*Iter) Wait

func (it *Iter) Wait(ctx context.Context)

Wait returns when new states are available or the context is done. Call Wait only after Next() has returned false, and call Next() again after Wait() returns. You can optionally check ctx.Err() to see whether the context is done and break the loop.

type NoopCommand

type NoopCommand struct {
	// contains filtered or unexported fields
}

func Noop

func Noop() *NoopCommand

type ParkCommand

type ParkCommand struct {
	StateCtx    *StateCtx
	Annotations map[string]string
	// contains filtered or unexported fields
}

func Park

func Park(stateCtx *StateCtx) *ParkCommand

func (*ParkCommand) CommittableStateCtx

func (cmd *ParkCommand) CommittableStateCtx() *StateCtx

func (*ParkCommand) Do

func (cmd *ParkCommand) Do() error

func (*ParkCommand) WithAnnotation

func (cmd *ParkCommand) WithAnnotation(name, value string) *ParkCommand

func (*ParkCommand) WithAnnotations

func (cmd *ParkCommand) WithAnnotations(annotations map[string]string) *ParkCommand

type Recoverer

type Recoverer struct {
	// contains filtered or unexported fields
}

func NewRecoverer

func NewRecoverer(e *Engine, l *slog.Logger) (*Recoverer, error)

func (*Recoverer) Shutdown

func (r *Recoverer) Shutdown(ctx context.Context) error

func (*Recoverer) Stats

func (r *Recoverer) Stats() RecovererStats

type RecovererStats

type RecovererStats struct {
	HeadRev  int64
	HeadTime time.Time
	TailRev  int64
	TailTime time.Time

	Commited  int64
	Added     int64
	Completed int64
	Retried   int64
	Dropped   int64

	Active bool
}

type StackCommand

type StackCommand struct {
	StackedStateCtx *StateCtx
	CarrierStateCtx *StateCtx
	Annotation      string
	// contains filtered or unexported fields
}

func Stack

func Stack(carrierStateCtx, stackStateCtx *StateCtx, annotation string) *StackCommand

func (*StackCommand) Do

func (cmd *StackCommand) Do() error

type State

type State struct {
	ID          StateID
	Rev         int64
	Annotations map[string]string
	Labels      map[string]string

	CommittedAt time.Time

	Transition Transition
}

func (State) Annotation

func (s State) Annotation(name string) string

func (*State) CopyTo

func (s *State) CopyTo(to *State) State

func (*State) CopyToCtx

func (s *State) CopyToCtx(to *StateCtx) *StateCtx

func (State) MarshalJSON

func (s State) MarshalJSON() ([]byte, error)

func (*State) SetAnnotation

func (s *State) SetAnnotation(name, value string)

func (*State) SetLabel

func (s *State) SetLabel(name, value string)

func (*State) UnmarshalJSON

func (s *State) UnmarshalJSON(data []byte) error

type StateCtx

type StateCtx struct {
	Current   State
	Committed State

	Datas map[string]*Data

	// Transitions between committed and current states
	Transitions []Transition
	// contains filtered or unexported fields
}

func (*StateCtx) CopyTo

func (s *StateCtx) CopyTo(to *StateCtx) *StateCtx

func (*StateCtx) Data

func (s *StateCtx) Data(name string) (*Data, error)

func (*StateCtx) Deadline

func (s *StateCtx) Deadline() (time.Time, bool)

func (*StateCtx) Done

func (s *StateCtx) Done() <-chan struct{}

func (*StateCtx) Err

func (s *StateCtx) Err() error

func (*StateCtx) MarshalJSON

func (s *StateCtx) MarshalJSON() ([]byte, error)

func (*StateCtx) MustData

func (s *StateCtx) MustData(name string) *Data

func (*StateCtx) NewTo

func (s *StateCtx) NewTo(id StateID, to *StateCtx) *StateCtx

func (*StateCtx) SetData

func (s *StateCtx) SetData(name string, d *Data)

func (*StateCtx) UnmarshalJSON

func (s *StateCtx) UnmarshalJSON(data []byte) error

func (*StateCtx) Value

func (s *StateCtx) Value(key any) any

type StateID

type StateID string

type StoreDataCommand

type StoreDataCommand struct {
	StateCtx *StateCtx
	Alias    string
	// contains filtered or unexported fields
}

func StoreData

func StoreData(stateCtx *StateCtx, alias string) *StoreDataCommand

func (*StoreDataCommand) Prepare

func (cmd *StoreDataCommand) Prepare() (bool, error)

type TransitCommand

type TransitCommand struct {
	StateCtx    *StateCtx
	Annotations map[string]string
	To          FlowID
	// contains filtered or unexported fields
}

func Transit

func Transit(stateCtx *StateCtx, to FlowID) *TransitCommand

func (*TransitCommand) CommittableStateCtx

func (cmd *TransitCommand) CommittableStateCtx() *StateCtx

func (*TransitCommand) Do

func (cmd *TransitCommand) Do() error

func (*TransitCommand) WithAnnotation

func (cmd *TransitCommand) WithAnnotation(name, value string) *TransitCommand

func (*TransitCommand) WithAnnotations

func (cmd *TransitCommand) WithAnnotations(annotations map[string]string) *TransitCommand

type Transition

type Transition struct {
	To          FlowID
	Annotations map[string]string
}

func (*Transition) CopyTo

func (ts *Transition) CopyTo(to *Transition)

func (*Transition) MarshalJSON

func (ts *Transition) MarshalJSON() ([]byte, error)

func (*Transition) SetAnnotation

func (ts *Transition) SetAnnotation(name, value string)

func (*Transition) String

func (ts *Transition) String() string

func (*Transition) UnmarshalJSON

func (ts *Transition) UnmarshalJSON(data []byte) error

type UnstackCommand

type UnstackCommand struct {
	CarrierStateCtx *StateCtx
	UnstackStateCtx *StateCtx
	Annotation      string
	// contains filtered or unexported fields
}

func Unstack

func Unstack(carrierStateCtx, unstackStateCtx *StateCtx, annotation string) *UnstackCommand

func (*UnstackCommand) Do

func (cmd *UnstackCommand) Do() error

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

func NewWatcher

func NewWatcher(e *Engine, cmd *GetStatesCommand) *Watcher

func (*Watcher) Close

func (w *Watcher) Close()

func (*Watcher) Next

func (w *Watcher) Next() <-chan State

Directories

Path Synopsis
delayed_execute command
durable_execute command
queue command
state_machine command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL