Documentation
¶
Overview ¶
Copyright 2026 Teradata
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- func DefaultPolicy() *loomv1.CommunicationPolicy
- func NewControlMessagePolicy() *loomv1.CommunicationPolicy
- func NewSessionStatePolicy() *loomv1.CommunicationPolicy
- func NewToolResultPolicy(thresholdBytes int64) *loomv1.CommunicationPolicy
- func NewWorkflowContextPolicy() *loomv1.CommunicationPolicy
- type AutoPromoteConfigParams
- type FactoryConfig
- type GCConfig
- type MemoryStore
- func (m *MemoryStore) Close() error
- func (m *MemoryStore) List(ctx context.Context) ([]*loomv1.Reference, error)
- func (m *MemoryStore) Release(ctx context.Context, refID string) error
- func (m *MemoryStore) Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)
- func (m *MemoryStore) Retain(ctx context.Context, refID string) error
- func (m *MemoryStore) Stats(ctx context.Context) (*StoreStats, error)
- func (m *MemoryStore) Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)
- type MessageBus
- func (b *MessageBus) Close() error
- func (b *MessageBus) GetNotificationChannel(subscriptionID string) (chan struct{}, bool)
- func (b *MessageBus) GetSubscriptionsByAgent(agentID string) []*Subscription
- func (b *MessageBus) GetTopicStats(ctx context.Context, topic string) (*loomv1.TopicStats, error)
- func (b *MessageBus) ListTopics(ctx context.Context) ([]string, error)
- func (b *MessageBus) Publish(ctx context.Context, topic string, msg *loomv1.BusMessage) (int, int, error)
- func (b *MessageBus) RegisterNotificationChannel(subscriptionID string, notifyChan chan struct{})
- func (b *MessageBus) Subscribe(ctx context.Context, agentID string, topicPattern string, ...) (*Subscription, error)
- func (b *MessageBus) UnregisterNotificationChannel(subscriptionID string)
- func (b *MessageBus) Unsubscribe(ctx context.Context, subscriptionID string) error
- type MessageQueue
- func (q *MessageQueue) Acknowledge(ctx context.Context, messageID string) error
- func (q *MessageQueue) AgentExists(agentID string) bool
- func (q *MessageQueue) Close() error
- func (q *MessageQueue) Dequeue(ctx context.Context, agentID string) (*QueueMessage, error)
- func (q *MessageQueue) Enqueue(ctx context.Context, msg *QueueMessage) error
- func (q *MessageQueue) GetAgentsWithPendingMessages(ctx context.Context) []string
- func (q *MessageQueue) GetNotificationChannel(agentID string) (chan struct{}, bool)
- func (q *MessageQueue) GetPendingMessagesInfo(agentID string) []PendingMessageInfo
- func (q *MessageQueue) GetQueueDepth(agentID string) int
- func (q *MessageQueue) RegisterNotificationChannel(agentID string, notifyChan chan struct{})
- func (q *MessageQueue) Requeue(ctx context.Context, messageID string) error
- func (q *MessageQueue) Send(ctx context.Context, fromAgent, toAgent, messageType string, ...) (string, error)
- func (q *MessageQueue) SendAndReceive(ctx context.Context, fromAgent, toAgent, messageType string, ...) (*loomv1.MessagePayload, error)
- func (q *MessageQueue) SetAgentValidator(validator func(agentID string) bool)
- func (q *MessageQueue) UnregisterNotificationChannel(agentID string)
- type PendingMessageInfo
- type PoliciesConfig
- type PolicyManager
- type QueueMessage
- type QueueMessageStatus
- type ReferenceStore
- type SQLiteStore
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) List(ctx context.Context) ([]*loomv1.Reference, error)
- func (s *SQLiteStore) Release(ctx context.Context, refID string) error
- func (s *SQLiteStore) Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)
- func (s *SQLiteStore) Retain(ctx context.Context, refID string) error
- func (s *SQLiteStore) Stats(ctx context.Context) (*StoreStats, error)
- func (s *SQLiteStore) Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)
- type SharedMemoryNamespaceStats
- type SharedMemoryStore
- func (s *SharedMemoryStore) Close() error
- func (s *SharedMemoryStore) Delete(ctx context.Context, req *loomv1.DeleteSharedMemoryRequest) (*loomv1.DeleteSharedMemoryResponse, error)
- func (s *SharedMemoryStore) Get(ctx context.Context, req *loomv1.GetSharedMemoryRequest) (*loomv1.GetSharedMemoryResponse, error)
- func (s *SharedMemoryStore) GetStats(ctx context.Context, namespace loomv1.SharedMemoryNamespace) (*loomv1.SharedMemoryStats, error)
- func (s *SharedMemoryStore) List(ctx context.Context, req *loomv1.ListSharedMemoryKeysRequest) (*loomv1.ListSharedMemoryKeysResponse, error)
- func (s *SharedMemoryStore) Put(ctx context.Context, req *loomv1.PutSharedMemoryRequest) (*loomv1.PutSharedMemoryResponse, error)
- func (s *SharedMemoryStore) Watch(ctx context.Context, req *loomv1.WatchSharedMemoryRequest) (<-chan *loomv1.SharedMemoryValue, error)
- type SharedMemoryWatcher
- type StoreConfig
- type StoreOptions
- type StoreStats
- type Subscriber
- type Subscription
- type TopicBroadcaster
Constants ¶
const (
SpanBusPublish = "bus.publish"
SpanBusSubscribe = "bus.subscribe"
SpanBusDeliver = "bus.deliver"
SpanBusFilter = "bus.filter"
SpanBusUnsubscribe = "bus.unsubscribe"
)
Hawk span constants for bus operations
const (
SpanQueueEnqueue = "queue.enqueue"
SpanQueueDequeue = "queue.dequeue"
SpanQueueAck = "queue.ack"
SpanQueueRequeue = "queue.requeue"
SpanQueuePersist = "queue.persist"
SpanQueueRecover = "queue.recover"
SpanQueueSendAndReceive = "queue.send_and_receive"
)
Hawk span constants for queue operations
const (
// DefaultMaxRetries is the default maximum number of retry attempts for a message
DefaultMaxRetries = 3
// DefaultMessageTTL is the default time-to-live for messages (24 hours)
DefaultMessageTTL = 24 * time.Hour
)
Default queue configuration values
const ( )
Hawk span constants for shared memory operations
const CompressionThreshold = 1024 // 1KB
CompressionThreshold is the minimum size in bytes to trigger automatic compression
const (
// DefaultMessageBufferSize is the default buffer size for message channels
DefaultMessageBufferSize = 100
)
Default configuration values
const DefaultQueueTimeout = 30
DefaultQueueTimeout is the default timeout for request-response operations (30 seconds).
Variables ¶
This section is empty.
Functions ¶
func DefaultPolicy ¶
func DefaultPolicy() *loomv1.CommunicationPolicy
DefaultPolicy returns the default communication policy
func NewControlMessagePolicy ¶
func NewControlMessagePolicy() *loomv1.CommunicationPolicy
NewControlMessagePolicy creates a policy for control messages (Tier 3: Always Value)
func NewSessionStatePolicy ¶
func NewSessionStatePolicy() *loomv1.CommunicationPolicy
NewSessionStatePolicy creates a policy for session state (Tier 1: Always Reference)
func NewToolResultPolicy ¶
func NewToolResultPolicy(thresholdBytes int64) *loomv1.CommunicationPolicy
NewToolResultPolicy creates a policy for tool results (Tier 2: Auto-Promote)
func NewWorkflowContextPolicy ¶
func NewWorkflowContextPolicy() *loomv1.CommunicationPolicy
NewWorkflowContextPolicy creates a policy for workflow context (Tier 1: Always Reference)
Types ¶
type AutoPromoteConfigParams ¶
AutoPromoteConfigParams holds auto-promotion configuration.
type FactoryConfig ¶
type FactoryConfig struct {
Store StoreConfig
GC GCConfig
AutoPromote AutoPromoteConfigParams
Policies PoliciesConfig
}
FactoryConfig holds all communication configuration for factory initialization.
func DefaultFactoryConfig ¶
func DefaultFactoryConfig() FactoryConfig
DefaultFactoryConfig returns default configuration for testing/development.
type GCConfig ¶
type GCConfig struct {
Enabled bool
Strategy string // ref_counting | ttl | manual
Interval int // seconds
}
GCConfig holds garbage collection configuration.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore implements ReferenceStore with in-memory storage and reference-counting GC.
func NewMemoryStore ¶
func NewMemoryStore(gcInterval time.Duration) *MemoryStore
NewMemoryStore creates a new in-memory reference store with GC
func (*MemoryStore) Close ¶
func (m *MemoryStore) Close() error
Close implements ReferenceStore.Close
func (*MemoryStore) Release ¶
func (m *MemoryStore) Release(ctx context.Context, refID string) error
Release implements ReferenceStore.Release
func (*MemoryStore) Retain ¶
func (m *MemoryStore) Retain(ctx context.Context, refID string) error
Retain implements ReferenceStore.Retain
func (*MemoryStore) Stats ¶
func (m *MemoryStore) Stats(ctx context.Context) (*StoreStats, error)
Stats implements ReferenceStore.Stats
func (*MemoryStore) Store ¶
func (m *MemoryStore) Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)
Store implements ReferenceStore.Store
type MessageBus ¶
type MessageBus struct {
// contains filtered or unexported fields
}
MessageBus provides topic-based pub/sub for agent communication. All operations are safe for concurrent use by multiple goroutines.
func NewMessageBus ¶
func NewMessageBus(refStore ReferenceStore, policy *PolicyManager, tracer observability.Tracer, logger *zap.Logger) *MessageBus
NewMessageBus creates a new message bus.
func (*MessageBus) Close ¶
func (b *MessageBus) Close() error
Close shuts down the message bus and closes all subscriber channels.
func (*MessageBus) GetNotificationChannel ¶
func (b *MessageBus) GetNotificationChannel(subscriptionID string) (chan struct{}, bool)
GetNotificationChannel returns the notification channel for a subscription, if registered.
func (*MessageBus) GetSubscriptionsByAgent ¶
func (b *MessageBus) GetSubscriptionsByAgent(agentID string) []*Subscription
GetSubscriptionsByAgent returns all active subscriptions for an agent.
func (*MessageBus) GetTopicStats ¶
func (b *MessageBus) GetTopicStats(ctx context.Context, topic string) (*loomv1.TopicStats, error)
GetTopicStats retrieves statistics for a topic.
func (*MessageBus) ListTopics ¶
func (b *MessageBus) ListTopics(ctx context.Context) ([]string, error)
ListTopics returns all active topics.
func (*MessageBus) Publish ¶
func (b *MessageBus) Publish(ctx context.Context, topic string, msg *loomv1.BusMessage) (int, int, error)
Publish sends a message to all subscribers of a topic. Returns (delivered, dropped, error). Does NOT block on slow subscribers - messages are dropped if subscriber buffers are full.
func (*MessageBus) RegisterNotificationChannel ¶
func (b *MessageBus) RegisterNotificationChannel(subscriptionID string, notifyChan chan struct{})
RegisterNotificationChannel registers a notification channel for a subscription. When messages arrive on this subscription, the channel will be notified.
func (*MessageBus) Subscribe ¶
func (b *MessageBus) Subscribe(ctx context.Context, agentID string, topicPattern string, filter *loomv1.SubscriptionFilter, bufferSize int) (*Subscription, error)
Subscribe creates a new subscription to a topic pattern. Topic patterns support wildcards: "workflow.*" matches "workflow.started" and "workflow.completed". Returns a Subscription that contains a channel for receiving messages.
func (*MessageBus) UnregisterNotificationChannel ¶
func (b *MessageBus) UnregisterNotificationChannel(subscriptionID string)
UnregisterNotificationChannel removes a notification channel for a subscription.
func (*MessageBus) Unsubscribe ¶
func (b *MessageBus) Unsubscribe(ctx context.Context, subscriptionID string) error
Unsubscribe removes a subscription. The subscription's channel will be closed.
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue provides persistent message queuing for offline agents. All operations are safe for concurrent use.
func NewMessageQueue ¶
func NewMessageQueue(dbPath string, tracer observability.Tracer, logger *zap.Logger) (*MessageQueue, error)
NewMessageQueue creates a new message queue with SQLite persistence.
func (*MessageQueue) Acknowledge ¶
func (q *MessageQueue) Acknowledge(ctx context.Context, messageID string) error
Acknowledge marks a message as successfully processed.
func (*MessageQueue) AgentExists ¶
func (q *MessageQueue) AgentExists(agentID string) bool
AgentExists checks if an agent exists using the registered validator. If no validator is set, returns true (permissive - allows all agents).
func (*MessageQueue) Close ¶
func (q *MessageQueue) Close() error
Close closes the message queue and database connection.
func (*MessageQueue) Dequeue ¶
func (q *MessageQueue) Dequeue(ctx context.Context, agentID string) (*QueueMessage, error)
Dequeue retrieves the next message for an agent. Messages are marked as in-flight and must be acknowledged or will be requeued.
func (*MessageQueue) Enqueue ¶
func (q *MessageQueue) Enqueue(ctx context.Context, msg *QueueMessage) error
Enqueue adds a message to an agent's queue. If the agent is offline, the message is persisted to SQLite.
func (*MessageQueue) GetAgentsWithPendingMessages ¶
func (q *MessageQueue) GetAgentsWithPendingMessages(ctx context.Context) []string
GetAgentsWithPendingMessages returns a list of agent IDs that have pending messages. This is used by the message queue monitor to trigger agents in an event-driven fashion instead of polling.
func (*MessageQueue) GetNotificationChannel ¶
func (q *MessageQueue) GetNotificationChannel(agentID string) (chan struct{}, bool)
GetNotificationChannel returns the notification channel for an agent, if registered.
func (*MessageQueue) GetPendingMessagesInfo ¶
func (q *MessageQueue) GetPendingMessagesInfo(agentID string) []PendingMessageInfo
GetPendingMessagesInfo returns info about pending messages for an agent without dequeuing them. This is used for rich event notifications to tell the coordinator exactly what's waiting.
func (*MessageQueue) GetQueueDepth ¶
func (q *MessageQueue) GetQueueDepth(agentID string) int
GetQueueDepth returns the number of pending messages for an agent.
func (*MessageQueue) RegisterNotificationChannel ¶
func (q *MessageQueue) RegisterNotificationChannel(agentID string, notifyChan chan struct{})
RegisterNotificationChannel registers a notification channel for event-driven message handling. When messages arrive for this agent, the channel will be notified.
func (*MessageQueue) Requeue ¶
func (q *MessageQueue) Requeue(ctx context.Context, messageID string) error
Requeue returns an in-flight message back to pending state for retry.
func (*MessageQueue) Send ¶
func (q *MessageQueue) Send(ctx context.Context, fromAgent, toAgent, messageType string, payload *loomv1.MessagePayload, metadata map[string]string) (string, error)
Send is a convenience wrapper around Enqueue for fire-and-forget messaging. It creates a QueueMessage and enqueues it for the destination agent.
func (*MessageQueue) SendAndReceive ¶
func (q *MessageQueue) SendAndReceive(ctx context.Context, fromAgent, toAgent, messageType string, payload *loomv1.MessagePayload, metadata map[string]string, timeoutSeconds int) (*loomv1.MessagePayload, error)
SendAndReceive implements request-response messaging with timeout. It sends a request and waits for a response with the specified timeout in seconds.
The correlation ID is used to match the response to this specific request. When the destination agent sends a response with the same correlation ID, it will be routed to this waiting channel.
func (*MessageQueue) SetAgentValidator ¶
func (q *MessageQueue) SetAgentValidator(validator func(agentID string) bool)
SetAgentValidator sets the function used to validate if an agent exists. This is used by send_message to prevent messages being sent to non-existent agents.
func (*MessageQueue) UnregisterNotificationChannel ¶
func (q *MessageQueue) UnregisterNotificationChannel(agentID string)
UnregisterNotificationChannel removes a notification channel for an agent.
type PendingMessageInfo ¶
type PendingMessageInfo struct {
MessageID string
FromAgent string
MessageType string
EnqueuedAt time.Time
SizeBytes int
}
PendingMessageInfo contains metadata about a pending message without the full payload.
type PoliciesConfig ¶
type PoliciesConfig struct {
AlwaysReference []string // Force Tier 1 (always reference)
AlwaysValue []string // Force Tier 3 (always value)
}
PoliciesConfig holds policy overrides.
type PolicyManager ¶
type PolicyManager struct {
// contains filtered or unexported fields
}
PolicyManager applies communication tier policies
func NewPolicyManager ¶
func NewPolicyManager() *PolicyManager
NewPolicyManager creates a policy manager with default configuration
func NewPolicyManagerFromConfig ¶
func NewPolicyManagerFromConfig(cfg FactoryConfig) *PolicyManager
NewPolicyManagerFromConfig creates a PolicyManager based on configuration.
func (*PolicyManager) GetPolicy ¶
func (pm *PolicyManager) GetPolicy(messageType string) *loomv1.CommunicationPolicy
GetPolicy retrieves the policy for a message type, or default if not found
func (*PolicyManager) SetPolicy ¶
func (pm *PolicyManager) SetPolicy(messageType string, policy *loomv1.CommunicationPolicy)
SetPolicy registers a policy for a specific message type
func (*PolicyManager) ShouldUseReference ¶
func (pm *PolicyManager) ShouldUseReference(messageType string, payloadSize int64) bool
ShouldUseReference determines if a message should use reference-based communication
type QueueMessage ¶
type QueueMessage struct {
ID string
ToAgent string
FromAgent string
MessageType string
Payload *loomv1.MessagePayload
Metadata map[string]string
CorrelationID string // For request-response correlation
Priority int32
EnqueuedAt time.Time
ExpiresAt time.Time
DequeueCount int32
MaxRetries int32
Status QueueMessageStatus
}
QueueMessage represents a message in the queue.
type QueueMessageStatus ¶
type QueueMessageStatus int32
QueueMessageStatus represents the state of a queued message.
const (
QueueMessageStatusPending QueueMessageStatus = iota
QueueMessageStatusInFlight
QueueMessageStatusAcked
QueueMessageStatusFailed
QueueMessageStatusExpired
)
type ReferenceStore ¶
type ReferenceStore interface {
// Store data and return reference
Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)
// Resolve reference to actual data
Resolve(ctx context.Context, ref *loomv1.Reference) ([]byte, error)
// Retain increments reference count (retain ownership)
Retain(ctx context.Context, refID string) error
// Release decrements reference count (release ownership, may trigger GC)
Release(ctx context.Context, refID string) error
// List all references (debugging)
List(ctx context.Context) ([]*loomv1.Reference, error)
// Stats returns store statistics
Stats(ctx context.Context) (*StoreStats, error)
// Close cleans up store resources
Close() error
}
ReferenceStore manages reference lifecycle and data storage. Implementations must be safe for concurrent use by multiple goroutines.
func NewReferenceStoreFromConfig ¶
func NewReferenceStoreFromConfig(cfg FactoryConfig) (ReferenceStore, error)
NewReferenceStoreFromConfig creates a ReferenceStore based on configuration.
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore implements ReferenceStore with SQLite persistence
func NewSQLiteStore ¶
func NewSQLiteStore(dbPath string, gcInterval time.Duration) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLite-backed reference store with GC
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close implements ReferenceStore.Close
func (*SQLiteStore) Release ¶
func (s *SQLiteStore) Release(ctx context.Context, refID string) error
Release implements ReferenceStore.Release
func (*SQLiteStore) Retain ¶
func (s *SQLiteStore) Retain(ctx context.Context, refID string) error
Retain implements ReferenceStore.Retain
func (*SQLiteStore) Stats ¶
func (s *SQLiteStore) Stats(ctx context.Context) (*StoreStats, error)
Stats implements ReferenceStore.Stats
func (*SQLiteStore) Store ¶
func (s *SQLiteStore) Store(ctx context.Context, data []byte, opts StoreOptions) (*loomv1.Reference, error)
Store implements ReferenceStore.Store
type SharedMemoryNamespaceStats ¶
type SharedMemoryNamespaceStats struct {
// contains filtered or unexported fields
}
SharedMemoryNamespaceStats tracks statistics for a single namespace.
type SharedMemoryStore ¶
type SharedMemoryStore struct {
// contains filtered or unexported fields
}
SharedMemoryStore provides zero-copy shared memory for agent communication. All operations are safe for concurrent use by multiple goroutines.
func NewSharedMemoryStore ¶
func NewSharedMemoryStore(tracer observability.Tracer, logger *zap.Logger) (*SharedMemoryStore, error)
NewSharedMemoryStore creates a new shared memory store.
func (*SharedMemoryStore) Close ¶
func (s *SharedMemoryStore) Close() error
Close closes the shared memory store and all watchers.
func (*SharedMemoryStore) Delete ¶
func (s *SharedMemoryStore) Delete(ctx context.Context, req *loomv1.DeleteSharedMemoryRequest) (*loomv1.DeleteSharedMemoryResponse, error)
Delete removes a value from shared memory with optimistic concurrency control.
func (*SharedMemoryStore) Get ¶
func (s *SharedMemoryStore) Get(ctx context.Context, req *loomv1.GetSharedMemoryRequest) (*loomv1.GetSharedMemoryResponse, error)
Get retrieves a value from shared memory.
func (*SharedMemoryStore) GetStats ¶
func (s *SharedMemoryStore) GetStats(ctx context.Context, namespace loomv1.SharedMemoryNamespace) (*loomv1.SharedMemoryStats, error)
GetStats retrieves statistics for a namespace.
func (*SharedMemoryStore) List ¶
func (s *SharedMemoryStore) List(ctx context.Context, req *loomv1.ListSharedMemoryKeysRequest) (*loomv1.ListSharedMemoryKeysResponse, error)
List returns all keys matching a pattern in a namespace.
func (*SharedMemoryStore) Put ¶
func (s *SharedMemoryStore) Put(ctx context.Context, req *loomv1.PutSharedMemoryRequest) (*loomv1.PutSharedMemoryResponse, error)
Put writes or updates a value in shared memory with optimistic concurrency control.
func (*SharedMemoryStore) Watch ¶
func (s *SharedMemoryStore) Watch(ctx context.Context, req *loomv1.WatchSharedMemoryRequest) (<-chan *loomv1.SharedMemoryValue, error)
Watch creates a watcher for changes in a namespace. Returns a channel that receives SharedMemoryValue updates.
type SharedMemoryWatcher ¶
type SharedMemoryWatcher struct {
// contains filtered or unexported fields
}
SharedMemoryWatcher represents an active watcher for a namespace.
type StoreConfig ¶
type StoreConfig struct {
Backend string // memory | sqlite | redis
Path string // For sqlite
RedisURL string // For redis
}
StoreConfig holds reference store configuration (mirrors cmd/looms config).
type StoreOptions ¶
type StoreOptions struct {
// Type categorizes the kind of data being stored
Type loomv1.ReferenceType
// ContentType specifies MIME type (e.g., "application/json", "text/plain")
ContentType string
// TTL specifies time-to-live in seconds (0 = never expires)
TTL int64
// Compression algorithm: "none", "gzip", "zstd"
Compression string
// Encoding applied: "none", "base64"
Encoding string
// ComputeChecksum enables integrity verification
ComputeChecksum bool
}
StoreOptions configures reference storage behavior
type StoreStats ¶
type StoreStats struct {
// TotalRefs is the total number of references ever created
TotalRefs int64
// TotalBytes is the total bytes ever stored
TotalBytes int64
// ActiveRefs is the number of currently active references
ActiveRefs int64
// GCRuns is the number of garbage collection runs
GCRuns int64
// EvictionCount is the number of references evicted by GC
EvictionCount int64
// CurrentBytes is the current memory usage in bytes
CurrentBytes int64
}
StoreStats provides statistics about reference storage
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents an agent subscribed to a topic.
type Subscription ¶
type Subscription struct {
ID string
AgentID string
Topic string
Filter *loomv1.SubscriptionFilter // Filter for this subscription
Channel <-chan *loomv1.BusMessage // Receive-only for external consumers
Created time.Time
// contains filtered or unexported fields
}
Subscription represents an active subscription. Returned to caller to receive messages.
type TopicBroadcaster ¶
type TopicBroadcaster struct {
// contains filtered or unexported fields
}
TopicBroadcaster manages subscribers for a single topic. Safe for concurrent use.