messaging

package
v1.1.3 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 22, 2025 License: MIT Imports: 5 Imported by: 0

README

Messaging Library

The Messaging library provides a unified interface for messaging operations across multiple providers, including Kafka, NATS, RabbitMQ, and SQS. It covers pub/sub, request/reply, and message routing and filtering, along with batching, partitioning, and dead letter queues.

Features

  • Multi-Provider Support: Kafka, NATS, RabbitMQ, SQS, and custom providers
  • Pub/Sub Messaging: Publish and subscribe to topics
  • Request/Reply: Synchronous request-reply patterns
  • Message Routing: Advanced message routing and filtering
  • Batch Operations: Efficient batch publishing and processing
  • Message Ordering: Guaranteed message ordering and deduplication
  • Dead Letter Queues: Automatic handling of failed messages
  • Message Scheduling: Delayed and scheduled message delivery
  • Priority Queues: Message priority handling
  • Health Monitoring: Provider health checks and statistics

Supported Providers

  • Kafka: Apache Kafka with advanced features
  • NATS: NATS messaging system
  • RabbitMQ: RabbitMQ message broker
  • SQS: Amazon Simple Queue Service
  • Custom: Custom messaging providers

Installation

go get github.com/anasamu/go-micro-libs/messaging

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/anasamu/go-micro-libs/messaging"
    "github.com/anasamu/go-micro-libs/messaging/types"
    "github.com/sirupsen/logrus"
)

func main() {
    // Create logger
    logger := logrus.New()

    // Create messaging manager with default config
    config := messaging.DefaultManagerConfig()
    manager := messaging.NewMessagingManager(config, logger)

    // Register Kafka provider (example)
    // kafkaProvider := kafka.NewKafkaProvider("localhost:9092")
    // manager.RegisterProvider(kafkaProvider)

    // Connect to messaging system
    ctx := context.Background()
    err := manager.Connect(ctx, "kafka")
    if err != nil {
        log.Fatal(err)
    }

    // Publish a message
    message := messaging.CreateMessage("user.created", "user-service", "notification-service", "users", 
        map[string]interface{}{
            "user_id": "123",
            "name":    "John Doe",
            "email":   "john@example.com",
        })

    publishReq := &types.PublishRequest{
        Topic:   "users",
        Message: message,
    }

    response, err := manager.PublishMessage(ctx, "kafka", publishReq)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Message published: %s\n", response.MessageID)
}

API Reference

MessagingManager

The main manager for handling messaging operations across multiple providers.

Methods

NewMessagingManager(config *ManagerConfig, logger *logrus.Logger) *MessagingManager

Creates a new messaging manager with the given configuration and logger.

RegisterProvider(provider MessagingProvider) error

Registers a new messaging provider.

Parameters:

  • provider: The messaging provider to register

Returns:

  • error: Any error that occurred during registration

GetProvider(name string) (MessagingProvider, error)

Retrieves a specific provider by name.

GetDefaultProvider() (MessagingProvider, error)

Returns the default messaging provider.

Connect(ctx context.Context, providerName string) error

Connects to a messaging system using the specified provider.

Disconnect(ctx context.Context, providerName string) error

Disconnects from a messaging system using the specified provider.

Ping(ctx context.Context, providerName string) error

Pings a messaging system to check connectivity.

PublishMessage(ctx context.Context, providerName string, request *types.PublishRequest) (*types.PublishResponse, error)

Publishes a message using the specified provider.

Parameters:

  • ctx: Context for cancellation and timeouts
  • providerName: Name of the provider to use
  • request: Publish request with topic and message

Returns:

  • *types.PublishResponse: Publish response with message ID and metadata
  • error: Any error that occurred

SubscribeToTopic(ctx context.Context, providerName string, request *types.SubscribeRequest, handler types.MessageHandler) error

Subscribes to a topic using the specified provider.

UnsubscribeFromTopic(ctx context.Context, providerName string, request *types.UnsubscribeRequest) error

Unsubscribes from a topic using the specified provider.

CreateTopic(ctx context.Context, providerName string, request *types.CreateTopicRequest) error

Creates a topic using the specified provider.

DeleteTopic(ctx context.Context, providerName string, request *types.DeleteTopicRequest) error

Deletes a topic using the specified provider.

TopicExists(ctx context.Context, providerName string, request *types.TopicExistsRequest) (bool, error)

Checks if a topic exists using the specified provider.

ListTopics(ctx context.Context, providerName string) ([]types.TopicInfo, error)

Lists topics using the specified provider.

PublishBatch(ctx context.Context, providerName string, request *types.PublishBatchRequest) (*types.PublishBatchResponse, error)

Publishes multiple messages in a batch.

GetTopicInfo(ctx context.Context, providerName string, request *types.GetTopicInfoRequest) (*types.TopicInfo, error)

Gets topic information using the specified provider.

HealthCheck(ctx context.Context) map[string]error

Performs a health check on all providers. The returned map is keyed by provider name; a nil error means the provider is healthy.

GetStats(ctx context.Context, providerName string) (*types.MessagingStats, error)

Gets statistics from a specific provider.

GetSupportedProviders() []string

Returns a list of registered providers.

GetProviderCapabilities(providerName string) ([]types.MessagingFeature, *types.ConnectionInfo, error)

Returns the supported features and connection information of a specific provider.

Close() error

Closes all messaging connections.

Types

ManagerConfig

Configuration for the messaging manager.

type ManagerConfig struct {
    DefaultProvider string            `json:"default_provider"`
    RetryAttempts   int               `json:"retry_attempts"`
    RetryDelay      time.Duration     `json:"retry_delay"`
    Timeout         time.Duration     `json:"timeout"`
    MaxMessageSize  int64             `json:"max_message_size"`
    Metadata        map[string]string `json:"metadata"`
}

Message

Represents a unified message structure.

type Message struct {
    ID            uuid.UUID              `json:"id"`
    Type          string                 `json:"type"`
    Source        string                 `json:"source"`
    Target        string                 `json:"target"`
    Topic         string                 `json:"topic"`
    RoutingKey    string                 `json:"routing_key,omitempty"`
    Payload       map[string]interface{} `json:"payload"`
    Headers       map[string]interface{} `json:"headers,omitempty"`
    Metadata      map[string]interface{} `json:"metadata,omitempty"`
    CreatedAt     time.Time              `json:"created_at"`
    ExpiresAt     *time.Time             `json:"expires_at,omitempty"`
    ScheduledAt   *time.Time             `json:"scheduled_at,omitempty"`
    Priority      int                    `json:"priority,omitempty"`
    TTL           *time.Duration         `json:"ttl,omitempty"`
    CorrelationID string                 `json:"correlation_id,omitempty"`
    ReplyTo       string                 `json:"reply_to,omitempty"`
    ProviderData  map[string]interface{} `json:"provider_data,omitempty"`
}

PublishRequest

Represents a publish message request.

type PublishRequest struct {
    Topic      string                 `json:"topic"`
    Message    *Message               `json:"message"`
    RoutingKey string                 `json:"routing_key,omitempty"`
    Headers    map[string]interface{} `json:"headers,omitempty"`
    Options    map[string]interface{} `json:"options,omitempty"`
}

PublishResponse

Represents a publish message response.

type PublishResponse struct {
    MessageID    string                 `json:"message_id"`
    Topic        string                 `json:"topic"`
    Partition    int                    `json:"partition,omitempty"`
    Offset       int64                  `json:"offset,omitempty"`
    Timestamp    time.Time              `json:"timestamp"`
    ProviderData map[string]interface{} `json:"provider_data,omitempty"`
}

SubscribeRequest

Represents a subscribe request.

type SubscribeRequest struct {
    Topic         string                 `json:"topic"`
    GroupID       string                 `json:"group_id,omitempty"`
    ConsumerID    string                 `json:"consumer_id,omitempty"`
    AutoAck       bool                   `json:"auto_ack"`
    PrefetchCount int                    `json:"prefetch_count,omitempty"`
    StartOffset   string                 `json:"start_offset,omitempty"`
    Filter        map[string]interface{} `json:"filter,omitempty"`
    Options       map[string]interface{} `json:"options,omitempty"`
}

MessageHandler

Handles incoming messages.

type MessageHandler func(ctx context.Context, message *Message) error

MessagingStats

Messaging statistics.

type MessagingStats struct {
    PublishedMessages   int64                  `json:"published_messages"`
    ConsumedMessages    int64                  `json:"consumed_messages"`
    FailedMessages      int64                  `json:"failed_messages"`
    ActiveConnections   int                    `json:"active_connections"`
    ActiveSubscriptions int                    `json:"active_subscriptions"`
    ProviderData        map[string]interface{} `json:"provider_data"`
}

Advanced Usage

Basic Pub/Sub

// Publish a message
message := messaging.CreateMessage("user.created", "user-service", "notification-service", "users", 
    map[string]interface{}{
        "user_id": "123",
        "name":    "John Doe",
        "email":   "john@example.com",
    })

publishReq := &types.PublishRequest{
    Topic:   "users",
    Message: message,
}

response, err := manager.PublishMessage(ctx, "kafka", publishReq)
if err != nil {
    log.Fatal(err)
}

// Subscribe to messages
handler := func(ctx context.Context, message *types.Message) error {
    fmt.Printf("Received message: %s from %s\n", message.Type, message.Source)
    fmt.Printf("Payload: %+v\n", message.Payload)
    return nil
}

subscribeReq := &types.SubscribeRequest{
    Topic:   "users",
    GroupID: "notification-service",
    AutoAck: true,
}

err = manager.SubscribeToTopic(ctx, "kafka", subscribeReq, handler)
if err != nil {
    log.Fatal(err)
}
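
The handler above prints the payload as a generic map. If a handler prefers typed fields, one option is to round-trip the map through encoding/json. The sketch below is self-contained; UserCreated and decodePayload are hypothetical names chosen for illustration and are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UserCreated is a hypothetical typed view of the "user.created" payload.
type UserCreated struct {
	UserID string `json:"user_id"`
	Name   string `json:"name"`
	Email  string `json:"email"`
}

// decodePayload converts a generic payload map into the struct pointed
// to by out by encoding the map as JSON and decoding it again.
func decodePayload(payload map[string]interface{}, out interface{}) error {
	raw, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("encode payload: %w", err)
	}
	if err := json.Unmarshal(raw, out); err != nil {
		return fmt.Errorf("decode payload: %w", err)
	}
	return nil
}

func main() {
	payload := map[string]interface{}{
		"user_id": "123",
		"name":    "John Doe",
		"email":   "john@example.com",
	}
	var evt UserCreated
	if err := decodePayload(payload, &evt); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(evt.UserID, evt.Email) // 123 john@example.com
}
```

A field with the wrong type (for example a numeric user_id) surfaces as a decode error rather than a panic.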

Message with Headers and Metadata

// Create message with headers and metadata
message := messaging.CreateMessage("order.processed", "order-service", "inventory-service", "orders", 
    map[string]interface{}{
        "order_id": "12345",
        "items":    []string{"item1", "item2"},
        "total":    99.99,
    })

// Add headers
message.AddHeader("priority", "high")
message.AddHeader("retry_count", 0)
message.AddHeader("correlation_id", "req-123")

// Add metadata
message.AddMetadata("tenant_id", "tenant-123")
message.AddMetadata("environment", "production")

// Set message properties
message.SetPriority(10)
message.SetTTL(24 * time.Hour)
message.SetCorrelationID("req-123")

publishReq := &types.PublishRequest{
    Topic:   "orders",
    Message: message,
}

response, err := manager.PublishMessage(ctx, "kafka", publishReq)

Batch Publishing

// Create multiple messages
messages := []*types.Message{
    messaging.CreateMessage("user.created", "user-service", "email-service", "users", 
        map[string]interface{}{"user_id": "1", "email": "user1@example.com"}),
    messaging.CreateMessage("user.created", "user-service", "email-service", "users", 
        map[string]interface{}{"user_id": "2", "email": "user2@example.com"}),
    messaging.CreateMessage("user.created", "user-service", "email-service", "users", 
        map[string]interface{}{"user_id": "3", "email": "user3@example.com"}),
}

// Publish batch
batchReq := &types.PublishBatchRequest{
    Topic:    "users",
    Messages: messages,
}

response, err := manager.PublishBatch(ctx, "kafka", batchReq)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Published %d messages, %d failed\n", response.PublishedCount, response.FailedCount)
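
When there are more messages than fit comfortably in one request, a caller can split them into fixed-size groups and publish each group separately. The maximum batch size is provider-specific and not stated here, so the size used below is an assumption. The generic chunk helper is hypothetical and requires Go 1.18 or later.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// It returns nil when size is not positive or items is empty.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for size < len(items) {
		// The three-index slice caps capacity so an append on one
		// chunk cannot overwrite elements of the next chunk.
		out = append(out, items[:size:size])
		items = items[size:]
	}
	if len(items) > 0 {
		out = append(out, items)
	}
	return out
}

func main() {
	ids := []string{"1", "2", "3", "4", "5"}
	for i, batch := range chunk(ids, 2) {
		fmt.Printf("batch %d: %v\n", i, batch)
	}
}
```

Each resulting slice could then be used as the Messages field of its own batch request.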

Request/Reply Pattern

// Request handler
requestHandler := func(ctx context.Context, message *types.Message) error {
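    // Process request; reject payloads without a string user_id instead of panicking
    userID, ok := message.Payload["user_id"].(string)
    if !ok {
        return fmt.Errorf("message %s: missing or non-string user_id", message.ID)
    }
    user := getUserFromDatabase(userID)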
    
    // Send reply
    replyMessage := messaging.CreateMessage("user.info", "user-service", message.Source, "replies", 
        map[string]interface{}{
            "user_id": user.ID,
            "name":    user.Name,
            "email":   user.Email,
        })
    
    replyMessage.SetCorrelationID(message.CorrelationID)
    
    replyReq := &types.PublishRequest{
        Topic:   "replies",
        Message: replyMessage,
    }
    
    _, err := manager.PublishMessage(ctx, "kafka", replyReq)
    return err
}

// Subscribe to requests
subscribeReq := &types.SubscribeRequest{
    Topic:   "user.requests",
    GroupID: "user-service",
    AutoAck: true,
}

err := manager.SubscribeToTopic(ctx, "kafka", subscribeReq, requestHandler)
if err != nil {
    log.Fatal(err)
}

// Send the request; the reply is delivered asynchronously on the "replies" topic
requestMessage := messaging.CreateMessage("user.request", "client-service", "user-service", "user.requests", 
    map[string]interface{}{"user_id": "123"})

requestMessage.SetCorrelationID("req-123")
requestMessage.SetReplyTo("replies")

publishReq := &types.PublishRequest{
    Topic:   "user.requests",
    Message: requestMessage,
}

_, err = manager.PublishMessage(ctx, "kafka", publishReq)

Message Filtering

// Subscribe with filter
subscribeReq := &types.SubscribeRequest{
    Topic:   "orders",
    GroupID: "inventory-service",
    Filter: map[string]interface{}{
        "order_type": "inventory",
        "priority":   "high",
    },
    AutoAck: true,
}

handler := func(ctx context.Context, message *types.Message) error {
    // Only high-priority inventory orders will be processed
    fmt.Printf("Processing inventory order: %+v\n", message.Payload)
    return nil
}

err := manager.SubscribeToTopic(ctx, "kafka", subscribeReq, handler)
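
How each provider evaluates Filter is not specified here (for example, whether it matches against headers or payload). The sketch below illustrates one plausible interpretation, in which every filter key must be present with an equal value. It is an assumption for illustration, not the library's documented behavior, and matches is a hypothetical helper.

```go
package main

import (
	"fmt"
	"reflect"
)

// matches reports whether every key in filter is present in fields
// with a deeply equal value. An empty filter matches everything.
// Values are compared with their dynamic types, so int(1) and
// float64(1) are treated as different.
func matches(filter, fields map[string]interface{}) bool {
	for key, want := range filter {
		got, ok := fields[key]
		if !ok || !reflect.DeepEqual(got, want) {
			return false
		}
	}
	return true
}

func main() {
	filter := map[string]interface{}{"order_type": "inventory", "priority": "high"}

	high := map[string]interface{}{"order_type": "inventory", "priority": "high", "order_id": "1"}
	low := map[string]interface{}{"order_type": "inventory", "priority": "low"}

	fmt.Println(matches(filter, high)) // true
	fmt.Println(matches(filter, low))  // false
}
```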

Delayed Message Delivery

// Schedule message for future delivery
message := messaging.CreateMessage("reminder.send", "reminder-service", "email-service", "reminders", 
    map[string]interface{}{
        "user_id": "123",
        "message": "Don't forget your appointment tomorrow!",
    })

// Schedule for 1 hour from now
message.SetScheduledTime(time.Now().Add(1 * time.Hour))

publishReq := &types.PublishRequest{
    Topic:   "reminders",
    Message: message,
}

response, err := manager.PublishMessage(ctx, "kafka", publishReq)

Topic Management

// Create topic
retention := 7 * 24 * time.Hour // 7 days
createReq := &types.CreateTopicRequest{
    Topic:             "new-topic",
    Partitions:        3,
    ReplicationFactor: 2,
    RetentionPeriod:   &retention,
    Config: map[string]interface{}{
        "cleanup.policy":   "delete",
        "compression.type": "snappy",
    },
}

err := manager.CreateTopic(ctx, "kafka", createReq)
if err != nil {
    log.Fatal(err)
}

// Check if topic exists
existsReq := &types.TopicExistsRequest{
    Topic: "new-topic",
}

exists, err := manager.TopicExists(ctx, "kafka", existsReq)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Topic exists: %v\n", exists)

// List all topics
topics, err := manager.ListTopics(ctx, "kafka")
if err != nil {
    log.Fatal(err)
}

for _, topic := range topics {
    fmt.Printf("Topic: %s, Partitions: %d\n", topic.Name, topic.Partitions)
}

// Get topic info
infoReq := &types.GetTopicInfoRequest{
    Topic: "new-topic",
}

info, err := manager.GetTopicInfo(ctx, "kafka", infoReq)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Topic info: %+v\n", info)

Error Handling and Retry

// Message handler with error handling
handler := func(ctx context.Context, message *types.Message) error {
    // Process message
    err := processMessage(message)
    if err != nil {
        // Log error
        log.Printf("Failed to process message %s: %v", message.ID, err)
        
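        // Read the retry count; it is an int when set in-process and may be
        // a float64 if the headers were decoded from JSON
        count := 0
        if v, ok := message.GetHeader("retry_count"); ok {
            switch n := v.(type) {
            case int:
                count = n
            case float64:
                count = int(n)
            }
        }

        if count < 3 {
            // Retry with exponential backoff: 1, 2, then 4 minutes
            delay := time.Duration(1<<count) * time.Minute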
            message.SetScheduledTime(time.Now().Add(delay))
            message.AddHeader("retry_count", count+1)
            
            // Republish for retry
            publishReq := &types.PublishRequest{
                Topic:   message.Topic,
                Message: message,
            }
            
            _, retryErr := manager.PublishMessage(ctx, "kafka", publishReq)
            if retryErr != nil {
                log.Printf("Failed to retry message: %v", retryErr)
            }
        } else {
            // Send to dead letter queue
            message.AddHeader("error", err.Error())
            message.AddHeader("failed_at", time.Now().Unix())
            
            dlqReq := &types.PublishRequest{
                Topic:   "dead-letter-queue",
                Message: message,
            }
            
            _, dlqErr := manager.PublishMessage(ctx, "kafka", dlqReq)
            if dlqErr != nil {
                log.Printf("Failed to send to DLQ: %v", dlqErr)
            }
        }
        
        return err
    }
    
    return nil
}

Health Monitoring

// Check health of all providers
healthStatus := manager.HealthCheck(ctx)
for provider, err := range healthStatus {
    if err != nil {
        fmt.Printf("Provider %s is unhealthy: %v\n", provider, err)
    } else {
        fmt.Printf("Provider %s is healthy\n", provider)
    }
}

// Get statistics
stats, err := manager.GetStats(ctx, "kafka")
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Published messages: %d\n", stats.PublishedMessages)
fmt.Printf("Consumed messages: %d\n", stats.ConsumedMessages)
fmt.Printf("Failed messages: %d\n", stats.FailedMessages)
fmt.Printf("Active connections: %d\n", stats.ActiveConnections)

Connection Management

// Connect to multiple providers
providers := []string{"kafka", "rabbitmq", "nats"}

for _, provider := range providers {
    err := manager.Connect(ctx, provider)
    if err != nil {
        log.Printf("Failed to connect to %s: %v", provider, err)
    } else {
        log.Printf("Connected to %s successfully", provider)
    }
}

// Check connected providers
connectedProviders := manager.GetConnectedProviders()
fmt.Printf("Connected providers: %v\n", connectedProviders)

// Check if specific provider is connected
isConnected := manager.IsProviderConnected("kafka")
fmt.Printf("Kafka connected: %v\n", isConnected)

Best Practices

  1. Message Design: Design messages with clear types and consistent payloads
  2. Error Handling: Implement comprehensive error handling and retry logic
  3. Dead Letter Queues: Use DLQs for failed message handling
  4. Message Ordering: Consider message ordering requirements
  5. Batch Operations: Use batch operations for better performance
  6. Connection Management: Properly manage connections and subscriptions
  7. Monitoring: Monitor message throughput and error rates
  8. Security: Implement proper authentication and authorization
  9. Schema Evolution: Plan for message schema evolution
  10. Testing: Test message handling in different scenarios

Contributing

Contributions are welcome! Please read the contributing guidelines and submit pull requests for any improvements.

License

This library is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnectionInfo

type ConnectionInfo struct {
	Host     string `json:"host"`
	Port     int    `json:"port"`
	Protocol string `json:"protocol"`
	Version  string `json:"version"`
}

ConnectionInfo represents messaging connection information

type CreateTopicRequest

type CreateTopicRequest struct {
	Topic             string                 `json:"topic"`
	Partitions        int                    `json:"partitions,omitempty"`
	ReplicationFactor int                    `json:"replication_factor,omitempty"`
	RetentionPeriod   *time.Duration         `json:"retention_period,omitempty"`
	Config            map[string]interface{} `json:"config,omitempty"`
}

CreateTopicRequest represents a create topic request

type DeleteTopicRequest

type DeleteTopicRequest struct {
	Topic string `json:"topic"`
}

DeleteTopicRequest represents a delete topic request

type GetTopicInfoRequest

type GetTopicInfoRequest struct {
	Topic string `json:"topic"`
}

GetTopicInfoRequest represents a get topic info request

type ManagerConfig

type ManagerConfig struct {
	DefaultProvider string            `json:"default_provider"`
	RetryAttempts   int               `json:"retry_attempts"`
	RetryDelay      time.Duration     `json:"retry_delay"`
	Timeout         time.Duration     `json:"timeout"`
	MaxMessageSize  int64             `json:"max_message_size"`
	Metadata        map[string]string `json:"metadata"`
}

ManagerConfig holds messaging manager configuration

func DefaultManagerConfig

func DefaultManagerConfig() *ManagerConfig

DefaultManagerConfig returns default messaging manager configuration

type Message

type Message struct {
	ID            uuid.UUID              `json:"id"`
	Type          string                 `json:"type"`
	Source        string                 `json:"source"`
	Target        string                 `json:"target"`
	Topic         string                 `json:"topic"`
	RoutingKey    string                 `json:"routing_key,omitempty"`
	Payload       map[string]interface{} `json:"payload"`
	Headers       map[string]interface{} `json:"headers,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
	CreatedAt     time.Time              `json:"created_at"`
	ExpiresAt     *time.Time             `json:"expires_at,omitempty"`
	ScheduledAt   *time.Time             `json:"scheduled_at,omitempty"`
	Priority      int                    `json:"priority,omitempty"`
	TTL           *time.Duration         `json:"ttl,omitempty"`
	CorrelationID string                 `json:"correlation_id,omitempty"`
	ReplyTo       string                 `json:"reply_to,omitempty"`
	ProviderData  map[string]interface{} `json:"provider_data,omitempty"`
}

Message represents a unified message structure

func CreateMessage

func CreateMessage(messageType, source, target, topic string, payload map[string]interface{}) *Message

CreateMessage creates a new message with default values

func (*Message) AddHeader

func (m *Message) AddHeader(key string, value interface{})

AddHeader adds a header to the message

func (*Message) AddMetadata

func (m *Message) AddMetadata(key string, value interface{})

AddMetadata adds metadata to the message

func (*Message) GetHeader

func (m *Message) GetHeader(key string) (interface{}, bool)

GetHeader retrieves a header from the message

func (*Message) GetMetadata

func (m *Message) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves metadata from the message

func (*Message) SetCorrelationID

func (m *Message) SetCorrelationID(correlationID string)

SetCorrelationID sets message correlation ID

func (*Message) SetExpiration

func (m *Message) SetExpiration(duration time.Duration)

SetExpiration sets message expiration

func (*Message) SetPriority

func (m *Message) SetPriority(priority int)

SetPriority sets message priority

func (*Message) SetReplyTo

func (m *Message) SetReplyTo(replyTo string)

SetReplyTo sets message reply-to address

func (*Message) SetScheduledTime

func (m *Message) SetScheduledTime(scheduledAt time.Time)

SetScheduledTime sets message scheduled time

func (*Message) SetTTL

func (m *Message) SetTTL(ttl time.Duration)

SetTTL sets message TTL

type MessageHandler

type MessageHandler func(ctx context.Context, message *Message) error

MessageHandler handles incoming messages

type MessagingFeature

type MessagingFeature string

MessagingFeature represents a messaging feature

const (
	FeaturePublishSubscribe     MessagingFeature = "pub_sub"
	FeatureRequestReply         MessagingFeature = "request_reply"
	FeatureMessageRouting       MessagingFeature = "message_routing"
	FeatureMessageFiltering     MessagingFeature = "message_filtering"
	FeatureMessageOrdering      MessagingFeature = "message_ordering"
	FeatureMessageDeduplication MessagingFeature = "message_deduplication"
	FeatureMessageRetention     MessagingFeature = "message_retention"
	FeatureMessageCompression   MessagingFeature = "message_compression"
	FeatureMessageEncryption    MessagingFeature = "message_encryption"
	FeatureMessageBatching      MessagingFeature = "message_batching"
	FeatureMessagePartitioning  MessagingFeature = "message_partitioning"
	FeatureMessageReplay        MessagingFeature = "message_replay"
	FeatureMessageDeadLetter    MessagingFeature = "message_dead_letter"
	FeatureMessageScheduling    MessagingFeature = "message_scheduling"
	FeatureMessagePriority      MessagingFeature = "message_priority"
	FeatureMessageTTL           MessagingFeature = "message_ttl"
	FeatureMessageHeaders       MessagingFeature = "message_headers"
	FeatureMessageCorrelation   MessagingFeature = "message_correlation"
	FeatureMessageGrouping      MessagingFeature = "message_grouping"
	FeatureMessageStreaming     MessagingFeature = "message_streaming"
)
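
Feature constants like these allow a caller to check what a provider supports before relying on it, for example by inspecting the feature slice returned by GetProviderCapabilities. The sketch below is self-contained: it redeclares two of the constants locally, and supports is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// MessagingFeature and the two constants below are local copies of the
// documented declarations so the example compiles on its own.
type MessagingFeature string

const (
	FeatureMessageBatching   MessagingFeature = "message_batching"
	FeatureMessageScheduling MessagingFeature = "message_scheduling"
)

// supports reports whether want appears in features.
func supports(features []MessagingFeature, want MessagingFeature) bool {
	for _, f := range features {
		if f == want {
			return true
		}
	}
	return false
}

func main() {
	features := []MessagingFeature{FeatureMessageBatching}
	fmt.Println(supports(features, FeatureMessageBatching))   // true
	fmt.Println(supports(features, FeatureMessageScheduling)) // false
}
```

A caller could use such a check to fall back to application-level scheduling when a provider does not advertise FeatureMessageScheduling.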

type MessagingManager

type MessagingManager struct {
	// contains filtered or unexported fields
}

MessagingManager manages multiple messaging providers

func NewMessagingManager

func NewMessagingManager(config *ManagerConfig, logger *logrus.Logger) *MessagingManager

NewMessagingManager creates a new messaging manager

func (*MessagingManager) Close

func (mm *MessagingManager) Close() error

Close closes all messaging connections

func (*MessagingManager) Connect

func (mm *MessagingManager) Connect(ctx context.Context, providerName string) error

Connect connects to a messaging system using the specified provider

func (*MessagingManager) CreateTopic

func (mm *MessagingManager) CreateTopic(ctx context.Context, providerName string, request *CreateTopicRequest) error

CreateTopic creates a topic using the specified provider

func (*MessagingManager) DeleteTopic

func (mm *MessagingManager) DeleteTopic(ctx context.Context, providerName string, request *DeleteTopicRequest) error

DeleteTopic deletes a topic using the specified provider

func (*MessagingManager) Disconnect

func (mm *MessagingManager) Disconnect(ctx context.Context, providerName string) error

Disconnect disconnects from a messaging system using the specified provider

func (*MessagingManager) GetConnectedProviders

func (mm *MessagingManager) GetConnectedProviders() []string

GetConnectedProviders returns a list of connected providers

func (*MessagingManager) GetDefaultProvider

func (mm *MessagingManager) GetDefaultProvider() (MessagingProvider, error)

GetDefaultProvider returns the default messaging provider

func (*MessagingManager) GetProvider

func (mm *MessagingManager) GetProvider(name string) (MessagingProvider, error)

GetProvider returns a messaging provider by name

func (*MessagingManager) GetProviderCapabilities

func (mm *MessagingManager) GetProviderCapabilities(providerName string) ([]MessagingFeature, *ConnectionInfo, error)

GetProviderCapabilities returns capabilities of a provider

func (*MessagingManager) GetStats

func (mm *MessagingManager) GetStats(ctx context.Context, providerName string) (*MessagingStats, error)

GetStats gets statistics from a provider

func (*MessagingManager) GetSupportedProviders

func (mm *MessagingManager) GetSupportedProviders() []string

GetSupportedProviders returns a list of registered providers

func (*MessagingManager) GetTopicInfo

func (mm *MessagingManager) GetTopicInfo(ctx context.Context, providerName string, request *GetTopicInfoRequest) (*TopicInfo, error)

GetTopicInfo gets topic information using the specified provider

func (*MessagingManager) HealthCheck

func (mm *MessagingManager) HealthCheck(ctx context.Context) map[string]error

HealthCheck performs health check on all providers

func (*MessagingManager) IsProviderConnected

func (mm *MessagingManager) IsProviderConnected(providerName string) bool

IsProviderConnected checks if a provider is connected

func (*MessagingManager) ListTopics

func (mm *MessagingManager) ListTopics(ctx context.Context, providerName string) ([]TopicInfo, error)

ListTopics lists topics using the specified provider

func (*MessagingManager) Ping

func (mm *MessagingManager) Ping(ctx context.Context, providerName string) error

Ping pings a messaging system using the specified provider

func (*MessagingManager) PublishBatch

func (mm *MessagingManager) PublishBatch(ctx context.Context, providerName string, request *PublishBatchRequest) (*PublishBatchResponse, error)

PublishBatch publishes multiple messages using the specified provider

func (*MessagingManager) PublishMessage

func (mm *MessagingManager) PublishMessage(ctx context.Context, providerName string, request *PublishRequest) (*PublishResponse, error)

PublishMessage publishes a message using the specified provider

func (*MessagingManager) RegisterProvider

func (mm *MessagingManager) RegisterProvider(provider MessagingProvider) error

RegisterProvider registers a messaging provider

func (*MessagingManager) SubscribeToTopic

func (mm *MessagingManager) SubscribeToTopic(ctx context.Context, providerName string, request *SubscribeRequest, handler MessageHandler) error

SubscribeToTopic subscribes to a topic using the specified provider

func (*MessagingManager) TopicExists

func (mm *MessagingManager) TopicExists(ctx context.Context, providerName string, request *TopicExistsRequest) (bool, error)

TopicExists checks if a topic exists using the specified provider

func (*MessagingManager) UnsubscribeFromTopic

func (mm *MessagingManager) UnsubscribeFromTopic(ctx context.Context, providerName string, request *UnsubscribeRequest) error

UnsubscribeFromTopic unsubscribes from a topic using the specified provider

type MessagingProvider

type MessagingProvider interface {
	// Provider information
	GetName() string
	GetSupportedFeatures() []MessagingFeature
	GetConnectionInfo() *ConnectionInfo

	// Connection management
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Ping(ctx context.Context) error
	IsConnected() bool

	// Message operations
	PublishMessage(ctx context.Context, request *PublishRequest) (*PublishResponse, error)
	SubscribeToTopic(ctx context.Context, request *SubscribeRequest, handler MessageHandler) error
	UnsubscribeFromTopic(ctx context.Context, request *UnsubscribeRequest) error

	// Topic/Queue management
	CreateTopic(ctx context.Context, request *CreateTopicRequest) error
	DeleteTopic(ctx context.Context, request *DeleteTopicRequest) error
	TopicExists(ctx context.Context, request *TopicExistsRequest) (bool, error)
	ListTopics(ctx context.Context) ([]TopicInfo, error)

	// Advanced operations
	PublishBatch(ctx context.Context, request *PublishBatchRequest) (*PublishBatchResponse, error)
	GetTopicInfo(ctx context.Context, request *GetTopicInfoRequest) (*TopicInfo, error)
	GetStats(ctx context.Context) (*MessagingStats, error)

	// Health and monitoring
	HealthCheck(ctx context.Context) error

	// Configuration
	Configure(config map[string]interface{}) error
	IsConfigured() bool
	Close() error
}

MessagingProvider interface for messaging backends

type MessagingStats

type MessagingStats struct {
	PublishedMessages   int64                  `json:"published_messages"`
	ConsumedMessages    int64                  `json:"consumed_messages"`
	FailedMessages      int64                  `json:"failed_messages"`
	ActiveConnections   int                    `json:"active_connections"`
	ActiveSubscriptions int                    `json:"active_subscriptions"`
	ProviderData        map[string]interface{} `json:"provider_data"`
}

MessagingStats represents messaging statistics

type PublishBatchRequest

type PublishBatchRequest struct {
	Topic      string                 `json:"topic"`
	Messages   []*Message             `json:"messages"`
	RoutingKey string                 `json:"routing_key,omitempty"`
	Options    map[string]interface{} `json:"options,omitempty"`
}

PublishBatchRequest represents a batch publish request

type PublishBatchResponse

type PublishBatchResponse struct {
	PublishedCount int                    `json:"published_count"`
	FailedCount    int                    `json:"failed_count"`
	FailedMessages []*Message             `json:"failed_messages,omitempty"`
	ProviderData   map[string]interface{} `json:"provider_data,omitempty"`
}

PublishBatchResponse represents a batch publish response

type PublishRequest

type PublishRequest struct {
	Topic      string                 `json:"topic"`
	Message    *Message               `json:"message"`
	RoutingKey string                 `json:"routing_key,omitempty"`
	Headers    map[string]interface{} `json:"headers,omitempty"`
	Options    map[string]interface{} `json:"options,omitempty"`
}

PublishRequest represents a publish message request

type PublishResponse

type PublishResponse struct {
	MessageID    string                 `json:"message_id"`
	Topic        string                 `json:"topic"`
	Partition    int                    `json:"partition,omitempty"`
	Offset       int64                  `json:"offset,omitempty"`
	Timestamp    time.Time              `json:"timestamp"`
	ProviderData map[string]interface{} `json:"provider_data,omitempty"`
}

PublishResponse represents a publish message response

type SubscribeRequest

type SubscribeRequest struct {
	Topic         string                 `json:"topic"`
	GroupID       string                 `json:"group_id,omitempty"`
	ConsumerID    string                 `json:"consumer_id,omitempty"`
	AutoAck       bool                   `json:"auto_ack"`
	PrefetchCount int                    `json:"prefetch_count,omitempty"`
	StartOffset   string                 `json:"start_offset,omitempty"`
	Filter        map[string]interface{} `json:"filter,omitempty"`
	Options       map[string]interface{} `json:"options,omitempty"`
}

SubscribeRequest represents a subscribe request

type TopicExistsRequest

type TopicExistsRequest struct {
	Topic string `json:"topic"`
}

TopicExistsRequest represents a topic exists request

type TopicInfo

type TopicInfo struct {
	Name              string                 `json:"name"`
	Partitions        int                    `json:"partitions,omitempty"`
	ReplicationFactor int                    `json:"replication_factor,omitempty"`
	RetentionPeriod   *time.Duration         `json:"retention_period,omitempty"`
	MessageCount      int64                  `json:"message_count,omitempty"`
	Size              int64                  `json:"size,omitempty"`
	CreatedAt         time.Time              `json:"created_at,omitempty"`
	ProviderData      map[string]interface{} `json:"provider_data,omitempty"`
}

TopicInfo represents topic information

type UnsubscribeRequest

type UnsubscribeRequest struct {
	Topic      string `json:"topic"`
	GroupID    string `json:"group_id,omitempty"`
	ConsumerID string `json:"consumer_id,omitempty"`
}

UnsubscribeRequest represents an unsubscribe request

Directories

Path         Synopsis
providers
  sqs
