memory

package
v0.0.0-...-d80aada Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2025 License: MIT Imports: 31 Imported by: 0

README

Memory System

Package: memory Architecture: Event Sourcing + CQRS Test Coverage: 84.5% (event package)

Overview

The memory system provides persistent, intelligent memory capabilities for the AI assistant using Event Sourcing and CQRS patterns. It transforms conversations into a structured knowledge base that evolves over time with full auditability and correction support.

Architecture

The system implements Event Sourcing with clear separation between command and query responsibilities:

  • Commands: All state changes are captured as immutable events
  • Queries: Optimized read models built from event projections
  • Eventual Consistency: Decoupled write and read paths for resilience
Event Sourcing Flow
User Input → Extraction → Decision → Event Creation → Event Store → Projection → Read Model
                                            ↓
                                      Event Bus → Async Projections
Why Event Sourcing?
  1. Complete Audit Trail: Every change is recorded with who, what, when, why
  2. Time Travel: Can reconstruct state at any point in time
  3. Debugging: Can replay events to understand how state evolved
  4. Correction Tracking: Natural fit for tracking memory corrections

Key Features

Event-Driven Processing
  • All memory changes are recorded as immutable events (Created, Updated, Archived, Merged, Corrected)
  • Complete audit trail with source tracking (user_direct, user_confirm, user_correction, assistant_infer)
  • Support for event replay and temporal queries
  • Version-controlled event store with optimistic locking
Memory Correction Tracking
  • Explicit correction events with corrected_by relationships
  • Source-based confidence adjustment (corrections get highest confidence)
  • Correction chains support (memory A corrects B which corrected C)
  • Filtered queries automatically exclude corrected memories
Intelligent Memory Management
  • LLM-based fact extraction from conversations
  • Semantic similarity detection using pgvector HNSW index (768-dimensional embeddings)
  • Automatic conflict resolution and deduplication
  • Relationship graph between related memories
Asynchronous Pipeline
  • Non-blocking memory processing via job queue
  • Retry logic with exponential backoff
  • Dead letter queue for failed operations
  • EventBus for decoupled projection updates
Knowledge Curation
  • Background consolidation of similar memories
  • Tiered deduplication strategy:
    • Identical (>0.98): Mark as duplicate and deactivate
    • Highly Similar (0.90-0.98): Merge memories
    • Related (0.80-0.90): Create relationship links
  • Automatic archival of outdated information
  • Conflict detection and resolution

Usage

// Create memory system
system, err := memory.NewSystem(db, aiClient, logger)
if err != nil {
    return err
}

// Start background processing
system.Start(ctx)
defer system.Stop()

// Process messages asynchronously
messages := []ai.Message{
    {Role: ai.User, Content: "I love coffee"},
    {Role: ai.Assistant, Content: "Great! I'll remember that"},
}
err = system.ProcessAsync(ctx, messages)

// Search memories
memories, err := system.Search(ctx, "coffee preferences", 10)

// Get specific memory
memory, err := system.GetByID(ctx, memoryID)

Components

Core System
  • system.go - Public API (System) with Start/Stop lifecycle
  • orchestrator.go - Coordinates processing flow and event projection
  • processor.go - Pure business logic (Facts → Events) with correction detection
Event Sourcing (event/ subpackage)
  • event/event.go - Event types (Created, Updated, Archived, Merged, Corrected)
  • event/store.go - Event store with version control and optimistic locking
  • event/replay.go - Event replay functionality for rebuilding state
  • event/history.go - Event history and audit trail
Projection & Read Model
  • projector.go - Projects events to read model with duplicate detection
  • reader.go - Query interface with correction filtering and confidence adjustment
  • query.go - Advanced query capabilities with vector search
Memory Processing
  • extractor.go - LLM-based fact extraction with correction pattern detection
  • decider.go - Decision logic (ADD/UPDATE/DELETE/NONE) for memory actions
  • processor.go - Processes facts into events with deduplication
Session Management
  • session.go - Manages conversation context and temporary state
  • context.go - Provides context building for LLM interactions
Infrastructure
  • task.go - Async job processing with retry logic
  • errors.go - Typed error definitions
  • constants.go - System constants and thresholds

Testing

The memory system has comprehensive test coverage (84.5%) including:

Unit Tests
  • *_test.go - Standard unit tests for each component
  • processor_fuzz_test.go - Fuzzing tests for input validation
  • Mock implementations for AI clients and embedders
Integration Tests
  • Database integration with PostgreSQL + pgvector
  • Event store with version control
  • End-to-end memory processing pipeline
Test Files
  • event/event_test.go - Event construction and marshaling (100% coverage)
  • event/store_test.go - Event store operations (LoadFrom, GetCurrentVersion, etc.)
  • event/replay_test.go - Event replay and state reconstruction
  • event/history_test.go - Event history and audit trail
  • reader_test.go - Memory queries with correction filtering
  • projector_test.go - Event projection to read model
  • processor_test.go - Fact processing and correction detection
  • extractor_test.go - Fact extraction from messages
  • decider_test.go - Decision logic for memory actions

Run tests with:

TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/assistant_test?sslmode=disable" \
  go test -tags integration -v ./internal/memory/...

Generate coverage report:

go test -tags integration -coverprofile=coverage.out ./internal/memory/...
go tool cover -html=coverage.out -o coverage.html

Configuration

The system uses sensible defaults but can be configured:

// Custom configuration example
config := memory.Config{
    MaxWorkers:     8,        // Concurrent processors
    BatchSize:      10,       // Events per batch
    RetryAttempts:  3,        // Retry failed operations
    ProjectionMode: "async",  // async or sync
}

Database Schema

Requires PostgreSQL 14+ with pgvector extension:

Tables
  • memory_events - Event store (append-only) with optimistic locking

    • Stores all events with version control
    • Immutable audit trail
  • memories - Read model (CQRS projections)

    • Current state of memories
    • HNSW index on embedding column for fast similarity search
    • Tracks corrected_by relationships
    • Source tracking (user_direct, user_correction, etc.)
Key Columns
  • semantic_id - Stable identifier for memory updates
  • embedding vector(768) - Gemini embeddings for similarity search
  • supersedes uuid[] - Memories this one replaces
  • corrected_by uuid - Reference to correcting memory
  • source memory_source - How memory was created

See migrations in internal/storage/database/migrations/:

  • 001_initial_schema.up.sql - Base tables
  • 002_memory_event_sourcing.up.sql - Event store
  • 003_memory_source_tracking.up.sql - Correction tracking

Performance Considerations

  • Vector embeddings are generated asynchronously
  • Batch processing for improved throughput
  • Connection pooling for database efficiency
  • In-memory EventBus for low latency

Implementation Details

Event Store with Optimistic Locking

The event store ensures consistency using version-based optimistic locking:

CREATE TABLE memory_events (
    id UUID PRIMARY KEY,
    stream_id UUID NOT NULL,       -- Memory ID
    type VARCHAR(50) NOT NULL,     -- Event type
    version INT NOT NULL,          -- Version for optimistic locking
    timestamp TIMESTAMPTZ NOT NULL,
    data JSONB NOT NULL,           -- Event payload
    
    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
Memory Attributes System

Strongly-typed attributes replacing map[string]interface{}:

type MemoryAttributes struct {
    Source            MemorySource `json:"source"`
    IsCorrection      bool        `json:"is_correction"`
    CorrectionPattern string      `json:"correction_pattern"`
    Category          string      `json:"category"`
    Tags              []string    `json:"tags"`
    Importance        float32     `json:"importance"`
    // ... tracking and relationships
}

Trust Levels:

  • user_correction (Trust: 4) - User explicitly corrected
  • user_confirm (Trust: 3) - User confirmed
  • user_direct (Trust: 2) - User directly stated
  • assistant_infer (Trust: 1) - Assistant inferred
Correction Detection Patterns

The system detects corrections through pattern matching:

// Chinese: "不是...是", "錯了", "正確是", "其實是"
// English: "no, it's actually", "wrong, it's"

When corrections are detected, memories get the highest trust level and supersede previous incorrect memories.

Vector Search with HNSW Index

Optimized for fast similarity search:

CREATE INDEX idx_memories_embedding 
ON memories 
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

Hybrid scoring combines:

  • Vector similarity (70%)
  • Recency boost (up to 15%)
  • Frequency boost (up to 10%)
  • Source trust boost (up to 5%)
Parallel Processing

The system uses parallel processing for performance:

g, ctx := errgroup.WithContext(ctx)
semaphore := make(chan struct{}, maxConcurrency)

for _, fact := range facts {
    g.Go(func() error {
        // Acquire semaphore for concurrency control
        select {
        case semaphore <- struct{}{}:
            defer func() { <-semaphore }()
        case <-ctx.Done():
            return ctx.Err()
        }
        // Process fact...
    })
}

Documentation

Overview

Package memory implements intelligent decision making with optional LLM support.

Package memory implements Event Sourcing + CQRS Architecture for Assistant-Go's long-term memory system.

This package provides a sophisticated memory management system that goes beyond simple chat history, creating a structured, queryable, and evolving knowledge base using modern event-driven patterns.

Architecture Overview

The memory system follows Event Sourcing and CQRS (Command Query Responsibility Segregation) principles:

  • Event Sourcing: All state changes are captured as immutable events
  • CQRS: Write operations (commands) are separated from read operations (queries)
  • Event Store: Persistent log of all memory events
  • Projections: Materialized views optimized for different query patterns
  • Eventual Consistency: Read models are updated asynchronously

Core Components

Event Processing Pipeline:

  1. Extraction: AI-powered extraction of facts from conversations
  2. Decision: Intelligent decision-making for memory actions (ADD/UPDATE/DELETE/SKIP)
  3. Event Creation: Transform decisions into immutable events
  4. Event Storage: Persist events in append-only event store
  5. Projection: Update read models from events
  6. Query: Fast retrieval from optimized projections

Key Features

Memory Correction Tracking:

  • Tracks corrections with corrected_by relationships
  • Maintains correction history and reasons
  • Source-based confidence adjustment (user_direct > user_confirm > user_correction > assistant_infer)

Intelligent Deduplication:

  • Semantic similarity detection using vector embeddings
  • Tiered similarity thresholds:
  • Identical (>0.98): Skip or deactivate
  • Highly Similar (0.90-0.98): Merge intelligently
  • Related (0.80-0.90): Create relationships

Async Processing:

  • Non-blocking memory operations
  • Robust queue with retry logic
  • Graceful degradation under load

Vector Search:

  • 768-dimensional embeddings via Gemini
  • PostgreSQL pgvector with HNSW index
  • Hybrid search combining vector similarity and metadata

Event Types

The system supports these event types:

  • Created: New memory creation
  • Updated: Memory content or metadata update
  • Archived: Soft deletion of memory
  • Merged: Combination of related memories
  • Corrected: Explicit correction tracking

Memory Types

Supported memory categories:

  • fact: Factual information
  • preference: User preferences and likes/dislikes
  • schedule: Time-based events and recurring activities
  • relationship: Social connections
  • goal: Objectives and aspirations
  • skill: Abilities and expertise
  • personal_info: Personal details
  • work: Professional information
  • health: Medical and wellness data
  • location: Geographic information
  • contact: Contact details

Usage Example

// Initialize system with orchestrator
orchestrator := memory.NewOrchestrator(
	extractor,
	decider,
	eventStore,
	projector,
	reader,
	embedder,
	logger,
)

// Process conversation
messages := []ai.Message{
	{Role: ai.User, Content: "I prefer spaces over tabs"},
}

err := orchestrator.ProcessConversation(ctx, messages)

// Query memories
results, err := reader.Search(ctx, SearchQuery{
	Query: "coding preferences",
	Limit: 10,
})

Testing

The package includes comprehensive tests:

  • Unit tests for all components
  • Integration tests with real PostgreSQL
  • Event replay verification
  • Projection consistency tests

Run tests with: go test -tags=integration ./...

Performance Considerations

  • Events are immutable and append-only (fast writes)
  • Projections are optimized for specific query patterns
  • Vector search uses HNSW index for sub-linear performance
  • Async processing prevents blocking main execution
  • Queue provides backpressure and load management

Future Enhancements

  • Additional projection types (timeline, graph)
  • Event compaction and snapshotting
  • Multi-version concurrency control
  • Distributed event processing
  • Real-time projection updates via CDC

Package memory provides encryption for sensitive memory content

Package memory defines standard errors for the memory system.

Package memory provides memory-specific extraction logic

Package memory provides limits and constraints for the memory system.

Package memory provides types for the AI assistant memory system.

Package memory provides Options pattern for constructors following Go best practices.

Package memory implements memory orchestration

Package memory implements Event Sourcing for memory management

Package memory provides simple query processing for memory search.

Package memory provides queue configuration for async processing.

Package memory provides scoring weights and thresholds for relevance calculation.

Package memory provides session memory for immediate conversation context

Package memory provides similarity thresholds for memory comparison and deduplication.

Package memory provides a clean facade for memory operations

Package memory implements queue.Job for memory processing

Package memory provides memory management with Temporal workflow support. This file contains Temporal workflow definitions for memory processing.

Index

Constants

View Source
const (
	KindValidation = "validation"
	KindStorage    = "storage"
	KindEmbedding  = "embedding"
	KindProcessing = "processing"
	KindConflict   = "conflict"
	KindNotFound   = "not_found"
)

Error kinds for categorizing errors

View Source
const (
	// EmbeddingDimension from Gemini's text-embedding-004 model
	// Reference: https://ai.google.dev/gemini-api/docs/models/gemini#text-embedding
	EmbeddingDimension = 768

	// Content size limits
	MaxContentLength = 10000 // ~2500 words, reasonable for a memory
	MaxSummaryLength = 1000  // ~250 words summary
	MaxKeywords      = 100   // Avoid keyword spam
	MaxEntities      = 50    // Reasonable entity extraction limit
)

Limits define system capacity constraints. These values are based on practical limits and performance considerations.

View Source
const (
	SearchLimitCritical  = 200 // Score >= 5.0
	SearchLimitImportant = 100 // Score >= 4.0
	SearchLimitNormal    = 50  // Score >= 3.0
	SearchLimitLow       = 30  // Score < 3.0

	DefaultSearchLimit = 10    // Default when not specified
	MaxSearchResults   = 10000 // Hard limit to prevent OOM
)

SearchLimits control result set sizes based on importance. Higher importance queries get more results.

View Source
const (
	MinFactsForConcurrent = 2  // Below this, sequential is faster
	MaxWorkersLimit       = 32 // Prevent resource exhaustion
	WorkerMultiplier      = 2  // Workers = CPU cores * multiplier
	DefaultBatchSize      = 10 // Default batch size for processing
)

BatchLimits control concurrent processing.

View Source
const (
	DefaultTypeLimit         = 50  // GetMemoriesByType default
	DefaultRecentMemoryLimit = 20  // GetRecentMemories default
	DefaultHistoryLimit      = 50  // History query default
	DefaultAllHistoryLimit   = 100 // All history default
	ConflictFilterLimit      = 50  // Max memories after conflict filter
	DefaultGraphDepth        = 2   // Graph traversal depth
)

ServiceLimits control API behavior.

View Source
const (
	HighPriority   = 10
	NormalPriority = 5
	LowPriority    = 1
)

Priority levels for message processing.

View Source
const (
	// QueueDefaultMaxWorkers is the default number of concurrent workers
	// Tuned for typical workloads without overwhelming the system
	QueueDefaultMaxWorkers = 3

	// QueueDefaultBufferSize is the default job channel buffer size
	// Large enough to handle bursts without blocking producers
	QueueDefaultBufferSize = 1000

	// QueueDefaultMaxRetries is the default maximum retries for failed jobs
	// Balances reliability with avoiding infinite retry loops
	QueueDefaultMaxRetries = 2

	// QueueDefaultBatchSize is the default number of messages to batch
	// Optimizes database writes without excessive latency
	QueueDefaultBatchSize = 5

	// QueueDefaultBatchTimeout is the default timeout for batch accumulation
	// Ensures timely processing even with low message volume
	QueueDefaultBatchTimeout = 5 * time.Second

	// QueueProcessTimeout is the timeout for processing a single job
	// Prevents stuck jobs from blocking workers indefinitely
	QueueProcessTimeout = 60 * time.Second
)

QueueConfiguration defines parameters for the async processing queue. These values balance throughput with resource consumption.

View Source
const (
	// VectorSimilarityWeight is the weight for vector similarity in scoring
	// when both vector and text relevance are available
	VectorSimilarityWeight = 0.40

	// TextRelevanceWeight is the weight for text relevance when used with vector similarity
	TextRelevanceWeight = 0.20

	// TextOnlyRelevanceWeight is the weight for text relevance when no vector similarity available
	TextOnlyRelevanceWeight = 0.60

	// RecencyWeight is the maximum weight for recency in final score
	RecencyWeight = 0.15

	// AccessFrequencyWeight is the maximum weight for access frequency
	AccessFrequencyWeight = 0.10

	// ConfidenceWeight is the weight for confidence factor
	ConfidenceWeight = 0.05
)

ScoringWeights define the importance of different factors in relevance scoring. These weights must sum to 1.0 for each scoring method.

View Source
const (
	// RecencyHour is the recency score for updates within an hour
	RecencyHour = 0.15

	// RecencyDay is the recency score for updates within a day
	RecencyDay = 0.12

	// RecencyWeek is the recency score for updates within a week
	RecencyWeek = 0.08

	// RecencyMonth is the recency score for updates within a month
	RecencyMonth = 0.04
)

RecencyScores define time-based scoring decay. More recent memories get higher scores.

View Source
const (
	// RecencyScoreDay is the recency score for memories within a day
	RecencyScoreDay = 0.2

	// RecencyScoreWeek is the recency score for memories within a week
	RecencyScoreWeek = 0.15

	// RecencyScoreMonth is the recency score for memories within a month
	RecencyScoreMonth = 0.1

	// RecencyScoreQuarter is the recency score for memories within 90 days
	RecencyScoreQuarter = 0.05
)

RecencyScoresV2 provide alternative recency scoring for different contexts. Used in specialized search scenarios.

View Source
const (
	// FrequentAccessThreshold is the minimum access count for frequent status
	FrequentAccessThreshold = 10

	// ModerateAccessThreshold is the minimum access count for moderate status
	ModerateAccessThreshold = 5

	// AccessScoreFrequent is the score bonus for frequently accessed memories
	AccessScoreFrequent = 0.1

	// AccessScoreModerate is the score bonus for moderately accessed memories
	AccessScoreModerate = 0.05
)

AccessFrequencyThresholds determine access-based scoring.

View Source
const (
	// MinConfidenceThreshold is the minimum confidence to process a fact
	MinConfidenceThreshold = 0.3

	// DefaultConfidence is the default confidence when not specified
	DefaultConfidence = 0.5

	// DefaultMinScore is the default minimum score for search results
	DefaultMinScore = 0.5
)

ConfidenceThresholds define minimum confidence levels.

View Source
const (
	// MinSimilarityThreshold is the minimum similarity score to consider memories related
	// Below this threshold, memories are considered unrelated
	MinSimilarityThreshold float32 = 0.3

	// ModeratelySimilarThreshold indicates moderately similar content
	// Used for broader search contexts
	ModeratelySimilarThreshold float32 = 0.65

	// DefaultSimilarityThreshold is the default similarity for standard searches
	// Balances precision and recall
	DefaultSimilarityThreshold float32 = 0.75

	// HighSimilarityThreshold indicates very similar content
	// Used for precise matching
	HighSimilarityThreshold float32 = 0.85

	// ExactMatchThreshold indicates nearly identical content
	// Used for duplicate detection
	ExactMatchThreshold float32 = 0.95
)

SimilarityThresholds define boundaries for different similarity levels. These are used for search filtering and memory deduplication.

View Source
const (
	// DuplicateThreshold indicates exact duplicate (>0.98 similarity)
	// Action: Deactivate duplicate without merging
	DuplicateThreshold float32 = 0.98

	// MergeSimilarityThreshold indicates highly similar content (0.90-0.98)
	// Action: Merge memories intelligently, preserving unique information
	MergeSimilarityThreshold float32 = 0.90

	// RelateThreshold indicates moderate similarity (0.80-0.90)
	// Action: Create "similar_to" relationship in knowledge graph
	RelateThreshold float32 = 0.80
)

DeduplicationThresholds define the tiered deduplication strategy. Each threshold triggers different consolidation actions.

View Source
const (
	// LengthRatioThreshold is the minimum ratio of content lengths to consider as same topic
	// Used to prevent merging very different sized memories
	LengthRatioThreshold float32 = 0.7

	// DecisionTemperature is the temperature for LLM decision making
	// Low temperature for consistent, deterministic decisions
	DecisionTemperature float32 = 0.1

	// DecisionMaxTokens is the maximum tokens for decision responses
	DecisionMaxTokens = 500
)

DecisionThresholds for LLM-based memory decisions.

View Source
const (
	// DecisionLookAhead is the number of characters to look ahead when parsing
	DecisionLookAhead = 200

	// DecisionMoveOffset is the offset to move past current match when parsing
	DecisionMoveOffset = 20

	// ContentStartOffset is the offset after "content": when parsing JSON
	ContentStartOffset = 10
)

ParsingOffsets for decision response parsing. Used when extracting structured data from LLM responses.

Variables

View Source
var (
	ErrAttributeNotFound = errors.New("attribute not found")
	ErrInvalidAttribute  = errors.New("invalid attribute value")
)

Common errors

View Source
var (
	// ErrNotFound indicates the requested memory doesn't exist
	ErrNotFound = errors.New("memory not found")

	// ErrInvalidType indicates an unsupported memory type
	ErrInvalidType = errors.New("invalid memory type")

	// ErrDuplicate indicates a memory with the same semantic ID already exists
	ErrDuplicate = errors.New("duplicate memory")

	// ErrEmptyContent indicates the memory content is empty
	ErrEmptyContent = errors.New("empty memory content")

	// ErrInvalidQuery indicates the search query is invalid
	ErrInvalidQuery = errors.New("invalid search query")

	// ErrEmbeddingFailed indicates embedding generation failed
	ErrEmbeddingFailed = errors.New("embedding generation failed")

	// ErrStorageUnavailable indicates the storage backend is unavailable
	ErrStorageUnavailable = errors.New("storage unavailable")

	// ErrProcessingTimeout indicates memory processing exceeded timeout
	ErrProcessingTimeout = errors.New("processing timeout")

	// ErrQueueFull indicates the processing queue is at capacity
	ErrQueueFull = errors.New("processing queue full")
)

Sentinel errors for common memory operations

View Source
var (
	ExtractMemoryFacts       = (*MemoryActivities).ExtractMemoryFacts
	StoreMemoryFacts         = (*MemoryActivities).StoreMemoryFacts
	GenerateMemoryEmbeddings = (*MemoryActivities).GenerateMemoryEmbeddings
)

Activity function references for registration

View Source
var SensitivePatterns = []struct {
	Pattern *regexp.Regexp
	Type    string
}{
	{regexp.MustCompile(`(?i)\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b`), "email"},
	{regexp.MustCompile(`(?i)(?:password|passwd|pwd)(?:\s*[:=]\s*|\s+is\s*[:=]?\s*)\S+`), "password"},
	{regexp.MustCompile(`(?i)(?:api[_-]?key|apikey)\s*[:=]\s*\S+`), "api_key"},
	{regexp.MustCompile(`(?i)\b(?:secret|token)\s*[:=]\s*\S+`), "secret"},
	{regexp.MustCompile(`\b(?:\d{4}[-\s]?){3}\d{4}\b`), "credit_card"},
	{regexp.MustCompile(`\b\d{3}-\d{2}-\d{4}\b`), "ssn"},
	{regexp.MustCompile(`(?i)\b(?:private[_-]?key|priv[_-]?key)\s*[:=]\s*\S+`), "private_key"},
	{regexp.MustCompile(`(?i)-----BEGIN\s+(?:RSA\s+)?PRIVATE\s+KEY-----`), "private_key_block"},
}

SensitivePatterns defines patterns that indicate sensitive information

Functions

func DeterminePriority

func DeterminePriority(messages []ai.Message) int

DeterminePriority calculates priority based on message content. High priority keywords indicate urgent or important information.

func IsNotFound

func IsNotFound(err error) bool

IsNotFound checks if an error indicates a memory was not found

func IsStorage

func IsStorage(err error) bool

IsStorage checks if an error is a storage error

func IsValidation

func IsValidation(err error) bool

IsValidation checks if an error is a validation error

func MaskSensitiveData

func MaskSensitiveData(content string) string

MaskSensitiveData masks sensitive information in content for logging

func ProcessMemoryWorkflow

func ProcessMemoryWorkflow(ctx workflow.Context, input MemoryWorkflowInput) error

ProcessMemoryWorkflow is the main memory processing workflow. It orchestrates fact extraction, decision making, and storage.

func ProcessMemoryWorkflowWithQuery

func ProcessMemoryWorkflowWithQuery(ctx workflow.Context, input MemoryWorkflowInput) error

ProcessMemoryWorkflowWithQuery is an enhanced version with query support.

Types

type Action

type Action struct {
	Type       ActionType
	TargetID   *pgtype.UUID  // For UPDATE/DELETE actions
	Content    string        // For ADD actions or modified content
	RelatedIDs []pgtype.UUID // For corrections - memories being superseded
}

Action represents a single action within a decision

type ActionType

type ActionType string

ActionType defines memory actions

const (
	// ActionAdd represents adding a new memory
	ActionAdd ActionType = "ADD"
	// ActionUpdate represents updating an existing memory
	ActionUpdate ActionType = "UPDATE"
	// ActionDelete represents deleting a memory
	ActionDelete ActionType = "DELETE"
	// ActionSkip represents skipping a memory
	ActionSkip ActionType = "SKIP"
	// ActionMerge represents merging memories
	ActionMerge ActionType = "MERGE"
)

type Context

type Context struct {
	// Temporal context
	OccurredAt *time.Time          `json:"occurred_at,omitempty"`
	StartTime  *time.Time          `json:"start_time,omitempty"`
	EndTime    *time.Time          `json:"end_time,omitempty"`
	Duration   *time.Duration      `json:"duration,omitempty"`
	Recurrence *extract.Recurrence `json:"recurrence,omitempty"`

	// Spatial context
	Location *Location `json:"location,omitempty"`

	// Source context
	Source    string `json:"source,omitempty"` // conversation, form, etc.
	SessionID string `json:"session_id,omitempty"`
}

Context provides temporal and spatial context for memories

func (*Context) HasLocation

func (c *Context) HasLocation() bool

HasLocation returns true if the context has a location

func (*Context) HasTime

func (c *Context) HasTime() bool

HasTime returns true if the context has time information

type CorrectionInfo

type CorrectionInfo struct {
	IsCorrection bool
	Pattern      string // Which pattern matched
	Source       event.MemorySource
}

CorrectionInfo represents detected correction information

type Decider

type Decider struct {
	// contains filtered or unexported fields
}

Decider makes memory decisions using either simple rules or LLM-based semantic analysis.

func NewDecider

func NewDecider(similarityThreshold float32, aiClient ai.Client, opts ...DeciderOption) *Decider

NewDecider creates a new decider with optional AI support. Use options to configure logger and threshold.

func (*Decider) Decide

func (d *Decider) Decide(ctx context.Context, fact extract.Fact, similar []*Memory) (Decision, error)

Decide determines what action to take for a given fact.

type DeciderOption

type DeciderOption func(*Decider)

DeciderOption configures a Decider.

func WithDeciderLogger

func WithDeciderLogger(log logger.Logger) DeciderOption

WithDeciderLogger sets the logger for the decider.

func WithDeciderThreshold

func WithDeciderThreshold(threshold float32) DeciderOption

WithDeciderThreshold sets the similarity threshold.

type Decision

type Decision struct {
	Actions    []Action // Support multiple actions in sequence
	Reason     string
	Confidence float32 // Confidence in the decision (0-1)
}

Decision represents what to do with a memory

type DecisionMaker

type DecisionMaker interface {
	Decide(ctx context.Context, fact extract.Fact, existing []*Memory) (Decision, error)
}

DecisionMaker defines the interface for making memory decisions

type Dependencies

type Dependencies struct {
	DB             *pgxpool.Pool
	AIService      ai.Client
	Embedder       ai.Embedding
	TemporalClient client.Client // Required Temporal client
	TaskQueue      string
	EncryptionKey  string // Optional: Key for encrypting sensitive data
	Logger         logger.Logger
}

Dependencies for creating the memory system

type Encryptor

type Encryptor struct {
	// contains filtered or unexported fields
}

Encryptor handles encryption and decryption of sensitive data

func NewEncryptor

func NewEncryptor(key string, log logger.Logger) (*Encryptor, error)

NewEncryptor creates a new encryptor with the given key The key should be derived from a secure source (e.g., environment variable)

func (*Encryptor) ContainsSensitiveData

func (e *Encryptor) ContainsSensitiveData(content string) bool

ContainsSensitiveData checks if the content contains sensitive information

func (*Encryptor) Decrypt

func (e *Encryptor) Decrypt(ciphertext string) (string, error)

Decrypt decrypts the given ciphertext

func (*Encryptor) DecryptMemoryContent

func (e *Encryptor) DecryptMemoryContent(content string, attributes map[string]interface{}) (string, map[string]interface{}, error)

DecryptMemoryContent decrypts sensitive fields in memory content

func (*Encryptor) Encrypt

func (e *Encryptor) Encrypt(plaintext string) (string, error)

Encrypt encrypts the given plaintext

func (*Encryptor) EncryptMemoryContent

func (e *Encryptor) EncryptMemoryContent(content string, attributes map[string]interface{}) (string, map[string]interface{}, error)

EncryptMemoryContent encrypts sensitive fields in memory content

type Error

type Error struct {
	Op      string // Operation that failed
	Kind    string // Type of error
	Err     error  // Underlying error
	Message string // Additional context
}

Error represents a detailed error from memory operations

func NewError

func NewError(op, kind string, err error, message ...string) *Error

NewError creates a new Error

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface

func (*Error) Is

func (e *Error) Is(target error) bool

Is checks if the error matches target

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying error

type EventProcessor

type EventProcessor interface {
	// ProcessFacts transforms facts into events based on similar memories
	ProcessFacts(ctx context.Context, facts []extract.Fact, similarMemories map[string][]*Memory) ([]event.Event, error)
}

EventProcessor transforms facts into events. It is a pure function that doesn't have side effects.

type Extractor

type Extractor interface {
	Extract(ctx context.Context, messages []ai.Message) ([]extract.Fact, error)
}

Extractor interface for memory extraction

type Filter

type Filter struct {
	Types           []Type
	Statuses        []Status
	UserIDs         []uuid.UUID
	TimeRange       *TimeRange
	MinConfidence   float32
	ExcludeArchived bool
}

Filter represents search filter criteria

func (Filter) HasStatusFilter

func (f Filter) HasStatusFilter() bool

HasStatusFilter returns true if status filter is set

func (Filter) HasTimeRange

func (f Filter) HasTimeRange() bool

HasTimeRange returns true if time range is set

func (Filter) HasTypeFilter

func (f Filter) HasTypeFilter() bool

HasTypeFilter returns true if type filter is set

func (Filter) HasUserFilter

func (f Filter) HasUserFilter() bool

HasUserFilter returns true if user filter is set

func (Filter) IsEmpty

func (f Filter) IsEmpty() bool

IsEmpty returns true if no filters are set

type GenerateMemoryEmbeddingsInput

type GenerateMemoryEmbeddingsInput struct {
	ConversationID string      `json:"conversation_id"`
	MemoryIDs      []uuid.UUID `json:"memory_ids,omitempty"`
}

GenerateMemoryEmbeddingsInput contains input for embedding generation.

type Item

type Item struct {
	Content string    // The actual information
	Type    string    // Type of information (fact, preference, etc.)
	Source  string    // Who said it (user/assistant)
	Created time.Time // When it was added
}

Item represents a piece of information in the session. This is simpler than a full Memory as it's temporary.

type LLMExtractor

type LLMExtractor struct {
	// contains filtered or unexported fields
}

LLMExtractor wraps the generic extractor with memory-specific prompts

func NewMemoryExtractor

func NewMemoryExtractor(aiClient ai.Client) *LLMExtractor

NewMemoryExtractor creates a memory-specific extractor

func (*LLMExtractor) Extract

func (e *LLMExtractor) Extract(ctx context.Context, messages []ai.Message) ([]extract.Fact, error)

Extract uses memory-specific prompts for extraction

type Location

type Location struct {
	Name        string  `json:"name,omitempty"`
	Address     string  `json:"address,omitempty"`
	Latitude    float64 `json:"latitude,omitempty"`
	Longitude   float64 `json:"longitude,omitempty"`
	Description string  `json:"description,omitempty"`
}

Location represents spatial information for memories

type Memory

type Memory struct {
	ID         pgtype.UUID `json:"id"`
	SemanticID string      `json:"semantic_id"` // Preserved from existing system
	Type       Type        `json:"type"`

	// Core content
	Content string `json:"content"` // Original content in original language
	Summary string `json:"summary"` // Standardized summary (optional)

	// Structured data
	Entities   []extract.Entity  `json:"entities"`   // People, places, activities involved
	Attributes *MemoryAttributes `json:"attributes"` // Strongly-typed attributes

	// Relations and context
	Relations []Relation `json:"relations"` // Links to other memories
	Context   Context    `json:"context"`   // When, where, etc.

	// Search and retrieval
	Embedding pgvector.Vector `json:"-"`        // Vector embedding
	Keywords  []string        `json:"keywords"` // Search keywords

	// Metadata
	Confidence  float32    `json:"confidence"` // 0.0 to 1.0
	Status      Status     `json:"status"`     // active, archived, deleted
	Version     int        `json:"version"`
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
	AccessedAt  time.Time  `json:"accessed_at"`
	AccessCount int32      `json:"access_count"`
	ArchivedAt  *time.Time `json:"archived_at,omitempty"` // When memory was archived

	// Source tracking
	Source           string      `json:"source"`                      // Memory source (user_direct, user_confirm, etc.)
	Supersedes       []uuid.UUID `json:"supersedes,omitempty"`        // Memories this replaces
	CorrectedBy      *uuid.UUID  `json:"corrected_by,omitempty"`      // Memory that corrected this
	CorrectionReason string      `json:"correction_reason,omitempty"` // Why this was corrected

	// Search result fields (not persisted)
	Similarity float32 `json:"similarity,omitempty"` // Vector similarity score when from search
}

Memory represents a single memory with entities, relations, and context.

func (*Memory) Age

func (m *Memory) Age() time.Duration

Age returns the age of the memory

func (*Memory) Apply

func (m *Memory) Apply(evt event.Event) error

Apply transforms a memory based on an event. This implements event sourcing - memory state is derived from events.

func (*Memory) GetID

func (m *Memory) GetID() uuid.UUID

GetID returns the UUID from pgtype.UUID

func (*Memory) GetNextOccurrence

func (m *Memory) GetNextOccurrence(after time.Time) *time.Time

GetNextOccurrence calculates next occurrence for recurring memories

func (*Memory) HasEmbedding

func (m *Memory) HasEmbedding() bool

HasEmbedding returns true if the memory has an embedding

func (*Memory) HasSupersedes

func (m *Memory) HasSupersedes() bool

HasSupersedes returns true if this memory supersedes others

func (*Memory) IsActive

func (m *Memory) IsActive() bool

IsActive returns true if the memory is active

func (*Memory) IsCorrected

func (m *Memory) IsCorrected() bool

IsCorrected returns true if this memory has been corrected

func (*Memory) IsRecurring

func (m *Memory) IsRecurring() bool

IsRecurring checks if this memory has recurrence

func (*Memory) MarshalJSON

func (m *Memory) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling for Memory

func (*Memory) NeedsEmbedding

func (m *Memory) NeedsEmbedding() bool

NeedsEmbedding returns true if the memory needs an embedding

func (*Memory) Score

func (m *Memory) Score(query string, now time.Time) float32

Score calculates memory relevance score

type MemoryActivities

type MemoryActivities struct {
	// contains filtered or unexported fields
}

MemoryActivities 包含所有 activity 的依賴 MemoryActivities 是具體類型,不是介面

func NewMemoryActivities

func NewMemoryActivities(system *System, logger logger.Logger) *MemoryActivities

NewMemoryActivities creates activities with dependencies

func (*MemoryActivities) ExtractMemoryFacts

func (a *MemoryActivities) ExtractMemoryFacts(ctx context.Context, messages []ai.Message) ([]extract.Fact, error)

ExtractMemoryFacts extracts facts from messages

func (*MemoryActivities) GenerateMemoryEmbeddings

func (a *MemoryActivities) GenerateMemoryEmbeddings(ctx context.Context, input GenerateMemoryEmbeddingsInput) error

GenerateMemoryEmbeddings generates embeddings for memories

func (*MemoryActivities) StoreMemoryFacts

func (a *MemoryActivities) StoreMemoryFacts(ctx context.Context, input StoreMemoryFactsInput) error

StoreMemoryFacts stores facts to event store

type MemoryAttributes

type MemoryAttributes struct {
	// Source tracking - who/how this memory was created
	Source            event.MemorySource `json:"source,omitempty"`
	IsCorrection      bool               `json:"is_correction,omitempty"`
	CorrectionPattern string             `json:"correction_pattern,omitempty"`

	// Categorization
	Category string   `json:"category,omitempty"` // food, drink, activities, etc.
	Tags     []string `json:"tags,omitempty"`

	// Importance and confidence adjustments
	Importance       float32 `json:"importance,omitempty"` // 0.0 to 1.0
	ConfidenceBoost  float32 `json:"confidence_boost,omitempty"`
	ConfidenceReason string  `json:"confidence_reason,omitempty"`

	// Temporal metadata
	LastModified      *time.Time `json:"last_modified,omitempty"`
	ModificationCount int        `json:"modification_count,omitempty"`
	LastAccessed      *time.Time `json:"last_accessed,omitempty"`
	AccessCount       int        `json:"access_count,omitempty"`

	// Relations to other memories
	RelatedMemories []string `json:"related_memories,omitempty"`
	DerivedFrom     string   `json:"derived_from,omitempty"`

	// Extension point for domain-specific data
	// Only use this for truly dynamic data that can't be structured
	Extra map[string]json.RawMessage `json:"extra,omitempty"`
}

MemoryAttributes provides strongly-typed attributes for memories. This replaces the generic map[string]interface{} with explicit fields.

func NewMemoryAttributes

func NewMemoryAttributes() *MemoryAttributes

NewMemoryAttributes creates a new MemoryAttributes with default values

func NewMemoryAttributesWithSource

func NewMemoryAttributesWithSource(source event.MemorySource) *MemoryAttributes

NewMemoryAttributesWithSource creates attributes with a specific source

func (*MemoryAttributes) Clone

func (a *MemoryAttributes) Clone() *MemoryAttributes

Clone creates a deep copy of the attributes

func (*MemoryAttributes) FromJSON

func (a *MemoryAttributes) FromJSON(data json.RawMessage) error

FromJSON populates attributes from JSON data

func (*MemoryAttributes) GetExtra

func (a *MemoryAttributes) GetExtra(key string, target interface{}) error

GetExtra retrieves domain-specific data

func (*MemoryAttributes) GetTrustLevel

func (a *MemoryAttributes) GetTrustLevel() int

GetTrustLevel returns a trust level based on the source Higher values mean more trustworthy

func (*MemoryAttributes) IncrementAccess

func (a *MemoryAttributes) IncrementAccess()

IncrementAccess updates access tracking

func (*MemoryAttributes) IncrementModification

func (a *MemoryAttributes) IncrementModification()

IncrementModification updates modification tracking

func (*MemoryAttributes) IsUserSourced

func (a *MemoryAttributes) IsUserSourced() bool

IsUserSourced returns true if the memory came directly from the user

func (*MemoryAttributes) MarkAsCorrection

func (a *MemoryAttributes) MarkAsCorrection(pattern string)

MarkAsCorrection marks these attributes as a correction

func (*MemoryAttributes) SetExtra

func (a *MemoryAttributes) SetExtra(key string, value interface{}) error

SetExtra adds domain-specific data

func (*MemoryAttributes) ToJSON

func (a *MemoryAttributes) ToJSON() (json.RawMessage, error)

ToJSON converts attributes to JSON for database storage

func (*MemoryAttributes) Validate

func (a *MemoryAttributes) Validate() error

Validate checks if the attributes are valid

type MemoryMerger

type MemoryMerger struct {
	// contains filtered or unexported fields
}

MemoryMerger handles intelligent memory merging decisions

func NewMemoryMerger

func NewMemoryMerger(log logger.Logger) *MemoryMerger

NewMemoryMerger creates a new memory merger

func (*MemoryMerger) DecideMerge

func (m *MemoryMerger) DecideMerge(ctx context.Context, newFact event.CreatedData, similar []*Memory) MergeDecision

DecideMerge determines how to handle a new memory based on similarity to existing memories

type MemoryWorkflowInput

type MemoryWorkflowInput struct {
	ConversationID string       `json:"conversation_id"`
	Messages       []ai.Message `json:"messages"`
}

MemoryWorkflowInput is the input for memory processing workflow. This is kept in the memory package to avoid circular dependencies.

type MemoryWorkflowState

type MemoryWorkflowState struct {
	ConversationID string `json:"conversation_id"`
	Status         string `json:"status"`
	FactsExtracted int    `json:"facts_extracted"`
	EventsStored   int    `json:"events_stored"`
	Error          string `json:"error,omitempty"`
}

MemoryWorkflowState represents the workflow state for queries.

type MergeAction

type MergeAction string

MergeAction defines what to do with similar memories

const (
	MergeActionNone       MergeAction = "none"       // No merge needed
	MergeActionDeactivate MergeAction = "deactivate" // Deactivate as duplicate
	MergeActionMerge      MergeAction = "merge"      // Intelligently merge
	MergeActionRelate     MergeAction = "relate"     // Create relationship
)

type MergeDecision

type MergeDecision struct {
	Action     MergeAction
	TargetID   uuid.UUID
	Confidence float32
	Reason     string
	MergedData *MergedMemoryData
}

MergeDecision represents the outcome of a merge decision

type MergeStrategy

type MergeStrategy struct {
	// Thresholds for different merge actions
	IdenticalThreshold   float32 // >0.98: Treat as duplicate, deactivate
	HighSimilarThreshold float32 // 0.90-0.98: Merge intelligently
	RelatedThreshold     float32 // 0.80-0.90: Create relationship
}

MergeStrategy defines the threshold levels for memory merging decisions

func DefaultMergeStrategy

func DefaultMergeStrategy() MergeStrategy

DefaultMergeStrategy returns the default merge strategy

type MergedMemoryData

type MergedMemoryData struct {
	Content    string
	Keywords   []string
	Confidence float32
	Entities   map[string]interface{}
	Attributes map[string]interface{}
}

MergedMemoryData contains the result of merging two memories

type Metrics

type Metrics struct {
	ExtractCount    int64                    `json:"extract_count"`
	ExtractErrors   int64                    `json:"extract_errors"`
	DecideCount     int64                    `json:"decide_count"`
	DecideErrors    int64                    `json:"decide_errors"`
	StoreCount      int64                    `json:"store_count"`
	StoreErrors     int64                    `json:"store_errors"`
	ProcessingTime  map[string]time.Duration `json:"processing_time"`
	LastProcessedAt time.Time                `json:"last_processed_at"`
}

Metrics tracks pipeline performance

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator coordinates memory processing components. It separates concerns: processing, storing, projecting, and searching.

func NewOrchestrator

func NewOrchestrator(
	processor *Processor,
	eventStore *event.Store,
	projector *Projector,
	reader *Reader,
	opts ...OrchestratorOption,
) *Orchestrator

NewOrchestrator creates a new orchestrator with required dependencies. Use options to configure optional settings.

func (*Orchestrator) ExtractFacts

func (o *Orchestrator) ExtractFacts(ctx context.Context, messages []ai.Message) ([]extract.Fact, error)

ExtractFacts extracts facts from messages. This is exposed for Temporal activities.

func (*Orchestrator) ProcessFacts

func (o *Orchestrator) ProcessFacts(ctx context.Context, facts []extract.Fact) ([]event.Event, error)

ProcessFacts processes extracted facts through decision making and storage. This is exposed for Temporal activities.

func (*Orchestrator) ProcessMessages

func (o *Orchestrator) ProcessMessages(ctx context.Context, messages []ai.Message) error

ProcessMessages orchestrates the entire memory processing flow.

func (*Orchestrator) Shutdown

func (o *Orchestrator) Shutdown(ctx context.Context) error

Shutdown waits for all background projections to complete. This should be called during graceful shutdown to prevent goroutine leaks.

func (*Orchestrator) Stop

func (o *Orchestrator) Stop()

Stop gracefully shuts down the orchestrator components.

type OrchestratorOption

type OrchestratorOption func(*Orchestrator)

OrchestratorOption configures an Orchestrator.

func WithOrchestratorLogger

func WithOrchestratorLogger(log logger.Logger) OrchestratorOption

WithOrchestratorLogger sets the logger for the orchestrator.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor implements EventProcessor as a pure fact-to-event transformer. It does NOT store events or update projections - that's the orchestrator's job.

func NewProcessor

func NewProcessor(extractor Extractor, decider DecisionMaker, opts ...ProcessorOption) *Processor

NewProcessor creates a pure processor for transforming facts to events. Use options to configure logger.

func (*Processor) ExtractFacts

func (p *Processor) ExtractFacts(ctx context.Context, messages []ai.Message) ([]extract.Fact, error)

ExtractFacts extracts facts from messages with correction detection. This is separated so the orchestrator can control the flow.

func (*Processor) GetDecider

func (p *Processor) GetDecider() DecisionMaker

GetDecider returns the decider (needed for Temporal activities)

func (*Processor) GetExtractor

func (p *Processor) GetExtractor() Extractor

GetExtractor returns the extractor (needed for Temporal activities)

func (*Processor) ProcessFacts

func (p *Processor) ProcessFacts(ctx context.Context, facts []extract.Fact, similarMemories map[string][]*Memory) ([]event.Event, error)

ProcessFacts transforms facts into events based on similar memories. This is a pure function - it only transforms data, no side effects.

type ProcessorOption

type ProcessorOption func(*Processor)

ProcessorOption configures a Processor.

func WithProcessorLogger

func WithProcessorLogger(log logger.Logger) ProcessorOption

WithProcessorLogger sets the logger for the processor.

type Projector

type Projector struct {
	// contains filtered or unexported fields
}

Projector handles updating the read model based on evts. It transforms evts into the denormalized view for queries. It also handles deduplication during projection.

func NewProjector

func NewProjector(db *pgxpool.Pool, embedder ai.Embedding, opts ...ProjectorOption) *Projector

NewProjector creates a new projector with required database and embedder. Use options to configure encryptor and logger.

func (*Projector) Project

func (p *Projector) Project(ctx context.Context, evt event.Event) error

Project applies an evt to update the read model

func (*Projector) Rebuild

func (p *Projector) Rebuild(ctx context.Context, store *event.Store) error

Rebuild reconstructs the entire read model from evts

type ProjectorOption

type ProjectorOption func(*Projector)

ProjectorOption configures a Projector.

func WithProjectorEncryptor

func WithProjectorEncryptor(encryptor *Encryptor) ProjectorOption

WithProjectorEncryptor sets the encryptor for sensitive data.

func WithProjectorLogger

func WithProjectorLogger(log logger.Logger) ProjectorOption

WithProjectorLogger sets the logger for the projector.

type Query

type Query struct {
	ID               *uuid.UUID
	Text             string
	Types            []Type
	Statuses         []Status
	UserIDs          []uuid.UUID
	Entities         []string
	TimeRange        *TimeRange
	Limit            int
	MinScore         float32
	MinSimilarity    float32
	ExcludeArchived  bool
	IncludeEmbedding bool
	IncludeRelations bool
}

Query defines search parameters

func (Query) HasFilters

func (q Query) HasFilters() bool

HasFilters returns true if the query has any filters

func (Query) NeedsEmbedding

func (q Query) NeedsEmbedding() bool

NeedsEmbedding returns true if the query needs embedding

func (*Query) SetDefaults

func (q *Query) SetDefaults()

SetDefaults sets default values for query

func (Query) Validate

func (q Query) Validate() error

Validate checks if the query is valid

type QueryAnalyzer

type QueryAnalyzer struct{}

QueryAnalyzer processes user queries for memory search. Following Go's philosophy: simplicity over cleverness.

func NewQueryAnalyzer

func NewQueryAnalyzer() *QueryAnalyzer

NewQueryAnalyzer creates a new query analyzer.

func (*QueryAnalyzer) AnalyzeQuery

func (qa *QueryAnalyzer) AnalyzeQuery(ctx context.Context, query string) (string, error)

AnalyzeQuery extracts meaningful search terms from a user query. Simple implementation: remove question words, keep content words.

type QueueMetrics

type QueueMetrics struct {
	QueueSize     int
	ActiveWorkers int
}

QueueMetrics represents simplified queue metrics for shutdown.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader handles all query operations against the read model. It only reads from the projected memories table, not from events.

func NewReader

func NewReader(db *pgxpool.Pool, opts ...ReaderOption) *Reader

NewReader creates a new reader for memory queries. Use options to configure embedder, encryptor, and logger.

func (*Reader) Get

func (r *Reader) Get(ctx context.Context, id uuid.UUID) (*Memory, error)

Get retrieves a memory by ID

func (*Reader) GetActive

func (r *Reader) GetActive(ctx context.Context, limit int) ([]*Memory, error)

GetActive retrieves the most recent active memories

func (*Reader) GetExceptions

func (r *Reader) GetExceptions(ctx context.Context, parentID uuid.UUID) ([]*Memory, error)

GetExceptions is deprecated - exceptions are handled through events now This method returns empty list for compatibility

func (*Reader) GetRecent

func (r *Reader) GetRecent(ctx context.Context, limit int) ([]*Memory, error)

GetRecent retrieves recently created memories

func (*Reader) Search

func (r *Reader) Search(ctx context.Context, query string, limit int) ([]*Memory, error)

Search performs a vector similarity search on memories using HNSW index

func (*Reader) UpdateEmbedding

func (r *Reader) UpdateEmbedding(ctx context.Context, memoryID uuid.UUID, embedding []float32) error

UpdateEmbedding updates the embedding for a memory

type ReaderOption

type ReaderOption func(*Reader)

ReaderOption configures a Reader.

func WithReaderEmbedder

func WithReaderEmbedder(embedder ai.Embedding) ReaderOption

WithReaderEmbedder sets the embedder for vector search.

func WithReaderEncryptor

func WithReaderEncryptor(encryptor *Encryptor) ReaderOption

WithReaderEncryptor sets the encryptor for sensitive data.

func WithReaderLogger

func WithReaderLogger(log logger.Logger) ReaderOption

WithReaderLogger sets the logger for the reader.

type Relation

type Relation struct {
	Type     string      `json:"type"`      // relation type (e.g., "related_to", "derived_from")
	TargetID pgtype.UUID `json:"target_id"` // ID of related memory
	Strength float32     `json:"strength"`  // relation strength (0.0 to 1.0)
}

Relation represents a connection between memories

type SearchResult

type SearchResult struct {
	Memories []Memory
	HasMore  bool
}

SearchResult holds the results of a memory search

func (SearchResult) GetMemoryIDs

func (sr SearchResult) GetMemoryIDs() []uuid.UUID

GetMemoryIDs returns all memory IDs in the result

func (SearchResult) HasMoreResults

func (sr SearchResult) HasMoreResults() bool

HasMoreResults returns true if there are more results

func (SearchResult) IsEmpty

func (sr SearchResult) IsEmpty() bool

IsEmpty returns true if there are no memories

func (SearchResult) TotalCount

func (sr SearchResult) TotalCount() int

TotalCount returns the total number of memories in the result

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session holds the current conversation's working memory. It provides immediate access to the ongoing conversation without persistence overhead. This represents a single conversation session that can be queried instantly.

func NewSession

func NewSession(opts ...SessionOption) *Session

NewSession creates a new conversation session. Use options to configure logger and TTL.

func (*Session) Age

func (s *Session) Age() time.Duration

Age returns how long this session has been active.

func (*Session) Append

func (s *Session) Append(msg ai.Message)

Append adds a message to the session and extracts key information. This is the primary way to add content to the session.

func (*Session) Find

func (s *Session) Find(query string) []*Item

Find searches for items matching the query. This is a simple substring search optimized for speed over sophistication.

func (*Session) Format

func (s *Session) Format() string

Format returns the session content formatted for LLM context. This provides immediate context from the current conversation.

func (*Session) Len

func (s *Session) Len() int

Len returns the number of messages in the session.

func (*Session) Messages

func (s *Session) Messages() []ai.Message

Messages returns all messages in the session. The returned slice is a copy to prevent external modification.

func (*Session) Reset

func (s *Session) Reset()

Reset clears the session for a new conversation.

type SessionOption

type SessionOption func(*Session)

SessionOption configures a Session.

func WithSessionLogger

func WithSessionLogger(log logger.Logger) SessionOption

WithSessionLogger sets the logger for the session.

func WithSessionTTL

func WithSessionTTL(ttl int64) SessionOption

WithSessionTTL sets the TTL for short-term memories.

type SimilarMemory

type SimilarMemory struct {
	ID         string
	Content    string
	Similarity float32
}

SimilarMemory represents a memory with its similarity score

type Status

type Status string

Status defines the lifecycle status of a memory

const (
	// StatusActive represents a normal, searchable memory
	StatusActive Status = "active"
	// StatusArchived represents an old memory, not in default search
	StatusArchived Status = "archived"
	// StatusDeleted represents a soft deleted memory, pending cleanup
	StatusDeleted Status = "deleted"
	// StatusPending represents a pending memory
	StatusPending Status = "pending"
)

func (Status) String

func (s Status) String() string

String returns the string representation of Status

type StoreMemoryFactsInput

type StoreMemoryFactsInput struct {
	ConversationID string         `json:"conversation_id"`
	Facts          []extract.Fact `json:"facts"`
}

StoreMemoryFactsInput contains input for the store facts activity.

type System

type System struct {
	// contains filtered or unexported fields
}

System provides a clean API for memory operations. It coordinates components but doesn't own business logic.

func NewSystem

func NewSystem(deps Dependencies) (*System, error)

NewSystem creates a new memory system with Event Sourcing and Temporal.

func (*System) ArchiveMemory

func (s *System) ArchiveMemory(ctx context.Context, memoryID uuid.UUID, reason string) error

ArchiveMemory archives a memory with the given reason. In Event Sourcing, we don't delete - we create an archive event.

func (*System) ExtractFacts

func (s *System) ExtractFacts(ctx context.Context, messages []ai.Message) ([]extract.Fact, error)

ExtractFacts extracts facts from messages without storing them. This is used by Temporal activities for the extraction step.

func (*System) GenerateEmbeddings

func (s *System) GenerateEmbeddings(ctx context.Context, memoryIDs []uuid.UUID) error

GenerateEmbeddings generates embeddings for recent memories without embeddings. This is used by Temporal activities for the embedding step.

func (*System) GetByID

func (s *System) GetByID(ctx context.Context, id string) (*Memory, error)

GetByID retrieves a memory by ID.

func (*System) GetByType

func (s *System) GetByType(ctx context.Context, memType Type, limit int) ([]*Memory, error)

GetByType retrieves memories by type. TODO: Implement type-based filtering in reader

func (*System) GetByUser

func (s *System) GetByUser(ctx context.Context, userID string, limit int) ([]*Memory, error)

GetByUser retrieves memories for a specific user. TODO: Implement user-based filtering in reader

func (*System) GetContextForLLM

func (s *System) GetContextForLLM(ctx context.Context, query string, limit int) (string, error)

GetContextForLLM returns formatted context for LLM consumption. This provides a structured view of memories relevant to the query.

func (*System) GetEventStore

func (s *System) GetEventStore() *event.Store

GetEventStore returns the event store (needed for tools).

func (*System) GetOrchestrator

func (s *System) GetOrchestrator() *Orchestrator

GetOrchestrator returns the orchestrator (needed for Temporal activities).

func (*System) GetProjector

func (s *System) GetProjector() *Projector

GetProjector returns the projector (needed for tools).

func (*System) GetQueueMetrics

func (s *System) GetQueueMetrics() QueueMetrics

GetQueueMetrics returns current queue metrics for shutdown monitoring.

func (*System) GetReader

func (s *System) GetReader() *Reader

GetReader returns the reader (needed for Temporal activities).

func (*System) GetRecent

func (s *System) GetRecent(ctx context.Context, limit int) ([]*Memory, error)

GetRecent retrieves recent memories.

func (*System) GetSession

func (s *System) GetSession() *Session

GetSession returns the current conversation session. This is mainly for debugging and testing.

func (*System) LoadMemory

func (s *System) LoadMemory(ctx context.Context, id uuid.UUID) (*Memory, error)

LoadMemory loads a memory from its event stream. This is used for debugging and tools.

func (*System) MergeMemories

func (s *System) MergeMemories(ctx context.Context, keepMemoryID uuid.UUID, sourceMemoryIDs []uuid.UUID, reason string) error

MergeMemories merges multiple memories into one.

func (*System) Process

func (s *System) Process(ctx context.Context, messages []ai.Message) error

Process enqueues messages for background processing using Temporal. The actual processing happens in a Temporal workflow to ensure reliability. Messages are also added to the conversation buffer for immediate access.

func (*System) ProcessFacts

func (s *System) ProcessFacts(ctx context.Context, facts []extract.Fact) error

ProcessFacts takes extracted facts and processes them through decision making and storage. This is used by Temporal activities for the storage step.

func (*System) ResetSession

func (s *System) ResetSession()

ResetSession clears the conversation session. This should be called when starting a new conversation.

func (*System) Search

func (s *System) Search(ctx context.Context, query string, limit int) ([]*Memory, error)

Search finds memories matching the query. It searches both the conversation buffer (immediate context) and persistent memories.

func (*System) SearchWithVector

func (s *System) SearchWithVector(ctx context.Context, query string, threshold float32, limit int) ([]*Memory, error)

SearchWithVector performs vector similarity search. This now uses the unified Search method which automatically uses vector search when embedder is available.

func (*System) Shutdown

func (s *System) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the memory system. It waits for all background operations to complete.

func (*System) Start

func (s *System) Start(ctx context.Context)

Start begins background processing.

func (*System) Stop

func (s *System) Stop()

Stop gracefully shuts down the system.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a memory processing task that can be queued. It implements queue.Job interface for async execution.

func NewTask

func NewTask(messages []ai.Message, priority int, orchestrator *Orchestrator) *Task

NewTask creates a new memory processing task.

func (*Task) Execute

func (t *Task) Execute(ctx context.Context) error

Execute processes the messages through the memory system.

func (*Task) ID

func (t *Task) ID() string

ID returns the task ID.

func (*Task) Priority

func (t *Task) Priority() int

Priority returns the task priority.

type TimeRange

type TimeRange struct {
	Start time.Time
	End   time.Time
}

TimeRange for temporal queries

func (TimeRange) Contains

func (tr TimeRange) Contains(t time.Time) bool

Contains checks if a time is within the range

func (TimeRange) Duration

func (tr TimeRange) Duration() time.Duration

Duration returns the duration of the time range

func (TimeRange) IsValid

func (tr TimeRange) IsValid() bool

IsValid checks if the time range is valid

type Type

type Type string

Type defines categories of memories

const (
	// TypeFact represents factual information
	TypeFact Type = "fact"
	// TypePreference represents user preferences
	TypePreference Type = "preference"
	// TypeSchedule represents scheduled items
	TypeSchedule Type = "schedule"
	// TypeRelationship represents relationships
	TypeRelationship Type = "relationship"
	// TypeGoal represents goals
	TypeGoal Type = "goal"
	// TypeSkill represents skills
	TypeSkill Type = "skill"
	// TypePersonalInfo represents personal information
	TypePersonalInfo Type = "personal_info"
	// TypeWork represents work-related information
	TypeWork Type = "work"
	// TypeHealth represents health-related information
	TypeHealth Type = "health"
	// TypeLocation represents location information
	TypeLocation Type = "location"
	// TypeContact represents contact information
	TypeContact Type = "contact"
	// TypeGeneral represents general information
	TypeGeneral Type = "general"
	// TypeBelief represents beliefs
	TypeBelief Type = "belief"
	// TypeEpisode represents episodes
	TypeEpisode Type = "episode"
	// TypeConcept represents concepts
	TypeConcept Type = "concept"
	// TypeEmotion represents emotions
	TypeEmotion Type = "emotion"
	// TypeRule represents rules
	TypeRule Type = "rule"
)

func (Type) IsValid

func (t Type) IsValid() bool

IsValid checks if the type is valid

func (Type) String

func (t Type) String() string

String returns the string representation of Type

Directories

Path Synopsis
Package event implements Event Sourcing storage
Package event implements Event Sourcing storage
Package testing provides test cluster management inspired by CockroachDB
Package testing provides test cluster management inspired by CockroachDB

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL