Documentation ¶
Index ¶
- Constants
- type CheckpointInfo
- type Checkpointer
- type CheckpointerConfig
- func DefaultCheckpointerConfig() *CheckpointerConfig
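- type CheckpointerWithAutoCleanup
- func NewCheckpointerWithAutoCleanup(cp Checkpointer, config *CheckpointerConfig) *CheckpointerWithAutoCleanup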
- func (c *CheckpointerWithAutoCleanup) Delete(ctx context.Context, threadID string) error
- func (c *CheckpointerWithAutoCleanup) Exists(ctx context.Context, threadID string) (bool, error)
- func (c *CheckpointerWithAutoCleanup) List(ctx context.Context) ([]CheckpointInfo, error)
- func (c *CheckpointerWithAutoCleanup) Load(ctx context.Context, threadID string) (agentstate.State, error)
- func (c *CheckpointerWithAutoCleanup) Save(ctx context.Context, threadID string, state agentstate.State) error
- func (c *CheckpointerWithAutoCleanup) Stop()
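- type DistributedCheckpointer
- func NewDistributedCheckpointer(config *DistributedCheckpointerConfig) (*DistributedCheckpointer, error)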
- func (dc *DistributedCheckpointer) Close() error
- func (dc *DistributedCheckpointer) Delete(ctx context.Context, threadID string) error
- func (dc *DistributedCheckpointer) Exists(ctx context.Context, threadID string) (bool, error)
- func (dc *DistributedCheckpointer) GetStatus() map[string]interface{}
- func (dc *DistributedCheckpointer) List(ctx context.Context) ([]CheckpointInfo, error)
- func (dc *DistributedCheckpointer) Load(ctx context.Context, threadID string) (agentstate.State, error)
- func (dc *DistributedCheckpointer) Save(ctx context.Context, threadID string, state agentstate.State) error
- type DistributedCheckpointerConfig
- func DefaultDistributedCheckpointerConfig() *DistributedCheckpointerConfig
- type InMemorySaver
- func NewInMemorySaver() *InMemorySaver
- func (s *InMemorySaver) CleanupOld(maxAge time.Duration) int
- func (s *InMemorySaver) Delete(ctx context.Context, threadID string) error
- func (s *InMemorySaver) Exists(ctx context.Context, threadID string) (bool, error)
- func (s *InMemorySaver) GetHistory(ctx context.Context, threadID string) ([]agentstate.State, error)
- func (s *InMemorySaver) List(ctx context.Context) ([]CheckpointInfo, error)
- func (s *InMemorySaver) Load(ctx context.Context, threadID string) (agentstate.State, error)
- func (s *InMemorySaver) Save(ctx context.Context, threadID string, state agentstate.State) error
- func (s *InMemorySaver) Size() int
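- type RedisCheckpointer
- func NewRedisCheckpointer(config *RedisCheckpointerConfig) (*RedisCheckpointer, error)
- func NewRedisCheckpointerFromClient(client *redis.Client, config *RedisCheckpointerConfig) *RedisCheckpointer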
- func (c *RedisCheckpointer) CleanupOld(ctx context.Context, maxAge time.Duration) (int, error)
- func (c *RedisCheckpointer) Close() error
- func (c *RedisCheckpointer) Delete(ctx context.Context, threadID string) error
- func (c *RedisCheckpointer) Exists(ctx context.Context, threadID string) (bool, error)
- func (c *RedisCheckpointer) List(ctx context.Context) ([]CheckpointInfo, error)
- func (c *RedisCheckpointer) Load(ctx context.Context, threadID string) (agentstate.State, error)
- func (c *RedisCheckpointer) Ping(ctx context.Context) error
- func (c *RedisCheckpointer) Save(ctx context.Context, threadID string, state agentstate.State) error
- func (c *RedisCheckpointer) Size(ctx context.Context) (int, error)
- type RedisCheckpointerConfig
- func DefaultRedisCheckpointerConfig() *RedisCheckpointerConfig
Constants ¶
const (
	ReplicationModeSync  = "sync"
	ReplicationModeAsync = "async"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CheckpointInfo ¶
type CheckpointInfo struct {
// ID is the unique checkpoint identifier.
ID string `json:"id"`
// ThreadID is the thread/session identifier.
ThreadID string `json:"thread_id"`
// CreatedAt is when the checkpoint was created.
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is when the checkpoint was last updated.
UpdatedAt time.Time `json:"updated_at"`
// Metadata holds additional checkpoint information.
Metadata map[string]interface{} `json:"metadata,omitempty"`
// Size is the approximate size in bytes of the checkpoint data.
// Useful for storage management and cleanup decisions.
Size int64 `json:"size"`
}
CheckpointInfo contains metadata about a checkpoint.
Note: This struct is aligned with interfaces.CheckpointMetadata to ensure consistency. Fields match exactly for seamless integration.
type Checkpointer ¶
type Checkpointer interface {
// Save persists the current state for a thread/session.
Save(ctx context.Context, threadID string, state agentstate.State) error
// Load retrieves the saved state for a thread/session.
Load(ctx context.Context, threadID string) (agentstate.State, error)
// List returns information about all saved checkpoints.
List(ctx context.Context) ([]CheckpointInfo, error)
// Delete removes the checkpoint for a thread/session.
Delete(ctx context.Context, threadID string) error
// Exists checks if a checkpoint exists for a thread/session.
Exists(ctx context.Context, threadID string) (bool, error)
}
Checkpointer defines the interface for session state persistence.
Inspired by LangChain's Checkpointer, it provides:
- Session state saving and loading
- Thread-based conversation continuity
- Checkpoint history management
Use cases:
- Multi-turn conversations
- Resuming interrupted workflows
- A/B testing different conversation paths
type CheckpointerConfig ¶
type CheckpointerConfig struct {
// MaxHistorySize limits the number of historical states to keep
MaxHistorySize int
// MaxCheckpointAge is the maximum age before a checkpoint is considered stale
MaxCheckpointAge time.Duration
// EnableCompression enables state compression (for future implementation)
EnableCompression bool
// AutoCleanup enables automatic cleanup of old checkpoints
AutoCleanup bool
// CleanupInterval specifies how often to run cleanup
CleanupInterval time.Duration
}
CheckpointerConfig configures checkpointer behavior.
func DefaultCheckpointerConfig ¶
func DefaultCheckpointerConfig() *CheckpointerConfig
DefaultCheckpointerConfig returns the default checkpointer configuration.
type CheckpointerWithAutoCleanup ¶
type CheckpointerWithAutoCleanup struct {
// contains filtered or unexported fields
}
CheckpointerWithAutoCleanup wraps a Checkpointer with automatic cleanup.
func NewCheckpointerWithAutoCleanup ¶
func NewCheckpointerWithAutoCleanup(cp Checkpointer, config *CheckpointerConfig) *CheckpointerWithAutoCleanup
NewCheckpointerWithAutoCleanup creates a Checkpointer with automatic cleanup.
func (*CheckpointerWithAutoCleanup) Delete ¶
func (c *CheckpointerWithAutoCleanup) Delete(ctx context.Context, threadID string) error
Delete removes the checkpoint for a thread/session.
func (*CheckpointerWithAutoCleanup) Exists ¶
func (c *CheckpointerWithAutoCleanup) Exists(ctx context.Context, threadID string) (bool, error)
Exists checks if a checkpoint exists for a thread/session.
func (*CheckpointerWithAutoCleanup) List ¶
func (c *CheckpointerWithAutoCleanup) List(ctx context.Context) ([]CheckpointInfo, error)
List returns information about all saved checkpoints.
func (*CheckpointerWithAutoCleanup) Load ¶
func (c *CheckpointerWithAutoCleanup) Load(ctx context.Context, threadID string) (agentstate.State, error)
Load retrieves the saved state for a thread/session.
func (*CheckpointerWithAutoCleanup) Save ¶
func (c *CheckpointerWithAutoCleanup) Save(ctx context.Context, threadID string, state agentstate.State) error
Save persists the current state for a thread/session.
func (*CheckpointerWithAutoCleanup) Stop ¶
func (c *CheckpointerWithAutoCleanup) Stop()
Stop stops the automatic cleanup goroutine.
type DistributedCheckpointer ¶
type DistributedCheckpointer struct {
// contains filtered or unexported fields
}
DistributedCheckpointer provides high-availability checkpointing with multiple backends.
Features:
- Primary/secondary backend architecture
- Automatic failover on primary failure
- Replication modes: sync or async
- Health monitoring
- Automatic failback to primary
Suitable for:
- Production environments requiring high availability
- Critical applications that cannot tolerate data loss
- Multi-region deployments
- Disaster recovery scenarios
func NewDistributedCheckpointer ¶
func NewDistributedCheckpointer(config *DistributedCheckpointerConfig) (*DistributedCheckpointer, error)
NewDistributedCheckpointer creates a new distributed checkpointer
func (*DistributedCheckpointer) Close ¶
func (dc *DistributedCheckpointer) Close() error
Close shuts down the distributed checkpointer
func (*DistributedCheckpointer) Delete ¶
func (dc *DistributedCheckpointer) Delete(ctx context.Context, threadID string) error
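Delete removes the checkpoint for a thread/session
func (*DistributedCheckpointer) Exists ¶
func (dc *DistributedCheckpointer) Exists(ctx context.Context, threadID string) (bool, error)
Exists checks if a checkpoint exists for a thread/session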
func (*DistributedCheckpointer) GetStatus ¶
func (dc *DistributedCheckpointer) GetStatus() map[string]interface{}
GetStatus returns the current status of the distributed checkpointer
func (*DistributedCheckpointer) List ¶
func (dc *DistributedCheckpointer) List(ctx context.Context) ([]CheckpointInfo, error)
List returns information about all saved checkpoints
func (*DistributedCheckpointer) Load ¶
func (dc *DistributedCheckpointer) Load(ctx context.Context, threadID string) (agentstate.State, error)
Load retrieves the saved state for a thread/session
func (*DistributedCheckpointer) Save ¶
func (dc *DistributedCheckpointer) Save(ctx context.Context, threadID string, state agentstate.State) error
Save persists the current state for a thread/session
type DistributedCheckpointerConfig ¶
type DistributedCheckpointerConfig struct {
// PrimaryBackend is the primary checkpointer (e.g., Redis)
PrimaryBackend Checkpointer
// SecondaryBackend is the backup checkpointer (e.g., PostgreSQL, optional)
SecondaryBackend Checkpointer
// EnableReplication enables automatic replication to secondary backend
EnableReplication bool
// ReplicationMode determines how replication works
// - "sync": Write to both backends synchronously
// - "async": Write to primary, then replicate to secondary asynchronously
ReplicationMode string
// HealthCheckInterval is how often to check backend health
HealthCheckInterval time.Duration
// EnableAutoFailover enables automatic failover to secondary on primary failure
EnableAutoFailover bool
// MaxFailoverAttempts limits the number of failover attempts
MaxFailoverAttempts int
// FailbackDelay is the delay before attempting to fail back to primary
FailbackDelay time.Duration
}
DistributedCheckpointerConfig holds configuration for distributed checkpointer
func DefaultDistributedCheckpointerConfig ¶
func DefaultDistributedCheckpointerConfig() *DistributedCheckpointerConfig
DefaultDistributedCheckpointerConfig returns default configuration
type InMemorySaver ¶
type InMemorySaver struct {
// contains filtered or unexported fields
}
InMemorySaver is a thread-safe in-memory implementation of Checkpointer.
Suitable for:
- Development and testing
- Single-instance deployments
- Short-lived sessions
func NewInMemorySaver ¶
func NewInMemorySaver() *InMemorySaver
NewInMemorySaver creates a new InMemorySaver.
func (*InMemorySaver) CleanupOld ¶
func (s *InMemorySaver) CleanupOld(maxAge time.Duration) int
CleanupOld removes checkpoints older than the specified duration.
func (*InMemorySaver) Delete ¶
func (s *InMemorySaver) Delete(ctx context.Context, threadID string) error
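Delete removes the checkpoint for a thread/session.
func (*InMemorySaver) Exists ¶
func (s *InMemorySaver) Exists(ctx context.Context, threadID string) (bool, error)
Exists checks if a checkpoint exists for a thread/session.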
func (*InMemorySaver) GetHistory ¶
func (s *InMemorySaver) GetHistory(ctx context.Context, threadID string) ([]agentstate.State, error)
GetHistory returns the state history for a thread/session.
func (*InMemorySaver) List ¶
func (s *InMemorySaver) List(ctx context.Context) ([]CheckpointInfo, error)
List returns information about all saved checkpoints.
func (*InMemorySaver) Load ¶
func (s *InMemorySaver) Load(ctx context.Context, threadID string) (agentstate.State, error)
Load retrieves the saved state for a thread/session.
func (*InMemorySaver) Save ¶
func (s *InMemorySaver) Save(ctx context.Context, threadID string, state agentstate.State) error
Save persists the current state for a thread/session.
func (*InMemorySaver) Size ¶
func (s *InMemorySaver) Size() int
Size returns the number of checkpoints.
type RedisCheckpointer ¶
type RedisCheckpointer struct {
// contains filtered or unexported fields
}
RedisCheckpointer is a Redis-backed implementation of the Checkpointer interface.
Features:
- Distributed checkpoint storage
- Optional distributed locking for concurrent access
- Automatic expiration with TTL
- Connection pooling for high performance
- JSON serialization for state data
Suitable for:
- Multi-instance deployments
- Production environments
- Distributed agent systems
- High-availability scenarios
func NewRedisCheckpointer ¶
func NewRedisCheckpointer(config *RedisCheckpointerConfig) (*RedisCheckpointer, error)
NewRedisCheckpointer creates a new Redis-backed checkpointer
func NewRedisCheckpointerFromClient ¶
func NewRedisCheckpointerFromClient(client *redis.Client, config *RedisCheckpointerConfig) *RedisCheckpointer
NewRedisCheckpointerFromClient creates a RedisCheckpointer from an existing client
func (*RedisCheckpointer) CleanupOld ¶
func (c *RedisCheckpointer) CleanupOld(ctx context.Context, maxAge time.Duration) (int, error)
CleanupOld removes checkpoints older than the specified duration
func (*RedisCheckpointer) Close ¶
func (c *RedisCheckpointer) Close() error
Close closes the Redis connection
func (*RedisCheckpointer) Delete ¶
func (c *RedisCheckpointer) Delete(ctx context.Context, threadID string) error
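Delete removes the checkpoint for a thread/session
func (*RedisCheckpointer) Exists ¶
func (c *RedisCheckpointer) Exists(ctx context.Context, threadID string) (bool, error)
Exists checks if a checkpoint exists for a thread/session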
func (*RedisCheckpointer) List ¶
func (c *RedisCheckpointer) List(ctx context.Context) ([]CheckpointInfo, error)
List returns information about all saved checkpoints
func (*RedisCheckpointer) Load ¶
func (c *RedisCheckpointer) Load(ctx context.Context, threadID string) (agentstate.State, error)
Load retrieves the saved state for a thread/session
func (*RedisCheckpointer) Ping ¶
func (c *RedisCheckpointer) Ping(ctx context.Context) error
Ping tests the connection to Redis
func (*RedisCheckpointer) Save ¶
func (c *RedisCheckpointer) Save(ctx context.Context, threadID string, state agentstate.State) error
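Save persists the current state for a thread/session
func (*RedisCheckpointer) Size ¶
func (c *RedisCheckpointer) Size(ctx context.Context) (int, error)
Size returns the number of checkpoints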
type RedisCheckpointerConfig ¶
type RedisCheckpointerConfig struct {
// Addr is the Redis server address (host:port)
Addr string
// Password for Redis authentication
Password string
// DB is the Redis database number
DB int
// Prefix is the key prefix for all checkpoint keys
Prefix string
// TTL is the default time-to-live for checkpoints (0 = no expiration)
TTL time.Duration
// PoolSize is the maximum number of connections
PoolSize int
// MinIdleConns is the minimum number of idle connections
MinIdleConns int
// MaxRetries is the maximum number of retry attempts
MaxRetries int
// DialTimeout is the timeout for establishing connections
DialTimeout time.Duration
// ReadTimeout is the timeout for read operations
ReadTimeout time.Duration
// WriteTimeout is the timeout for write operations
WriteTimeout time.Duration
// EnableLock enables distributed locking for concurrent access
EnableLock bool
// LockTimeout is the timeout for acquiring locks
LockTimeout time.Duration
// LockExpiry is the expiry time for locks
LockExpiry time.Duration
}
RedisCheckpointerConfig holds configuration for Redis checkpointer
func DefaultRedisCheckpointerConfig ¶
func DefaultRedisCheckpointerConfig() *RedisCheckpointerConfig
DefaultRedisCheckpointerConfig returns default Redis checkpointer configuration