postgres

package
v1.2.0 Latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Copyright 2026 Teradata

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextWithUserID

func ContextWithUserID(ctx context.Context, userID string) context.Context

ContextWithUserID returns a new context with the given user ID attached. When a user ID is present, execInTx will SET LOCAL app.current_user_id within each transaction to activate row-level security policies.

func UserIDFromContext

func UserIDFromContext(ctx context.Context) string

UserIDFromContext extracts the user ID from the context, if present. Returns empty string if no user ID is set.

Types

type AdminStore

type AdminStore struct {
	// contains filtered or unexported fields
}

AdminStore implements agent.AdminStorage using PostgreSQL without RLS. All queries use execInTxNoRLS to bypass row-level security policies, providing cross-tenant visibility for platform administrators.

func NewAdminStore

func NewAdminStore(pool *pgxpool.Pool, tracer observability.Tracer, logger *zap.Logger) *AdminStore

NewAdminStore creates a new admin store with the given connection pool. The logger parameter is optional; if nil, zap.NewNop() is used as a fallback.

func (*AdminStore) CountSessionsByUser

func (s *AdminStore) CountSessionsByUser(ctx context.Context) ([]agent.UserSessionCount, error)

CountSessionsByUser returns session counts grouped by user_id.

func (*AdminStore) GetSystemStats

func (s *AdminStore) GetSystemStats(ctx context.Context) (*agent.SystemStats, error)

GetSystemStats returns aggregate statistics across all users.

func (*AdminStore) ListAllSessions

func (s *AdminStore) ListAllSessions(ctx context.Context, limit, offset int) ([]agent.AdminSession, int32, error)

ListAllSessions returns sessions across all users (bypasses RLS).

func (*AdminStore) ValidatePermissions

func (s *AdminStore) ValidatePermissions(ctx context.Context) error

ValidatePermissions checks whether the admin connection has BYPASSRLS privilege. If the admin role does not have BYPASSRLS, a warning is logged. This is not necessarily fatal because RLS policies may be configured to allow the admin role, but operators should be aware of the misconfiguration.

type ArtifactStore

type ArtifactStore struct {
	// contains filtered or unexported fields
}

ArtifactStore implements artifacts.ArtifactStore using PostgreSQL.

func NewArtifactStore

func NewArtifactStore(pool *pgxpool.Pool, tracer observability.Tracer) *ArtifactStore

NewArtifactStore creates a new PostgreSQL-backed artifact store.

func (*ArtifactStore) Close

func (s *ArtifactStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*ArtifactStore) Delete

func (s *ArtifactStore) Delete(ctx context.Context, id string, hard bool) error

Delete soft-deletes or hard-deletes an artifact.

func (*ArtifactStore) Get

Get retrieves an artifact by ID.

func (*ArtifactStore) GetByName

func (s *ArtifactStore) GetByName(ctx context.Context, name string, sessionID string) (*artifacts.Artifact, error)

GetByName retrieves an artifact by name, optionally scoped to a session.

func (*ArtifactStore) GetStats

func (s *ArtifactStore) GetStats(ctx context.Context) (*artifacts.Stats, error)

GetStats returns aggregate statistics about stored artifacts using a single query.

func (*ArtifactStore) Index

func (s *ArtifactStore) Index(ctx context.Context, artifact *artifacts.Artifact) error

Index stores or updates an artifact record.

func (*ArtifactStore) List

func (s *ArtifactStore) List(ctx context.Context, filter *artifacts.Filter) ([]*artifacts.Artifact, error)

List retrieves artifacts matching the given filters.

func (*ArtifactStore) RecordAccess

func (s *ArtifactStore) RecordAccess(ctx context.Context, id string) error

RecordAccess updates the last_accessed_at timestamp and increments access_count.

func (*ArtifactStore) Search

func (s *ArtifactStore) Search(ctx context.Context, query string, sessionID string, limit int) ([]*artifacts.Artifact, error)

Search performs full-text search on artifacts using tsvector.

func (*ArtifactStore) Update

func (s *ArtifactStore) Update(ctx context.Context, artifact *artifacts.Artifact) error

Update updates an existing artifact's metadata.

type Backend

type Backend struct {
	// contains filtered or unexported fields
}

Backend implements backend.StorageBackend using PostgreSQL with pgx. NOTE: The proto-defined PostgresStorageConfig includes a SupabaseConfig field (loomv1.SupabaseConfig) for future Supabase-hosted PostgreSQL integration. This is available via cfg.GetSupabase() but is not yet consumed by the backend.

func NewBackend

func NewBackend(ctx context.Context, cfg *loomv1.PostgresStorageConfig, tracer observability.Tracer) (*Backend, error)

NewBackend creates a new PostgreSQL storage backend from proto configuration.

func (*Backend) AdminStore

func (b *Backend) AdminStore() *AdminStore

AdminStore creates a new AdminStore using the backend's pool and tracer. This provides cross-tenant administrative queries that bypass RLS.

func (*Backend) ArtifactStore

func (b *Backend) ArtifactStore() artifacts.ArtifactStore

ArtifactStore returns the PostgreSQL artifact store implementation.

func (*Backend) Close

func (b *Backend) Close() error

Close closes the PostgreSQL connection pool.

func (*Backend) ErrorStore

func (b *Backend) ErrorStore() agent.ErrorStore

ErrorStore returns the PostgreSQL error store implementation.

func (*Backend) HumanRequestStore

func (b *Backend) HumanRequestStore() shuttle.HumanRequestStore

HumanRequestStore returns the PostgreSQL human request store implementation.

func (*Backend) Migrate

func (b *Backend) Migrate(ctx context.Context) error

Migrate runs all pending PostgreSQL migrations.

func (*Backend) Migrator

func (b *Backend) Migrator() *Migrator

Migrator returns the migration manager for manual migration operations.

func (*Backend) Ping

func (b *Backend) Ping(ctx context.Context) error

Ping verifies the PostgreSQL connection is healthy.

func (*Backend) Pool

func (b *Backend) Pool() *pgxpool.Pool

Pool returns the underlying pgxpool.Pool for advanced operations.

func (*Backend) RawPendingMigrations

func (b *Backend) RawPendingMigrations(ctx context.Context) ([]Migration, error)

RawPendingMigrations returns the list of migrations that have not yet been applied. Returns the postgres-internal Migration type. The backend package wraps this to satisfy the MigrationInspector interface without introducing an import cycle.

func (*Backend) ResultStore

func (b *Backend) ResultStore() storage.ResultStore

ResultStore returns the PostgreSQL result store implementation.

func (*Backend) SessionStorage

func (b *Backend) SessionStorage() agent.SessionStorage

SessionStorage returns the PostgreSQL session storage implementation.

type ErrorStore

type ErrorStore struct {
	// contains filtered or unexported fields
}

ErrorStore implements agent.ErrorStore using PostgreSQL.

func NewErrorStore

func NewErrorStore(pool *pgxpool.Pool, tracer observability.Tracer) *ErrorStore

NewErrorStore creates a new PostgreSQL-backed error store.

func (*ErrorStore) Close

func (s *ErrorStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*ErrorStore) Get

func (s *ErrorStore) Get(ctx context.Context, errorID string) (*agent.StoredError, error)

Get retrieves an error by its ID.

func (*ErrorStore) List

func (s *ErrorStore) List(ctx context.Context, filters agent.ErrorFilters) ([]*agent.StoredError, error)

List retrieves errors matching the given filters.

func (*ErrorStore) Store

func (s *ErrorStore) Store(ctx context.Context, storedErr *agent.StoredError) (string, error)

Store persists an agent error and returns its ID.

type HumanRequestStore

type HumanRequestStore struct {
	// contains filtered or unexported fields
}

HumanRequestStore implements shuttle.HumanRequestStore using PostgreSQL.

func NewHumanRequestStore

func NewHumanRequestStore(pool *pgxpool.Pool, tracer observability.Tracer) *HumanRequestStore

NewHumanRequestStore creates a new PostgreSQL-backed human request store.

func (*HumanRequestStore) Close

func (s *HumanRequestStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*HumanRequestStore) Get

Get retrieves a human request by ID.

func (*HumanRequestStore) ListBySession

func (s *HumanRequestStore) ListBySession(ctx context.Context, sessionID string) ([]*shuttle.HumanRequest, error)

ListBySession retrieves all human requests for a session.

func (*HumanRequestStore) ListPending

func (s *HumanRequestStore) ListPending(ctx context.Context) ([]*shuttle.HumanRequest, error)

ListPending retrieves all pending human requests ordered by creation time.

func (*HumanRequestStore) Store

Store persists a human request.

func (*HumanRequestStore) Update

Update modifies an existing human request.

type Migration

type Migration struct {
	Version     int
	Description string
	UpSQL       string
	DownSQL     string
}

Migration represents a single database migration step.

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator manages PostgreSQL schema migrations using embedded SQL files.

func NewMigrator

func NewMigrator(pool *pgxpool.Pool, tracer observability.Tracer) (*Migrator, error)

NewMigrator creates a new migrator with embedded SQL migrations.

func (*Migrator) CurrentVersion

func (m *Migrator) CurrentVersion(ctx context.Context) (int, error)

CurrentVersion returns the highest applied migration version.

func (*Migrator) MigrateDown

func (m *Migrator) MigrateDown(ctx context.Context, steps int) error

MigrateDown rolls back the specified number of migrations.
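Rolling back a number of steps usually means undoing the most recently applied migrations first. The sketch below illustrates that ordering under stated assumptions: `migration` is a local copy of the documented Migration fields, and `rollbackPlan` is a hypothetical helper, not a function of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// migration is a local copy of the documented Migration fields.
type migration struct {
	Version     int
	Description string
	UpSQL       string
	DownSQL     string
}

// rollbackPlan returns up to steps applied migrations, newest first.
func rollbackPlan(all []migration, current, steps int) []migration {
	applied := make([]migration, 0, len(all))
	for _, m := range all {
		if m.Version <= current {
			applied = append(applied, m)
		}
	}
	sort.Slice(applied, func(i, j int) bool { return applied[i].Version > applied[j].Version })
	if steps < 0 {
		steps = 0
	}
	if steps > len(applied) {
		steps = len(applied)
	}
	return applied[:steps]
}

func main() {
	all := []migration{{Version: 1}, {Version: 2}, {Version: 3}, {Version: 4}}
	for _, m := range rollbackPlan(all, 3, 2) {
		fmt.Println("would run DownSQL for version", m.Version)
	}
}
```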

func (*Migrator) MigrateUp

func (m *Migrator) MigrateUp(ctx context.Context) error

MigrateUp applies all pending migrations up to the latest version. Uses a PostgreSQL advisory lock to prevent concurrent migration execution.

func (*Migrator) PendingMigrations

func (m *Migrator) PendingMigrations(ctx context.Context) ([]Migration, error)

PendingMigrations returns the list of migrations that have not yet been applied.
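Conceptually, the pending set is every known migration whose version is higher than the current one. A minimal sketch, assuming versions are applied in increasing order; `migration` is a local copy of the documented fields and `pending` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// migration is a local copy of the documented Migration fields.
type migration struct {
	Version     int
	Description string
	UpSQL       string
	DownSQL     string
}

// pending returns migrations newer than current, sorted by ascending version.
func pending(all []migration, current int) []migration {
	out := make([]migration, 0, len(all))
	for _, m := range all {
		if m.Version > current {
			out = append(out, m)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Version < out[j].Version })
	return out
}

func main() {
	all := []migration{{Version: 3}, {Version: 1}, {Version: 4}, {Version: 2}}
	for _, m := range pending(all, 2) {
		fmt.Println("pending version", m.Version)
	}
}
```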

type ResultStore

type ResultStore struct {
	// contains filtered or unexported fields
}

ResultStore implements storage.ResultStore using PostgreSQL.

func NewResultStore

func NewResultStore(pool *pgxpool.Pool, tracer observability.Tracer) *ResultStore

NewResultStore creates a new PostgreSQL-backed result store.

func (*ResultStore) Close

func (s *ResultStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*ResultStore) Delete

func (s *ResultStore) Delete(ctx context.Context, id string) error

Delete removes a stored result and its table. Both the table drop and metadata delete execute within a single transaction with RLS tenant isolation via execInTx.

func (*ResultStore) GetMetadata

func (s *ResultStore) GetMetadata(ctx context.Context, id string) (*storage.SQLResultMetadata, error)

GetMetadata retrieves metadata about a stored result. The query runs in a transaction for RLS tenant isolation.

func (*ResultStore) Query

func (s *ResultStore) Query(ctx context.Context, id, query string) (interface{}, error)

Query executes a query against a stored result table. If query is empty, returns all rows from the result table. Caller-supplied SQL is NOT allowed; only an empty query string is accepted. This prevents SQL injection. The operation runs in a transaction for RLS tenant isolation.
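The restriction described above can be pictured as a guard that rejects any caller SQL and validates the identifier before building the statement. This sketch is an assumption about the approach: `buildResultQuery`, the `result_` table prefix, and the allowed ID pattern are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// validID restricts result IDs to characters that are safe inside a quoted identifier.
var validID = regexp.MustCompile(`^[A-Za-z0-9_]+$`)

// buildResultQuery accepts only an empty query and returns a fixed SELECT.
func buildResultQuery(id, query string) (string, error) {
	if query != "" {
		return "", errors.New("caller-supplied SQL is not allowed")
	}
	if !validID.MatchString(id) {
		return "", fmt.Errorf("invalid result id %q", id)
	}
	// Hypothetical naming scheme for the dynamic result table.
	return `SELECT * FROM "result_` + id + `"`, nil
}

func main() {
	fmt.Println(buildResultQuery("abc123", ""))
	fmt.Println(buildResultQuery("abc123", "DROP TABLE sessions"))
	fmt.Println(buildResultQuery(`x"; DROP TABLE sessions; --`, ""))
}
```

Because table names cannot be passed as bind parameters, validating the ID against a strict pattern is what keeps the generated statement safe in this sketch.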

func (*ResultStore) Store

func (s *ResultStore) Store(ctx context.Context, id string, data interface{}) (*loomv1.DataReference, error)

Store persists a result set in a dynamic table and records metadata. All database operations (create table, insert rows, store metadata) execute within a single transaction with RLS tenant isolation via execInTx.

type SessionStore

type SessionStore struct {
	// contains filtered or unexported fields
}

SessionStore implements agent.SessionStorage using PostgreSQL with pgx.

func NewSessionStore

func NewSessionStore(pool *pgxpool.Pool, tracer observability.Tracer, logger *zap.Logger) *SessionStore

NewSessionStore creates a new PostgreSQL-backed session store. The logger parameter is optional; if nil, zap.NewNop() is used as a fallback.

func (*SessionStore) Close

func (s *SessionStore) Close() error

Close is a no-op for the session store; the pool is managed by the backend.

func (*SessionStore) DeleteSession

func (s *SessionStore) DeleteSession(ctx context.Context, sessionID string) error

DeleteSession soft-deletes a session by setting deleted_at. Hard deletion happens via the purge_soft_deleted function after the grace period. Cleanup hooks run after the transaction commits successfully.

func (*SessionStore) GetStats

func (s *SessionStore) GetStats(ctx context.Context) (*agent.Stats, error)

GetStats returns aggregate statistics about stored sessions.

func (*SessionStore) ListSessions

func (s *SessionStore) ListSessions(ctx context.Context) ([]string, error)

ListSessions returns all session IDs ordered by most recently updated.

func (*SessionStore) LoadAgentSessions

func (s *SessionStore) LoadAgentSessions(ctx context.Context, agentID string) ([]string, error)

LoadAgentSessions returns session IDs for a specific agent.

func (*SessionStore) LoadMemorySnapshots

func (s *SessionStore) LoadMemorySnapshots(ctx context.Context, sessionID string, snapshotType string, limit int) ([]agent.MemorySnapshot, error)

LoadMemorySnapshots retrieves memory snapshots for a session.

func (*SessionStore) LoadMessages

func (s *SessionStore) LoadMessages(ctx context.Context, sessionID string) ([]agent.Message, error)

LoadMessages retrieves all messages for a session ordered by timestamp.

func (*SessionStore) LoadMessagesForAgent

func (s *SessionStore) LoadMessagesForAgent(ctx context.Context, agentID string) ([]agent.Message, error)

LoadMessagesForAgent retrieves all messages created by a specific agent.

func (*SessionStore) LoadMessagesFromParentSession

func (s *SessionStore) LoadMessagesFromParentSession(ctx context.Context, sessionID string) ([]agent.Message, error)

LoadMessagesFromParentSession loads messages from the parent session of the given session.

func (*SessionStore) LoadSession

func (s *SessionStore) LoadSession(ctx context.Context, sessionID string) (*agent.Session, error)

LoadSession retrieves a session and its messages from PostgreSQL. Returns (nil, nil) if the session does not exist or has been soft-deleted.

func (*SessionStore) PurgeDeleted

func (s *SessionStore) PurgeDeleted(ctx context.Context, graceInterval string) error

PurgeDeleted permanently removes all soft-deleted data older than the grace interval. graceInterval should be a PostgreSQL interval string like '30 days', '7 days', etc.

func (*SessionStore) RegisterCleanupHook

func (s *SessionStore) RegisterCleanupHook(hook agent.SessionCleanupHook)

RegisterCleanupHook adds a function to be called after successful session deletion.

func (*SessionStore) RestoreSession

func (s *SessionStore) RestoreSession(ctx context.Context, sessionID string) error

RestoreSession restores a soft-deleted session by clearing deleted_at.

func (*SessionStore) SaveMemorySnapshot

func (s *SessionStore) SaveMemorySnapshot(ctx context.Context, sessionID, snapshotType, content string, tokenCount int) error

SaveMemorySnapshot persists a memory snapshot.

func (*SessionStore) SaveMessage

func (s *SessionStore) SaveMessage(ctx context.Context, sessionID string, msg agent.Message) error

SaveMessage persists a message to the messages table and updates the session timestamp atomically.

func (*SessionStore) SaveSession

func (s *SessionStore) SaveSession(ctx context.Context, session *agent.Session) error

SaveSession persists a session to PostgreSQL using an upsert.

func (*SessionStore) SaveToolExecution

func (s *SessionStore) SaveToolExecution(ctx context.Context, sessionID string, exec agent.ToolExecution) error

SaveToolExecution persists a tool execution record.

func (*SessionStore) SearchMessages

func (s *SessionStore) SearchMessages(ctx context.Context, sessionID, query string, limit int) ([]agent.Message, error)

SearchMessages performs full-text search on messages within a session using tsvector.

func (*SessionStore) SearchMessagesByAgent

func (s *SessionStore) SearchMessagesByAgent(ctx context.Context, agentID, query string, limit int) ([]agent.Message, error)

SearchMessagesByAgent performs full-text search on messages for an agent.

func (*SessionStore) SoftDeleteSession

func (s *SessionStore) SoftDeleteSession(ctx context.Context, sessionID string) error

SoftDeleteSession marks a session as deleted without removing data. This is the same as DeleteSession for PostgreSQL (already uses soft-delete).
