Documentation
¶
Overview ¶
Package storage is RiskKernel's durable state layer. The Store interface is the seam behind which SQLite (the default: zero-config, a single file the user owns) and Postgres (opt-in, added in v0.7.0) live. Callers never see SQL. Runs, steps, tool calls, and the cost ledger persist here so that runs survive a crash (step 5: resume) and so that spend is auditable (`riskkernel audit export`).
Schema evolution is forward-only via embedded Goose migrations (see COMPATIBILITY.md): migrations run in a transaction on startup, and the daemon refuses to start if the on-disk schema is newer than the binary understands.
Index ¶
- Constants
- Variables
- type ApprovalRecord
- type ApprovalRule
- type CheckpointRecord
- type Fact
- type LedgerEntry
- type LedgerTotals
- type PolicyRecord
- type Postgres
- func (p *Postgres) AppendLedger(ctx context.Context, e LedgerEntry) error
- func (p *Postgres) AppendToolCall(ctx context.Context, t ToolCallRecord) error
- func (p *Postgres) Close() error
- func (p *Postgres) CreateApproval(ctx context.Context, a ApprovalRecord) error
- func (p *Postgres) GetApproval(ctx context.Context, id string) (ApprovalRecord, error)
- func (p *Postgres) GetFact(ctx context.Context, namespace, key string) (Fact, error)
- func (p *Postgres) GetPolicy(ctx context.Context, name string) (PolicyRecord, error)
- func (p *Postgres) GetRun(ctx context.Context, id string) (RunRecord, error)
- func (p *Postgres) LatestCheckpoint(ctx context.Context, runID string) (CheckpointRecord, error)
- func (p *Postgres) LedgerForRun(ctx context.Context, runID string) ([]LedgerEntry, error)
- func (p *Postgres) ListApprovals(ctx context.Context, status string) ([]ApprovalRecord, error)
- func (p *Postgres) ListCheckpoints(ctx context.Context, runID string) ([]CheckpointRecord, error)
- func (p *Postgres) ListFacts(ctx context.Context, namespace string) ([]Fact, error)
- func (p *Postgres) ListPolicies(ctx context.Context) ([]PolicyRecord, error)
- func (p *Postgres) ListRuns(ctx context.Context) ([]RunRecord, error)
- func (p *Postgres) ListRunsByStatus(ctx context.Context, status string) ([]RunRecord, error)
- func (p *Postgres) ListSteps(ctx context.Context, runID string) ([]StepRecord, error)
- func (p *Postgres) ListToolCalls(ctx context.Context, runID string) ([]ToolCallRecord, error)
- func (p *Postgres) PutFact(ctx context.Context, f Fact) error
- func (p *Postgres) ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error
- func (p *Postgres) SaveCheckpoint(ctx context.Context, c CheckpointRecord) error
- func (p *Postgres) SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)
- func (p *Postgres) Totals(ctx context.Context, runID string) (LedgerTotals, error)
- func (p *Postgres) UpsertPolicy(ctx context.Context, pol PolicyRecord) error
- func (p *Postgres) UpsertRun(ctx context.Context, r RunRecord) error
- func (p *Postgres) UpsertStep(ctx context.Context, st StepRecord) error
- type RunRecord
- type SQLite
- func (s *SQLite) AppendLedger(ctx context.Context, e LedgerEntry) error
- func (s *SQLite) AppendToolCall(ctx context.Context, t ToolCallRecord) error
- func (s *SQLite) Close() error
- func (s *SQLite) CreateApproval(ctx context.Context, a ApprovalRecord) error
- func (s *SQLite) GetApproval(ctx context.Context, id string) (ApprovalRecord, error)
- func (s *SQLite) GetFact(ctx context.Context, namespace, key string) (Fact, error)
- func (s *SQLite) GetPolicy(ctx context.Context, name string) (PolicyRecord, error)
- func (s *SQLite) GetRun(ctx context.Context, id string) (RunRecord, error)
- func (s *SQLite) LatestCheckpoint(ctx context.Context, runID string) (CheckpointRecord, error)
- func (s *SQLite) LedgerForRun(ctx context.Context, runID string) ([]LedgerEntry, error)
- func (s *SQLite) ListApprovals(ctx context.Context, status string) ([]ApprovalRecord, error)
- func (s *SQLite) ListCheckpoints(ctx context.Context, runID string) ([]CheckpointRecord, error)
- func (s *SQLite) ListFacts(ctx context.Context, namespace string) ([]Fact, error)
- func (s *SQLite) ListPolicies(ctx context.Context) ([]PolicyRecord, error)
- func (s *SQLite) ListRuns(ctx context.Context) ([]RunRecord, error)
- func (s *SQLite) ListRunsByStatus(ctx context.Context, status string) ([]RunRecord, error)
- func (s *SQLite) ListSteps(ctx context.Context, runID string) ([]StepRecord, error)
- func (s *SQLite) ListToolCalls(ctx context.Context, runID string) ([]ToolCallRecord, error)
- func (s *SQLite) PutFact(ctx context.Context, f Fact) error
- func (s *SQLite) ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error
- func (s *SQLite) SaveCheckpoint(ctx context.Context, c CheckpointRecord) error
- func (s *SQLite) SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)
- func (s *SQLite) Totals(ctx context.Context, runID string) (LedgerTotals, error)
- func (s *SQLite) UpsertPolicy(ctx context.Context, p PolicyRecord) error
- func (s *SQLite) UpsertRun(ctx context.Context, r RunRecord) error
- func (s *SQLite) UpsertStep(ctx context.Context, st StepRecord) error
- type StepRecord
- type Store
- type SummarizeOptions
- type ToolCallRecord
- type UsageGroup
- type UsageSummary
Constants ¶
const ( ApprovalPending = "pending" ApprovalApproved = "approved" ApprovalDenied = "denied" )
Approval status values.
Variables ¶
var ErrNotFound = errors.New("storage: not found")
ErrNotFound is returned when a requested record does not exist.
var ErrSchemaTooNew = errors.New("storage: on-disk schema is newer than this binary; upgrade riskkernel")
ErrSchemaTooNew is returned when the on-disk schema version is newer than the running binary understands. RiskKernel refuses to start in this case rather than risk corrupting a user's data (downgrade protection, COMPATIBILITY.md).
Functions ¶
This section is empty.
Types ¶
type ApprovalRecord ¶
type ApprovalRecord struct {
ID string
RunID string
StepIndex int32
Tool string
SideEffect string
Arguments map[string]any
Status string // pending | approved | denied
Reason string
DecidedBy string
CreatedAt time.Time
DecidedAt *time.Time
}
ApprovalRecord is a human-in-the-loop gate on a side-effecting tool call.
type ApprovalRule ¶ added in v0.6.0
type ApprovalRule struct {
Tool string `json:"tool,omitempty"`
SideEffect string `json:"sideEffect,omitempty"`
}
ApprovalRule mirrors an api/v1 ApprovalPolicy rule: an action needs approval if it matches the tool exactly or the side-effect glob.
type CheckpointRecord ¶
type CheckpointRecord struct {
RunID string
StepIndex int32
Name string
UsagePromptTokens int64
UsageCompletionTokens int64
UsageDollars float64
UsageLoops int32
Payload map[string]any
CreatedAt time.Time
}
CheckpointRecord is a crash-resumable snapshot: a run's usage at a step plus an opaque user-supplied payload to restart from.
type LedgerEntry ¶
type LedgerEntry struct {
RunID string
StepIndex int32
Provider string
Model string
PromptTokens int64
CompletionTokens int64
Dollars float64
Priced bool // false when the model had no known price
ResponseID string
CreatedAt time.Time
}
LedgerEntry is one priced model call — the auditable unit of spend.
type LedgerTotals ¶
type LedgerTotals struct {
RunID string `json:"runId"`
Calls int64 `json:"calls"`
PromptTokens int64 `json:"promptTokens"`
CompletionTokens int64 `json:"completionTokens"`
Dollars float64 `json:"dollars"`
}
LedgerTotals aggregates spend for audit/reporting.
type PolicyRecord ¶ added in v0.6.0
type PolicyRecord struct {
Name string
BudgetTokens int64
BudgetDollars float64
BudgetLoops int32
BudgetSeconds int32
ToolAllowlist []string
ApprovalRules []ApprovalRule
CreatedAt time.Time
UpdatedAt time.Time
}
PolicyRecord is a reusable, named policy bundle — a default budget, a tool allowlist, and approval rules — that a run can reference by name (policyRef) instead of inlining. Mirrors the api/v1 Policy schema.
type Postgres ¶ added in v0.7.0
type Postgres struct {
// contains filtered or unexported fields
}
Postgres is the opt-in Store backend for multi-instance / HA deployments. It implements the same Store interface as the default SQLite backend against the same schema (timestamps as RFC3339 text, JSON marshaled in Go), so every package-level scan/marshal helper is shared — only the SQL dialect (placeholder style, the metadata JSON accessor) and the DDL differ. SQLite stays the default; Postgres is selected only when a connection string is configured.
func OpenPostgres ¶ added in v0.7.0
OpenPostgres connects to Postgres using a standard libpq/pgx connection string (e.g. postgres://user:pass@host:5432/db?sslmode=require), applies pending forward migrations in a transaction, and enforces downgrade protection.
func (*Postgres) AppendLedger ¶ added in v0.7.0
func (p *Postgres) AppendLedger(ctx context.Context, e LedgerEntry) error
func (*Postgres) AppendToolCall ¶ added in v0.7.0
func (p *Postgres) AppendToolCall(ctx context.Context, t ToolCallRecord) error
func (*Postgres) CreateApproval ¶ added in v0.7.0
func (p *Postgres) CreateApproval(ctx context.Context, a ApprovalRecord) error
func (*Postgres) GetApproval ¶ added in v0.7.0
func (*Postgres) LatestCheckpoint ¶ added in v0.7.0
func (*Postgres) LedgerForRun ¶ added in v0.7.0
func (*Postgres) ListApprovals ¶ added in v0.7.0
func (*Postgres) ListCheckpoints ¶ added in v0.7.0
func (*Postgres) ListPolicies ¶ added in v0.7.0
func (p *Postgres) ListPolicies(ctx context.Context) ([]PolicyRecord, error)
func (*Postgres) ListRunsByStatus ¶ added in v0.7.0
func (*Postgres) ListToolCalls ¶ added in v0.7.0
func (*Postgres) ResolveApproval ¶ added in v0.7.0
func (*Postgres) SaveCheckpoint ¶ added in v0.7.0
func (p *Postgres) SaveCheckpoint(ctx context.Context, c CheckpointRecord) error
func (*Postgres) SummarizeLedger ¶ added in v0.7.0
func (p *Postgres) SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)
SummarizeLedger mirrors the SQLite aggregation. The grouping expression is chosen from a fixed whitelist (it is structural, so it cannot be a bound parameter); the metadata key is validated and bound as a value, so no part of the query is built unsafely from raw user input. The only dialect difference from SQLite is the metadata accessor: Postgres reads the JSON-in-text column via `metadata::jsonb ->> key` instead of json_extract.
func (*Postgres) UpsertPolicy ¶ added in v0.7.0
func (p *Postgres) UpsertPolicy(ctx context.Context, pol PolicyRecord) error
func (*Postgres) UpsertStep ¶ added in v0.7.0
func (p *Postgres) UpsertStep(ctx context.Context, st StepRecord) error
type RunRecord ¶
type RunRecord struct {
ID string
Name string
Status string
HaltReason string
// PolicyRef is the name of the policy bundle the run was created under (empty
// = none). Its tool allowlist and approval rules are enforced per-run.
PolicyRef string
BudgetTokens int64
BudgetDollars float64
BudgetLoops int32
BudgetSeconds int32
UsagePromptTokens int64
UsageCompletionTokens int64
UsageDollars float64
UsageLoops int32
Metadata map[string]string
CreatedAt time.Time
UpdatedAt time.Time
}
RunRecord is the persisted form of a governed run.
type SQLite ¶
type SQLite struct {
// contains filtered or unexported fields
}
SQLite is the default Store backend: a single WAL-mode SQLite file the user owns. Pure-Go driver, so the binary stays static and cross-compilable.
func OpenSQLite ¶
OpenSQLite opens (creating if needed) the SQLite database at path, applies pending forward migrations in a transaction, and enforces downgrade protection.
func (*SQLite) AppendLedger ¶
func (s *SQLite) AppendLedger(ctx context.Context, e LedgerEntry) error
func (*SQLite) AppendToolCall ¶
func (s *SQLite) AppendToolCall(ctx context.Context, t ToolCallRecord) error
func (*SQLite) CreateApproval ¶
func (s *SQLite) CreateApproval(ctx context.Context, a ApprovalRecord) error
CreateApproval persists a new (pending) approval request.
func (*SQLite) GetApproval ¶
GetApproval returns an approval by id, or ErrNotFound.
func (*SQLite) GetPolicy ¶ added in v0.6.0
GetPolicy returns a policy bundle by name, or ErrNotFound.
func (*SQLite) LatestCheckpoint ¶
LatestCheckpoint returns a run's most recent checkpoint, or ErrNotFound.
func (*SQLite) LedgerForRun ¶
func (*SQLite) ListApprovals ¶
ListApprovals returns approvals filtered by status ("" = all), newest first.
func (*SQLite) ListCheckpoints ¶
ListCheckpoints returns a run's checkpoints in time order.
func (*SQLite) ListPolicies ¶ added in v0.6.0
func (s *SQLite) ListPolicies(ctx context.Context) ([]PolicyRecord, error)
ListPolicies returns all policy bundles, newest first.
func (*SQLite) ListRunsByStatus ¶
ListRunsByStatus returns runs in the given lifecycle status, newest first.
func (*SQLite) ListToolCalls ¶ added in v0.2.0
func (*SQLite) ResolveApproval ¶
func (s *SQLite) ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error
ResolveApproval records a decision on a pending approval. If the approval is not currently pending, nothing is changed and ErrNotFound is returned, so callers can detect a double-resolve or an unknown id.
func (*SQLite) SaveCheckpoint ¶
func (s *SQLite) SaveCheckpoint(ctx context.Context, c CheckpointRecord) error
SaveCheckpoint appends a crash-resumable checkpoint.
func (*SQLite) SummarizeLedger ¶ added in v0.1.2
func (s *SQLite) SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)
SummarizeLedger aggregates the cost ledger across runs, grouped by opts.By. The grouping expression is chosen from a fixed whitelist (it is structural, so it cannot be a bound parameter); the metadata key is validated and its JSON path is bound as a value, so no part of the query is built unsafely from raw user input.
func (*SQLite) UpsertPolicy ¶ added in v0.6.0
func (s *SQLite) UpsertPolicy(ctx context.Context, p PolicyRecord) error
UpsertPolicy inserts or replaces a named policy bundle. On an update (same name) the original created_at is preserved.
func (*SQLite) UpsertStep ¶
func (s *SQLite) UpsertStep(ctx context.Context, st StepRecord) error
type StepRecord ¶
type StepRecord struct {
RunID string
Index int32
Status string
PromptTokens int64
CompletionTokens int64
Dollars float64
StartedAt time.Time
EndedAt *time.Time // nil while running
}
StepRecord is one loop iteration of a run.
type Store ¶
type Store interface {
// UpsertRun inserts or replaces a run row by ID.
UpsertRun(ctx context.Context, r RunRecord) error
// GetRun returns a run by ID, or ErrNotFound.
GetRun(ctx context.Context, id string) (RunRecord, error)
// ListRuns returns all runs, newest first.
ListRuns(ctx context.Context) ([]RunRecord, error)
// ListRunsByStatus returns runs in the given lifecycle status (e.g. "running"
// for reload-on-startup), newest first.
ListRunsByStatus(ctx context.Context, status string) ([]RunRecord, error)
// UpsertStep inserts or replaces a step row by (run_id, index).
UpsertStep(ctx context.Context, s StepRecord) error
// ListSteps returns a run's steps in index order.
ListSteps(ctx context.Context, runID string) ([]StepRecord, error)
// AppendLedger appends one priced call to the cost ledger.
AppendLedger(ctx context.Context, e LedgerEntry) error
// LedgerForRun returns a run's ledger entries in time order.
LedgerForRun(ctx context.Context, runID string) ([]LedgerEntry, error)
// Totals aggregates a run's ledger.
Totals(ctx context.Context, runID string) (LedgerTotals, error)
// SummarizeLedger aggregates spend across runs, grouped by opts.By
// (provider/model/day/name/metadata.<key>). The unit is the ledger row.
SummarizeLedger(ctx context.Context, opts SummarizeOptions) (UsageSummary, error)
// AppendToolCall records a tool invocation.
AppendToolCall(ctx context.Context, t ToolCallRecord) error
// ListToolCalls returns a run's tool invocations in audit order.
ListToolCalls(ctx context.Context, runID string) ([]ToolCallRecord, error)
// PutFact inserts or updates an episodic memory fact by (namespace, key).
PutFact(ctx context.Context, f Fact) error
// GetFact returns a fact, or ErrNotFound.
GetFact(ctx context.Context, namespace, key string) (Fact, error)
// ListFacts returns all facts in a namespace.
ListFacts(ctx context.Context, namespace string) ([]Fact, error)
// CreateApproval persists a new (pending) approval request.
CreateApproval(ctx context.Context, a ApprovalRecord) error
// GetApproval returns an approval by id, or ErrNotFound.
GetApproval(ctx context.Context, id string) (ApprovalRecord, error)
// ResolveApproval records a decision (approved/denied) on a pending approval.
ResolveApproval(ctx context.Context, id, status, reason, decidedBy string, decidedAt time.Time) error
// ListApprovals returns approvals filtered by status ("" = all), newest first.
ListApprovals(ctx context.Context, status string) ([]ApprovalRecord, error)
// UpsertPolicy inserts or replaces a named policy bundle (register/update by name).
UpsertPolicy(ctx context.Context, p PolicyRecord) error
// GetPolicy returns a policy bundle by name, or ErrNotFound.
GetPolicy(ctx context.Context, name string) (PolicyRecord, error)
// ListPolicies returns all policy bundles, newest first.
ListPolicies(ctx context.Context) ([]PolicyRecord, error)
// SaveCheckpoint appends a crash-resumable checkpoint.
SaveCheckpoint(ctx context.Context, c CheckpointRecord) error
// LatestCheckpoint returns a run's most recent checkpoint, or ErrNotFound.
LatestCheckpoint(ctx context.Context, runID string) (CheckpointRecord, error)
// ListCheckpoints returns a run's checkpoints in time order.
ListCheckpoints(ctx context.Context, runID string) ([]CheckpointRecord, error)
// Close releases the backend's resources.
Close() error
}
Store is the durable backend. Implementations must be safe for concurrent use.
type SummarizeOptions ¶ added in v0.1.2
type SummarizeOptions struct {
// By is the grouping dimension: "provider", "model", "day", "name", or
// "metadata.<key>" (e.g. "metadata.team"). Required.
By string
// Since/Until optionally bound the call time (created_at): Since is inclusive,
// Until exclusive. Nil means unbounded on that end.
Since *time.Time
Until *time.Time
}
SummarizeOptions selects how SummarizeLedger aggregates the cost ledger.
type ToolCallRecord ¶
type ToolCallRecord struct {
ID string
RunID string
StepIndex int32
Tool string
SideEffect string
Arguments map[string]any
Status string
CreatedAt time.Time
}
ToolCallRecord is a (side-effecting) tool invocation. It is populated by the MCP gateway and the human-in-the-loop (HITL) approval gate in later build steps.
type UsageGroup ¶ added in v0.1.2
type UsageGroup struct {
Key string `json:"key"`
Calls int64 `json:"calls"`
PromptTokens int64 `json:"promptTokens"`
CompletionTokens int64 `json:"completionTokens"`
Dollars float64 `json:"dollars"`
}
UsageGroup is one bucket of aggregated spend — e.g. one team, one provider, or one day. Tokens/dollars are summed from the cost ledger (the auditable source).
type UsageSummary ¶ added in v0.1.2
type UsageSummary struct {
By string `json:"by"`
Groups []UsageGroup `json:"groups"`
Total UsageGroup `json:"total"`
}
UsageSummary is cost-ledger spend grouped by one dimension, plus the grand total across all groups in range.