Documentation
¶
Overview ¶
Copyright 2026 Teradata
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- func ContextWithUserID(ctx context.Context, userID string) context.Context
- func UserIDFromContext(ctx context.Context) string
- type AdminStore
- func (s *AdminStore) CountSessionsByUser(ctx context.Context) ([]agent.UserSessionCount, error)
- func (s *AdminStore) GetSystemStats(ctx context.Context) (*agent.SystemStats, error)
- func (s *AdminStore) ListAllSessions(ctx context.Context, limit, offset int) ([]agent.AdminSession, int32, error)
- func (s *AdminStore) ValidatePermissions(ctx context.Context) error
- type ArtifactStore
- func (s *ArtifactStore) Close() error
- func (s *ArtifactStore) Delete(ctx context.Context, id string, hard bool) error
- func (s *ArtifactStore) Get(ctx context.Context, id string) (*artifacts.Artifact, error)
- func (s *ArtifactStore) GetByName(ctx context.Context, name string, sessionID string) (*artifacts.Artifact, error)
- func (s *ArtifactStore) GetStats(ctx context.Context) (*artifacts.Stats, error)
- func (s *ArtifactStore) Index(ctx context.Context, artifact *artifacts.Artifact) error
- func (s *ArtifactStore) List(ctx context.Context, filter *artifacts.Filter) ([]*artifacts.Artifact, error)
- func (s *ArtifactStore) RecordAccess(ctx context.Context, id string) error
- func (s *ArtifactStore) Search(ctx context.Context, query string, sessionID string, limit int) ([]*artifacts.Artifact, error)
- func (s *ArtifactStore) Update(ctx context.Context, artifact *artifacts.Artifact) error
- type Backend
- func (b *Backend) AdminStore() *AdminStore
- func (b *Backend) ArtifactStore() artifacts.ArtifactStore
- func (b *Backend) Close() error
- func (b *Backend) ErrorStore() agent.ErrorStore
- func (b *Backend) HumanRequestStore() shuttle.HumanRequestStore
- func (b *Backend) Migrate(ctx context.Context) error
- func (b *Backend) Migrator() *Migrator
- func (b *Backend) Ping(ctx context.Context) error
- func (b *Backend) Pool() *pgxpool.Pool
- func (b *Backend) RawPendingMigrations(ctx context.Context) ([]Migration, error)
- func (b *Backend) ResultStore() storage.ResultStore
- func (b *Backend) SessionStorage() agent.SessionStorage
- type ErrorStore
- func (s *ErrorStore) Close() error
- func (s *ErrorStore) Get(ctx context.Context, errorID string) (*agent.StoredError, error)
- func (s *ErrorStore) List(ctx context.Context, filters agent.ErrorFilters) ([]*agent.StoredError, error)
- func (s *ErrorStore) Store(ctx context.Context, storedErr *agent.StoredError) (string, error)
- type HumanRequestStore
- func (s *HumanRequestStore) Close() error
- func (s *HumanRequestStore) Get(ctx context.Context, id string) (*shuttle.HumanRequest, error)
- func (s *HumanRequestStore) ListBySession(ctx context.Context, sessionID string) ([]*shuttle.HumanRequest, error)
- func (s *HumanRequestStore) ListPending(ctx context.Context) ([]*shuttle.HumanRequest, error)
- func (s *HumanRequestStore) Store(ctx context.Context, req *shuttle.HumanRequest) error
- func (s *HumanRequestStore) Update(ctx context.Context, req *shuttle.HumanRequest) error
- type Migration
- type Migrator
- type ResultStore
- func (s *ResultStore) Close() error
- func (s *ResultStore) Delete(ctx context.Context, id string) error
- func (s *ResultStore) GetMetadata(ctx context.Context, id string) (*storage.SQLResultMetadata, error)
- func (s *ResultStore) Query(ctx context.Context, id, query string) (interface{}, error)
- func (s *ResultStore) Store(ctx context.Context, id string, data interface{}) (*loomv1.DataReference, error)
- type SessionStore
- func (s *SessionStore) Close() error
- func (s *SessionStore) DeleteSession(ctx context.Context, sessionID string) error
- func (s *SessionStore) GetStats(ctx context.Context) (*agent.Stats, error)
- func (s *SessionStore) ListSessions(ctx context.Context) ([]string, error)
- func (s *SessionStore) LoadAgentSessions(ctx context.Context, agentID string) ([]string, error)
- func (s *SessionStore) LoadMemorySnapshots(ctx context.Context, sessionID string, snapshotType string, limit int) ([]agent.MemorySnapshot, error)
- func (s *SessionStore) LoadMessages(ctx context.Context, sessionID string) ([]agent.Message, error)
- func (s *SessionStore) LoadMessagesForAgent(ctx context.Context, agentID string) ([]agent.Message, error)
- func (s *SessionStore) LoadMessagesFromParentSession(ctx context.Context, sessionID string) ([]agent.Message, error)
- func (s *SessionStore) LoadSession(ctx context.Context, sessionID string) (*agent.Session, error)
- func (s *SessionStore) PurgeDeleted(ctx context.Context, graceInterval string) error
- func (s *SessionStore) RegisterCleanupHook(hook agent.SessionCleanupHook)
- func (s *SessionStore) RestoreSession(ctx context.Context, sessionID string) error
- func (s *SessionStore) SaveMemorySnapshot(ctx context.Context, sessionID, snapshotType, content string, tokenCount int) error
- func (s *SessionStore) SaveMessage(ctx context.Context, sessionID string, msg agent.Message) error
- func (s *SessionStore) SaveSession(ctx context.Context, session *agent.Session) error
- func (s *SessionStore) SaveToolExecution(ctx context.Context, sessionID string, exec agent.ToolExecution) error
- func (s *SessionStore) SearchMessages(ctx context.Context, sessionID, query string, limit int) ([]agent.Message, error)
- func (s *SessionStore) SearchMessagesByAgent(ctx context.Context, agentID, query string, limit int) ([]agent.Message, error)
- func (s *SessionStore) SoftDeleteSession(ctx context.Context, sessionID string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextWithUserID ¶
func ContextWithUserID(ctx context.Context, userID string) context.Context
ContextWithUserID returns a new context with the given user ID attached. When a user ID is present, execInTx will SET LOCAL app.current_user_id within each transaction to activate row-level security policies.
func UserIDFromContext ¶
func UserIDFromContext(ctx context.Context) string
UserIDFromContext extracts the user ID from the context, if present. Returns empty string if no user ID is set.
Types ¶
type AdminStore ¶
type AdminStore struct {
// contains filtered or unexported fields
}
AdminStore implements agent.AdminStorage using PostgreSQL without RLS. All queries use execInTxNoRLS to bypass row-level security policies, providing cross-tenant visibility for platform administrators.
func NewAdminStore ¶
func NewAdminStore(pool *pgxpool.Pool, tracer observability.Tracer, logger *zap.Logger) *AdminStore
NewAdminStore creates a new admin store with the given connection pool. The logger parameter is optional; if nil, zap.NewNop() is used as a fallback.
func (*AdminStore) CountSessionsByUser ¶
func (s *AdminStore) CountSessionsByUser(ctx context.Context) ([]agent.UserSessionCount, error)
CountSessionsByUser returns session counts grouped by user_id.
func (*AdminStore) GetSystemStats ¶
func (s *AdminStore) GetSystemStats(ctx context.Context) (*agent.SystemStats, error)
GetSystemStats returns aggregate statistics across all users.
func (*AdminStore) ListAllSessions ¶
func (s *AdminStore) ListAllSessions(ctx context.Context, limit, offset int) ([]agent.AdminSession, int32, error)
ListAllSessions returns sessions across all users (bypasses RLS).
func (*AdminStore) ValidatePermissions ¶
func (s *AdminStore) ValidatePermissions(ctx context.Context) error
ValidatePermissions checks whether the admin connection has the BYPASSRLS privilege. If the admin role does not have BYPASSRLS, a warning is logged. This is not necessarily fatal because RLS policies may be configured to allow the admin role, but operators should be aware of the potential misconfiguration.
type ArtifactStore ¶
type ArtifactStore struct {
// contains filtered or unexported fields
}
ArtifactStore implements artifacts.ArtifactStore using PostgreSQL.
func NewArtifactStore ¶
func NewArtifactStore(pool *pgxpool.Pool, tracer observability.Tracer) *ArtifactStore
NewArtifactStore creates a new PostgreSQL-backed artifact store.
func (*ArtifactStore) Close ¶
func (s *ArtifactStore) Close() error
Close is a no-op; the pool is managed by the backend.
func (*ArtifactStore) GetByName ¶
func (s *ArtifactStore) GetByName(ctx context.Context, name string, sessionID string) (*artifacts.Artifact, error)
GetByName retrieves an artifact by name, optionally scoped to a session.
func (*ArtifactStore) GetStats ¶
func (s *ArtifactStore) GetStats(ctx context.Context) (*artifacts.Stats, error)
GetStats returns aggregate statistics about stored artifacts using a single query.
func (*ArtifactStore) List ¶
func (s *ArtifactStore) List(ctx context.Context, filter *artifacts.Filter) ([]*artifacts.Artifact, error)
List retrieves artifacts matching the given filters.
func (*ArtifactStore) RecordAccess ¶
func (s *ArtifactStore) RecordAccess(ctx context.Context, id string) error
RecordAccess updates the last_accessed_at timestamp and increments access_count.
type Backend ¶
type Backend struct {
// contains filtered or unexported fields
}
Backend implements backend.StorageBackend using PostgreSQL with pgx. NOTE: The proto-defined PostgresStorageConfig includes a SupabaseConfig field (loomv1.SupabaseConfig) for future Supabase-hosted PostgreSQL integration. This is available via cfg.GetSupabase() but is not yet consumed by the backend.
func NewBackend ¶
func NewBackend(ctx context.Context, cfg *loomv1.PostgresStorageConfig, tracer observability.Tracer) (*Backend, error)
NewBackend creates a new PostgreSQL storage backend from proto configuration.
func (*Backend) AdminStore ¶
func (b *Backend) AdminStore() *AdminStore
AdminStore creates a new AdminStore using the backend's pool and tracer. This provides cross-tenant administrative queries that bypass RLS.
func (*Backend) ArtifactStore ¶
func (b *Backend) ArtifactStore() artifacts.ArtifactStore
ArtifactStore returns the PostgreSQL artifact store implementation.
func (*Backend) ErrorStore ¶
func (b *Backend) ErrorStore() agent.ErrorStore
ErrorStore returns the PostgreSQL error store implementation.
func (*Backend) HumanRequestStore ¶
func (b *Backend) HumanRequestStore() shuttle.HumanRequestStore
HumanRequestStore returns the PostgreSQL human request store implementation.
func (*Backend) RawPendingMigrations ¶
func (b *Backend) RawPendingMigrations(ctx context.Context) ([]Migration, error)
RawPendingMigrations returns the list of migrations that have not yet been applied. Returns the postgres-internal Migration type. The backend package wraps this to satisfy the MigrationInspector interface without introducing an import cycle.
func (*Backend) ResultStore ¶
func (b *Backend) ResultStore() storage.ResultStore
ResultStore returns the PostgreSQL result store implementation.
func (*Backend) SessionStorage ¶
func (b *Backend) SessionStorage() agent.SessionStorage
SessionStorage returns the PostgreSQL session storage implementation.
type ErrorStore ¶
type ErrorStore struct {
// contains filtered or unexported fields
}
ErrorStore implements agent.ErrorStore using PostgreSQL.
func NewErrorStore ¶
func NewErrorStore(pool *pgxpool.Pool, tracer observability.Tracer) *ErrorStore
NewErrorStore creates a new PostgreSQL-backed error store.
func (*ErrorStore) Close ¶
func (s *ErrorStore) Close() error
Close is a no-op; the pool is managed by the backend.
func (*ErrorStore) Get ¶
func (s *ErrorStore) Get(ctx context.Context, errorID string) (*agent.StoredError, error)
Get retrieves an error by its ID.
func (*ErrorStore) List ¶
func (s *ErrorStore) List(ctx context.Context, filters agent.ErrorFilters) ([]*agent.StoredError, error)
List retrieves errors matching the given filters.
func (*ErrorStore) Store ¶
func (s *ErrorStore) Store(ctx context.Context, storedErr *agent.StoredError) (string, error)
Store persists an agent error and returns its ID.
type HumanRequestStore ¶
type HumanRequestStore struct {
// contains filtered or unexported fields
}
HumanRequestStore implements shuttle.HumanRequestStore using PostgreSQL.
func NewHumanRequestStore ¶
func NewHumanRequestStore(pool *pgxpool.Pool, tracer observability.Tracer) *HumanRequestStore
NewHumanRequestStore creates a new PostgreSQL-backed human request store.
func (*HumanRequestStore) Close ¶
func (s *HumanRequestStore) Close() error
Close is a no-op; the pool is managed by the backend.
func (*HumanRequestStore) Get ¶
func (s *HumanRequestStore) Get(ctx context.Context, id string) (*shuttle.HumanRequest, error)
Get retrieves a human request by ID.
func (*HumanRequestStore) ListBySession ¶
func (s *HumanRequestStore) ListBySession(ctx context.Context, sessionID string) ([]*shuttle.HumanRequest, error)
ListBySession retrieves all human requests for a session.
func (*HumanRequestStore) ListPending ¶
func (s *HumanRequestStore) ListPending(ctx context.Context) ([]*shuttle.HumanRequest, error)
ListPending retrieves all pending human requests ordered by creation time.
func (*HumanRequestStore) Store ¶
func (s *HumanRequestStore) Store(ctx context.Context, req *shuttle.HumanRequest) error
Store persists a human request.
func (*HumanRequestStore) Update ¶
func (s *HumanRequestStore) Update(ctx context.Context, req *shuttle.HumanRequest) error
Update modifies an existing human request.
type Migrator ¶
type Migrator struct {
// contains filtered or unexported fields
}
Migrator manages PostgreSQL schema migrations using embedded SQL files.
func NewMigrator ¶
NewMigrator creates a new migrator with embedded SQL migrations.
func (*Migrator) CurrentVersion ¶
CurrentVersion returns the highest applied migration version.
func (*Migrator) MigrateDown ¶
MigrateDown rolls back the specified number of migrations.
type ResultStore ¶
type ResultStore struct {
// contains filtered or unexported fields
}
ResultStore implements storage.ResultStore using PostgreSQL.
func NewResultStore ¶
func NewResultStore(pool *pgxpool.Pool, tracer observability.Tracer) *ResultStore
NewResultStore creates a new PostgreSQL-backed result store.
func (*ResultStore) Close ¶
func (s *ResultStore) Close() error
Close is a no-op; the pool is managed by the backend.
func (*ResultStore) Delete ¶
func (s *ResultStore) Delete(ctx context.Context, id string) error
Delete removes a stored result and its table. Both the table drop and metadata delete execute within a single transaction with RLS tenant isolation via execInTx.
func (*ResultStore) GetMetadata ¶
func (s *ResultStore) GetMetadata(ctx context.Context, id string) (*storage.SQLResultMetadata, error)
GetMetadata retrieves metadata about a stored result. The query runs in a transaction for RLS tenant isolation.
func (*ResultStore) Query ¶
func (s *ResultStore) Query(ctx context.Context, id, query string) (interface{}, error)
Query returns the rows of a stored result table. The query argument must be empty, in which case all rows from the result table are returned. Caller-supplied SQL is NOT allowed; only an empty query string is accepted, which prevents SQL injection. The operation runs in a transaction for RLS tenant isolation.
func (*ResultStore) Store ¶
func (s *ResultStore) Store(ctx context.Context, id string, data interface{}) (*loomv1.DataReference, error)
Store persists a result set in a dynamic table and records metadata. All database operations (create table, insert rows, store metadata) execute within a single transaction with RLS tenant isolation via execInTx.
type SessionStore ¶
type SessionStore struct {
// contains filtered or unexported fields
}
SessionStore implements agent.SessionStorage using PostgreSQL with pgx.
func NewSessionStore ¶
func NewSessionStore(pool *pgxpool.Pool, tracer observability.Tracer, logger *zap.Logger) *SessionStore
NewSessionStore creates a new PostgreSQL-backed session store. The logger parameter is optional; if nil, zap.NewNop() is used as a fallback.
func (*SessionStore) Close ¶
func (s *SessionStore) Close() error
Close is a no-op for the session store; the pool is managed by the backend.
func (*SessionStore) DeleteSession ¶
func (s *SessionStore) DeleteSession(ctx context.Context, sessionID string) error
DeleteSession soft-deletes a session by setting deleted_at. Hard deletion happens via the purge_soft_deleted function after the grace period. Cleanup hooks run after the transaction commits successfully.
func (*SessionStore) ListSessions ¶
func (s *SessionStore) ListSessions(ctx context.Context) ([]string, error)
ListSessions returns all session IDs ordered by most recently updated.
func (*SessionStore) LoadAgentSessions ¶
func (s *SessionStore) LoadAgentSessions(ctx context.Context, agentID string) ([]string, error)
LoadAgentSessions returns session IDs for a specific agent.
func (*SessionStore) LoadMemorySnapshots ¶
func (s *SessionStore) LoadMemorySnapshots(ctx context.Context, sessionID string, snapshotType string, limit int) ([]agent.MemorySnapshot, error)
LoadMemorySnapshots retrieves memory snapshots for a session.
func (*SessionStore) LoadMessages ¶
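func (s *SessionStore) LoadMessages(ctx context.Context, sessionID string) ([]agent.Message, error)
LoadMessages retrieves all messages for a session ordered by timestamp.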
func (*SessionStore) LoadMessagesForAgent ¶
func (s *SessionStore) LoadMessagesForAgent(ctx context.Context, agentID string) ([]agent.Message, error)
LoadMessagesForAgent retrieves all messages created by a specific agent.
func (*SessionStore) LoadMessagesFromParentSession ¶
func (s *SessionStore) LoadMessagesFromParentSession(ctx context.Context, sessionID string) ([]agent.Message, error)
LoadMessagesFromParentSession loads messages from the parent session of the given session.
func (*SessionStore) LoadSession ¶
func (s *SessionStore) LoadSession(ctx context.Context, sessionID string) (*agent.Session, error)
LoadSession retrieves a session and its messages from PostgreSQL. Returns (nil, nil) if the session does not exist or has been soft-deleted.
func (*SessionStore) PurgeDeleted ¶
func (s *SessionStore) PurgeDeleted(ctx context.Context, graceInterval string) error
PurgeDeleted permanently removes all soft-deleted data older than the grace interval. graceInterval should be a PostgreSQL interval string such as '30 days' or '7 days'.
func (*SessionStore) RegisterCleanupHook ¶
func (s *SessionStore) RegisterCleanupHook(hook agent.SessionCleanupHook)
RegisterCleanupHook adds a function to be called after successful session deletion.
func (*SessionStore) RestoreSession ¶
func (s *SessionStore) RestoreSession(ctx context.Context, sessionID string) error
RestoreSession restores a soft-deleted session by clearing deleted_at.
func (*SessionStore) SaveMemorySnapshot ¶
func (s *SessionStore) SaveMemorySnapshot(ctx context.Context, sessionID, snapshotType, content string, tokenCount int) error
SaveMemorySnapshot persists a memory snapshot.
func (*SessionStore) SaveMessage ¶
func (s *SessionStore) SaveMessage(ctx context.Context, sessionID string, msg agent.Message) error
SaveMessage persists a message to the messages table and updates the session timestamp atomically.
func (*SessionStore) SaveSession ¶
func (s *SessionStore) SaveSession(ctx context.Context, session *agent.Session) error
SaveSession persists a session to PostgreSQL using an upsert.
func (*SessionStore) SaveToolExecution ¶
func (s *SessionStore) SaveToolExecution(ctx context.Context, sessionID string, exec agent.ToolExecution) error
SaveToolExecution persists a tool execution record.
func (*SessionStore) SearchMessages ¶
func (s *SessionStore) SearchMessages(ctx context.Context, sessionID, query string, limit int) ([]agent.Message, error)
SearchMessages performs full-text search on messages within a session using tsvector.
func (*SessionStore) SearchMessagesByAgent ¶
func (s *SessionStore) SearchMessagesByAgent(ctx context.Context, agentID, query string, limit int) ([]agent.Message, error)
SearchMessagesByAgent performs full-text search on messages for an agent.
func (*SessionStore) SoftDeleteSession ¶
func (s *SessionStore) SoftDeleteSession(ctx context.Context, sessionID string) error
SoftDeleteSession marks a session as deleted without removing data. For PostgreSQL this is equivalent to DeleteSession, which already performs a soft delete.