postgres

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Copyright 2026 Teradata

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextWithUserID

func ContextWithUserID(ctx context.Context, userID string) context.Context

ContextWithUserID returns a new context with the given user ID attached. When a user ID is present, execInTx will SET LOCAL app.current_user_id within each transaction to activate row-level security policies.

func UserIDFromContext

func UserIDFromContext(ctx context.Context) string

UserIDFromContext extracts the user ID from the context, if present. Returns empty string if no user ID is set.

Types

type AdminStore

type AdminStore struct {
	// contains filtered or unexported fields
}

AdminStore implements agent.AdminStorage using PostgreSQL without RLS. All queries use execInTxNoRLS to bypass row-level security policies, providing cross-tenant visibility for platform administrators.

func NewAdminStore

func NewAdminStore(pool *pgxpool.Pool, tracer observability.Tracer, logger *zap.Logger) *AdminStore

NewAdminStore creates a new admin store with the given connection pool. The logger parameter is optional; if nil, zap.NewNop() is used as a fallback.

func (*AdminStore) CountSessionsByUser

func (s *AdminStore) CountSessionsByUser(ctx context.Context) ([]agent.UserSessionCount, error)

CountSessionsByUser returns session counts grouped by user_id.

func (*AdminStore) GetSystemStats

func (s *AdminStore) GetSystemStats(ctx context.Context) (*agent.SystemStats, error)

GetSystemStats returns aggregate statistics across all users.

func (*AdminStore) ListAllSessions

func (s *AdminStore) ListAllSessions(ctx context.Context, limit, offset int) ([]agent.AdminSession, int32, error)

ListAllSessions returns sessions across all users (bypasses RLS).

func (*AdminStore) ValidatePermissions

func (s *AdminStore) ValidatePermissions(ctx context.Context) error

ValidatePermissions checks whether the admin connection has BYPASSRLS privilege. If the admin role does not have BYPASSRLS, a warning is logged. This is not necessarily fatal because RLS policies may be configured to allow the admin role, but operators should be aware of the misconfiguration.

type ArtifactStore

type ArtifactStore struct {
	// contains filtered or unexported fields
}

ArtifactStore implements artifacts.ArtifactStore using PostgreSQL.

func NewArtifactStore

func NewArtifactStore(pool *pgxpool.Pool, tracer observability.Tracer) *ArtifactStore

NewArtifactStore creates a new PostgreSQL-backed artifact store.

func (*ArtifactStore) Close

func (s *ArtifactStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*ArtifactStore) Delete

func (s *ArtifactStore) Delete(ctx context.Context, id string, hard bool) error

Delete soft-deletes or hard-deletes an artifact.

func (*ArtifactStore) Get

Get retrieves an artifact by ID.

func (*ArtifactStore) GetByName

func (s *ArtifactStore) GetByName(ctx context.Context, name string, sessionID string) (*artifacts.Artifact, error)

GetByName retrieves an artifact by name, optionally scoped to a session.

func (*ArtifactStore) GetStats

func (s *ArtifactStore) GetStats(ctx context.Context) (*artifacts.Stats, error)

GetStats returns aggregate statistics about stored artifacts using a single query.

func (*ArtifactStore) Index

func (s *ArtifactStore) Index(ctx context.Context, artifact *artifacts.Artifact) error

Index stores or updates an artifact record.

func (*ArtifactStore) List

func (s *ArtifactStore) List(ctx context.Context, filter *artifacts.Filter) ([]*artifacts.Artifact, error)

List retrieves artifacts matching the given filters.

func (*ArtifactStore) RecordAccess

func (s *ArtifactStore) RecordAccess(ctx context.Context, id string) error

RecordAccess updates the last_accessed_at timestamp and increments access_count.

func (*ArtifactStore) Search

func (s *ArtifactStore) Search(ctx context.Context, query string, sessionID string, limit int) ([]*artifacts.Artifact, error)

Search performs full-text search on artifacts using tsvector.

func (*ArtifactStore) Update

func (s *ArtifactStore) Update(ctx context.Context, artifact *artifacts.Artifact) error

Update updates an existing artifact's metadata.

type Backend

type Backend struct {
	// contains filtered or unexported fields
}

Backend implements backend.StorageBackend using PostgreSQL with pgx. NOTE: The proto-defined PostgresStorageConfig includes a SupabaseConfig field (loomv1.SupabaseConfig) for future Supabase-hosted PostgreSQL integration. This is available via cfg.GetSupabase() but is not yet consumed by the backend.

func NewBackend

func NewBackend(ctx context.Context, cfg *loomv1.PostgresStorageConfig, tracer observability.Tracer) (*Backend, error)

NewBackend creates a new PostgreSQL storage backend from proto configuration.

func (*Backend) AdminStore

func (b *Backend) AdminStore() *AdminStore

AdminStore creates a new AdminStore using the backend's pool and tracer. This provides cross-tenant administrative queries that bypass RLS.

func (*Backend) ArtifactStore

func (b *Backend) ArtifactStore() artifacts.ArtifactStore

ArtifactStore returns the PostgreSQL artifact store implementation.

func (*Backend) Close

func (b *Backend) Close() error

Close closes the PostgreSQL connection pool.

func (*Backend) ErrorStore

func (b *Backend) ErrorStore() agent.ErrorStore

ErrorStore returns the PostgreSQL error store implementation.

func (*Backend) GraphMemoryStore added in v1.3.0

func (b *Backend) GraphMemoryStore() memory.GraphMemoryStore

GraphMemoryStore implements backend.GraphMemoryProvider.

func (*Backend) HumanRequestStore

func (b *Backend) HumanRequestStore() shuttle.HumanRequestStore

HumanRequestStore returns the PostgreSQL human request store implementation.

func (*Backend) Migrate

func (b *Backend) Migrate(ctx context.Context) error

Migrate runs all pending PostgreSQL migrations.

func (*Backend) Migrator

func (b *Backend) Migrator() *Migrator

Migrator returns the migration manager for manual migration operations.

func (*Backend) Ping

func (b *Backend) Ping(ctx context.Context) error

Ping verifies the PostgreSQL connection is healthy.

func (*Backend) Pool

func (b *Backend) Pool() *pgxpool.Pool

Pool returns the underlying pgxpool.Pool for advanced operations.

func (*Backend) RawPendingMigrations

func (b *Backend) RawPendingMigrations(ctx context.Context) ([]Migration, error)

RawPendingMigrations returns the list of migrations that have not yet been applied. Returns the postgres-internal Migration type. The backend package wraps this to satisfy the MigrationInspector interface without introducing an import cycle.

func (*Backend) ResultStore

func (b *Backend) ResultStore() storage.ResultStore

ResultStore returns the PostgreSQL result store implementation.

func (*Backend) SessionStorage

func (b *Backend) SessionStorage() agent.SessionStorage

SessionStorage returns the PostgreSQL session storage implementation.

func (*Backend) TaskStore added in v1.3.0

func (b *Backend) TaskStore() task.TaskStore

TaskStore implements backend.TaskStoreProvider.

type ErrorStore

type ErrorStore struct {
	// contains filtered or unexported fields
}

ErrorStore implements agent.ErrorStore using PostgreSQL.

func NewErrorStore

func NewErrorStore(pool *pgxpool.Pool, tracer observability.Tracer) *ErrorStore

NewErrorStore creates a new PostgreSQL-backed error store.

func (*ErrorStore) Close

func (s *ErrorStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*ErrorStore) Get

func (s *ErrorStore) Get(ctx context.Context, errorID string) (*agent.StoredError, error)

Get retrieves an error by its ID.

func (*ErrorStore) List

func (s *ErrorStore) List(ctx context.Context, filters agent.ErrorFilters) ([]*agent.StoredError, error)

List retrieves errors matching the given filters.

func (*ErrorStore) Store

func (s *ErrorStore) Store(ctx context.Context, storedErr *agent.StoredError) (string, error)

Store persists an agent error and returns its ID.

type GraphMemoryOption added in v1.3.0

type GraphMemoryOption func(*GraphMemoryStore)

GraphMemoryOption configures a GraphMemoryStore.

func WithSalienceConfig added in v1.3.0

func WithSalienceConfig(cfg memory.SalienceConfig) GraphMemoryOption

WithSalienceConfig overrides the default salience configuration.

type GraphMemoryStore added in v1.3.0

type GraphMemoryStore struct {
	// contains filtered or unexported fields
}

GraphMemoryStore implements memory.GraphMemoryStore using PostgreSQL.

func NewGraphMemoryStore added in v1.3.0

func NewGraphMemoryStore(pool *pgxpool.Pool, tc memory.TokenCounter, tracer observability.Tracer, opts ...GraphMemoryOption) *GraphMemoryStore

NewGraphMemoryStore creates a new PostgreSQL-backed graph memory store.

func (*GraphMemoryStore) Close added in v1.3.0

func (s *GraphMemoryStore) Close() error

Close is a no-op; the pool is managed by the Backend.

func (*GraphMemoryStore) Consolidate added in v1.3.0

func (s *GraphMemoryStore) Consolidate(ctx context.Context, memoryIDs []string, consolidated *memory.Memory) (*memory.Memory, error)

func (*GraphMemoryStore) ContextFor added in v1.3.0

func (*GraphMemoryStore) CreateEntity added in v1.3.0

func (s *GraphMemoryStore) CreateEntity(ctx context.Context, entity *memory.Entity) (*memory.Entity, error)

func (*GraphMemoryStore) DecayAll added in v1.3.0

func (s *GraphMemoryStore) DecayAll(ctx context.Context, agentID string, decayFactor float64) error

func (*GraphMemoryStore) DeleteEntity added in v1.3.0

func (s *GraphMemoryStore) DeleteEntity(ctx context.Context, agentID, name string) error

func (*GraphMemoryStore) Forget added in v1.3.0

func (s *GraphMemoryStore) Forget(ctx context.Context, memoryID string) error

func (*GraphMemoryStore) GetEntity added in v1.3.0

func (s *GraphMemoryStore) GetEntity(ctx context.Context, agentID, name string) (*memory.Entity, error)

func (*GraphMemoryStore) GetLineage added in v1.3.0

func (s *GraphMemoryStore) GetLineage(ctx context.Context, memoryID string) ([]*memory.MemoryLineage, error)

func (*GraphMemoryStore) GetMemory added in v1.3.0

func (s *GraphMemoryStore) GetMemory(ctx context.Context, agentID, memoryID string) (*memory.Memory, error)

func (*GraphMemoryStore) GetStats added in v1.3.0

func (s *GraphMemoryStore) GetStats(ctx context.Context, agentID string) (*memory.GraphStats, error)

func (*GraphMemoryStore) ListEdgesFrom added in v1.3.0

func (s *GraphMemoryStore) ListEdgesFrom(ctx context.Context, entityID string) ([]*memory.Edge, error)

func (*GraphMemoryStore) ListEdgesTo added in v1.3.0

func (s *GraphMemoryStore) ListEdgesTo(ctx context.Context, entityID string) ([]*memory.Edge, error)

func (*GraphMemoryStore) ListEntities added in v1.3.0

func (s *GraphMemoryStore) ListEntities(ctx context.Context, agentID, entityType string, limit, offset int) ([]*memory.Entity, int, error)

func (*GraphMemoryStore) Neighbors added in v1.3.0

func (s *GraphMemoryStore) Neighbors(ctx context.Context, entityID string, relation string, direction string, depth int) ([]*memory.Edge, error)

func (*GraphMemoryStore) Recall added in v1.3.0

func (s *GraphMemoryStore) Recall(ctx context.Context, opts memory.RecallOpts) ([]*memory.Memory, error)

func (*GraphMemoryStore) Relate added in v1.3.0

func (s *GraphMemoryStore) Relate(ctx context.Context, edge *memory.Edge) (*memory.Edge, error)

func (*GraphMemoryStore) Remember added in v1.3.0

func (s *GraphMemoryStore) Remember(ctx context.Context, mem *memory.Memory) (*memory.Memory, error)

func (*GraphMemoryStore) SearchEntities added in v1.3.0

func (s *GraphMemoryStore) SearchEntities(ctx context.Context, agentID, query string, limit int) ([]*memory.Entity, error)

func (*GraphMemoryStore) Supersede added in v1.3.0

func (s *GraphMemoryStore) Supersede(ctx context.Context, oldMemoryID string, newMem *memory.Memory) (*memory.Memory, error)

func (*GraphMemoryStore) TouchMemories added in v1.3.0

func (s *GraphMemoryStore) TouchMemories(ctx context.Context, memoryIDs []string) error

func (*GraphMemoryStore) Unrelate added in v1.3.0

func (s *GraphMemoryStore) Unrelate(ctx context.Context, sourceID, targetID, relation string) error

func (*GraphMemoryStore) UpdateEntity added in v1.3.0

func (s *GraphMemoryStore) UpdateEntity(ctx context.Context, entity *memory.Entity) (*memory.Entity, error)

func (*GraphMemoryStore) VectorRecall added in v1.3.0

func (s *GraphMemoryStore) VectorRecall(ctx context.Context, opts memory.VectorRecallOpts) ([]*memory.Memory, error)

VectorRecall is not yet supported by the Postgres backend. The interface requires it for the sqlite brute-force cosine path; once pgvector wiring lands here, replace this stub with a real implementation.

type HumanRequestStore

type HumanRequestStore struct {
	// contains filtered or unexported fields
}

HumanRequestStore implements shuttle.HumanRequestStore using PostgreSQL.

func NewHumanRequestStore

func NewHumanRequestStore(pool *pgxpool.Pool, tracer observability.Tracer) *HumanRequestStore

NewHumanRequestStore creates a new PostgreSQL-backed human request store.

func (*HumanRequestStore) Close

func (s *HumanRequestStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*HumanRequestStore) Get

Get retrieves a human request by ID.

func (*HumanRequestStore) ListBySession

func (s *HumanRequestStore) ListBySession(ctx context.Context, sessionID string) ([]*shuttle.HumanRequest, error)

ListBySession retrieves all human requests for a session.

func (*HumanRequestStore) ListPending

func (s *HumanRequestStore) ListPending(ctx context.Context) ([]*shuttle.HumanRequest, error)

ListPending retrieves all pending human requests ordered by creation time.

func (*HumanRequestStore) Store

Store persists a human request.

func (*HumanRequestStore) Update

Update modifies an existing human request.

type Migration

type Migration struct {
	Version     int
	Description string
	UpSQL       string
	DownSQL     string
}

Migration represents a single database migration step.

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator manages PostgreSQL schema migrations using embedded SQL files.

func NewMigrator

func NewMigrator(pool *pgxpool.Pool, tracer observability.Tracer) (*Migrator, error)

NewMigrator creates a new migrator with embedded SQL migrations.

func (*Migrator) CurrentVersion

func (m *Migrator) CurrentVersion(ctx context.Context) (int, error)

CurrentVersion returns the highest applied migration version.

func (*Migrator) MigrateDown

func (m *Migrator) MigrateDown(ctx context.Context, steps int) error

MigrateDown rolls back the specified number of migrations.

func (*Migrator) MigrateUp

func (m *Migrator) MigrateUp(ctx context.Context) error

MigrateUp applies all pending migrations up to the latest version. Uses a PostgreSQL advisory lock to prevent concurrent migration execution.

func (*Migrator) PendingMigrations

func (m *Migrator) PendingMigrations(ctx context.Context) ([]Migration, error)

PendingMigrations returns the list of migrations that have not yet been applied.

type ResultStore

type ResultStore struct {
	// contains filtered or unexported fields
}

ResultStore implements storage.ResultStore using PostgreSQL.

func NewResultStore

func NewResultStore(pool *pgxpool.Pool, tracer observability.Tracer) *ResultStore

NewResultStore creates a new PostgreSQL-backed result store.

func (*ResultStore) Close

func (s *ResultStore) Close() error

Close is a no-op; the pool is managed by the backend.

func (*ResultStore) Delete

func (s *ResultStore) Delete(ctx context.Context, id string) error

Delete removes a stored result and its table. Both the table drop and metadata delete execute within a single transaction with RLS tenant isolation via execInTx.

func (*ResultStore) GetMetadata

func (s *ResultStore) GetMetadata(ctx context.Context, id string) (*storage.SQLResultMetadata, error)

GetMetadata retrieves metadata about a stored result. The query runs in a transaction for RLS tenant isolation.

func (*ResultStore) Query

func (s *ResultStore) Query(ctx context.Context, id, query string) (interface{}, error)

Query executes a query against a stored result table. If query is empty, returns all rows from the result table. Caller-supplied SQL is NOT allowed; only an empty query string is accepted. This prevents SQL injection. The operation runs in a transaction for RLS tenant isolation.

func (*ResultStore) Store

func (s *ResultStore) Store(ctx context.Context, id string, data interface{}) (*loomv1.DataReference, error)

Store persists a result set in a dynamic table and records metadata. All database operations (create table, insert rows, store metadata) execute within a single transaction with RLS tenant isolation via execInTx.

type SessionStore

type SessionStore struct {
	// contains filtered or unexported fields
}

SessionStore implements agent.SessionStorage using PostgreSQL with pgx.

func NewSessionStore

func NewSessionStore(pool *pgxpool.Pool, tracer observability.Tracer, logger *zap.Logger) *SessionStore

NewSessionStore creates a new PostgreSQL-backed session store. The logger parameter is optional; if nil, zap.NewNop() is used as a fallback.

func (*SessionStore) Close

func (s *SessionStore) Close() error

Close is a no-op for the session store; the pool is managed by the backend.

func (*SessionStore) DeleteSession

func (s *SessionStore) DeleteSession(ctx context.Context, sessionID string) error

DeleteSession soft-deletes a session by setting deleted_at. Hard deletion happens via the purge_soft_deleted function after the grace period. Cleanup hooks run after the transaction commits successfully.

func (*SessionStore) GetStats

func (s *SessionStore) GetStats(ctx context.Context) (*agent.Stats, error)

GetStats returns aggregate statistics about stored sessions.

func (*SessionStore) ListSessions

func (s *SessionStore) ListSessions(ctx context.Context) ([]string, error)

ListSessions returns all session IDs ordered by most recently updated.

func (*SessionStore) LoadAgentSessions

func (s *SessionStore) LoadAgentSessions(ctx context.Context, agentID string) ([]string, error)

LoadAgentSessions returns session IDs for a specific agent.

func (*SessionStore) LoadMemorySnapshots

func (s *SessionStore) LoadMemorySnapshots(ctx context.Context, sessionID string, snapshotType string, limit int) ([]agent.MemorySnapshot, error)

LoadMemorySnapshots retrieves memory snapshots for a session.

func (*SessionStore) LoadMessages

func (s *SessionStore) LoadMessages(ctx context.Context, sessionID string) ([]agent.Message, error)

LoadMessages retrieves all messages for a session ordered by timestamp.

func (*SessionStore) LoadMessagesForAgent

func (s *SessionStore) LoadMessagesForAgent(ctx context.Context, agentID string) ([]agent.Message, error)

LoadMessagesForAgent retrieves all messages created by a specific agent.

func (*SessionStore) LoadMessagesFromParentSession

func (s *SessionStore) LoadMessagesFromParentSession(ctx context.Context, sessionID string) ([]agent.Message, error)

LoadMessagesFromParentSession loads messages from the parent session of the given session.

func (*SessionStore) LoadSession

func (s *SessionStore) LoadSession(ctx context.Context, sessionID string) (*agent.Session, error)

LoadSession retrieves a session and its messages from PostgreSQL. Returns (nil, nil) if the session does not exist or has been soft-deleted.

func (*SessionStore) PurgeDeleted

func (s *SessionStore) PurgeDeleted(ctx context.Context, graceInterval string) error

PurgeDeleted permanently removes all soft-deleted data older than the grace interval. graceInterval should be a PostgreSQL interval string like '30 days', '7 days', etc.

func (*SessionStore) RegisterCleanupHook

func (s *SessionStore) RegisterCleanupHook(hook agent.SessionCleanupHook)

RegisterCleanupHook adds a function to be called after successful session deletion.

func (*SessionStore) RestoreSession

func (s *SessionStore) RestoreSession(ctx context.Context, sessionID string) error

RestoreSession restores a soft-deleted session by clearing deleted_at.

func (*SessionStore) SaveMemorySnapshot

func (s *SessionStore) SaveMemorySnapshot(ctx context.Context, sessionID, snapshotType, content string, tokenCount int) error

SaveMemorySnapshot persists a memory snapshot.

func (*SessionStore) SaveMessage

func (s *SessionStore) SaveMessage(ctx context.Context, sessionID string, msg agent.Message) error

SaveMessage persists a message to the messages table and updates the session timestamp atomically.

func (*SessionStore) SaveSession

func (s *SessionStore) SaveSession(ctx context.Context, session *agent.Session) error

SaveSession persists a session to PostgreSQL using an upsert.

func (*SessionStore) SaveToolExecution

func (s *SessionStore) SaveToolExecution(ctx context.Context, sessionID string, exec agent.ToolExecution) error

SaveToolExecution persists a tool execution record.

func (*SessionStore) SearchMessages

func (s *SessionStore) SearchMessages(ctx context.Context, sessionID, query string, limit int) ([]agent.Message, error)

SearchMessages performs full-text search on messages within a session using tsvector.

func (*SessionStore) SearchMessagesByAgent

func (s *SessionStore) SearchMessagesByAgent(ctx context.Context, agentID, query string, limit int) ([]agent.Message, error)

SearchMessagesByAgent performs full-text search on messages for an agent.

func (*SessionStore) SoftDeleteSession

func (s *SessionStore) SoftDeleteSession(ctx context.Context, sessionID string) error

SoftDeleteSession marks a session as deleted without removing data. This is the same as DeleteSession for PostgreSQL (already uses soft-delete).

type TaskStore added in v1.3.0

type TaskStore struct {
	// contains filtered or unexported fields
}

TaskStore implements task.TaskStore using PostgreSQL.

func NewTaskStore added in v1.3.0

func NewTaskStore(pool *pgxpool.Pool, tracer observability.Tracer) *TaskStore

NewTaskStore creates a new PostgreSQL-backed task store.

func (*TaskStore) AddDependency added in v1.3.0

func (s *TaskStore) AddDependency(ctx context.Context, dep *task.TaskDependency) error

func (*TaskStore) ClaimTask added in v1.3.0

func (s *TaskStore) ClaimTask(ctx context.Context, taskID, agentID, sessionID string) (*task.Task, error)

ClaimTask atomically claims a task using FOR UPDATE SKIP LOCKED to prevent contention in multi-agent scenarios. If another agent holds the lock, this returns immediately with an error instead of blocking.

func (*TaskStore) Close added in v1.3.0

func (s *TaskStore) Close() error

func (*TaskStore) CloseTask added in v1.3.0

func (s *TaskStore) CloseTask(ctx context.Context, taskID, reason string) (*task.Task, error)

func (*TaskStore) CreateBoard added in v1.3.0

func (s *TaskStore) CreateBoard(ctx context.Context, board *task.TaskBoard) (*task.TaskBoard, error)

func (*TaskStore) CreateTask added in v1.3.0

func (s *TaskStore) CreateTask(ctx context.Context, t *task.Task) (*task.Task, error)

func (*TaskStore) DeleteTask added in v1.3.0

func (s *TaskStore) DeleteTask(ctx context.Context, id string) error

func (*TaskStore) GetBlockedTasks added in v1.3.0

func (s *TaskStore) GetBlockedTasks(ctx context.Context, boardID string) ([]*task.Task, error)

func (*TaskStore) GetBoard added in v1.3.0

func (s *TaskStore) GetBoard(ctx context.Context, id string) (*task.TaskBoard, error)

func (*TaskStore) GetDependencies added in v1.3.0

func (s *TaskStore) GetDependencies(ctx context.Context, taskID string) ([]*task.TaskDependency, error)

func (*TaskStore) GetDependents added in v1.3.0

func (s *TaskStore) GetDependents(ctx context.Context, taskID string) ([]*task.TaskDependency, error)

func (*TaskStore) GetHistory added in v1.3.0

func (s *TaskStore) GetHistory(ctx context.Context, taskID string) ([]*task.TaskHistoryEntry, error)

func (*TaskStore) GetReadyFront added in v1.3.0

func (s *TaskStore) GetReadyFront(ctx context.Context, boardID string, opts task.ReadyFrontOpts) ([]*task.Task, error)

func (*TaskStore) GetTask added in v1.3.0

func (s *TaskStore) GetTask(ctx context.Context, id string) (*task.Task, error)

func (*TaskStore) GetTaskByIdempotencyKey added in v1.3.0

func (s *TaskStore) GetTaskByIdempotencyKey(ctx context.Context, key string) (*task.Task, error)

GetTaskByIdempotencyKey returns the task that owns the given idempotency key, or (nil, nil) when no such task exists. Used by the skills task emitter to dedupe concurrent skill activations.

func (*TaskStore) HasOpenSkillTasks added in v1.3.0

func (s *TaskStore) HasOpenSkillTasks(ctx context.Context, skillName, sessionID string) (bool, error)

HasOpenSkillTasks returns true when any task with skill_idempotency_key matching skill:<name>|sess:<sess>|% is still in flight. See the SQLite counterpart for status semantics.

func (*TaskStore) ListBoards added in v1.3.0

func (s *TaskStore) ListBoards(ctx context.Context) ([]*task.TaskBoard, error)

func (*TaskStore) ListBySkillRun added in v1.3.0

func (s *TaskStore) ListBySkillRun(ctx context.Context, skillName, sessionID string) ([]*task.Task, error)

ListBySkillRun returns every non-deleted task whose skill_idempotency_key matches skill:<name>|sess:<sess>|%, regardless of status. Used by the end-of-turn hygiene auditor to inventory the active skill's tasks.

func (*TaskStore) ListTasks added in v1.3.0

func (s *TaskStore) ListTasks(ctx context.Context, opts task.ListTasksOpts) ([]*task.Task, int, error)

func (*TaskStore) RecordHistory added in v1.3.0

func (s *TaskStore) RecordHistory(ctx context.Context, entry *task.TaskHistoryEntry) error

func (*TaskStore) ReleaseTask added in v1.3.0

func (s *TaskStore) ReleaseTask(ctx context.Context, taskID, sessionID string) (*task.Task, error)

func (*TaskStore) RemoveDependency added in v1.3.0

func (s *TaskStore) RemoveDependency(ctx context.Context, fromTaskID, toTaskID string) error

func (*TaskStore) TransitionTask added in v1.3.0

func (s *TaskStore) TransitionTask(ctx context.Context, taskID string, newStatus loomv1.TaskStatus) (*task.Task, error)

func (*TaskStore) UpdateTask added in v1.3.0

func (s *TaskStore) UpdateTask(ctx context.Context, t *task.Task, _ []string) (*task.Task, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL