Documentation
¶
Overview ¶
Package store provides storage implementations for persisting LangGraph checkpoints and state.
Store implementations allow graph executions to be persisted across different runs, processes, or even different machines. This enables features like resuming interrupted workflows, debugging complex executions, and maintaining state in distributed systems.
The store package includes implementations for three popular database backends (file-based and in-memory stores are also listed under Directories):
- SQLite: Lightweight, serverless file-based storage
- PostgreSQL: Robust, scalable relational database
- Redis: High-performance in-memory storage
Core Concepts ¶
## Checkpointing
Checkpointing captures the state of a graph execution at specific points, including:
- The current node being executed
- The complete state object
- Execution metadata
- Timestamp and configuration information
This allows execution to be paused and later resumed from the exact same state.
## Store Interface
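All store implementations follow the same CheckpointStore interface; its full declaration appears in the Types section:
type CheckpointStore interface {
	// Save stores a checkpoint
	Save(ctx context.Context, checkpoint *Checkpoint) error
	// Load retrieves a checkpoint by ID
	Load(ctx context.Context, checkpointID string) (*Checkpoint, error)
	// List returns all checkpoints for a given execution
	List(ctx context.Context, executionID string) ([]*Checkpoint, error)
	// ListByThread returns all checkpoints for a thread, sorted by version (ascending)
	ListByThread(ctx context.Context, threadID string) ([]*Checkpoint, error)
	// GetLatestByThread returns the checkpoint with the highest version for a thread
	GetLatestByThread(ctx context.Context, threadID string) (*Checkpoint, error)
	// Delete removes a checkpoint
	Delete(ctx context.Context, checkpointID string) error
	// Clear removes all checkpoints for an execution
	Clear(ctx context.Context, executionID string) error
}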
Available Implementations ¶
## SQLite Store (store/sqlite)
Best for:
- Single-process applications
- Development and testing
- Desktop applications
- Scenarios requiring zero configuration
Features:
- Serverless, file-based database
- ACID transactions
- No external dependencies
- Built-in full-text search
Example:
import "github.com/smallnest/langgraphgo/store/sqlite"
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
	Path: "./checkpoints.db",
})
## PostgreSQL Store (store/postgres)
Best for:
- Production deployments
- High-throughput applications
- Complex querying requirements
- Distributed systems
Features:
- Scalable relational database
- Connection pooling
- Advanced indexing
- JSONB support for metadata
Example:
import "github.com/smallnest/langgraphgo/store/postgres"
store, err := postgres.NewPostgresCheckpointStore(ctx, postgres.PostgresOptions{
	ConnString: "postgres://user:pass@localhost/langgraph",
})
## Redis Store (store/redis)
Best for:
- High-performance requirements
- Distributed caching scenarios
- Temporary checkpoint storage
- Real-time collaboration features
Features:
- In-memory storage with optional persistence
- Automatic TTL (time-to-live) expiration
- Atomic operations
- Pub/Sub notifications
Example:
import "github.com/smallnest/langgraphgo/store/redis"
store := redis.NewRedisCheckpointStore(redis.RedisOptions{
	Addr: "localhost:6379",
	TTL:  24 * time.Hour,
})
Usage Patterns ¶
## Basic Checkpointing
// Create a graph
g := graph.NewStateGraph()
// ... configure graph ...
// Choose and configure a store
store, err := sqlite.NewSqliteCheckpointStore(sqlite.SqliteOptions{
	Path: "./checkpoints.db",
})
if err != nil {
	return err
}
defer store.Close()
// Enable checkpointing
compileConfig := graph.CompileConfig{
	CheckpointConfig: graph.CheckpointConfig{
		Store: store,
	},
}
runnable, err := g.CompileWithOptions(compileConfig)
if err != nil {
	return err
}
// Execute with automatic checkpointing
result, err := runnable.Invoke(ctx, input,
	graph.WithExecutionID("unique-execution-id"))
if err != nil {
	return err
}
// Resume from a checkpoint
resumed, err := runnable.Resume(ctx,
	"unique-execution-id",
	"checkpoint-to-resume-from")
if err != nil {
	return err
}
## Custom Checkpointing Strategy
// Checkpoint at specific intervals
type IntervalCheckpointConfig struct {
	graph.CheckpointConfig
	Interval       time.Duration
	LastCheckpoint time.Time
}
// Or checkpoint on specific conditions
type ConditionalCheckpointConfig struct {
	graph.CheckpointConfig
	ShouldCheckpoint func(state any) bool
}
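The interval strategy above needs some logic that decides when a checkpoint is due. The following is a minimal, self-contained sketch of that decision only. The names intervalPolicy, Due, and countCheckpoints are hypothetical and are not part of the store or graph packages; how such a policy would be wired into graph execution is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// intervalPolicy is a hypothetical helper (not part of the store package)
// that decides whether enough time has passed to take another checkpoint.
type intervalPolicy struct {
	Interval       time.Duration
	LastCheckpoint time.Time
}

// Due reports whether a checkpoint should be taken at time now and,
// if so, records now as the time of the most recent checkpoint.
func (p *intervalPolicy) Due(now time.Time) bool {
	if p.LastCheckpoint.IsZero() || now.Sub(p.LastCheckpoint) >= p.Interval {
		p.LastCheckpoint = now
		return true
	}
	return false
}

// countCheckpoints simulates steps node executions spaced stepMillis apart
// and returns how many of them would trigger a checkpoint.
func countCheckpoints(steps, stepMillis, intervalMillis int) int {
	p := &intervalPolicy{Interval: time.Duration(intervalMillis) * time.Millisecond}
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	n := 0
	for i := 0; i < steps; i++ {
		now := start.Add(time.Duration(i*stepMillis) * time.Millisecond)
		if p.Due(now) {
			n++
		}
	}
	return n
}

func main() {
	// Ten steps, 100ms apart, with a 300ms interval: steps 0, 3, 6 and 9 checkpoint.
	fmt.Println(countCheckpoints(10, 100, 300)) // prints 4
}
```

Passing the current time into Due, rather than calling time.Now inside it, keeps the policy deterministic and easy to test.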
Choosing the Right Store ¶
## Decision Guide
Use SQLite when:
- You need a simple, self-contained solution
- Your application runs on a single machine
- You prefer zero configuration
- You need to store checkpoints in files
Use PostgreSQL when:
- You need robust persistence and scalability
- Your application requires complex queries
- You have multiple processes accessing the same data
- You need enterprise-grade features (backups, replication)
Use Redis when:
- Performance is the primary concern
- You need automatic expiration of old checkpoints
- You're building a distributed system
- You need real-time notifications for checkpoint changes
## Migration Between Stores
The package provides utilities to migrate between different store implementations:
// Migrate from SQLite to PostgreSQL
migrator := store.NewMigrator(sqliteStore, postgresStore)
err := migrator.MigrateAll(ctx)
// Or migrate specific checkpoints
err = migrator.MigrateThread(ctx, "thread-id")
Performance Considerations ¶
## Serialization
All stores use JSON serialization for checkpoint data. For optimal performance:
- Keep state objects relatively small
- Avoid storing large binary data in checkpoints
- Consider compression for large state objects
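To illustrate the compression suggestion, here is a minimal sketch that gzips JSON-encoded state using only the Go standard library. The helper names compressJSON, decompressJSON, and roundTrip are hypothetical; this is not how the package itself serializes checkpoints, only one way an application could shrink large state before handing it to a store.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"reflect"
)

// compressJSON marshals v to JSON and gzips the result.
func compressJSON(v any) ([]byte, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, err
	}
	// Close flushes remaining data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompressJSON reverses compressJSON, decoding into out.
func decompressJSON(data []byte, out any) error {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, out)
}

// roundTrip builds a repetitive state with n messages and reports whether it
// survives compression unchanged and whether the compressed form is smaller.
func roundTrip(n int) (bool, bool, error) {
	state := map[string][]string{"messages": make([]string, n)}
	for i := range state["messages"] {
		state["messages"][i] = "hello checkpoint"
	}
	plain, err := json.Marshal(state)
	if err != nil {
		return false, false, err
	}
	packed, err := compressJSON(state)
	if err != nil {
		return false, false, err
	}
	var restored map[string][]string
	if err := decompressJSON(packed, &restored); err != nil {
		return false, false, err
	}
	return reflect.DeepEqual(state, restored), len(packed) < len(plain), nil
}

func main() {
	same, smaller, err := roundTrip(200)
	fmt.Println(same, smaller, err)
}
```

Compression pays off mainly for large or repetitive state; for very small payloads the gzip header can make the output larger than the input.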
## Batch Operations
Some stores support batch operations for better performance:
// Batch save multiple checkpoints
checkpoints := []*graph.Checkpoint{cp1, cp2, cp3}
err := store.PutBatch(ctx, checkpoints)
Best Practices ¶
**Choose the right store for your use case**
- SQLite for simple applications
- PostgreSQL for production systems
- Redis for high-performance scenarios
**Handle errors gracefully**
- Implement retry logic for transient errors
- Provide fallback mechanisms
- Log checkpoint failures for debugging
**Manage checkpoint lifecycle**
- Clean up old checkpoints regularly
- Use TTL for automatic cleanup (Redis)
- Implement retention policies
**Secure checkpoint data**
- Encrypt sensitive data before storage
- Use secure database connections
- Implement proper access controls
**Monitor storage usage**
- Track checkpoint sizes and counts
- Monitor database performance metrics
- Set up alerts for storage limits
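As one way to approach the "retry logic for transient errors" practice, the sketch below wraps an arbitrary operation in a bounded retry loop with exponential backoff. Everything here is an assumption for illustration: errTransient stands in for whatever error a real store reports for a temporary failure, and retry and simulate are hypothetical helpers, not package APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient is a stand-in for a store's temporary failure (assumption).
var errTransient = errors.New("transient store error")

// retry calls op up to attempts times, doubling the delay after each
// transient failure. Non-transient errors are returned immediately.
func retry(ctx context.Context, attempts int, base time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if !errors.Is(err, errTransient) {
			return err
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// simulate runs an operation that fails the first failures times and then
// succeeds. It returns how often the operation ran and whether retry succeeded.
func simulate(failures, attempts int) (int, bool) {
	calls := 0
	op := func() error {
		calls++
		if calls <= failures {
			return errTransient
		}
		return nil
	}
	err := retry(context.Background(), attempts, time.Millisecond, op)
	return calls, err == nil
}

func main() {
	fmt.Println(simulate(2, 5)) // prints 3 true
	fmt.Println(simulate(5, 3)) // prints 3 false
}
```

In application code, op would typically be a closure around a store call such as saving a checkpoint, with the final error logged for debugging.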
Integration with LangGraph ¶
Stores integrate seamlessly with all LangGraph components:
// With prebuilt agents
agent := prebuilt.CreateReactAgent(llm, tools, 10,
	prebuilt.WithCheckpointing(graph.CheckpointConfig{
		Store: store,
	}),
)
// With custom graphs
g := graph.NewStateGraph()
g.WithCheckpointing(graph.CheckpointConfig{
	Store: store,
})
// With streaming execution
streaming := graph.NewStreamingStateGraph(g, graph.StreamConfig{
	BufferSize: 100,
})
streaming.WithCheckpointing(graph.CheckpointConfig{
	Store: store,
})
Advanced Features ¶
## Checkpoint Versioning
Some stores support checkpoint versioning for tracking evolution:
versionedStore := postgres.NewVersionedCheckpointStore(opts)
err := versionedStore.PutVersion(ctx, checkpoint, "v1.0")
versions, err := versionedStore.ListVersions(ctx, checkpointID)
## Checkpoint Compression
For large state objects, consider compression:
// baseStore is an existing store; a distinct name avoids shadowing the store package
compressedStore := store.NewCompressedWrapper(baseStore, gzip.BestCompression)
err := compressedStore.Put(ctx, checkpoint)
## Checkpoint Encryption
Encrypt sensitive checkpoint data:
// baseStore is an existing store; a distinct name avoids shadowing the store package
encryptedStore := store.NewEncryptedWrapper(baseStore, encryptionKey)
err := encryptedStore.Put(ctx, checkpoint)
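For readers who want to encrypt checkpoint bytes themselves before storage, the following sketch uses AES-GCM from the Go standard library. It does not describe how the wrapper above works internally; encrypt, decrypt, and demo are hypothetical helpers, and key management (generation, rotation, storage) is out of scope.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// newGCM builds an AES-GCM AEAD. The key must be 16, 24, or 32 bytes long.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// encrypt returns nonce || ciphertext, using a fresh random nonce per call.
func encrypt(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	// Seal appends the ciphertext and auth tag to its first argument (the nonce).
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decrypt splits off the nonce and authenticates and decrypts the rest.
func decrypt(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	n := gcm.NonceSize()
	if len(sealed) < n {
		return nil, errors.New("ciphertext too short")
	}
	return gcm.Open(nil, sealed[:n], sealed[n:], nil)
}

// demo encrypts msg with a random 256-bit key, decrypts it, and then checks
// that flipping one byte of the sealed data is rejected on decryption.
func demo(msg string) (string, bool, error) {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		return "", false, err
	}
	sealed, err := encrypt(key, []byte(msg))
	if err != nil {
		return "", false, err
	}
	plain, err := decrypt(key, sealed)
	if err != nil {
		return "", false, err
	}
	sealed[len(sealed)-1] ^= 0xFF // corrupt the authentication tag
	_, tamperErr := decrypt(key, sealed)
	return string(plain), tamperErr != nil, nil
}

func main() {
	fmt.Println(demo(`{"state":"secret"}`))
}
```

GCM is an authenticated mode, so a modified or truncated checkpoint fails to decrypt instead of yielding corrupted state.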
Extending the Package ¶
To add a new store implementation:
- Implement the CheckpointStore interface
- Add the package to the store directory
- Create comprehensive tests
- Add documentation with examples
- Include migration utilities if needed
Example implementation structure:
package mystore
import (
	"context"
	"github.com/smallnest/langgraphgo/graph"
)
type MyStore struct {
	// Implementation details
}
func (s *MyStore) Save(ctx context.Context, cp *graph.Checkpoint) error {
	// Implementation
	return nil
}
// Implement other interface methods...
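To make the structure above more concrete, here is a self-contained sketch of a map-backed store covering only Save, Load, and Delete. It is not the package's memory store. The Checkpoint type is a trimmed local stand-in so the example compiles on its own, and memStore, errNotFound, and demo are hypothetical names; the remaining interface methods would follow the same locking pattern.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Checkpoint is a trimmed local stand-in for the package's Checkpoint type.
type Checkpoint struct {
	ID       string
	NodeName string
	State    any
	Version  int
}

// errNotFound is a hypothetical sentinel error for missing checkpoints.
var errNotFound = errors.New("checkpoint not found")

// memStore keeps checkpoints in a map guarded by a read/write mutex.
type memStore struct {
	mu   sync.RWMutex
	data map[string]*Checkpoint
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string]*Checkpoint)}
}

// Save stores a shallow copy so later changes to cp's fields do not leak in.
func (s *memStore) Save(ctx context.Context, cp *Checkpoint) error {
	if cp == nil || cp.ID == "" {
		return errors.New("checkpoint must have an ID")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	copied := *cp
	s.data[cp.ID] = &copied
	return nil
}

// Load returns a shallow copy of the stored checkpoint, or errNotFound.
func (s *memStore) Load(ctx context.Context, id string) (*Checkpoint, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cp, ok := s.data[id]
	if !ok {
		return nil, fmt.Errorf("%w: %s", errNotFound, id)
	}
	copied := *cp
	return &copied, nil
}

// Delete removes a checkpoint; deleting a missing ID is not an error here.
func (s *memStore) Delete(ctx context.Context, id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, id)
	return nil
}

// demo saves, loads, and deletes one checkpoint. It returns the loaded node
// name and whether a load after deletion reports errNotFound.
func demo() (string, bool) {
	ctx := context.Background()
	s := newMemStore()
	if err := s.Save(ctx, &Checkpoint{ID: "cp-1", NodeName: "plan", Version: 1}); err != nil {
		return "", false
	}
	cp, err := s.Load(ctx, "cp-1")
	if err != nil {
		return "", false
	}
	_ = s.Delete(ctx, "cp-1")
	_, err = s.Load(ctx, "cp-1")
	return cp.NodeName, errors.Is(err, errNotFound)
}

func main() {
	fmt.Println(demo()) // prints plan true
}
```

The copies are shallow: a State value that contains maps or slices is still shared with the caller, so a production store would serialize the state instead.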
Community Contributions ¶
The store package welcomes contributions for additional storage backends:
- MongoDB store
- DynamoDB store
- Cassandra store
- S3/object storage store
- etcd store
Please follow the established patterns and provide comprehensive tests and documentation.
Index ¶
- func RegisterType(t reflect.Type, typeName string) error
- func RegisterTypeWithCustomSerialization(t reflect.Type, typeName string, marshalFunc func(any) ([]byte, error), ...) error
- func RegisterTypeWithValue(value any, typeName string) error
- type Checkpoint
- type CheckpointData
- type CheckpointStore
- type MarshalFunc
- type TypeRegistry
- func (r *TypeRegistry) CreateInstance(typeName string) (any, error)
- func (r *TypeRegistry) GetTypeByName(typeName string) (reflect.Type, bool)
- func (r *TypeRegistry) GetTypeName(t reflect.Type) (string, bool)
- func (r *TypeRegistry) Marshal(value any) ([]byte, error)
- func (r *TypeRegistry) RegisterTypeInternal(t reflect.Type, typeName string) error
- func (r *TypeRegistry) RegisterTypeWithCustomSerializationInternal(t reflect.Type, typeName string, marshalFunc func(any) ([]byte, error), ...) error
- func (r *TypeRegistry) StateMarshaler() MarshalFunc
- func (r *TypeRegistry) StateUnmarshaler() UnmarshalFunc
- func (r *TypeRegistry) Unmarshal(data []byte) (any, error)
- type UnmarshalFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterType ¶ added in v0.8.0
RegisterType registers a reflect.Type with the registry for serialization/deserialization. Use RegisterTypeWithValue for a more convenient API that derives the type from a value.
Example usage:
var state MyState
RegisterType(reflect.TypeOf(state), "MyState")
func RegisterTypeWithCustomSerialization ¶ added in v0.8.0
func RegisterTypeWithCustomSerialization(
	t reflect.Type,
	typeName string,
	marshalFunc func(any) ([]byte, error),
	unmarshalFunc func([]byte) (any, error),
) error
RegisterTypeWithCustomSerialization registers a type with custom JSON marshaling/unmarshaling.
Example usage:
var state MyState
RegisterTypeWithCustomSerialization(
	reflect.TypeOf(state),
	"MyState",
	func(v any) ([]byte, error) { ... },
	func(data []byte) (any, error) { ... },
)
func RegisterTypeWithValue ¶ added in v0.8.0
RegisterTypeWithValue is a convenience function that uses reflection from a value. This is the recommended way to register types.
Example usage:
var state MyState
RegisterTypeWithValue(state, "MyState")
Types ¶
type Checkpoint ¶
type Checkpoint struct {
	ID        string         `json:"id"`
	NodeName  string         `json:"node_name"`
	State     any            `json:"state"`
	Metadata  map[string]any `json:"metadata"`
	Timestamp time.Time      `json:"timestamp"`
	Version   int            `json:"version"`
}
Checkpoint represents a saved state at a specific point in execution
type CheckpointData ¶ added in v0.8.0
type CheckpointData struct {
	TypeName string          `json:"_type"`
	Data     json.RawMessage `json:"_data"`
}
CheckpointData represents checkpoint data with type information
func NewCheckpointData ¶ added in v0.8.0
func NewCheckpointData(state any) (*CheckpointData, error)
NewCheckpointData creates checkpoint data from a state value
func (*CheckpointData) ToValue ¶ added in v0.8.0
func (cd *CheckpointData) ToValue() (any, error)
ToValue converts checkpoint data back to a state value
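The `_type`/`_data` layout of CheckpointData suggests a type-tagged envelope: the state is serialized to JSON and stored next to a name that tells the decoder which Go type to rebuild. The sketch below illustrates that idea with standard-library JSON only. The envelope and chatState types and the wrap, unwrap, and encodeExample helpers are hypothetical, and the hard-coded switch stands in for the registry lookup a real implementation would presumably use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the field layout of CheckpointData (local stand-in).
type envelope struct {
	TypeName string          `json:"_type"`
	Data     json.RawMessage `json:"_data"`
}

// chatState is a hypothetical application state type.
type chatState struct {
	Step int    `json:"step"`
	Note string `json:"note"`
}

// wrap serializes state and tags it with typeName.
func wrap(typeName string, state any) ([]byte, error) {
	data, err := json.Marshal(state)
	if err != nil {
		return nil, err
	}
	return json.Marshal(envelope{TypeName: typeName, Data: data})
}

// unwrap reads the tag first, then decodes the payload into the matching type.
func unwrap(raw []byte) (any, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return nil, err
	}
	switch env.TypeName {
	case "chatState":
		var s chatState
		if err := json.Unmarshal(env.Data, &s); err != nil {
			return nil, err
		}
		return s, nil
	default:
		return nil, fmt.Errorf("unknown type %q", env.TypeName)
	}
}

// encodeExample returns the envelope JSON for a sample state.
func encodeExample() string {
	b, err := wrap("chatState", chatState{Step: 3, Note: "ok"})
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encodeExample())
	// prints {"_type":"chatState","_data":{"step":3,"note":"ok"}}
	v, err := unwrap([]byte(encodeExample()))
	fmt.Println(v, err)
}
```

Keeping the payload as json.RawMessage defers decoding until the type name has been read, which is what lets an `any`-typed state field round-trip to a concrete struct rather than a generic map.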
type CheckpointStore ¶
type CheckpointStore interface {
	// Save stores a checkpoint
	Save(ctx context.Context, checkpoint *Checkpoint) error
	// Load retrieves a checkpoint by ID
	Load(ctx context.Context, checkpointID string) (*Checkpoint, error)
	// List returns all checkpoints for a given execution
	List(ctx context.Context, executionID string) ([]*Checkpoint, error)
	// ListByThread returns all checkpoints for a specific thread_id.
	// Returns checkpoints sorted by version (ascending).
	ListByThread(ctx context.Context, threadID string) ([]*Checkpoint, error)
	// GetLatestByThread returns the latest checkpoint for a thread_id.
	// Returns the checkpoint with the highest version.
	GetLatestByThread(ctx context.Context, threadID string) (*Checkpoint, error)
	// Delete removes a checkpoint
	Delete(ctx context.Context, checkpointID string) error
	// Clear removes all checkpoints for an execution
	Clear(ctx context.Context, executionID string) error
}
CheckpointStore defines the interface for checkpoint persistence
type MarshalFunc ¶ added in v0.8.0
MarshalFunc is a function type for marshaling checkpoint states
type TypeRegistry ¶ added in v0.8.0
type TypeRegistry struct {
	// contains filtered or unexported fields
}
TypeRegistry manages type information for generic state serialization/deserialization. It allows state types to register themselves for proper checkpointing.
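The core mechanism behind a registry like this can be sketched with the reflect package: map a name to a reflect.Type at registration time, then use reflect.New to build a fresh value from the name when decoding. The code below is an illustration under assumptions, not the package's implementation. The names registry, register, createInstance, and myState are hypothetical, and whether the real CreateInstance returns a pointer or a value is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
)

// registry maps type names to reflect.Types and back (hypothetical sketch).
type registry struct {
	mu     sync.RWMutex
	byName map[string]reflect.Type
	byType map[reflect.Type]string
}

func newRegistry() *registry {
	return &registry{
		byName: make(map[string]reflect.Type),
		byType: make(map[reflect.Type]string),
	}
}

// register derives the type from value and records it under name.
// Re-registering the same type under the same name is allowed.
func (r *registry) register(value any, name string) error {
	t := reflect.TypeOf(value)
	if t == nil {
		return errors.New("cannot register a nil value")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if existing, ok := r.byName[name]; ok && existing != t {
		return fmt.Errorf("type name %q is already registered", name)
	}
	r.byName[name] = t
	r.byType[t] = name
	return nil
}

// createInstance returns a pointer to a new zero value of the named type.
func (r *registry) createInstance(name string) (any, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	t, ok := r.byName[name]
	if !ok {
		return nil, fmt.Errorf("type %q is not registered", name)
	}
	return reflect.New(t).Interface(), nil
}

// myState is a hypothetical application state type.
type myState struct {
	Count int
}

func main() {
	r := newRegistry()
	if err := r.register(myState{}, "MyState"); err != nil {
		fmt.Println("register:", err)
		return
	}
	v, err := r.createInstance("MyState")
	fmt.Printf("%T %v\n", v, err) // prints *main.myState <nil>
}
```

Returning a pointer from createInstance is convenient for decoding, because json.Unmarshal needs an addressable target.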
func GlobalTypeRegistry ¶ added in v0.8.0
func GlobalTypeRegistry() *TypeRegistry
GlobalTypeRegistry returns the global type registry instance
func (*TypeRegistry) CreateInstance ¶ added in v0.8.0
func (r *TypeRegistry) CreateInstance(typeName string) (any, error)
CreateInstance creates a new instance of a registered type by name.
func (*TypeRegistry) GetTypeByName ¶ added in v0.8.0
func (r *TypeRegistry) GetTypeByName(typeName string) (reflect.Type, bool)
GetTypeByName returns the reflect.Type for a registered type name.
func (*TypeRegistry) GetTypeName ¶ added in v0.8.0
func (r *TypeRegistry) GetTypeName(t reflect.Type) (string, bool)
GetTypeName returns the registered name for a type.
func (*TypeRegistry) Marshal ¶ added in v0.8.0
func (r *TypeRegistry) Marshal(value any) ([]byte, error)
Marshal marshals a value to JSON with type information.
func (*TypeRegistry) RegisterTypeInternal ¶ added in v0.8.0
func (r *TypeRegistry) RegisterTypeInternal(t reflect.Type, typeName string) error
RegisterTypeInternal registers a type with the registry.
func (*TypeRegistry) RegisterTypeWithCustomSerializationInternal ¶ added in v0.8.0
func (r *TypeRegistry) RegisterTypeWithCustomSerializationInternal(
	t reflect.Type,
	typeName string,
	marshalFunc func(any) ([]byte, error),
	unmarshalFunc func([]byte) (any, error),
) error
RegisterTypeWithCustomSerializationInternal registers a type with custom serialization.
func (*TypeRegistry) StateMarshaler ¶ added in v0.8.0
func (r *TypeRegistry) StateMarshaler() MarshalFunc
StateMarshaler creates a marshal function for the given registry
func (*TypeRegistry) StateUnmarshaler ¶ added in v0.8.0
func (r *TypeRegistry) StateUnmarshaler() UnmarshalFunc
StateUnmarshaler creates an unmarshal function for the given registry
type UnmarshalFunc ¶ added in v0.8.0
UnmarshalFunc is a function type for unmarshaling checkpoint states
Directories
¶
| Path | Synopsis |
|---|---|
| file | Package file provides file-based checkpoint storage implementation. |
| memory | Package memory provides in-memory checkpoint storage implementation. |
| postgres | Package postgres provides PostgreSQL-backed storage for LangGraph Go checkpoints and state. |
| redis | Package redis provides Redis-backed storage for LangGraph Go checkpoints and state. |
| sqlite | Package sqlite provides SQLite-backed storage for LangGraph Go checkpoints and state. |
| util | Package util provides common utilities for checkpoint store implementations. |