postgres

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 14, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package postgres provides PostgreSQL database operations for the storage system.

Package postgres provides utility functions for vector operations.

Constants

This section is empty.

Variables

var DefaultTimeouts = struct {
	Query        time.Duration
	Insert       time.Duration
	Update       time.Duration
	Delete       time.Duration
	Transaction  time.Duration
	VectorSearch time.Duration
}{
	Query:        30 * time.Second,
	Insert:       20 * time.Second,
	Update:       20 * time.Second,
	Delete:       20 * time.Second,
	Transaction:  60 * time.Second,
	VectorSearch: 10 * time.Second,
}

DefaultTimeouts defines default timeout values for database operations.

Functions

func FormatVector

func FormatVector(embedding []float64) string

FormatVector converts a []float64 embedding to a pgvector text literal. It wraps the comma-separated values in a single pair of brackets (avoiding double brackets) and formats each value with %.6f, limiting it to six decimal places for a compact representation.
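
The formatting described above can be sketched as follows. This is a minimal illustration, not the package's source: formatVector is a hypothetical stand-in for FormatVector, and the bracketed, comma-separated layout is the text form that pgvector accepts for vector values.

```go
package main

import (
	"fmt"
	"strings"
)

// formatVector is a hypothetical stand-in for FormatVector. It renders a
// slice as a pgvector text literal such as "[0.100000,0.250000]".
func formatVector(embedding []float64) string {
	var b strings.Builder
	b.WriteByte('[')
	for i, v := range embedding {
		if i > 0 {
			b.WriteByte(',')
		}
		// %.6f fixes the precision at six decimal places.
		fmt.Fprintf(&b, "%.6f", v)
	}
	b.WriteByte(']')
	return b.String()
}

func main() {
	fmt.Println(formatVector([]float64{0.1, 0.25, -1}))
}
```

Building the literal element by element is one way to avoid the double-bracket problem: printing the slice with fmt.Sprint already yields "[0.1 0.25 -1]", so wrapping that output in another pair of brackets would produce an invalid literal.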

func Migrate

func Migrate(ctx context.Context, pool *Pool) error

Migrate runs database migrations.

func MigrateStorage

func MigrateStorage(ctx context.Context, pool *Pool) error

MigrateStorage runs the storage system database migrations. This creates the new vector-based storage schema with 6 core tables and supporting indexes.

func NormalizeVector

func NormalizeVector(embedding []float64) []float64

NormalizeVector normalizes a vector to unit (L2) length for use with pgvector's cosine distance operator (<=>).
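
A minimal sketch of unit-length normalization, assuming the L2 norm. The normalize function is hypothetical, and its handling of the zero vector (returning zeros) is an assumption; the package may treat that case differently.

```go
package main

import (
	"fmt"
	"math"
)

// normalize is a hypothetical stand-in for NormalizeVector. It divides
// every component by the vector's L2 norm and leaves the input untouched.
func normalize(embedding []float64) []float64 {
	var sumSquares float64
	for _, v := range embedding {
		sumSquares += v * v
	}
	out := make([]float64, len(embedding))
	if sumSquares == 0 {
		// Assumption: a zero vector has no direction, so return zeros.
		return out
	}
	norm := math.Sqrt(sumSquares)
	for i, v := range embedding {
		out[i] = v / norm
	}
	return out
}

func main() {
	fmt.Println(normalize([]float64{3, 4})) // prints "[0.6 0.8]"
}
```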

func RollbackLast

func RollbackLast(ctx context.Context, pool *Pool) error

RollbackLast rolls back the last migration.

func Seed

func Seed(ctx context.Context, pool *Pool) error

Seed creates seed data for testing.

func WithDeleteTimeout

func WithDeleteTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithDeleteTimeout ensures the context has a timeout suitable for delete operations.

func WithInsertTimeout

func WithInsertTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithInsertTimeout ensures the context has a timeout suitable for insert operations.

func WithQueryTimeout

func WithQueryTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithQueryTimeout ensures the context has a timeout suitable for query operations.

func WithTransactionTimeout

func WithTransactionTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithTransactionTimeout ensures the context has a timeout suitable for transaction operations.

func WithUpdateTimeout

func WithUpdateTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithUpdateTimeout ensures the context has a timeout suitable for update operations.

func WithVectorSearchTimeout

func WithVectorSearchTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithVectorSearchTimeout ensures the context has a timeout suitable for vector search operations.
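
The With*Timeout helpers all share one shape. The sketch below shows one plausible reading of "ensures the context has a timeout": a deadline already set by the caller is kept, and the default applies only when none exists. That behavior is an assumption, and ensureTimeout is a hypothetical helper, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ensureTimeout is a hypothetical helper. Assumption: a deadline already set
// by the caller is kept, and the default applies only when none exists.
func ensureTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	if _, ok := ctx.Deadline(); ok {
		// Still return a cancel func so callers can always defer it.
		return context.WithCancel(ctx)
	}
	return context.WithTimeout(ctx, d)
}

// demo reports whether a deadline was added to a bare context and whether
// a caller-supplied deadline survived unchanged.
func demo() (added, preserved bool) {
	ctx, cancel := ensureTimeout(context.Background(), 30*time.Second)
	defer cancel()
	_, added = ctx.Deadline()

	parent, cancelParent := context.WithTimeout(context.Background(), time.Second)
	defer cancelParent()
	child, cancelChild := ensureTimeout(parent, 30*time.Second)
	defer cancelChild()

	want, _ := parent.Deadline()
	got, _ := child.Deadline()
	return added, got.Equal(want)
}

func main() {
	fmt.Println(demo()) // prints "true true"
}
```

Whatever the real implementation does, callers should defer the returned CancelFunc so the timer is released as soon as the operation finishes.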

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker provides failure detection and automatic fallback for unreliable services. This implements the circuit breaker pattern to prevent cascading failures.

func NewCircuitBreaker

func NewCircuitBreaker(failureThreshold int, openTimeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new CircuitBreaker instance.

Args:
	failureThreshold - number of failures before opening the circuit.
	openTimeout - time to wait before attempting half-open state.

Returns new CircuitBreaker instance.

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() error

AllowRequest checks if a request should be allowed based on circuit breaker state. Returns error if circuit is open or enters open state.

func (*CircuitBreaker) Close

func (cb *CircuitBreaker) Close()

Close stops the cleanup goroutine and closes the circuit breaker.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitBreakerState

State returns the current circuit breaker state.

type CircuitBreakerState

type CircuitBreakerState string

CircuitBreakerState represents the state of a circuit breaker.

const (
	CircuitBreakerStateClosed   CircuitBreakerState = "closed"
	CircuitBreakerStateOpen     CircuitBreakerState = "open"
	CircuitBreakerStateHalfOpen CircuitBreakerState = "half-open"
)
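
The three states form a small state machine. The sketch below is one common way to implement it, under stated assumptions: consecutive failures are counted, a failed probe in the half-open state reopens the circuit, and any success closes it. The breaker type and its lowercase methods are hypothetical, the clock is injectable only for the demo, and the cleanup goroutine that Close stops in the real type is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errOpen = errors.New("circuit breaker is open")

// breaker is a hypothetical, simplified circuit breaker.
type breaker struct {
	mu        sync.Mutex
	state     string // "closed", "open", or "half-open"
	failures  int
	threshold int
	timeout   time.Duration
	openedAt  time.Time
	now       func() time.Time // injectable clock for the demo
}

func newBreaker(threshold int, timeout time.Duration) *breaker {
	return &breaker{state: "closed", threshold: threshold, timeout: timeout, now: time.Now}
}

// allow rejects requests while open and moves to half-open after the timeout.
func (b *breaker) allow() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.state == "open" {
		if b.now().Sub(b.openedAt) < b.timeout {
			return errOpen
		}
		b.state = "half-open" // let a probe request through
	}
	return nil
}

// recordFailure opens the circuit at the threshold or on a failed probe.
func (b *breaker) recordFailure() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures++
	if b.state == "half-open" || b.failures >= b.threshold {
		b.state = "open"
		b.openedAt = b.now()
	}
}

// recordSuccess closes the circuit and clears the failure count.
func (b *breaker) recordSuccess() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failures = 0
	b.state = "closed"
}

// demo walks the breaker through one full cycle and records what it saw.
func demo() []string {
	clock := time.Unix(0, 0)
	b := newBreaker(2, time.Minute)
	b.now = func() time.Time { return clock }

	var seen []string
	b.recordFailure()
	seen = append(seen, b.state) // closed: below threshold
	b.recordFailure()
	seen = append(seen, b.state) // open: threshold reached
	if b.allow() != nil {
		seen = append(seen, "rejected")
	}
	clock = clock.Add(2 * time.Minute)
	_ = b.allow()
	seen = append(seen, b.state) // half-open: timeout elapsed
	b.recordSuccess()
	seen = append(seen, b.state) // closed again
	return seen
}

func main() {
	fmt.Println(demo()) // prints "[closed open rejected half-open closed]"
}
```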

type Config

type Config struct {
	Host            string
	Port            int
	User            string
	Password        string
	Database        string
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration
	ConnMaxIdleTime time.Duration
	QueryTimeout    time.Duration
	Embedding       *EmbeddingConfig
}

Config represents the database configuration.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns the default database configuration.

func (*Config) DSN

func (c *Config) DSN() string

DSN returns the connection string.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type DBTX

type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

DBTX is an interface that both *sql.DB and *sql.Tx satisfy.
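
The claim can be checked at compile time. The sketch below redeclares the interface as shown above and uses blank-identifier assertions; *sql.Conn is included because it also provides the same four methods. The isDBTX helper is hypothetical and exists only to make the check observable at run time.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// DBTX mirrors the interface shown above.
type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

// Compile-time assertions: the build fails if a type stops satisfying DBTX.
var (
	_ DBTX = (*sql.DB)(nil)
	_ DBTX = (*sql.Tx)(nil)
	_ DBTX = (*sql.Conn)(nil)
)

// isDBTX is a hypothetical helper that performs the same check dynamically.
func isDBTX(v interface{}) bool {
	_, ok := v.(DBTX)
	return ok
}

// demo reports the check for *sql.DB, *sql.Tx, and an unrelated type.
func demo() (db, tx, other bool) {
	return isDBTX((*sql.DB)(nil)), isDBTX((*sql.Tx)(nil)), isDBTX("not a database")
}

func main() {
	fmt.Println(demo()) // prints "true true false"
}
```

This is what lets the New*RepositoryWithDB constructors accept either a pool-level handle or a transaction.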

type EmbeddingConfig

type EmbeddingConfig struct {
	DefaultModel         string
	DefaultVersion       int
	MaxRetries           int
	MaxBatchSize         int
	MaxVectorSearchLimit int
	ReconcileBatchSize   int
	EmbeddingTimeout     time.Duration
}

EmbeddingConfig represents embedding-related configuration.

func DefaultEmbeddingConfig

func DefaultEmbeddingConfig() *EmbeddingConfig

DefaultEmbeddingConfig returns the default embedding configuration.

func (*EmbeddingConfig) Validate

func (e *EmbeddingConfig) Validate() error

Validate validates the embedding configuration.

type EmbeddingQueue

type EmbeddingQueue struct {
	// contains filtered or unexported fields
}

EmbeddingQueue manages async embedding tasks with idempotency and retry logic. This provides eventual consistency for embedding operations using a database-backed queue.

func NewEmbeddingQueue

func NewEmbeddingQueue(pool *Pool, embeddingConfig *EmbeddingConfig) *EmbeddingQueue

NewEmbeddingQueue creates a new EmbeddingQueue instance.

Args:
	pool - database connection pool.
	embeddingConfig - embedding configuration for retry settings.

Returns new EmbeddingQueue instance.

func (*EmbeddingQueue) Enqueue

func (q *EmbeddingQueue) Enqueue(ctx context.Context, task *EmbeddingTask) error

Enqueue adds an embedding task to the queue with idempotency protection. This uses dedupe_key to prevent duplicate tasks for the same content.

Args:
	ctx - database operation context.
	task - embedding task to enqueue.

Returns error if enqueue operation fails.

func (*EmbeddingQueue) FetchPendingTasks

func (q *EmbeddingQueue) FetchPendingTasks(ctx context.Context, limit int) ([]*EmbeddingTask, error)

FetchPendingTasks retrieves pending embedding tasks with locking. This uses FOR UPDATE SKIP LOCKED to enable multiple concurrent workers.

Args:
	ctx - database operation context.
	limit - maximum number of tasks to fetch.

Returns list of pending tasks or error if fetch fails.

func (*EmbeddingQueue) MarkCompleted

func (q *EmbeddingQueue) MarkCompleted(ctx context.Context, taskID string) error

MarkCompleted marks a task as successfully completed.

Args:
	ctx - database operation context.
	taskID - task identifier.

Returns error if update fails.

func (*EmbeddingQueue) MarkFailed

func (q *EmbeddingQueue) MarkFailed(ctx context.Context, taskID string, errMessage string) error

MarkFailed marks a task as failed and updates retry count. This implements exponential backoff for retries.

Args:
	ctx - database operation context.
	taskID - task identifier.
	errMessage - error message to store.

Returns error if update fails or task exceeded max retries.
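
Exponential backoff can be sketched as a delay that doubles with each retry up to a cap. The base delay, the cap, and the function names below are illustrative assumptions; the documentation does not state the values MarkFailed uses.

```go
package main

import (
	"fmt"
	"time"
)

// backoff is a hypothetical helper: base * 2^retry, capped at max.
func backoff(retry int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < retry; i++ {
		d *= 2
		if d >= max {
			// Returning early also keeps d from overflowing on large retry counts.
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// backoffSeconds applies assumed settings: 1s base delay and a 30s cap.
func backoffSeconds(retry int) int {
	return int(backoff(retry, time.Second, 30*time.Second) / time.Second)
}

func main() {
	for retry := 0; retry <= 6; retry++ {
		fmt.Printf("retry %d: wait %ds\n", retry, backoffSeconds(retry))
	}
}
```

With these assumed settings the delays run 1s, 2s, 4s, 8s, 16s and then stay at the 30s cap.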

func (*EmbeddingQueue) MarkProcessing

func (q *EmbeddingQueue) MarkProcessing(ctx context.Context, taskID string) error

MarkProcessing marks a task as being processed.

Args:
	ctx - database operation context.
	taskID - task identifier.

Returns error if update fails.

func (*EmbeddingQueue) Reconcile

func (q *EmbeddingQueue) Reconcile(ctx context.Context, threshold time.Duration) error

Reconcile finds orphaned tasks that were never processed and re-enqueues them. This provides eventual consistency for tasks that were lost between DB write and queue enqueue.

Args:
	ctx - database operation context.
	threshold - time threshold to consider a task orphaned.

Returns error if reconciliation fails.

type EmbeddingReconciler

type EmbeddingReconciler struct {
	// contains filtered or unexported fields
}

EmbeddingReconciler provides eventual consistency for embedding operations. This scans for orphaned tasks where DB write succeeded but queue enqueue failed.

func NewEmbeddingReconciler

func NewEmbeddingReconciler(db *Pool, queue *EmbeddingQueue, embeddingConfig *EmbeddingConfig, interval, missingThreshold time.Duration) *EmbeddingReconciler

NewEmbeddingReconciler creates a new EmbeddingReconciler instance.

Args:
	db - database connection pool.
	queue - embedding queue for re-enqueuing tasks.
	embeddingConfig - embedding configuration for model and version settings.
	interval - time between reconciliation scans.
	missingThreshold - time after which a task is considered orphaned.

Returns new EmbeddingReconciler instance.

func (*EmbeddingReconciler) Reconcile

func (r *EmbeddingReconciler) Reconcile(ctx context.Context) error

Reconcile scans for orphaned embedding tasks and re-enqueues them. This addresses the case where DB write succeeded but queue enqueue failed.

Args:
	ctx - database operation context.

Returns error if reconciliation fails.

func (*EmbeddingReconciler) Start

func (r *EmbeddingReconciler) Start(ctx context.Context)

Start begins periodic reconciliation scanning. This runs until Stop is called or context is cancelled.

Args:
	ctx - context for cancellation.

func (*EmbeddingReconciler) Stop

func (r *EmbeddingReconciler) Stop()

Stop gracefully shuts down the reconciler. This method is idempotent and safe to call multiple times.
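
A periodic loop with an idempotent stop is commonly built from a time.Ticker and a sync.Once. The sketch below is an illustration under assumptions: the reconciler type, its fields, and the blocking behavior of start are hypothetical, and the documentation does not say whether the real Start blocks or spawns its own goroutine.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// reconciler is a hypothetical periodic scanner.
type reconciler struct {
	interval time.Duration
	scan     func(context.Context) error
	stopCh   chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

func newReconciler(interval time.Duration, scan func(context.Context) error) *reconciler {
	return &reconciler{
		interval: interval,
		scan:     scan,
		stopCh:   make(chan struct{}),
		done:     make(chan struct{}),
	}
}

// start blocks and runs scan on every tick until stop is called or ctx ends.
func (r *reconciler) start(ctx context.Context) {
	defer close(r.done)
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-r.stopCh:
			return
		case <-ticker.C:
			_ = r.scan(ctx) // a real loop would log or count this error
		}
	}
}

// stop is safe to call more than once: sync.Once closes the channel one time.
func (r *reconciler) stop() {
	r.stopOnce.Do(func() { close(r.stopCh) })
}

// demo runs the loop briefly, stops it twice, and waits for it to exit.
func demo() (scans int64, exited bool) {
	var n int64
	r := newReconciler(5*time.Millisecond, func(context.Context) error {
		atomic.AddInt64(&n, 1)
		return nil
	})
	go r.start(context.Background())
	time.Sleep(60 * time.Millisecond)
	r.stop()
	r.stop() // second call is a no-op rather than a panic
	select {
	case <-r.done:
		exited = true
	case <-time.After(time.Second):
	}
	return atomic.LoadInt64(&n), exited
}

func main() {
	scans, exited := demo()
	fmt.Println(scans > 0, exited)
}
```

Closing a channel twice panics in Go, which is why the close sits behind sync.Once.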

type EmbeddingTask

type EmbeddingTask struct {
	TaskID   string
	Table    string
	Content  string
	TenantID string
	Model    string
	Version  int
}

EmbeddingTask represents a single embedding task.

type ManagedRows

type ManagedRows struct {
	*sql.Rows
	// contains filtered or unexported fields
}

ManagedRows wraps sql.Rows and manages connection lifecycle.

func (*ManagedRows) Close

func (m *ManagedRows) Close() error

Close closes the rows and releases the connection.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a database connection pool that follows a "get, use, release" pattern.

func NewPool

func NewPool(cfg *Config) (*Pool, error)

NewPool creates a new database connection pool.

func (*Pool) Begin

func (p *Pool) Begin(ctx context.Context) (*sql.Tx, error)

Begin starts a new transaction.

func (*Pool) Close

func (p *Pool) Close() error

Close closes all connections in the pool.

func (*Pool) Exec

func (p *Pool) Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

Exec executes a query without returning rows.

func (*Pool) Get

func (p *Pool) Get(ctx context.Context) (*sql.Conn, error)

Get acquires a connection from the pool.

func (*Pool) GetDB

func (p *Pool) GetDB() *sql.DB

GetDB returns the underlying *sql.DB for repository initialization. This is needed for repository constructors that require *sql.DB.

func (*Pool) IsHealthy

func (p *Pool) IsHealthy() bool

IsHealthy checks if the pool is healthy.

func (*Pool) Ping

func (p *Pool) Ping(ctx context.Context) error

Ping pings the database to check connectivity.

func (*Pool) Query

func (p *Pool) Query(ctx context.Context, query string, args ...any) (*ManagedRows, error)

Query executes a query and returns rows. The connection is released when rows are closed.

func (*Pool) QueryRow

func (p *Pool) QueryRow(ctx context.Context, query string, args ...any) *sql.Row

QueryRow executes a query and returns a single row.

func (*Pool) Release

func (p *Pool) Release(conn *sql.Conn)

Release returns a connection to the pool.

func (*Pool) Stats

func (p *Pool) Stats() *PoolStats

Stats returns connection pool statistics.

func (*Pool) WithConnection

func (p *Pool) WithConnection(ctx context.Context, fn func(*sql.Conn) error) error

WithConnection executes a function with a connection from the pool. This is the recommended pattern: get, use, release.
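
The pattern can be sketched without a database by using a toy pool. Everything here is hypothetical (the conn and pool types, the channel-backed idle list); it only illustrates why a wrapper with a deferred release is safer than manual Get and Release calls. With database/sql the corresponding calls are db.Conn to acquire and conn.Close to return the connection to the pool.

```go
package main

import (
	"errors"
	"fmt"
)

// conn and pool are hypothetical stand-ins for *sql.Conn and Pool.
type conn struct{ id int }

type pool struct{ idle chan *conn }

func newPool(size int) *pool {
	p := &pool{idle: make(chan *conn, size)}
	for i := 0; i < size; i++ {
		p.idle <- &conn{id: i}
	}
	return p
}

// get takes an idle connection or fails when none is available.
func (p *pool) get() (*conn, error) {
	select {
	case c := <-p.idle:
		return c, nil
	default:
		return nil, errors.New("pool exhausted")
	}
}

// release hands the connection back to the idle list.
func (p *pool) release(c *conn) { p.idle <- c }

// withConnection runs fn with a connection and always releases it.
func (p *pool) withConnection(fn func(*conn) error) error {
	c, err := p.get()
	if err != nil {
		return err
	}
	// The deferred release runs even if fn returns an error or panics.
	defer p.release(c)
	return fn(c)
}

// demo shows that a failing callback does not leak the connection.
func demo() (idle int, failed bool) {
	p := newPool(2)
	err := p.withConnection(func(c *conn) error {
		return errors.New("query failed")
	})
	return len(p.idle), err != nil
}

func main() {
	fmt.Println(demo()) // prints "2 true"
}
```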

type PoolStats

type PoolStats struct {
	OpenConnections  int
	InUseConnections int
	IdleConnections  int
	WaitCount        int64
	WaitDuration     time.Duration
	MaxOpenConns     int
}

PoolStats holds pool statistics.

type ProfileRepository

type ProfileRepository struct {
	// contains filtered or unexported fields
}

ProfileRepository handles user profile persistence.

func NewProfileRepository

func NewProfileRepository(pool *Pool) *ProfileRepository

NewProfileRepository creates a new ProfileRepository.

func NewProfileRepositoryWithDB

func NewProfileRepositoryWithDB(db DBTX) *ProfileRepository

NewProfileRepositoryWithDB creates a new ProfileRepository with a transaction or connection.

func (*ProfileRepository) Create

func (r *ProfileRepository) Create(ctx context.Context, profile *models.UserProfile) error

Create creates a new user profile.

func (*ProfileRepository) Delete

func (r *ProfileRepository) Delete(ctx context.Context, userID string) error

Delete deletes a user profile.

func (*ProfileRepository) Exists

func (r *ProfileRepository) Exists(ctx context.Context, userID string) (bool, error)

Exists checks if a profile exists.

func (*ProfileRepository) GetByID

func (r *ProfileRepository) GetByID(ctx context.Context, userID string) (*models.UserProfile, error)

GetByID retrieves a user profile by ID.

func (*ProfileRepository) Update

func (r *ProfileRepository) Update(ctx context.Context, profile *models.UserProfile) error

Update updates a user profile.

type RecommendRepository

type RecommendRepository struct {
	// contains filtered or unexported fields
}

RecommendRepository handles recommendation persistence.

func NewRecommendRepository

func NewRecommendRepository(pool *Pool) *RecommendRepository

NewRecommendRepository creates a new RecommendRepository.

func NewRecommendRepositoryWithDB

func NewRecommendRepositoryWithDB(db DBTX) *RecommendRepository

NewRecommendRepositoryWithDB creates a new RecommendRepository with a transaction or connection.

func (*RecommendRepository) Create

Create creates a new recommendation result.

func (*RecommendRepository) Delete

func (r *RecommendRepository) Delete(ctx context.Context, sessionID string) error

Delete deletes a recommendation.

func (*RecommendRepository) GetBySessionID

func (r *RecommendRepository) GetBySessionID(ctx context.Context, sessionID string) (*models.RecommendResult, error)

GetBySessionID retrieves a recommendation by session ID.

func (*RecommendRepository) ListByUserID

func (r *RecommendRepository) ListByUserID(ctx context.Context, userID string, limit, offset int) ([]*models.RecommendResult, error)

ListByUserID lists recommendations by user ID.

func (*RecommendRepository) UpdateFeedback

func (r *RecommendRepository) UpdateFeedback(ctx context.Context, sessionID string, feedback *models.UserFeedback) error

UpdateFeedback updates user feedback for a recommendation.

type Repository

type Repository struct {
	Session   *SessionRepository
	Recommend *RecommendRepository
	Profile   *ProfileRepository
	Vector    *VectorSearcher
	// contains filtered or unexported fields
}

Repository provides a unified interface for all data access.

func NewRepository

func NewRepository(pool *Pool) *Repository

NewRepository creates a new Repository with all sub-repositories.

func (*Repository) Close

func (r *Repository) Close() error

Close closes the repository and its pool.

func (*Repository) Commit

func (r *Repository) Commit() error

Commit commits the transaction.

func (*Repository) GetSessionWithResult

func (r *Repository) GetSessionWithResult(ctx context.Context, sessionID string) (*models.Session, *models.RecommendResult, error)

GetSessionWithResult retrieves a session with its recommendation result.

func (*Repository) IsTransaction

func (r *Repository) IsTransaction() bool

IsTransaction returns true if this repository is in transaction mode.

func (*Repository) Pool

func (r *Repository) Pool() *Pool

Pool returns the underlying connection pool.

func (*Repository) Rollback

func (r *Repository) Rollback() error

Rollback rolls back the transaction.

func (*Repository) SaveProfile

func (r *Repository) SaveProfile(ctx context.Context, profile *models.UserProfile) error

SaveProfile saves a user profile.

func (*Repository) SaveSession

func (r *Repository) SaveSession(ctx context.Context, session *models.Session, result *models.RecommendResult) error

SaveSession saves a session and its results.

func (*Repository) Transaction

func (r *Repository) Transaction(ctx context.Context, fn func(repo *Repository) error) error

Transaction executes a function within a transaction.
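
A function-scoped transaction wrapper usually commits when the callback returns nil and rolls back otherwise. The sketch below shows that shape under assumptions: inTransaction, the tx interface, and fakeTx are hypothetical, and the real method passes a transaction-bound *Repository to the callback rather than a raw transaction.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// tx is the subset of *sql.Tx used here; *sql.Tx satisfies it.
type tx interface {
	Commit() error
	Rollback() error
}

// inTransaction is a hypothetical wrapper: commit on success, roll back on
// error or panic.
func inTransaction(begin func() (tx, error), fn func(tx) error) error {
	t, err := begin()
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	defer func() {
		if p := recover(); p != nil {
			_ = t.Rollback()
			panic(p) // re-raise after cleaning up
		}
	}()
	if err := fn(t); err != nil {
		if rbErr := t.Rollback(); rbErr != nil {
			return fmt.Errorf("rollback failed: %v (original error: %w)", rbErr, err)
		}
		return err
	}
	return t.Commit()
}

// fakeTx records which terminal call was made, for the demo only.
type fakeTx struct{ calls []string }

func (f *fakeTx) Commit() error   { f.calls = append(f.calls, "commit"); return nil }
func (f *fakeTx) Rollback() error { f.calls = append(f.calls, "rollback"); return nil }

// demo runs one successful and one failing callback.
func demo() (onSuccess, onFailure string) {
	ok := &fakeTx{}
	_ = inTransaction(func() (tx, error) { return ok, nil }, func(tx) error { return nil })

	bad := &fakeTx{}
	_ = inTransaction(func() (tx, error) { return bad, nil }, func(tx) error {
		return errors.New("insert failed")
	})
	return strings.Join(ok.calls, ","), strings.Join(bad.calls, ",")
}

func main() {
	fmt.Println(demo()) // prints "commit rollback"
}
```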

func (*Repository) WithTransaction

func (r *Repository) WithTransaction(ctx context.Context) (*Repository, error)

WithTransaction creates a new repository bound to a transaction.

type RetrievalGuard

type RetrievalGuard struct {
	// contains filtered or unexported fields
}

RetrievalGuard provides protection mechanisms for retrieval operations. This implements rate limiting, circuit breaking, and timeout protection to prevent system overload.

func NewRetrievalGuard

func NewRetrievalGuard(maxRequestsPerSec int, failureThreshold int, openTimeout, dbTimeout time.Duration) *RetrievalGuard

NewRetrievalGuard creates a new RetrievalGuard instance.

Args:
	maxRequestsPerSec - maximum requests per second allowed.
	failureThreshold - number of failures before opening circuit breaker.
	openTimeout - time to wait before attempting half-open state.
	dbTimeout - database operation timeout.

Returns new RetrievalGuard instance.

func (*RetrievalGuard) AllowRateLimit

func (g *RetrievalGuard) AllowRateLimit() error

AllowRateLimit checks if a request should be allowed based on rate limiting. Returns error if rate limit is exceeded.
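
The documentation does not say which rate-limiting algorithm RetrievalGuard uses. As one possibility, the sketch below implements a token bucket that refills at maxRequestsPerSec and allows a burst of the same size. The limiter type and allowAt are hypothetical, and the time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"time"
)

var errRateLimited = errors.New("rate limit exceeded")

// limiter is a hypothetical token bucket.
type limiter struct {
	mu     sync.Mutex
	rate   float64 // tokens added per second
	burst  float64 // bucket capacity
	tokens float64
	last   time.Time
}

func newLimiter(perSec int, start time.Time) *limiter {
	n := float64(perSec)
	return &limiter{rate: n, burst: n, tokens: n, last: start}
}

// allowAt refills the bucket for the elapsed time, then spends one token.
func (l *limiter) allowAt(now time.Time) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if elapsed := now.Sub(l.last).Seconds(); elapsed > 0 {
		l.tokens = math.Min(l.burst, l.tokens+elapsed*l.rate)
		l.last = now
	}
	if l.tokens < 1 {
		return errRateLimited
	}
	l.tokens--
	return nil
}

// demo sends three requests at once, then one more a second later.
func demo() []bool {
	t0 := time.Unix(0, 0)
	l := newLimiter(2, t0)
	var allowed []bool
	for _, at := range []time.Time{t0, t0, t0, t0.Add(time.Second)} {
		allowed = append(allowed, l.allowAt(at) == nil)
	}
	return allowed
}

func main() {
	fmt.Println(demo()) // prints "[true true false true]"
}
```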

func (*RetrievalGuard) CheckDBTimeout

func (g *RetrievalGuard) CheckDBTimeout(ctx context.Context) error

CheckDBTimeout checks if a database operation exceeded the timeout. Returns error if context deadline was exceeded.

func (*RetrievalGuard) CheckEmbeddingCircuitBreaker

func (g *RetrievalGuard) CheckEmbeddingCircuitBreaker() error

CheckEmbeddingCircuitBreaker checks if embedding service is available. Returns error if circuit breaker is open, triggering fallback to keyword-only search.

func (*RetrievalGuard) GetCircuitBreakerState

func (g *RetrievalGuard) GetCircuitBreakerState() CircuitBreakerState

GetCircuitBreakerState returns the current circuit breaker state. This is primarily used for monitoring and debugging.

func (*RetrievalGuard) RecordEmbeddingFailure

func (g *RetrievalGuard) RecordEmbeddingFailure()

RecordEmbeddingFailure records a failed embedding operation. This may trigger circuit breaker opening if failure threshold is reached.

func (*RetrievalGuard) RecordEmbeddingSuccess

func (g *RetrievalGuard) RecordEmbeddingSuccess()

RecordEmbeddingSuccess records a successful embedding operation. This may help the circuit breaker recover from the open state.

func (*RetrievalGuard) ResetCircuitBreaker

func (g *RetrievalGuard) ResetCircuitBreaker()

ResetCircuitBreaker resets the circuit breaker to closed state. This is primarily used for testing purposes.

func (*RetrievalGuard) WithDBTimeout

func (g *RetrievalGuard) WithDBTimeout(ctx context.Context) (context.Context, context.CancelFunc)

WithDBTimeout creates a context with database timeout protection. This prevents long-running database queries from blocking the system.

Args:
	ctx - original context.

Returns new context with timeout and cancel function.

type SearchResult

type SearchResult struct {
	ID       string
	Score    float64
	Metadata map[string]any
}

SearchResult represents a vector search result.

type SecurityError

type SecurityError struct {
	Type    SecurityErrorType
	Message string
}

SecurityError represents a security-related error.

func (*SecurityError) Error

func (e *SecurityError) Error() string

type SecurityErrorType

type SecurityErrorType string

SecurityErrorType represents different types of security errors.

const (
	SecurityErrorInvalidIdentifier SecurityErrorType = "invalid_identifier"
	SecurityErrorInjectionAttempt  SecurityErrorType = "injection_attempt"
	SecurityErrorInvalidInput      SecurityErrorType = "invalid_input"
)
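
Because SecurityError is a struct with a pointer-receiver Error method, callers can recover it from a wrapped error with errors.As and branch on Type. The sketch below redeclares the types shown above; the Error message format, validateIdentifier, and classify are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

type SecurityErrorType string

const (
	SecurityErrorInvalidIdentifier SecurityErrorType = "invalid_identifier"
	SecurityErrorInjectionAttempt  SecurityErrorType = "injection_attempt"
	SecurityErrorInvalidInput      SecurityErrorType = "invalid_input"
)

type SecurityError struct {
	Type    SecurityErrorType
	Message string
}

// Error formats the error. Assumption: the real message format is not documented.
func (e *SecurityError) Error() string {
	return fmt.Sprintf("security error (%s): %s", e.Type, e.Message)
}

// validateIdentifier is a hypothetical check that produces a SecurityError.
func validateIdentifier(name string) error {
	if name == "" {
		return &SecurityError{Type: SecurityErrorInvalidIdentifier, Message: "empty identifier"}
	}
	return nil
}

// classify extracts the security error type from a possibly wrapped error.
func classify(err error) string {
	var secErr *SecurityError
	if errors.As(err, &secErr) {
		return string(secErr.Type)
	}
	return ""
}

// demo wraps a SecurityError and shows that errors.As still finds it.
func demo() string {
	wrapped := fmt.Errorf("search: %w", validateIdentifier(""))
	return classify(wrapped)
}

func main() {
	fmt.Println(demo()) // prints "invalid_identifier"
}
```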

type SessionRepository

type SessionRepository struct {
	// contains filtered or unexported fields
}

SessionRepository handles session persistence.

func NewSessionRepository

func NewSessionRepository(pool *Pool) *SessionRepository

NewSessionRepository creates a new SessionRepository.

func NewSessionRepositoryWithDB

func NewSessionRepositoryWithDB(db DBTX) *SessionRepository

NewSessionRepositoryWithDB creates a new SessionRepository with a transaction or connection.

func (*SessionRepository) CleanupExpired

func (r *SessionRepository) CleanupExpired(ctx context.Context) (int64, error)

CleanupExpired removes expired sessions.

func (*SessionRepository) Create

func (r *SessionRepository) Create(ctx context.Context, session *models.Session) error

Create creates a new session.

func (*SessionRepository) Delete

func (r *SessionRepository) Delete(ctx context.Context, sessionID string) error

Delete deletes a session.

func (*SessionRepository) GetByID

func (r *SessionRepository) GetByID(ctx context.Context, sessionID string) (*models.Session, error)

GetByID retrieves a session by ID.

func (*SessionRepository) ListByUserID

func (r *SessionRepository) ListByUserID(ctx context.Context, userID string, limit, offset int) ([]*models.Session, error)

ListByUserID lists sessions by user ID.

func (*SessionRepository) Update

func (r *SessionRepository) Update(ctx context.Context, session *models.Session) error

Update updates a session.

type TenantGuard

type TenantGuard struct {
	// contains filtered or unexported fields
}

TenantGuard provides physical isolation for multi-tenant data access. This enforces tenant context at the database level to prevent cross-tenant data access.

func NewTenantGuard

func NewTenantGuard(pool *Pool) *TenantGuard

NewTenantGuard creates a new TenantGuard instance.

func (*TenantGuard) ClearTenantContext

func (g *TenantGuard) ClearTenantContext(ctx context.Context) error

ClearTenantContext clears the tenant context. This is primarily used for cleanup and testing purposes.

Args:
	ctx - database operation context.

Returns error if clearing tenant context fails.

func (*TenantGuard) MustSetTenantContext

func (g *TenantGuard) MustSetTenantContext(ctx context.Context, tenantID string) error

MustSetTenantContext sets the tenant context and returns an error on failure. This should only be used in initialization paths where failure is fatal.

Args:
	ctx - database operation context.
	tenantID - tenant identifier.

Returns error if tenant context setup fails.

func (*TenantGuard) SetTenantContext

func (g *TenantGuard) SetTenantContext(ctx context.Context, tenantID string) error

SetTenantContext sets the tenant context for the current database session. This MUST be called for every tenant-specific operation to ensure physical isolation.

NOTE: Uses SET LOCAL to set tenant context within the current transaction only. This ensures tenant isolation works correctly with connection pooling, as SET LOCAL only affects the current transaction and is reset when the transaction ends. This prevents tenant context leakage across different connections in the pool.

Args:
	ctx - database operation context.
	tenantID - tenant identifier, must be non-empty.

Returns error if setting tenant context fails.
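
A transaction-scoped setting can be sketched as follows. PostgreSQL's set_config function with its third argument set to true has the same transaction-local effect as SET LOCAL and, unlike the SET statement, accepts a bind parameter. The setting name app.tenant_id, the execer interface, and the recorder type are hypothetical; the documentation does not show the statement the package actually issues.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// execer is the one method needed here; *sql.Tx satisfies it.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Assumption: 'app.tenant_id' is an illustrative setting name.
const setTenantSQL = "SELECT set_config('app.tenant_id', $1, true)"

// setTenant is a hypothetical helper that scopes the tenant to the transaction.
func setTenant(ctx context.Context, tx execer, tenantID string) error {
	if tenantID == "" {
		return errors.New("tenant ID must be non-empty")
	}
	// Passing the tenant ID as a parameter avoids building SQL from input.
	_, err := tx.ExecContext(ctx, setTenantSQL, tenantID)
	return err
}

// recorder captures the statement instead of talking to a database.
type recorder struct {
	query string
	args  []any
}

func (r *recorder) ExecContext(_ context.Context, query string, args ...any) (sql.Result, error) {
	r.query, r.args = query, args
	return nil, nil
}

// demo returns the recorded statement and whether an empty ID was rejected.
func demo() (query string, argCount int, emptyRejected bool) {
	rec := &recorder{}
	_ = setTenant(context.Background(), rec, "tenant-42")
	err := setTenant(context.Background(), &recorder{}, "")
	return rec.query, len(rec.args), err != nil
}

func main() {
	fmt.Println(demo())
}
```

Since the setting only lasts for the current transaction, a helper like this has to run on the same transaction as the tenant-specific queries that follow it.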

func (*TenantGuard) WithTenant

func (g *TenantGuard) WithTenant(ctx context.Context, tenantID string, fn func(context.Context) error) error

WithTenant executes a function within a tenant context. This is a convenience wrapper that ensures tenant context is set before execution.

Args:
	ctx - database operation context.
	tenantID - tenant identifier.
	fn - function to execute within tenant context.

Returns error if tenant context setup or function execution fails.

type VectorSearcher

type VectorSearcher struct {
	// contains filtered or unexported fields
}

VectorSearcher handles vector similarity search.

func NewVectorSearcher

func NewVectorSearcher(pool *Pool, embeddingConfig *EmbeddingConfig) *VectorSearcher

NewVectorSearcher creates a new VectorSearcher.

Args:
	pool - database connection pool.
	embeddingConfig - embedding configuration for search limit settings.

Returns new VectorSearcher instance.

func NewVectorSearcherWithDB

func NewVectorSearcherWithDB(db DBTX, embeddingConfig *EmbeddingConfig) *VectorSearcher

NewVectorSearcherWithDB creates a new VectorSearcher with a transaction or connection.

Args:
	db - database transaction or connection.
	embeddingConfig - embedding configuration for search limit settings.

Returns new VectorSearcher instance.

func (*VectorSearcher) AddEmbedding

func (v *VectorSearcher) AddEmbedding(ctx context.Context, table, id string, embedding []float64, metadata map[string]any) error

AddEmbedding adds a vector embedding to the specified table.

func (*VectorSearcher) CreateVectorTable

func (v *VectorSearcher) CreateVectorTable(ctx context.Context, table string, metadataSchema string) error

CreateVectorTable creates a table with vector support. This is a simplified implementation; production deployments should use a proper pgvector setup.

func (*VectorSearcher) DeleteEmbedding

func (v *VectorSearcher) DeleteEmbedding(ctx context.Context, table, id string) error

DeleteEmbedding deletes a vector embedding.

func (*VectorSearcher) Search

func (v *VectorSearcher) Search(ctx context.Context, table string, embedding []float64, limit int) ([]*SearchResult, error)

Search performs a vector similarity search. This is a simplified implementation that uses pgvector if available.
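
Since Search takes the table name as a string, a query builder has to validate that identifier before interpolating it, while the embedding and limit can travel as bind parameters. The sketch below shows one way to do this with pgvector's cosine distance operator. The column names (id, metadata, embedding), the identifier rule, and buildSearchQuery itself are assumptions, not the package's actual SQL.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// identRe is an assumed rule for safe table names: lowercase letters,
// digits, and underscores, not starting with a digit.
var identRe = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)

// buildSearchQuery is a hypothetical builder for a similarity query.
// $1 is the pgvector literal of the query embedding and $2 is the limit.
func buildSearchQuery(table string, limit, maxLimit int) (string, error) {
	if !identRe.MatchString(table) {
		return "", errors.New("invalid table identifier")
	}
	if limit <= 0 || limit > maxLimit {
		return "", fmt.Errorf("limit must be between 1 and %d", maxLimit)
	}
	// <=> is cosine distance, so 1 - distance gives a similarity score.
	return fmt.Sprintf(
		"SELECT id, metadata, 1 - (embedding <=> $1::vector) AS score "+
			"FROM %s ORDER BY embedding <=> $1::vector LIMIT $2",
		table,
	), nil
}

// demo builds one valid query and rejects a malicious table name and a
// limit above the configured maximum.
func demo() (query string, badTableRejected, badLimitRejected bool) {
	query, _ = buildSearchQuery("documents", 10, 100)
	_, errTable := buildSearchQuery("documents; DROP TABLE users", 10, 100)
	_, errLimit := buildSearchQuery("documents", 1000, 100)
	return query, errTable != nil, errLimit != nil
}

func main() {
	query, badTable, badLimit := demo()
	fmt.Println(query)
	fmt.Println(badTable, badLimit) // prints "true true"
}
```

The maxLimit argument plays the role that MaxVectorSearchLimit in EmbeddingConfig appears to play, though how the package enforces that limit is not documented.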

type WriteBuffer

type WriteBuffer struct {
	// contains filtered or unexported fields
}

WriteBuffer provides write batching to reduce database and embedding load. This implements an in-memory buffer with periodic flushing to batch database operations.

func NewWriteBuffer

func NewWriteBuffer(pool *Pool, queue *EmbeddingQueue, batchSize int, flushInterval time.Duration, embeddingConfig *EmbeddingConfig) *WriteBuffer

NewWriteBuffer creates a new WriteBuffer instance.

Args:
	pool - database connection pool.
	queue - embedding queue for async processing.
	batchSize - number of items to batch before flushing.
	flushInterval - maximum time between flushes.
	embeddingConfig - embedding configuration for model and version settings.

Returns new WriteBuffer instance.

func (*WriteBuffer) Start

func (b *WriteBuffer) Start(ctx context.Context) error

Start begins the buffer processing loop in a background goroutine. This method returns immediately after starting the goroutine. The processing loop runs until Stop is called.

Args:
	ctx - context for cancellation and graceful shutdown.

Returns error if the goroutine fails to start.

func (*WriteBuffer) Stop

func (b *WriteBuffer) Stop(ctx context.Context) error

Stop gracefully shuts down the buffer and flushes remaining items. This should be called during application shutdown.

Thread-safety: Uses sync.Once to ensure the channel is closed only once, preventing panic from concurrent close operations. The stopped flag is checked atomically to avoid unnecessary mutex contention.

Args:
	ctx - context for cancellation.

Returns error if stopping fails.

func (*WriteBuffer) Write

func (b *WriteBuffer) Write(ctx context.Context, item *WriteItem) error

Write queues a write operation for batch processing. This is non-blocking and returns immediately if the buffer has capacity. If the buffer is full, it returns an error instead of spawning a goroutine.

Thread-safety: The stopped flag is checked and set atomically under mutex. Channel send is performed outside the lock to prevent deadlock when the buffer is full and Stop() is called concurrently.

Args:
	ctx - context for cancellation.
	item - write operation to queue.

Returns error if buffer is stopped, item is invalid, or buffer is full.
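
The non-blocking behavior described for Write maps onto a select with a default case. The sketch below is a simplified variant, not the package's implementation: it signals shutdown through a separate done channel closed via sync.Once and never closes the data channel, so a send can never hit a closed channel. The buffer and item types and both error values are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errStopped = errors.New("buffer is stopped")
	errFull    = errors.New("buffer is full")
)

// item and buffer are hypothetical stand-ins for WriteItem and WriteBuffer.
type item struct{ content string }

type buffer struct {
	items    chan item
	done     chan struct{}
	stopOnce sync.Once
}

func newBuffer(capacity int) *buffer {
	return &buffer{items: make(chan item, capacity), done: make(chan struct{})}
}

// write never blocks: it fails fast when stopped or when the buffer is full.
func (b *buffer) write(it item) error {
	select {
	case <-b.done:
		return errStopped
	default:
	}
	select {
	case b.items <- it:
		return nil
	default:
		return errFull
	}
}

// stop is idempotent: sync.Once guarantees the channel is closed one time.
func (b *buffer) stop() {
	b.stopOnce.Do(func() { close(b.done) })
}

// describe maps a write result to a short label for the demo.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errFull):
		return "full"
	default:
		return "stopped"
	}
}

// demo overfills a two-slot buffer, stops it twice, then writes again.
func demo() []string {
	b := newBuffer(2)
	var results []string
	for i := 0; i < 3; i++ {
		results = append(results, describe(b.write(item{content: "x"})))
	}
	b.stop()
	b.stop() // safe: the second call does nothing
	return append(results, describe(b.write(item{content: "late"})))
}

func main() {
	fmt.Println(demo()) // prints "[ok ok full stopped]"
}
```

In this variant a write that races with stop may still land in the channel, so a flush on shutdown would need to drain the channel after stopping.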

type WriteItem

type WriteItem struct {
	TenantID string
	Table    string
	Content  string
	Metadata map[string]interface{}
}

WriteItem represents a single write operation to be batched.

Directories

Path Synopsis
Package adapters provides format conversion layer for storage operations.
Package models defines data structures for the storage system.
Package repositories provides data access layer for storage system.
Package services provides retrieval services for the storage system.
