communication

package
v1.0.2
Warning

This package is not in the latest version of its module.

Published: Jan 15, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

Tiered Communication System

Status: ✅ Implemented (feature/tiered-communication branch)
Tests: 28 unit tests passing
Race conditions: 0
Coverage: value semantics, reference semantics, auto-promotion, persistence

Overview

The tiered communication system enables efficient agent-to-agent messaging with intelligent routing between value-based and reference-based semantics. This solves the problem of passing large payloads (session state, query results, datasets) between agents without overwhelming the message bus.

Three-Tier Architecture
┌─────────────────────────────────────────────────────┐
│           Tier 1: Always Reference                  │
│  Large, persistent data (session_state, datasets)   │
│  → Always stored in SQLite/Redis with reference ID  │
└─────────────────────────────────────────────────────┘
                        ▼
┌─────────────────────────────────────────────────────┐
│           Tier 2: Auto-Promote                      │
│  Dynamic routing based on payload size threshold    │
│  Small: inline value | Large: reference storage     │
└─────────────────────────────────────────────────────┘
                        ▼
┌─────────────────────────────────────────────────────┐
│           Tier 3: Always Value                      │
│  Small, ephemeral data (control messages, acks)     │
│  → Always passed inline, never stored               │
└─────────────────────────────────────────────────────┘
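The routing rule the diagram describes fits in one small function. The sketch below is illustrative only: `Tier`, its constants, and `useReference` are hypothetical names rather than this package's API, the 10KB figure is the Tier 2 default quoted in this README, and treating a payload exactly at the threshold as inline is an assumption.

```go
package main

import "fmt"

// Tier is a hypothetical enum mirroring the three tiers in the diagram.
type Tier int

const (
	AlwaysReference Tier = iota + 1 // Tier 1
	AutoPromote                     // Tier 2
	AlwaysValue                     // Tier 3
)

// defaultThreshold is the 10KB auto-promotion threshold described in this README.
const defaultThreshold int64 = 10 * 1024

// useReference reports whether a payload of sizeBytes should be stored by reference.
func useReference(t Tier, sizeBytes, threshold int64) bool {
	switch t {
	case AlwaysReference:
		return true
	case AlwaysValue:
		return false
	default: // AutoPromote: inline small payloads, store large ones
		return sizeBytes > threshold
	}
}

func main() {
	fmt.Println(useReference(AlwaysReference, 10, defaultThreshold)) // true
	fmt.Println(useReference(AutoPromote, 512, defaultThreshold))    // false
	fmt.Println(useReference(AutoPromote, 15*1024, defaultThreshold)) // true
	fmt.Println(useReference(AlwaysValue, 1<<20, defaultThreshold))  // false
}
```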

Components

1. Reference Store Interface

Abstract storage backend for large message payloads:

type ReferenceStore interface {
    Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)
    Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)
    Retain(ctx context.Context, refID string) error
    Release(ctx context.Context, refID string) error
    List(ctx context.Context) ([]*loomv1.Reference, error)
    Stats(ctx context.Context) (*StoreStats, error)
    Close() error
}

Implementations:

  • MemoryStore: In-memory storage with reference-counting garbage collection
  • SQLiteStore: Persistent storage with ref_counting and manual GC
  • 📋 RedisStore: Planned for distributed deployments
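The ownership model behind `Retain` and `Release` can be sketched with a map and a counter. This is a simplified, hypothetical `refStore` working on raw bytes and string IDs instead of `*loomv1.Reference`, not the actual `MemoryStore` code. It assumes `Store` hands the caller one reference, uses the SHA-256 content IDs described under "Why Reference IDs?", and frees an entry as soon as its count reaches zero rather than waiting for a periodic GC pass.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("reference not found")

// entry holds a payload and the number of owners currently holding it.
type entry struct {
	data     []byte
	refCount int
}

// refStore is a hypothetical ref-counted store, safe for concurrent use.
type refStore struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func newRefStore() *refStore { return &refStore{entries: make(map[string]*entry)} }

// Store saves a copy of data under a content-derived ID and takes one reference.
func (s *refStore) Store(data []byte) string {
	sum := sha256.Sum256(data)
	id := hex.EncodeToString(sum[:])
	s.mu.Lock()
	defer s.mu.Unlock()
	if e, ok := s.entries[id]; ok {
		e.refCount++ // identical content is deduplicated
		return id
	}
	s.entries[id] = &entry{data: append([]byte(nil), data...), refCount: 1}
	return id
}

// Resolve returns a copy of the stored payload.
func (s *refStore) Resolve(id string) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[id]
	if !ok {
		return nil, errNotFound
	}
	return append([]byte(nil), e.data...), nil
}

// Retain adds an owner to an existing reference.
func (s *refStore) Retain(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[id]
	if !ok {
		return errNotFound
	}
	e.refCount++
	return nil
}

// Release drops an owner; the entry is freed when no owners remain.
func (s *refStore) Release(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[id]
	if !ok {
		return errNotFound
	}
	e.refCount--
	if e.refCount <= 0 {
		delete(s.entries, id)
	}
	return nil
}

func main() {
	s := newRefStore()
	id := s.Store([]byte("session state"))
	_ = s.Retain(id)  // a second agent takes ownership
	_ = s.Release(id) // first owner is done; data is still available
	data, err := s.Resolve(id)
	fmt.Println(string(data), err) // session state <nil>
	_ = s.Release(id) // last owner is done; entry is freed
	_, err = s.Resolve(id)
	fmt.Println(err) // reference not found
}
```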
2. Policy Manager

Determines routing strategy (value vs reference) based on message type:

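// PolicyManager is a concrete struct type (not an interface); its methods are:
func (pm *PolicyManager) GetPolicy(messageType string) *loomv1.CommunicationPolicy
func (pm *PolicyManager) SetPolicy(messageType string, policy *loomv1.CommunicationPolicy)
func (pm *PolicyManager) ShouldUseReference(messageType string, payloadSize int64) bool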

Built-in Policies:

  • DefaultPolicy() - Default policy for unregistered message types (auto-promote, 10KB threshold)
  • NewControlMessagePolicy() - Always pass inline value for control messages (Tier 3)
  • NewSessionStatePolicy() - Always reference for session state (Tier 1)
  • NewWorkflowContextPolicy() - Always reference for workflow context (Tier 1)
  • NewToolResultPolicy(thresholdBytes) - Auto-promote tool results larger than thresholdBytes (Tier 2)
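`GetPolicy` is documented to return the policy registered for a message type, or a default when none is registered. A minimal sketch of that lookup, assuming a hypothetical `policy` struct in place of `*loomv1.CommunicationPolicy` and a read-write mutex for concurrent access:

```go
package main

import (
	"fmt"
	"sync"
)

// policy is a hypothetical stand-in for *loomv1.CommunicationPolicy.
type policy struct {
	Tier           int   // 1 = always reference, 2 = auto-promote, 3 = always value
	ThresholdBytes int64 // consulted only for Tier 2
}

// registry stores per-message-type policies with a default fallback.
type registry struct {
	mu       sync.RWMutex
	byType   map[string]policy
	fallback policy
}

func newRegistry(fallback policy) *registry {
	return &registry{byType: make(map[string]policy), fallback: fallback}
}

// SetPolicy registers or replaces the policy for one message type.
func (r *registry) SetPolicy(msgType string, p policy) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.byType[msgType] = p
}

// GetPolicy returns the registered policy, or the fallback if none is registered.
func (r *registry) GetPolicy(msgType string) policy {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if p, ok := r.byType[msgType]; ok {
		return p
	}
	return r.fallback
}

func main() {
	r := newRegistry(policy{Tier: 2, ThresholdBytes: 10 * 1024})
	r.SetPolicy("session_state", policy{Tier: 1})
	r.SetPolicy("control", policy{Tier: 3})
	fmt.Println(r.GetPolicy("session_state").Tier) // 1
	fmt.Println(r.GetPolicy("control").Tier)       // 3
	fmt.Println(r.GetPolicy("unregistered").Tier)  // 2
}
```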
3. Agent Integration

Agents use Send() and Receive() methods for communication:

// pkg/agent/agent_communication.go

// Send sends a message to another agent using value or reference semantics
func (a *Agent) Send(ctx context.Context, toAgent, messageType string, data interface{}) (*loomv1.CommunicationMessage, error)

// Receive receives and resolves a message from another agent
func (a *Agent) Receive(ctx context.Context, msg *loomv1.CommunicationMessage) (interface{}, error)

Configuration via agent options:

ag := agent.NewAgent(backend, llm,
    agent.WithName("agent1"),
    agent.WithReferenceStore(refStore),
    agent.WithCommunicationPolicy(policyManager))
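Conceptually, `Send` serializes the data, decides whether to use a reference, and either inlines the bytes or stores them; `Receive` reverses this. The sketch below models that flow with a plain map as the shared store and a hypothetical `envelope` type standing in for `loomv1.CommunicationMessage`. It is not the agent implementation: the real code consults the `PolicyManager` rather than a single size threshold, and reference IDs come from the store.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// envelope is a hypothetical stand-in for loomv1.CommunicationMessage:
// exactly one of Value or RefID is set.
type envelope struct {
	Type  string
	Value []byte // inline JSON (value semantics)
	RefID string // key into the shared store (reference semantics)
}

// store is a toy reference store shared by sender and receiver.
type store map[string][]byte

// send marshals data and chooses value or reference semantics by size.
func send(s store, msgType string, data interface{}, threshold int) (envelope, error) {
	b, err := json.Marshal(data)
	if err != nil {
		return envelope{}, err
	}
	if len(b) <= threshold {
		return envelope{Type: msgType, Value: b}, nil
	}
	id := fmt.Sprintf("ref-%d", len(s)+1)
	s[id] = b
	return envelope{Type: msgType, RefID: id}, nil
}

// receive resolves a reference if needed, then unmarshals the payload.
func receive(s store, env envelope) (interface{}, error) {
	b := env.Value
	if env.RefID != "" {
		stored, ok := s[env.RefID]
		if !ok {
			return nil, fmt.Errorf("reference %q not found", env.RefID)
		}
		b = stored
	}
	var out interface{}
	if err := json.Unmarshal(b, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	s := store{}
	small, _ := send(s, "control", map[string]string{"command": "start_processing"}, 10*1024)
	large, _ := send(s, "tool_result", map[string]string{"blob": strings.Repeat("x", 20*1024)}, 10*1024)
	fmt.Println(small.RefID == "", large.RefID != "") // true true

	got, _ := receive(s, large)
	fmt.Println(len(got.(map[string]interface{})["blob"].(string))) // 20480
}
```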

Usage Examples

Basic Value Message (Tier 3)

Small control messages passed inline:

// Create agents with shared reference store
policyManager := communication.NewPolicyManager()
refStore := communication.NewMemoryStore(5 * time.Minute) // GC interval
defer refStore.Close()

agent1 := agent.NewAgent(backend, llm,
    agent.WithName("agent1"),
    agent.WithReferenceStore(refStore),
    agent.WithCommunicationPolicy(policyManager))

agent2 := agent.NewAgent(backend, llm,
    agent.WithName("agent2"),
    agent.WithReferenceStore(refStore),
    agent.WithCommunicationPolicy(policyManager))

// Agent 1 sends small control message
data := map[string]interface{}{
    "command": "start_processing",
    "priority": "high",
}
msg, err := agent1.Send(ctx, "agent2", "control", data)
if err != nil {
    log.Fatal(err)
}

// Message uses value semantics (no reference)
if msg.Payload.GetValue() != nil {
    fmt.Println("Using value semantics")
}

// Agent 2 receives inline value
received, err := agent2.Receive(ctx, msg)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("received: %v\n", received)

Reference Message (Tier 1)

Large payloads stored in reference store:

// Configure policy for session_state
policyManager := communication.NewPolicyManager()
policyManager.SetPolicy("session_state", communication.NewSessionStatePolicy())

// Create SQLite store for persistence (the GC interval is a time.Duration)
refStore, err := communication.NewSQLiteStore("./data/refs.db", time.Hour)
if err != nil {
    log.Fatal(err)
}
defer refStore.Close()

agent1 := agent.NewAgent(backend, llm,
    agent.WithReferenceStore(refStore),
    agent.WithCommunicationPolicy(policyManager))

// Agent 1 sends large session state
largeData := map[string]interface{}{
    "session_id":   "session-123",
    "conversation": make([]string, 100),
    "metadata":     map[string]interface{}{ /* ... */ },
}
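msg, err := agent1.Send(ctx, "agent2", "session_state", largeData)
if err != nil {
    log.Fatal(err)
}

// Message uses reference semantics
ref := msg.Payload.GetReference()
fmt.Printf("Stored as reference: %s\n", ref.Id)

// Agent 2 (configured with the same refStore) resolves the reference automatically
received, err := agent2.Receive(ctx, msg)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("received: %v\n", received)
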
Auto-Promotion (Tier 2)

Automatic routing based on payload size:

// Default policy uses 10KB threshold
policyManager := communication.NewPolicyManager()
refStore := communication.NewMemoryStore(5 * time.Minute) // GC interval
defer refStore.Close()

agent1 := agent.NewAgent(backend, llm,
    agent.WithReferenceStore(refStore),
    agent.WithCommunicationPolicy(policyManager))

// Small message → value semantics
smallData := map[string]interface{}{"status": "ok", "count": 42}
smallMsg, _ := agent1.Send(ctx, "agent2", "tool_result", smallData)
// smallMsg.Payload.GetValue() != nil

// Large message → reference semantics
largeData := map[string]interface{}{
    "data": make([]byte, 15*1024), // 15KB
}
largeMsg, _ := agent1.Send(ctx, "agent2", "tool_result", largeData)
// largeMsg.Payload.GetReference() != nil
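The threshold applies to the serialized payload, assuming JSON marshaling as the API Reference notes for `Send`. `encoding/json` base64-encodes a `[]byte`, so the 15KB slice above becomes a string of roughly 20KB. This standalone snippet measures both example payloads; `serializedSize` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// tenKB is the default auto-promotion threshold quoted above.
const tenKB = 10 * 1024

// serializedSize returns the length of v's JSON encoding, or -1 if v cannot be marshaled.
func serializedSize(v interface{}) int {
	b, err := json.Marshal(v)
	if err != nil {
		return -1
	}
	return len(b)
}

func main() {
	small := serializedSize(map[string]interface{}{"status": "ok", "count": 42})
	large := serializedSize(map[string]interface{}{"data": make([]byte, 15*1024)})
	fmt.Println(small, small > tenKB) // 26 false
	fmt.Println(large, large > tenKB) // 20491 true
}
```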

Configuration

Server Configuration

The communication system is configured via server config:

// internal/config/config.go
type Config struct {
    Communication CommunicationConfig `yaml:"communication"`
}

type CommunicationConfig struct {
    Backend string            `yaml:"backend"` // "memory" or "sqlite"
    SQLite  SQLiteStoreConfig `yaml:"sqlite"`
}

type SQLiteStoreConfig struct {
    Path       string `yaml:"path"`
    GCInterval int    `yaml:"gc_interval"` // seconds
}

Example YAML:

communication:
  backend: sqlite
  sqlite:
    path: ./data/references.db
    gc_interval: 3600  # 1 hour

Default Configuration:

  • Backend: SQLite
  • Path: ./data/references.db
  • GC Interval: 3600 seconds (1 hour)
Factory Function

Initialize reference store from config:

// pkg/communication/factory.go
cfg := communication.DefaultFactoryConfig() // or a FactoryConfig populated from server config
refStore, err := communication.NewReferenceStoreFromConfig(cfg)
if err != nil {
    return fmt.Errorf("failed to create reference store: %w", err)
}
defer refStore.Close()

Reference Store Implementations

MemoryStore

In-memory storage with reference-counting garbage collection:

store := communication.NewMemoryStore(
    5*time.Minute, // GC interval
)
defer store.Close()

// Store data
opts := communication.StoreOptions{
    Type:        loomv1.ReferenceType_REFERENCE_TYPE_SESSION_STATE,
    ContentType: "application/json",
}
ref, err := store.Store(ctx, dataBytes, opts)

// Resolve reference
data, err := store.Resolve(ctx, ref)

Use Cases:

  • Development and testing
  • Short-lived sessions
  • Single-server deployments

Limitations:

  • No persistence across restarts
  • Not suitable for distributed systems
SQLiteStore

Persistent storage with ref_counting:

store, err := communication.NewSQLiteStore(
    "./data/references.db",
    time.Hour, // GC interval (a time.Duration, not seconds)
)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

// Storage persists across sessions
ref, _ := store.Store(ctx, dataBytes, opts)

// Later session can resolve
data, _ := store.Resolve(ctx, ref)

// Check statistics
stats, _ := store.Stats(ctx)
fmt.Printf("Total refs: %d, Total bytes: %d\n",
    stats.TotalRefs, stats.TotalBytes)

Use Cases:

  • Production deployments
  • Session persistence
  • Long-lived references
  • Audit trails

Features:

  • Reference counting (Retain increments, Release decrements and may trigger GC)
  • Garbage collection based on ref_count
  • Statistics tracking (total refs, total bytes)
  • ACID guarantees
  • WAL mode for concurrent access

Testing

Running Tests
# Run all communication tests
go test -race ./pkg/communication -v

# Run specific test
go test -race ./pkg/communication -run TestMemoryStore_StoreAndResolve -v
Test Coverage

Unit Tests (28 tests):

  • MemoryStore: creation, store/resolve, deletion, GC, stats, concurrency
  • SQLiteStore: creation, store/resolve, persistence, GC, stats, WAL mode
  • PolicyManager: default policies, custom policies, policy lookup
  • Factory: config parsing, backend selection, error handling

All tests run with -race detector to ensure zero race conditions.

Test Results
✅ All unit tests pass (28 tests)
✅ Zero race conditions detected
✅ Test execution time: ~8s

Architecture Decisions

Why Three Tiers?
  1. Tier 1 (Always Reference): Large, persistent data that should never be inline

    • Session state (conversation history, context)
    • Datasets (query results, ML training data)
    • Workflow context (execution state)
  2. Tier 2 (Auto-Promote): Dynamic routing based on runtime characteristics

    • Tool results (may be small text or large JSON)
    • API responses (vary widely in size)
    • Document chunks (depends on segmentation)
  3. Tier 3 (Always Value): Small, ephemeral data that's cheaper inline

    • Control messages (start/stop/ack)
    • Status updates (progress, health checks)
    • Metadata (timestamps, agent IDs)
Why Reference IDs?

Reference IDs are SHA256 hashes of the serialized data, providing:

  • Content-addressable storage (deduplication)
  • Cryptographic integrity verification
  • Deterministic ID generation
  • No coordination required between agents
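Content addressing also makes integrity checks cheap: re-hash the resolved bytes and compare with the ID. A minimal sketch, assuming the ID is the lowercase hex SHA-256 digest with no prefix (the exact encoding is not specified in this README); `refID` and `verify` are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// refID derives a content-addressed ID from the payload bytes.
func refID(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

// verify reports whether data still matches the ID it was stored under.
func verify(id string, data []byte) bool {
	return refID(data) == id
}

func main() {
	id := refID([]byte("hello"))
	fmt.Println(id)                          // 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
	fmt.Println(verify(id, []byte("hello"))) // true
	fmt.Println(verify(id, []byte("hellO"))) // false
}
```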
Why SQLite Default?

SQLite provides:

  • Zero configuration (embedded database)
  • ACID guarantees (data safety)
  • Cross-session persistence (survive restarts)
  • Efficient for < 1TB data (most agent workloads)
  • No external dependencies (no Redis/Postgres required)

Performance Characteristics

Value Semantics (Tier 3)
  • Latency: ~1μs (inline serialization)
  • Throughput: 100k+ msgs/sec
  • Best for: < 1KB payloads
Reference Semantics (Tier 1)
  • Memory Store: ~10μs (hash + memory write)
  • SQLite Store: ~1ms (hash + disk write)
  • Best for: > 10KB payloads
Auto-Promote (Tier 2)
  • Decision overhead: ~5μs (size check + policy lookup)
  • Threshold: 10KB default (configurable)
  • Best for: Variable-size payloads

Limitations and Future Work

Current Limitations
  • ⚠️ No distributed reference store (Redis/etcd planned)
  • ⚠️ Manual garbage collection for SQLite (automatic GC planned)
  • ⚠️ No reference expiration policies (TTL planned)
  • ⚠️ No compression for large payloads (LZ4/Zstd planned)
Planned Features
  • 📋 RedisStore for distributed deployments
  • 📋 Automatic garbage collection based on access time
  • 📋 Compression for payloads > 100KB
  • 📋 Reference expiration policies (TTL, LRU)
  • 📋 Metrics export to observability (Hawk integration)

API Reference

Agent Methods
// Send sends a message to another agent
func (a *Agent) Send(
    ctx context.Context,
    toAgent string,      // Target agent name
    messageType string,  // Message type for policy lookup
    data interface{},    // Data to send (will be JSON marshaled)
) (*loomv1.CommunicationMessage, error)

// Receive receives and resolves a message
func (a *Agent) Receive(
    ctx context.Context,
    msg *loomv1.CommunicationMessage,
) (interface{}, error)  // Returns unmarshaled data
Agent Options
// WithReferenceStore sets the reference store for this agent
func WithReferenceStore(store communication.ReferenceStore) Option

// WithCommunicationPolicy sets the communication policy manager
func WithCommunicationPolicy(policy *communication.PolicyManager) Option
Reference Store Interface
type ReferenceStore interface {
    // Store stores data and returns a reference
    Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)

    // Resolve resolves a reference to data
    Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)

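    // Retain increments the reference count (retain ownership)
    Retain(ctx context.Context, refID string) error

    // Release decrements the reference count (release ownership, may trigger GC)
    Release(ctx context.Context, refID string) error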

    // List lists all references
    List(ctx context.Context) ([]*loomv1.Reference, error)

    // Stats returns store statistics
    Stats(ctx context.Context) (*StoreStats, error)

    // Close closes the store
    Close() error
}
Policy Manager Methods
// GetPolicy returns the policy for a message type, or the default if none is registered
func (pm *PolicyManager) GetPolicy(messageType string) *loomv1.CommunicationPolicy

// ShouldUseReference determines if reference semantics should be used
func (pm *PolicyManager) ShouldUseReference(messageType string, payloadSize int64) bool

// SetPolicy registers a policy for a specific message type
func (pm *PolicyManager) SetPolicy(messageType string, policy *loomv1.CommunicationPolicy)

Troubleshooting

"Reference not found" errors

Cause: Reference was garbage collected or never stored

Solutions:

  1. Increase GC interval in config
  2. Check reference ID is correct
  3. Verify reference store is shared between agents
High memory usage with MemoryStore

Cause: Large payloads stored in memory without GC

Solutions:

  1. Switch to SQLiteStore for persistence
  2. Reduce the GC interval passed to NewMemoryStore
  3. Call Release once a message has been processed so its payload can be collected
Slow SQLite writes

Cause: Disk I/O bottleneck

Solutions:

  1. Use WAL mode (enabled by default)
  2. Increase SQLite cache size
  3. Use tmpfs/RAM disk for database file
  4. Consider Redis for distributed deployments

Branch: feature/tiered-communication
Status: ✅ Implementation complete
Tests: 28 passing with -race
Next: Integration tests and server deployment

Documentation

Overview

Copyright 2026 Teradata

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


Index

Constants

const (
	SpanBusPublish     = "bus.publish"
	SpanBusSubscribe   = "bus.subscribe"
	SpanBusDeliver     = "bus.deliver"
	SpanBusFilter      = "bus.filter"
	SpanBusUnsubscribe = "bus.unsubscribe"
)

Hawk span constants for bus operations

const (
	SpanQueueEnqueue        = "queue.enqueue"
	SpanQueueDequeue        = "queue.dequeue"
	SpanQueueAck            = "queue.ack"
	SpanQueueRequeue        = "queue.requeue"
	SpanQueuePersist        = "queue.persist"
	SpanQueueRecover        = "queue.recover"
	SpanQueueSendAndReceive = "queue.send_and_receive"
)

Hawk span constants for queue operations

const (
	// DefaultMaxRetries is the default maximum number of retry attempts for a message
	DefaultMaxRetries = 3
	// DefaultMessageTTL is the default time-to-live for messages (24 hours)
	DefaultMessageTTL = 24 * time.Hour
)

Default queue configuration values

const (
	SpanSharedMemoryPut    = "shared_memory.put"
	SpanSharedMemoryGet    = "shared_memory.get"
	SpanSharedMemoryDelete = "shared_memory.delete"
	SpanSharedMemoryList   = "shared_memory.list"
	SpanSharedMemoryWatch  = "shared_memory.watch"
)

Hawk span constants for shared memory operations

const CompressionThreshold = 1024 // 1KB

CompressionThreshold is the minimum size in bytes to trigger automatic compression
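This page gives the threshold but not the codec. A sketch of threshold-gated compression, using `compress/gzip` purely as an assumption together with hypothetical `maybeCompress`/`decompress` helpers; payloads of at least `CompressionThreshold` bytes are compressed and smaller ones pass through unchanged.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

const compressionThreshold = 1024 // mirrors CompressionThreshold (1KB)

// maybeCompress gzips data at or above the threshold and reports whether it did.
func maybeCompress(data []byte) ([]byte, bool, error) {
	if len(data) < compressionThreshold {
		return data, false, nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, false, err
	}
	if err := zw.Close(); err != nil { // Close flushes the gzip footer
		return nil, false, err
	}
	return buf.Bytes(), true, nil
}

// decompress reverses maybeCompress for payloads it compressed.
func decompress(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	small := []byte("ack")
	large := bytes.Repeat([]byte("row,"), 1000) // 4000 bytes, highly compressible

	_, c1, _ := maybeCompress(small)
	out, c2, _ := maybeCompress(large)
	fmt.Println(c1, c2, len(out) < len(large)) // false true true
}
```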

const (
	// DefaultMessageBufferSize is the default buffer size for message channels
	DefaultMessageBufferSize = 100
)

Default configuration values

const DefaultQueueTimeout = 30

DefaultQueueTimeout is the default timeout for request-response operations (30 seconds).

Variables

This section is empty.

Functions

func DefaultPolicy

func DefaultPolicy() *loomv1.CommunicationPolicy

DefaultPolicy returns the default communication policy

func NewControlMessagePolicy

func NewControlMessagePolicy() *loomv1.CommunicationPolicy

NewControlMessagePolicy creates a policy for control messages (Tier 3: Always Value)

func NewSessionStatePolicy

func NewSessionStatePolicy() *loomv1.CommunicationPolicy

NewSessionStatePolicy creates a policy for session state (Tier 1: Always Reference)

func NewToolResultPolicy

func NewToolResultPolicy(thresholdBytes int64) *loomv1.CommunicationPolicy

NewToolResultPolicy creates a policy for tool results (Tier 2: Auto-Promote)

func NewWorkflowContextPolicy

func NewWorkflowContextPolicy() *loomv1.CommunicationPolicy

NewWorkflowContextPolicy creates a policy for workflow context (Tier 1: Always Reference)

Types

type AutoPromoteConfigParams

type AutoPromoteConfigParams struct {
	Enabled   bool
	Threshold int64 // bytes
}

AutoPromoteConfigParams holds auto-promotion configuration.

type FactoryConfig

type FactoryConfig struct {
	Store       StoreConfig
	GC          GCConfig
	AutoPromote AutoPromoteConfigParams
	Policies    PoliciesConfig
}

FactoryConfig holds all communication configuration for factory initialization.

func DefaultFactoryConfig

func DefaultFactoryConfig() FactoryConfig

DefaultFactoryConfig returns default configuration for testing/development.

type GCConfig

type GCConfig struct {
	Enabled  bool
	Strategy string // ref_counting | ttl | manual
	Interval int    // seconds
}

GCConfig holds garbage collection configuration.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore implements ReferenceStore with in-memory storage and reference counting GC

func NewMemoryStore

func NewMemoryStore(gcInterval time.Duration) *MemoryStore

NewMemoryStore creates a new in-memory reference store with GC

func (*MemoryStore) Close

func (m *MemoryStore) Close() error

Close implements ReferenceStore.Close

func (*MemoryStore) List

func (m *MemoryStore) List(ctx context.Context) ([]*loomv1.Reference, error)

List implements ReferenceStore.List

func (*MemoryStore) Release

func (m *MemoryStore) Release(ctx context.Context, refID string) error

Release implements ReferenceStore.Release

func (*MemoryStore) Resolve

func (m *MemoryStore) Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)

Resolve implements ReferenceStore.Resolve

func (*MemoryStore) Retain

func (m *MemoryStore) Retain(ctx context.Context, refID string) error

Retain implements ReferenceStore.Retain

func (*MemoryStore) Stats

func (m *MemoryStore) Stats(ctx context.Context) (*StoreStats, error)

Stats implements ReferenceStore.Stats

func (*MemoryStore) Store

func (m *MemoryStore) Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)

Store implements ReferenceStore.Store

type MessageBus

type MessageBus struct {
	// contains filtered or unexported fields
}

MessageBus provides topic-based pub/sub for agent communication. All operations are safe for concurrent use by multiple goroutines.

func NewMessageBus

func NewMessageBus(refStore ReferenceStore, policy *PolicyManager, tracer observability.Tracer, logger *zap.Logger) *MessageBus

NewMessageBus creates a new message bus.

func (*MessageBus) Close

func (b *MessageBus) Close() error

Close shuts down the message bus and closes all subscriber channels.

func (*MessageBus) GetNotificationChannel

func (b *MessageBus) GetNotificationChannel(subscriptionID string) (chan struct{}, bool)

GetNotificationChannel returns the notification channel for a subscription, if registered.

func (*MessageBus) GetSubscriptionsByAgent

func (b *MessageBus) GetSubscriptionsByAgent(agentID string) []*Subscription

GetSubscriptionsByAgent returns all active subscriptions for an agent.

func (*MessageBus) GetTopicStats

func (b *MessageBus) GetTopicStats(ctx context.Context, topic string) (*loomv1.TopicStats, error)

GetTopicStats retrieves statistics for a topic.

func (*MessageBus) ListTopics

func (b *MessageBus) ListTopics(ctx context.Context) ([]string, error)

ListTopics returns all active topics.

func (*MessageBus) Publish

func (b *MessageBus) Publish(ctx context.Context, topic string, msg *loomv1.BusMessage) (int, int, error)

Publish sends a message to all subscribers of a topic. Returns (delivered, dropped, error). Does NOT block on slow subscribers - messages are dropped if subscriber buffers are full.
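Dropping instead of blocking is the usual Go pattern of a `select` with a `default` case. The sketch below shows it with plain `chan string` subscribers and a hypothetical `publish` function returning the same (delivered, dropped) pair; it is not the bus's actual code.

```go
package main

import "fmt"

// publish attempts a non-blocking send to every subscriber channel and
// returns how many accepted the message and how many were full.
func publish(subs []chan string, msg string) (delivered, dropped int) {
	for _, ch := range subs {
		select {
		case ch <- msg:
			delivered++
		default: // buffer full: drop rather than block the publisher
			dropped++
		}
	}
	return delivered, dropped
}

func main() {
	fast := make(chan string, 2)
	slow := make(chan string, 1)
	slow <- "backlog" // the slow subscriber's buffer is already full

	d, x := publish([]chan string{fast, slow}, "workflow.started")
	fmt.Println(d, x) // 1 1
}
```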

func (*MessageBus) RegisterNotificationChannel

func (b *MessageBus) RegisterNotificationChannel(subscriptionID string, notifyChan chan struct{})

RegisterNotificationChannel registers a notification channel for a subscription. When messages arrive on this subscription, the channel will be notified.

func (*MessageBus) Subscribe

func (b *MessageBus) Subscribe(ctx context.Context, agentID string, topicPattern string, filter *loomv1.SubscriptionFilter, bufferSize int) (*Subscription, error)

Subscribe creates a new subscription to a topic pattern. Topic patterns support wildcards: "workflow.*" matches "workflow.started" and "workflow.completed". Returns a Subscription that contains a channel for receiving messages.
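Only one wildcard example is documented, so the exact matching rules are an assumption here. This sketch treats topics as dot-separated segments and lets `*` match exactly one segment; `matchTopic` is hypothetical, and the real implementation may differ (for example, in whether `workflow.*` also matches `workflow.step.done`).

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern, where "*" matches
// exactly one dot-separated segment (an assumed semantics).
func matchTopic(pattern, topic string) bool {
	ps := strings.Split(pattern, ".")
	ts := strings.Split(topic, ".")
	if len(ps) != len(ts) {
		return false
	}
	for i := range ps {
		if ps[i] != "*" && ps[i] != ts[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchTopic("workflow.*", "workflow.started"))   // true
	fmt.Println(matchTopic("workflow.*", "workflow.completed")) // true
	fmt.Println(matchTopic("workflow.*", "agent.started"))      // false
	fmt.Println(matchTopic("workflow.*", "workflow.step.done")) // false under this rule
}
```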

func (*MessageBus) UnregisterNotificationChannel

func (b *MessageBus) UnregisterNotificationChannel(subscriptionID string)

UnregisterNotificationChannel removes a notification channel for a subscription.

func (*MessageBus) Unsubscribe

func (b *MessageBus) Unsubscribe(ctx context.Context, subscriptionID string) error

Unsubscribe removes a subscription. The subscription's channel will be closed.

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue provides persistent message queuing for offline agents. All operations are safe for concurrent use.

func NewMessageQueue

func NewMessageQueue(dbPath string, tracer observability.Tracer, logger *zap.Logger) (*MessageQueue, error)

NewMessageQueue creates a new message queue with SQLite persistence.

func (*MessageQueue) Acknowledge

func (q *MessageQueue) Acknowledge(ctx context.Context, messageID string) error

Acknowledge marks a message as successfully processed.

func (*MessageQueue) AgentExists

func (q *MessageQueue) AgentExists(agentID string) bool

AgentExists checks if an agent exists using the registered validator. If no validator is set, returns true (permissive - allows all agents).

func (*MessageQueue) Close

func (q *MessageQueue) Close() error

Close closes the message queue and database connection.

func (*MessageQueue) Dequeue

func (q *MessageQueue) Dequeue(ctx context.Context, agentID string) (*QueueMessage, error)

Dequeue retrieves the next message for an agent. Messages are marked as in-flight and must be acknowledged or will be requeued.

func (*MessageQueue) Enqueue

func (q *MessageQueue) Enqueue(ctx context.Context, msg *QueueMessage) error

Enqueue adds a message to an agent's queue. If the agent is offline, the message is persisted to SQLite.

func (*MessageQueue) GetAgentsWithPendingMessages

func (q *MessageQueue) GetAgentsWithPendingMessages(ctx context.Context) []string

GetAgentsWithPendingMessages returns a list of agent IDs that have pending messages. The message queue monitor uses it to trigger agents on events instead of polling.

func (*MessageQueue) GetNotificationChannel

func (q *MessageQueue) GetNotificationChannel(agentID string) (chan struct{}, bool)

GetNotificationChannel returns the notification channel for an agent, if registered.

func (*MessageQueue) GetPendingMessagesInfo

func (q *MessageQueue) GetPendingMessagesInfo(agentID string) []PendingMessageInfo

GetPendingMessagesInfo returns info about pending messages for an agent without dequeuing them. This is used for rich event notifications to tell the coordinator exactly what's waiting.

func (*MessageQueue) GetQueueDepth

func (q *MessageQueue) GetQueueDepth(agentID string) int

GetQueueDepth returns the number of pending messages for an agent.

func (*MessageQueue) RegisterNotificationChannel

func (q *MessageQueue) RegisterNotificationChannel(agentID string, notifyChan chan struct{})

RegisterNotificationChannel registers a notification channel for event-driven message handling. When messages arrive for this agent, the channel will be notified.
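A common way to implement this kind of wake-up, assumed here rather than confirmed by the docs, is a `chan struct{}` with a buffer of one and a non-blocking send: a burst of enqueues collapses into one pending signal and the enqueuer never blocks. `notify` is a hypothetical helper.

```go
package main

import "fmt"

// notify signals ch without blocking; if a signal is already pending,
// the new one is coalesced into it.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	ch := make(chan struct{}, 1)
	for i := 0; i < 5; i++ {
		notify(ch) // five enqueues in a row
	}
	fmt.Println(len(ch)) // 1: the consumer wakes once, then drains the queue
	<-ch
	fmt.Println(len(ch)) // 0
}
```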

func (*MessageQueue) Requeue

func (q *MessageQueue) Requeue(ctx context.Context, messageID string) error

Requeue returns an in-flight message back to pending state for retry.

func (*MessageQueue) Send

func (q *MessageQueue) Send(ctx context.Context, fromAgent, toAgent, messageType string, payload *loomv1.MessagePayload, metadata map[string]string) (string, error)

Send is a convenience wrapper around Enqueue for fire-and-forget messaging. It creates a QueueMessage and enqueues it for the destination agent.

func (*MessageQueue) SendAndReceive

func (q *MessageQueue) SendAndReceive(ctx context.Context, fromAgent, toAgent, messageType string, payload *loomv1.MessagePayload, metadata map[string]string, timeoutSeconds int) (*loomv1.MessagePayload, error)

SendAndReceive implements request-response messaging with timeout. It sends a request and waits for a response with the specified timeout in seconds.

The correlation ID is used to match the response to this specific request. When the destination agent sends a response with the same correlation ID, it will be routed to this waiting channel.

func (*MessageQueue) SetAgentValidator

func (q *MessageQueue) SetAgentValidator(validator func(agentID string) bool)

SetAgentValidator sets the function used to validate whether an agent exists. This is used by send_message to prevent messages from being sent to non-existent agents.

func (*MessageQueue) UnregisterNotificationChannel

func (q *MessageQueue) UnregisterNotificationChannel(agentID string)

UnregisterNotificationChannel removes a notification channel for an agent.

type PendingMessageInfo

type PendingMessageInfo struct {
	MessageID   string
	FromAgent   string
	MessageType string
	EnqueuedAt  time.Time
	SizeBytes   int
}

PendingMessageInfo contains metadata about a pending message without the full payload.

type PoliciesConfig

type PoliciesConfig struct {
	AlwaysReference []string // Force Tier 1 (always reference)
	AlwaysValue     []string // Force Tier 3 (always value)
}

PoliciesConfig holds policy overrides.

type PolicyManager

type PolicyManager struct {
	// contains filtered or unexported fields
}

PolicyManager applies communication tier policies

func NewPolicyManager

func NewPolicyManager() *PolicyManager

NewPolicyManager creates a policy manager with default configuration

func NewPolicyManagerFromConfig

func NewPolicyManagerFromConfig(cfg FactoryConfig) *PolicyManager

NewPolicyManagerFromConfig creates a PolicyManager based on configuration.

func (*PolicyManager) GetPolicy

func (pm *PolicyManager) GetPolicy(messageType string) *loomv1.CommunicationPolicy

GetPolicy retrieves the policy for a message type, or default if not found

func (*PolicyManager) SetPolicy

func (pm *PolicyManager) SetPolicy(messageType string, policy *loomv1.CommunicationPolicy)

SetPolicy registers a policy for a specific message type

func (*PolicyManager) ShouldUseReference

func (pm *PolicyManager) ShouldUseReference(messageType string, payloadSize int64) bool

ShouldUseReference determines if a message should use reference-based communication

type QueueMessage

type QueueMessage struct {
	ID            string
	ToAgent       string
	FromAgent     string
	MessageType   string
	Payload       *loomv1.MessagePayload
	Metadata      map[string]string
	CorrelationID string // For request-response correlation
	Priority      int32
	EnqueuedAt    time.Time
	ExpiresAt     time.Time
	DequeueCount  int32
	MaxRetries    int32
	Status        QueueMessageStatus
}

QueueMessage represents a message in the queue.

type QueueMessageStatus

type QueueMessageStatus int32

QueueMessageStatus represents the state of a queued message.

const (
	QueueMessageStatusPending QueueMessageStatus = iota
	QueueMessageStatusInFlight
	QueueMessageStatusAcked
	QueueMessageStatusFailed
	QueueMessageStatusExpired
)
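Because the constants are declared with iota, QueueMessageStatusPending is 0 and QueueMessageStatusExpired is 4. This listing shows no String method, so the sketch below adds a hypothetical one to a local copy of the type to make statuses readable in logs. The lowercase names it returns are an assumption.

```go
package main

import "fmt"

// QueueMessageStatus mirrors the package's type so the example is self-contained.
type QueueMessageStatus int32

const (
	QueueMessageStatusPending  QueueMessageStatus = iota // 0
	QueueMessageStatusInFlight                           // 1
	QueueMessageStatusAcked                              // 2
	QueueMessageStatusFailed                             // 3
	QueueMessageStatusExpired                            // 4
)

// String is a hypothetical helper; it satisfies fmt.Stringer so %v prints a name.
func (s QueueMessageStatus) String() string {
	switch s {
	case QueueMessageStatusPending:
		return "pending"
	case QueueMessageStatusInFlight:
		return "in_flight"
	case QueueMessageStatusAcked:
		return "acked"
	case QueueMessageStatusFailed:
		return "failed"
	case QueueMessageStatusExpired:
		return "expired"
	default:
		// Convert to int32 so Sprintf does not call String recursively.
		return fmt.Sprintf("QueueMessageStatus(%d)", int32(s))
	}
}

func main() {
	fmt.Println(QueueMessageStatusAcked, int32(QueueMessageStatusAcked))
}
```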

type ReferenceStore

type ReferenceStore interface {
	// Store data and return reference
	Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)

	// Resolve reference to actual data
	Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)

	// Retain increments reference count (retain ownership)
	Retain(ctx context.Context, refID string) error

	// Release decrements reference count (release ownership, may trigger GC)
	Release(ctx context.Context, refID string) error

	// List all references (debugging)
	List(ctx context.Context) ([]*loomv1.Reference, error)

	// Stats returns store statistics
	Stats(ctx context.Context) (*StoreStats, error)

	// Close cleans up store resources
	Close() error
}

ReferenceStore manages reference lifecycle and data storage. Implementations must be safe for concurrent use by multiple goroutines.

func NewReferenceStoreFromConfig

func NewReferenceStoreFromConfig(cfg FactoryConfig) (ReferenceStore, error)

NewReferenceStoreFromConfig creates a ReferenceStore based on configuration.

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore implements ReferenceStore with SQLite persistence.

func NewSQLiteStore

func NewSQLiteStore(dbPath string, gcInterval time.Duration) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite-backed reference store with garbage collection (GC) running at gcInterval.

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close implements ReferenceStore.Close

func (*SQLiteStore) List

func (s *SQLiteStore) List(ctx context.Context) ([]*loomv1.Reference, error)

List implements ReferenceStore.List

func (*SQLiteStore) Release

func (s *SQLiteStore) Release(ctx context.Context, refID string) error

Release implements ReferenceStore.Release

func (*SQLiteStore) Resolve

func (s *SQLiteStore) Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)

Resolve implements ReferenceStore.Resolve

func (*SQLiteStore) Retain

func (s *SQLiteStore) Retain(ctx context.Context, refID string) error

Retain implements ReferenceStore.Retain

func (*SQLiteStore) Stats

func (s *SQLiteStore) Stats(ctx context.Context) (*StoreStats, error)

Stats implements ReferenceStore.Stats

func (*SQLiteStore) Store

func (s *SQLiteStore) Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)

Store implements ReferenceStore.Store

type SharedMemoryNamespaceStats

type SharedMemoryNamespaceStats struct {
	// contains filtered or unexported fields
}

SharedMemoryNamespaceStats tracks statistics for a single namespace.

type SharedMemoryStore

type SharedMemoryStore struct {
	// contains filtered or unexported fields
}

SharedMemoryStore provides zero-copy shared memory for agent communication. All operations are safe for concurrent use by multiple goroutines.

func NewSharedMemoryStore

func NewSharedMemoryStore(tracer observability.Tracer, logger *zap.Logger) (*SharedMemoryStore, error)

NewSharedMemoryStore creates a new shared memory store.

func (*SharedMemoryStore) Close

func (s *SharedMemoryStore) Close() error

Close closes the shared memory store and all watchers.

func (*SharedMemoryStore) Delete

Delete removes a value from shared memory with optimistic concurrency control.

func (*SharedMemoryStore) Get

Get retrieves a value from shared memory.

func (*SharedMemoryStore) GetStats

GetStats retrieves statistics for a namespace.

func (*SharedMemoryStore) List

List returns all keys matching a pattern in a namespace.

func (*SharedMemoryStore) Put

Put writes or updates a value in shared memory with optimistic concurrency control.
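Put and Delete are described as using optimistic concurrency control, but their signatures are not shown here. As an illustration of the general technique only, the sketch below keeps a version number per key and rejects a write whose expected version no longer matches, which tells a stale writer to re-read and retry. occMap, its methods, and the convention that an expected version of 0 means create-if-absent are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errVersionConflict = errors.New("version conflict")

type versioned struct {
	value   []byte
	version int64
}

// occMap is a hypothetical versioned map, not SharedMemoryStore's implementation.
type occMap struct {
	mu   sync.Mutex
	data map[string]versioned
}

func newOCCMap() *occMap { return &occMap{data: make(map[string]versioned)} }

// Get returns the value and its current version (version 0 if the key is absent).
func (m *occMap) Get(key string) ([]byte, int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v := m.data[key]
	return v.value, v.version
}

// Put writes value only if the key's current version equals expected,
// then returns the new version. expected == 0 means "create only if absent".
func (m *occMap) Put(key string, value []byte, expected int64) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	cur := m.data[key].version
	if cur != expected {
		return cur, errVersionConflict // another writer got there first
	}
	next := cur + 1
	m.data[key] = versioned{value: value, version: next}
	return next, nil
}

func main() {
	m := newOCCMap()
	v1, _ := m.Put("plan", []byte("draft"), 0)
	_, err := m.Put("plan", []byte("stale write"), 0) // based on an outdated read
	fmt.Println(v1, err)
	_, _ = m.Put("plan", []byte("final"), v1)
	val, ver := m.Get("plan")
	fmt.Println(string(val), ver)
}
```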

func (*SharedMemoryStore) Watch

Watch creates a watcher for changes in a namespace. Returns a channel that receives SharedMemoryValue updates.

type SharedMemoryWatcher

type SharedMemoryWatcher struct {
	// contains filtered or unexported fields
}

SharedMemoryWatcher represents an active watcher for a namespace.

type StoreConfig

type StoreConfig struct {
	Backend  string // memory | sqlite | redis
	Path     string // For sqlite
	RedisURL string // For redis
}

StoreConfig holds reference store configuration (mirrors cmd/looms config).
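Backend selects among memory, sqlite, and redis, and each of the other fields applies to only one backend. A check along these lines could catch configuration mistakes before NewReferenceStoreFromConfig runs. validateStoreConfig is hypothetical, the struct is copied locally so the example compiles, and rejecting an empty or unknown Backend is an assumption (the real factory may apply a default).

```go
package main

import (
	"errors"
	"fmt"
)

// StoreConfig mirrors the package's struct so the example is self-contained.
type StoreConfig struct {
	Backend  string // memory | sqlite | redis
	Path     string // For sqlite
	RedisURL string // For redis
}

// validateStoreConfig is a hypothetical pre-flight check, not part of the package.
func validateStoreConfig(c StoreConfig) error {
	switch c.Backend {
	case "memory":
		return nil
	case "sqlite":
		if c.Path == "" {
			return errors.New("sqlite backend requires Path")
		}
		return nil
	case "redis":
		if c.RedisURL == "" {
			return errors.New("redis backend requires RedisURL")
		}
		return nil
	default:
		return fmt.Errorf("unknown backend %q (want memory, sqlite, or redis)", c.Backend)
	}
}

func main() {
	for _, c := range []StoreConfig{
		{Backend: "sqlite", Path: "/tmp/loom-refs.db"}, // hypothetical path
		{Backend: "redis"},
		{Backend: "postgres"},
	} {
		fmt.Println(c.Backend, validateStoreConfig(c))
	}
}
```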

type StoreOptions

type StoreOptions struct {
	// Type categorizes the kind of data being stored
	Type loomv1.ReferenceType

	// ContentType specifies MIME type (e.g., "application/json", "text/plain")
	ContentType string

	// TTL specifies time-to-live in seconds (0 = never expires)
	TTL int64

	// Compression algorithm: "none", "gzip", "zstd"
	Compression string

	// Encoding applied: "none", "base64"
	Encoding string

	// ComputeChecksum enables integrity verification
	ComputeChecksum bool
}

StoreOptions configures reference storage behavior.

type StoreStats

type StoreStats struct {
	// TotalRefs is the total number of references ever created.
	TotalRefs int64

	// TotalBytes is the total number of bytes ever stored.
	TotalBytes int64

	// ActiveRefs is the number of currently active references.
	ActiveRefs int64

	// GCRuns is the number of garbage collection runs.
	GCRuns int64

	// EvictionCount is the number of references evicted by GC.
	EvictionCount int64

	// CurrentBytes is the current memory usage in bytes.
	CurrentBytes int64
}

StoreStats provides statistics about reference storage.
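Two ratios are easy to derive from StoreStats when monitoring a store: the average size of an active reference and the share of all references ever created that GC has evicted. The helpers below are hypothetical, the struct is copied locally so the example compiles, and the average assumes CurrentBytes counts only data held by active references. The input numbers in main are made up for illustration.

```go
package main

import "fmt"

// StoreStats mirrors the package's struct so the example is self-contained.
type StoreStats struct {
	TotalRefs, TotalBytes, ActiveRefs, GCRuns, EvictionCount, CurrentBytes int64
}

// avgActiveBytes returns CurrentBytes per active reference (0 when there are none).
func avgActiveBytes(s StoreStats) float64 {
	if s.ActiveRefs == 0 {
		return 0
	}
	return float64(s.CurrentBytes) / float64(s.ActiveRefs)
}

// evictedFraction returns the share of all references ever created that GC evicted.
func evictedFraction(s StoreStats) float64 {
	if s.TotalRefs == 0 {
		return 0
	}
	return float64(s.EvictionCount) / float64(s.TotalRefs)
}

func main() {
	s := StoreStats{TotalRefs: 200, ActiveRefs: 40, EvictionCount: 50, CurrentBytes: 4 << 20}
	fmt.Printf("avg %.0f bytes/ref, %.0f%% evicted by GC\n", avgActiveBytes(s), 100*evictedFraction(s))
}
```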

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents an agent subscribed to a topic.

type Subscription

type Subscription struct {
	ID      string
	AgentID string
	Topic   string
	Filter  *loomv1.SubscriptionFilter // Filter for this subscription
	Channel <-chan *loomv1.BusMessage  // Receive-only for external consumers

	Created time.Time
	// contains filtered or unexported fields
}

Subscription represents an active subscription. Returned to caller to receive messages.

type TopicBroadcaster

type TopicBroadcaster struct {
	// contains filtered or unexported fields
}

TopicBroadcaster manages subscribers for a single topic. Safe for concurrent use.
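A topic broadcaster fans each published message out to every subscriber. The sketch below shows one common way to do that safely in Go: a read-write mutex around the subscriber map, plus non-blocking sends so that one slow subscriber cannot stall the publisher. It is hypothetical; the package's TopicBroadcaster may block, buffer differently, or apply SubscriptionFilter rules rather than dropping messages.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster is a hypothetical single-topic fan-out, not the package's TopicBroadcaster.
type broadcaster struct {
	mu   sync.RWMutex
	subs map[string]chan string // subscriber ID -> delivery channel
}

func newBroadcaster() *broadcaster { return &broadcaster{subs: make(map[string]chan string)} }

// subscribe registers id and returns a receive-only channel with the given buffer size.
func (b *broadcaster) subscribe(id string, buf int) <-chan string {
	ch := make(chan string, buf)
	b.mu.Lock()
	b.subs[id] = ch
	b.mu.Unlock()
	return ch
}

// unsubscribe removes id and closes its channel so range loops over it end.
// Taking the write lock guarantees no publish is sending on the channel.
func (b *broadcaster) unsubscribe(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.subs[id]; ok {
		delete(b.subs, id)
		close(ch)
	}
}

// publish delivers msg to each subscriber without blocking; a subscriber
// whose buffer is full misses it. Returns the number of deliveries.
func (b *broadcaster) publish(msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs {
		select {
		case ch <- msg:
			delivered++
		default: // buffer full: drop rather than stall the publisher
		}
	}
	return delivered
}

func main() {
	b := newBroadcaster()
	fast := b.subscribe("agent-a", 4)
	slow := b.subscribe("agent-b", 1)
	fmt.Println(b.publish("m1"), b.publish("m2")) // 2 1
	b.unsubscribe("agent-a")
	b.unsubscribe("agent-b")
	for m := range fast {
		fmt.Println("agent-a got", m)
	}
	for m := range slow {
		fmt.Println("agent-b got", m)
	}
}
```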
