checkpoint

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Replication mode values. These strings match the "sync" and "async" values
// documented on DistributedCheckpointerConfig.ReplicationMode.
const (
	// ReplicationModeSync is the "sync" mode: per the ReplicationMode field
	// documentation, writes go to both backends synchronously.
	ReplicationModeSync  = "sync"
	// ReplicationModeAsync is the "async" mode: per the ReplicationMode field
	// documentation, writes go to the primary and are then replicated to the
	// secondary asynchronously.
	ReplicationModeAsync = "async"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CheckpointInfo

// CheckpointInfo contains metadata about a checkpoint.
//
// The package documentation states that this struct is kept aligned with
// interfaces.CheckpointMetadata. That type is not visible here, so confirm
// against it before changing any field.
type CheckpointInfo struct {
	// ID is the unique checkpoint identifier.
	ID string `json:"id"`

	// ThreadID is the thread/session identifier.
	ThreadID string `json:"thread_id"`

	// CreatedAt is when the checkpoint was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is when the checkpoint was last updated.
	UpdatedAt time.Time `json:"updated_at"`

	// Metadata holds additional checkpoint information.
	// It is omitted from the JSON encoding when empty (omitempty).
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// Size is the approximate size in bytes of the checkpoint data.
	// Useful for storage management and cleanup decisions.
	Size int64 `json:"size"`
}

CheckpointInfo contains metadata about a checkpoint.

Note: This struct is aligned with interfaces.CheckpointMetadata to ensure consistency. Fields match exactly for seamless integration.

type Checkpointer

// Checkpointer defines the interface for session state persistence.
//
// Every per-checkpoint operation is keyed by threadID, the thread/session
// identifier, and takes a context.Context as its first argument.
type Checkpointer interface {
	// Save persists the current state for a thread/session.
	Save(ctx context.Context, threadID string, state agentstate.State) error

	// Load retrieves the saved state for a thread/session.
	// NOTE(review): the behavior when no checkpoint exists for threadID is
	// not specified here; check the concrete implementation.
	Load(ctx context.Context, threadID string) (agentstate.State, error)

	// List returns information about all saved checkpoints.
	List(ctx context.Context) ([]CheckpointInfo, error)

	// Delete removes the checkpoint for a thread/session.
	Delete(ctx context.Context, threadID string) error

	// Exists checks if a checkpoint exists for a thread/session.
	Exists(ctx context.Context, threadID string) (bool, error)
}

Checkpointer defines the interface for session state persistence.

Inspired by LangChain's Checkpointer, it provides:

  • Session state saving and loading
  • Thread-based conversation continuity
  • Checkpoint history management

Use cases:

  • Multi-turn conversations
  • Resuming interrupted workflows
  • A/B testing different conversation paths

type CheckpointerConfig

// CheckpointerConfig configures checkpointer behavior.
//
// DefaultCheckpointerConfig returns a populated default; the default values
// themselves are not shown in this documentation.
type CheckpointerConfig struct {
	// MaxHistorySize limits the number of historical states to keep.
	MaxHistorySize int

	// MaxCheckpointAge is the maximum age before a checkpoint is considered stale.
	MaxCheckpointAge time.Duration

	// EnableCompression enables state compression (reserved for a future
	// implementation).
	EnableCompression bool

	// AutoCleanup enables automatic cleanup of old checkpoints.
	AutoCleanup bool

	// CleanupInterval specifies how often to run cleanup.
	CleanupInterval time.Duration
}

CheckpointerConfig configures checkpointer behavior.

func DefaultCheckpointerConfig

func DefaultCheckpointerConfig() *CheckpointerConfig

DefaultCheckpointerConfig returns the default checkpointer configuration.

type CheckpointerWithAutoCleanup

// CheckpointerWithAutoCleanup wraps a Checkpointer with automatic cleanup.
// Create one with NewCheckpointerWithAutoCleanup and call Stop to stop the
// automatic cleanup goroutine.
type CheckpointerWithAutoCleanup struct {
	// contains filtered or unexported fields
}

CheckpointerWithAutoCleanup wraps a Checkpointer with automatic cleanup.

func NewCheckpointerWithAutoCleanup

func NewCheckpointerWithAutoCleanup(cp Checkpointer, config *CheckpointerConfig) *CheckpointerWithAutoCleanup

NewCheckpointerWithAutoCleanup creates a Checkpointer with automatic cleanup.

func (*CheckpointerWithAutoCleanup) Delete

func (c *CheckpointerWithAutoCleanup) Delete(ctx context.Context, threadID string) error

Delete removes the checkpoint for a thread/session.

func (*CheckpointerWithAutoCleanup) Exists

func (c *CheckpointerWithAutoCleanup) Exists(ctx context.Context, threadID string) (bool, error)

Exists checks if a checkpoint exists for a thread/session.

func (*CheckpointerWithAutoCleanup) List

func (c *CheckpointerWithAutoCleanup) List(ctx context.Context) ([]CheckpointInfo, error)

List returns information about all saved checkpoints.

func (*CheckpointerWithAutoCleanup) Load

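func (c *CheckpointerWithAutoCleanup) Load(ctx context.Context, threadID string) (agentstate.State, error)

Load retrieves the saved state for a thread/session.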

func (*CheckpointerWithAutoCleanup) Save

func (c *CheckpointerWithAutoCleanup) Save(ctx context.Context, threadID string, state agentstate.State) error

Save persists the current state for a thread/session.

func (*CheckpointerWithAutoCleanup) Stop

func (c *CheckpointerWithAutoCleanup) Stop()

Stop stops the automatic cleanup goroutine.

type DistributedCheckpointer

// DistributedCheckpointer provides high-availability checkpointing with
// multiple backends. It is configured through DistributedCheckpointerConfig
// and created with NewDistributedCheckpointer.
type DistributedCheckpointer struct {
	// contains filtered or unexported fields
}

DistributedCheckpointer provides high-availability checkpointing with multiple backends.

Features:

  • Primary/secondary backend architecture
  • Automatic failover on primary failure
  • Replication modes: sync or async
  • Health monitoring
  • Automatic failback to primary

Suitable for:

  • Production environments requiring high availability
  • Critical applications that cannot tolerate data loss
  • Multi-region deployments
  • Disaster recovery scenarios

func NewDistributedCheckpointer

func NewDistributedCheckpointer(config *DistributedCheckpointerConfig) (*DistributedCheckpointer, error)

NewDistributedCheckpointer creates a new distributed checkpointer.

func (*DistributedCheckpointer) Close

func (dc *DistributedCheckpointer) Close() error

Close shuts down the distributed checkpointer.

func (*DistributedCheckpointer) Delete

func (dc *DistributedCheckpointer) Delete(ctx context.Context, threadID string) error

Delete removes the checkpoint for a thread/session.

func (*DistributedCheckpointer) Exists

func (dc *DistributedCheckpointer) Exists(ctx context.Context, threadID string) (bool, error)

Exists checks if a checkpoint exists for a thread/session.

func (*DistributedCheckpointer) GetStatus

func (dc *DistributedCheckpointer) GetStatus() map[string]interface{}

GetStatus returns the current status of the distributed checkpointer.

func (*DistributedCheckpointer) List

func (dc *DistributedCheckpointer) List(ctx context.Context) ([]CheckpointInfo, error)

List returns information about all saved checkpoints.

func (*DistributedCheckpointer) Load

func (dc *DistributedCheckpointer) Load(ctx context.Context, threadID string) (agentstate.State, error)

Load retrieves the saved state for a thread/session.

func (*DistributedCheckpointer) Save

func (dc *DistributedCheckpointer) Save(ctx context.Context, threadID string, state agentstate.State) error

Save persists the current state for a thread/session.

type DistributedCheckpointerConfig

// DistributedCheckpointerConfig holds configuration for the distributed
// checkpointer.
//
// DefaultDistributedCheckpointerConfig returns a default configuration; the
// default values themselves are not shown in this documentation.
type DistributedCheckpointerConfig struct {
	// PrimaryBackend is the primary checkpointer (e.g., Redis).
	PrimaryBackend Checkpointer

	// SecondaryBackend is the backup checkpointer (e.g., PostgreSQL). It is
	// optional.
	SecondaryBackend Checkpointer

	// EnableReplication enables automatic replication to the secondary backend.
	EnableReplication bool

	// ReplicationMode determines how replication works:
	//   - "sync" (ReplicationModeSync): write to both backends synchronously.
	//   - "async" (ReplicationModeAsync): write to primary, then replicate to
	//     the secondary asynchronously.
	ReplicationMode string

	// HealthCheckInterval is how often to check backend health.
	HealthCheckInterval time.Duration

	// EnableAutoFailover enables automatic failover to the secondary on
	// primary failure.
	EnableAutoFailover bool

	// MaxFailoverAttempts limits the number of failover attempts.
	MaxFailoverAttempts int

	// FailbackDelay is the delay before attempting to fail back to primary.
	FailbackDelay time.Duration
}

DistributedCheckpointerConfig holds configuration for the distributed checkpointer.

func DefaultDistributedCheckpointerConfig

func DefaultDistributedCheckpointerConfig() *DistributedCheckpointerConfig

DefaultDistributedCheckpointerConfig returns the default configuration.

type InMemorySaver

// InMemorySaver is a thread-safe in-memory implementation of Checkpointer.
// Create one with NewInMemorySaver.
type InMemorySaver struct {
	// contains filtered or unexported fields
}

InMemorySaver is a thread-safe in-memory implementation of Checkpointer.

Suitable for:

  • Development and testing
  • Single-instance deployments
  • Short-lived sessions

func NewInMemorySaver

func NewInMemorySaver() *InMemorySaver

NewInMemorySaver creates a new InMemorySaver.

func (*InMemorySaver) CleanupOld

func (s *InMemorySaver) CleanupOld(maxAge time.Duration) int

CleanupOld removes checkpoints older than the specified duration.

func (*InMemorySaver) Delete

func (s *InMemorySaver) Delete(ctx context.Context, threadID string) error

Delete removes the checkpoint for a thread/session.

func (*InMemorySaver) Exists

func (s *InMemorySaver) Exists(ctx context.Context, threadID string) (bool, error)

Exists checks if a checkpoint exists for a thread/session.

func (*InMemorySaver) GetHistory

func (s *InMemorySaver) GetHistory(ctx context.Context, threadID string) ([]agentstate.State, error)

GetHistory returns the state history for a thread/session.

func (*InMemorySaver) List

func (s *InMemorySaver) List(ctx context.Context) ([]CheckpointInfo, error)

List returns information about all saved checkpoints.

func (*InMemorySaver) Load

func (s *InMemorySaver) Load(ctx context.Context, threadID string) (agentstate.State, error)

Load retrieves the saved state for a thread/session.

func (*InMemorySaver) Save

func (s *InMemorySaver) Save(ctx context.Context, threadID string, state agentstate.State) error

Save persists the current state for a thread/session.

func (*InMemorySaver) Size

func (s *InMemorySaver) Size() int

Size returns the number of checkpoints.

type RedisCheckpointer

// RedisCheckpointer is a Redis-backed implementation of the Checkpointer
// interface. Create one with NewRedisCheckpointer, or with
// NewRedisCheckpointerFromClient to use an existing *redis.Client.
type RedisCheckpointer struct {
	// contains filtered or unexported fields
}

RedisCheckpointer is a Redis-backed implementation of the Checkpointer interface.

Features:

  • Distributed checkpoint storage
  • Optional distributed locking for concurrent access
  • Automatic expiration with TTL
  • Connection pooling for high performance
  • JSON serialization for state data

Suitable for:

  • Multi-instance deployments
  • Production environments
  • Distributed agent systems
  • High-availability scenarios

func NewRedisCheckpointer

func NewRedisCheckpointer(config *RedisCheckpointerConfig) (*RedisCheckpointer, error)

NewRedisCheckpointer creates a new Redis-backed checkpointer.

func NewRedisCheckpointerFromClient

func NewRedisCheckpointerFromClient(client *redis.Client, config *RedisCheckpointerConfig) *RedisCheckpointer

NewRedisCheckpointerFromClient creates a RedisCheckpointer from an existing client.

func (*RedisCheckpointer) CleanupOld

func (c *RedisCheckpointer) CleanupOld(ctx context.Context, maxAge time.Duration) (int, error)

CleanupOld removes checkpoints older than the specified duration.

func (*RedisCheckpointer) Close

func (c *RedisCheckpointer) Close() error

Close closes the Redis connection.

func (*RedisCheckpointer) Delete

func (c *RedisCheckpointer) Delete(ctx context.Context, threadID string) error

Delete removes the checkpoint for a thread/session.

func (*RedisCheckpointer) Exists

func (c *RedisCheckpointer) Exists(ctx context.Context, threadID string) (bool, error)

Exists checks if a checkpoint exists for a thread/session.

func (*RedisCheckpointer) List

func (c *RedisCheckpointer) List(ctx context.Context) ([]CheckpointInfo, error)

List returns information about all saved checkpoints.

func (*RedisCheckpointer) Load

func (c *RedisCheckpointer) Load(ctx context.Context, threadID string) (agentstate.State, error)

Load retrieves the saved state for a thread/session.

func (*RedisCheckpointer) Ping

func (c *RedisCheckpointer) Ping(ctx context.Context) error

Ping tests the connection to Redis.

func (*RedisCheckpointer) Save

func (c *RedisCheckpointer) Save(ctx context.Context, threadID string, state agentstate.State) error

Save persists the current state for a thread/session.

func (*RedisCheckpointer) Size

func (c *RedisCheckpointer) Size(ctx context.Context) (int, error)

Size returns the number of checkpoints.

type RedisCheckpointerConfig

// RedisCheckpointerConfig holds configuration for the Redis checkpointer.
//
// DefaultRedisCheckpointerConfig returns a default configuration; the default
// values themselves are not shown in this documentation.
type RedisCheckpointerConfig struct {
	// Addr is the Redis server address (host:port).
	Addr string

	// Password is the password for Redis authentication.
	Password string

	// DB is the Redis database number.
	DB int

	// Prefix is the key prefix for all checkpoint keys.
	Prefix string

	// TTL is the default time-to-live for checkpoints (0 = no expiration).
	TTL time.Duration

	// PoolSize is the maximum number of connections.
	PoolSize int

	// MinIdleConns is the minimum number of idle connections.
	MinIdleConns int

	// MaxRetries is the maximum number of retry attempts.
	MaxRetries int

	// DialTimeout is the timeout for establishing connections.
	DialTimeout time.Duration

	// ReadTimeout is the timeout for read operations.
	ReadTimeout time.Duration

	// WriteTimeout is the timeout for write operations.
	WriteTimeout time.Duration

	// EnableLock enables distributed locking for concurrent access.
	EnableLock bool

	// LockTimeout is the timeout for acquiring locks.
	LockTimeout time.Duration

	// LockExpiry is the expiry time for locks.
	LockExpiry time.Duration
}

RedisCheckpointerConfig holds configuration for the Redis checkpointer.

func DefaultRedisCheckpointerConfig

func DefaultRedisCheckpointerConfig() *RedisCheckpointerConfig

DefaultRedisCheckpointerConfig returns the default Redis checkpointer configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL