memory

package
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package memory implements governed agent memory with provenance tracking, conflict detection, and Constitutional AI governance.

Index

Constants

View Source
const (
	CategoryFactualCorrections    = "factual_corrections"
	CategoryUserPreferences       = "user_preferences"
	CategoryDomainKnowledge       = "domain_knowledge"
	CategoryProcedureImprovements = "procedure_improvements"
)

Learning categories -- things the agent can learn.

View Source
const (
	CategoryPolicyHit     = "policy_hit"
	CategoryCostDecision  = "cost_decision"
	CategoryPIIRedaction  = "pii_redaction"
	CategoryToolApproval  = "tool_approval"
	CategoryEscalation    = "escalation"
	CategoryErrorRecovery = "error_recovery"
)

Operational categories -- runtime observations.

View Source
const (
	MemTypeSemanticFact = "semantic"   // What the agent knows: facts, preferences, constraints
	MemTypeEpisodic     = "episodic"   // What happened: specific interactions, outcomes, events
	MemTypeProcedural   = "procedural" // How to do things: learned behaviors, response patterns
)

Memory types (Tulving/CoALA — three-type model for retrieval scoring).

View Source
const (
	ObsDecision  = "decision"
	ObsPolicyHit = "policy_hit"
	ObsToolUse   = "tool_use"
	ObsLearning  = "learning"
	ObsError     = "error"
)

Observation types describe what kind of memory entry this is.

View Source
const (
	SourceManual     = "manual"
	SourceUserInput  = "user_input"
	SourceAgentRun   = "agent_run"
	SourceToolOutput = "tool_output"
	SourceWebhook    = "webhook"
)

Source types and their default trust scores.

View Source
const (
	ScopeAgent     = "agent"     // persists per-agent across sessions (default)
	ScopeSession   = "session"   // per-session; auto-expiry is Phase 2
	ScopeWorkspace = "workspace" // shared read-only across agents in a tenant
)

Memory scopes define the lifetime and visibility of entries.

Variables

View Source
var (
	ErrPIIDetected       = errors.New("PII detected in content")
	ErrMemoryWriteDenied = errors.New("memory write denied by governance")
)

Domain errors

View Source
var ErrEntryNotFound = errors.New("memory entry not found")

ErrEntryNotFound is returned when a memory entry does not exist.

View Source
var ErrMemoryConflict = errors.New("memory entry conflicts with existing entries")

ErrMemoryConflict is returned when a memory write conflicts with existing entries and the conflict resolution policy is "reject".

View Source
var TypeWeights = map[string]float64{
	MemTypeSemanticFact: 0.6,
	MemTypeEpisodic:     0.3,
	MemTypeProcedural:   0.1,
}

TypeWeights for relevance-scored retrieval (Mem0-style). Semantic facts are most valuable for prompt injection; episodic provides recent context; procedural fine-tunes behavior.

Functions

func AllowedWhenDomainKnowledgeAllowed

func AllowedWhenDomainKnowledgeAllowed(cat string) bool

AllowedWhenDomainKnowledgeAllowed returns true if the category is either domain_knowledge or one of its sub-types (tool_approval, cost_decision, user_preferences, procedure_improvements, factual_corrections). Used so that legacy policies with only allowed_categories: [domain_knowledge, policy_hit] do not silently reject writes that are now classified with the finer categories.

func DedupSkipsAdd

func DedupSkipsAdd(ctx context.Context, n int64)

DedupSkipsAdd records memory writes skipped due to input-hash deduplication.

func DeriveTrustScore

func DeriveTrustScore(sourceType string) int

DeriveTrustScore returns the default trust score for the given source type. Unknown source types get a conservative score of 30.

func IsForbiddenCategory

func IsForbiddenCategory(cat string) bool

IsForbiddenCategory returns true for categories that are always forbidden, regardless of policy configuration.

func RecordPoisoningBlocked

func RecordPoisoningBlocked(ctx context.Context)

RecordPoisoningBlocked increments the memory poisoning defense counter.

func RunRetention

func RunRetention(ctx context.Context, store *Store, pol *policy.Policy)

RunRetention purges expired entries and enforces max_entries for all agents in the store, using the provided policy for retention_days and max_entries. This is designed to be called periodically (e.g., daily) from talon serve.

func StartRetentionLoop

func StartRetentionLoop(ctx context.Context, store *Store, pol *policy.Policy, interval time.Duration) func()

StartRetentionLoop runs retention every interval in a goroutine. Returns a cancel function to stop the loop.

func ValidObservationTypes

func ValidObservationTypes() []string

ValidObservationTypes returns the set of valid observation types.

func ValidSourceTypes

func ValidSourceTypes() []string

ValidSourceTypes returns all recognized source types.

Types

type ConflictCandidate

type ConflictCandidate struct {
	ExistingEntryID string
	ExistingTitle   string
	Similarity      float64
	Category        string
	TrustScore      int
}

ConflictCandidate describes a potential conflict with an existing memory entry.

type ConsolidationAction

type ConsolidationAction string

ConsolidationAction is the AUDN decision for a candidate memory entry (Mem0-style, governed).

const (
	ActionAdd        ConsolidationAction = "add"
	ActionUpdate     ConsolidationAction = "update"
	ActionInvalidate ConsolidationAction = "invalidate"
	ActionNoop       ConsolidationAction = "noop"
)

type ConsolidationResult

type ConsolidationResult struct {
	Action     ConsolidationAction `json:"action"`
	TargetID   string              `json:"target_id,omitempty"`
	Reason     string              `json:"reason"`
	Similarity float64             `json:"similarity,omitempty"`
	NewContent string              `json:"new_content,omitempty"`
}

ConsolidationResult holds the decision and audit metadata.

type Consolidator

type Consolidator struct {
	// contains filtered or unexported fields
}

Consolidator evaluates candidate entries against existing memory and applies AUDN decisions.

func NewConsolidator

func NewConsolidator(store *Store) *Consolidator

NewConsolidator returns a consolidator backed by the given store.

func (*Consolidator) Apply

func (c *Consolidator) Apply(ctx context.Context, candidate *Entry, result *ConsolidationResult) error

Apply executes the consolidation decision against the store.

func (*Consolidator) Evaluate

func (c *Consolidator) Evaluate(ctx context.Context, candidate *Entry) (*ConsolidationResult, error)

Evaluate determines the AUDN action for a candidate entry (rule-based similarity). Input-hash deduplication is performed by the caller (e.g. Runner.writeMemoryObservation) before calling Evaluate; do not duplicate HasRecentWithInputHash here.

Logic: similarity >= 0.90 → NOOP; >= 0.60 → trust comparison → INVALIDATE or UPDATE; < 0.30 → ADD; 0.30–0.60 → ADD (related but distinct).

type Entry

type Entry struct {
	ID                  string     `json:"id"`
	TenantID            string     `json:"tenant_id"`
	AgentID             string     `json:"agent_id"`
	Category            string     `json:"category"`
	Scope               string     `json:"scope"`                // "agent" (default), "session", "workspace"
	InputHash           string     `json:"input_hash"`           // SHA256 fingerprint for deduplication (optional)
	MemoryType          string     `json:"memory_type"`          // "semantic", "episodic", "procedural" (Phase 2)
	ValidAt             *time.Time `json:"valid_at,omitempty"`   // Event time: when fact was true
	InvalidAt           *time.Time `json:"invalid_at,omitempty"` // Event time: when fact ceased to be true
	InvalidatedBy       string     `json:"invalidated_by,omitempty"`
	ConsolidationStatus string     `json:"consolidation_status"` // "active", "invalidated", "merged", "superseded"
	CreatedAt           time.Time  `json:"created_at"`           // Transaction time: when ingested
	ExpiredAt           *time.Time `json:"expired_at,omitempty"` // Transaction time: when superseded in system
	Title               string     `json:"title"`
	Content             string     `json:"content"`
	ObservationType     string     `json:"observation_type"`
	TokenCount          int        `json:"token_count"`
	FilesAffected       []string   `json:"files_affected"`
	Version             int        `json:"version"`
	Timestamp           time.Time  `json:"timestamp"`
	EvidenceID          string     `json:"evidence_id"`
	SourceType          string     `json:"source_type"`
	SourceEvidenceID    string     `json:"source_evidence_id"`
	TrustScore          int        `json:"trust_score"`
	ConflictsWith       []string   `json:"conflicts_with"`
	ReviewStatus        string     `json:"review_status"`
	Signature           string     `json:"signature,omitempty"`
}

Entry is a full memory record with provenance.

type EntrySigner

type EntrySigner interface {
	Sign(data []byte) (string, error)
	Verify(data []byte, signature string) bool
}

Store persists governed memory entries in SQLite with FTS5 full-text search. EntrySigner can sign memory entries for tamper evidence (optional).

type Governance

type Governance struct {
	// contains filtered or unexported fields
}

Governance enforces Constitutional AI rules on memory writes. opaMu protects the shared opa field so that SetPolicyEvaluator and evalOPAMemoryWrite (when using g.opa) are safe under concurrent Run() calls.

func NewGovernance

func NewGovernance(store *Store, cls *classifier.Scanner) *Governance

NewGovernance creates a governance checker backed by the given store and PII scanner.

func (*Governance) CheckConflicts

func (g *Governance) CheckConflicts(ctx context.Context, entry Entry, similarityThreshold float64) ([]ConflictCandidate, error)

CheckConflicts finds existing entries that may contradict the new entry. similarityThreshold is the minimum keyword-overlap ratio (0..1) to consider two entries in conflict; it is typically taken from policy memory.governance.conflict_similarity_threshold (default 0.6).

func (*Governance) SetPolicyEvaluator

func (g *Governance) SetPolicyEvaluator(eval PolicyEvaluator)

SetPolicyEvaluator attaches an OPA-based policy evaluator for memory governance. When set, ValidateWrite runs OPA rules in addition to hardcoded Go checks (when called with a nil per-call eval). Callers running concurrent agent invocations (e.g. talon serve webhooks/cron) should prefer passing the per-run engine into ValidateWrite instead of setting a shared evaluator here, to avoid sharing mutable state across goroutines.

func (*Governance) ValidateWrite

func (g *Governance) ValidateWrite(ctx context.Context, entry *Entry, pol *policy.Policy, eval PolicyEvaluator) error

ValidateWrite runs all five governance checks in order. It may mutate the entry (setting TrustScore, ReviewStatus, ConflictsWith). eval is an optional per-call policy evaluator (e.g. the OPA engine for this run). When non-nil, it is used for OPA memory governance instead of the shared g.opa, avoiding data races when multiple Run() calls execute concurrently.

type HealthReport

type HealthReport struct {
	TotalEntries      int
	RolledBack        int
	TrustDistribution map[string]int
	PendingReview     int
	ConflictCount     int
	AutoResolved      int
	PendingConflicts  int
}

HealthReport aggregates memory health metrics for CLI output.

type IndexEntry

type IndexEntry struct {
	ID              string    `json:"id"`
	Category        string    `json:"category"`
	Scope           string    `json:"scope"`
	Title           string    `json:"title"`
	ObservationType string    `json:"observation_type"`
	TokenCount      int       `json:"token_count"`
	Timestamp       time.Time `json:"timestamp"`
	TrustScore      int       `json:"trust_score"`
	ReviewStatus    string    `json:"review_status"`
	MemoryType      string    `json:"memory_type"` // semantic, episodic, procedural (for scored retrieval)
}

IndexEntry is a lightweight summary for Layer 1 progressive disclosure (~50 tokens).

type PolicyEvaluator

type PolicyEvaluator interface {
	EvaluateMemoryWrite(ctx context.Context, category string, contentSizeBytes int) (*policy.Decision, error)
}

PolicyEvaluator is an optional interface for OPA-based memory governance. When set on Governance, ValidateWrite calls OPA in addition to hardcoded checks.

type PrivacyResult

type PrivacyResult struct {
	CleanContent            string // Content with <private> blocks removed and <classified> tags stripped
	FullContent             string // Original content unchanged
	PrivateSectionsStripped int
	MaxClassifiedTier       int
	HasPrivateContent       bool
}

PrivacyResult holds the outcome of stripping privacy tags from content.

func StripPrivateTags

func StripPrivateTags(content string) PrivacyResult

StripPrivateTags processes content for privacy:

  • Removes <private>...</private> sections entirely (GDPR Art. 25)
  • Extracts max tier from <classified:tier_N>...</classified> tags
  • Removes classified tags but preserves inner content

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(dbPath string) (*Store, error)

NewStore creates a memory store, initializing the schema and FTS5 tables. FTS5 is optional; if the SQLite build doesn't support it, full-text search degrades to LIKE queries.

func (*Store) AppendContent

func (s *Store) AppendContent(ctx context.Context, tenantID, entryID, additionalContent string, now time.Time) error

AppendContent appends supplementary content to an existing active entry (consolidation UPDATE).

func (*Store) AsOf

func (s *Store) AsOf(ctx context.Context, tenantID, agentID string, asOf time.Time, limit int) ([]Entry, error)

AsOf returns memory entries valid at the given point in time (transaction time: created_at/expired_at). Used for compliance (NIS2 Art. 23, EU AI Act Art. 11) to reconstruct state at a past time.

func (*Store) AuditLog

func (s *Store) AuditLog(ctx context.Context, tenantID, agentID string, limit int) ([]Entry, error)

AuditLog returns memory entries ordered by timestamp for audit purposes.

func (*Store) Close

func (s *Store) Close() error

Close releases the database connection.

func (*Store) CountPendingReviewForTenant

func (s *Store) CountPendingReviewForTenant(ctx context.Context, tenantID string) (int, error)

CountPendingReviewForTenant returns the number of memory entries in pending_review for the tenant (all agents).

func (*Store) DistinctAgents

func (s *Store) DistinctAgents(ctx context.Context) ([][2]string, error)

DistinctAgents returns all (tenant_id, agent_id) pairs in the store.

func (*Store) EnforceMaxEntries

func (s *Store) EnforceMaxEntries(ctx context.Context, tenantID, agentID string, maxEntries int) (int64, error)

EnforceMaxEntries deletes the oldest entries when the count exceeds maxEntries (FIFO). Returns the number of deleted entries.

func (*Store) Get

func (s *Store) Get(ctx context.Context, tenantID, id string) (*Entry, error)

Get retrieves a full memory entry by ID (Layer 2). tenantID enforces tenant isolation — the entry must belong to the specified tenant.

func (*Store) HasRecentWithInputHash

func (s *Store) HasRecentWithInputHash(ctx context.Context, tenantID, agentID, inputHash string, window time.Duration) (bool, error)

HasRecentWithInputHash checks if a memory entry with the same input fingerprint exists within the given time window. Used to skip duplicate writes for re-runs.

Returns true if a recent entry with matching hash exists (should skip write). Returns false on empty hash or any error (fail-open: proceed with write).

func (*Store) HealthStats

func (s *Store) HealthStats(ctx context.Context, tenantID, agentID string) (*HealthReport, error)

HealthStats returns aggregate health metrics for an agent's memory.

func (*Store) Invalidate

func (s *Store) Invalidate(ctx context.Context, tenantID, entryID, newEntryID string, now time.Time) error

Invalidate marks an entry as invalidated by a newer entry (Zep-style: preserved for audit). Sets consolidation_status = "invalidated", invalid_at = now, expired_at = now, invalidated_by = newEntryID.

func (*Store) InvalidateAndWrite

func (s *Store) InvalidateAndWrite(ctx context.Context, tenantID, targetID string, now time.Time, entry *Entry) error

InvalidateAndWrite atomically invalidates an existing entry and writes its replacement in a single transaction. If the write fails, the invalidation is rolled back. entry.ID is used as the invalidated_by reference; prepareEntry is called to ensure it is set.

func (*Store) List

func (s *Store) List(ctx context.Context, tenantID, agentID, category string, limit int) ([]Entry, error)

List returns full memory entries filtered by category.

func (*Store) ListIndex

func (s *Store) ListIndex(ctx context.Context, tenantID, agentID string, limit int, scopes ...string) ([]IndexEntry, error)

ListIndex returns lightweight memory summaries (Layer 1) ordered by timestamp desc. When scopes is non-empty, only entries whose scope is in scopes are returned.

func (*Store) ListPendingReview

func (s *Store) ListPendingReview(ctx context.Context, tenantID, agentID string, limit int) ([]IndexEntry, error)

ListPendingReview returns entries with review_status = 'pending_review' for the tenant/agent.

func (*Store) PurgeExpired

func (s *Store) PurgeExpired(ctx context.Context, tenantID, agentID string, retentionDays int) (int64, error)

PurgeExpired deletes memory entries older than retentionDays. Entries with consolidation_status 'rolled_back' or 'invalidated' are never purged so they remain available for audit (NIS2, EU AI Act point-in-time reconstruction). Returns the number of deleted entries.

func (*Store) Read

func (s *Store) Read(ctx context.Context, tenantID, agentID string) ([]Entry, error)

Read returns all memory entries for an agent.

func (*Store) RetrieveScored

func (s *Store) RetrieveScored(ctx context.Context, tenantID, agentID, queryText string, maxTokens int) ([]IndexEntry, error)

RetrieveScored returns memory index entries ordered by relevance to queryText, capped by maxTokens. It returns the highest-scored entries that fit within the token budget (score order preserved). When maxTokens <= 0, all scored candidates are returned. Score = relevance*0.4 + recency*0.3 + typeWeight*0.2 + trustNorm*0.1.

func (*Store) RollbackTo

func (s *Store) RollbackTo(ctx context.Context, tenantID, entryID string) (int64, error)

RollbackTo soft-deletes all memory entries newer than the specified entry. The entry identified by entryID becomes the newest "active" entry for its agent. Rolled-back entries are marked consolidation_status = 'rolled_back' and expired_at = now. They remain in the database for audit (AuditLog still returns them) but are excluded from list, search, and prompt injection.

func (*Store) Search

func (s *Store) Search(ctx context.Context, tenantID, agentID, query string, limit int) ([]IndexEntry, error)

Search performs FTS5 full-text search and returns matching index entries. Falls back to LIKE-based search if FTS5 is not available.

func (*Store) SearchByCategory

func (s *Store) SearchByCategory(ctx context.Context, tenantID, agentID, category string) ([]Entry, error)

SearchByCategory returns all entries matching a given category for an agent.

func (*Store) SetSigner

func (s *Store) SetSigner(signer EntrySigner)

SetSigner configures an optional HMAC signer for memory entry integrity.

func (*Store) UpdateReviewStatus

func (s *Store) UpdateReviewStatus(ctx context.Context, tenantID, agentID, entryID, status string) error

UpdateReviewStatus sets review_status for a memory entry to "approved" or "rejected".

func (*Store) Write

func (s *Store) Write(ctx context.Context, entry *Entry) error

Write persists a memory entry. It assigns an ID, auto-increments version, sets timestamp, and estimates token count if not set.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL