queuefx

package
v0.6.23
Published: May 16, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

README

ajan/queuefx

Overview

The queuefx package provides a unified interface over message queue backends. RabbitMQ (AMQP) is currently the only supported backend.

The documentation below provides an overview of the package, its types, functions, and usage examples. For more detailed information, refer to the source code and tests.

Configuration

Configuration struct for the message queue:

type Config struct {
  Brokers map[string]ConfigBroker `conf:"BROKERS"`
}

type ConfigBroker struct {
  Provider string `conf:"PROVIDER"`
  DSN      string `conf:"DSN"`
}

// Consumer configuration
type ConsumerConfig struct {
  Args      map[string]any // Additional arguments for declaration
  AutoAck   bool          // Automatic message acknowledgment
  Exclusive bool          // Exclusive queue access
  NoLocal   bool          // Don't receive messages published by this connection
  NoWait    bool          // Don't wait for server confirmation
}

Example configuration:

config := &queuefx.Config{
  Brokers: map[string]queuefx.ConfigBroker{
    "default": {
      Provider: "amqp",
      DSN:      "amqp://guest:guest@localhost:5672",
    },
    "events": {
      Provider: "amqp",
      DSN:      "amqp://user:pass@events:5672",
    },
  },
}

// Consumer configuration
consumerConfig := queuefx.DefaultConsumerConfig() // Get defaults
consumerConfig.AutoAck = true                     // Customize as needed

Features

  • RabbitMQ (AMQP) message queue backend support
  • Configurable queue dialects
  • Automatic reconnection handling
  • Message acknowledgment control
  • Flexible consumer configuration
  • Easy to extend for additional message queue backends

API

Usage

import (
  "context"
  "fmt"
  "log"

  "github.com/eser/ajan/queuefx"
)

ctx := context.Background()

// Create a new AMQP broker instance
broker, err := queuefx.NewAmqpBroker(ctx, queuefx.DialectAmqp, "amqp://localhost:5672")
if err != nil {
  log.Fatal(err)
}

// Declare a queue
queueName, err := broker.QueueDeclare(ctx, "my-queue")
if err != nil {
  log.Fatal(err)
}

// Publish a message
err = broker.Publish(ctx, queueName, []byte("Hello, World!"))
if err != nil {
  log.Fatal(err)
}

// Configure consumer
config := queuefx.DefaultConsumerConfig()
config.AutoAck = false

// Start consuming messages
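messages, errs := broker.Consume(ctx, queueName, config) // errs avoids shadowing the errors package

// Handle messages and errors
go func() {
  for {
    select {
    case msg, ok := <-messages:
      if !ok {
        return // message channel closed
      }
      fmt.Printf("Received: %s\n", msg.Body)
      if err := msg.Ack(); err != nil { // Acknowledge the message
        fmt.Printf("Ack failed: %v\n", err)
      }
    case err, ok := <-errs:
      if !ok {
        errs = nil // error channel closed; stop selecting on it
        continue
      }
      fmt.Printf("Error: %v\n", err)
    case <-ctx.Done():
      return
    }
  }
}()

Consumer Configuration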

The package provides flexible consumer configuration through the ConsumerConfig struct:

type ConsumerConfig struct {
  Args      map[string]any // Additional arguments for declaration
  AutoAck   bool          // Automatic message acknowledgment
  Exclusive bool          // Exclusive queue access
  NoLocal   bool          // Don't receive messages published by this connection
  NoWait    bool          // Don't wait for server confirmation
}

You can use DefaultConsumerConfig() to get started with sensible defaults and customize as needed.

Documentation

Constants

const DefaultBroker = "default"

Variables

var (
	ErrUnknownProvider          = errors.New("unknown provider")
	ErrUnableToDetermineDialect = errors.New("unable to determine dialect")
)

Functions

This section is empty.

Types

type AmqpBroker

type AmqpBroker struct {
	// contains filtered or unexported fields
}

func NewAmqpBroker

func NewAmqpBroker(ctx context.Context, dialect Dialect, dsn string) (*AmqpBroker, error)

func (*AmqpBroker) Consume added in v0.6.17

func (broker *AmqpBroker) Consume(ctx context.Context, queueName string, config ConsumerConfig) (<-chan Message, <-chan error)

The consumer will automatically reconnect on connection failures.

func (*AmqpBroker) GetDialect

func (broker *AmqpBroker) GetDialect() Dialect

func (*AmqpBroker) Publish

func (broker *AmqpBroker) Publish(ctx context.Context, name string, body []byte) error

func (*AmqpBroker) QueueDeclare

func (broker *AmqpBroker) QueueDeclare(ctx context.Context, name string) (string, error)

type Broker

type Broker interface {
	GetDialect() Dialect

	QueueDeclare(ctx context.Context, name string) (string, error)
	Publish(ctx context.Context, name string, body []byte) error
}

type Config

type Config struct {
	Brokers map[string]ConfigBroker `conf:"BROKERS"`
}

type ConfigBroker

type ConfigBroker struct {
	Provider string `conf:"PROVIDER"`
	DSN      string `conf:"DSN"`
}

type ConsumerConfig added in v0.6.17

type ConsumerConfig struct {
	// Args additional arguments for declaration
	Args map[string]any
	// AutoAck when true, the server will automatically acknowledge messages
	AutoAck bool
	// Exclusive when true, only this consumer can access the queue
	Exclusive bool
	// NoLocal when true, the server will not send messages to the connection that published them
	NoLocal bool
	// NoWait when true, the server will not respond to the declare
	NoWait bool
}

ConsumerConfig holds configuration for message consumption.

func DefaultConsumerConfig added in v0.6.17

func DefaultConsumerConfig() ConsumerConfig

DefaultConsumerConfig returns a default configuration for consuming messages.

type Dialect

type Dialect string

const (
	DialectAmqp Dialect = "amqp"
)

func DetermineDialect

func DetermineDialect(provider string, dsn string) (Dialect, error)

type Message added in v0.6.17

type Message struct {
	Headers map[string]any

	Body []byte
	// contains filtered or unexported fields
}

Message represents a consumed message with its metadata and acknowledgment functions.

func (*Message) Ack added in v0.6.17

func (m *Message) Ack() error

Ack acknowledges the message.

func (*Message) Nack added in v0.6.17

func (m *Message) Nack(requeue bool) error

Nack negatively acknowledges the message.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry(logger *logfx.Logger) *Registry

func (*Registry) AddConnection

func (registry *Registry) AddConnection(ctx context.Context, name string, provider string, dsn string) error

func (*Registry) GetDefault added in v0.6.11

func (registry *Registry) GetDefault() Broker

func (*Registry) GetNamed added in v0.6.11

func (registry *Registry) GetNamed(name string) Broker

func (*Registry) LoadFromConfig

func (registry *Registry) LoadFromConfig(ctx context.Context, config *Config) error
