messagequeue

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package messagequeue provides an AMQ-backed implementation of the MessageQueue interface.

Package messagequeue also defines the interfaces (MessageQueue, Factory) and message types used for agent communication.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MessageToPayload

func MessageToPayload(msg *Message) ([]byte, error)

MessageToPayload converts a Message to JSON bytes

Types

type AMQFactory

// AMQFactory is an empty struct, so it carries no state of its own. Its
// Create method has the same signature as Factory.Create: it takes a Config
// and returns a MessageQueue or an error.
type AMQFactory struct{}

AMQFactory creates AMQ message queue instances

func (*AMQFactory) Create

func (f *AMQFactory) Create(config Config) (MessageQueue, error)

Create creates a new AMQ message queue instance

type AMQMessageQueue

// AMQMessageQueue has no exported fields; construct it with
// NewAMQMessageQueue and interact with it through its methods.
type AMQMessageQueue struct {
	// contains filtered or unexported fields
}

AMQMessageQueue implements MessageQueue using the AMQ package

func NewAMQMessageQueue

func NewAMQMessageQueue(config Config) (*AMQMessageQueue, error)

NewAMQMessageQueue creates a new AMQ-based message queue

func (*AMQMessageQueue) Close

func (mq *AMQMessageQueue) Close() error

Close closes the message queue connection

func (*AMQMessageQueue) CreateQueue

func (mq *AMQMessageQueue) CreateQueue(ctx context.Context, name string) error

CreateQueue creates a new queue/topic

func (*AMQMessageQueue) DeleteQueue

func (mq *AMQMessageQueue) DeleteQueue(ctx context.Context, name string) error

DeleteQueue removes a queue/topic

func (*AMQMessageQueue) GetAsyncConsumer

func (mq *AMQMessageQueue) GetAsyncConsumer(agentID string, metadata map[string]string) (client.AsyncConsumer, error)

GetAsyncConsumer returns an async consumer for an agent

func (*AMQMessageQueue) GetClient

func (mq *AMQMessageQueue) GetClient(agentID string, metadata map[string]string) (client.Client, error)

GetClient returns an AMQ client for an agent

func (*AMQMessageQueue) GetQueueStats

func (mq *AMQMessageQueue) GetQueueStats(ctx context.Context, queueName string) (*types.QueueStats, error)

GetQueueStats returns queue statistics

func (*AMQMessageQueue) ListQueues

func (mq *AMQMessageQueue) ListQueues(ctx context.Context) ([]*types.Queue, error)

ListQueues returns all queues

func (*AMQMessageQueue) PublishTask

func (mq *AMQMessageQueue) PublishTask(ctx context.Context, from, topic string, message *Message) error

PublishTask publishes a task to a topic

func (*AMQMessageQueue) SendMessage

func (mq *AMQMessageQueue) SendMessage(ctx context.Context, from, to string, message *Message) error

SendMessage sends a message directly between agents

type Config

// Config carries the settings accepted by NewAMQMessageQueue and by
// Factory.Create.
// NOTE(review): this listing does not show what happens when a field is left
// at its zero value (default applied vs. error) — confirm before relying on it.
type Config struct {
	StorePath      string // Path for BadgerDB storage
	WorkerPoolSize int    // Number of workers per queue
	MessageTimeout int    // Message timeout in seconds
}

Config holds message queue configuration

type Factory

// Factory abstracts construction of a MessageQueue from a Config, so callers
// can depend on this interface instead of a concrete type. AMQFactory
// declares a Create method with this exact signature.
type Factory interface {
	// Create builds a MessageQueue from config, or returns an error.
	Create(config Config) (MessageQueue, error)
}

Factory creates message queue instances

type Message

// Message is the unit passed to SendMessage and PublishTask and converted
// to/from JSON by MessageToPayload and PayloadToMessage. Each field is
// encoded under the key named in its json tag; the "to" key is omitted from
// the encoded output when To is empty. Payload holds an arbitrary JSON object.
// NOTE(review): the unit of Timestamp (e.g. seconds vs. milliseconds) is not
// shown in this listing — confirm against the code that sets it.
type Message struct {
	ID        string                 `json:"id"`
	Topic     string                 `json:"topic"`
	From      string                 `json:"from"`         // Agent ID
	To        string                 `json:"to,omitempty"` // Agent ID or empty for broadcast
	Type      string                 `json:"type"`
	Payload   map[string]interface{} `json:"payload"`
	Timestamp int64                  `json:"timestamp"`
}

Message represents a message in the queue

func PayloadToMessage

func PayloadToMessage(payload []byte) (*Message, error)

PayloadToMessage converts JSON bytes to a Message

type MessageHandler

// MessageHandler is the callback signature for handling a Message: it
// receives a context and a pointer to the message, and reports failure by
// returning a non-nil error.
// NOTE(review): how a returned error is treated (retry, drop, log) is not
// shown in this listing — confirm with the code that invokes the handler.
type MessageHandler func(ctx context.Context, msg *Message) error

MessageHandler is a callback for handling messages

type MessageQueue

// MessageQueue groups the operations available on an agent message queue:
// obtaining a per-agent client or async consumer, creating and deleting
// queues, sending direct messages and publishing tasks, reading queue
// statistics and listings, and closing the connection. Methods that accept a
// context.Context take it as their first parameter. AMQMessageQueue declares
// methods matching every signature below.
type MessageQueue interface {
	// GetClient returns an AMQ client for an agent
	GetClient(agentID string, metadata map[string]string) (client.Client, error)

	// GetAsyncConsumer returns an async consumer for an agent
	GetAsyncConsumer(agentID string, metadata map[string]string) (client.AsyncConsumer, error)

	// CreateQueue creates a new queue/topic
	CreateQueue(ctx context.Context, name string) error

	// DeleteQueue removes a queue/topic
	DeleteQueue(ctx context.Context, name string) error

	// SendMessage sends a message directly between agents
	SendMessage(ctx context.Context, from, to string, message *Message) error

	// PublishTask publishes a task to a topic
	PublishTask(ctx context.Context, from, topic string, message *Message) error

	// GetQueueStats returns queue statistics
	GetQueueStats(ctx context.Context, queueName string) (*types.QueueStats, error)

	// ListQueues returns all queues
	ListQueues(ctx context.Context) ([]*types.Queue, error)

	// Close closes the message queue connection
	Close() error
}

MessageQueue defines the interface for agent message queue operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL