Documentation
¶
Overview ¶
Package memory implements governed agent memory with provenance tracking, conflict detection, and Constitutional AI governance.
Index ¶
- Constants
- Variables
- func AllowedWhenDomainKnowledgeAllowed(cat string) bool
- func DedupSkipsAdd(ctx context.Context, n int64)
- func DeriveTrustScore(sourceType string) int
- func IsForbiddenCategory(cat string) bool
- func RecordPoisoningBlocked(ctx context.Context)
- func RunRetention(ctx context.Context, store *Store, pol *policy.Policy)
- func StartRetentionLoop(ctx context.Context, store *Store, pol *policy.Policy, interval time.Duration) func()
- func ValidObservationTypes() []string
- func ValidSourceTypes() []string
- type ConflictCandidate
- type ConsolidationAction
- type ConsolidationResult
- type Consolidator
- type Entry
- type EntrySigner
- type Governance
- func (g *Governance) CheckConflicts(ctx context.Context, entry Entry, similarityThreshold float64) ([]ConflictCandidate, error)
- func (g *Governance) SetPolicyEvaluator(eval PolicyEvaluator)
- func (g *Governance) ValidateWrite(ctx context.Context, entry *Entry, pol *policy.Policy, eval PolicyEvaluator) error
- type HealthReport
- type IndexEntry
- type PolicyEvaluator
- type PrivacyResult
- type Store
- func (s *Store) AppendContent(ctx context.Context, tenantID, entryID, additionalContent string, ...) error
- func (s *Store) AsOf(ctx context.Context, tenantID, agentID string, asOf time.Time, limit int) ([]Entry, error)
- func (s *Store) AuditLog(ctx context.Context, tenantID, agentID string, limit int) ([]Entry, error)
- func (s *Store) Close() error
- func (s *Store) CountPendingReviewForTenant(ctx context.Context, tenantID string) (int, error)
- func (s *Store) DistinctAgents(ctx context.Context) ([][2]string, error)
- func (s *Store) EnforceMaxEntries(ctx context.Context, tenantID, agentID string, maxEntries int) (int64, error)
- func (s *Store) Get(ctx context.Context, tenantID, id string) (*Entry, error)
- func (s *Store) HasRecentWithInputHash(ctx context.Context, tenantID, agentID, inputHash string, window time.Duration) (bool, error)
- func (s *Store) HealthStats(ctx context.Context, tenantID, agentID string) (*HealthReport, error)
- func (s *Store) Invalidate(ctx context.Context, tenantID, entryID, newEntryID string, now time.Time) error
- func (s *Store) InvalidateAndWrite(ctx context.Context, tenantID, targetID string, now time.Time, entry *Entry) error
- func (s *Store) List(ctx context.Context, tenantID, agentID, category string, limit int) ([]Entry, error)
- func (s *Store) ListIndex(ctx context.Context, tenantID, agentID string, limit int, scopes ...string) ([]IndexEntry, error)
- func (s *Store) ListPendingReview(ctx context.Context, tenantID, agentID string, limit int) ([]IndexEntry, error)
- func (s *Store) PurgeExpired(ctx context.Context, tenantID, agentID string, retentionDays int) (int64, error)
- func (s *Store) Read(ctx context.Context, tenantID, agentID string) ([]Entry, error)
- func (s *Store) RetrieveScored(ctx context.Context, tenantID, agentID, queryText string, maxTokens int) ([]IndexEntry, error)
- func (s *Store) RollbackTo(ctx context.Context, tenantID, entryID string) (int64, error)
- func (s *Store) Search(ctx context.Context, tenantID, agentID, query string, limit int) ([]IndexEntry, error)
- func (s *Store) SearchByCategory(ctx context.Context, tenantID, agentID, category string) ([]Entry, error)
- func (s *Store) SetSigner(signer EntrySigner)
- func (s *Store) UpdateReviewStatus(ctx context.Context, tenantID, agentID, entryID, status string) error
- func (s *Store) Write(ctx context.Context, entry *Entry) error
Constants ¶
const ( CategoryFactualCorrections = "factual_corrections" CategoryUserPreferences = "user_preferences" CategoryDomainKnowledge = "domain_knowledge" CategoryProcedureImprovements = "procedure_improvements" )
Learning categories -- things the agent can learn.
const ( CategoryPolicyHit = "policy_hit" CategoryCostDecision = "cost_decision" CategoryPIIRedaction = "pii_redaction" CategoryToolApproval = "tool_approval" CategoryEscalation = "escalation" CategoryErrorRecovery = "error_recovery" )
Operational categories -- runtime observations.
const ( MemTypeSemanticFact = "semantic" // What the agent knows: facts, preferences, constraints MemTypeEpisodic = "episodic" // What happened: specific interactions, outcomes, events MemTypeProcedural = "procedural" // How to do things: learned behaviors, response patterns )
Memory types (Tulving/CoALA — three-type model for retrieval scoring).
const ( ObsDecision = "decision" ObsPolicyHit = "policy_hit" ObsToolUse = "tool_use" ObsLearning = "learning" ObsError = "error" )
Observation types describe what kind of memory entry this is.
const ( SourceManual = "manual" SourceUserInput = "user_input" SourceAgentRun = "agent_run" SourceToolOutput = "tool_output" SourceWebhook = "webhook" )
Source types and their default trust scores.
const ( ScopeAgent = "agent" // persists per-agent across sessions (default) ScopeSession = "session" // per-session; auto-expiry is Phase 2 ScopeWorkspace = "workspace" // shared read-only across agents in a tenant )
Memory scopes define the lifetime and visibility of entries.
Variables ¶
var ( ErrPIIDetected = errors.New("PII detected in content") ErrMemoryWriteDenied = errors.New("memory write denied by governance") )
Domain errors
var ErrEntryNotFound = errors.New("memory entry not found")
ErrEntryNotFound is returned when a memory entry does not exist.
var ErrMemoryConflict = errors.New("memory entry conflicts with existing entries")
ErrMemoryConflict is returned when a memory write conflicts with existing entries and the conflict resolution policy is "reject".
var TypeWeights = map[string]float64{ MemTypeSemanticFact: 0.6, MemTypeEpisodic: 0.3, MemTypeProcedural: 0.1, }
TypeWeights for relevance-scored retrieval (Mem0-style). Semantic facts are most valuable for prompt injection; episodic provides recent context; procedural fine-tunes behavior.
Functions ¶
func AllowedWhenDomainKnowledgeAllowed ¶
AllowedWhenDomainKnowledgeAllowed returns true if the category is either domain_knowledge or one of its sub-types (tool_approval, cost_decision, user_preferences, procedure_improvements, factual_corrections). Used so that legacy policies with only allowed_categories: [domain_knowledge, policy_hit] do not silently reject writes that are now classified with the finer categories.
func DedupSkipsAdd ¶
DedupSkipsAdd records memory writes skipped due to input-hash deduplication.
func DeriveTrustScore ¶
DeriveTrustScore returns the default trust score for the given source type. Unknown source types get a conservative score of 30.
func IsForbiddenCategory ¶
IsForbiddenCategory returns true for categories that are always forbidden, regardless of policy configuration.
func RecordPoisoningBlocked ¶
RecordPoisoningBlocked increments the memory poisoning defense counter.
func RunRetention ¶
RunRetention purges expired entries and enforces max_entries for all agents in the store, using the provided policy for retention_days and max_entries. This is designed to be called periodically (e.g., daily) from talon serve.
func StartRetentionLoop ¶
func StartRetentionLoop(ctx context.Context, store *Store, pol *policy.Policy, interval time.Duration) func()
StartRetentionLoop runs retention every interval in a goroutine. Returns a cancel function to stop the loop.
func ValidObservationTypes ¶
func ValidObservationTypes() []string
ValidObservationTypes returns the set of valid observation types.
func ValidSourceTypes ¶
func ValidSourceTypes() []string
ValidSourceTypes returns all recognized source types.
Types ¶
type ConflictCandidate ¶
type ConflictCandidate struct {
ExistingEntryID string
ExistingTitle string
Similarity float64
Category string
TrustScore int
}
ConflictCandidate describes a potential conflict with an existing memory entry.
type ConsolidationAction ¶
type ConsolidationAction string
ConsolidationAction is the AUDN decision for a candidate memory entry (Mem0-style, governed).
const ( ActionAdd ConsolidationAction = "add" ActionUpdate ConsolidationAction = "update" ActionInvalidate ConsolidationAction = "invalidate" ActionNoop ConsolidationAction = "noop" )
type ConsolidationResult ¶
type ConsolidationResult struct {
Action ConsolidationAction `json:"action"`
TargetID string `json:"target_id,omitempty"`
Reason string `json:"reason"`
Similarity float64 `json:"similarity,omitempty"`
NewContent string `json:"new_content,omitempty"`
}
ConsolidationResult holds the decision and audit metadata.
type Consolidator ¶
type Consolidator struct {
// contains filtered or unexported fields
}
Consolidator evaluates candidate entries against existing memory and applies AUDN decisions.
func NewConsolidator ¶
func NewConsolidator(store *Store) *Consolidator
NewConsolidator returns a consolidator backed by the given store.
func (*Consolidator) Apply ¶
func (c *Consolidator) Apply(ctx context.Context, candidate *Entry, result *ConsolidationResult) error
Apply executes the consolidation decision against the store.
func (*Consolidator) Evaluate ¶
func (c *Consolidator) Evaluate(ctx context.Context, candidate *Entry) (*ConsolidationResult, error)
Evaluate determines the AUDN action for a candidate entry (rule-based similarity). Input-hash deduplication is performed by the caller (e.g. Runner.writeMemoryObservation) before calling Evaluate; do not duplicate HasRecentWithInputHash here.
Logic: similarity >= 0.90 → NOOP; >= 0.60 → trust comparison → INVALIDATE or UPDATE; < 0.30 → ADD; 0.30–0.60 → ADD (related but distinct).
type Entry ¶
type Entry struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
AgentID string `json:"agent_id"`
Category string `json:"category"`
Scope string `json:"scope"` // "agent" (default), "session", "workspace"
InputHash string `json:"input_hash"` // SHA256 fingerprint for deduplication (optional)
MemoryType string `json:"memory_type"` // "semantic", "episodic", "procedural" (Phase 2)
ValidAt *time.Time `json:"valid_at,omitempty"` // Event time: when fact was true
InvalidAt *time.Time `json:"invalid_at,omitempty"` // Event time: when fact ceased to be true
InvalidatedBy string `json:"invalidated_by,omitempty"`
ConsolidationStatus string `json:"consolidation_status"` // "active", "invalidated", "merged", "superseded"
CreatedAt time.Time `json:"created_at"` // Transaction time: when ingested
ExpiredAt *time.Time `json:"expired_at,omitempty"` // Transaction time: when superseded in system
Title string `json:"title"`
Content string `json:"content"`
ObservationType string `json:"observation_type"`
TokenCount int `json:"token_count"`
FilesAffected []string `json:"files_affected"`
Version int `json:"version"`
Timestamp time.Time `json:"timestamp"`
EvidenceID string `json:"evidence_id"`
SourceType string `json:"source_type"`
SourceEvidenceID string `json:"source_evidence_id"`
TrustScore int `json:"trust_score"`
ConflictsWith []string `json:"conflicts_with"`
ReviewStatus string `json:"review_status"`
Signature string `json:"signature,omitempty"`
}
Entry is a full memory record with provenance.
type EntrySigner ¶
type EntrySigner interface {
Sign(data []byte) (string, error)
Verify(data []byte, signature string) bool
}
Store persists governed memory entries in SQLite with FTS5 full-text search. EntrySigner can sign memory entries for tamper evidence (optional).
type Governance ¶
type Governance struct {
// contains filtered or unexported fields
}
Governance enforces Constitutional AI rules on memory writes. opaMu protects the shared opa field so that SetPolicyEvaluator and evalOPAMemoryWrite (when using g.opa) are safe under concurrent Run() calls.
func NewGovernance ¶
func NewGovernance(store *Store, cls *classifier.Scanner) *Governance
NewGovernance creates a governance checker backed by the given store and PII scanner.
func (*Governance) CheckConflicts ¶
func (g *Governance) CheckConflicts(ctx context.Context, entry Entry, similarityThreshold float64) ([]ConflictCandidate, error)
CheckConflicts finds existing entries that may contradict the new entry. similarityThreshold is the minimum keyword-overlap ratio (0..1) to consider two entries in conflict; it is typically taken from policy memory.governance.conflict_similarity_threshold (default 0.6).
func (*Governance) SetPolicyEvaluator ¶
func (g *Governance) SetPolicyEvaluator(eval PolicyEvaluator)
SetPolicyEvaluator attaches an OPA-based policy evaluator for memory governance. When set, ValidateWrite runs OPA rules in addition to hardcoded Go checks (when called with a nil per-call eval). Callers running concurrent agent invocations (e.g. talon serve webhooks/cron) should prefer passing the per-run engine into ValidateWrite instead of setting a shared evaluator here, to avoid sharing mutable state across goroutines.
func (*Governance) ValidateWrite ¶
func (g *Governance) ValidateWrite(ctx context.Context, entry *Entry, pol *policy.Policy, eval PolicyEvaluator) error
ValidateWrite runs all five governance checks in order. It may mutate the entry (setting TrustScore, ReviewStatus, ConflictsWith). eval is an optional per-call policy evaluator (e.g. the OPA engine for this run). When non-nil, it is used for OPA memory governance instead of the shared g.opa, avoiding data races when multiple Run() calls execute concurrently.
type HealthReport ¶
type HealthReport struct {
TotalEntries int
RolledBack int
TrustDistribution map[string]int
PendingReview int
ConflictCount int
AutoResolved int
PendingConflicts int
}
HealthReport aggregates memory health metrics for CLI output.
type IndexEntry ¶
type IndexEntry struct {
ID string `json:"id"`
Category string `json:"category"`
Scope string `json:"scope"`
Title string `json:"title"`
ObservationType string `json:"observation_type"`
TokenCount int `json:"token_count"`
Timestamp time.Time `json:"timestamp"`
TrustScore int `json:"trust_score"`
ReviewStatus string `json:"review_status"`
MemoryType string `json:"memory_type"` // semantic, episodic, procedural (for scored retrieval)
}
IndexEntry is a lightweight summary for Layer 1 progressive disclosure (~50 tokens).
type PolicyEvaluator ¶
type PolicyEvaluator interface {
EvaluateMemoryWrite(ctx context.Context, category string, contentSizeBytes int) (*policy.Decision, error)
}
PolicyEvaluator is an optional interface for OPA-based memory governance. When set on Governance, ValidateWrite calls OPA in addition to hardcoded checks.
type PrivacyResult ¶
type PrivacyResult struct {
CleanContent string // Content with <private> blocks removed and <classified> tags stripped
FullContent string // Original content unchanged
PrivateSectionsStripped int
MaxClassifiedTier int
HasPrivateContent bool
}
PrivacyResult holds the outcome of stripping privacy tags from content.
func StripPrivateTags ¶
func StripPrivateTags(content string) PrivacyResult
StripPrivateTags processes content for privacy:
- Removes <private>...</private> sections entirely (GDPR Art. 25)
- Extracts max tier from <classified:tier_N>...</classified> tags
- Removes classified tags but preserves inner content
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func NewStore ¶
NewStore creates a memory store, initializing the schema and FTS5 tables. FTS5 is optional; if the SQLite build doesn't support it, full-text search degrades to LIKE queries.
func (*Store) AppendContent ¶
func (s *Store) AppendContent(ctx context.Context, tenantID, entryID, additionalContent string, now time.Time) error
AppendContent appends supplementary content to an existing active entry (consolidation UPDATE).
func (*Store) AsOf ¶
func (s *Store) AsOf(ctx context.Context, tenantID, agentID string, asOf time.Time, limit int) ([]Entry, error)
AsOf returns memory entries valid at the given point in time (transaction time: created_at/expired_at). Used for compliance (NIS2 Art. 23, EU AI Act Art. 11) to reconstruct state at a past time.
func (*Store) CountPendingReviewForTenant ¶
CountPendingReviewForTenant returns the number of memory entries in pending_review for the tenant (all agents).
func (*Store) DistinctAgents ¶
DistinctAgents returns all (tenant_id, agent_id) pairs in the store.
func (*Store) EnforceMaxEntries ¶
func (s *Store) EnforceMaxEntries(ctx context.Context, tenantID, agentID string, maxEntries int) (int64, error)
EnforceMaxEntries deletes the oldest entries when the count exceeds maxEntries (FIFO). Returns the number of deleted entries.
func (*Store) Get ¶
Get retrieves a full memory entry by ID (Layer 2). tenantID enforces tenant isolation — the entry must belong to the specified tenant.
func (*Store) HasRecentWithInputHash ¶
func (s *Store) HasRecentWithInputHash(ctx context.Context, tenantID, agentID, inputHash string, window time.Duration) (bool, error)
HasRecentWithInputHash checks if a memory entry with the same input fingerprint exists within the given time window. Used to skip duplicate writes for re-runs.
Returns true if a recent entry with matching hash exists (should skip write). Returns false on empty hash or any error (fail-open: proceed with write).
func (*Store) HealthStats ¶
HealthStats returns aggregate health metrics for an agent's memory.
func (*Store) Invalidate ¶
func (s *Store) Invalidate(ctx context.Context, tenantID, entryID, newEntryID string, now time.Time) error
Invalidate marks an entry as invalidated by a newer entry (Zep-style: preserved for audit). Sets consolidation_status = "invalidated", invalid_at = now, expired_at = now, invalidated_by = newEntryID.
func (*Store) InvalidateAndWrite ¶
func (s *Store) InvalidateAndWrite(ctx context.Context, tenantID, targetID string, now time.Time, entry *Entry) error
InvalidateAndWrite atomically invalidates an existing entry and writes its replacement in a single transaction. If the write fails, the invalidation is rolled back. entry.ID is used as the invalidated_by reference; prepareEntry is called to ensure it is set.
func (*Store) List ¶
func (s *Store) List(ctx context.Context, tenantID, agentID, category string, limit int) ([]Entry, error)
List returns full memory entries filtered by category.
func (*Store) ListIndex ¶
func (s *Store) ListIndex(ctx context.Context, tenantID, agentID string, limit int, scopes ...string) ([]IndexEntry, error)
ListIndex returns lightweight memory summaries (Layer 1) ordered by timestamp desc. When scopes is non-empty, only entries whose scope is in scopes are returned.
func (*Store) ListPendingReview ¶
func (s *Store) ListPendingReview(ctx context.Context, tenantID, agentID string, limit int) ([]IndexEntry, error)
ListPendingReview returns entries with review_status = 'pending_review' for the tenant/agent.
func (*Store) PurgeExpired ¶
func (s *Store) PurgeExpired(ctx context.Context, tenantID, agentID string, retentionDays int) (int64, error)
PurgeExpired deletes memory entries older than retentionDays. Entries with consolidation_status 'rolled_back' or 'invalidated' are never purged so they remain available for audit (NIS2, EU AI Act point-in-time reconstruction). Returns the number of deleted entries.
func (*Store) RetrieveScored ¶
func (s *Store) RetrieveScored(ctx context.Context, tenantID, agentID, queryText string, maxTokens int) ([]IndexEntry, error)
RetrieveScored returns memory index entries ordered by relevance to queryText, capped by maxTokens. It returns the highest-scored entries that fit within the token budget (score order preserved). When maxTokens <= 0, all scored candidates are returned. Score = relevance*0.4 + recency*0.3 + typeWeight*0.2 + trustNorm*0.1.
func (*Store) RollbackTo ¶
RollbackTo soft-deletes all memory entries newer than the specified entry. The entry identified by entryID becomes the newest "active" entry for its agent. Rolled-back entries are marked consolidation_status = 'rolled_back' and expired_at = now. They remain in the database for audit (AuditLog still returns them) but are excluded from list, search, and prompt injection.
func (*Store) Search ¶
func (s *Store) Search(ctx context.Context, tenantID, agentID, query string, limit int) ([]IndexEntry, error)
Search performs FTS5 full-text search and returns matching index entries. Falls back to LIKE-based search if FTS5 is not available.
func (*Store) SearchByCategory ¶
func (s *Store) SearchByCategory(ctx context.Context, tenantID, agentID, category string) ([]Entry, error)
SearchByCategory returns all entries matching a given category for an agent.
func (*Store) SetSigner ¶
func (s *Store) SetSigner(signer EntrySigner)
SetSigner configures an optional HMAC signer for memory entry integrity.