store

package
v0.4.0-beta Latest
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2025 License: MIT Imports: 10 Imported by: 1

Documentation

Overview

Package store provides persistence implementations for graph state.

Constants

This section is empty.

Variables

var ErrNotFound = errors.New("not found")

ErrNotFound is returned when a requested run ID or checkpoint ID does not exist.
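A caller will usually want to tell "nothing saved yet" apart from a real persistence failure. The following is a minimal sketch of that check, not the package's own code. `loadLatest` and `resumeStep` are hypothetical helpers, `ErrNotFound` is redeclared locally so the snippet compiles on its own, and the wrapping with `%w` is an assumption about how a store might add context. `errors.Is` matches the sentinel whether or not it is wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the sentinel documented above. It is redeclared here
// only so that this sketch is self-contained.
var ErrNotFound = errors.New("not found")

// loadLatest is a hypothetical stand-in for a Store's LoadLatest method.
// It wraps ErrNotFound with context, as a database-backed store might.
func loadLatest(history map[string]int, runID string) (int, error) {
	step, ok := history[runID]
	if !ok {
		return 0, fmt.Errorf("load latest for run %q: %w", runID, ErrNotFound)
	}
	return step, nil
}

// resumeStep returns the step to resume from. A missing run is treated as a
// fresh start (step 0); any other error is passed through unchanged.
func resumeStep(history map[string]int, runID string) (int, error) {
	step, err := loadLatest(history, runID)
	if errors.Is(err, ErrNotFound) {
		return 0, nil
	}
	return step, err
}

func main() {
	history := map[string]int{"run-001": 7}
	for _, id := range []string{"run-001", "run-404"} {
		step, err := resumeStep(history, id)
		fmt.Println(id, step, err)
	}
}
```

Comparing with `==` would miss a wrapped sentinel, which is why the sketch uses `errors.Is`.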

Functions

This section is empty.

Types

type Checkpoint deprecated

type Checkpoint[S any] struct {
	// ID is the unique checkpoint identifier.
	ID string

	// State is the snapshotted workflow state.
	State S

	// Step is the step number when this checkpoint was created.
	Step int
}

Checkpoint represents a named snapshot of workflow state. Used by Store implementations to persist and restore checkpoints.

Deprecated: Use CheckpointV2 for enhanced checkpointing features. This type is kept for backward compatibility with the original SaveCheckpoint/LoadCheckpoint methods.

type CheckpointV2

type CheckpointV2[S any] struct {
	// RunID uniquely identifies the execution this checkpoint belongs to.
	RunID string `json:"run_id"`

	// StepID is the execution step number at checkpoint time.
	// Monotonically increasing within a run.
	StepID int `json:"step_id"`

	// State is the current accumulated state after applying all deltas up to StepID.
	// Must be JSON-serializable for persistence.
	State S `json:"state"`

	// Frontier contains the work items ready to execute at this checkpoint.
	// Must be JSON-serializable. Type is interface{} to avoid circular dependency.
	// Expected to be []WorkItem[S] from graph package.
	Frontier interface{} `json:"frontier"`

	// RNGSeed is the seed for deterministic random number generation.
	// Computed from RunID to ensure consistent random values across replays.
	RNGSeed int64 `json:"rng_seed"`

	// RecordedIOs contains all captured external interactions up to this checkpoint.
	// Must be JSON-serializable. Type is interface{} to avoid circular dependency.
	// Expected to be []RecordedIO from graph package.
	RecordedIOs interface{} `json:"recorded_ios"`

	// IdempotencyKey is a hash of (RunID, StepID, State, Frontier) that prevents
	// duplicate checkpoint commits. Format: "sha256:hex_encoded_hash".
	IdempotencyKey string `json:"idempotency_key"`

	// Timestamp records when this checkpoint was created.
	Timestamp time.Time `json:"timestamp"`

	// Label is an optional user-defined name for this checkpoint, useful for
	// debugging or creating named save points (e.g., "before_summary", "after_validation").
	// Empty string for automatic checkpoints.
	Label string `json:"label,omitempty"`
}

CheckpointV2 represents an enhanced checkpoint with full execution context for deterministic replay.

This type contains all information needed to resume execution from a specific point:

  • Current accumulated state
  • Work items ready to execute (frontier)
  • Recorded I/O for replay
  • RNG seed for deterministic random number generation
  • Idempotency key for preventing duplicate commits

CheckpointV2 supports both automatic resumption after failures and user-initiated labeled snapshots for debugging or branching workflows.

This type is generic over the state type S, which must be JSON-serializable.
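The field comments say that IdempotencyKey is a hash of (RunID, StepID, State, Frontier) in the form "sha256:hex_encoded_hash" and that RNGSeed is computed from RunID, but they do not spell out the encoding. The sketch below shows one way such values could be derived. The JSON envelope used as hash input, the first-8-bytes seed derivation, and the helper names `idempotencyKey` and `seedFromRunID` are all assumptions of this example and may differ from what the library does.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// idempotencyKey hashes (runID, stepID, state, frontier) into the documented
// "sha256:<hex>" format. Encoding the inputs as a JSON object is an
// assumption of this sketch, not the library's actual scheme.
func idempotencyKey(runID string, stepID int, state, frontier any) (string, error) {
	payload, err := json.Marshal(struct {
		RunID    string `json:"run_id"`
		StepID   int    `json:"step_id"`
		State    any    `json:"state"`
		Frontier any    `json:"frontier"`
	}{runID, stepID, state, frontier})
	if err != nil {
		return "", fmt.Errorf("encode idempotency input: %w", err)
	}
	sum := sha256.Sum256(payload)
	return "sha256:" + hex.EncodeToString(sum[:]), nil
}

// seedFromRunID derives a stable int64 seed from the run ID by reading the
// first 8 bytes of its SHA-256 digest. Hypothetical derivation.
func seedFromRunID(runID string) int64 {
	sum := sha256.Sum256([]byte(runID))
	return int64(binary.BigEndian.Uint64(sum[:8]))
}

func main() {
	state := map[string]int{"count": 3}
	key, err := idempotencyKey("run-001", 4, state, []string{"node-b"})
	if err != nil {
		panic(err)
	}
	fmt.Println(key)
	// The same run ID always yields the same seed, so replays see the same
	// random sequence.
	fmt.Println(seedFromRunID("run-001") == seedFromRunID("run-001"))
}
```

Because encoding/json writes map keys in sorted order, the same logical state produces the same bytes and therefore the same key, which is what duplicate detection relies on.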

type MemStore

type MemStore[S any] struct {
	// contains filtered or unexported fields
}

MemStore is an in-memory implementation of Store[S].

It stores workflow state and checkpoints in memory using maps. Designed for:

  • Testing and development
  • Single-process workflows
  • Short-lived workflows where persistence isn't required

MemStore is thread-safe and supports concurrent access.

Limitations:

  • Data is lost when process terminates
  • Not suitable for distributed systems
  • Memory usage grows with workflow history

For production use with persistence, use database-backed stores (MySQL, PostgreSQL, Redis).

Type parameter S is the state type to persist.

func NewMemStore

func NewMemStore[S any]() *MemStore[S]

NewMemStore creates a new in-memory store.

Example:

store := NewMemStore[MyState]()
engine := graph.New(reducer, store, emitter, opts)

func (*MemStore[S]) CheckIdempotency

func (m *MemStore[S]) CheckIdempotency(_ context.Context, key string) (bool, error)

CheckIdempotency verifies if an idempotency key has been used (T096).

Returns (true, nil) if the key exists (has been used). Returns (false, nil) if the key doesn't exist (safe to use). Only returns error on store access failure (never for this in-memory implementation).

Thread-safe for concurrent access.

func (*MemStore[S]) LoadCheckpoint

func (m *MemStore[S]) LoadCheckpoint(_ context.Context, cpID string) (state S, step int, err error)

LoadCheckpoint retrieves a named checkpoint (T042).

Returns ErrNotFound if the checkpoint ID doesn't exist.

func (*MemStore[S]) LoadCheckpointV2

func (m *MemStore[S]) LoadCheckpointV2(_ context.Context, runID string, stepID int) (CheckpointV2[S], error)

LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID (T095).

Returns ErrNotFound if the checkpoint doesn't exist. Thread-safe for concurrent reads.

func (*MemStore[S]) LoadLatest

func (m *MemStore[S]) LoadLatest(_ context.Context, runID string) (state S, step int, err error)

LoadLatest retrieves the most recent step for a run (T038).

Returns the step with the highest step number. This handles out-of-order step saves correctly.
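Selecting by highest step number rather than by last append is what makes out-of-order saves safe. A minimal sketch of that selection follows. `stepRecord`, `latest`, and `errNotFound` are local stand-ins defined for this example and are not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for the package's ErrNotFound.
var errNotFound = errors.New("not found")

// stepRecord is a local stand-in for the package's StepRecord[S].
type stepRecord[S any] struct {
	Step   int
	NodeID string
	State  S
}

// latest scans the whole history and returns the record with the highest
// step number, so the result does not depend on the order of saves.
func latest[S any](history []stepRecord[S]) (stepRecord[S], error) {
	if len(history) == 0 {
		var zero stepRecord[S]
		return zero, errNotFound
	}
	best := history[0]
	for _, rec := range history[1:] {
		if rec.Step > best.Step {
			best = rec
		}
	}
	return best, nil
}

func main() {
	// Step 3 was saved before step 2, as might happen with parallel branches.
	history := []stepRecord[string]{
		{Step: 1, NodeID: "start", State: "a"},
		{Step: 3, NodeID: "merge", State: "c"},
		{Step: 2, NodeID: "branch", State: "b"},
	}
	rec, err := latest(history)
	fmt.Println(rec.Step, rec.NodeID, err) // prints "3 merge <nil>"
}
```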

func (*MemStore[S]) MarkEventsEmitted

func (m *MemStore[S]) MarkEventsEmitted(_ context.Context, eventIDs []string) error

MarkEventsEmitted marks events as successfully emitted to prevent re-delivery (T098).

Removes events from the pending queue by their IDs. Event IDs should be stored in the event's Meta map with key "event_id". If an event ID is not found, it is silently ignored (idempotent operation).

Thread-safe for concurrent access.

func (*MemStore[S]) MarshalJSON

func (m *MemStore[S]) MarshalJSON() ([]byte, error)

MarshalJSON serializes the MemStore to JSON (T072).

The resulting JSON can be saved to disk, transmitted over network, or used for debugging. All state values must be JSON-serializable.

Thread-safe: acquires read lock during serialization.

Example:

store := NewMemStore[MyState]()
// ... add steps and checkpoints ...
data, err := store.MarshalJSON()
if err != nil {
    log.Fatal(err)
}
if err := os.WriteFile("store.json", data, 0o644); err != nil {
    log.Fatal(err)
}

func (*MemStore[S]) PendingEvents

func (m *MemStore[S]) PendingEvents(_ context.Context, limit int) ([]emit.Event, error)

PendingEvents retrieves events from the transactional outbox that haven't been emitted (T097).

Returns up to 'limit' pending events ordered by insertion order. Empty list is not an error - it means no events are pending.

Thread-safe for concurrent access.
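PendingEvents and MarkEventsEmitted are meant to be used together: read a batch, deliver it, then mark it. The sketch below models that loop with a small in-memory outbox. The `event` and `outbox` types and their methods are hypothetical simplifications (the real methods take a context and use emit.Event), so treat this as an illustration of the pattern and not of MemStore's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a simplified stand-in for emit.Event; only the ID matters here.
type event struct {
	ID  string
	Msg string
}

// outbox keeps pending events in insertion order behind a mutex.
type outbox struct {
	mu      sync.Mutex
	pending []event
}

// add appends an event to the end of the pending queue.
func (o *outbox) add(e event) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.pending = append(o.pending, e)
}

// pendingEvents returns a copy of up to limit events, oldest first.
func (o *outbox) pendingEvents(limit int) []event {
	o.mu.Lock()
	defer o.mu.Unlock()
	if limit < 0 {
		limit = 0
	}
	if limit > len(o.pending) {
		limit = len(o.pending)
	}
	return append([]event(nil), o.pending[:limit]...)
}

// markEmitted removes the given IDs from the queue. Unknown IDs are ignored,
// which makes repeated calls harmless.
func (o *outbox) markEmitted(ids []string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	done := make(map[string]bool, len(ids))
	for _, id := range ids {
		done[id] = true
	}
	kept := o.pending[:0]
	for _, e := range o.pending {
		if !done[e.ID] {
			kept = append(kept, e)
		}
	}
	o.pending = kept
}

func main() {
	var o outbox
	for i := 1; i <= 3; i++ {
		o.add(event{ID: fmt.Sprintf("evt-%d", i), Msg: "step done"})
	}
	batch := o.pendingEvents(2)
	ids := make([]string, 0, len(batch))
	for _, e := range batch {
		fmt.Println("emit", e.ID) // deliver to the external system here
		ids = append(ids, e.ID)
	}
	o.markEmitted(ids)
	o.markEmitted(ids)                    // second call is a no-op
	fmt.Println(len(o.pendingEvents(10))) // prints "1"
}
```

If the process crashes between delivering a batch and marking it, the same events come back from the next read, so receivers should tolerate an occasional duplicate.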

func (*MemStore[S]) SaveCheckpoint

func (m *MemStore[S]) SaveCheckpoint(_ context.Context, cpID string, state S, step int) error

SaveCheckpoint creates a named checkpoint (T040).

Checkpoints can be used to:

  • Create branching workflows (save checkpoint, try different paths)
  • Mark significant milestones (after-validation, before-deploy)
  • Provide manual resumption points

If a checkpoint with the same ID exists, it is overwritten.

func (*MemStore[S]) SaveCheckpointV2

func (m *MemStore[S]) SaveCheckpointV2(_ context.Context, checkpoint CheckpointV2[S]) error

SaveCheckpointV2 persists an enhanced checkpoint with full execution context (T094).

Stores checkpoint indexed by (runID, stepID) and optionally by label if provided. Returns error if the idempotency key already exists (duplicate commit prevention).

Thread-safe for concurrent access.
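The duplicate-commit guard only works if the key check and the write happen under the same lock. The following sketch shows that shape with plain maps. The `cpStore` type, the `errDuplicate` error, and keying the label index by label alone are assumptions made for illustration; the source does not name the error that is returned or describe the internal layout.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicate is a hypothetical error value for a reused idempotency key.
var errDuplicate = errors.New("idempotency key already used")

// cpKey indexes checkpoints by (runID, stepID).
type cpKey struct {
	runID  string
	stepID int
}

// checkpoint is a trimmed-down stand-in for CheckpointV2[S].
type checkpoint struct {
	RunID, Label, IdempotencyKey string
	StepID                       int
}

type cpStore struct {
	mu      sync.RWMutex
	byStep  map[cpKey]checkpoint
	byLabel map[string]checkpoint
	keys    map[string]bool
}

func newCPStore() *cpStore {
	return &cpStore{
		byStep:  make(map[cpKey]checkpoint),
		byLabel: make(map[string]checkpoint),
		keys:    make(map[string]bool),
	}
}

// save rejects a reused idempotency key and otherwise records the key and
// the checkpoint under one write lock, so two racing saves cannot both win.
func (s *cpStore) save(cp checkpoint) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.keys[cp.IdempotencyKey] {
		return errDuplicate
	}
	s.keys[cp.IdempotencyKey] = true
	s.byStep[cpKey{cp.RunID, cp.StepID}] = cp
	if cp.Label != "" {
		s.byLabel[cp.Label] = cp
	}
	return nil
}

// checkIdempotency reports whether the key has already been used.
func (s *cpStore) checkIdempotency(key string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.keys[key]
}

func main() {
	s := newCPStore()
	cp := checkpoint{RunID: "run-001", StepID: 1, IdempotencyKey: "sha256:abc", Label: "before_summary"}
	fmt.Println(s.save(cp))                       // prints "<nil>"
	fmt.Println(s.save(cp))                       // prints "idempotency key already used"
	fmt.Println(s.checkIdempotency("sha256:abc")) // prints "true"
}
```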

func (*MemStore[S]) SaveStep

func (m *MemStore[S]) SaveStep(_ context.Context, runID string, step int, nodeID string, state S) error

SaveStep persists a workflow execution step (T036).

Steps are appended to the run's history in the order they are saved. Thread-safe for concurrent writes.

func (*MemStore[S]) UnmarshalJSON

func (m *MemStore[S]) UnmarshalJSON(data []byte) error

UnmarshalJSON deserializes JSON data into the MemStore (T074).

Replaces the current contents of the MemStore with the deserialized data. All existing steps and checkpoints are discarded.

Thread-safe: acquires write lock during deserialization.

Example:

data, err := os.ReadFile("store.json")
if err != nil {
    log.Fatal(err)
}
store := NewMemStore[MyState]()
if err := store.UnmarshalJSON(data); err != nil {
    log.Fatal(err)
}
// Store now contains data from JSON file

type MySQLStore

type MySQLStore[S any] struct {
	// contains filtered or unexported fields
}

MySQLStore is a MySQL/MariaDB implementation of Store[S].

It stores workflow state and checkpoints in a relational database. Designed for:

  • Production workflows requiring persistence
  • Distributed systems with multiple workers
  • Long-running workflows that survive process restarts
  • Audit trails and compliance requirements

MySQLStore uses connection pooling and transactions for reliability.

Schema:

  • workflow_steps: Step-by-step execution history
  • workflow_checkpoints: Named checkpoints for resumption

Type parameter S is the state type to persist (must be JSON-serializable).

func NewMySQLStore

func NewMySQLStore[S any](dsn string) (*MySQLStore[S], error)

NewMySQLStore creates a new MySQL-backed store.

The DSN (Data Source Name) format is:

[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]

Example DSNs:

user:password@tcp(localhost:3306)/workflows
user:password@tcp(127.0.0.1:3306)/workflows?parseTime=true
user:password@/workflows (uses localhost:3306)

Security Warning:

NEVER hardcode credentials in your source code. Use environment variables:
    dsn := os.Getenv("MYSQL_DSN")
    if dsn == "" {
        log.Fatal("MYSQL_DSN environment variable not set")
    }
    store, err := NewMySQLStore[State](dsn)

The store automatically:

  • Creates required tables if they don't exist
  • Configures connection pooling
  • Sets appropriate timeouts

Example:

store, err := NewMySQLStore[MyState](os.Getenv("MYSQL_DSN"))
if err != nil {
    log.Fatal(err)
}
defer store.Close()

func (*MySQLStore[S]) CheckIdempotency

func (m *MySQLStore[S]) CheckIdempotency(ctx context.Context, key string) (bool, error)

CheckIdempotency verifies if an idempotency key has been used.

Returns true if the key exists in the idempotency_keys table. Returns false if the key doesn't exist (safe to use). Returns error only on database access failures.

This uses a unique constraint on the key for race-safe duplicate detection.

func (*MySQLStore[S]) Close

func (m *MySQLStore[S]) Close() error

Close closes the database connection pool.

After Close, all operations will return an error. Calling Close multiple times is safe (subsequent calls are no-ops).
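One common way to get the "safe to call twice" behavior described here is to guard the teardown with sync.Once and keep a closed flag for later calls. The sketch below shows that idiom on a hypothetical `resource` type. It is not taken from MySQLStore, and `errClosed` is an invented error value.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical error returned by operations after Close.
var errClosed = errors.New("store is closed")

// resource stands in for a store that owns a connection pool.
type resource struct {
	closeOnce  sync.Once
	mu         sync.RWMutex
	closed     bool
	closeCalls int // counts real teardown work, for demonstration only
}

// Close performs the teardown exactly once; later calls return nil without
// doing anything.
func (r *resource) Close() error {
	r.closeOnce.Do(func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.closed = true
		r.closeCalls++ // a real store would close its connection pool here
	})
	return nil
}

// Ping fails once the resource has been closed.
func (r *resource) Ping() error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return errClosed
	}
	return nil
}

func main() {
	r := &resource{}
	fmt.Println(r.Ping())     // prints "<nil>"
	fmt.Println(r.Close())    // prints "<nil>"
	fmt.Println(r.Close())    // prints "<nil>"
	fmt.Println(r.closeCalls) // prints "1"
	fmt.Println(r.Ping())     // prints "store is closed"
}
```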

func (*MySQLStore[S]) LoadCheckpoint

func (m *MySQLStore[S]) LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)

LoadCheckpoint retrieves a named checkpoint (implements Store interface).

Returns ErrNotFound if the checkpoint ID doesn't exist.

func (*MySQLStore[S]) LoadCheckpointV2

func (m *MySQLStore[S]) LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)

LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID.

This method can also load checkpoints by label if stepID is 0 and a label is stored. Returns ErrNotFound if no checkpoint exists for the given identifiers.

func (*MySQLStore[S]) LoadLatest

func (m *MySQLStore[S]) LoadLatest(ctx context.Context, runID string) (state S, step int, err error)

LoadLatest retrieves the most recent step for a run (implements Store interface).

Returns the step with the highest step number for the given runID. Returns ErrNotFound if no steps exist for the runID.

func (*MySQLStore[S]) MarkEventsEmitted

func (m *MySQLStore[S]) MarkEventsEmitted(ctx context.Context, eventIDs []string) error

MarkEventsEmitted marks events as successfully emitted to prevent re-delivery.

Updates the emitted_at timestamp for the specified event IDs. This ensures the events won't be returned by PendingEvents again.

func (*MySQLStore[S]) PendingEvents

func (m *MySQLStore[S]) PendingEvents(ctx context.Context, limit int) ([]emit.Event, error)

PendingEvents retrieves events from the outbox that haven't been emitted yet.

Returns events where emitted_at IS NULL, ordered by created_at. Limited to the specified number of events for batching.

func (*MySQLStore[S]) Ping

func (m *MySQLStore[S]) Ping(ctx context.Context) error

Ping verifies the database connection is alive.

Useful for health checks and connection validation.

func (*MySQLStore[S]) SaveCheckpoint

func (m *MySQLStore[S]) SaveCheckpoint(ctx context.Context, cpID string, state S, step int) error

SaveCheckpoint creates a named checkpoint (implements Store interface).

Checkpoints are stored in the workflow_checkpoints table. If a checkpoint with the same ID exists, it is updated.

Thread-safe for concurrent writes.

func (*MySQLStore[S]) SaveCheckpointV2

func (m *MySQLStore[S]) SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error

SaveCheckpointV2 persists an enhanced checkpoint with full execution context.

This method saves a complete checkpoint including:

  • Current state after all deltas applied
  • Frontier of pending work items
  • Recorded I/O for deterministic replay
  • RNG seed for random value consistency
  • Idempotency key to prevent duplicate commits

The operation is performed in a transaction to ensure atomicity. If the idempotency key already exists, returns an error (prevents duplicate saves).

Thread-safe for concurrent writes.

func (*MySQLStore[S]) SaveStep

func (m *MySQLStore[S]) SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error

SaveStep persists a workflow execution step (implements Store interface).

Steps are stored in the workflow_steps table with the current state. If a step with the same runID and step number already exists, it is replaced.

Thread-safe for concurrent writes.

func (*MySQLStore[S]) SaveStepBatch

func (m *MySQLStore[S]) SaveStepBatch(ctx context.Context, _ string, _ interface{}) error

SaveStepBatch atomically saves multiple workflow steps in a transaction.

All steps are saved atomically - either all succeed or all are rolled back. This is useful for saving multiple parallel branch results or recovery scenarios.

If any step fails to save, the entire batch is rolled back and an error is returned.

Note: This is a low-level API. For most use cases, use SaveStep directly. The Engine handles batch operations via individual SaveStep calls within its own transaction management.

Example:

type StepData struct {
    Step   int
    NodeID string
    State  MyState
}
steps := []StepData{
    {1, "node-a", stateA},
    {2, "node-b", stateB},
}
err := store.SaveStepBatch(ctx, "run-001", steps)

func (*MySQLStore[S]) Stats

func (m *MySQLStore[S]) Stats() sql.DBStats

Stats returns database connection pool statistics.

Useful for monitoring connection usage and pool health.

func (*MySQLStore[S]) WithTransaction

func (m *MySQLStore[S]) WithTransaction(ctx context.Context, fn func(context.Context, *sql.Tx) error) error

WithTransaction executes a function within a database transaction.

If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed.

This is useful for atomic multi-operation workflows.

Example:

err := store.WithTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
    // Perform multiple operations
    if err := saveStepInTx(tx, ...); err != nil {
        return err
    }
    if err := saveCheckpointInTx(tx, ...); err != nil {
        return err
    }
    return nil
})
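The commit-or-rollback rule can be illustrated without a database. The sketch below is an assumption-laden model and not the method's actual implementation: `tx` is a small interface that *sql.Tx happens to satisfy, `withTx` is a hypothetical helper, and `fakeTx` only records which terminal call was made. The real method also takes a context and passes a *sql.Tx to the callback.

```go
package main

import (
	"errors"
	"fmt"
)

// errSave is an example failure returned by the callback.
var errSave = errors.New("save failed")

// tx is the subset of *sql.Tx this sketch needs.
type tx interface {
	Commit() error
	Rollback() error
}

// withTx begins a transaction, runs fn, and commits on success or rolls
// back when fn returns an error. The callback's error is returned to the caller.
func withTx(begin func() (tx, error), fn func(tx) error) error {
	t, err := begin()
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	if err := fn(t); err != nil {
		if rbErr := t.Rollback(); rbErr != nil {
			return fmt.Errorf("%w (rollback also failed: %v)", err, rbErr)
		}
		return err
	}
	return t.Commit()
}

// fakeTx records which terminal call was made.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	ok := &fakeTx{}
	err := withTx(func() (tx, error) { return ok, nil }, func(_ tx) error { return nil })
	fmt.Println(err, ok.committed, ok.rolledBack) // prints "<nil> true false"

	bad := &fakeTx{}
	err = withTx(func() (tx, error) { return bad, nil }, func(_ tx) error { return errSave })
	fmt.Println(err, bad.committed, bad.rolledBack) // prints "save failed false true"
}
```

The sketch does not handle a panic inside the callback. A production helper would usually roll back in a deferred function as well.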

type SQLiteStore

type SQLiteStore[S any] struct {
	// contains filtered or unexported fields
}

SQLiteStore is a SQLite implementation of Store[S].

It stores workflow state and checkpoints in a single-file database. Designed for:

  • Development and testing with zero setup
  • Single-process workflows
  • Local workflows requiring persistence
  • Prototyping before migrating to distributed store

SQLiteStore uses WAL mode for concurrent reads and proper transactions.

Features:

  • Single file database (e.g., "./dev.db")
  • Auto-migration on first use
  • WAL mode for concurrent reads
  • Transactional writes for safety

Schema:

  • workflow_steps: Step-by-step execution history
  • workflow_checkpoints: Named checkpoints for resumption
  • workflow_checkpoints_v2: Enhanced checkpoints with full context
  • idempotency_keys: Duplicate prevention
  • events_outbox: Transactional event delivery

Type parameter S is the state type to persist (must be JSON-serializable).

func NewSQLiteStore

func NewSQLiteStore[S any](path string) (*SQLiteStore[S], error)

NewSQLiteStore creates a new SQLite-backed store.

The path parameter specifies the database file location:

  • "./dev.db" - file in current directory
  • "/tmp/workflow.db" - absolute path
  • ":memory:" - in-memory database (data lost on close)

The store automatically:

  • Creates the database file if it doesn't exist
  • Creates required tables
  • Enables WAL mode for concurrent reads
  • Configures appropriate timeouts

WAL Mode Benefits:

  • Multiple readers can access database concurrently
  • Writers don't block readers
  • Better concurrency for read-heavy workloads

Example:

store, err := NewSQLiteStore[MyState]("./dev.db")
if err != nil {
    log.Fatal(err)
}
defer store.Close()

For testing with in-memory database:

store, err := NewSQLiteStore[MyState](":memory:")
if err != nil {
    log.Fatal(err)
}
defer store.Close()

func (*SQLiteStore[S]) CheckIdempotency

func (s *SQLiteStore[S]) CheckIdempotency(ctx context.Context, key string) (bool, error)

CheckIdempotency verifies if an idempotency key has been used.

Returns true if the key exists in the idempotency_keys table. Returns false if the key doesn't exist (safe to use). Returns error only on database access failures.

This uses a unique constraint on the key for race-safe duplicate detection.

func (*SQLiteStore[S]) Close

func (s *SQLiteStore[S]) Close() error

Close closes the database connection.

After Close, all operations will return an error. Calling Close multiple times is safe (subsequent calls are no-ops).

func (*SQLiteStore[S]) LoadCheckpoint

func (s *SQLiteStore[S]) LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)

LoadCheckpoint retrieves a named checkpoint (implements Store interface).

Returns ErrNotFound if the checkpoint ID doesn't exist.

func (*SQLiteStore[S]) LoadCheckpointV2

func (s *SQLiteStore[S]) LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)

LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID.

This method can also load checkpoints by label if stepID is 0 and a label is stored. Returns ErrNotFound if no checkpoint exists for the given identifiers.

func (*SQLiteStore[S]) LoadLatest

func (s *SQLiteStore[S]) LoadLatest(ctx context.Context, runID string) (state S, step int, err error)

LoadLatest retrieves the most recent step for a run (implements Store interface).

Returns the step with the highest step number for the given runID. Returns ErrNotFound if no steps exist for the runID.

func (*SQLiteStore[S]) MarkEventsEmitted

func (s *SQLiteStore[S]) MarkEventsEmitted(ctx context.Context, eventIDs []string) error

MarkEventsEmitted marks events as successfully emitted to prevent re-delivery.

Updates the emitted_at timestamp for the specified event IDs. This ensures the events won't be returned by PendingEvents again.

func (*SQLiteStore[S]) Path

func (s *SQLiteStore[S]) Path() string

Path returns the database file path.

This is useful for debugging and logging.

func (*SQLiteStore[S]) PendingEvents

func (s *SQLiteStore[S]) PendingEvents(ctx context.Context, limit int) ([]emit.Event, error)

PendingEvents retrieves events from the outbox that haven't been emitted yet.

Returns events where emitted_at IS NULL, ordered by created_at. Limited to the specified number of events for batching.

func (*SQLiteStore[S]) Ping

func (s *SQLiteStore[S]) Ping(ctx context.Context) error

Ping verifies the database connection is alive.

Useful for health checks and connection validation.

func (*SQLiteStore[S]) SaveCheckpoint

func (s *SQLiteStore[S]) SaveCheckpoint(ctx context.Context, cpID string, state S, step int) error

SaveCheckpoint creates a named checkpoint (implements Store interface).

Checkpoints are stored in the workflow_checkpoints table. If a checkpoint with the same ID exists, it is updated.

Thread-safe for concurrent writes.

func (*SQLiteStore[S]) SaveCheckpointV2

func (s *SQLiteStore[S]) SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error

SaveCheckpointV2 persists an enhanced checkpoint with full execution context.

This method saves a complete checkpoint including:

  • Current state after all deltas applied
  • Frontier of pending work items
  • Recorded I/O for deterministic replay
  • RNG seed for random value consistency
  • Idempotency key to prevent duplicate commits

The operation is performed in a transaction to ensure atomicity. If the idempotency key already exists, returns an error (prevents duplicate saves).

Thread-safe for concurrent writes.

func (*SQLiteStore[S]) SaveStep

func (s *SQLiteStore[S]) SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error

SaveStep persists a workflow execution step (implements Store interface).

Steps are stored in the workflow_steps table with the current state. If a step with the same runID and step number already exists, it is replaced.

Thread-safe for concurrent writes.

type StepRecord

type StepRecord[S any] struct {
	// Step is the sequential step number (1-indexed).
	Step int

	// NodeID identifies which node produced this state.
	NodeID string

	// State is the workflow state after this step completed.
	State S
}

StepRecord represents a single execution step in the workflow history. Used internally by Store implementations to track step-by-step progression.

type Store

type Store[S any] interface {
	// SaveStep persists the state after a node execution step.
	// Each step is identified by runID + step number.
	//
	// Parameters:
	// - runID: Unique identifier for this workflow execution.
	// - step: Sequential step number (starts at 1).
	// - nodeID: ID of the node that produced this state.
	// - state: The current workflow state after merging delta.
	//
	// Returns error if persistence fails.
	SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error

	// LoadLatest retrieves the most recent state for a given run.
	// Used to resume execution from the last saved step.
	//
	// Parameters:
	// - runID: Unique identifier for the workflow execution.
	//
	// Returns:
	// - state: The most recent persisted state.
	// - step: The step number of the returned state.
	// - error: ErrNotFound if runID doesn't exist, or other persistence errors.
	LoadLatest(ctx context.Context, runID string) (state S, step int, err error)

	// SaveCheckpoint creates a named snapshot of workflow state.
	// Checkpoints enable branching workflows and manual resumption points.
	//
	// Parameters:
	// - cpID: Unique checkpoint identifier (user-defined).
	// - state: The workflow state to snapshot.
	// - step: The step number at which this checkpoint was created.
	//
	// Returns error if persistence fails.
	SaveCheckpoint(ctx context.Context, cpID string, state S, step int) error

	// LoadCheckpoint retrieves a previously saved checkpoint.
	// Used to restore workflow state from a named checkpoint.
	//
	// Parameters:
	// - cpID: Unique checkpoint identifier.
	//
	// Returns:
	// - state: The checkpointed state.
	// - step: The step number when checkpoint was created.
	// - error: ErrNotFound if cpID doesn't exist, or other persistence errors.
	LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)

	// SaveCheckpointV2 persists an enhanced checkpoint with full execution context.
	// This includes frontier state, recorded I/O, RNG seed, and idempotency key.
	//
	// The checkpoint contains all information needed for deterministic replay:
	// - Current state at the checkpoint.
	// - Pending work items in the execution frontier.
	// - Recorded I/O responses for replay.
	// - RNG seed for deterministic random values.
	// - Idempotency key to prevent duplicate commits.
	//
	// Parameters:
	// - checkpoint: Complete checkpoint with all execution context (CheckpointV2 type).
	//
	// Returns error if persistence fails or idempotency key already exists.
	//
	// This method extends SaveCheckpoint with support for concurrent execution
	// and deterministic replay. Use this for v0.2.0+ features, or use the original
	// SaveCheckpoint for simpler checkpointing needs.
	SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error

	// LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID.
	//
	// Unlike LoadCheckpoint, which uses a user-defined label, this method loads
	// checkpoints by their system-generated identifiers. This enables:
	// - Resumption from any specific step in execution history.
	// - Replay of partial execution segments.
	// - Time-travel debugging through execution steps.
	//
	// Parameters:
	// - runID: Unique workflow run identifier.
	// - stepID: Step number to load checkpoint from.
	//
	// Returns:
	// - checkpoint: Complete checkpoint with execution context (CheckpointV2 type).
	// - error: ErrNotFound if checkpoint doesn't exist.
	LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)

	// CheckIdempotency verifies if an idempotency key has been used.
	//
	// Idempotency keys prevent duplicate step commits during retries or crash recovery.
	// The key is typically a hash of: runID + stepID + frontier state + node outputs.
	//
	// Parameters:
	// - key: Idempotency key to check (SHA-256 hash string).
	//
	// Returns:
	// - exists: true if key was previously used.
	// - error: Only on store access failure (not for "key not found").
	//
	// Implementation note: Store the key atomically when committing checkpoints.
	// Keys should be indexed for fast lookup. Consider TTL-based cleanup for old keys.
	CheckIdempotency(ctx context.Context, key string) (bool, error)

	// PendingEvents retrieves events from the transactional outbox that haven't been emitted.
	//
	// This implements the "transactional outbox pattern" for exactly-once event delivery:
	// 1. Events are persisted atomically with state changes.
	// 2. Separate process reads pending events and emits them.
	// 3. Successfully emitted events are marked via MarkEventsEmitted.
	// 4. Crashed emitters can resume from pending events.
	//
	// Parameters:
	// - limit: Maximum number of events to retrieve (for batching).
	//
	// Returns:
	// - events: Events pending emission, ordered by creation time.
	// - error: Only on store access failure (empty list is not an error).
	//
	// Use this with MarkEventsEmitted to implement reliable event delivery without
	// message broker dependencies.
	PendingEvents(ctx context.Context, limit int) ([]emit.Event, error)

	// MarkEventsEmitted marks events as successfully emitted to prevent re-delivery.
	//
	// After successfully emitting events to external systems (logs, traces, metrics),
	// call this method to record their emission. This ensures:
	// - Events are emitted exactly once (not lost, not duplicated).
	// - Crash recovery doesn't re-emit already-delivered events.
	// - PendingEvents won't return these events again.
	//
	// Parameters:
	// - eventIDs: List of event IDs that were successfully emitted.
	//
	// Returns error if store update fails. On error, events may be re-emitted
	// (at-least-once semantics).
	//
	// Implementation note: Mark as emitted in the same transaction/atomic operation
	// as the external emit when possible, or use idempotency keys on the receiving end.
	MarkEventsEmitted(ctx context.Context, eventIDs []string) error
}

Store provides persistence for workflow state and checkpoints.

It enables:

  • Step-by-step state persistence during execution
  • Latest state retrieval for resumption
  • Named checkpoint save/load for branching workflows

Implementations can use:

  • In-memory storage (for testing, see memory.go)
  • Relational databases (MySQL, PostgreSQL)
  • Key-value stores (Redis, DynamoDB)
  • Object storage (S3, GCS)

Type parameter S is the state type to persist.
