Documentation
¶
Overview ¶
Package postgres provides PostgreSQL database operations for the storage system.
nolint: errcheck // Operations may ignore return values
Package postgres provides PostgreSQL database operations for the storage system.
Package postgres provides PostgreSQL database operations for the storage system.
nolint: errcheck // Operations may ignore return values
nolint: errcheck // Operations may ignore return values
Package postgres provides PostgreSQL database operations for the storage system.
Package postgres provides PostgreSQL database operations for the storage system.
Package postgres provides PostgreSQL database operations for the storage system.
Package postgres provides utility functions for vector operations.
Package postgres provides PostgreSQL database operations for the storage system.
Index ¶
- Variables
- func FormatVector(embedding []float64) string
- func Migrate(ctx context.Context, pool *Pool) error
- func MigrateStorage(ctx context.Context, pool *Pool) error
- func NormalizeVector(embedding []float64) []float64
- func RollbackLast(ctx context.Context, pool *Pool) error
- func Seed(ctx context.Context, pool *Pool) error
- func WithDeleteTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func WithInsertTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func WithQueryTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func WithTransactionTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func WithUpdateTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- func WithVectorSearchTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- type CircuitBreaker
- type CircuitBreakerState
- type Config
- type DBTX
- type EmbeddingConfig
- type EmbeddingQueue
- func (q *EmbeddingQueue) Enqueue(ctx context.Context, task *EmbeddingTask) error
- func (q *EmbeddingQueue) FetchPendingTasks(ctx context.Context, limit int) ([]*EmbeddingTask, error)
- func (q *EmbeddingQueue) MarkCompleted(ctx context.Context, taskID string) error
- func (q *EmbeddingQueue) MarkFailed(ctx context.Context, taskID string, errMessage string) error
- func (q *EmbeddingQueue) MarkProcessing(ctx context.Context, taskID string) error
- func (q *EmbeddingQueue) Reconcile(ctx context.Context, threshold time.Duration) error
- type EmbeddingReconciler
- type EmbeddingTask
- type ManagedRows
- type Pool
- func (p *Pool) Begin(ctx context.Context) (*sql.Tx, error)
- func (p *Pool) Close() error
- func (p *Pool) Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (p *Pool) Get(ctx context.Context) (*sql.Conn, error)
- func (p *Pool) GetDB() *sql.DB
- func (p *Pool) IsHealthy() bool
- func (p *Pool) Ping(ctx context.Context) error
- func (p *Pool) Query(ctx context.Context, query string, args ...any) (*ManagedRows, error)
- func (p *Pool) QueryRow(ctx context.Context, query string, args ...any) *sql.Row
- func (p *Pool) Release(conn *sql.Conn)
- func (p *Pool) Stats() *PoolStats
- func (p *Pool) WithConnection(ctx context.Context, fn func(*sql.Conn) error) error
- type PoolStats
- type ProfileRepository
- func (r *ProfileRepository) Create(ctx context.Context, profile *models.UserProfile) error
- func (r *ProfileRepository) Delete(ctx context.Context, userID string) error
- func (r *ProfileRepository) Exists(ctx context.Context, userID string) (bool, error)
- func (r *ProfileRepository) GetByID(ctx context.Context, userID string) (*models.UserProfile, error)
- func (r *ProfileRepository) Update(ctx context.Context, profile *models.UserProfile) error
- type RecommendRepository
- func (r *RecommendRepository) Create(ctx context.Context, result *models.RecommendResult) error
- func (r *RecommendRepository) Delete(ctx context.Context, sessionID string) error
- func (r *RecommendRepository) GetBySessionID(ctx context.Context, sessionID string) (*models.RecommendResult, error)
- func (r *RecommendRepository) ListByUserID(ctx context.Context, userID string, limit, offset int) ([]*models.RecommendResult, error)
- func (r *RecommendRepository) UpdateFeedback(ctx context.Context, sessionID string, feedback *models.UserFeedback) error
- type Repository
- func (r *Repository) Close() error
- func (r *Repository) Commit() error
- func (r *Repository) GetSessionWithResult(ctx context.Context, sessionID string) (*models.Session, *models.RecommendResult, error)
- func (r *Repository) IsTransaction() bool
- func (r *Repository) Pool() *Pool
- func (r *Repository) Rollback() error
- func (r *Repository) SaveProfile(ctx context.Context, profile *models.UserProfile) error
- func (r *Repository) SaveSession(ctx context.Context, session *models.Session, result *models.RecommendResult) error
- func (r *Repository) Transaction(ctx context.Context, fn func(repo *Repository) error) error
- func (r *Repository) WithTransaction(ctx context.Context) (*Repository, error)
- type RetrievalGuard
- func (g *RetrievalGuard) AllowRateLimit() error
- func (g *RetrievalGuard) CheckDBTimeout(ctx context.Context) error
- func (g *RetrievalGuard) CheckEmbeddingCircuitBreaker() error
- func (g *RetrievalGuard) GetCircuitBreakerState() CircuitBreakerState
- func (g *RetrievalGuard) RecordEmbeddingFailure()
- func (g *RetrievalGuard) RecordEmbeddingSuccess()
- func (g *RetrievalGuard) ResetCircuitBreaker()
- func (g *RetrievalGuard) WithDBTimeout(ctx context.Context) (context.Context, context.CancelFunc)
- type SearchResult
- type SecurityError
- type SecurityErrorType
- type SessionRepository
- func (r *SessionRepository) CleanupExpired(ctx context.Context) (int64, error)
- func (r *SessionRepository) Create(ctx context.Context, session *models.Session) error
- func (r *SessionRepository) Delete(ctx context.Context, sessionID string) error
- func (r *SessionRepository) GetByID(ctx context.Context, sessionID string) (*models.Session, error)
- func (r *SessionRepository) ListByUserID(ctx context.Context, userID string, limit, offset int) ([]*models.Session, error)
- func (r *SessionRepository) Update(ctx context.Context, session *models.Session) error
- type TenantGuard
- func (g *TenantGuard) ClearTenantContext(ctx context.Context) error
- func (g *TenantGuard) MustSetTenantContext(ctx context.Context, tenantID string) error
- func (g *TenantGuard) SetTenantContext(ctx context.Context, tenantID string) error
- func (g *TenantGuard) WithTenant(ctx context.Context, tenantID string, fn func(context.Context) error) error
- type VectorSearcher
- func (v *VectorSearcher) AddEmbedding(ctx context.Context, table, id string, embedding []float64, ...) error
- func (v *VectorSearcher) CreateVectorTable(ctx context.Context, table string, metadataSchema string) error
- func (v *VectorSearcher) DeleteEmbedding(ctx context.Context, table, id string) error
- func (v *VectorSearcher) Search(ctx context.Context, table string, embedding []float64, limit int) ([]*SearchResult, error)
- type WriteBuffer
- type WriteItem
Constants ¶
This section is empty.
Variables ¶
var DefaultTimeouts = struct { Query time.Duration Insert time.Duration Update time.Duration Delete time.Duration Transaction time.Duration VectorSearch time.Duration }{ Query: 30 * time.Second, Insert: 20 * time.Second, Update: 20 * time.Second, Delete: 20 * time.Second, Transaction: 60 * time.Second, VectorSearch: 10 * time.Second, }
DefaultTimeouts defines default timeout values for database operations.
Functions ¶
func FormatVector ¶
FormatVector converts a []float64 embedding to a pgvector-format string. It formats the embedding array so that the result is not wrapped in double brackets, and uses the %.6f format to limit each value to 6 decimal places for a compact representation.
func MigrateStorage ¶
MigrateStorage runs the storage system database migrations. This creates the new vector-based storage schema with 6 core tables and supporting indexes.
func NormalizeVector ¶
NormalizeVector normalizes a vector to unit length. This is required for pgvector's cosine distance operator (<=>).
func RollbackLast ¶
RollbackLast rolls back the last migration.
func WithDeleteTimeout ¶
WithDeleteTimeout ensures the context has a timeout suitable for delete operations.
func WithInsertTimeout ¶
WithInsertTimeout ensures the context has a timeout suitable for insert operations.
func WithQueryTimeout ¶
WithQueryTimeout ensures the context has a timeout suitable for query operations.
func WithTransactionTimeout ¶
WithTransactionTimeout ensures the context has a timeout suitable for transaction operations.
func WithUpdateTimeout ¶
WithUpdateTimeout ensures the context has a timeout suitable for update operations.
func WithVectorSearchTimeout ¶
WithVectorSearchTimeout ensures the context has a timeout suitable for vector search operations.
Types ¶
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker provides failure detection and automatic fallback for unreliable services. This implements the circuit breaker pattern to prevent cascading failures.
func NewCircuitBreaker ¶
func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a new CircuitBreaker instance. Args: failureThreshold - number of failures before opening the circuit. openTimeout - time to wait before attempting half-open state. Returns new CircuitBreaker instance.
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() error
AllowRequest checks whether a request should be allowed based on the circuit breaker state. Returns an error if the circuit is open or enters the open state.
func (*CircuitBreaker) Close ¶
func (cb *CircuitBreaker) Close()
Close stops the cleanup goroutine and closes the circuit breaker.
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation.
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset resets the circuit breaker to closed state.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() CircuitBreakerState
State returns the current circuit breaker state. Returns current state.
type CircuitBreakerState ¶
type CircuitBreakerState string
CircuitBreakerState represents the state of a circuit breaker.
const ( CircuitBreakerStateClosed CircuitBreakerState = "closed" CircuitBreakerStateOpen CircuitBreakerState = "open" CircuitBreakerStateHalfOpen CircuitBreakerState = "half-open" )
type Config ¶
type Config struct {
Host string
Port int
User string
Password string
Database string
MaxOpenConns int
MaxIdleConns int
ConnMaxLifetime time.Duration
ConnMaxIdleTime time.Duration
QueryTimeout time.Duration
Embedding *EmbeddingConfig
}
Config represents the database configuration.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns the default database configuration.
type DBTX ¶
type DBTX interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
DBTX is an interface that both *sql.DB and *sql.Tx satisfy.
type EmbeddingConfig ¶
type EmbeddingConfig struct {
DefaultModel string
DefaultVersion int
MaxRetries int
MaxBatchSize int
MaxVectorSearchLimit int
ReconcileBatchSize int
EmbeddingTimeout time.Duration
}
EmbeddingConfig represents embedding-related configuration.
func DefaultEmbeddingConfig ¶
func DefaultEmbeddingConfig() *EmbeddingConfig
DefaultEmbeddingConfig returns the default embedding configuration.
func (*EmbeddingConfig) Validate ¶
func (e *EmbeddingConfig) Validate() error
Validate validates the embedding configuration.
type EmbeddingQueue ¶
type EmbeddingQueue struct {
// contains filtered or unexported fields
}
EmbeddingQueue manages async embedding tasks with idempotency and retry logic. This provides eventual consistency for embedding operations using a database-backed queue.
func NewEmbeddingQueue ¶
func NewEmbeddingQueue(pool *Pool, embeddingConfig *EmbeddingConfig) *EmbeddingQueue
NewEmbeddingQueue creates a new EmbeddingQueue instance. Args: pool - database connection pool. embeddingConfig - embedding configuration for retry settings. Returns new EmbeddingQueue instance.
func (*EmbeddingQueue) Enqueue ¶
func (q *EmbeddingQueue) Enqueue(ctx context.Context, task *EmbeddingTask) error
Enqueue adds an embedding task to the queue with idempotency protection. This uses dedupe_key to prevent duplicate tasks for the same content. Args: ctx - database operation context. task - embedding task to enqueue. Returns error if enqueue operation fails.
func (*EmbeddingQueue) FetchPendingTasks ¶
func (q *EmbeddingQueue) FetchPendingTasks(ctx context.Context, limit int) ([]*EmbeddingTask, error)
FetchPendingTasks retrieves pending embedding tasks with locking. This uses FOR UPDATE SKIP LOCKED to enable multiple concurrent workers. Args: ctx - database operation context. limit - maximum number of tasks to fetch. Returns list of pending tasks or error if fetch fails.
func (*EmbeddingQueue) MarkCompleted ¶
func (q *EmbeddingQueue) MarkCompleted(ctx context.Context, taskID string) error
MarkCompleted marks a task as successfully completed. Args: ctx - database operation context. taskID - task identifier. Returns error if update fails.
func (*EmbeddingQueue) MarkFailed ¶
MarkFailed marks a task as failed and updates its retry count. This implements exponential backoff for retries. Args: ctx - database operation context. taskID - task identifier. errMessage - error message to store. Returns error if the update fails or the task has exceeded the maximum number of retries.
func (*EmbeddingQueue) MarkProcessing ¶
func (q *EmbeddingQueue) MarkProcessing(ctx context.Context, taskID string) error
MarkProcessing marks a task as being processed. Args: ctx - database operation context. taskID - task identifier. Returns error if update fails.
func (*EmbeddingQueue) Reconcile ¶
Reconcile finds orphaned tasks that were never processed and re-enqueues them. This provides eventual consistency for tasks that were lost between DB write and queue enqueue. Args: ctx - database operation context. threshold - time threshold to consider a task orphaned. Returns error if reconciliation fails.
type EmbeddingReconciler ¶
type EmbeddingReconciler struct {
// contains filtered or unexported fields
}
EmbeddingReconciler provides eventual consistency for embedding operations. This scans for orphaned tasks where DB write succeeded but queue enqueue failed.
func NewEmbeddingReconciler ¶
func NewEmbeddingReconciler(db *Pool, queue *EmbeddingQueue, embeddingConfig *EmbeddingConfig, interval, missingThreshold time.Duration) *EmbeddingReconciler
NewEmbeddingReconciler creates a new EmbeddingReconciler instance. Args: db - database connection pool. queue - embedding queue for re-enqueuing tasks. embeddingConfig - embedding configuration for model and version settings. interval - time between reconciliation scans. missingThreshold - time after which a task is considered orphaned. Returns new EmbeddingReconciler instance.
func (*EmbeddingReconciler) Reconcile ¶
func (r *EmbeddingReconciler) Reconcile(ctx context.Context) error
Reconcile scans for orphaned embedding tasks and re-enqueues them. This addresses the case where DB write succeeded but queue enqueue failed. Args: ctx - database operation context. Returns error if reconciliation fails.
func (*EmbeddingReconciler) Start ¶
func (r *EmbeddingReconciler) Start(ctx context.Context)
Start begins periodic reconciliation scanning. This runs until Stop is called or context is cancelled. Args: ctx - context for cancellation.
func (*EmbeddingReconciler) Stop ¶
func (r *EmbeddingReconciler) Stop()
Stop gracefully shuts down the reconciler. This method is idempotent and safe to call multiple times.
type EmbeddingTask ¶
type EmbeddingTask struct {
TaskID string
Table string
Content string
TenantID string
Model string
Version int
}
EmbeddingTask represents a single embedding task.
type ManagedRows ¶
ManagedRows wraps sql.Rows and manages connection lifecycle.
func (*ManagedRows) Close ¶
func (m *ManagedRows) Close() error
Close closes the rows and releases the connection.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents a database connection pool that follows a "get, use, release" pattern.
func (*Pool) GetDB ¶
GetDB returns the underlying *sql.DB for repository initialization. This is needed for repository constructors that require *sql.DB.
func (*Pool) Query ¶
Query executes a query and returns rows. The connection is released when rows are closed.
type PoolStats ¶
type PoolStats struct {
OpenConnections int
InUseConnections int
IdleConnections int
WaitCount int64
WaitDuration time.Duration
MaxOpenConns int
}
PoolStats holds pool statistics.
type ProfileRepository ¶
type ProfileRepository struct {
// contains filtered or unexported fields
}
ProfileRepository handles user profile persistence.
func NewProfileRepository ¶
func NewProfileRepository(pool *Pool) *ProfileRepository
NewProfileRepository creates a new ProfileRepository.
func NewProfileRepositoryWithDB ¶
func NewProfileRepositoryWithDB(db DBTX) *ProfileRepository
NewProfileRepositoryWithDB creates a new ProfileRepository with a transaction or connection.
func (*ProfileRepository) Create ¶
func (r *ProfileRepository) Create(ctx context.Context, profile *models.UserProfile) error
Create creates a new user profile.
func (*ProfileRepository) Delete ¶
func (r *ProfileRepository) Delete(ctx context.Context, userID string) error
Delete deletes a user profile.
func (*ProfileRepository) GetByID ¶
func (r *ProfileRepository) GetByID(ctx context.Context, userID string) (*models.UserProfile, error)
GetByID retrieves a user profile by ID.
func (*ProfileRepository) Update ¶
func (r *ProfileRepository) Update(ctx context.Context, profile *models.UserProfile) error
Update updates a user profile.
type RecommendRepository ¶
type RecommendRepository struct {
// contains filtered or unexported fields
}
RecommendRepository handles recommendation persistence.
func NewRecommendRepository ¶
func NewRecommendRepository(pool *Pool) *RecommendRepository
NewRecommendRepository creates a new RecommendRepository.
func NewRecommendRepositoryWithDB ¶
func NewRecommendRepositoryWithDB(db DBTX) *RecommendRepository
NewRecommendRepositoryWithDB creates a new RecommendRepository with a transaction or connection.
func (*RecommendRepository) Create ¶
func (r *RecommendRepository) Create(ctx context.Context, result *models.RecommendResult) error
Create creates a new recommendation result.
func (*RecommendRepository) Delete ¶
func (r *RecommendRepository) Delete(ctx context.Context, sessionID string) error
Delete deletes a recommendation.
func (*RecommendRepository) GetBySessionID ¶
func (r *RecommendRepository) GetBySessionID(ctx context.Context, sessionID string) (*models.RecommendResult, error)
GetBySessionID retrieves a recommendation by session ID.
func (*RecommendRepository) ListByUserID ¶
func (r *RecommendRepository) ListByUserID(ctx context.Context, userID string, limit, offset int) ([]*models.RecommendResult, error)
ListByUserID lists recommendations by user ID.
func (*RecommendRepository) UpdateFeedback ¶
func (r *RecommendRepository) UpdateFeedback(ctx context.Context, sessionID string, feedback *models.UserFeedback) error
UpdateFeedback updates user feedback for a recommendation.
type Repository ¶
type Repository struct {
Session *SessionRepository
Recommend *RecommendRepository
Profile *ProfileRepository
Vector *VectorSearcher
// contains filtered or unexported fields
}
Repository provides a unified interface for all data access.
func NewRepository ¶
func NewRepository(pool *Pool) *Repository
NewRepository creates a new Repository with all sub-repositories.
func (*Repository) Close ¶
func (r *Repository) Close() error
Close closes the repository and its pool.
func (*Repository) GetSessionWithResult ¶
func (r *Repository) GetSessionWithResult(ctx context.Context, sessionID string) (*models.Session, *models.RecommendResult, error)
GetSessionWithResult retrieves a session with its recommendation result.
func (*Repository) IsTransaction ¶
func (r *Repository) IsTransaction() bool
IsTransaction returns true if this repository is in transaction mode.
func (*Repository) Pool ¶
func (r *Repository) Pool() *Pool
Pool returns the underlying connection pool.
func (*Repository) Rollback ¶
func (r *Repository) Rollback() error
Rollback rolls back the transaction.
func (*Repository) SaveProfile ¶
func (r *Repository) SaveProfile(ctx context.Context, profile *models.UserProfile) error
SaveProfile saves a user profile.
func (*Repository) SaveSession ¶
func (r *Repository) SaveSession(ctx context.Context, session *models.Session, result *models.RecommendResult) error
SaveSession saves a session and its results.
func (*Repository) Transaction ¶
func (r *Repository) Transaction(ctx context.Context, fn func(repo *Repository) error) error
Transaction executes a function within a transaction.
func (*Repository) WithTransaction ¶
func (r *Repository) WithTransaction(ctx context.Context) (*Repository, error)
WithTransaction creates a new repository bound to a transaction.
type RetrievalGuard ¶
type RetrievalGuard struct {
// contains filtered or unexported fields
}
RetrievalGuard provides protection mechanisms for retrieval operations. This implements rate limiting, circuit breaking, and timeout protection to prevent system overload.
func NewRetrievalGuard ¶
func NewRetrievalGuard(maxRequestsPerSec int, failureThreshold int, openTimeout, dbTimeout time.Duration) *RetrievalGuard
NewRetrievalGuard creates a new RetrievalGuard instance. Args: maxRequestsPerSec - maximum requests per second allowed. failureThreshold - number of failures before opening circuit breaker. openTimeout - time to wait before attempting half-open state. dbTimeout - database operation timeout. Returns new RetrievalGuard instance.
func (*RetrievalGuard) AllowRateLimit ¶
func (g *RetrievalGuard) AllowRateLimit() error
AllowRateLimit checks if a request should be allowed based on rate limiting. Returns error if rate limit is exceeded.
func (*RetrievalGuard) CheckDBTimeout ¶
func (g *RetrievalGuard) CheckDBTimeout(ctx context.Context) error
CheckDBTimeout checks if a database operation exceeded the timeout. Returns error if context deadline was exceeded.
func (*RetrievalGuard) CheckEmbeddingCircuitBreaker ¶
func (g *RetrievalGuard) CheckEmbeddingCircuitBreaker() error
CheckEmbeddingCircuitBreaker checks if embedding service is available. Returns error if circuit breaker is open, triggering fallback to keyword-only search.
func (*RetrievalGuard) GetCircuitBreakerState ¶
func (g *RetrievalGuard) GetCircuitBreakerState() CircuitBreakerState
GetCircuitBreakerState returns the current circuit breaker state. This is primarily used for monitoring and debugging. Returns current circuit breaker state.
func (*RetrievalGuard) RecordEmbeddingFailure ¶
func (g *RetrievalGuard) RecordEmbeddingFailure()
RecordEmbeddingFailure records a failed embedding operation. This may trigger circuit breaker opening if failure threshold is reached.
func (*RetrievalGuard) RecordEmbeddingSuccess ¶
func (g *RetrievalGuard) RecordEmbeddingSuccess()
RecordEmbeddingSuccess records a successful embedding operation. This may help the circuit breaker recover from the open state.
func (*RetrievalGuard) ResetCircuitBreaker ¶
func (g *RetrievalGuard) ResetCircuitBreaker()
ResetCircuitBreaker resets the circuit breaker to closed state. This is primarily used for testing purposes.
func (*RetrievalGuard) WithDBTimeout ¶
func (g *RetrievalGuard) WithDBTimeout(ctx context.Context) (context.Context, context.CancelFunc)
WithDBTimeout creates a context with database timeout protection. This prevents long-running database queries from blocking the system. Args: ctx - original context. Returns new context with timeout and cancel function.
type SearchResult ¶
SearchResult represents a vector search result.
type SecurityError ¶
type SecurityError struct {
Type SecurityErrorType
Message string
}
SecurityError represents a security-related error.
func (*SecurityError) Error ¶
func (e *SecurityError) Error() string
type SecurityErrorType ¶
type SecurityErrorType string
SecurityErrorType represents different types of security errors.
const ( SecurityErrorInvalidIdentifier SecurityErrorType = "invalid_identifier" SecurityErrorInjectionAttempt SecurityErrorType = "injection_attempt" SecurityErrorInvalidInput SecurityErrorType = "invalid_input" )
type SessionRepository ¶
type SessionRepository struct {
// contains filtered or unexported fields
}
SessionRepository handles session persistence.
func NewSessionRepository ¶
func NewSessionRepository(pool *Pool) *SessionRepository
NewSessionRepository creates a new SessionRepository.
func NewSessionRepositoryWithDB ¶
func NewSessionRepositoryWithDB(db DBTX) *SessionRepository
NewSessionRepositoryWithDB creates a new SessionRepository with a transaction or connection.
func (*SessionRepository) CleanupExpired ¶
func (r *SessionRepository) CleanupExpired(ctx context.Context) (int64, error)
CleanupExpired removes expired sessions.
func (*SessionRepository) Delete ¶
func (r *SessionRepository) Delete(ctx context.Context, sessionID string) error
Delete deletes a session.
func (*SessionRepository) ListByUserID ¶
func (r *SessionRepository) ListByUserID(ctx context.Context, userID string, limit, offset int) ([]*models.Session, error)
ListByUserID lists sessions by user ID.
type TenantGuard ¶
type TenantGuard struct {
// contains filtered or unexported fields
}
TenantGuard provides physical isolation for multi-tenant data access. This enforces tenant context at the database level to prevent cross-tenant data access.
func NewTenantGuard ¶
func NewTenantGuard(pool *Pool) *TenantGuard
NewTenantGuard creates a new TenantGuard instance.
func (*TenantGuard) ClearTenantContext ¶
func (g *TenantGuard) ClearTenantContext(ctx context.Context) error
ClearTenantContext clears the tenant context. This is primarily used for cleanup and testing purposes. Args: ctx - database operation context. Returns error if clearing tenant context fails.
func (*TenantGuard) MustSetTenantContext ¶
func (g *TenantGuard) MustSetTenantContext(ctx context.Context, tenantID string) error
MustSetTenantContext sets the tenant context and returns an error on failure. This should only be used in initialization paths where failure is fatal. Args: ctx - database operation context. tenantID - tenant identifier. Returns error if tenant context setup fails.
func (*TenantGuard) SetTenantContext ¶
func (g *TenantGuard) SetTenantContext(ctx context.Context, tenantID string) error
SetTenantContext sets the tenant context for the current database session. This MUST be called for every tenant-specific operation to ensure physical isolation.
NOTE: Uses SET LOCAL to set tenant context within the current transaction only. This ensures tenant isolation works correctly with connection pooling, as SET LOCAL only affects the current transaction and is reset when the transaction ends. This prevents tenant context leakage across different connections in the pool.
Args: ctx - database operation context. tenantID - tenant identifier, must be non-empty. Returns error if setting tenant context fails.
func (*TenantGuard) WithTenant ¶
func (g *TenantGuard) WithTenant(ctx context.Context, tenantID string, fn func(context.Context) error) error
WithTenant executes a function within a tenant context. This is a convenience wrapper that ensures tenant context is set before execution. Args: ctx - database operation context. tenantID - tenant identifier. fn - function to execute within tenant context. Returns error if tenant context setup or function execution fails.
type VectorSearcher ¶
type VectorSearcher struct {
// contains filtered or unexported fields
}
VectorSearcher handles vector similarity search.
func NewVectorSearcher ¶
func NewVectorSearcher(pool *Pool, embeddingConfig *EmbeddingConfig) *VectorSearcher
NewVectorSearcher creates a new VectorSearcher. Args: pool - database connection pool. embeddingConfig - embedding configuration for search limit settings. Returns new VectorSearcher instance.
func NewVectorSearcherWithDB ¶
func NewVectorSearcherWithDB(db DBTX, embeddingConfig *EmbeddingConfig) *VectorSearcher
NewVectorSearcherWithDB creates a new VectorSearcher with a transaction or connection. Args: db - database transaction or connection. embeddingConfig - embedding configuration for search limit settings. Returns new VectorSearcher instance.
func (*VectorSearcher) AddEmbedding ¶
func (v *VectorSearcher) AddEmbedding(ctx context.Context, table, id string, embedding []float64, metadata map[string]any) error
AddEmbedding adds a vector embedding to the specified table.
func (*VectorSearcher) CreateVectorTable ¶
func (v *VectorSearcher) CreateVectorTable(ctx context.Context, table string, metadataSchema string) error
CreateVectorTable creates a table with vector support. This is a simplified implementation; in production, use a proper pgvector setup.
func (*VectorSearcher) DeleteEmbedding ¶
func (v *VectorSearcher) DeleteEmbedding(ctx context.Context, table, id string) error
DeleteEmbedding deletes a vector embedding.
func (*VectorSearcher) Search ¶
func (v *VectorSearcher) Search(ctx context.Context, table string, embedding []float64, limit int) ([]*SearchResult, error)
Search performs a vector similarity search. This is a simplified implementation that uses pgvector if available.
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
WriteBuffer provides write batching to reduce database and embedding load. This implements an in-memory buffer with periodic flushing to batch database operations.
func NewWriteBuffer ¶
func NewWriteBuffer(pool *Pool, queue *EmbeddingQueue, batchSize int, flushInterval time.Duration, embeddingConfig *EmbeddingConfig) *WriteBuffer
NewWriteBuffer creates a new WriteBuffer instance. Args: pool - database connection pool. queue - embedding queue for async processing. batchSize - number of items to batch before flushing. flushInterval - maximum time between flushes. embeddingConfig - embedding configuration for model and version settings. Returns new WriteBuffer instance.
func (*WriteBuffer) Start ¶
func (b *WriteBuffer) Start(ctx context.Context) error
Start begins the buffer processing loop in a background goroutine. This method returns immediately after starting the goroutine. The processing loop runs until Stop is called.
Args: ctx - context for cancellation and graceful shutdown. Returns error if the goroutine fails to start.
func (*WriteBuffer) Stop ¶
func (b *WriteBuffer) Stop(ctx context.Context) error
Stop gracefully shuts down the buffer and flushes remaining items. This should be called during application shutdown.
Thread-safety: Uses sync.Once to ensure the channel is closed only once, preventing panic from concurrent close operations. The stopped flag is checked atomically to avoid unnecessary mutex contention.
Args: ctx - context for cancellation. Returns error if stopping fails.
func (*WriteBuffer) Write ¶
func (b *WriteBuffer) Write(ctx context.Context, item *WriteItem) error
Write queues a write operation for batch processing. This is non-blocking and returns immediately if the buffer has capacity. If the buffer is full, it returns an error instead of spawning a goroutine.
Thread-safety: The stopped flag is checked and set atomically under mutex. Channel send is performed outside the lock to prevent deadlock when the buffer is full and Stop() is called concurrently.
Args: ctx - context for cancellation. item - write operation to queue. Returns error if buffer is stopped, item is invalid, or buffer is full.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package adapters provides format conversion layer for storage operations.
|
Package adapters provides format conversion layer for storage operations. |
|
nolint: errcheck // Operations may ignore return values
|
nolint: errcheck // Operations may ignore return values |
|
Package models defines data structures for the storage system.
|
Package models defines data structures for the storage system. |
|
Package repositories provides data access layer for storage system.
|
Package repositories provides data access layer for storage system. |
|
Package services provides retrieval services for the storage system.
|
Package services provides retrieval services for the storage system. |