storage

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package storage is RiskKernel's durable state layer. The Store interface is the seam behind which SQLite (default, zero-config, the file the user owns) and Postgres (opt-in, later) live. Callers never see SQL. Runs, steps, tool calls, and the cost ledger persist here so runs survive a crash (step 5: resume) and so spend is auditable (`riskkernel audit export`).

Schema evolution is forward-only via embedded Goose migrations (see COMPATIBILITY.md): migrations run in a transaction on startup, and the daemon refuses to start if the on-disk schema is newer than the binary understands.

Index

Constants

View Source
const (
	ApprovalPending  = "pending"
	ApprovalApproved = "approved"
	ApprovalDenied   = "denied"
)

Approval status values.

Variables

View Source
var ErrNotFound = errors.New("storage: not found")

ErrNotFound is returned when a requested record does not exist.

View Source
var ErrSchemaTooNew = errors.New("storage: on-disk schema is newer than this binary; upgrade riskkernel")

ErrSchemaTooNew is returned when the on-disk schema version is newer than the running binary understands. RiskKernel refuses to start in this case rather than risk corrupting a user's data (downgrade protection, COMPATIBILITY.md).

Functions

This section is empty.

Types

type ApprovalRecord

type ApprovalRecord struct {
	ID         string
	RunID      string
	StepIndex  int32
	Tool       string
	SideEffect string
	Arguments  map[string]any
	Status     string // pending | approved | denied
	Reason     string
	DecidedBy  string
	CreatedAt  time.Time
	DecidedAt  *time.Time
}

ApprovalRecord is a human-in-the-loop gate on a side-effecting tool call.

type ApprovalRule added in v0.6.0

type ApprovalRule struct {
	Tool       string `json:"tool,omitempty"`
	SideEffect string `json:"sideEffect,omitempty"`
}

ApprovalRule mirrors an api/v1 ApprovalPolicy rule: an action needs approval if it matches the tool exactly or the side-effect glob.

type CheckpointRecord

type CheckpointRecord struct {
	RunID                 string
	StepIndex             int32
	Name                  string
	UsagePromptTokens     int64
	UsageCompletionTokens int64
	UsageDollars          float64
	UsageLoops            int32
	Payload               map[string]any
	CreatedAt             time.Time
}

CheckpointRecord is a crash-resumable snapshot: a run's usage at a step plus an opaque user-supplied payload to restart from.

type Fact

type Fact struct {
	Namespace string
	Key       string
	Value     string
	RunID     string
	UpdatedAt time.Time
}

Fact is one episodic memory key/value.

type LedgerEntry

type LedgerEntry struct {
	RunID            string
	StepIndex        int32
	Provider         string
	Model            string
	PromptTokens     int64
	CompletionTokens int64
	Dollars          float64
	Priced           bool // false when the model had no known price
	ResponseID       string
	CreatedAt        time.Time
}

LedgerEntry is one priced model call — the auditable unit of spend.

type LedgerTotals

type LedgerTotals struct {
	RunID            string  `json:"runId"`
	Calls            int64   `json:"calls"`
	PromptTokens     int64   `json:"promptTokens"`
	CompletionTokens int64   `json:"completionTokens"`
	Dollars          float64 `json:"dollars"`
}

LedgerTotals aggregates spend for audit/reporting.

type PolicyRecord added in v0.6.0

type PolicyRecord struct {
	Name          string
	BudgetTokens  int64
	BudgetDollars float64
	BudgetLoops   int32
	BudgetSeconds int32
	ToolAllowlist []string
	ApprovalRules []ApprovalRule
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

PolicyRecord is a reusable, named policy bundle — a default budget, a tool allowlist, and approval rules — that a run can reference by name (policyRef) instead of inlining. Mirrors the api/v1 Policy schema.

type Postgres added in v0.7.0

type Postgres struct {
	// contains filtered or unexported fields
}

Postgres is the opt-in Store backend for multi-instance / HA deployments. It implements the same Store interface as the default SQLite backend against the same schema (timestamps as RFC3339 text, JSON marshaled in Go), so every package-level scan/marshal helper is shared — only the SQL dialect (placeholder style, the metadata JSON accessor) and the DDL differ. SQLite stays the default; Postgres is selected only when a connection string is configured.

func OpenPostgres added in v0.7.0

func OpenPostgres(dsn string) (*Postgres, error)

OpenPostgres connects to Postgres using a standard libpq/pgx connection string (e.g. postgres://user:pass@host:5432/db?sslmode=require), applies pending forward migrations in a transaction, and enforces downgrade protection.

func (*Postgres) AppendLedger added in v0.7.0

func (p *Postgres) AppendLedger(ctx context.Context, e LedgerEntry) error

func (*Postgres) AppendToolCall added in v0.7.0

func (p *Postgres) AppendToolCall(ctx context.Context, t ToolCallRecord) error

func (*Postgres) Close added in v0.7.0

func (p *Postgres) Close() error

Close closes the connection pool.

func (*Postgres) CreateApproval added in v0.7.0

func (p *Postgres) CreateApproval(ctx context.Context, a ApprovalRecord) error

func (*Postgres) GetApproval added in v0.7.0

func (p *Postgres) GetApproval(ctx context.Context, id string) (ApprovalRecord, error)

func (*Postgres) GetFact added in v0.7.0

func (p *Postgres) GetFact(ctx context.Context, namespace, key string) (Fact, error)

func (*Postgres) GetPolicy added in v0.7.0

func (p *Postgres) GetPolicy(ctx context.Context, name string) (PolicyRecord, error)

func (*Postgres) GetRun added in v0.7.0

func (p *Postgres) GetRun(ctx context.Context, id string) (RunRecord, error)

func (*Postgres) LatestCheckpoint added in v0.7.0

func (p *Postgres) LatestCheckpoint(ctx context.Context, runID string) (CheckpointRecord, error)

func (*Postgres) LedgerForRun added in v0.7.0

func (p *Postgres) LedgerForRun(ctx context.Context, runID string) ([]LedgerEntry, error)

func (*Postgres) ListApprovals added in v0.7.0

func (p *Postgres) ListApprovals(ctx context.Context, status string) ([]ApprovalRecord, error)

func (*Postgres) ListCheckpoints added in v0.7.0

func (p *Postgres) ListCheckpoints(ctx context.Context, runID string) ([]CheckpointRecord, error)

func (*Postgres) ListFacts added in v0.7.0

func (p *Postgres) ListFacts(ctx context.Context, namespace string) ([]Fact, error)

func (*Postgres) ListPolicies added in v0.7.0

func (p *Postgres) ListPolicies(ctx context.Context) ([]PolicyRecord, error)

func (*Postgres) ListRuns added in v0.7.0

func (p *Postgres) ListRuns(ctx context.Context) ([]RunRecord, error)

func (*Postgres) ListRunsByStatus added in v0.7.0

func (p *Postgres) ListRunsByStatus(ctx context.Context, status string) ([]RunRecord, error)

func (*Postgres) ListSteps added in v0.7.0

func (p *Postgres) ListSteps(ctx context.Context, runID string) ([]StepRecord, error)

func (*Postgres) ListToolCalls added in v0.7.0

func (p *Postgres) ListToolCalls(ctx context.Context, runID string) ([]ToolCallRecord, error)

func (*Postgres) PutFact added in v0.7.0

func (p *Postgres) PutFact(ctx context.Context, f Fact) error

func (*Postgres) ResolveApproval added in v0.7.0

func (p *Postgres) ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error

func (*Postgres) SaveCheckpoint added in v0.7.0

func (p *Postgres) SaveCheckpoint(ctx context.Context, c CheckpointRecord) error

func (*Postgres) SummarizeLedger added in v0.7.0

func (p *Postgres) SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)

SummarizeLedger mirrors the SQLite aggregation. The grouping expression is chosen from a fixed whitelist (structural, not a bound parameter); the metadata key is validated and bound as a value, so nothing is built from raw user input unsafely. The only dialect difference from SQLite is the metadata accessor: Postgres reads the JSON-in-text column via `metadata::jsonb ->> key` instead of json_extract.

func (*Postgres) Totals added in v0.7.0

func (p *Postgres) Totals(ctx context.Context, runID string) (LedgerTotals, error)

func (*Postgres) UpsertPolicy added in v0.7.0

func (p *Postgres) UpsertPolicy(ctx context.Context, pol PolicyRecord) error

func (*Postgres) UpsertRun added in v0.7.0

func (p *Postgres) UpsertRun(ctx context.Context, r RunRecord) error

func (*Postgres) UpsertStep added in v0.7.0

func (p *Postgres) UpsertStep(ctx context.Context, st StepRecord) error

type RunRecord

type RunRecord struct {
	ID         string
	Name       string
	Status     string
	HaltReason string
	// PolicyRef is the name of the policy bundle the run was created under (empty
	// = none). Its tool allowlist and approval rules are enforced per-run.
	PolicyRef string

	BudgetTokens  int64
	BudgetDollars float64
	BudgetLoops   int32
	BudgetSeconds int32

	UsagePromptTokens     int64
	UsageCompletionTokens int64
	UsageDollars          float64
	UsageLoops            int32

	Metadata  map[string]string
	CreatedAt time.Time
	UpdatedAt time.Time
}

RunRecord is the persisted form of a governed run.

type SQLite

type SQLite struct {
	// contains filtered or unexported fields
}

SQLite is the default Store backend: a single WAL-mode SQLite file the user owns. Pure-Go driver, so the binary stays static and cross-compilable.

func OpenSQLite

func OpenSQLite(path string) (*SQLite, error)

OpenSQLite opens (creating if needed) the SQLite database at path, applies pending forward migrations in a transaction, and enforces downgrade protection.

func (*SQLite) AppendLedger

func (s *SQLite) AppendLedger(ctx context.Context, e LedgerEntry) error

func (*SQLite) AppendToolCall

func (s *SQLite) AppendToolCall(ctx context.Context, t ToolCallRecord) error

func (*SQLite) Close

func (s *SQLite) Close() error

Close closes the database.

func (*SQLite) CreateApproval

func (s *SQLite) CreateApproval(ctx context.Context, a ApprovalRecord) error

CreateApproval persists a new (pending) approval request.

func (*SQLite) GetApproval

func (s *SQLite) GetApproval(ctx context.Context, id string) (ApprovalRecord, error)

GetApproval returns an approval by id, or ErrNotFound.

func (*SQLite) GetFact

func (s *SQLite) GetFact(ctx context.Context, namespace, key string) (Fact, error)

GetFact returns a fact by (namespace, key), or ErrNotFound.

func (*SQLite) GetPolicy added in v0.6.0

func (s *SQLite) GetPolicy(ctx context.Context, name string) (PolicyRecord, error)

GetPolicy returns a policy bundle by name, or ErrNotFound.

func (*SQLite) GetRun

func (s *SQLite) GetRun(ctx context.Context, id string) (RunRecord, error)

func (*SQLite) LatestCheckpoint

func (s *SQLite) LatestCheckpoint(ctx context.Context, runID string) (CheckpointRecord, error)

LatestCheckpoint returns a run's most recent checkpoint, or ErrNotFound.

func (*SQLite) LedgerForRun

func (s *SQLite) LedgerForRun(ctx context.Context, runID string) ([]LedgerEntry, error)

func (*SQLite) ListApprovals

func (s *SQLite) ListApprovals(ctx context.Context, status string) ([]ApprovalRecord, error)

ListApprovals returns approvals filtered by status ("" = all), newest first.

func (*SQLite) ListCheckpoints

func (s *SQLite) ListCheckpoints(ctx context.Context, runID string) ([]CheckpointRecord, error)

ListCheckpoints returns a run's checkpoints in time order.

func (*SQLite) ListFacts

func (s *SQLite) ListFacts(ctx context.Context, namespace string) ([]Fact, error)

ListFacts returns all facts in a namespace, key order.

func (*SQLite) ListPolicies added in v0.6.0

func (s *SQLite) ListPolicies(ctx context.Context) ([]PolicyRecord, error)

ListPolicies returns all policy bundles, newest first.

func (*SQLite) ListRuns

func (s *SQLite) ListRuns(ctx context.Context) ([]RunRecord, error)

func (*SQLite) ListRunsByStatus

func (s *SQLite) ListRunsByStatus(ctx context.Context, status string) ([]RunRecord, error)

ListRunsByStatus returns runs in the given lifecycle status, newest first.

func (*SQLite) ListSteps

func (s *SQLite) ListSteps(ctx context.Context, runID string) ([]StepRecord, error)

func (*SQLite) ListToolCalls added in v0.2.0

func (s *SQLite) ListToolCalls(ctx context.Context, runID string) ([]ToolCallRecord, error)

func (*SQLite) PutFact

func (s *SQLite) PutFact(ctx context.Context, f Fact) error

PutFact inserts or updates an episodic fact by (namespace, key).

func (*SQLite) ResolveApproval

func (s *SQLite) ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error

ResolveApproval records a decision on a pending approval. It is a no-op if the approval is not currently pending (returns ErrNotFound so callers can detect a double-resolve or unknown id).

func (*SQLite) SaveCheckpoint

func (s *SQLite) SaveCheckpoint(ctx context.Context, c CheckpointRecord) error

SaveCheckpoint appends a crash-resumable checkpoint.

func (*SQLite) SummarizeLedger added in v0.1.2

func (s *SQLite) SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)

SummarizeLedger aggregates the cost ledger across runs, grouped by opts.By. The grouping expression is chosen from a fixed whitelist (it's structural and can't be a bound parameter); the metadata key is validated and its JSON path is bound as a value, so nothing here is built from raw user input unsafely.

func (*SQLite) Totals

func (s *SQLite) Totals(ctx context.Context, runID string) (LedgerTotals, error)

func (*SQLite) UpsertPolicy added in v0.6.0

func (s *SQLite) UpsertPolicy(ctx context.Context, p PolicyRecord) error

UpsertPolicy inserts or replaces a named policy bundle. On an update (same name) the original created_at is preserved.

func (*SQLite) UpsertRun

func (s *SQLite) UpsertRun(ctx context.Context, r RunRecord) error

func (*SQLite) UpsertStep

func (s *SQLite) UpsertStep(ctx context.Context, st StepRecord) error

type StepRecord

type StepRecord struct {
	RunID            string
	Index            int32
	Status           string
	PromptTokens     int64
	CompletionTokens int64
	Dollars          float64
	StartedAt        time.Time
	EndedAt          *time.Time // nil while running
}

StepRecord is one loop iteration of a run.

type Store

type Store interface {
	// UpsertRun inserts or replaces a run row by ID.
	UpsertRun(ctx context.Context, r RunRecord) error
	// GetRun returns a run by ID, or ErrNotFound.
	GetRun(ctx context.Context, id string) (RunRecord, error)
	// ListRuns returns all runs, newest first.
	ListRuns(ctx context.Context) ([]RunRecord, error)
	// ListRunsByStatus returns runs in the given lifecycle status (e.g. "running"
	// for reload-on-startup), newest first.
	ListRunsByStatus(ctx context.Context, status string) ([]RunRecord, error)

	// UpsertStep inserts or replaces a step row by (run_id, index).
	UpsertStep(ctx context.Context, s StepRecord) error
	// ListSteps returns a run's steps in index order.
	ListSteps(ctx context.Context, runID string) ([]StepRecord, error)

	// AppendLedger appends one priced call to the cost ledger.
	AppendLedger(ctx context.Context, e LedgerEntry) error
	// LedgerForRun returns a run's ledger entries in time order.
	LedgerForRun(ctx context.Context, runID string) ([]LedgerEntry, error)
	// Totals aggregates a run's ledger.
	Totals(ctx context.Context, runID string) (LedgerTotals, error)
	// SummarizeLedger aggregates spend across runs, grouped by opts.By
	// (provider/model/day/name/metadata.<key>). The unit is the ledger row.
	SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)

	// AppendToolCall records a tool invocation.
	AppendToolCall(ctx context.Context, t ToolCallRecord) error
	// ListToolCalls returns a run's tool invocations in audit order.
	ListToolCalls(ctx context.Context, runID string) ([]ToolCallRecord, error)

	// PutFact inserts or updates an episodic memory fact by (namespace, key).
	PutFact(ctx context.Context, f Fact) error
	// GetFact returns a fact, or ErrNotFound.
	GetFact(ctx context.Context, namespace, key string) (Fact, error)
	// ListFacts returns all facts in a namespace.
	ListFacts(ctx context.Context, namespace string) ([]Fact, error)

	// CreateApproval persists a new (pending) approval request.
	CreateApproval(ctx context.Context, a ApprovalRecord) error
	// GetApproval returns an approval by id, or ErrNotFound.
	GetApproval(ctx context.Context, id string) (ApprovalRecord, error)
	// ResolveApproval records a decision (approved/denied) on a pending approval.
	ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error
	// ListApprovals returns approvals filtered by status ("" = all), newest first.
	ListApprovals(ctx context.Context, status string) ([]ApprovalRecord, error)

	// UpsertPolicy inserts or replaces a named policy bundle (register/update by name).
	UpsertPolicy(ctx context.Context, p PolicyRecord) error
	// GetPolicy returns a policy bundle by name, or ErrNotFound.
	GetPolicy(ctx context.Context, name string) (PolicyRecord, error)
	// ListPolicies returns all policy bundles, newest first.
	ListPolicies(ctx context.Context) ([]PolicyRecord, error)

	// SaveCheckpoint appends a crash-resumable checkpoint.
	SaveCheckpoint(ctx context.Context, c CheckpointRecord) error
	// LatestCheckpoint returns a run's most recent checkpoint, or ErrNotFound.
	LatestCheckpoint(ctx context.Context, runID string) (CheckpointRecord, error)
	// ListCheckpoints returns a run's checkpoints in time order.
	ListCheckpoints(ctx context.Context, runID string) ([]CheckpointRecord, error)

	// Close releases the backend's resources.
	Close() error
}

Store is the durable backend. Implementations must be safe for concurrent use.

type SummarizeOptions added in v0.1.2

type SummarizeOptions struct {
	// By is the grouping dimension: "provider", "model", "day", "name", or
	// "metadata.<key>" (e.g. "metadata.team"). Required.
	By string
	// Since/Until optionally bound the call time (created_at): Since is inclusive,
	// Until exclusive. Nil means unbounded on that end.
	Since *time.Time
	Until *time.Time
}

SummarizeOptions selects how SummarizeLedger aggregates the cost ledger.

type ToolCallRecord

type ToolCallRecord struct {
	ID         string
	RunID      string
	StepIndex  int32
	Tool       string
	SideEffect string
	Arguments  map[string]any
	Status     string
	CreatedAt  time.Time
}

ToolCallRecord is a (side-effecting) tool invocation; populated by the MCP gateway and HITL approval gate in later build steps.

type UsageGroup added in v0.1.2

type UsageGroup struct {
	Key              string  `json:"key"`
	Calls            int64   `json:"calls"`
	PromptTokens     int64   `json:"promptTokens"`
	CompletionTokens int64   `json:"completionTokens"`
	Dollars          float64 `json:"dollars"`
}

UsageGroup is one bucket of aggregated spend — e.g. one team, one provider, or one day. Tokens/dollars are summed from the cost ledger (the auditable source).

type UsageSummary added in v0.1.2

type UsageSummary struct {
	By     string       `json:"by"`
	Groups []UsageGroup `json:"groups"`
	Total  UsageGroup   `json:"total"`
}

UsageSummary is cost-ledger spend grouped by one dimension, plus the grand total across all groups in range.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL