workflow

package
v1.0.0-alpha.10 Latest
Warning: This package is not in the latest version of its module.
Published: Mar 6, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package workflow provides workflow state management primitives for components that participate in stateful workflows.

This package enables components to track and coordinate workflow state without tight coupling to specific workflow engines. Components implement Participant to expose their workflow role for observability.

Index

Constants

const DefaultStateBucket = "WORKFLOW_STATE"

DefaultStateBucket is the default KV bucket name for workflow state.

Variables

This section is empty.

Functions

This section is empty.

Types

type Participant

type Participant interface {
	// WorkflowID returns the workflow this component participates in.
	// Returns empty string if component handles multiple workflows dynamically.
	WorkflowID() string

	// Phase returns the phase name this component represents in the workflow.
	Phase() string

	// StateManager returns the state manager for updating workflow state.
	StateManager() *StateManager
}

Participant is implemented by components that participate in stateful workflows. This provides a clear contract for workflow-aware components and enables observability tooling to discover and visualize workflow participants.

Implementation notes:

- Implementations MUST use pointer receivers to ensure interface comparison works correctly.
- WorkflowID() should return a stable value (the same value on each call) for proper registry tracking.
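The pointer-receiver note can be illustrated with a minimal sketch. It declares a local stand-in for StateManager and a local copy of the Participant interface so that it compiles on its own; reviewStep and the "doc-approval" workflow ID are hypothetical names, not part of the package.

```go
package main

import "fmt"

// StateManager is a local stand-in for the package's StateManager type,
// declared here only so the sketch compiles without the real package.
type StateManager struct{}

// Participant mirrors the interface documented above.
type Participant interface {
	WorkflowID() string
	Phase() string
	StateManager() *StateManager
}

// reviewStep is a hypothetical component that owns one phase of one workflow.
type reviewStep struct {
	workflowID string
	states     *StateManager
}

// Pointer receivers: the interface value holds a *reviewStep, so two
// interface values are equal only when they point at the same instance.
func (s *reviewStep) WorkflowID() string          { return s.workflowID }
func (s *reviewStep) Phase() string               { return "review" }
func (s *reviewStep) StateManager() *StateManager { return s.states }

// Compile-time check that *reviewStep satisfies Participant.
var _ Participant = (*reviewStep)(nil)

func main() {
	a := &reviewStep{workflowID: "doc-approval"}
	b := &reviewStep{workflowID: "doc-approval"}

	var pa, pa2, pb Participant = a, a, b
	fmt.Println(pa == pa2) // true: same instance
	fmt.Println(pa == pb)  // false: distinct instances with equal fields
}
```

Because comparison is by pointer identity, a registry that removes entries by comparing interface values would need the same pointer that was registered. Whether ParticipantRegistry works this way internally is an assumption based on the note above.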

type ParticipantRegistry

type ParticipantRegistry struct {
	// contains filtered or unexported fields
}

ParticipantRegistry tracks all workflow participants for observability. This enables discovering the workflow topology from registered components.

func NewParticipantRegistry

func NewParticipantRegistry() *ParticipantRegistry

NewParticipantRegistry creates a new participant registry.

func (*ParticipantRegistry) GetParticipants

func (r *ParticipantRegistry) GetParticipants(workflowID string) []Participant

GetParticipants returns all participants for a given workflow.

func (*ParticipantRegistry) GetWorkflowTopology

func (r *ParticipantRegistry) GetWorkflowTopology(workflowID string) []string

GetWorkflowTopology returns the unique phases for a workflow in registration order. When several participants report the same phase, only the first occurrence is kept. This provides a basic view of the workflow structure based on registered components.
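The order-preserving deduplication described for GetWorkflowTopology can be sketched as follows. This is not the package's implementation; uniquePhases is a hypothetical helper that shows one common way to keep the first occurrence of each phase.

```go
package main

import "fmt"

// uniquePhases returns the phases in first-seen order, dropping repeats.
func uniquePhases(phases []string) []string {
	seen := make(map[string]struct{}, len(phases))
	out := make([]string, 0, len(phases))
	for _, p := range phases {
		if _, dup := seen[p]; dup {
			continue // a participant for this phase was registered earlier
		}
		seen[p] = struct{}{}
		out = append(out, p)
	}
	return out
}

func main() {
	// Phases as reported by participants, in registration order.
	registered := []string{"plan", "review", "plan", "publish", "review"}
	fmt.Println(uniquePhases(registered)) // [plan review publish]
}
```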

func (*ParticipantRegistry) ListWorkflows

func (r *ParticipantRegistry) ListWorkflows() []string

ListWorkflows returns all known workflow IDs.

func (*ParticipantRegistry) Register

func (r *ParticipantRegistry) Register(p Participant)

Register adds a workflow participant to the registry.

func (*ParticipantRegistry) Unregister

func (r *ParticipantRegistry) Unregister(p Participant)

Unregister removes a workflow participant from the registry.

type State

type State struct {
	// ID is the unique execution identifier.
	ID string `json:"id"`

	// WorkflowID identifies which workflow definition this execution follows.
	WorkflowID string `json:"workflow_id"`

	// Phase is the current execution phase/step name.
	Phase string `json:"phase"`

	// Iteration tracks loop count for retry/iteration patterns.
	Iteration int `json:"iteration"`

	// MaxIter is the maximum allowed iterations (0 = unlimited).
	MaxIter int `json:"max_iter,omitempty"`

	// StartedAt is when the execution started.
	StartedAt time.Time `json:"started_at"`

	// UpdatedAt is when the state was last updated.
	UpdatedAt time.Time `json:"updated_at"`

	// CompletedAt is when the execution completed (nil if still running).
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// Error holds the last error message if the execution failed.
	Error string `json:"error,omitempty"`

	// Context holds arbitrary workflow-specific context data.
	Context map[string]any `json:"context,omitempty"`
}

State represents the state of a workflow execution. Components use this to track progress through multi-step workflows.

func (*State) IsComplete

func (s *State) IsComplete() bool

IsComplete returns true if the execution has completed.

func (*State) IsFailed

func (s *State) IsFailed() bool

IsFailed returns true if the execution failed with an error.

type StateEntry

type StateEntry struct {
	State    *State
	Revision uint64
}

StateEntry wraps a State with its KV revision for optimistic concurrency.

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager provides operations for managing workflow state in NATS KV.

func NewStateManager

func NewStateManager(bucket jetstream.KeyValue, logger *slog.Logger) *StateManager

NewStateManager creates a new StateManager backed by the given KV bucket.

func (*StateManager) Complete

func (m *StateManager) Complete(ctx context.Context, id string) error

Complete marks the workflow execution as completed successfully.

func (*StateManager) Create

func (m *StateManager) Create(ctx context.Context, state *State) error

Create creates a new workflow state, failing if it already exists.

func (*StateManager) Delete

func (m *StateManager) Delete(ctx context.Context, id string) error

Delete removes a workflow state.

func (*StateManager) Fail

func (m *StateManager) Fail(ctx context.Context, id, errMsg string) error

Fail marks the workflow execution as failed with an error message.

func (*StateManager) Get

func (m *StateManager) Get(ctx context.Context, id string) (*State, error)

Get retrieves a workflow state by ID.

func (*StateManager) GetWithRevision

func (m *StateManager) GetWithRevision(ctx context.Context, id string) (*StateEntry, error)

GetWithRevision retrieves a workflow state by ID along with its revision for use with optimistic concurrency control.

func (*StateManager) IncrementIteration

func (m *StateManager) IncrementIteration(ctx context.Context, id string) error

IncrementIteration increments the iteration counter. Note: This uses read-modify-write without optimistic concurrency. For concurrent access, use GetWithRevision + Update instead.
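To see why read-modify-write without a revision check is unsafe under concurrency, the following sketch replays one problematic interleaving deterministically. counterStore is a hypothetical in-memory stand-in for the KV bucket and is not part of the package.

```go
package main

import "fmt"

// counterStore is a hypothetical in-memory stand-in for a KV entry that
// holds a single iteration counter.
type counterStore struct{ iteration int }

func (c *counterStore) get() int  { return c.iteration }
func (c *counterStore) put(v int) { c.iteration = v }

// interleavedIncrements replays two read-modify-write increments whose
// reads both happen before either write.
func interleavedIncrements(c *counterStore) int {
	a := c.get() // caller A reads the counter
	b := c.get() // caller B reads the same value
	c.put(a + 1) // A writes its increment
	c.put(b + 1) // B overwrites A's write with the same value
	return c.get()
}

func main() {
	c := &counterStore{}
	fmt.Println(interleavedIncrements(c)) // 1, not 2: one increment was lost
}
```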

func (*StateManager) IncrementIterationWithRevision

func (m *StateManager) IncrementIterationWithRevision(ctx context.Context, id string, expectedRevision uint64) (uint64, error)

IncrementIterationWithRevision increments the iteration counter with optimistic concurrency. Returns the new revision on success. Fails if the revision has changed.

func (*StateManager) List

func (m *StateManager) List(ctx context.Context) ([]*State, error)

List returns all workflow states in the bucket.

func (*StateManager) Put

func (m *StateManager) Put(ctx context.Context, state *State) error

Put stores a workflow state, creating or updating as needed.

func (*StateManager) Transition

func (m *StateManager) Transition(ctx context.Context, id, phase string) error

Transition updates the workflow state to a new phase. Note: This uses read-modify-write without optimistic concurrency. For concurrent access, use GetWithRevision + Update instead.

func (*StateManager) TransitionWithRevision

func (m *StateManager) TransitionWithRevision(ctx context.Context, id, phase string, expectedRevision uint64) (uint64, error)

TransitionWithRevision updates the workflow state to a new phase with optimistic concurrency. Returns the new revision on success. Fails if the revision has changed.

func (*StateManager) Update

func (m *StateManager) Update(ctx context.Context, state *State, expectedRevision uint64) (uint64, error)

Update stores a workflow state with optimistic concurrency control. Returns the new revision on success. Fails if the revision has changed.
