Documentation ¶
Overview ¶
Package store provides persistence implementations for graph state.
Index ¶
- Variables
- type Checkpoint (deprecated)
- type CheckpointV2
- type MemStore
- func (m *MemStore[S]) CheckIdempotency(_ context.Context, key string) (bool, error)
- func (m *MemStore[S]) LoadCheckpoint(_ context.Context, cpID string) (state S, step int, err error)
- func (m *MemStore[S]) LoadCheckpointV2(_ context.Context, runID string, stepID int) (CheckpointV2[S], error)
- func (m *MemStore[S]) LoadLatest(_ context.Context, runID string) (state S, step int, err error)
- func (m *MemStore[S]) MarkEventsEmitted(_ context.Context, eventIDs []string) error
- func (m *MemStore[S]) MarshalJSON() ([]byte, error)
- func (m *MemStore[S]) PendingEvents(_ context.Context, limit int) ([]emit.Event, error)
- func (m *MemStore[S]) SaveCheckpoint(_ context.Context, cpID string, state S, step int) error
- func (m *MemStore[S]) SaveCheckpointV2(_ context.Context, checkpoint CheckpointV2[S]) error
- func (m *MemStore[S]) SaveStep(_ context.Context, runID string, step int, nodeID string, state S) error
- func (m *MemStore[S]) UnmarshalJSON(data []byte) error
- type MySQLStore
- func (m *MySQLStore[S]) CheckIdempotency(ctx context.Context, key string) (bool, error)
- func (m *MySQLStore[S]) Close() error
- func (m *MySQLStore[S]) LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)
- func (m *MySQLStore[S]) LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)
- func (m *MySQLStore[S]) LoadLatest(ctx context.Context, runID string) (state S, step int, err error)
- func (m *MySQLStore[S]) MarkEventsEmitted(ctx context.Context, eventIDs []string) error
- func (m *MySQLStore[S]) PendingEvents(ctx context.Context, limit int) ([]emit.Event, error)
- func (m *MySQLStore[S]) Ping(ctx context.Context) error
- func (m *MySQLStore[S]) SaveCheckpoint(ctx context.Context, cpID string, state S, step int) error
- func (m *MySQLStore[S]) SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error
- func (m *MySQLStore[S]) SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error
- func (m *MySQLStore[S]) SaveStepBatch(ctx context.Context, _ string, _ interface{}) error
- func (m *MySQLStore[S]) Stats() sql.DBStats
- func (m *MySQLStore[S]) WithTransaction(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
- type SQLiteStore
- func (s *SQLiteStore[S]) CheckIdempotency(ctx context.Context, key string) (bool, error)
- func (s *SQLiteStore[S]) Close() error
- func (s *SQLiteStore[S]) LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)
- func (s *SQLiteStore[S]) LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)
- func (s *SQLiteStore[S]) LoadLatest(ctx context.Context, runID string) (state S, step int, err error)
- func (s *SQLiteStore[S]) MarkEventsEmitted(ctx context.Context, eventIDs []string) error
- func (s *SQLiteStore[S]) Path() string
- func (s *SQLiteStore[S]) PendingEvents(ctx context.Context, limit int) ([]emit.Event, error)
- func (s *SQLiteStore[S]) Ping(ctx context.Context) error
- func (s *SQLiteStore[S]) SaveCheckpoint(ctx context.Context, cpID string, state S, step int) error
- func (s *SQLiteStore[S]) SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error
- func (s *SQLiteStore[S]) SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error
- type StepRecord
- type Store
Constants ¶
This section is empty.
Variables ¶
var ErrNotFound = errors.New("not found")
ErrNotFound is returned when a requested run ID or checkpoint ID does not exist.
Functions ¶
This section is empty.
Types ¶
type Checkpoint (deprecated) ¶
type Checkpoint[S any] struct {
	// ID is the unique checkpoint identifier.
	ID string
	// State is the snapshotted workflow state.
	State S
	// Step is the step number when this checkpoint was created.
	Step int
}
Checkpoint represents a named snapshot of workflow state. Used by Store implementations to persist and restore checkpoints.
Deprecated: Use CheckpointV2 for enhanced checkpointing features. This type is kept for backward compatibility with the original SaveCheckpoint/LoadCheckpoint methods.
type CheckpointV2 ¶
type CheckpointV2[S any] struct {
	// RunID uniquely identifies the execution this checkpoint belongs to.
	RunID string `json:"run_id"`
	// StepID is the execution step number at checkpoint time.
	// Monotonically increasing within a run.
	StepID int `json:"step_id"`
	// State is the current accumulated state after applying all deltas up to StepID.
	// Must be JSON-serializable for persistence.
	State S `json:"state"`
	// Frontier contains the work items ready to execute at this checkpoint.
	// Must be JSON-serializable. Type is interface{} to avoid circular dependency.
	// Expected to be []WorkItem[S] from graph package.
	Frontier interface{} `json:"frontier"`
	// RNGSeed is the seed for deterministic random number generation.
	// Computed from RunID to ensure consistent random values across replays.
	RNGSeed int64 `json:"rng_seed"`
	// RecordedIOs contains all captured external interactions up to this checkpoint.
	// Must be JSON-serializable. Type is interface{} to avoid circular dependency.
	// Expected to be []RecordedIO from graph package.
	RecordedIOs interface{} `json:"recorded_ios"`
	// IdempotencyKey is a hash of (RunID, StepID, State, Frontier) that prevents
	// duplicate checkpoint commits. Format: "sha256:hex_encoded_hash".
	IdempotencyKey string `json:"idempotency_key"`
	// Timestamp records when this checkpoint was created.
	Timestamp time.Time `json:"timestamp"`
	// Label is an optional user-defined name for this checkpoint, useful for
	// debugging or creating named save points (e.g., "before_summary", "after_validation").
	// Empty string for automatic checkpoints.
	Label string `json:"label,omitempty"`
}
CheckpointV2 represents an enhanced checkpoint with full execution context for deterministic replay.
This type contains all information needed to resume execution from a specific point:
- Current accumulated state
- Work items ready to execute (frontier)
- Recorded I/O for replay
- RNG seed for deterministic random number generation
- Idempotency key for preventing duplicate commits
CheckpointV2 supports both automatic resumption after failures and user-initiated labeled snapshots for debugging or branching workflows.
This type is generic over the state type S, which must be JSON-serializable.
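The IdempotencyKey field is described as a hash of (RunID, StepID, State, Frontier) in the form "sha256:hex_encoded_hash". One way such a key could be derived is sketched below. The function name idempotencyKey and the choice of hashing a JSON array of the four values are assumptions for illustration; the package may canonicalize its input differently.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// idempotencyKey hashes (runID, stepID, state, frontier) into "sha256:<hex>".
// Assumption: the tuple is encoded as a JSON array before hashing. encoding/json
// sorts map keys, so equal inputs produce equal bytes.
func idempotencyKey(runID string, stepID int, state, frontier any) (string, error) {
	payload, err := json.Marshal([]any{runID, stepID, state, frontier})
	if err != nil {
		return "", fmt.Errorf("encode checkpoint identity: %w", err)
	}
	sum := sha256.Sum256(payload)
	return "sha256:" + hex.EncodeToString(sum[:]), nil
}

func main() {
	state := map[string]int{"count": 2}
	frontier := []string{"node-b"}
	key, err := idempotencyKey("run-001", 3, state, frontier)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(key)
}
```

Because the key depends only on the checkpoint's content, a retried commit of the same step yields the same key and can be rejected as a duplicate.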
type MemStore ¶
type MemStore[S any] struct {
	// contains filtered or unexported fields
}
MemStore is an in-memory implementation of Store[S].
It stores workflow state and checkpoints in memory using maps. Designed for:
- Testing and development
- Single-process workflows
- Short-lived workflows where persistence isn't required
MemStore is thread-safe and supports concurrent access.
Limitations:
- Data is lost when process terminates
- Not suitable for distributed systems
- Memory usage grows with workflow history
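For production use with persistence, use a database-backed store. This package provides MySQLStore and SQLiteStore; other backends (PostgreSQL, Redis) would need their own Store[S] implementation.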
Type parameter S is the state type to persist.
func NewMemStore ¶
NewMemStore creates a new in-memory store.
Example:
store := NewMemStore[MyState]()
engine := graph.New(reducer, store, emitter, opts)
func (*MemStore[S]) CheckIdempotency ¶
CheckIdempotency verifies if an idempotency key has been used (T096).
Returns (true, nil) if the key exists (has been used). Returns (false, nil) if the key doesn't exist (safe to use). Only returns error on store access failure (never for this in-memory implementation).
Thread-safe for concurrent access.
func (*MemStore[S]) LoadCheckpoint ¶
LoadCheckpoint retrieves a named checkpoint (T042).
Returns ErrNotFound if the checkpoint ID doesn't exist.
func (*MemStore[S]) LoadCheckpointV2 ¶
func (m *MemStore[S]) LoadCheckpointV2(_ context.Context, runID string, stepID int) (CheckpointV2[S], error)
LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID (T095).
Returns ErrNotFound if the checkpoint doesn't exist. Thread-safe for concurrent reads.
func (*MemStore[S]) LoadLatest ¶
LoadLatest retrieves the most recent step for a run (T038).
Returns the step with the highest step number. This handles out-of-order step saves correctly.
func (*MemStore[S]) MarkEventsEmitted ¶
MarkEventsEmitted marks events as successfully emitted to prevent re-delivery (T098).
Removes events from the pending queue by their IDs. Event IDs should be stored in the event's Meta map with key "event_id". If an event ID is not found, it is silently ignored (idempotent operation).
Thread-safe for concurrent access.
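The removal-by-ID behavior described for MarkEventsEmitted can be sketched as a filter over the pending queue. The Event type below is a hypothetical stand-in for emit.Event (only a Meta map is assumed), markEmitted is an illustrative helper, and locking is omitted for brevity.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for emit.Event; only Meta matters here.
type Event struct {
	Msg  string
	Meta map[string]any
}

// markEmitted returns the pending queue without the events whose
// Meta["event_id"] appears in eventIDs. Unknown IDs are ignored, so
// calling it twice with the same IDs is harmless.
func markEmitted(pending []Event, eventIDs []string) []Event {
	emitted := make(map[string]struct{}, len(eventIDs))
	for _, id := range eventIDs {
		emitted[id] = struct{}{}
	}
	remaining := make([]Event, 0, len(pending))
	for _, ev := range pending {
		// Events without a string event_id can never match and are kept.
		if id, ok := ev.Meta["event_id"].(string); ok {
			if _, done := emitted[id]; done {
				continue
			}
		}
		remaining = append(remaining, ev)
	}
	return remaining
}

func main() {
	pending := []Event{
		{Msg: "started", Meta: map[string]any{"event_id": "e1"}},
		{Msg: "finished", Meta: map[string]any{"event_id": "e2"}},
	}
	pending = markEmitted(pending, []string{"e1", "does-not-exist"})
	fmt.Println(len(pending), pending[0].Msg)
}
```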
func (*MemStore[S]) MarshalJSON ¶
MarshalJSON serializes the MemStore to JSON (T072).
The resulting JSON can be saved to disk, transmitted over network, or used for debugging. All state values must be JSON-serializable.
Thread-safe: acquires read lock during serialization.
Example:
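store := NewMemStore[MyState]()
// ... add steps and checkpoints ...
data, err := store.MarshalJSON()
if err != nil {
	log.Fatal(err)
}
// Check the write error too; a failed write would otherwise lose the snapshot silently.
if err := os.WriteFile("store.json", data, 0644); err != nil {
	log.Fatal(err)
}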
func (*MemStore[S]) PendingEvents ¶
PendingEvents retrieves events from the transactional outbox that haven't been emitted (T097).
Returns up to 'limit' pending events ordered by insertion order. Empty list is not an error - it means no events are pending.
Thread-safe for concurrent access.
func (*MemStore[S]) SaveCheckpoint ¶
SaveCheckpoint creates a named checkpoint (T040).
Checkpoints can be used to:
- Create branching workflows (save checkpoint, try different paths)
- Mark significant milestones (after-validation, before-deploy)
- Provide manual resumption points
If a checkpoint with the same ID exists, it is overwritten.
func (*MemStore[S]) SaveCheckpointV2 ¶
func (m *MemStore[S]) SaveCheckpointV2(_ context.Context, checkpoint CheckpointV2[S]) error
SaveCheckpointV2 persists an enhanced checkpoint with full execution context (T094).
Stores checkpoint indexed by (runID, stepID) and optionally by label if provided. Returns error if the idempotency key already exists (duplicate commit prevention).
Thread-safe for concurrent access.
func (*MemStore[S]) SaveStep ¶
func (m *MemStore[S]) SaveStep(_ context.Context, runID string, step int, nodeID string, state S) error
SaveStep persists a workflow execution step (T036).
Steps are appended to the run's history in the order they are saved. Thread-safe for concurrent writes.
func (*MemStore[S]) UnmarshalJSON ¶
UnmarshalJSON deserializes JSON data into the MemStore (T074).
Replaces the current contents of the MemStore with the deserialized data. All existing steps and checkpoints are discarded.
Thread-safe: acquires write lock during deserialization.
Example:
data, err := os.ReadFile("store.json")
if err != nil {
	log.Fatal(err)
}
store := NewMemStore[MyState]()
if err := store.UnmarshalJSON(data); err != nil {
	log.Fatal(err)
}
// Store now contains data from JSON file
type MySQLStore ¶
type MySQLStore[S any] struct {
	// contains filtered or unexported fields
}
MySQLStore is a MySQL/MariaDB implementation of Store[S].
It stores workflow state and checkpoints in a relational database. Designed for:
- Production workflows requiring persistence
- Distributed systems with multiple workers
- Long-running workflows that survive process restarts
- Audit trails and compliance requirements
MySQLStore uses connection pooling and transactions for reliability.
Schema:
- workflow_steps: Step-by-step execution history
- workflow_checkpoints: Named checkpoints for resumption
Type parameter S is the state type to persist (must be JSON-serializable).
func NewMySQLStore ¶
func NewMySQLStore[S any](dsn string) (*MySQLStore[S], error)
NewMySQLStore creates a new MySQL-backed store.
The DSN (Data Source Name) format is:
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
Example DSNs:
user:password@tcp(localhost:3306)/workflows
user:password@tcp(127.0.0.1:3306)/workflows?parseTime=true
user:password@/workflows (uses localhost:3306)
Security Warning:
NEVER hardcode credentials in your source code. Use environment variables:
dsn := os.Getenv("MYSQL_DSN")
if dsn == "" {
	log.Fatal("MYSQL_DSN environment variable not set")
}
store, err := NewMySQLStore[State](dsn)
The store automatically:
- Creates required tables if they don't exist
- Configures connection pooling
- Sets appropriate timeouts
Example:
// Read the DSN from the environment rather than embedding credentials in source.
store, err := NewMySQLStore[MyState](os.Getenv("MYSQL_DSN"))
if err != nil {
	log.Fatal(err)
}
defer store.Close()
func (*MySQLStore[S]) CheckIdempotency ¶
CheckIdempotency verifies if an idempotency key has been used.
Returns true if the key exists in the idempotency_keys table. Returns false if the key doesn't exist (safe to use). Returns error only on database access failures.
This uses a unique constraint on the key for race-safe duplicate detection.
func (*MySQLStore[S]) Close ¶
func (m *MySQLStore[S]) Close() error
Close closes the database connection pool.
After Close, all operations will return an error. Calling Close multiple times is safe (subsequent calls are no-ops).
func (*MySQLStore[S]) LoadCheckpoint ¶
func (m *MySQLStore[S]) LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)
LoadCheckpoint retrieves a named checkpoint (implements Store interface).
Returns ErrNotFound if the checkpoint ID doesn't exist.
func (*MySQLStore[S]) LoadCheckpointV2 ¶
func (m *MySQLStore[S]) LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)
LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID.
This method can also load checkpoints by label if stepID is 0 and a label is stored. Returns ErrNotFound if no checkpoint exists for the given identifiers.
func (*MySQLStore[S]) LoadLatest ¶
func (m *MySQLStore[S]) LoadLatest(ctx context.Context, runID string) (state S, step int, err error)
LoadLatest retrieves the most recent step for a run (implements Store interface).
Returns the step with the highest step number for the given runID. Returns ErrNotFound if no steps exist for the runID.
func (*MySQLStore[S]) MarkEventsEmitted ¶
func (m *MySQLStore[S]) MarkEventsEmitted(ctx context.Context, eventIDs []string) error
MarkEventsEmitted marks events as successfully emitted to prevent re-delivery.
Updates the emitted_at timestamp for the specified event IDs. This ensures the events won't be returned by PendingEvents again.
func (*MySQLStore[S]) PendingEvents ¶
PendingEvents retrieves events from the outbox that haven't been emitted yet.
Returns events where emitted_at IS NULL, ordered by created_at. Limited to the specified number of events for batching.
func (*MySQLStore[S]) Ping ¶
func (m *MySQLStore[S]) Ping(ctx context.Context) error
Ping verifies the database connection is alive.
Useful for health checks and connection validation.
func (*MySQLStore[S]) SaveCheckpoint ¶
SaveCheckpoint creates a named checkpoint (implements Store interface).
Checkpoints are stored in the workflow_checkpoints table. If a checkpoint with the same ID exists, it is updated.
Thread-safe for concurrent writes.
func (*MySQLStore[S]) SaveCheckpointV2 ¶
func (m *MySQLStore[S]) SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error
SaveCheckpointV2 persists an enhanced checkpoint with full execution context.
This method saves a complete checkpoint including:
- Current state after all deltas applied
- Frontier of pending work items
- Recorded I/O for deterministic replay
- RNG seed for random value consistency
- Idempotency key to prevent duplicate commits
The operation is performed in a transaction to ensure atomicity. If the idempotency key already exists, returns an error (prevents duplicate saves).
Thread-safe for concurrent writes.
func (*MySQLStore[S]) SaveStep ¶
func (m *MySQLStore[S]) SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error
SaveStep persists a workflow execution step (implements Store interface).
Steps are stored in the workflow_steps table with the current state. If a step with the same runID and step number already exists, it is replaced.
Thread-safe for concurrent writes.
func (*MySQLStore[S]) SaveStepBatch ¶
func (m *MySQLStore[S]) SaveStepBatch(ctx context.Context, _ string, _ interface{}) error
SaveStepBatch atomically saves multiple workflow steps in a transaction.
All steps are saved atomically - either all succeed or all are rolled back. This is useful for saving multiple parallel branch results or recovery scenarios.
If any step fails to save, the entire batch is rolled back and an error is returned.
Note: This is a low-level API. For most use cases, use SaveStep directly. The Engine handles batch operations via individual SaveStep calls within its own transaction management.
Example:
type StepData struct {
	Step   int
	NodeID string
	State  MyState
}
steps := []StepData{
	{1, "node-a", stateA},
	{2, "node-b", stateB},
}
if err := store.SaveStepBatch(ctx, "run-001", steps); err != nil {
	log.Fatal(err)
}
func (*MySQLStore[S]) Stats ¶
func (m *MySQLStore[S]) Stats() sql.DBStats
Stats returns database connection pool statistics.
Useful for monitoring connection usage and pool health.
func (*MySQLStore[S]) WithTransaction ¶
func (m *MySQLStore[S]) WithTransaction(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
WithTransaction executes a function within a database transaction.
If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed.
This is useful for atomic multi-operation workflows.
Example:
err := store.WithTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
	// Perform multiple operations
	if err := saveStepInTx(tx, ...); err != nil {
		return err
	}
	if err := saveCheckpointInTx(tx, ...); err != nil {
		return err
	}
	return nil
})
type SQLiteStore ¶
type SQLiteStore[S any] struct {
	// contains filtered or unexported fields
}
SQLiteStore is a SQLite implementation of Store[S].
It stores workflow state and checkpoints in a single-file database. Designed for:
- Development and testing with zero setup
- Single-process workflows
- Local workflows requiring persistence
- Prototyping before migrating to distributed store
SQLiteStore uses WAL mode for concurrent reads and proper transactions.
Features:
- Single file database (e.g., "./dev.db")
- Auto-migration on first use
- WAL mode for concurrent reads
- Transactional writes for safety
Schema:
- workflow_steps: Step-by-step execution history
- workflow_checkpoints: Named checkpoints for resumption
- workflow_checkpoints_v2: Enhanced checkpoints with full context
- idempotency_keys: Duplicate prevention
- events_outbox: Transactional event delivery
Type parameter S is the state type to persist (must be JSON-serializable).
func NewSQLiteStore ¶
func NewSQLiteStore[S any](path string) (*SQLiteStore[S], error)
NewSQLiteStore creates a new SQLite-backed store.
The path parameter specifies the database file location:
- "./dev.db" - file in current directory
- "/tmp/workflow.db" - absolute path
- ":memory:" - in-memory database (data lost on close)
The store automatically:
- Creates the database file if it doesn't exist
- Creates required tables
- Enables WAL mode for concurrent reads
- Configures appropriate timeouts
WAL Mode Benefits:
- Multiple readers can access database concurrently
- Writers don't block readers
- Better concurrency for read-heavy workloads
Example:
store, err := NewSQLiteStore[MyState]("./dev.db")
if err != nil {
	log.Fatal(err)
}
defer store.Close()
For testing with in-memory database:
store, err := NewSQLiteStore[MyState](":memory:")
if err != nil {
	log.Fatal(err)
}
defer store.Close()
func (*SQLiteStore[S]) CheckIdempotency ¶
CheckIdempotency verifies if an idempotency key has been used.
Returns true if the key exists in the idempotency_keys table. Returns false if the key doesn't exist (safe to use). Returns error only on database access failures.
This uses a unique constraint on the key for race-safe duplicate detection.
func (*SQLiteStore[S]) Close ¶
func (s *SQLiteStore[S]) Close() error
Close closes the database connection.
After Close, all operations will return an error. Calling Close multiple times is safe (subsequent calls are no-ops).
func (*SQLiteStore[S]) LoadCheckpoint ¶
func (s *SQLiteStore[S]) LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)
LoadCheckpoint retrieves a named checkpoint (implements Store interface).
Returns ErrNotFound if the checkpoint ID doesn't exist.
func (*SQLiteStore[S]) LoadCheckpointV2 ¶
func (s *SQLiteStore[S]) LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)
LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID.
This method can also load checkpoints by label if stepID is 0 and a label is stored. Returns ErrNotFound if no checkpoint exists for the given identifiers.
func (*SQLiteStore[S]) LoadLatest ¶
func (s *SQLiteStore[S]) LoadLatest(ctx context.Context, runID string) (state S, step int, err error)
LoadLatest retrieves the most recent step for a run (implements Store interface).
Returns the step with the highest step number for the given runID. Returns ErrNotFound if no steps exist for the runID.
func (*SQLiteStore[S]) MarkEventsEmitted ¶
func (s *SQLiteStore[S]) MarkEventsEmitted(ctx context.Context, eventIDs []string) error
MarkEventsEmitted marks events as successfully emitted to prevent re-delivery.
Updates the emitted_at timestamp for the specified event IDs. This ensures the events won't be returned by PendingEvents again.
func (*SQLiteStore[S]) Path ¶
func (s *SQLiteStore[S]) Path() string
Path returns the database file path.
This is useful for debugging and logging.
func (*SQLiteStore[S]) PendingEvents ¶
PendingEvents retrieves events from the outbox that haven't been emitted yet.
Returns events where emitted_at IS NULL, ordered by created_at. Limited to the specified number of events for batching.
func (*SQLiteStore[S]) Ping ¶
func (s *SQLiteStore[S]) Ping(ctx context.Context) error
Ping verifies the database connection is alive.
Useful for health checks and connection validation.
func (*SQLiteStore[S]) SaveCheckpoint ¶
SaveCheckpoint creates a named checkpoint (implements Store interface).
Checkpoints are stored in the workflow_checkpoints table. If a checkpoint with the same ID exists, it is updated.
Thread-safe for concurrent writes.
func (*SQLiteStore[S]) SaveCheckpointV2 ¶
func (s *SQLiteStore[S]) SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error
SaveCheckpointV2 persists an enhanced checkpoint with full execution context.
This method saves a complete checkpoint including:
- Current state after all deltas applied
- Frontier of pending work items
- Recorded I/O for deterministic replay
- RNG seed for random value consistency
- Idempotency key to prevent duplicate commits
The operation is performed in a transaction to ensure atomicity. If the idempotency key already exists, returns an error (prevents duplicate saves).
Thread-safe for concurrent writes.
func (*SQLiteStore[S]) SaveStep ¶
func (s *SQLiteStore[S]) SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error
SaveStep persists a workflow execution step (implements Store interface).
Steps are stored in the workflow_steps table with the current state. If a step with the same runID and step number already exists, it is replaced.
Thread-safe for concurrent writes.
type StepRecord ¶
type StepRecord[S any] struct {
	// Step is the sequential step number (1-indexed).
	Step int
	// NodeID identifies which node produced this state.
	NodeID string
	// State is the workflow state after this step completed.
	State S
}
StepRecord represents a single execution step in the workflow history. Used internally by Store implementations to track step-by-step progression.
type Store ¶
type Store[S any] interface {
	// SaveStep persists the state after a node execution step.
	// Each step is identified by runID + step number.
	//
	// Parameters:
	//   - runID: Unique identifier for this workflow execution.
	//   - step: Sequential step number (starts at 1).
	//   - nodeID: ID of the node that produced this state.
	//   - state: The current workflow state after merging delta.
	//
	// Returns error if persistence fails.
	SaveStep(ctx context.Context, runID string, step int, nodeID string, state S) error

	// LoadLatest retrieves the most recent state for a given run.
	// Used to resume execution from the last saved step.
	//
	// Parameters:
	//   - runID: Unique identifier for the workflow execution.
	//
	// Returns:
	//   - state: The most recent persisted state.
	//   - step: The step number of the returned state.
	//   - error: ErrNotFound if runID doesn't exist, or other persistence errors.
	LoadLatest(ctx context.Context, runID string) (state S, step int, err error)

	// SaveCheckpoint creates a named snapshot of workflow state.
	// Checkpoints enable branching workflows and manual resumption points.
	//
	// Parameters:
	//   - cpID: Unique checkpoint identifier (user-defined).
	//   - state: The workflow state to snapshot.
	//   - step: The step number at which this checkpoint was created.
	//
	// Returns error if persistence fails.
	SaveCheckpoint(ctx context.Context, cpID string, state S, step int) error

	// LoadCheckpoint retrieves a previously saved checkpoint.
	// Used to restore workflow state from a named checkpoint.
	//
	// Parameters:
	//   - cpID: Unique checkpoint identifier.
	//
	// Returns:
	//   - state: The checkpointed state.
	//   - step: The step number when checkpoint was created.
	//   - error: ErrNotFound if cpID doesn't exist, or other persistence errors.
	LoadCheckpoint(ctx context.Context, cpID string) (state S, step int, err error)

	// SaveCheckpointV2 persists an enhanced checkpoint with full execution context.
	// This includes frontier state, recorded I/O, RNG seed, and idempotency key.
	//
	// The checkpoint contains all information needed for deterministic replay:
	//   - Current state at the checkpoint.
	//   - Pending work items in the execution frontier.
	//   - Recorded I/O responses for replay.
	//   - RNG seed for deterministic random values.
	//   - Idempotency key to prevent duplicate commits.
	//
	// Parameters:
	//   - checkpoint: Complete checkpoint with all execution context (CheckpointV2 type).
	//
	// Returns error if persistence fails or idempotency key already exists.
	//
	// This method extends SaveCheckpoint with support for concurrent execution
	// and deterministic replay. Use this for v0.2.0+ features, or use the original
	// SaveCheckpoint for simpler checkpointing needs.
	SaveCheckpointV2(ctx context.Context, checkpoint CheckpointV2[S]) error

	// LoadCheckpointV2 retrieves an enhanced checkpoint by run ID and step ID.
	//
	// Unlike LoadCheckpoint which uses a user-defined label, this method loads
	// checkpoints by their system-generated identifiers. This enables:
	//   - Resumption from any specific step in execution history.
	//   - Replay of partial execution segments.
	//   - Time-travel debugging through execution steps.
	//
	// Parameters:
	//   - runID: Unique workflow run identifier.
	//   - stepID: Step number to load checkpoint from.
	//
	// Returns:
	//   - checkpoint: Complete checkpoint with execution context (CheckpointV2 type).
	//   - error: ErrNotFound if checkpoint doesn't exist.
	LoadCheckpointV2(ctx context.Context, runID string, stepID int) (CheckpointV2[S], error)

	// CheckIdempotency verifies if an idempotency key has been used.
	//
	// Idempotency keys prevent duplicate step commits during retries or crash recovery.
	// The key is typically a hash of: runID + stepID + frontier state + node outputs.
	//
	// Parameters:
	//   - key: Idempotency key to check (SHA-256 hash string).
	//
	// Returns:
	//   - exists: true if key was previously used.
	//   - error: Only on store access failure (not for "key not found").
	//
	// Implementation note: Store the key atomically when committing checkpoints.
	// Keys should be indexed for fast lookup. Consider TTL-based cleanup for old keys.
	CheckIdempotency(ctx context.Context, key string) (bool, error)

	// PendingEvents retrieves events from the transactional outbox that haven't been emitted.
	//
	// This implements the "transactional outbox pattern" for exactly-once event delivery:
	//  1. Events are persisted atomically with state changes.
	//  2. Separate process reads pending events and emits them.
	//  3. Successfully emitted events are marked via MarkEventsEmitted.
	//  4. Crashed emitters can resume from pending events.
	//
	// Parameters:
	//   - limit: Maximum number of events to retrieve (for batching).
	//
	// Returns:
	//   - events: Events pending emission, ordered by creation time.
	//   - error: Only on store access failure (empty list is not an error).
	//
	// Use this with MarkEventsEmitted to implement reliable event delivery without
	// message broker dependencies.
	PendingEvents(ctx context.Context, limit int) ([]emit.Event, error)

	// MarkEventsEmitted marks events as successfully emitted to prevent re-delivery.
	//
	// After successfully emitting events to external systems (logs, traces, metrics),
	// call this method to record their emission. This ensures:
	//   - Events are emitted exactly once (not lost, not duplicated).
	//   - Crash recovery doesn't re-emit already-delivered events.
	//   - PendingEvents won't return these events again.
	//
	// Parameters:
	//   - eventIDs: List of event IDs that were successfully emitted.
	//
	// Returns error if store update fails. On error, events may be re-emitted
	// (at-least-once semantics).
	//
	// Implementation note: Mark as emitted in the same transaction/atomic operation
	// as the external emit when possible, or use idempotency keys on the receiving end.
	MarkEventsEmitted(ctx context.Context, eventIDs []string) error
}
Store provides persistence for workflow state and checkpoints.
It enables:
- Step-by-step state persistence during execution
- Latest state retrieval for resumption
- Named checkpoint save/load for branching workflows
Implementations can use:
- In-memory storage (for testing, see memory.go)
- Relational databases (MySQL, PostgreSQL)
- Key-value stores (Redis, DynamoDB)
- Object storage (S3, GCS)
Type parameter S is the state type to persist.