Documentation
¶
Overview ¶
Package messagequeue provides AMQ implementation
Package messagequeue provides interfaces for agent communication
Index ¶
- func MessageToPayload(msg *Message) ([]byte, error)
- type AMQFactory
- type AMQMessageQueue
- func (mq *AMQMessageQueue) Close() error
- func (mq *AMQMessageQueue) CreateQueue(ctx context.Context, name string) error
- func (mq *AMQMessageQueue) DeleteQueue(ctx context.Context, name string) error
- func (mq *AMQMessageQueue) GetAsyncConsumer(agentID string, metadata map[string]string) (client.AsyncConsumer, error)
- func (mq *AMQMessageQueue) GetClient(agentID string, metadata map[string]string) (client.Client, error)
- func (mq *AMQMessageQueue) GetQueueStats(ctx context.Context, queueName string) (*types.QueueStats, error)
- func (mq *AMQMessageQueue) ListQueues(ctx context.Context) ([]*types.Queue, error)
- func (mq *AMQMessageQueue) PublishTask(ctx context.Context, from, topic string, message *Message) error
- func (mq *AMQMessageQueue) SendMessage(ctx context.Context, from, to string, message *Message) error
- type Config
- type Factory
- type Message
- type MessageHandler
- type MessageQueue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MessageToPayload ¶
MessageToPayload converts a Message to JSON bytes
Types ¶
type AMQFactory ¶
type AMQFactory struct{}
AMQFactory creates AMQ message queue instances
func (*AMQFactory) Create ¶
func (f *AMQFactory) Create(config Config) (MessageQueue, error)
Create creates a new AMQ message queue instance
type AMQMessageQueue ¶
type AMQMessageQueue struct {
// contains filtered or unexported fields
}
AMQMessageQueue implements MessageQueue using the AMQ package
func NewAMQMessageQueue ¶
func NewAMQMessageQueue(config Config) (*AMQMessageQueue, error)
NewAMQMessageQueue creates a new AMQ-based message queue
func (*AMQMessageQueue) Close ¶
func (mq *AMQMessageQueue) Close() error
Close closes the message queue connection
func (*AMQMessageQueue) CreateQueue ¶
func (mq *AMQMessageQueue) CreateQueue(ctx context.Context, name string) error
CreateQueue creates a new queue/topic
func (*AMQMessageQueue) DeleteQueue ¶
func (mq *AMQMessageQueue) DeleteQueue(ctx context.Context, name string) error
DeleteQueue removes a queue/topic
func (*AMQMessageQueue) GetAsyncConsumer ¶
func (mq *AMQMessageQueue) GetAsyncConsumer(agentID string, metadata map[string]string) (client.AsyncConsumer, error)
GetAsyncConsumer returns an async consumer for an agent
func (*AMQMessageQueue) GetClient ¶
func (mq *AMQMessageQueue) GetClient(agentID string, metadata map[string]string) (client.Client, error)
GetClient returns an AMQ client for an agent
func (*AMQMessageQueue) GetQueueStats ¶
func (mq *AMQMessageQueue) GetQueueStats(ctx context.Context, queueName string) (*types.QueueStats, error)
GetQueueStats returns queue statistics
func (*AMQMessageQueue) ListQueues ¶
ListQueues returns all queues
func (*AMQMessageQueue) PublishTask ¶
func (mq *AMQMessageQueue) PublishTask(ctx context.Context, from, topic string, message *Message) error
PublishTask publishes a task to a topic
func (*AMQMessageQueue) SendMessage ¶
func (mq *AMQMessageQueue) SendMessage(ctx context.Context, from, to string, message *Message) error
SendMessage sends a message directly between agents
type Config ¶
type Config struct {
StorePath string // Path for BadgerDB storage
WorkerPoolSize int // Number of workers per queue
MessageTimeout int // Message timeout in seconds
}
Config holds message queue configuration
type Factory ¶
type Factory interface {
Create(config Config) (MessageQueue, error)
}
Factory creates message queue instances
type Message ¶
type Message struct {
ID string `json:"id"`
Topic string `json:"topic"`
From string `json:"from"` // Agent ID
To string `json:"to,omitempty"` // Agent ID or empty for broadcast
Type string `json:"type"`
Payload map[string]interface{} `json:"payload"`
Timestamp int64 `json:"timestamp"`
}
Message represents a message in the queue
func PayloadToMessage ¶
PayloadToMessage converts JSON bytes to a Message
type MessageHandler ¶
MessageHandler is a callback for handling messages
type MessageQueue ¶
type MessageQueue interface {
// GetClient returns an AMQ client for an agent
GetClient(agentID string, metadata map[string]string) (client.Client, error)
// GetAsyncConsumer returns an async consumer for an agent
GetAsyncConsumer(agentID string, metadata map[string]string) (client.AsyncConsumer, error)
// CreateQueue creates a new queue/topic
CreateQueue(ctx context.Context, name string) error
// DeleteQueue removes a queue/topic
DeleteQueue(ctx context.Context, name string) error
// SendMessage sends a message directly between agents
SendMessage(ctx context.Context, from, to string, message *Message) error
// PublishTask publishes a task to a topic
PublishTask(ctx context.Context, from, topic string, message *Message) error
// GetQueueStats returns queue statistics
GetQueueStats(ctx context.Context, queueName string) (*types.QueueStats, error)
// ListQueues returns all queues
ListQueues(ctx context.Context) ([]*types.Queue, error)
// Close closes the message queue connection
Close() error
}
MessageQueue defines the interface for agent message queue operations