store

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package store provides shared persistence types, validation, and helper primitives.

Index

Constants

View Source
const (
	// SessionInputQueueModeQueue stores operator input to run after the active turn ends.
	SessionInputQueueModeQueue = "queue"
	// SessionInputQueueModeSteer stores operator guidance staged while a turn is active.
	SessionInputQueueModeSteer = "steer"
)
View Source
const (
	SessionInputQueueStatusQueued      = "queued"
	SessionInputQueueStatusDispatching = "dispatching"
	SessionInputQueueStatusSent        = "sent"
	SessionInputQueueStatusFailed      = "failed"
	SessionInputQueueStatusCanceled    = "canceled"
)
View Source
const (
	// SessionStallStateDetected marks a session whose ACP activity timed out
	// while the subprocess still appeared alive during recovery.
	SessionStallStateDetected = "stalled"

	// SessionStallReasonActivityTimeout reports that no ACP activity was observed
	// within the configured supervision window.
	SessionStallReasonActivityTimeout = "activity_timeout"

	// SessionStallReasonPromptDeadlineExceeded reports that the prompt-level
	// supervision deadline elapsed before completion.
	SessionStallReasonPromptDeadlineExceeded = "prompt_deadline_exceeded"

	// SessionStallReasonProcessUnhealthy reports that subprocess health checks
	// failed while a prompt was still active.
	SessionStallReasonProcessUnhealthy = "process_unhealthy"
)
View Source
const (
	// SessionDatabaseName is the filename for per-session event storage.
	SessionDatabaseName = "events.db"
	// GlobalDatabaseName is the filename for the global AGH index database.
	GlobalDatabaseName = "agh.db"
	// SessionMetaName is the filename for quick session metadata lookups.
	SessionMetaName = "meta.json"
	// DefaultSQLiteBusyTimeoutMS is the shared SQLite busy timeout in milliseconds.
	DefaultSQLiteBusyTimeoutMS = defaultBusyTimeoutMS
)
View Source
const (
	// NetworkSurfaceThread stores a public thread conversation container.
	NetworkSurfaceThread = "thread"
	// NetworkSurfaceDirect stores a two-party direct-room conversation container.
	NetworkSurfaceDirect = "direct"

	// NetworkKindGreet stores a presence announcement.
	NetworkKindGreet = "greet"
	// NetworkKindWhois stores a peer identity request or response.
	NetworkKindWhois = "whois"
	// NetworkKindSay stores a text conversation message.
	NetworkKindSay = "say"
	// NetworkKindCapability stores a capability transfer message.
	NetworkKindCapability = "capability"
	// NetworkKindReceipt stores an admission receipt message.
	NetworkKindReceipt = "receipt"
	// NetworkKindTrace stores a work lifecycle trace message.
	NetworkKindTrace = "trace"

	// NetworkWorkStateSubmitted is the initial work state.
	NetworkWorkStateSubmitted = "submitted"
	// NetworkWorkStateWorking marks active work.
	NetworkWorkStateWorking = "working"
	// NetworkWorkStateNeedsInput marks blocked work awaiting input.
	NetworkWorkStateNeedsInput = "needs_input"
	// NetworkWorkStateCompleted marks successful terminal work.
	NetworkWorkStateCompleted = "completed"
	// NetworkWorkStateFailed marks failed terminal work.
	NetworkWorkStateFailed = "failed"
	// NetworkWorkStateCanceled marks canceled terminal work.
	NetworkWorkStateCanceled = "canceled"
)

Variables

View Source
var (
	// ErrSessionInputQueueFull reports that accepting an entry would exceed the configured capacity.
	ErrSessionInputQueueFull = errors.New("store: session input queue full")
	// ErrSessionInputQueueEntryNotFound reports that a queued input entry does not exist for a session.
	ErrSessionInputQueueEntryNotFound = errors.New("store: session input queue entry not found")
)
View Source
var (
	// ErrClosed reports that a session database no longer accepts writes.
	ErrClosed = errors.New("store: session database closed")
	// ErrDrainTimeout reports that shutdown timed out before queued writes drained.
	ErrDrainTimeout = errors.New("store: writer drain timeout")
	// ErrNetworkConversationNotFound reports a missing network conversation container.
	ErrNetworkConversationNotFound = errors.New("store: network conversation not found")
	// ErrNetworkDirectRoomCollision reports a direct_id bound to another peer pair.
	ErrNetworkDirectRoomCollision = errors.New("store: network direct room collision")
	// ErrNetworkWorkContainerMismatch reports a work_id used outside its bound container.
	ErrNetworkWorkContainerMismatch = errors.New("store: network work container mismatch")
	// ErrNetworkWorkClosed reports a non-duplicate message for terminal work.
	ErrNetworkWorkClosed = errors.New("store: network work closed")
)
View Source
var (
	// ErrSessionNotFound reports that a persisted session row does not exist.
	ErrSessionNotFound = errors.New("store: session not found")
	// ErrSessionAttachLocked reports that another holder owns a live attach lease.
	ErrSessionAttachLocked = errors.New("store: session attach locked")
	// ErrSessionNotAttachable reports that a session is not eligible for attach/resume.
	ErrSessionNotAttachable = errors.New("store: session not attachable")
)

Functions

func AppendLimit

func AppendLimit(query string, args []any, limit int) (string, []any)

AppendLimit appends a LIMIT clause when the limit is positive.

func AppendWhere

func AppendWhere(query string, where []string) string

AppendWhere appends a WHERE block when any clauses are present.

func BuildClauses

func BuildClauses(input ...Clause) ([]string, []any)

BuildClauses compacts optional clauses into WHERE fragments and args.

func Checkpoint

func Checkpoint(ctx context.Context, db *sql.DB) error

Checkpoint truncates the WAL for an open SQLite database.

func EncodeSessionPermissionPolicy

func EncodeSessionPermissionPolicy(policy SessionPermissionPolicy) (string, error)

EncodeSessionPermissionPolicy marshals normalized permission policy metadata.

func EncodeSessionSpawnBudget

func EncodeSessionSpawnBudget(budget SessionSpawnBudget) (string, error)

EncodeSessionSpawnBudget marshals budget metadata for the global session catalog.

func EnsureSchema

func EnsureSchema(ctx context.Context, db *sql.DB, statements []string) error

EnsureSchema executes each schema statement in order.

func ExecuteWrite

func ExecuteWrite(ctx context.Context, db *sql.DB, fn func(context.Context, *WriteTx) error) error

ExecuteWrite runs fn inside a BEGIN IMMEDIATE transaction with bounded SQLITE_BUSY retries.

func FormatNullableTimestamp

func FormatNullableTimestamp(value time.Time) string

FormatNullableTimestamp renders zero timestamps as empty strings for optional columns.

func FormatTimestamp

func FormatTimestamp(value time.Time) string

FormatTimestamp renders a timestamp in the canonical SQLite text layout.

func IsSQLiteBusy

func IsSQLiteBusy(err error) bool

IsSQLiteBusy reports whether err is a SQLite BUSY or LOCKED condition.

func MigrationChecksum

func MigrationChecksum(migration Migration) (string, error)

MigrationChecksum returns the checksum RunMigrations records for migration.

func NetworkDirectRoomIdentity

func NetworkDirectRoomIdentity(
	workspaceID string,
	channel string,
	peerA string,
	peerB string,
) (string, string, string, error)

NetworkDirectRoomIdentity derives the stable direct-room id for one ordered peer pair.

func NewID

func NewID(prefix string) string

NewID returns a random identifier with an optional prefix.

func NormalizeNetworkDirectRoomPeers

func NormalizeNetworkDirectRoomPeers(peerA string, peerB string) (string, string, error)

NormalizeNetworkDirectRoomPeers validates and orders a two-party room pair.

func NormalizeSQLiteIdentifier

func NormalizeSQLiteIdentifier(value string) (string, error)

NormalizeSQLiteIdentifier validates a SQLite identifier for use in helper queries.

func NormalizeSessionType

func NormalizeSessionType(value string) string

NormalizeSessionType applies the default session type when empty.

func NullFloat64

func NullFloat64(value sql.NullFloat64) *float64

NullFloat64 converts sql.NullFloat64 into a pointer.

func NullInt64

func NullInt64(value sql.NullInt64) *int64

NullInt64 converts sql.NullInt64 into a pointer.

func NullString

func NullString(value sql.NullString) *string

NullString converts sql.NullString into a trimmed string pointer.

func NullableFloat64

func NullableFloat64(value *float64) any

NullableFloat64 maps nil pointers to SQL NULL.

func NullableInt64

func NullableInt64(value *int64) any

NullableInt64 maps nil pointers to SQL NULL.

func NullableString

func NullableString(value string) any

NullableString maps blank strings to SQL NULL.

func NullableStringPointer

func NullableStringPointer(value *string) any

NullableStringPointer maps nil or blank string pointers to SQL NULL.

func OpenSQLiteDatabase

func OpenSQLiteDatabase(
	ctx context.Context,
	path string,
	initialize func(context.Context, *sql.DB) error,
) (*sql.DB, error)

OpenSQLiteDatabase opens a SQLite database, applies shared configuration, and retries once after moving aside a corrupt file.

func OpenSQLiteDatabaseWithPragmas

func OpenSQLiteDatabaseWithPragmas(
	ctx context.Context,
	path string,
	pragmas []string,
	initialize func(context.Context, *sql.DB) error,
) (*sql.DB, error)

OpenSQLiteDatabaseWithPragmas opens a SQLite database with additional driver-level pragmas.

func ParseNullableTimestamp

func ParseNullableTimestamp(value string) (*time.Time, error)

ParseNullableTimestamp parses optional canonical timestamps.

func ParseTimestamp

func ParseTimestamp(value string) (time.Time, error)

ParseTimestamp parses the canonical SQLite text timestamp.

func RunMigrations

func RunMigrations(ctx context.Context, db *sql.DB, migrations []Migration, opts ...MigrationOption) error

RunMigrations applies pending migrations once in deterministic version order.

func SessionActivityIdleSeconds

func SessionActivityIdleSeconds(meta *SessionActivityMeta, now time.Time) int64

SessionActivityIdleSeconds reports the age of the last recorded runtime activity relative to now.

func SessionDBFile

func SessionDBFile(sessionDir string) string

SessionDBFile returns the canonical events database path for a session directory.

func SessionMetaFile

func SessionMetaFile(sessionDir string) string

SessionMetaFile returns the canonical metadata file path for a session directory.

func ShouldRecoverSQLite

func ShouldRecoverSQLite(err error) bool

ShouldRecoverSQLite reports whether the open error indicates recoverable corruption.

func ValidFailureKind

func ValidFailureKind(kind FailureKind) bool

ValidFailureKind reports whether kind is a supported failure enum member.

func ValidStopReason

func ValidStopReason(r StopReason) bool

ValidStopReason reports whether r is a supported stop reason enum member.

func ValidateChildSessionToolSubset

func ValidateChildSessionToolSubset(parent SessionPermissionPolicy, child SessionPermissionPolicy) error

ValidateChildSessionToolSubset ensures child concrete tool atoms cannot exceed the parent.

func ValidateSessionLineage

func ValidateSessionLineage(sessionID string, lineage *SessionLineage) error

ValidateSessionLineage ensures lineage is structurally usable by spawn policy enforcement.

func WriteSessionMeta

func WriteSessionMeta(path string, meta SessionMeta) error

WriteSessionMeta writes the metadata file atomically via temp file and rename.

Types

type AppMetadataStore

type AppMetadataStore interface {
	GetAppMetadata(ctx context.Context, key string) (string, bool, error)
	SetAppMetadata(ctx context.Context, key string, value string) error
	DeleteAppMetadata(ctx context.Context, key string) error
}

AppMetadataStore manages a small global key-value table for instance-level flags.

type Clause

type Clause struct {
	// contains filtered or unexported fields
}

Clause represents an optional SQL filter clause plus its bound argument.

func Int64Clause

func Int64Clause(column string, op string, value int64) Clause

Int64Clause builds a numeric comparison clause when the value is positive.

func NotStringClause

func NotStringClause(column string, value string) Clause

NotStringClause builds an inequality clause when the value is non-empty.

func StringClause

func StringClause(column string, value string) Clause

StringClause builds an equality clause when the value is non-empty.

func TimeClause

func TimeClause(column string, op string, value time.Time) Clause

TimeClause builds a timestamp comparison clause when the value is non-zero.

type EventCorrelation

type EventCorrelation struct {
	TaskID               string     `json:"task_id,omitempty"`
	RunID                string     `json:"run_id,omitempty"`
	WorkflowID           string     `json:"workflow_id,omitempty"`
	ClaimTokenHash       string     `json:"claim_token_hash,omitempty"`
	LeaseUntil           *time.Time `json:"lease_until,omitempty"`
	CoordinatorSessionID string     `json:"coordinator_session_id,omitempty"`
	SchedulerReason      string     `json:"scheduler_reason,omitempty"`
	HookEvent            string     `json:"hook_event,omitempty"`
	HookName             string     `json:"hook_name,omitempty"`
	ActorKind            string     `json:"actor_kind,omitempty"`
	ActorID              string     `json:"actor_id,omitempty"`
	ReleaseReason        string     `json:"release_reason,omitempty"`
}

EventCorrelation carries the canonical cross-surface correlation keys for session and observability events.

func (EventCorrelation) IsZero

func (c EventCorrelation) IsZero() bool

IsZero reports whether the correlation payload carries any fields.

func (EventCorrelation) Normalize

func (c EventCorrelation) Normalize() EventCorrelation

Normalize trims string fields and canonicalizes timestamps.

type EventQuery

type EventQuery struct {
	Type          string
	AgentName     string
	TurnID        string
	Since         time.Time
	Limit         int
	AfterSequence int64
}

EventQuery filters per-session events while preserving follow-friendly ordering.

func (EventQuery) Validate

func (q EventQuery) Validate() error

Validate ensures the query is internally consistent.

type EventRecorder

type EventRecorder interface {
	Record(ctx context.Context, event SessionEvent) error
	RecordTokenUsage(ctx context.Context, usage TokenUsage) error
	Query(ctx context.Context, query EventQuery) ([]SessionEvent, error)
	History(ctx context.Context, query EventQuery) ([]TurnHistory, error)
	Close(ctx context.Context) error
}

EventRecorder captures session events and token usage in the per-session database.

type EventSummary

type EventSummary struct {
	ID          string
	SessionID   string
	WorkspaceID string
	Sequence    int64
	Type        string
	AgentName   string
	Provider    string
	Outcome     string
	Content     json.RawMessage
	EventCorrelation
	ParentSessionID string
	RootSessionID   string
	SpawnDepth      int
	Summary         string
	Timestamp       time.Time
}

EventSummary is the global, cross-session observability record for one event.

func (EventSummary) Validate

func (s EventSummary) Validate() error

Validate ensures the summary contains the required identifying fields.

type EventSummaryQuery

type EventSummaryQuery struct {
	SessionID     string
	WorkspaceID   string
	AgentName     string
	Type          string
	TaskID        string
	RunID         string
	ActorKind     string
	ActorID       string
	Provider      string
	Outcome       string
	Component     string
	ErrorOnly     bool
	AfterSequence int64
	Since         time.Time
	Limit         int
}

EventSummaryQuery filters global event summary queries.

func (EventSummaryQuery) Validate

func (q EventSummaryQuery) Validate() error

Validate ensures the query uses sane bounds.

type EventSummaryStore

type EventSummaryStore interface {
	WriteEventSummary(ctx context.Context, summary EventSummary) error
	ListEventSummaries(ctx context.Context, query EventSummaryQuery) ([]EventSummary, error)
}

EventSummaryStore manages persisted observability event summaries.

type FailureKind

type FailureKind string

FailureKind classifies ACP/session lifecycle failures at the source and keeps them transport-stable across storage, API, SSE, and CLI surfaces.

const (
	FailureStartup      FailureKind = "startup_failure"
	FailureHandshake    FailureKind = "handshake_failure"
	FailureLoad         FailureKind = "load_session_failure"
	FailureProtocol     FailureKind = "protocol_failure"
	FailurePrompt       FailureKind = "prompt_failure"
	FailureCanceled     FailureKind = "cancellation"
	FailurePermission   FailureKind = "permission_failure"
	FailureProviderAuth FailureKind = "provider_auth_failure"
	FailureProcess      FailureKind = "process_exit"
	FailureTransport    FailureKind = "transport_failure"
	FailureTimeout      FailureKind = "timeout"
	FailureUnknown      FailureKind = "unknown_failure"
)

type HookRunQuery

type HookRunQuery struct {
	SessionID string
	Event     string
	Outcome   hookspkg.HookRunOutcome
	Since     time.Time
	Limit     int
}

HookRunQuery filters persisted per-session hook run records.

func (HookRunQuery) Validate

func (q HookRunQuery) Validate() error

Validate ensures the query uses sane bounds.

type Migration

type Migration struct {
	Version    int
	Name       string
	Statements []string
	Up         func(ctx context.Context, tx *sql.Tx) error
	Checksum   string
}

Migration describes one ordered SQLite schema change.

type MigrationOption

type MigrationOption func(*migrationConfig)

MigrationOption customizes migration execution.

func WithMigrationsTable

func WithMigrationsTable(name string) MigrationOption

WithMigrationsTable stores migration records in a subsystem-specific table. Use this when independent migration streams share one SQLite database file.

type MigrationRecord

type MigrationRecord struct {
	Version   int
	Name      string
	Checksum  string
	AppliedAt time.Time
}

MigrationRecord describes one applied schema migration row.

func AppliedMigrations

func AppliedMigrations(ctx context.Context, db *sql.DB) ([]MigrationRecord, error)

AppliedMigrations returns applied migration records ordered by version.

func AppliedMigrationsWithTable

func AppliedMigrationsWithTable(ctx context.Context, db *sql.DB, table string) ([]MigrationRecord, error)

AppliedMigrationsWithTable returns applied migration records from a named migration table.

type NetworkAuditEntry

type NetworkAuditEntry struct {
	ID          string
	SessionID   string
	Direction   string
	Kind        string
	WorkspaceID string
	Channel     string
	Surface     string
	ThreadID    string
	DirectID    string
	WorkID      string
	PeerFrom    string
	PeerTo      string
	MessageID   string
	Reason      string
	Size        int
	Timestamp   time.Time
}

NetworkAuditEntry is an audit row for one network message event.

func (NetworkAuditEntry) Validate

func (e NetworkAuditEntry) Validate() error

Validate ensures the network audit entry is complete and internally consistent.

type NetworkAuditQuery

type NetworkAuditQuery struct {
	SessionID   string
	WorkspaceID string
	// Global explicitly allows daemon-admin aggregate callers to scan audit rows
	// across workspaces. Workspace-scoped API surfaces must leave this false.
	Global    bool
	Direction string
	Kind      string
	Channel   string
	Surface   string
	ThreadID  string
	DirectID  string
	WorkID    string
	MessageID string
	Since     time.Time
	Limit     int
}

NetworkAuditQuery filters network audit lookups.

func (NetworkAuditQuery) Validate

func (q NetworkAuditQuery) Validate() error

Validate ensures the query uses sane bounds.

type NetworkAuditStore

type NetworkAuditStore interface {
	WriteNetworkAudit(ctx context.Context, entry NetworkAuditEntry) error
	ListNetworkAudit(ctx context.Context, query NetworkAuditQuery) ([]NetworkAuditEntry, error)
}

NetworkAuditStore manages network message audit entries.

type NetworkChannelEntry

type NetworkChannelEntry struct {
	Channel     string
	WorkspaceID string
	Purpose     string
	CreatedBy   string
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

NetworkChannelEntry stores durable channel metadata for the operator-facing network workspace.

func (NetworkChannelEntry) Validate

func (e NetworkChannelEntry) Validate() error

Validate ensures the persisted channel metadata is complete.

type NetworkChannelQuery

type NetworkChannelQuery struct {
	Channel     string
	WorkspaceID string
	Limit       int
}

NetworkChannelQuery filters persisted network channel metadata lookups.

func (NetworkChannelQuery) Validate

func (q NetworkChannelQuery) Validate() error

Validate ensures the query uses sane bounds.

type NetworkChannelRef

type NetworkChannelRef struct {
	WorkspaceID string
	Channel     string
}

NetworkChannelRef identifies one workspace-qualified network channel.

func (NetworkChannelRef) Validate

func (r NetworkChannelRef) Validate() error

Validate ensures the channel reference is workspace-qualified.

type NetworkChannelStore

type NetworkChannelStore interface {
	WriteNetworkChannel(ctx context.Context, entry NetworkChannelEntry) error
	GetNetworkChannel(ctx context.Context, ref NetworkChannelRef) (NetworkChannelEntry, error)
	ListNetworkChannels(ctx context.Context, query NetworkChannelQuery) ([]NetworkChannelEntry, error)
	DeleteNetworkChannel(ctx context.Context, ref NetworkChannelRef) error
}

NetworkChannelStore manages durable network channel metadata.

type NetworkConversationMessage

type NetworkConversationMessage struct {
	MessageID   string
	SessionID   string
	WorkspaceID string
	Channel     string
	Surface     string
	ThreadID    string
	DirectID    string
	Direction   string
	PeerFrom    string
	PeerTo      string
	Kind        string
	WorkID      string
	ReplyTo     string
	TraceID     string
	CausationID string
	Intent      string
	Text        string
	PreviewText string
	ExtJSON     json.RawMessage
	Body        json.RawMessage
	Timestamp   time.Time
}

NetworkConversationMessage is one persisted network conversation or presence message.

func (NetworkConversationMessage) Validate

func (e NetworkConversationMessage) Validate() error

Validate ensures the persisted network message is complete and internally consistent.

type NetworkConversationMessageQuery

type NetworkConversationMessageQuery struct {
	BeforeMessageID string
	AfterMessageID  string
	Kind            string
	WorkID          string
	Limit           int
}

NetworkConversationMessageQuery filters conversation message lookups.

func (NetworkConversationMessageQuery) Validate

Validate ensures the query uses sane bounds and one cursor direction.

type NetworkConversationRef

type NetworkConversationRef struct {
	WorkspaceID string
	Channel     string
	Surface     string
	ThreadID    string
	DirectID    string
}

NetworkConversationRef identifies one persisted network conversation container.

func (NetworkConversationRef) ContainerID

func (r NetworkConversationRef) ContainerID() string

ContainerID returns the thread or direct-room id selected by the surface.

func (NetworkConversationRef) Validate

func (r NetworkConversationRef) Validate() error

Validate ensures the reference identifies exactly one supported container.

type NetworkConversationStore

type NetworkConversationStore interface {
	ResolveDirectRoom(ctx context.Context, entry NetworkDirectRoomEntry) (NetworkDirectRoomSummary, error)
	WriteConversationMessage(
		ctx context.Context,
		entry NetworkConversationMessage,
	) (NetworkConversationWriteResult, error)
	ListThreads(ctx context.Context, ref NetworkChannelRef, query NetworkThreadQuery) ([]NetworkThreadSummary, error)
	GetThread(ctx context.Context, ref NetworkChannelRef, threadID string) (NetworkThreadSummary, error)
	ListDirectRooms(
		ctx context.Context,
		ref NetworkChannelRef,
		query NetworkDirectRoomQuery,
	) ([]NetworkDirectRoomSummary, error)
	GetDirectRoom(ctx context.Context, ref NetworkChannelRef, directID string) (NetworkDirectRoomSummary, error)
	ListConversationMessages(
		ctx context.Context,
		ref NetworkConversationRef,
		query NetworkConversationMessageQuery,
	) ([]NetworkConversationMessage, error)
	GetWork(ctx context.Context, workspaceID string, workID string) (NetworkWorkEntry, error)
}

NetworkConversationStore manages durable conversation containers and work rows.

type NetworkConversationWriteResult

type NetworkConversationWriteResult struct {
	MessageID          string
	Duplicate          bool
	ConversationOpened bool
	WorkOpened         bool
	WorkTransitioned   bool
	WorkState          string
	LastActivityAt     time.Time
}

NetworkConversationWriteResult reports the transactional write outcome.

type NetworkDirectRoomEntry

type NetworkDirectRoomEntry struct {
	WorkspaceID    string
	Channel        string
	DirectID       string
	PeerA          string
	PeerB          string
	OpenedAt       time.Time
	LastActivityAt time.Time
}

NetworkDirectRoomEntry is the write DTO for a direct-room row.

func (NetworkDirectRoomEntry) Validate

func (e NetworkDirectRoomEntry) Validate() error

Validate ensures direct-room membership is stable and ordered.

type NetworkDirectRoomQuery

type NetworkDirectRoomQuery struct {
	PeerID string
	Limit  int
	After  string
}

NetworkDirectRoomQuery filters direct-room summary lookups.

func (NetworkDirectRoomQuery) Validate

func (q NetworkDirectRoomQuery) Validate() error

Validate ensures the query uses sane bounds.

type NetworkDirectRoomSummary

type NetworkDirectRoomSummary struct {
	WorkspaceID        string
	Channel            string
	DirectID           string
	PeerA              string
	PeerB              string
	OpenedAt           time.Time
	LastActivityAt     time.Time
	MessageCount       int
	OpenWorkCount      int
	LastMessagePreview string
}

NetworkDirectRoomSummary is the list/detail projection for a direct room.

func (NetworkDirectRoomSummary) Validate

func (s NetworkDirectRoomSummary) Validate() error

Validate ensures the direct-room summary is internally consistent.

type NetworkMessageEntry

type NetworkMessageEntry = NetworkConversationMessage

NetworkMessageEntry is the persisted network timeline row used by existing store interfaces.

type NetworkMessageQuery

type NetworkMessageQuery struct {
	SessionID       string
	WorkspaceID     string
	Channel         string
	PeerID          string
	PeerFrom        string
	PeerTo          string
	Kind            string
	Direction       string
	MessageID       string
	BeforeMessageID string
	AfterMessageID  string
	DirectedOnly    bool
	IncludePresence bool
	Since           time.Time
	Limit           int
}

NetworkMessageQuery filters persisted network timeline lookups.

func (NetworkMessageQuery) Validate

func (q NetworkMessageQuery) Validate() error

Validate ensures the query uses sane bounds.

type NetworkMessageStore

type NetworkMessageStore interface {
	WriteNetworkMessage(ctx context.Context, entry NetworkMessageEntry) error
	ListNetworkMessages(ctx context.Context, query NetworkMessageQuery) ([]NetworkMessageEntry, error)
}

NetworkMessageStore manages persisted network timeline messages.

type NetworkThreadQuery

type NetworkThreadQuery struct {
	Limit int
	After string
}

NetworkThreadQuery filters public-thread summary lookups.

func (NetworkThreadQuery) Validate

func (q NetworkThreadQuery) Validate() error

Validate ensures the query uses sane bounds.

type NetworkThreadSummary

type NetworkThreadSummary struct {
	WorkspaceID        string
	Channel            string
	ThreadID           string
	RootMessageID      string
	Title              string
	OpenedByPeerID     string
	OpenedSessionID    string
	OpenedAt           time.Time
	LastActivityAt     time.Time
	MessageCount       int
	ParticipantCount   int
	OpenWorkCount      int
	LastMessagePreview string
}

NetworkThreadSummary is the list/detail projection for a public thread.

func (NetworkThreadSummary) Validate

func (s NetworkThreadSummary) Validate() error

Validate ensures the public-thread summary is internally consistent.

type NetworkWorkEntry

type NetworkWorkEntry struct {
	WorkID          string
	WorkspaceID     string
	Channel         string
	Surface         string
	ThreadID        string
	DirectID        string
	OpenedByPeerID  string
	OpenedSessionID string
	TargetPeerID    string
	State           string
	OpenedAt        time.Time
	LastActivityAt  time.Time
	TerminalAt      *time.Time
}

NetworkWorkEntry stores lifecycle metadata for work inside one conversation.

func (NetworkWorkEntry) Validate

func (e NetworkWorkEntry) Validate() error

Validate ensures a work row is bound to exactly one conversation container.

type ObservabilityRetentionSweepResult

type ObservabilityRetentionSweepResult struct {
	CutoffAt              time.Time
	DeletedEventSummaries int64
	DeletedTokenStats     int64
	DeletedPermissionLogs int64
}

ObservabilityRetentionSweepResult reports how many global observability rows were deleted by one retention sweep.

type OnboardingStatus

type OnboardingStatus struct {
	Completed   bool
	CompletedAt string
}

OnboardingStatus describes the global first-run onboarding completion state.

type OnboardingStore

type OnboardingStore interface {
	GetOnboardingStatus(ctx context.Context) (OnboardingStatus, error)
	CompleteOnboarding(ctx context.Context, completedAt string) (OnboardingStatus, error)
	ResetOnboarding(ctx context.Context) (OnboardingStatus, error)
}

OnboardingStore manages the domain lifecycle for first-run onboarding.

type PermissionLogEntry

type PermissionLogEntry struct {
	ID         string
	SessionID  string
	AgentName  string
	Action     string
	Resource   string
	Decision   string
	PolicyUsed string
	Timestamp  time.Time
}

PermissionLogEntry is an audit log entry for a daemon permission decision.

func (PermissionLogEntry) Validate

func (e PermissionLogEntry) Validate() error

Validate ensures the permission audit entry is complete.

type PermissionLogQuery

type PermissionLogQuery struct {
	SessionID string
	AgentName string
	Decision  string
	Since     time.Time
	Limit     int
}

PermissionLogQuery filters permission audit queries.

func (PermissionLogQuery) Validate

func (q PermissionLogQuery) Validate() error

Validate ensures the query uses sane bounds.

type PermissionLogStore

type PermissionLogStore interface {
	WritePermissionLog(ctx context.Context, entry PermissionLogEntry) error
	ListPermissionLog(ctx context.Context, query PermissionLogQuery) ([]PermissionLogEntry, error)
}

PermissionLogStore manages permission decision audit entries.

type ReconcileResult

type ReconcileResult struct {
	Indexed  []string
	Orphaned []string
}

ReconcileResult reports which sessions were indexed or marked orphaned.

type SessionActivityMeta

type SessionActivityMeta struct {
	TurnID             string     `json:"turn_id,omitempty"`
	TurnSource         string     `json:"turn_source,omitempty"`
	TurnStartedAt      *time.Time `json:"turn_started_at,omitempty"`
	LastActivityAt     *time.Time `json:"last_activity_at,omitempty"`
	LastActivityKind   string     `json:"last_activity_kind,omitempty"`
	LastActivityDetail string     `json:"last_activity_detail,omitempty"`
	CurrentTool        string     `json:"current_tool,omitempty"`
	ToolCallID         string     `json:"tool_call_id,omitempty"`
	LastProgressAt     *time.Time `json:"last_progress_at,omitempty"`
	IterationCurrent   int        `json:"iteration_current,omitempty"`
	IterationMax       int        `json:"iteration_max,omitempty"`
	IdleSeconds        int64      `json:"idle_seconds,omitempty"`
}

SessionActivityMeta is the persisted prompt/runtime activity snapshot for one ACP-backed session.

func CloneSessionActivityMeta

func CloneSessionActivityMeta(meta *SessionActivityMeta) *SessionActivityMeta

CloneSessionActivityMeta returns a deep copy of the runtime activity payload.

func (*SessionActivityMeta) Validate

func (m *SessionActivityMeta) Validate() error

Validate ensures the activity payload remains internally consistent.

type SessionAttach

type SessionAttach struct {
	SessionID       string
	AttachedTo      string
	AttachExpiresAt time.Time
	AttachedAt      time.Time
}

SessionAttach records one acquired attach lock.

type SessionAttachRequest

type SessionAttachRequest struct {
	SessionID  string
	AttachedTo string
	Now        time.Time
	TTL        time.Duration
}

SessionAttachRequest identifies one explicit attach lock acquisition.

func (SessionAttachRequest) Normalize

Normalize trims the attach request and applies UTC timestamps.

func (SessionAttachRequest) Validate

func (r SessionAttachRequest) Validate() error

Validate ensures the attach request carries the fields needed for a CAS lock.

type SessionCatalog

type SessionCatalog interface {
	RegisterSession(ctx context.Context, session SessionInfo) error
	UpdateSessionState(ctx context.Context, update SessionStateUpdate) error
	ListSessions(ctx context.Context, query SessionListQuery) ([]SessionInfo, error)
	AttachSession(ctx context.Context, req SessionAttachRequest) (SessionAttach, error)
	ReconcileSessions(ctx context.Context, sessions []SessionInfo) (ReconcileResult, error)
}

SessionCatalog manages global session index records.

type SessionEvent

type SessionEvent struct {
	ID        string
	SessionID string
	Sequence  int64
	TurnID    string
	Type      string
	AgentName string
	Content   string
	Timestamp time.Time
}

SessionEvent is a persisted event row for a single AGH session.

func (SessionEvent) Validate

func (e SessionEvent) Validate() error

Validate ensures the event has the required fields for persistence.

type SessionFailure

type SessionFailure struct {
	Kind            FailureKind `json:"kind"`
	Summary         string      `json:"summary,omitempty"`
	CrashBundlePath string      `json:"crash_bundle_path,omitempty"`
}

SessionFailure is the durable, redacted diagnostic summary attached to a session terminal state and projected through public read paths.

func CloneSessionFailure

func CloneSessionFailure(failure *SessionFailure) *SessionFailure

CloneSessionFailure returns a deep copy of a session failure pointer.

func (SessionFailure) IsZero

func (f SessionFailure) IsZero() bool

IsZero reports whether the failure carries no diagnostic fields.

func (SessionFailure) Normalize

func (f SessionFailure) Normalize() SessionFailure

Normalize returns a trimmed copy of f.

func (SessionFailure) Validate

func (f SessionFailure) Validate() error

Validate checks that non-empty failure records use known kinds.

type SessionInfo

type SessionInfo struct {
	ID               string
	Name             string
	AgentName        string
	Provider         string
	WorkspaceID      string
	Channel          string
	SessionType      string
	Lineage          *SessionLineage
	State            string
	ACPSessionID     *string
	StopReason       StopReason
	StopDetail       string
	Failure          *SessionFailure
	Liveness         *SessionLivenessMeta
	Sandbox          *SessionSandboxMeta
	SoulSnapshotID   string
	SoulDigest       string
	ParentSoulDigest string
	AttachedTo       string
	AttachExpiresAt  *time.Time
	CreatedAt        time.Time
	UpdatedAt        time.Time
}

SessionInfo is the canonical session index row stored in the global database.

func (SessionInfo) Validate

func (s SessionInfo) Validate() error

Validate ensures the session record contains the required fields.

type SessionInputQueueEntry

type SessionInputQueueEntry struct {
	ID                string
	SessionID         string
	Status            string
	Mode              string
	Text              string
	SessionGeneration int64
	TaskRunID         string
	RunGeneration     *int64
	AttemptCount      int
	EnqueuedAt        time.Time
	DispatchStartedAt *time.Time
	SentAt            *time.Time
	FailedAt          *time.Time
	FailureSummary    string
	CanceledAt        *time.Time
	UpdatedAt         time.Time
}

SessionInputQueueEntry is one persisted busy-input item.

type SessionInputQueueInsert

type SessionInputQueueInsert struct {
	ID                string
	SessionID         string
	Mode              string
	Text              string
	SessionGeneration int64
	TaskRunID         string
	RunGeneration     *int64
	QueueCap          int
	Now               time.Time
}

SessionInputQueueInsert captures the atomic insert request for busy input.

func (SessionInputQueueInsert) Normalize

Normalize returns a trimmed, UTC-normalized insert request.

func (SessionInputQueueInsert) Validate

func (r SessionInputQueueInsert) Validate() error

Validate ensures the insert request can be persisted.

type SessionInputQueueStore

type SessionInputQueueStore interface {
	EnqueueSessionInput(
		ctx context.Context,
		req SessionInputQueueInsert,
	) (SessionInputQueueEntry, int, error)
	StageSessionSteer(
		ctx context.Context,
		req SessionInputQueueInsert,
	) (SessionInputQueueEntry, error)
	ConsumeSessionSteer(
		ctx context.Context,
		sessionID string,
		now time.Time,
	) (SessionInputQueueEntry, bool, error)
	ClaimNextSessionInput(
		ctx context.Context,
		sessionID string,
		now time.Time,
	) (SessionInputQueueEntry, bool, error)
	MarkSessionInputSent(
		ctx context.Context,
		sessionID string,
		entryID string,
		now time.Time,
	) error
	ReleaseSessionInput(ctx context.Context, sessionID string, entryID string, now time.Time) error
	MarkSessionInputFailed(
		ctx context.Context,
		sessionID string,
		entryID string,
		summary string,
		now time.Time,
	) error
	CancelSessionInput(
		ctx context.Context,
		sessionID string,
		entryID string,
		now time.Time,
	) (SessionInputQueueEntry, error)
	CancelPendingSessionInputs(
		ctx context.Context,
		sessionID string,
		generation int64,
		now time.Time,
	) (int, error)
	AdvanceSessionInputGeneration(ctx context.Context, sessionID string, now time.Time) (int64, error)
	CurrentSessionInputGeneration(ctx context.Context, sessionID string) (int64, error)
	SessionInputQueueSummary(ctx context.Context, sessionID string) (SessionInputQueueSummary, error)
}

SessionInputQueueStore persists operator busy-input entries.

type SessionInputQueueSummary

type SessionInputQueueSummary struct {
	SessionID     string
	Generation    int64
	PendingActive int
	PendingQueued int
	PendingSteer  int
	PendingLeased int
}

SessionInputQueueSummary captures the current generation and pending entries for recap/status reads.

type SessionLedgerRecord

type SessionLedgerRecord struct {
	SessionID    string
	WorkspaceID  string
	AgentName    string
	SessionType  string
	EventsDBPath string
	Lineage      *SessionLineage
	StartedAt    time.Time
	EndedAt      time.Time
}

SessionLedgerRecord carries the durable session evidence needed to materialize a forensic session ledger after the live event store has been closed.

type SessionLineage

type SessionLineage struct {
	ParentSessionID  string                  `json:"parent_session_id,omitempty"`
	RootSessionID    string                  `json:"root_session_id,omitempty"`
	SpawnDepth       int                     `json:"spawn_depth"`
	SpawnRole        string                  `json:"spawn_role,omitempty"`
	TTLExpiresAt     *time.Time              `json:"ttl_expires_at,omitempty"`
	AutoStopOnParent bool                    `json:"auto_stop_on_parent"`
	SpawnBudget      SessionSpawnBudget      `json:"spawn_budget"`
	PermissionPolicy SessionPermissionPolicy `json:"permission_policy"`
}

SessionLineage is the persisted parent/root metadata used for safe spawned sessions.

func CloneSessionLineage

func CloneSessionLineage(lineage *SessionLineage) *SessionLineage

CloneSessionLineage returns a deep copy of lineage metadata.

func NormalizeSessionLineage

func NormalizeSessionLineage(sessionID string, lineage *SessionLineage) *SessionLineage

NormalizeSessionLineage returns lineage with trimmed identifiers and a root record for first-class manual sessions when no lineage was supplied.

type SessionListQuery

type SessionListQuery struct {
	ID              string
	State           string
	AgentName       string
	WorkspaceID     string
	SessionType     string
	ParentSessionID string
	RootSessionID   string
	SpawnRole       string
	Resumable       bool
	Sort            string
	Limit           int
}

SessionListQuery filters global session index queries.

func (SessionListQuery) Validate

func (q SessionListQuery) Validate() error

Validate ensures the query uses sane bounds.

type SessionLivenessMeta

type SessionLivenessMeta struct {
	SubprocessPID       int                  `json:"subprocess_pid,omitempty"`
	SubprocessStartedAt *time.Time           `json:"subprocess_started_at,omitempty"`
	LastUpdateAt        *time.Time           `json:"last_update_at,omitempty"`
	StallState          string               `json:"stall_state,omitempty"`
	StallReason         string               `json:"stall_reason,omitempty"`
	Activity            *SessionActivityMeta `json:"activity,omitempty"`
}

SessionLivenessMeta is the persisted runtime supervision state for one ACP-backed session.

func CloneSessionLivenessMeta

func CloneSessionLivenessMeta(meta *SessionLivenessMeta) *SessionLivenessMeta

CloneSessionLivenessMeta returns a deep copy of the liveness payload.

func (*SessionLivenessMeta) Validate

func (m *SessionLivenessMeta) Validate() error

Validate ensures the liveness payload remains internally consistent.

type SessionMeta

type SessionMeta struct {
	ID               string               `json:"id"`
	Name             string               `json:"name,omitempty"`
	AgentName        string               `json:"agent_name"`
	Provider         string               `json:"provider,omitempty"`
	Model            string               `json:"model,omitempty"`
	ReasoningEffort  string               `json:"reasoning_effort,omitempty"`
	WorkspaceID      string               `json:"workspace_id,omitempty"`
	Channel          string               `json:"channel,omitempty"`
	SessionType      string               `json:"session_type,omitempty"`
	Lineage          *SessionLineage      `json:"lineage,omitempty"`
	State            string               `json:"state"`
	StopReason       *StopReason          `json:"stop_reason,omitempty"`
	StopDetail       string               `json:"stop_detail,omitempty"`
	Failure          *SessionFailure      `json:"failure,omitempty"`
	ACPSessionID     *string              `json:"acp_session_id,omitempty"`
	Liveness         *SessionLivenessMeta `json:"liveness,omitempty"`
	Sandbox          *SessionSandboxMeta  `json:"sandbox,omitempty"`
	SoulSnapshotID   string               `json:"soul_snapshot_id,omitempty"`
	SoulDigest       string               `json:"soul_digest,omitempty"`
	ParentSoulDigest string               `json:"parent_soul_digest,omitempty"`
	CreatedAt        time.Time            `json:"created_at"`
	UpdatedAt        time.Time            `json:"updated_at"`
}

SessionMeta is the atomically-written session metadata document.

func ReadSessionMeta

func ReadSessionMeta(path string) (SessionMeta, error)

ReadSessionMeta loads a session metadata document from disk.

func (SessionMeta) Validate

func (m SessionMeta) Validate() error

Validate ensures the metadata file remains aligned with the session index schema.

type SessionPermissionPolicy

type SessionPermissionPolicy struct {
	Tools           []string `json:"tools"`
	Skills          []string `json:"skills"`
	MCPServers      []string `json:"mcp_servers"`
	WorkspacePaths  []string `json:"workspace_paths"`
	NetworkChannels []string `json:"network_channels"`
	SandboxProfiles []string `json:"sandbox_profiles"`
}

SessionPermissionPolicy captures concrete permission atoms available to a session.

func DecodeSessionPermissionPolicy

func DecodeSessionPermissionPolicy(raw string) (SessionPermissionPolicy, error)

DecodeSessionPermissionPolicy unmarshals permission policy metadata from the global session catalog.

func NormalizeSessionPermissionPolicy

func NormalizeSessionPermissionPolicy(policy SessionPermissionPolicy) SessionPermissionPolicy

NormalizeSessionPermissionPolicy returns a policy with stable, trimmed atom lists.

type SessionRegistry

SessionRegistry composes the global persistence surfaces used by runtime consumers.

type SessionSandboxMeta

type SessionSandboxMeta struct {
	SandboxID             string          `json:"sandbox_id,omitempty"`
	Backend               string          `json:"backend"`
	Profile               string          `json:"profile,omitempty"`
	State                 string          `json:"state,omitempty"`
	InstanceID            string          `json:"instance_id,omitempty"`
	RuntimeRootDir        string          `json:"runtime_root_dir,omitempty"`
	RuntimeAdditionalDirs []string        `json:"runtime_additional_dirs,omitempty"`
	ProviderState         json.RawMessage `json:"provider_state,omitempty"`
	SSHAccessExpiresAt    *time.Time      `json:"ssh_access_expires_at,omitempty"`
	LastSyncAt            *time.Time      `json:"last_sync_at,omitempty"`
	LastSyncError         string          `json:"last_sync_error,omitempty"`
}

SessionSandboxMeta is the persisted runtime sandbox state for a session.

type SessionSoulSnapshotUpdate

type SessionSoulSnapshotUpdate struct {
	ID               string
	SoulSnapshotID   string
	SoulDigest       string
	ParentSoulDigest string
	UpdatedAt        time.Time
}

SessionSoulSnapshotUpdate updates the Soul provenance attached to a session.

func (SessionSoulSnapshotUpdate) Validate

func (u SessionSoulSnapshotUpdate) Validate() error

Validate ensures session Soul provenance is internally consistent.

type SessionSpawnBudget

type SessionSpawnBudget struct {
	MaxChildren           int   `json:"max_children"`
	MaxDepth              int   `json:"max_depth"`
	TTLSeconds            int64 `json:"ttl_seconds"`
	MaxActivePerWorkspace int   `json:"max_active_per_workspace,omitempty"`
}

SessionSpawnBudget captures durable spawn limits attached to a session.

func DecodeSessionSpawnBudget

func DecodeSessionSpawnBudget(raw string) (SessionSpawnBudget, error)

DecodeSessionSpawnBudget unmarshals budget metadata from the global session catalog.

type SessionStateUpdate

type SessionStateUpdate struct {
	ID            string
	State         string
	ACPSessionID  *string
	StopReasonSet bool
	StopReason    *string
	StopDetail    string
	FailureSet    bool
	Failure       *SessionFailure
	Liveness      *SessionLivenessMeta
	Sandbox       *SessionSandboxMeta
	UpdatedAt     time.Time
}

SessionStateUpdate updates only the stateful fields of an indexed session.

func (SessionStateUpdate) Validate

func (u SessionStateUpdate) Validate() error

Validate ensures the update contains the required fields.

type StopReason

type StopReason string

StopReason classifies why a session ended.

const (
	StopCompleted      StopReason = "completed"
	StopUserCanceled   StopReason = "user_canceled"
	StopMaxIterations  StopReason = "max_iterations"
	StopLoopDetected   StopReason = "loop_detected"
	StopTimeout        StopReason = "timeout"
	StopBudgetExceeded StopReason = "budget_exceeded"
	StopError          StopReason = "error"
	StopAgentCrashed   StopReason = "agent_crashed"
	StopHookStopped    StopReason = "hook_stopped"
	StopShutdown       StopReason = "shutdown"
)

type TokenStats

type TokenStats struct {
	ID           string
	SessionID    string
	AgentName    string
	InputTokens  *int64
	OutputTokens *int64
	TotalTokens  *int64
	TotalCost    *float64
	CostCurrency *string
	TurnCount    int64
	UpdatedAt    time.Time
}

TokenStats is the aggregated usage record for a session in the global database.

type TokenStatsQuery

type TokenStatsQuery struct {
	SessionID string
	AgentName string
	Limit     int
}

TokenStatsQuery filters token aggregation lookups.

func (TokenStatsQuery) Validate

func (q TokenStatsQuery) Validate() error

Validate ensures the query uses sane bounds.

type TokenStatsStore

type TokenStatsStore interface {
	UpdateTokenStats(ctx context.Context, update TokenStatsUpdate) error
	ListTokenStats(ctx context.Context, query TokenStatsQuery) ([]TokenStats, error)
}

TokenStatsStore manages aggregated token usage rows.

type TokenStatsUpdate

type TokenStatsUpdate struct {
	SessionID    string
	AgentName    string
	InputTokens  *int64
	OutputTokens *int64
	TotalTokens  *int64
	CostAmount   *float64
	CostCurrency *string
	Turns        int64
	UpdatedAt    time.Time
}

TokenStatsUpdate adds one or more turns of usage into a session aggregate.

func (TokenStatsUpdate) Validate

func (u TokenStatsUpdate) Validate() error

Validate ensures the aggregate update contains the required identifying fields.

type TokenUsage

type TokenUsage struct {
	TurnID           string
	InputTokens      *int64
	OutputTokens     *int64
	TotalTokens      *int64
	ThoughtTokens    *int64
	CacheReadTokens  *int64
	CacheWriteTokens *int64
	ContextUsed      *int64
	ContextSize      *int64
	CostAmount       *float64
	CostCurrency     *string
	Timestamp        time.Time
}

TokenUsage captures per-turn usage data reported by an ACP provider.

func (TokenUsage) Validate

func (u TokenUsage) Validate() error

Validate ensures the usage payload has the required fields.

type TurnHistory

type TurnHistory struct {
	TurnID string
	Events []SessionEvent
}

TurnHistory groups ordered events by their turn identifier.

type WriteTx

type WriteTx struct {
	// contains filtered or unexported fields
}

WriteTx is the single-connection transaction handle passed to ExecuteWrite callbacks.

func (*WriteTx) ExecContext

func (tx *WriteTx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

ExecContext executes a statement inside the active write transaction.

func (*WriteTx) QueryContext

func (tx *WriteTx) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

QueryContext executes a query inside the active write transaction.

func (*WriteTx) QueryRowContext

func (tx *WriteTx) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

QueryRowContext executes a single-row query inside the active write transaction.

Directories

Path Synopsis
Package workspacedb owns per-workspace SQLite database lifecycle helpers.
Package workspacedb owns per-workspace SQLite database lifecycle helpers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL