persistence

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	ID        string                 `json:"id"`
	ThreadID  string                 `json:"thread_id"`
	State     *core.BaseState        `json:"state"`
	Metadata  map[string]interface{} `json:"metadata"`
	CreatedAt time.Time              `json:"created_at"`
	NodeID    string                 `json:"node_id"`
	StepID    int                    `json:"step_id"`
}

Checkpoint represents a saved state

type CheckpointManager

type CheckpointManager struct {
	// contains filtered or unexported fields
}

CheckpointManager manages checkpointing for graph execution

func NewCheckpointManager

func NewCheckpointManager(checkpointer Checkpointer) *CheckpointManager

NewCheckpointManager creates a new checkpoint manager

func (*CheckpointManager) Close

func (cm *CheckpointManager) Close() error

Close closes the checkpoint manager

func (*CheckpointManager) DeleteCheckpoint

func (cm *CheckpointManager) DeleteCheckpoint(ctx context.Context, threadID, checkpointID string) error

DeleteCheckpoint deletes a checkpoint

func (*CheckpointManager) IsEnabled

func (cm *CheckpointManager) IsEnabled() bool

IsEnabled returns true if checkpointing is enabled

func (*CheckpointManager) ListCheckpoints

func (cm *CheckpointManager) ListCheckpoints(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

ListCheckpoints lists checkpoints for a thread

func (*CheckpointManager) LoadCheckpoint

func (cm *CheckpointManager) LoadCheckpoint(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)

LoadCheckpoint loads a checkpoint

func (*CheckpointManager) SaveCheckpoint

func (cm *CheckpointManager) SaveCheckpoint(ctx context.Context, threadID, nodeID string, stepID int, state *core.BaseState) error

SaveCheckpoint saves a checkpoint

type CheckpointMetadata

type CheckpointMetadata struct {
	ID        string                 `json:"id"`
	ThreadID  string                 `json:"thread_id"`
	Metadata  map[string]interface{} `json:"metadata"`
	CreatedAt time.Time              `json:"created_at"`
	NodeID    string                 `json:"node_id"`
	StepID    int                    `json:"step_id"`
}

CheckpointMetadata represents checkpoint metadata without the full state

type Checkpointer

type Checkpointer interface {
	// Save saves a state checkpoint
	Save(ctx context.Context, checkpoint *Checkpoint) error

	// Load loads a state checkpoint
	Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)

	// List lists checkpoints for a thread
	List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

	// Delete deletes a checkpoint
	Delete(ctx context.Context, threadID, checkpointID string) error

	// Close closes the checkpointer
	Close() error
}

Checkpointer defines the interface for state persistence

func CreateCheckpointer

func CreateCheckpointer(config *DatabaseConfig) (Checkpointer, error)

CreateCheckpointer creates a checkpointer for the specified database

type DatabaseConfig

type DatabaseConfig struct {
	Type         DatabaseType `json:"type"` // "postgres", "pgvector", "redis", "opensearch", etc.
	Host         string       `json:"host"`
	Port         int          `json:"port"`
	Database     string       `json:"database"`
	Username     string       `json:"username"`
	Password     string       `json:"password"`
	SSLMode      string       `json:"ssl_mode"`
	MaxOpenConns int          `json:"max_open_conns"`
	MaxIdleConns int          `json:"max_idle_conns"`
	MaxLifetime  string       `json:"max_lifetime"`

	// Vector-specific configuration
	VectorDimension int    `json:"vector_dimension"`
	VectorMetric    string `json:"vector_metric"` // "cosine", "euclidean", "dot_product"

	// OpenSearch/Elasticsearch specific
	Index   string `json:"index"`
	APIKey  string `json:"api_key"`
	CloudID string `json:"cloud_id"`
	CACert  string `json:"ca_cert"`

	// Additional connection parameters
	ConnectionParams map[string]string `json:"connection_params"`

	// RAG-specific settings
	EnableRAG           bool    `json:"enable_rag"`
	EmbeddingModel      string  `json:"embedding_model"`
	EmbeddingDimension  int     `json:"embedding_dimension"`
	SimilarityThreshold float64 `json:"similarity_threshold"`
}

DatabaseConfig represents database configuration

func NewPgVectorConfig

func NewPgVectorConfig(host string, port int, database, username, password string, vectorDim int) *DatabaseConfig

Helper function to create a PostgreSQL with pgvector configuration

func NewPostgresConfig

func NewPostgresConfig(host string, port int, database, username, password string) *DatabaseConfig

Helper function to create a default PostgreSQL configuration

func NewRedisConfig

func NewRedisConfig(host string, port int, password string) *DatabaseConfig

Helper function to create a Redis configuration

type DatabaseConnection

type DatabaseConnection interface {
	Connect() error
	Close() error
	Ping() error
	GetType() DatabaseType
	GetConfig() *DatabaseConfig
	ExecuteQuery(ctx context.Context, query string, args ...interface{}) error
	QueryRow(ctx context.Context, query string, args ...interface{}) interface{}
	QueryRows(ctx context.Context, query string, args ...interface{}) (interface{}, error)
}

DatabaseConnection represents a database connection interface

type DatabaseConnectionManager

type DatabaseConnectionManager struct {
	// contains filtered or unexported fields
}

DatabaseConnectionManager manages multiple database connections

func NewDatabaseConnectionManager

func NewDatabaseConnectionManager() *DatabaseConnectionManager

NewDatabaseConnectionManager creates a new connection manager

func (*DatabaseConnectionManager) AddConnection

func (dcm *DatabaseConnectionManager) AddConnection(name string, config *DatabaseConfig) error

AddConnection adds a database connection

func (*DatabaseConnectionManager) CloseAll

func (dcm *DatabaseConnectionManager) CloseAll() error

CloseAll closes all database connections

func (*DatabaseConnectionManager) GetConnection

func (dcm *DatabaseConnectionManager) GetConnection(name string) (DatabaseConnection, error)

GetConnection retrieves a database connection

type DatabaseType

type DatabaseType string

DatabaseType represents supported database types

const (
	DatabaseTypePostgres   DatabaseType = "postgres"
	DatabaseTypePostgresQL DatabaseType = "postgresql"
	DatabaseTypePgVector   DatabaseType = "pgvector"
	DatabaseTypeRedis      DatabaseType = "redis"
	DatabaseTypeOpenSearch DatabaseType = "opensearch"
	DatabaseTypeElastic    DatabaseType = "elasticsearch"
	DatabaseTypeMongoDB    DatabaseType = "mongodb"
	DatabaseTypeMySQL      DatabaseType = "mysql"
	DatabaseTypeSQLite     DatabaseType = "sqlite"
)

type Document

type Document struct {
	ID        string                 `json:"id"`
	ThreadID  string                 `json:"thread_id"`
	Content   string                 `json:"content"`
	Metadata  map[string]interface{} `json:"metadata"`
	Embedding []float64              `json:"embedding,omitempty"`
	CreatedAt time.Time              `json:"created_at"`
	UpdatedAt time.Time              `json:"updated_at"`
}

Document represents a document for RAG

type FileCheckpointer

type FileCheckpointer struct {
	// contains filtered or unexported fields
}

FileCheckpointer implements file-based checkpointing

func NewFileCheckpointer

func NewFileCheckpointer(basePath string) *FileCheckpointer

NewFileCheckpointer creates a new file checkpointer

func (*FileCheckpointer) Close

func (c *FileCheckpointer) Close() error

Close closes the file checkpointer

func (*FileCheckpointer) Delete

func (c *FileCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error

Delete deletes a checkpoint

func (*FileCheckpointer) List

func (c *FileCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

List lists checkpoints for a thread

func (*FileCheckpointer) Load

func (c *FileCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)

Load loads a checkpoint from file

func (*FileCheckpointer) Save

func (c *FileCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save saves a checkpoint to file

type MemoryCheckpointer

type MemoryCheckpointer struct {
	// contains filtered or unexported fields
}

MemoryCheckpointer implements in-memory checkpointing

func NewMemoryCheckpointer

func NewMemoryCheckpointer() *MemoryCheckpointer

NewMemoryCheckpointer creates a new memory checkpointer

func (*MemoryCheckpointer) Close

func (c *MemoryCheckpointer) Close() error

Close closes the memory checkpointer

func (*MemoryCheckpointer) Delete

func (c *MemoryCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error

Delete deletes a checkpoint

func (*MemoryCheckpointer) GetCheckpointCount

func (c *MemoryCheckpointer) GetCheckpointCount() int

GetCheckpointCount returns the total number of checkpoints

func (*MemoryCheckpointer) GetThreadIDs

func (c *MemoryCheckpointer) GetThreadIDs() []string

GetThreadIDs returns all thread IDs

func (*MemoryCheckpointer) List

func (c *MemoryCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

List lists checkpoints for a thread

func (*MemoryCheckpointer) Load

func (c *MemoryCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)

Load loads a checkpoint from memory

func (*MemoryCheckpointer) Save

func (c *MemoryCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save saves a checkpoint to memory

type PostgresCheckpointer

type PostgresCheckpointer struct {
	// contains filtered or unexported fields
}

PostgresCheckpointer implements database-based checkpointing with PostgreSQL

func NewPostgresCheckpointer

func NewPostgresCheckpointer(config *DatabaseConfig) (*PostgresCheckpointer, error)

NewPostgresCheckpointer creates a new PostgreSQL checkpointer

func (*PostgresCheckpointer) Close

func (p *PostgresCheckpointer) Close() error

Close closes the PostgreSQL checkpointer

func (*PostgresCheckpointer) Delete

func (p *PostgresCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error

Delete deletes a checkpoint

func (*PostgresCheckpointer) List

func (p *PostgresCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

List lists checkpoints for a thread

func (*PostgresCheckpointer) Load

func (p *PostgresCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)

Load loads a checkpoint from PostgreSQL

func (*PostgresCheckpointer) Save

func (p *PostgresCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save saves a checkpoint to PostgreSQL

func (*PostgresCheckpointer) SaveDocument

func (p *PostgresCheckpointer) SaveDocument(ctx context.Context, doc *Document) error

SaveDocument saves a document for RAG

func (*PostgresCheckpointer) SearchDocuments

func (p *PostgresCheckpointer) SearchDocuments(ctx context.Context, threadID string, queryEmbedding []float64, limit int) ([]*Document, error)

SearchDocuments performs similarity search on documents

type PostgresConnection

type PostgresConnection struct {
	// contains filtered or unexported fields
}

PostgresConnection implements PostgreSQL connection

func NewPostgresConnection

func NewPostgresConnection(config *DatabaseConfig) (*PostgresConnection, error)

NewPostgresConnection creates a new PostgreSQL connection

func (*PostgresConnection) Close

func (p *PostgresConnection) Close() error

Close closes the database connection

func (*PostgresConnection) Connect

func (p *PostgresConnection) Connect() error

Connect establishes the PostgreSQL connection

func (*PostgresConnection) ExecuteQuery

func (p *PostgresConnection) ExecuteQuery(ctx context.Context, query string, args ...interface{}) error

ExecuteQuery executes a query without returning results

func (*PostgresConnection) GetConfig

func (p *PostgresConnection) GetConfig() *DatabaseConfig

GetConfig returns the database configuration

func (*PostgresConnection) GetType

func (p *PostgresConnection) GetType() DatabaseType

GetType returns the database type

func (*PostgresConnection) Ping

func (p *PostgresConnection) Ping() error

Ping tests the database connection

func (*PostgresConnection) QueryRow

func (p *PostgresConnection) QueryRow(ctx context.Context, query string, args ...interface{}) interface{}

QueryRow executes a query that returns a single row

func (*PostgresConnection) QueryRows

func (p *PostgresConnection) QueryRows(ctx context.Context, query string, args ...interface{}) (interface{}, error)

QueryRows executes a query that returns multiple rows

type RedisCheckpointer

type RedisCheckpointer struct {
	// contains filtered or unexported fields
}

RedisCheckpointer implements Redis-based checkpointing

func NewRedisCheckpointer

func NewRedisCheckpointer(config *DatabaseConfig) (*RedisCheckpointer, error)

NewRedisCheckpointer creates a new Redis checkpointer

func (*RedisCheckpointer) Close

func (r *RedisCheckpointer) Close() error

Close closes the Redis checkpointer

func (*RedisCheckpointer) Delete

func (r *RedisCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error

Delete deletes a checkpoint

func (*RedisCheckpointer) List

func (r *RedisCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

List lists checkpoints for a thread

func (*RedisCheckpointer) Load

func (r *RedisCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)

Load loads a checkpoint from Redis

func (*RedisCheckpointer) Save

func (r *RedisCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error

Save saves a checkpoint to Redis

type Session

type Session struct {
	ID        string                 `json:"id"`
	ThreadID  string                 `json:"thread_id"`
	UserID    string                 `json:"user_id"`
	Metadata  map[string]interface{} `json:"metadata"`
	CreatedAt time.Time              `json:"created_at"`
	ExpiresAt *time.Time             `json:"expires_at,omitempty"`
}

Session represents a user session

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager manages user sessions and threads

func NewSessionManager

func NewSessionManager(conn DatabaseConnection) *SessionManager

NewSessionManager creates a new session manager

func (*SessionManager) CreateSession

func (sm *SessionManager) CreateSession(ctx context.Context, session *Session) error

CreateSession creates a new session

func (*SessionManager) CreateThread

func (sm *SessionManager) CreateThread(ctx context.Context, thread *Thread) error

CreateThread creates a new thread

func (*SessionManager) GetSession

func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*Session, error)

GetSession retrieves a session

func (*SessionManager) GetThread

func (sm *SessionManager) GetThread(ctx context.Context, threadID string) (*Thread, error)

GetThread retrieves a thread

type Thread

type Thread struct {
	ID        string                 `json:"id"`
	Name      string                 `json:"name"`
	Metadata  map[string]interface{} `json:"metadata"`
	CreatedAt time.Time              `json:"created_at"`
	UpdatedAt time.Time              `json:"updated_at"`
}

Thread represents a conversation thread

type TimeTravel

type TimeTravel struct {
	// contains filtered or unexported fields
}

TimeTravel provides time travel functionality

func NewTimeTravel

func NewTimeTravel(checkpointManager *CheckpointManager) *TimeTravel

NewTimeTravel creates a new time travel instance

func (*TimeTravel) FindCheckpointByNode

func (tt *TimeTravel) FindCheckpointByNode(ctx context.Context, threadID, nodeID string) (*CheckpointMetadata, error)

FindCheckpointByNode finds the latest checkpoint for a specific node

func (*TimeTravel) FindCheckpointByStep

func (tt *TimeTravel) FindCheckpointByStep(ctx context.Context, threadID string, stepID int) (*CheckpointMetadata, error)

FindCheckpointByStep finds a checkpoint by step ID

func (*TimeTravel) GetHistory

func (tt *TimeTravel) GetHistory(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)

GetHistory returns the execution history for a thread

func (*TimeTravel) RewindTo

func (tt *TimeTravel) RewindTo(ctx context.Context, threadID, checkpointID string) (*core.BaseState, error)

RewindTo rewinds execution to a specific checkpoint

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL