Documentation ¶
Overview ¶
Package workflow provides workflow state management primitives for components that participate in stateful workflows.
This package enables components to track and coordinate workflow state without tight coupling to specific workflow engines. Components implement Participant to expose their workflow role for observability.
Index ¶
- Constants
- type Participant
- type ParticipantRegistry
- func NewParticipantRegistry() *ParticipantRegistry
- func (r *ParticipantRegistry) GetParticipants(workflowID string) []Participant
- func (r *ParticipantRegistry) GetWorkflowTopology(workflowID string) []string
- func (r *ParticipantRegistry) ListWorkflows() []string
- func (r *ParticipantRegistry) Register(p Participant)
- func (r *ParticipantRegistry) Unregister(p Participant)
- type State
- func (s *State) IsComplete() bool
- type StateEntry
- type StateManager
- func NewStateManager(bucket jetstream.KeyValue, logger *slog.Logger) *StateManager
- func (m *StateManager) Complete(ctx context.Context, id string) error
- func (m *StateManager) Create(ctx context.Context, state *State) error
- func (m *StateManager) Delete(ctx context.Context, id string) error
- func (m *StateManager) Fail(ctx context.Context, id, errMsg string) error
- func (m *StateManager) Get(ctx context.Context, id string) (*State, error)
- func (m *StateManager) GetWithRevision(ctx context.Context, id string) (*StateEntry, error)
- func (m *StateManager) IncrementIteration(ctx context.Context, id string) error
- func (m *StateManager) IncrementIterationWithRevision(ctx context.Context, id string, expectedRevision uint64) (uint64, error)
- func (m *StateManager) List(ctx context.Context) ([]*State, error)
- func (m *StateManager) Put(ctx context.Context, state *State) error
- func (m *StateManager) Transition(ctx context.Context, id, phase string) error
- func (m *StateManager) TransitionWithRevision(ctx context.Context, id, phase string, expectedRevision uint64) (uint64, error)
- func (m *StateManager) Update(ctx context.Context, state *State, expectedRevision uint64) (uint64, error)
Constants ¶
const DefaultStateBucket = "WORKFLOW_STATE"
DefaultStateBucket is the default KV bucket name for workflow state.
Types ¶
type Participant ¶
type Participant interface {
	// WorkflowID returns the workflow this component participates in.
	// Returns an empty string if the component handles multiple workflows dynamically.
	WorkflowID() string
	// Phase returns the phase name this component represents in the workflow.
	Phase() string
	// StateManager returns the state manager for updating workflow state.
	StateManager() *StateManager
}
Participant is implemented by components that participate in stateful workflows. This provides a clear contract for workflow-aware components and enables observability tooling to discover and visualize workflow participants.
Implementation notes:
- Implementations MUST use pointer receivers to ensure interface comparison works correctly.
- WorkflowID() should return a stable value (the same value on each call) for proper registry tracking.
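The pointer-receiver requirement can be illustrated with a minimal sketch. The interface and StateManager below are local stand-ins so the example compiles on its own, and reviewer and sameParticipant are hypothetical names that are not part of this package. With pointer receivers, two interface values compare equal only when they hold the same component instance, which is presumably what registry operations such as Unregister rely on.

```go
package main

import "fmt"

// StateManager is a local stand-in for the package's StateManager type.
type StateManager struct{}

// Participant mirrors the interface documented above.
type Participant interface {
	WorkflowID() string
	Phase() string
	StateManager() *StateManager
}

// reviewer is a hypothetical component representing a "review" phase.
type reviewer struct {
	workflowID string
	states     *StateManager
}

// Pointer receivers: the interface value holds *reviewer, so comparing two
// interface values compares component identity, not field contents.
func (r *reviewer) WorkflowID() string          { return r.workflowID } // stable across calls
func (r *reviewer) Phase() string               { return "review" }
func (r *reviewer) StateManager() *StateManager { return r.states }

// Compile-time check that *reviewer satisfies Participant.
var _ Participant = (*reviewer)(nil)

// sameParticipant reports whether a and b hold the same component instance.
func sameParticipant(a, b Participant) bool { return a == b }

func main() {
	sm := &StateManager{}
	a := &reviewer{workflowID: "doc-review", states: sm}
	b := &reviewer{workflowID: "doc-review", states: sm}

	fmt.Println(sameParticipant(a, a)) // true: same instance
	fmt.Println(sameParticipant(a, b)) // false: equal fields, different instance
}
```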
type ParticipantRegistry ¶
type ParticipantRegistry struct {
	// contains filtered or unexported fields
}
ParticipantRegistry tracks all workflow participants for observability. This enables discovering the workflow topology from registered components.
func NewParticipantRegistry ¶
func NewParticipantRegistry() *ParticipantRegistry
NewParticipantRegistry creates a new participant registry.
func (*ParticipantRegistry) GetParticipants ¶
func (r *ParticipantRegistry) GetParticipants(workflowID string) []Participant
GetParticipants returns all participants for a given workflow.
func (*ParticipantRegistry) GetWorkflowTopology ¶
func (r *ParticipantRegistry) GetWorkflowTopology(workflowID string) []string
GetWorkflowTopology returns the unique phases of a workflow in registration order. When several participants report the same phase, only the first occurrence is kept. This provides a basic view of the workflow structure based on registered components.
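The ordering and deduplication rule described above can be sketched as follows. This is not the registry's actual implementation: participant and topology are hypothetical names, and the slice stands in for whatever the registry stores internally.

```go
package main

import "fmt"

// participant is a simplified stand-in that keeps only the two values
// the topology depends on.
type participant struct {
	workflowID string
	phase      string
}

// topology returns the distinct phases registered for workflowID in
// registration order, keeping the first occurrence of each phase.
func topology(registered []participant, workflowID string) []string {
	seen := make(map[string]bool)
	var phases []string
	for _, p := range registered {
		if p.workflowID != workflowID || seen[p.phase] {
			continue
		}
		seen[p.phase] = true
		phases = append(phases, p.phase)
	}
	return phases
}

func main() {
	registered := []participant{
		{"doc-review", "draft"},
		{"doc-review", "review"},
		{"billing", "invoice"},
		{"doc-review", "draft"}, // repeated phase, ignored
		{"doc-review", "publish"},
	}
	fmt.Println(topology(registered, "doc-review")) // [draft review publish]
}
```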
func (*ParticipantRegistry) ListWorkflows ¶
func (r *ParticipantRegistry) ListWorkflows() []string
ListWorkflows returns all known workflow IDs.
func (*ParticipantRegistry) Register ¶
func (r *ParticipantRegistry) Register(p Participant)
Register adds a workflow participant to the registry.
func (*ParticipantRegistry) Unregister ¶
func (r *ParticipantRegistry) Unregister(p Participant)
Unregister removes a workflow participant from the registry.
type State ¶
type State struct {
	// ID is the unique execution identifier.
	ID string `json:"id"`
	// WorkflowID identifies which workflow definition this execution follows.
	WorkflowID string `json:"workflow_id"`
	// Phase is the current execution phase/step name.
	Phase string `json:"phase"`
	// Iteration tracks loop count for retry/iteration patterns.
	Iteration int `json:"iteration"`
	// MaxIter is the maximum allowed iterations (0 = unlimited).
	MaxIter int `json:"max_iter,omitempty"`
	// StartedAt is when the execution started.
	StartedAt time.Time `json:"started_at"`
	// UpdatedAt is when the state was last updated.
	UpdatedAt time.Time `json:"updated_at"`
	// CompletedAt is when the execution completed (nil if still running).
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	// Error holds the last error message if the execution failed.
	Error string `json:"error,omitempty"`
	// Context holds arbitrary workflow-specific context data.
	Context map[string]any `json:"context,omitempty"`
}
State represents the state of a workflow execution. Components use this to track progress through multi-step workflows.
func (*State) IsComplete ¶
func (s *State) IsComplete() bool
IsComplete returns true if the execution has completed.
type StateEntry ¶
StateEntry wraps a State with its KV revision for optimistic concurrency.
type StateManager ¶
type StateManager struct {
	// contains filtered or unexported fields
}
StateManager provides operations for managing workflow state in NATS KV.
func NewStateManager ¶
func NewStateManager(bucket jetstream.KeyValue, logger *slog.Logger) *StateManager
NewStateManager creates a new StateManager backed by the given KV bucket.
func (*StateManager) Complete ¶
func (m *StateManager) Complete(ctx context.Context, id string) error
Complete marks the workflow execution as completed successfully.
func (*StateManager) Create ¶
func (m *StateManager) Create(ctx context.Context, state *State) error
Create creates a new workflow state, failing if it already exists.
func (*StateManager) Delete ¶
func (m *StateManager) Delete(ctx context.Context, id string) error
Delete removes a workflow state.
func (*StateManager) Fail ¶
func (m *StateManager) Fail(ctx context.Context, id, errMsg string) error
Fail marks the workflow execution as failed with an error message.
func (*StateManager) GetWithRevision ¶
func (m *StateManager) GetWithRevision(ctx context.Context, id string) (*StateEntry, error)
GetWithRevision retrieves a workflow state by ID along with its revision for use with optimistic concurrency control.
func (*StateManager) IncrementIteration ¶
func (m *StateManager) IncrementIteration(ctx context.Context, id string) error
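IncrementIteration increments the iteration counter. Note: this is a read-modify-write without optimistic concurrency, so concurrent writers can overwrite each other's changes. For concurrent access, use IncrementIterationWithRevision, or GetWithRevision followed by Update, instead.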
func (*StateManager) IncrementIterationWithRevision ¶
func (m *StateManager) IncrementIterationWithRevision(ctx context.Context, id string, expectedRevision uint64) (uint64, error)
IncrementIterationWithRevision increments the iteration counter with optimistic concurrency. Returns the new revision on success. Fails if the stored revision no longer matches expectedRevision.
func (*StateManager) List ¶
func (m *StateManager) List(ctx context.Context) ([]*State, error)
List returns all workflow states in the bucket.
func (*StateManager) Put ¶
func (m *StateManager) Put(ctx context.Context, state *State) error
Put stores a workflow state, creating or updating as needed.
func (*StateManager) Transition ¶
func (m *StateManager) Transition(ctx context.Context, id, phase string) error
Transition updates the workflow state to a new phase. Note: this is a read-modify-write without optimistic concurrency, so concurrent writers can overwrite each other's changes. For concurrent access, use TransitionWithRevision, or GetWithRevision followed by Update, instead.
func (*StateManager) TransitionWithRevision ¶
func (m *StateManager) TransitionWithRevision(ctx context.Context, id, phase string, expectedRevision uint64) (uint64, error)
TransitionWithRevision updates the workflow state to a new phase with optimistic concurrency. Returns the new revision on success. Fails if the stored revision no longer matches expectedRevision.