mqreader

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package mqreader provides interfaces and types for reading messages from various message queue systems without consuming them.

Index

Constants

View Source
const (
	TypeSQS      = "sqs"
	TypeKafka    = "kafka"
	TypeRabbitMQ = "rabbitmq"
	TypePulsar   = "pulsar"
	TypePubSub   = "pubsub"
)

Supported message queue types

View Source
const (
	DefaultMaxMessages       = 10
	DefaultWaitTimeSeconds   = 0
	MaxMessagesPerSQSRequest = 10
)

Default configuration values

Variables

This section is empty.

Functions

func IsMQURL

func IsMQURL(urlStr string) bool

IsMQURL checks if the given URL is a message queue URL

func RegisterReader

func RegisterReader(mqType string, factory ReaderFactory)

RegisterReader registers a reader factory for a message queue type. This allows new backends to be added without modifying this package.

Types

type Config

type Config struct {
	// Type identifies the message queue system (sqs, kafka, rabbitmq, pulsar)
	Type string

	// URL is the connection URL or resource identifier
	URL string

	// Region is the cloud region (for cloud providers like AWS, GCP)
	Region string

	// QueueName is the name of the queue/topic
	QueueName string

	// MaxMessages is the maximum number of messages to retrieve per Peek call
	MaxMessages int

	// WaitTimeSeconds is the long polling wait time (for systems that support it)
	WaitTimeSeconds int

	// Credentials contains authentication information
	Credentials map[string]string

	// Options contains additional system-specific options
	Options map[string]string
}

Config holds generic configuration for connecting to a message queue.

func ParseURL

func ParseURL(urlStr string) (*Config, error)

ParseURL parses a message queue URL and returns a Config. Supported URL formats:

func (*Config) GetTableName

func (c *Config) GetTableName() string

GetTableName extracts a clean table name from the config

type Message

type Message struct {
	// ID is the unique identifier for the message
	ID string

	// Body is the message content (typically JSON or plain text)
	Body string

	// Timestamp is when the message was sent/published
	Timestamp time.Time

	// Metadata contains system-specific attributes
	// For SQS: MessageAttributes, MD5OfBody, ReceiptHandle, etc.
	// For Kafka: Headers, Partition, Offset, etc.
	// For RabbitMQ: Headers, Exchange, RoutingKey, etc.
	Metadata map[string]string

	// Source identifies the queue/topic/exchange the message came from
	Source string

	// ReceiveCount indicates how many times this message was received (if applicable)
	ReceiveCount int
}

Message represents a generic message from any message queue system. The fields are designed to be common across different systems while allowing system-specific data in the Metadata field.

type MessageQueueReader

type MessageQueueReader interface {
	// Connect establishes connection to the message queue system
	Connect(ctx context.Context) error

	// Peek reads messages without removing/consuming them.
	// The maxMessages parameter limits the number of messages to retrieve.
	// Returns a slice of messages and any error encountered.
	Peek(ctx context.Context, maxMessages int) ([]Message, error)

	// GetMetadata returns information about the queue/topic
	GetMetadata(ctx context.Context) (*QueueMetadata, error)

	// Close terminates the connection and releases resources
	Close() error
}

MessageQueueReader is the interface for reading messages from a message queue without consuming/deleting them. Implementations should use techniques like visibility timeout=0 (SQS) or consumer groups with no commit (Kafka) to achieve peek functionality.

func NewReader

func NewReader(config *Config) (MessageQueueReader, error)

NewReader creates a new MessageQueueReader based on the config type. It first checks the registry for a registered factory, then falls back to built-in support for known types.

func NewReaderFromURL

func NewReaderFromURL(urlStr string) (MessageQueueReader, error)

NewReaderFromURL creates a MessageQueueReader from a URL string. This is a convenience function that combines ParseURL and NewReader.

type QueueMetadata

type QueueMetadata struct {
	// Name is the queue/topic name
	Name string

	// ApproxMsgCount is the approximate number of messages in the queue
	ApproxMsgCount int64

	// Type identifies the message queue system (sqs, kafka, rabbitmq, etc.)
	Type string

	// AdditionalInfo contains system-specific metadata
	AdditionalInfo map[string]string
}

QueueMetadata contains information about a queue/topic.

type ReaderFactory

type ReaderFactory func(config *Config) (MessageQueueReader, error)

ReaderFactory is a function type that creates a MessageQueueReader from a Config

Directories

Path Synopsis
Package kafka provides a Kafka implementation of the MessageQueueReader interface.
Package kafka provides a Kafka implementation of the MessageQueueReader interface.
Package sqs provides an SQS implementation of the MessageQueueReader interface.
Package sqs provides an SQS implementation of the MessageQueueReader interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL