Documentation
¶
Index ¶
- type Checkpoint
- type CheckpointManager
- func (cm *CheckpointManager) Close() error
- func (cm *CheckpointManager) DeleteCheckpoint(ctx context.Context, threadID, checkpointID string) error
- func (cm *CheckpointManager) IsEnabled() bool
- func (cm *CheckpointManager) ListCheckpoints(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
- func (cm *CheckpointManager) LoadCheckpoint(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
- func (cm *CheckpointManager) SaveCheckpoint(ctx context.Context, threadID, nodeID string, stepID int, state *core.BaseState) error
- type CheckpointMetadata
- type Checkpointer
- type DatabaseConfig
- type DatabaseConnection
- type DatabaseConnectionManager
- type DatabaseType
- type Document
- type FileCheckpointer
- func (c *FileCheckpointer) Close() error
- func (c *FileCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
- func (c *FileCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
- func (c *FileCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
- func (c *FileCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- type MemoryCheckpointer
- func (c *MemoryCheckpointer) Close() error
- func (c *MemoryCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
- func (c *MemoryCheckpointer) GetCheckpointCount() int
- func (c *MemoryCheckpointer) GetThreadIDs() []string
- func (c *MemoryCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
- func (c *MemoryCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
- func (c *MemoryCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- type PostgresCheckpointer
- func (p *PostgresCheckpointer) Close() error
- func (p *PostgresCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
- func (p *PostgresCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
- func (p *PostgresCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
- func (p *PostgresCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- func (p *PostgresCheckpointer) SaveDocument(ctx context.Context, doc *Document) error
- func (p *PostgresCheckpointer) SearchDocuments(ctx context.Context, threadID string, queryEmbedding []float64, limit int) ([]*Document, error)
- type PostgresConnection
- func (p *PostgresConnection) Close() error
- func (p *PostgresConnection) Connect() error
- func (p *PostgresConnection) ExecuteQuery(ctx context.Context, query string, args ...interface{}) error
- func (p *PostgresConnection) GetConfig() *DatabaseConfig
- func (p *PostgresConnection) GetType() DatabaseType
- func (p *PostgresConnection) Ping() error
- func (p *PostgresConnection) QueryRow(ctx context.Context, query string, args ...interface{}) interface{}
- func (p *PostgresConnection) QueryRows(ctx context.Context, query string, args ...interface{}) (interface{}, error)
- type RedisCheckpointer
- func (r *RedisCheckpointer) Close() error
- func (r *RedisCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
- func (r *RedisCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
- func (r *RedisCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
- func (r *RedisCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
- type Session
- type SessionManager
- func (sm *SessionManager) CreateSession(ctx context.Context, session *Session) error
- func (sm *SessionManager) CreateThread(ctx context.Context, thread *Thread) error
- func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*Session, error)
- func (sm *SessionManager) GetThread(ctx context.Context, threadID string) (*Thread, error)
- type Thread
- type TimeTravel
- func (tt *TimeTravel) FindCheckpointByNode(ctx context.Context, threadID, nodeID string) (*CheckpointMetadata, error)
- func (tt *TimeTravel) FindCheckpointByStep(ctx context.Context, threadID string, stepID int) (*CheckpointMetadata, error)
- func (tt *TimeTravel) GetHistory(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
- func (tt *TimeTravel) RewindTo(ctx context.Context, threadID, checkpointID string) (*core.BaseState, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
type Checkpoint struct {
ID string `json:"id"`
ThreadID string `json:"thread_id"`
State *core.BaseState `json:"state"`
Metadata map[string]interface{} `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
NodeID string `json:"node_id"`
StepID int `json:"step_id"`
}
Checkpoint represents a saved state
type CheckpointManager ¶
type CheckpointManager struct {
// contains filtered or unexported fields
}
CheckpointManager manages checkpointing for graph execution
func NewCheckpointManager ¶
func NewCheckpointManager(checkpointer Checkpointer) *CheckpointManager
NewCheckpointManager creates a new checkpoint manager
func (*CheckpointManager) Close ¶
func (cm *CheckpointManager) Close() error
Close closes the checkpoint manager
func (*CheckpointManager) DeleteCheckpoint ¶
func (cm *CheckpointManager) DeleteCheckpoint(ctx context.Context, threadID, checkpointID string) error
DeleteCheckpoint deletes a checkpoint
func (*CheckpointManager) IsEnabled ¶
func (cm *CheckpointManager) IsEnabled() bool
IsEnabled returns true if checkpointing is enabled
func (*CheckpointManager) ListCheckpoints ¶
func (cm *CheckpointManager) ListCheckpoints(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
ListCheckpoints lists checkpoints for a thread
func (*CheckpointManager) LoadCheckpoint ¶
func (cm *CheckpointManager) LoadCheckpoint(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
LoadCheckpoint loads a checkpoint
func (*CheckpointManager) SaveCheckpoint ¶
func (cm *CheckpointManager) SaveCheckpoint(ctx context.Context, threadID, nodeID string, stepID int, state *core.BaseState) error
SaveCheckpoint saves a checkpoint
type CheckpointMetadata ¶
type CheckpointMetadata struct {
ID string `json:"id"`
ThreadID string `json:"thread_id"`
Metadata map[string]interface{} `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
NodeID string `json:"node_id"`
StepID int `json:"step_id"`
}
CheckpointMetadata represents checkpoint metadata without the full state
type Checkpointer ¶
type Checkpointer interface {
// Save saves a state checkpoint
Save(ctx context.Context, checkpoint *Checkpoint) error
// Load loads a state checkpoint
Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
// List lists checkpoints for a thread
List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
// Delete deletes a checkpoint
Delete(ctx context.Context, threadID, checkpointID string) error
// Close closes the checkpointer
Close() error
}
Checkpointer defines the interface for state persistence
func CreateCheckpointer ¶
func CreateCheckpointer(config *DatabaseConfig) (Checkpointer, error)
CreateCheckpointer creates a checkpointer for the specified database
type DatabaseConfig ¶
type DatabaseConfig struct {
Type DatabaseType `json:"type"` // "postgres", "pgvector", "redis", "opensearch", etc.
Host string `json:"host"`
Port int `json:"port"`
Database string `json:"database"`
Username string `json:"username"`
Password string `json:"password"`
SSLMode string `json:"ssl_mode"`
MaxOpenConns int `json:"max_open_conns"`
MaxIdleConns int `json:"max_idle_conns"`
MaxLifetime string `json:"max_lifetime"`
// Vector-specific configuration
VectorDimension int `json:"vector_dimension"`
VectorMetric string `json:"vector_metric"` // "cosine", "euclidean", "dot_product"
// OpenSearch/Elasticsearch specific
Index string `json:"index"`
APIKey string `json:"api_key"`
CloudID string `json:"cloud_id"`
CACert string `json:"ca_cert"`
// Additional connection parameters
ConnectionParams map[string]string `json:"connection_params"`
// RAG-specific settings
EnableRAG bool `json:"enable_rag"`
EmbeddingModel string `json:"embedding_model"`
EmbeddingDimension int `json:"embedding_dimension"`
SimilarityThreshold float64 `json:"similarity_threshold"`
}
DatabaseConfig represents database configuration
func NewPgVectorConfig ¶
func NewPgVectorConfig(host string, port int, database, username, password string, vectorDim int) *DatabaseConfig
NewPgVectorConfig is a helper that creates a configuration for PostgreSQL with pgvector.
func NewPostgresConfig ¶
func NewPostgresConfig(host string, port int, database, username, password string) *DatabaseConfig
NewPostgresConfig is a helper that creates a default PostgreSQL configuration.
func NewRedisConfig ¶
func NewRedisConfig(host string, port int, password string) *DatabaseConfig
NewRedisConfig is a helper that creates a Redis configuration.
type DatabaseConnection ¶
type DatabaseConnection interface {
Connect() error
Close() error
Ping() error
GetType() DatabaseType
GetConfig() *DatabaseConfig
ExecuteQuery(ctx context.Context, query string, args ...interface{}) error
QueryRow(ctx context.Context, query string, args ...interface{}) interface{}
QueryRows(ctx context.Context, query string, args ...interface{}) (interface{}, error)
}
DatabaseConnection represents a database connection interface
type DatabaseConnectionManager ¶
type DatabaseConnectionManager struct {
// contains filtered or unexported fields
}
DatabaseConnectionManager manages multiple database connections
func NewDatabaseConnectionManager ¶
func NewDatabaseConnectionManager() *DatabaseConnectionManager
NewDatabaseConnectionManager creates a new connection manager
func (*DatabaseConnectionManager) AddConnection ¶
func (dcm *DatabaseConnectionManager) AddConnection(name string, config *DatabaseConfig) error
AddConnection adds a database connection
func (*DatabaseConnectionManager) CloseAll ¶
func (dcm *DatabaseConnectionManager) CloseAll() error
CloseAll closes all database connections
func (*DatabaseConnectionManager) GetConnection ¶
func (dcm *DatabaseConnectionManager) GetConnection(name string) (DatabaseConnection, error)
GetConnection retrieves a database connection
type DatabaseType ¶
type DatabaseType string
DatabaseType represents a supported database type
const (
DatabaseTypePostgres DatabaseType = "postgres"
DatabaseTypePostgresQL DatabaseType = "postgresql"
DatabaseTypePgVector DatabaseType = "pgvector"
DatabaseTypeRedis DatabaseType = "redis"
DatabaseTypeOpenSearch DatabaseType = "opensearch"
DatabaseTypeElastic DatabaseType = "elasticsearch"
DatabaseTypeMongoDB DatabaseType = "mongodb"
DatabaseTypeMySQL DatabaseType = "mysql"
DatabaseTypeSQLite DatabaseType = "sqlite"
)
type Document ¶
type Document struct {
ID string `json:"id"`
ThreadID string `json:"thread_id"`
Content string `json:"content"`
Metadata map[string]interface{} `json:"metadata"`
Embedding []float64 `json:"embedding,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Document represents a document for RAG
type FileCheckpointer ¶
type FileCheckpointer struct {
// contains filtered or unexported fields
}
FileCheckpointer implements file-based checkpointing
func NewFileCheckpointer ¶
func NewFileCheckpointer(basePath string) *FileCheckpointer
NewFileCheckpointer creates a new file checkpointer
func (*FileCheckpointer) Close ¶
func (c *FileCheckpointer) Close() error
Close closes the file checkpointer
func (*FileCheckpointer) Delete ¶
func (c *FileCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
Delete deletes a checkpoint
func (*FileCheckpointer) List ¶
func (c *FileCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
List lists checkpoints for a thread
func (*FileCheckpointer) Load ¶
func (c *FileCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
Load loads a checkpoint from file
func (*FileCheckpointer) Save ¶
func (c *FileCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save saves a checkpoint to file
type MemoryCheckpointer ¶
type MemoryCheckpointer struct {
// contains filtered or unexported fields
}
MemoryCheckpointer implements in-memory checkpointing
func NewMemoryCheckpointer ¶
func NewMemoryCheckpointer() *MemoryCheckpointer
NewMemoryCheckpointer creates a new memory checkpointer
func (*MemoryCheckpointer) Close ¶
func (c *MemoryCheckpointer) Close() error
Close closes the memory checkpointer
func (*MemoryCheckpointer) Delete ¶
func (c *MemoryCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
Delete deletes a checkpoint
func (*MemoryCheckpointer) GetCheckpointCount ¶
func (c *MemoryCheckpointer) GetCheckpointCount() int
GetCheckpointCount returns the total number of checkpoints
func (*MemoryCheckpointer) GetThreadIDs ¶
func (c *MemoryCheckpointer) GetThreadIDs() []string
GetThreadIDs returns all thread IDs
func (*MemoryCheckpointer) List ¶
func (c *MemoryCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
List lists checkpoints for a thread
func (*MemoryCheckpointer) Load ¶
func (c *MemoryCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
Load loads a checkpoint from memory
func (*MemoryCheckpointer) Save ¶
func (c *MemoryCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save saves a checkpoint to memory
type PostgresCheckpointer ¶
type PostgresCheckpointer struct {
// contains filtered or unexported fields
}
PostgresCheckpointer implements database-based checkpointing with PostgreSQL
func NewPostgresCheckpointer ¶
func NewPostgresCheckpointer(config *DatabaseConfig) (*PostgresCheckpointer, error)
NewPostgresCheckpointer creates a new PostgreSQL checkpointer
func (*PostgresCheckpointer) Close ¶
func (p *PostgresCheckpointer) Close() error
Close closes the PostgreSQL checkpointer
func (*PostgresCheckpointer) Delete ¶
func (p *PostgresCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
Delete deletes a checkpoint
func (*PostgresCheckpointer) List ¶
func (p *PostgresCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
List lists checkpoints for a thread
func (*PostgresCheckpointer) Load ¶
func (p *PostgresCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
Load loads a checkpoint from PostgreSQL
func (*PostgresCheckpointer) Save ¶
func (p *PostgresCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save saves a checkpoint to PostgreSQL
func (*PostgresCheckpointer) SaveDocument ¶
func (p *PostgresCheckpointer) SaveDocument(ctx context.Context, doc *Document) error
SaveDocument saves a document for RAG
func (*PostgresCheckpointer) SearchDocuments ¶
func (p *PostgresCheckpointer) SearchDocuments(ctx context.Context, threadID string, queryEmbedding []float64, limit int) ([]*Document, error)
SearchDocuments performs similarity search on documents
type PostgresConnection ¶
type PostgresConnection struct {
// contains filtered or unexported fields
}
PostgresConnection implements a PostgreSQL connection
func NewPostgresConnection ¶
func NewPostgresConnection(config *DatabaseConfig) (*PostgresConnection, error)
NewPostgresConnection creates a new PostgreSQL connection
func (*PostgresConnection) Close ¶
func (p *PostgresConnection) Close() error
Close closes the database connection
func (*PostgresConnection) Connect ¶
func (p *PostgresConnection) Connect() error
Connect establishes the PostgreSQL connection
func (*PostgresConnection) ExecuteQuery ¶
func (p *PostgresConnection) ExecuteQuery(ctx context.Context, query string, args ...interface{}) error
ExecuteQuery executes a query without returning results
func (*PostgresConnection) GetConfig ¶
func (p *PostgresConnection) GetConfig() *DatabaseConfig
GetConfig returns the database configuration
func (*PostgresConnection) GetType ¶
func (p *PostgresConnection) GetType() DatabaseType
GetType returns the database type
func (*PostgresConnection) Ping ¶
func (p *PostgresConnection) Ping() error
Ping tests the database connection
type RedisCheckpointer ¶
type RedisCheckpointer struct {
// contains filtered or unexported fields
}
RedisCheckpointer implements Redis-based checkpointing
func NewRedisCheckpointer ¶
func NewRedisCheckpointer(config *DatabaseConfig) (*RedisCheckpointer, error)
NewRedisCheckpointer creates a new Redis checkpointer
func (*RedisCheckpointer) Close ¶
func (r *RedisCheckpointer) Close() error
Close closes the Redis checkpointer
func (*RedisCheckpointer) Delete ¶
func (r *RedisCheckpointer) Delete(ctx context.Context, threadID, checkpointID string) error
Delete deletes a checkpoint
func (*RedisCheckpointer) List ¶
func (r *RedisCheckpointer) List(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
List lists checkpoints for a thread
func (*RedisCheckpointer) Load ¶
func (r *RedisCheckpointer) Load(ctx context.Context, threadID, checkpointID string) (*Checkpoint, error)
Load loads a checkpoint from Redis
func (*RedisCheckpointer) Save ¶
func (r *RedisCheckpointer) Save(ctx context.Context, checkpoint *Checkpoint) error
Save saves a checkpoint to Redis
type Session ¶
type Session struct {
ID string `json:"id"`
ThreadID string `json:"thread_id"`
UserID string `json:"user_id"`
Metadata map[string]interface{} `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
}
Session represents a user session
type SessionManager ¶
type SessionManager struct {
// contains filtered or unexported fields
}
SessionManager manages user sessions and threads
func NewSessionManager ¶
func NewSessionManager(conn DatabaseConnection) *SessionManager
NewSessionManager creates a new session manager
func (*SessionManager) CreateSession ¶
func (sm *SessionManager) CreateSession(ctx context.Context, session *Session) error
CreateSession creates a new session
func (*SessionManager) CreateThread ¶
func (sm *SessionManager) CreateThread(ctx context.Context, thread *Thread) error
CreateThread creates a new thread
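func (*SessionManager) GetSession ¶
func (sm *SessionManager) GetSession(ctx context.Context, sessionID string) (*Session, error)
GetSession retrieves a session
func (*SessionManager) GetThread ¶
func (sm *SessionManager) GetThread(ctx context.Context, threadID string) (*Thread, error)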
type Thread ¶
type Thread struct {
ID string `json:"id"`
Name string `json:"name"`
Metadata map[string]interface{} `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Thread represents a conversation thread
type TimeTravel ¶
type TimeTravel struct {
// contains filtered or unexported fields
}
TimeTravel provides time travel functionality
func NewTimeTravel ¶
func NewTimeTravel(checkpointManager *CheckpointManager) *TimeTravel
NewTimeTravel creates a new time travel instance
func (*TimeTravel) FindCheckpointByNode ¶
func (tt *TimeTravel) FindCheckpointByNode(ctx context.Context, threadID, nodeID string) (*CheckpointMetadata, error)
FindCheckpointByNode finds the latest checkpoint for a specific node
func (*TimeTravel) FindCheckpointByStep ¶
func (tt *TimeTravel) FindCheckpointByStep(ctx context.Context, threadID string, stepID int) (*CheckpointMetadata, error)
FindCheckpointByStep finds a checkpoint by step ID
func (*TimeTravel) GetHistory ¶
func (tt *TimeTravel) GetHistory(ctx context.Context, threadID string) ([]*CheckpointMetadata, error)
GetHistory returns the execution history for a thread