Documentation
¶
Index ¶
- Constants
- Variables
- func Delayed(state State) bool
- func DelayedUntil(state State) time.Time
- func DisableRecovery(stateCtx *StateCtx)
- func DoCommitSubCommand(d Driver, cmd0 Command) error
- func IsErrRevMismatch(err error) bool
- func IsErrRevMismatchContains(err error, sID StateID) bool
- func LogCommand(msg string, cmd0 Command, l *slog.Logger)
- func MarshalCommand(cmd Command, dst []byte) []byte
- func MarshalData(d *Data, dst []byte) []byte
- func MarshalDelayedState(ds DelayedState, dst []byte) []byte
- func MarshalJSONCommand(cmd0 Command) ([]byte, error)
- func MarshalJSONData(d *Data) ([]byte, error)
- func MarshalJSONDelayedState(ds DelayedState) ([]byte, error)
- func MarshalJSONState(s State) ([]byte, error)
- func MarshalJSONStateCtx(s *StateCtx) ([]byte, error)
- func MarshalJSONTransition(ts Transition) ([]byte, error)
- func MarshalState(s State, dst []byte) []byte
- func MarshalStateCtx(stateCtx *StateCtx, dst []byte) []byte
- func MarshalTransition(ts Transition, dst []byte) []byte
- func MaxRecoveryAttempts(state State) int
- func Parked(state State) bool
- func RecoveryAttempt(state State) int
- func SetMaxRecoveryAttempts(stateCtx *StateCtx, attempts int)
- func SetRetryAfter(stateCtx *StateCtx, retryAfter time.Duration)
- func UnmarshalData(src []byte, d *Data) (err error)
- func UnmarshalDelayedState(src []byte, ds *DelayedState) (err error)
- func UnmarshalJSONData(data []byte, d *Data) error
- func UnmarshalJSONDelayedState(data []byte, ds *DelayedState) error
- func UnmarshalJSONState(data []byte, s *State) error
- func UnmarshalJSONStateCtx(data []byte, s *StateCtx) error
- func UnmarshalJSONTransition(data []byte, ts *Transition) error
- func UnmarshalState(src []byte, s *State) (err error)
- func UnmarshalStateCtx(src []byte, stateCtx *StateCtx) (err error)
- func UnmarshalTransition(src []byte, ts *Transition) (err error)
- type Command
- type CommitCommand
- type CommittableCommand
- type Data
- type DefaultFlowRegistry
- type DelayCommand
- type DelayedState
- type Delayer
- type Driver
- type Engine
- type ErrRevMismatch
- type ExecuteCommand
- type Flow
- type FlowFunc
- type FlowID
- type FlowRegistry
- type GetDataCommand
- type GetDelayedStatesCommand
- type GetDelayedStatesResult
- type GetStateByIDCommand
- type GetStateByLabelsCommand
- type GetStatesCommand
- func (cmd *GetStatesCommand) MustResult() *GetStatesResult
- func (cmd *GetStatesCommand) Prepare()
- func (cmd *GetStatesCommand) WithLatestOnly() *GetStatesCommand
- func (cmd *GetStatesCommand) WithLimit(limit int) *GetStatesCommand
- func (cmd *GetStatesCommand) WithORLabels(labels map[string]string) *GetStatesCommand
- func (cmd *GetStatesCommand) WithSinceLatest() *GetStatesCommand
- func (cmd *GetStatesCommand) WithSinceRev(rev int64) *GetStatesCommand
- func (cmd *GetStatesCommand) WithSinceTime(since time.Time) *GetStatesCommand
- type GetStatesResult
- type Iter
- type NoopCommand
- type ParkCommand
- type Recoverer
- type RecovererStats
- type StackCommand
- type State
- func (s State) Annotation(name string) string
- func (s *State) CopyTo(to *State) State
- func (s *State) CopyToCtx(to *StateCtx) *StateCtx
- func (s State) MarshalJSON() ([]byte, error)
- func (s *State) SetAnnotation(name, value string)
- func (s *State) SetLabel(name, value string)
- func (s *State) UnmarshalJSON(data []byte) error
- type StateCtx
- func (s *StateCtx) CopyTo(to *StateCtx) *StateCtx
- func (s *StateCtx) Data(name string) (*Data, error)
- func (s *StateCtx) Deadline() (time.Time, bool)
- func (s *StateCtx) Done() <-chan struct{}
- func (s *StateCtx) Err() error
- func (s *StateCtx) MarshalJSON() ([]byte, error)
- func (s *StateCtx) MustData(name string) *Data
- func (s *StateCtx) NewTo(id StateID, to *StateCtx) *StateCtx
- func (s *StateCtx) SetData(name string, d *Data)
- func (s *StateCtx) UnmarshalJSON(data []byte) error
- func (s *StateCtx) Value(key any) any
- type StateID
- type StoreDataCommand
- type TransitCommand
- type Transition
- type UnstackCommand
- type Watcher
Constants ¶
View Source
const GetDelayedStatesDefaultLimit = 500
View Source
const GetStatesDefaultLimit = 50
Variables ¶
View Source
var DefaultMaxRecoveryAttempts = 3
View Source
var DefaultRetryAfter = time.Minute * 2
View Source
var DelayCommitAnnotation = `flowstate.delay.commit`
View Source
var DelayUntilAnnotation = `flowstate.delay.until`
View Source
var ErrFlowNotFound = errors.New("flow not found")
View Source
var ErrNotFound = errors.New("state not found")
View Source
var MaxRecoveryAttemptsAnnotation = `flowstate.recovery.max_attempts`
View Source
var MaxRetryAfter = time.Minute * 5
View Source
var MinRetryAfter = time.Minute
View Source
var RecoveryAttemptAnnotation = `flowstate.recovery.attempt`
View Source
var RecoveryEnabledAnnotation = `flowstate.recovery.enabled`
View Source
var RetryAfterAnnotation = `flowstate.recovery.retry_after`
Functions ¶
func DelayedUntil ¶
func DisableRecovery ¶
func DisableRecovery(stateCtx *StateCtx)
func DoCommitSubCommand ¶
func IsErrRevMismatch ¶
func MarshalCommand ¶
func MarshalData ¶
func MarshalDelayedState ¶
func MarshalDelayedState(ds DelayedState, dst []byte) []byte
func MarshalJSONCommand ¶
func MarshalJSONData ¶
func MarshalJSONDelayedState ¶
func MarshalJSONDelayedState(ds DelayedState) ([]byte, error)
func MarshalJSONState ¶
func MarshalJSONStateCtx ¶
func MarshalJSONTransition ¶
func MarshalJSONTransition(ts Transition) ([]byte, error)
func MarshalState ¶
func MarshalStateCtx ¶
func MarshalTransition ¶
func MarshalTransition(ts Transition, dst []byte) []byte
func MaxRecoveryAttempts ¶
func RecoveryAttempt ¶
func SetMaxRecoveryAttempts ¶
func SetRetryAfter ¶
func UnmarshalData ¶
func UnmarshalDelayedState ¶
func UnmarshalDelayedState(src []byte, ds *DelayedState) (err error)
func UnmarshalJSONData ¶
func UnmarshalJSONDelayedState ¶
func UnmarshalJSONDelayedState(data []byte, ds *DelayedState) error
func UnmarshalJSONState ¶
func UnmarshalJSONStateCtx ¶
func UnmarshalJSONTransition ¶
func UnmarshalJSONTransition(data []byte, ts *Transition) error
func UnmarshalState ¶
func UnmarshalStateCtx ¶
func UnmarshalTransition ¶
func UnmarshalTransition(src []byte, ts *Transition) (err error)
Types ¶
type Command ¶
type Command interface {
// contains filtered or unexported methods
}
func UnmarshalCommand ¶
func UnmarshalJSONCommand ¶
type CommitCommand ¶
type CommitCommand struct {
Commands []Command
// contains filtered or unexported fields
}
func Commit ¶
func Commit(cmds ...Command) *CommitCommand
type CommittableCommand ¶
type CommittableCommand interface {
CommittableStateCtx() *StateCtx
}
type Data ¶
type Data struct {
Rev int64 `json:"-"`
Annotations map[string]string `json:"-"`
Blob []byte `json:"-"`
// contains filtered or unexported fields
}
func (*Data) MarshalJSON ¶
func (*Data) SetAnnotation ¶
func (*Data) UnmarshalJSON ¶
type DefaultFlowRegistry ¶
type DefaultFlowRegistry struct {
// contains filtered or unexported fields
}
func (*DefaultFlowRegistry) SetFlow ¶
func (fr *DefaultFlowRegistry) SetFlow(id FlowID, flow Flow) error
func (*DefaultFlowRegistry) UnsetFlow ¶
func (fr *DefaultFlowRegistry) UnsetFlow(id FlowID) error
type DelayCommand ¶
type DelayCommand struct {
StateCtx *StateCtx
ExecuteAt time.Time
Commit bool
To FlowID
Annotations map[string]string
Result *DelayedState
// contains filtered or unexported fields
}
func DelayUntil ¶
func DelayUntil(stateCtx *StateCtx, to FlowID, executeAt time.Time) *DelayCommand
func (*DelayCommand) MustResult ¶
func (cmd *DelayCommand) MustResult() DelayedState
func (*DelayCommand) Prepare ¶
func (cmd *DelayCommand) Prepare() error
func (*DelayCommand) WithAnnotation ¶
func (cmd *DelayCommand) WithAnnotation(name, value string) *DelayCommand
func (*DelayCommand) WithCommit ¶
func (cmd *DelayCommand) WithCommit(commit bool) *DelayCommand
func (*DelayCommand) WithTransit ¶
func (cmd *DelayCommand) WithTransit(to FlowID) *DelayCommand
type DelayedState ¶
func (DelayedState) MarshalJSON ¶
func (ds DelayedState) MarshalJSON() ([]byte, error)
func (*DelayedState) UnmarshalJSON ¶
func (ds *DelayedState) UnmarshalJSON(data []byte) error
type Driver ¶
type Driver interface {
// Init must be called by NewEngine only.
Init(e *Engine) error
GetStateByID(cmd *GetStateByIDCommand) error
GetStateByLabels(cmd *GetStateByLabelsCommand) error
GetStates(cmd *GetStatesCommand) error
GetDelayedStates(cmd *GetDelayedStatesCommand) error
Delay(cmd *DelayCommand) error
Commit(cmd *CommitCommand) error
GetData(cmd *GetDataCommand) error
StoreData(cmd *StoreDataCommand) error
}
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func (*Engine) Iter ¶
func (e *Engine) Iter(cmd *GetStatesCommand) *Iter
type ErrRevMismatch ¶
type ErrRevMismatch struct {
IDS []StateID
}
ErrRevMismatch is an error that indicates a revision mismatch during a commit operation.
func (*ErrRevMismatch) Add ¶
func (err *ErrRevMismatch) Add(id StateID)
func (*ErrRevMismatch) All ¶
func (err *ErrRevMismatch) All() []StateID
func (ErrRevMismatch) As ¶
func (err ErrRevMismatch) As(target interface{}) bool
func (*ErrRevMismatch) Contains ¶
func (err *ErrRevMismatch) Contains(id StateID) bool
func (ErrRevMismatch) Error ¶
func (err ErrRevMismatch) Error() string
type ExecuteCommand ¶
type ExecuteCommand struct {
StateCtx *StateCtx
// contains filtered or unexported fields
}
func Execute ¶
func Execute(stateCtx *StateCtx) *ExecuteCommand
type FlowRegistry ¶
type GetDataCommand ¶
type GetDataCommand struct {
StateCtx *StateCtx
Alias string
// contains filtered or unexported fields
}
func GetData ¶
func GetData(stateCtx *StateCtx, alias string) *GetDataCommand
func (*GetDataCommand) Prepare ¶
func (cmd *GetDataCommand) Prepare() (bool, error)
type GetDelayedStatesCommand ¶
type GetDelayedStatesCommand struct {
Since time.Time
Until time.Time
// Offset is valid inside the since-until range.
// Should be used to pagination results.
Offset int64
Limit int
Result *GetDelayedStatesResult
// contains filtered or unexported fields
}
func GetDelayedStates ¶
func GetDelayedStates(since, until time.Time, offset int64) *GetDelayedStatesCommand
func (*GetDelayedStatesCommand) MustResult ¶
func (cmd *GetDelayedStatesCommand) MustResult() *GetDelayedStatesResult
func (*GetDelayedStatesCommand) Prepare ¶
func (cmd *GetDelayedStatesCommand) Prepare()
type GetDelayedStatesResult ¶
type GetDelayedStatesResult struct {
States []DelayedState
More bool
}
type GetStateByIDCommand ¶
type GetStateByIDCommand struct {
ID StateID
Rev int64
StateCtx *StateCtx
// contains filtered or unexported fields
}
func GetStateByID ¶
func GetStateByID(stateCtx *StateCtx, id StateID, rev int64) *GetStateByIDCommand
func (*GetStateByIDCommand) Prepare ¶
func (cmd *GetStateByIDCommand) Prepare() error
func (*GetStateByIDCommand) Result ¶
func (cmd *GetStateByIDCommand) Result() (*StateCtx, error)
type GetStateByLabelsCommand ¶
type GetStateByLabelsCommand struct {
Labels map[string]string
StateCtx *StateCtx
// contains filtered or unexported fields
}
func GetStateByLabels ¶
func GetStateByLabels(stateCtx *StateCtx, labels map[string]string) *GetStateByLabelsCommand
func (*GetStateByLabelsCommand) Result ¶
func (cmd *GetStateByLabelsCommand) Result() (*StateCtx, error)
type GetStatesCommand ¶
type GetStatesCommand struct {
SinceRev int64
SinceTime time.Time
Labels []map[string]string
LatestOnly bool
Limit int
Result *GetStatesResult
// contains filtered or unexported fields
}
func GetStatesByLabels ¶
func GetStatesByLabels(labels map[string]string) *GetStatesCommand
func (*GetStatesCommand) MustResult ¶
func (cmd *GetStatesCommand) MustResult() *GetStatesResult
func (*GetStatesCommand) Prepare ¶
func (cmd *GetStatesCommand) Prepare()
func (*GetStatesCommand) WithLatestOnly ¶
func (cmd *GetStatesCommand) WithLatestOnly() *GetStatesCommand
func (*GetStatesCommand) WithLimit ¶
func (cmd *GetStatesCommand) WithLimit(limit int) *GetStatesCommand
func (*GetStatesCommand) WithORLabels ¶
func (cmd *GetStatesCommand) WithORLabels(labels map[string]string) *GetStatesCommand
func (*GetStatesCommand) WithSinceLatest ¶
func (cmd *GetStatesCommand) WithSinceLatest() *GetStatesCommand
func (*GetStatesCommand) WithSinceRev ¶
func (cmd *GetStatesCommand) WithSinceRev(rev int64) *GetStatesCommand
WithSinceRev sets SinceRev filter for the command. States with revision greater than SinceRev will be returned.
func (*GetStatesCommand) WithSinceTime ¶
func (cmd *GetStatesCommand) WithSinceTime(since time.Time) *GetStatesCommand
type GetStatesResult ¶
type Iter ¶
type Iter struct {
Cmd *GetStatesCommand
// contains filtered or unexported fields
}
func NewIter ¶
func NewIter(d Driver, cmd *GetStatesCommand) *Iter
func (*Iter) Err ¶
Err returns the error encountered during iteration, if any It is expected to call Err only when Next() returned false The iterator cannot be used once Err returns a non-nil error Create a new iterator using Iter.Cmd.
func (*Iter) Next ¶
Next advances the iterator to the next state It returns true if there is a next state, false otherwise It is expected to call Next repeatedly until it returns false
func (*Iter) State ¶
State returns the current state in the iteration It is expected to call State only when Next() returned true
type NoopCommand ¶
type NoopCommand struct {
// contains filtered or unexported fields
}
func Noop ¶
func Noop() *NoopCommand
type ParkCommand ¶
type ParkCommand struct {
StateCtx *StateCtx
Annotations map[string]string
// contains filtered or unexported fields
}
func Park ¶
func Park(stateCtx *StateCtx) *ParkCommand
func (*ParkCommand) CommittableStateCtx ¶
func (cmd *ParkCommand) CommittableStateCtx() *StateCtx
func (*ParkCommand) Do ¶
func (cmd *ParkCommand) Do() error
func (*ParkCommand) WithAnnotation ¶
func (cmd *ParkCommand) WithAnnotation(name, value string) *ParkCommand
func (*ParkCommand) WithAnnotations ¶
func (cmd *ParkCommand) WithAnnotations(annotations map[string]string) *ParkCommand
type Recoverer ¶
type Recoverer struct {
// contains filtered or unexported fields
}
func (*Recoverer) Stats ¶
func (r *Recoverer) Stats() RecovererStats
type RecovererStats ¶
type StackCommand ¶
type StackCommand struct {
StackedStateCtx *StateCtx
CarrierStateCtx *StateCtx
Annotation string
// contains filtered or unexported fields
}
func Stack ¶
func Stack(carrierStateCtx, stackStateCtx *StateCtx, annotation string) *StackCommand
func (*StackCommand) Do ¶
func (cmd *StackCommand) Do() error
type State ¶
type State struct {
ID StateID
Rev int64
Annotations map[string]string
Labels map[string]string
CommittedAt time.Time
Transition Transition
}
func (State) Annotation ¶
func (State) MarshalJSON ¶
func (*State) SetAnnotation ¶
func (*State) UnmarshalJSON ¶
type StateCtx ¶
type StateCtx struct {
Current State
Committed State
Datas map[string]*Data
// Transitions between committed and current states
Transitions []Transition
// contains filtered or unexported fields
}
func (*StateCtx) MarshalJSON ¶
func (*StateCtx) UnmarshalJSON ¶
type StoreDataCommand ¶
type StoreDataCommand struct {
StateCtx *StateCtx
Alias string
// contains filtered or unexported fields
}
func StoreData ¶
func StoreData(stateCtx *StateCtx, alias string) *StoreDataCommand
func (*StoreDataCommand) Prepare ¶
func (cmd *StoreDataCommand) Prepare() (bool, error)
type TransitCommand ¶
type TransitCommand struct {
StateCtx *StateCtx
Annotations map[string]string
To FlowID
// contains filtered or unexported fields
}
func Transit ¶
func Transit(stateCtx *StateCtx, to FlowID) *TransitCommand
func (*TransitCommand) CommittableStateCtx ¶
func (cmd *TransitCommand) CommittableStateCtx() *StateCtx
func (*TransitCommand) Do ¶
func (cmd *TransitCommand) Do() error
func (*TransitCommand) WithAnnotation ¶
func (cmd *TransitCommand) WithAnnotation(name, value string) *TransitCommand
func (*TransitCommand) WithAnnotations ¶
func (cmd *TransitCommand) WithAnnotations(annotations map[string]string) *TransitCommand
type Transition ¶
func (*Transition) CopyTo ¶
func (ts *Transition) CopyTo(to *Transition)
func (*Transition) MarshalJSON ¶
func (ts *Transition) MarshalJSON() ([]byte, error)
func (*Transition) SetAnnotation ¶
func (ts *Transition) SetAnnotation(name, value string)
func (*Transition) String ¶
func (ts *Transition) String() string
func (*Transition) UnmarshalJSON ¶
func (ts *Transition) UnmarshalJSON(data []byte) error
type UnstackCommand ¶
type UnstackCommand struct {
CarrierStateCtx *StateCtx
UnstackStateCtx *StateCtx
Annotation string
// contains filtered or unexported fields
}
func Unstack ¶
func Unstack(carrierStateCtx, unstackStateCtx *StateCtx, annotation string) *UnstackCommand
func (*UnstackCommand) Do ¶
func (cmd *UnstackCommand) Do() error
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
delayed_execute
command
|
|
|
durable_execute
command
|
|
|
execute_with_timeout
command
|
|
|
iterate_over_all_states
command
|
|
|
queue
command
|
|
|
state_machine
command
|
|
Click to show internal directories.
Click to hide internal directories.