rabbitmqamqp

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2025 License: Apache-2.0 Imports: 21 Imported by: 1

Documentation

Overview

Example
package main

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"os/signal"
	"strings"

	"github.com/Azure/go-amqp"
	"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)

// echoRpcServer pairs an RPC responder with the connection it was created on,
// so that stop can close both.
type echoRpcServer struct {
	// conn is the connection handed to newEchoRpcServer.
	conn   *rabbitmqamqp.AmqpConnection
	// server is the responder created from conn via NewResponder.
	server rabbitmqamqp.Responder
}

// stop shuts the echo server down: the responder is closed first, then the
// connection it was created on. A failure to close the connection is reported
// on stdout instead of being dropped silently; stop itself never fails.
func (s *echoRpcServer) stop(ctx context.Context) {
	s.server.Close(ctx)
	if err := s.conn.Close(ctx); err != nil {
		fmt.Printf("Error closing RPC server connection: %v\n", err)
	}
}

// newEchoRpcServer declares the request queue on conn and starts a responder
// on it whose handler prints every request body and returns the request
// unchanged as the reply.
//
// It panics if the queue cannot be declared or the responder cannot be
// created, since the example cannot run without either.
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
	// The responder consumes from this queue, so a failed declaration must
	// stop the program here rather than surface later as a responder error.
	if _, err := conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
		Name: rpcServerQueueName,
	}); err != nil {
		panic(err)
	}

	srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
		RequestQueue: rpcServerQueueName,
		Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
			fmt.Printf("echo: %s\n", request.GetData())
			// Echo: the reply is the request message itself.
			return request, nil
		},
	})
	if err != nil {
		panic(err)
	}
	return &echoRpcServer{
		conn:   conn,
		server: srv,
	}
}

// rpcServerQueueName is the queue the echo responder consumes requests from
// and the requester publishes requests to.
const rpcServerQueueName = "rpc-queue"

// main runs an interactive echo-RPC demo. It opens one connection for the
// responder and one for the requester, then reads lines from stdin, sends each
// non-empty line as an RPC request and prints the reply. The program ends on
// Ctrl+C or when stdin is closed.
func main() {
	// Dial rabbit for RPC server connection
	srvConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
	if err != nil {
		panic(err)
	}

	srv := newEchoRpcServer(srvConn)

	// Dial rabbit for RPC client connection
	clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
	if err != nil {
		panic(err)
	}

	rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
		RequestQueueName: rpcServerQueueName,
	})
	if err != nil {
		panic(err)
	}

	// shutdown releases both sides of the demo. srv.stop already closes
	// srvConn, so that connection is not closed a second time here.
	shutdown := func() {
		srv.stop(context.TODO())
		// Best effort: the process is about to exit, so a close error on the
		// client connection is not actionable.
		_ = clientConn.Close(context.TODO())
	}

	// Set up a channel to listen for OS signals
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt) // Listen for Ctrl+C

	// Goroutine to handle graceful shutdown. It exits the process itself
	// because the main goroutine is normally blocked reading stdin.
	go func() {
		<-sigs // Wait for Ctrl+C
		fmt.Println("\nReceived Ctrl+C, gracefully shutting down...")
		shutdown()
		os.Exit(0)
	}()

	reader := bufio.NewReader(os.Stdin)
	fmt.Println("Type a message and press Enter to send (Ctrl+C to quit):")

	for {
		fmt.Print("Enter message: ")
		input, readErr := reader.ReadString('\n')
		if readErr != nil && input == "" {
			// stdin was closed (EOF) or failed and nothing is left to send.
			// Slicing the empty input would panic, so stop here instead.
			fmt.Println("\nInput closed, shutting down...")
			shutdown()
			return
		}
		// Strip the line terminator, "\n" or "\r\n". A final line without a
		// terminator is still sent; the next read then hits the branch above.
		message := strings.TrimRight(input, "\r\n")

		if message == "" {
			continue
		}

		resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
		if err != nil {
			fmt.Printf("Error calling RPC: %v\n", err)
			continue
		}
		// The reply channel is closed without a value when the request times out.
		m, ok := <-resp
		if !ok {
			fmt.Println("timed out waiting for response")
			continue
		}
		fmt.Printf("response: %s\n", m.GetData())
	}
}

Index

Examples

Constants

View Source
const (
	UnitMb string = "mb"
	UnitKb string = "kb"
	UnitGb string = "gb"
	UnitTb string = "tb"
)
View Source
const AMQPS = "amqps"
View Source
const AtLeastOnce = 1
View Source
const AtMostOnce = 0
View Source
const DescriptorCodeSqlFilter = 0x120

DescriptorCodeSqlFilter is the uint64 descriptor code for the AMQP SQL filter (amqpSqlFilter = "amqp:sql-filter"). It corresponds to DESCRIPTOR_CODE_SQL_FILTER in rabbitmq-server; see https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/amqp10_common/include/amqp10_filter.hrl

View Source
const StreamFilterValue = "x-stream-filter-value"

Variables

View Source
var DefaultRpcRequestTimeout = 30 * time.Second
View Source
var ErrConnectionClosed = errors.New("connection is closed")
View Source
var ErrDoesNotExist = errors.New("does not exist")
View Source
var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered")

ErrMaxReconnectAttemptsReached is the typed error reported when MaxReconnectAttempts is reached.

View Source
var ErrPreconditionFailed = errors.New("precondition Failed")

Functions

func CapacityBytes

func CapacityBytes(value int64) int64

func CapacityFrom

func CapacityFrom(value string) (int64, error)

func CapacityGB

func CapacityGB(value int64) int64

func CapacityKB

func CapacityKB(value int64) int64

func CapacityMB

func CapacityMB(value int64) int64

func CapacityTB

func CapacityTB(value int64) int64

func Debug

func Debug(msg string, args ...any)

func Error

func Error(msg string, args ...any)

func ExtractWithoutPassword

func ExtractWithoutPassword(addr string) string

func Info

func Info(msg string, args ...any)

func MessagePropertyToAddress

func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error

MessagePropertyToAddress sets the To property of the message to the address of the target. The target must be a QueueAddress or an ExchangeAddress. Note: The field msgRef.Properties.To will be overwritten if it is already set.

func NewGinkgoHandler added in v0.2.0

func NewGinkgoHandler(level slog.Level, writer io.Writer) slog.Handler

func NewMessage

func NewMessage(body []byte) *amqp.Message

NewMessage creates a new AMQP 1.0 message with the given payload.

func NewMessageWithAddress

func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error)

NewMessageWithAddress creates a new AMQP 1.0 message with the given payload and sets the To property to the address of the target. The target must be a QueueAddress or an ExchangeAddress. This function is a helper that combines NewMessage and MessagePropertyToAddress.

func NewMessageWithFilter

func NewMessageWithFilter(body []byte, filter string) *amqp.Message

NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the StreamFilterValue property to the filter value.

func SetSlogHandler added in v0.2.0

func SetSlogHandler(handler slog.Handler)

func Warn

func Warn(msg string, args ...any)

Types

type AMQPBinding

type AMQPBinding struct {
	// contains filtered or unexported fields
}

func (*AMQPBinding) Arguments added in v0.2.0

func (b *AMQPBinding) Arguments(arguments map[string]any)

func (*AMQPBinding) Bind

func (b *AMQPBinding) Bind(ctx context.Context) (string, error)

Bind creates a binding between an exchange and a queue or exchange with the specified binding key. Returns the binding path that can be used to unbind the binding. Given a virtual host, the binding path is unique.

func (*AMQPBinding) BindingKey

func (b *AMQPBinding) BindingKey(bindingKey string)

func (*AMQPBinding) Destination

func (b *AMQPBinding) Destination(name string, isQueue bool)

func (*AMQPBinding) SourceExchange

func (b *AMQPBinding) SourceExchange(sourceName string)

func (*AMQPBinding) Unbind

func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error

Unbind removes a binding between an exchange and a queue or exchange with the specified binding key. The bindingPath is the unique path that was returned when the binding was created.

type AMQPBindingInfo

type AMQPBindingInfo struct {
}

type AmqpAddress

type AmqpAddress struct {
	// the address of the AMQP server
	// it is in the form of amqp://<host>:<port>
	// or amqps://<host>:<port>
	// the port is optional
	// the default port is 5672
	// the default protocol is amqp
	// the default host is localhost
	// the default virtual host is "/"
	// the default user is guest
	// the default password is guest
	// the default SASL type is SASLTypeAnonymous
	Address string
	// Options: Additional options for the connection
	Options *AmqpConnOptions
}

type AmqpConnOptions

type AmqpConnOptions struct {
	// wrapper for amqp.ConnOptions
	ContainerID string

	// wrapper for amqp.ConnOptions
	HostName string
	// wrapper for amqp.ConnOptions
	IdleTimeout time.Duration

	// wrapper for amqp.ConnOptions
	MaxFrameSize uint32

	// wrapper for amqp.ConnOptions
	MaxSessions uint16

	// wrapper for amqp.ConnOptions
	Properties map[string]any

	// wrapper for amqp.ConnOptions
	SASLType amqp.SASLType

	// wrapper for amqp.ConnOptions
	TLSConfig *tls.Config

	// wrapper for amqp.ConnOptions
	WriteTimeout time.Duration

	// RecoveryConfiguration is used to configure the recovery behavior of the connection.
	// when the connection is closed unexpectedly.
	RecoveryConfiguration *RecoveryConfiguration

	// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
	TopologyRecoveryOptions TopologyRecoveryOptions

	// The OAuth2Options is used to configure the connection with OAuth2 token.
	OAuth2Options *OAuth2Options

	// Local connection identifier (not sent to the server)
	// if not provided, a random UUID is generated
	Id string
}

func (*AmqpConnOptions) Clone

func (a *AmqpConnOptions) Clone() *AmqpConnOptions

type AmqpConnection

type AmqpConnection struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error)

Dial connects to the AMQP 1.0 server at the given address using the provided connOptions. It returns a pointer to the new AmqpConnection if successful, otherwise an error.

func (*AmqpConnection) Close

func (a *AmqpConnection) Close(ctx context.Context) error

Close closes the connection to the AMQP 1.0 server and the management interface. All the publishers and consumers are closed as well.

func (*AmqpConnection) Id

func (a *AmqpConnection) Id() string

func (*AmqpConnection) Management

func (a *AmqpConnection) Management() *AmqpManagement

Management returns the management interface for the connection. The management interface is used to declare and delete exchanges, queues, and bindings.

func (*AmqpConnection) NewConsumer

func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error)

NewConsumer creates a new Consumer that listens to the provided Queue

func (*AmqpConnection) NewPublisher

func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error)

NewPublisher creates a new Publisher that sends messages to the provided destination. The destination is a ITargetAddress that can be a Queue or an Exchange with a routing key. options is an IPublisherOptions that can be used to configure the publisher. See QueueAddress and ExchangeAddress for more information.

func (*AmqpConnection) NewRequester added in v0.4.0

func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (Requester, error)

NewRequester creates a new RPC client that sends requests to the specified queue and receives replies on a dynamically created reply queue.

func (*AmqpConnection) NewResponder added in v0.4.0

func (a *AmqpConnection) NewResponder(ctx context.Context, options ResponderOptions) (Responder, error)

NewResponder creates a new RPC server that processes requests from the specified queue. The requestQueue in options is mandatory, while other fields are optional and will use defaults if not provided.

func (*AmqpConnection) NotifyStatusChange

func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)

NotifyStatusChange registers a channel to receive state change notifications from the connection.

func (*AmqpConnection) Properties

func (a *AmqpConnection) Properties() map[string]any

func (*AmqpConnection) RefreshToken

func (a *AmqpConnection) RefreshToken(background context.Context, token string) error

func (*AmqpConnection) State

func (a *AmqpConnection) State() ILifeCycleState

type AmqpExchange

type AmqpExchange struct {
	// contains filtered or unexported fields
}

func (*AmqpExchange) Arguments added in v0.2.0

func (e *AmqpExchange) Arguments(arguments map[string]any)

func (*AmqpExchange) AutoDelete

func (e *AmqpExchange) AutoDelete(isAutoDelete bool)

func (*AmqpExchange) Declare

func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)

func (*AmqpExchange) Delete

func (e *AmqpExchange) Delete(ctx context.Context) error

func (*AmqpExchange) ExchangeType

func (e *AmqpExchange) ExchangeType(exchangeType TExchangeType)

func (*AmqpExchange) GetExchangeType

func (e *AmqpExchange) GetExchangeType() TExchangeType

func (*AmqpExchange) IsAutoDelete

func (e *AmqpExchange) IsAutoDelete() bool

func (*AmqpExchange) Name

func (e *AmqpExchange) Name() string

type AmqpExchangeInfo

type AmqpExchangeInfo struct {
	// contains filtered or unexported fields
}

func (*AmqpExchangeInfo) Name

func (a *AmqpExchangeInfo) Name() string

type AmqpManagement

type AmqpManagement struct {
	// contains filtered or unexported fields
}

AmqpManagement is the interface to the RabbitMQ /management endpoint. The management interface is used to declare and delete exchanges, queues, and bindings.

func (*AmqpManagement) Bind

func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error)

func (*AmqpManagement) Close

func (a *AmqpManagement) Close(ctx context.Context) error

func (*AmqpManagement) DeclareExchange

func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error)

func (*AmqpManagement) DeclareQueue

func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error)

func (*AmqpManagement) DeleteExchange

func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error

func (*AmqpManagement) DeleteQueue

func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error

func (*AmqpManagement) NotifyStatusChange

func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)

func (*AmqpManagement) Open

func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error

func (*AmqpManagement) PurgeQueue

func (a *AmqpManagement) PurgeQueue(ctx context.Context, name string) (int, error)

PurgeQueue purges the queue and returns the number of messages purged.

func (*AmqpManagement) QueueInfo

func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)

func (*AmqpManagement) Request

func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string,
	expectedResponseCodes []int) (map[string]any, error)

Request sends a request to the /management endpoint. It is a generic method that can be used to send any request to the management endpoint. In most cases you don't need to use this method directly; use the standard methods instead.

func (*AmqpManagement) State

func (a *AmqpManagement) State() ILifeCycleState

func (*AmqpManagement) Unbind

func (a *AmqpManagement) Unbind(ctx context.Context, path string) error

type AmqpQueue

type AmqpQueue struct {
	// contains filtered or unexported fields
}

func (*AmqpQueue) Arguments added in v0.2.0

func (a *AmqpQueue) Arguments(arguments map[string]any)

func (*AmqpQueue) AutoDelete

func (a *AmqpQueue) AutoDelete(isAutoDelete bool)

func (*AmqpQueue) Declare

func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)

func (*AmqpQueue) Delete

func (a *AmqpQueue) Delete(ctx context.Context) error

func (*AmqpQueue) Exclusive

func (a *AmqpQueue) Exclusive(isExclusive bool)

func (*AmqpQueue) IsAutoDelete

func (a *AmqpQueue) IsAutoDelete() bool

func (*AmqpQueue) IsExclusive

func (a *AmqpQueue) IsExclusive() bool

func (*AmqpQueue) Name

func (a *AmqpQueue) Name(queueName string)

func (*AmqpQueue) Purge

func (a *AmqpQueue) Purge(ctx context.Context) (int, error)

func (*AmqpQueue) QueueType

func (a *AmqpQueue) QueueType(queueType TQueueType)

type AmqpQueueInfo

type AmqpQueueInfo struct {
	// contains filtered or unexported fields
}

func (*AmqpQueueInfo) Arguments

func (a *AmqpQueueInfo) Arguments() map[string]any

func (*AmqpQueueInfo) ConsumerCount

func (a *AmqpQueueInfo) ConsumerCount() uint32

func (*AmqpQueueInfo) IsAutoDelete

func (a *AmqpQueueInfo) IsAutoDelete() bool

func (*AmqpQueueInfo) IsDurable

func (a *AmqpQueueInfo) IsDurable() bool

func (*AmqpQueueInfo) IsExclusive

func (a *AmqpQueueInfo) IsExclusive() bool

func (*AmqpQueueInfo) Leader

func (a *AmqpQueueInfo) Leader() string

func (*AmqpQueueInfo) Members

func (a *AmqpQueueInfo) Members() []string

func (*AmqpQueueInfo) MessageCount

func (a *AmqpQueueInfo) MessageCount() uint64

func (*AmqpQueueInfo) Name

func (a *AmqpQueueInfo) Name() string

func (*AmqpQueueInfo) Type

func (a *AmqpQueueInfo) Type() TQueueType

type AutoGeneratedQueueSpecification

type AutoGeneratedQueueSpecification struct {
	IsAutoDelete   bool
	IsExclusive    bool
	MaxLength      int64
	MaxLengthBytes int64
	Arguments      map[string]any
	// contains filtered or unexported fields
}

AutoGeneratedQueueSpecification represents the specification of the auto-generated queue. It is a classic queue with auto-generated name. It is useful in context like RPC or when you need a temporary queue.

type BalancedLeaderLocator

type BalancedLeaderLocator struct {
}

type ClassicQueueSpecification

type ClassicQueueSpecification struct {
	Name                 string
	IsAutoDelete         bool
	IsExclusive          bool
	AutoExpire           int64
	MessageTTL           int64
	OverflowStrategy     IOverflowStrategy
	SingleActiveConsumer bool
	DeadLetterExchange   string
	DeadLetterRoutingKey string
	MaxLength            int64
	MaxLengthBytes       int64
	MaxPriority          int64
	LeaderLocator        ILeaderLocator
	Arguments            map[string]any
}

ClassicQueueSpecification represents the specification of the classic queue

type ClientLocalLeaderLocator

type ClientLocalLeaderLocator struct {
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) GetQueue added in v0.4.0

func (c *Consumer) GetQueue() (string, error)

GetQueue returns the queue the consumer is connected to. When the user sets the destination address to a dynamic address, such as a direct-reply-to address, this function returns that dynamic address. In other cases, it returns the queue address.

func (*Consumer) Id

func (c *Consumer) Id() string

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error)

type ConsumerOptions

type ConsumerOptions struct {
	// ReceiverLinkName: see the IConsumerOptions interface.
	ReceiverLinkName string
	// InitialCredits: see the IConsumerOptions interface.
	InitialCredits int32
	// Id is the id of the consumer.
	Id string

	// DirectReplyTo presumably makes the consumer use a dynamic
	// direct-reply-to address instead of a named queue (compare
	// Consumer.GetQueue) — TODO confirm against the implementation.
	DirectReplyTo bool
}

ConsumerOptions represents the options for quorum and classic queues

type CorrelationIdExtractor added in v0.2.0

type CorrelationIdExtractor func(message *amqp.Message) any

CorrelationIdExtractor defines the signature for a function that extracts the correlation ID from an AMQP message. The returned value must be a valid AMQP type that can be binary encoded.

type CorrelationIdSupplier added in v0.2.0

type CorrelationIdSupplier interface {
	Get() any
}

CorrelationIdSupplier is an interface for providing correlation IDs for RPC requests. Implementations should generate unique identifiers for each request. The returned value from `Get()` should be an AMQP type, or a type that can be encoded into an AMQP message property (e.g., string, int, []byte, etc.).

type CustomExchangeSpecification

type CustomExchangeSpecification struct {
	Name             string
	IsAutoDelete     bool
	ExchangeTypeName string
	Arguments        map[string]any
}

type DeliveryContext

type DeliveryContext struct {
	// contains filtered or unexported fields
}

func (*DeliveryContext) Accept

func (dc *DeliveryContext) Accept(ctx context.Context) error

func (*DeliveryContext) Discard

func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error

func (*DeliveryContext) DiscardWithAnnotations

func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error

func (*DeliveryContext) Message

func (dc *DeliveryContext) Message() *amqp.Message

func (*DeliveryContext) Requeue

func (dc *DeliveryContext) Requeue(ctx context.Context) error

func (*DeliveryContext) RequeueWithAnnotations

func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error

type DeliveryState

type DeliveryState = amqp.DeliveryState

type DirectExchangeSpecification

type DirectExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type DropHeadOverflowStrategy

type DropHeadOverflowStrategy struct {
}

type Endpoint

type Endpoint struct {
	Address string
	Options *AmqpConnOptions
}

func DefaultEndpoints

func DefaultEndpoints() []Endpoint

type Environment

type Environment struct {
	EndPointStrategy TEndPointStrategy
	// contains filtered or unexported fields
}

func NewClusterEnvironment

func NewClusterEnvironment(endPoints []Endpoint) *Environment

func NewClusterEnvironmentWithStrategy

func NewClusterEnvironmentWithStrategy(endPoints []Endpoint, strategy TEndPointStrategy) *Environment

func NewEnvironment

func NewEnvironment(address string, options *AmqpConnOptions) *Environment

func (*Environment) CloseConnections

func (e *Environment) CloseConnections(ctx context.Context) error

CloseConnections closes all the connections in the environment with all the publishers and consumers.

func (*Environment) Connections

func (e *Environment) Connections() []*AmqpConnection

Connections gets the active connections in the environment

func (*Environment) NewConnection

func (e *Environment) NewConnection(ctx context.Context) (*AmqpConnection, error)

NewConnection gets a new connection from the environment. It picks an endpoint from the list of endpoints, based on EndPointStrategy, and tries to open a connection. It fails if none of the endpoints is reachable. The Environment will keep track of the connection and close it when the environment is closed.

type ExchangeAddress

type ExchangeAddress struct {
	Exchange string // The name of the exchange
	Key      string // The routing key. Can be empty
}

ExchangeAddress represents the address of an exchange with a routing key.

type ExchangeToExchangeBindingSpecification

type ExchangeToExchangeBindingSpecification struct {
	SourceExchange      string
	DestinationExchange string
	BindingKey          string
	Arguments           map[string]any
}

type ExchangeToQueueBindingSpecification

type ExchangeToQueueBindingSpecification struct {
	SourceExchange   string
	DestinationQueue string
	BindingKey       string
	Arguments        map[string]any
}

type FanOutExchangeSpecification

type FanOutExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type GinkgoLogHandler added in v0.2.0

type GinkgoLogHandler struct {
	slog.Handler
	// contains filtered or unexported fields
}

func (*GinkgoLogHandler) Handle added in v0.2.0

func (h *GinkgoLogHandler) Handle(_ context.Context, r slog.Record) error

type HeadersExchangeSpecification

type HeadersExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type IBindingSpecification

type IBindingSpecification interface {
	// contains filtered or unexported methods
}

type IConsumerOptions

type IConsumerOptions interface {
	// contains filtered or unexported methods
}

type IExchangeSpecification

type IExchangeSpecification interface {
	// contains filtered or unexported methods
}

IExchangeSpecification represents the specification of an exchange

type ILeaderLocator

type ILeaderLocator interface {
	// contains filtered or unexported methods
}

type ILifeCycleState

type ILifeCycleState interface {
	// contains filtered or unexported methods
}

type IOffsetSpecification

type IOffsetSpecification interface {
	// contains filtered or unexported methods
}

type IOverflowStrategy

type IOverflowStrategy interface {
	// contains filtered or unexported methods
}

type IPublisherOptions

type IPublisherOptions interface {
	// contains filtered or unexported methods
}

type IQueueSpecification

type IQueueSpecification interface {
	// contains filtered or unexported methods
}

IQueueSpecification represents the specification of a queue

type ITargetAddress

type ITargetAddress interface {
	// contains filtered or unexported methods
}

ITargetAddress is an interface that represents an address that can be used to send messages to. It can be either a Queue or an Exchange with a routing key.

type LifeCycle

type LifeCycle struct {
	// contains filtered or unexported fields
}

func NewLifeCycle

func NewLifeCycle() *LifeCycle

func (*LifeCycle) SetState

func (l *LifeCycle) SetState(value ILifeCycleState)

func (*LifeCycle) State

func (l *LifeCycle) State() ILifeCycleState

type OAuth2Options

type OAuth2Options struct {
	Token string
}

func (OAuth2Options) Clone

func (o OAuth2Options) Clone() *OAuth2Options

type OffsetFirst

type OffsetFirst struct {
}

type OffsetLast

type OffsetLast struct {
}

type OffsetNext

type OffsetNext struct {
}

type OffsetValue

type OffsetValue struct {
	Offset uint64
}

type PublishResult

type PublishResult struct {
	Outcome DeliveryState
	Message *amqp.Message
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a publisher that sends messages to a specific destination address.

func (*Publisher) Close

func (m *Publisher) Close(ctx context.Context) error

Close closes the publisher.

func (*Publisher) Id

func (m *Publisher) Id() string

func (*Publisher) Publish

func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error)

Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.

The message is sent and the outcome of the operation is returned. The outcome is a DeliveryState that indicates if the message was accepted or rejected. RabbitMQ supports the following DeliveryState types:

If the destination address is not defined during the creation, the message must have a TO property set. You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: <code>

publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{
			Exchange: "myExchangeName",
			Key:      "myRoutingKey",
		}, nil)

result, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))

</code> Create a new publisher that sends messages based on message destination address: <code>

publisher, err := connection.NewPublisher(context.Background(), nil, nil)
msg := amqp.NewMessage([]byte("hello"))
err = MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
result, err := publisher.Publish(context.Background(), msg)

</code>

The message is persistent by default: when Header is nil, Header.Durable is set to true. You can make the message non-persistent by setting Header.Durable to false. Note: when you provide your own `Header`, it is up to you to set the message properties, including setting `Header.Durable` to true or false.

<code>

</code>

type PublisherOptions

type PublisherOptions struct {
	Id             string
	SenderLinkName string
}

type QueueAddress

type QueueAddress struct {
	Queue      string // The name of the queue
	Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}

QueueAddress represents the address of a queue.

type QuorumQueueSpecification

type QuorumQueueSpecification struct {
	Name                   string
	AutoExpire             int64
	MessageTTL             int64
	OverflowStrategy       IOverflowStrategy
	SingleActiveConsumer   bool
	DeadLetterExchange     string
	DeadLetterRoutingKey   string
	MaxLength              int64
	MaxLengthBytes         int64
	DeliveryLimit          int64
	TargetClusterSize      int64
	LeaderLocator          ILeaderLocator
	QuorumInitialGroupSize int
	Arguments              map[string]any
}

type RecoveryConfiguration

type RecoveryConfiguration struct {
	/*
		ActiveRecovery defines whether recovery is activated.
		If it is not activated, the connection will not try to reconnect.
	*/
	ActiveRecovery bool

	/*
		BackOffReconnectInterval is the time to wait before trying to reconnect
		after a connection is closed.
		The time is increased exponentially with each attempt.
		Default is 5 seconds; each attempt doubles the time.
		The minimum value is 1 second. Avoid setting low values, since they can
		cause a high number of reconnection attempts.
	*/
	BackOffReconnectInterval time.Duration

	/*
		MaxReconnectAttempts is the maximum number of reconnection attempts.
		Default is 5.
		The minimum value is 1.
	*/
	MaxReconnectAttempts int
}

func NewRecoveryConfiguration

func NewRecoveryConfiguration() *RecoveryConfiguration

func (*RecoveryConfiguration) Clone

type RejectPublishDlxOverflowStrategy

type RejectPublishDlxOverflowStrategy struct {
}

type RejectPublishOverflowStrategy

type RejectPublishOverflowStrategy struct {
}

type ReplyPostProcessor added in v0.2.0

type ReplyPostProcessor func(reply *amqp.Message, correlationID any) *amqp.Message

ReplyPostProcessor is a function that is called after the request handler has processed the request. It can be used to modify the reply message before it is sent.

type RequestPostProcessor added in v0.2.0

type RequestPostProcessor func(request *amqp.Message, correlationID any) *amqp.Message

RequestPostProcessor is a function that modifies an AMQP message before it is sent as an RPC request. It receives the message about to be sent and the correlation ID generated for the request. Implementations must assign the correlation ID to a message property (e.g., `MessageID` or `CorrelationID`) and set the `ReplyTo` address for the reply queue. The function must return the modified message.

The default `RequestPostProcessor` implementation (used when `RequestPostProcessor` is not explicitly set in `RequesterOptions`) performs the following:

  • Assigns the `correlationID` to the `MessageID` property of the `amqp.Message`.
  • Sets the `ReplyTo` message property to a client-generated exclusive auto-delete queue.

type Requester added in v0.4.0

type Requester interface {
	Close(context.Context) error
	Message(body []byte) *amqp.Message
	Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)
	GetReplyQueue() (string, error)
}

Requester is an interface for making RPC (Remote Procedure Call) requests over AMQP. Implementations of this interface should handle the sending of requests and the receiving of corresponding replies, managing correlation IDs and timeouts.

The default implementation provides the following behaviour:

  • Requests are published to a specified request queue. This queue must be pre-declared.
  • Replies are consumed from a dedicated reply-to queue. This queue is dynamically created by the client.
  • Correlation IDs are used to match requests with replies. The default implementation uses a random UUID as prefix and an auto-incrementing counter as suffix. The UUIDs are set as MessageID in the request message.
  • A request timeout mechanism is in place to handle unacknowledged replies.
  • Messages are pre-processed before publishing. The default implementation assigns the correlation ID to the MessageID property of the request message.
  • Replies are simply sent over the "callback" channel.

Implementers should ensure that:

  • `Close` properly shuts down underlying resources like publishers and consumers.
  • `Message` provides a basic AMQP message structure for RPC requests.
  • `Publish` sends the request message and returns a channel that will receive the reply message, or be closed if a timeout occurs or the client is closed.
  • `GetReplyQueue` returns the address of the reply queue used by the requester.
Example
// Open a connection
conn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", &rabbitmqamqp.AmqpConnOptions{
	Properties: map[string]any{"connection_name": "example rpc client"},
})
if err != nil {
	panic(err)
}
defer conn.Close(context.TODO())

// Create RPC client options
// RequestQueueName is mandatory. The queue must exist.
options := rabbitmqamqp.RequesterOptions{
	RequestQueueName: "rpc-queue",
}
// Create a new RPC client
requester, err := conn.NewRequester(context.TODO(), &options)
if err != nil {
	panic(err)
}
defer requester.Close(context.TODO())

// Create an AMQP message with some initial data
msg := requester.Message([]byte("hello world"))
// Add some application properties to the message
msg.ApplicationProperties = map[string]any{"example": "rpc"}

// Send the message to the server
pendingRequestCh, err := requester.Publish(context.TODO(), msg)
if err != nil {
	panic(err)
}

// Wait for the reply from the server. The channel is closed without delivering
// a message if the request times out or the requester is closed, so the receive
// must be checked before the reply is dereferenced.
replyFromServer, ok := <-pendingRequestCh
if !ok {
	panic("no reply received: the request timed out or the requester was closed")
}
// Print the reply from the server
// This example assumes that the server is an "echo" server, that just returns the message it received.
fmt.Printf("application property 'example': %s\n", replyFromServer.ApplicationProperties["example"])
fmt.Printf("reply correlation ID: %s\n", replyFromServer.Properties.CorrelationID)
Example (CustomCorrelationID)
// type fooCorrelationIdSupplier struct {
// 	count int
// }
//
// func (c *fooCorrelationIdSupplier) Get() any {
// 	c.count++
// 	return fmt.Sprintf("foo-%d", c.count)
// }

// Connection setup
conn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://", nil)
if err != nil {
	panic(err)
}
defer conn.Close(context.TODO())

// Create RPC client options
options := rabbitmqamqp.RequesterOptions{
	RequestQueueName:      "rpc-queue", // the queue must exist
	CorrelationIdSupplier: &fooCorrelationIdSupplier{},
}

// Create a new RPC client and release its resources when done
rpcClient, err := conn.NewRequester(context.TODO(), &options)
if err != nil {
	panic(err)
}
defer rpcClient.Close(context.TODO())

// Send the request
pendingRequestCh, err := rpcClient.Publish(context.TODO(), rpcClient.Message([]byte("hello world")))
if err != nil {
	panic(err)
}

// The channel is closed without delivering a message if the request times out
// or the requester is closed, so check the receive before using the reply.
replyFromServer, ok := <-pendingRequestCh
if !ok {
	panic("no reply received: the request timed out or the requester was closed")
}
fmt.Printf("reply correlation ID: %s\n", replyFromServer.Properties.CorrelationID)
// Should print: foo-1

type RequesterOptions added in v0.4.0

type RequesterOptions struct {
	// The name of the queue to send requests to. This queue must exist.
	//
	// Mandatory.
	RequestQueueName string
	// The name of the queue to receive replies from.
	//
	// Optional. If not set, a dedicated reply-to queue will be created for each request.
	// NOTE(review): the Requester documentation describes a single client-generated
	// reply queue; confirm whether "for each request" is accurate here.
	ReplyToQueueName string
	// Generator of correlation IDs for requests. Each correlationID generated must be unique.
	//
	// Optional. If not set, a random UUID will be used as prefix and an auto-incrementing counter as suffix.
	CorrelationIdSupplier CorrelationIdSupplier
	// Function to extract correlation IDs from replies.
	//
	// Optional. If not set, the `CorrelationID` message property will be used.
	CorrelationIdExtractor CorrelationIdExtractor
	// Function to modify requests before they are sent.
	//
	// Optional. If not set, the default `RequestPostProcessor` assigns the correlation ID to the `MessageID` property
	// and sets the `ReplyTo` property to the client-generated reply queue.
	RequestPostProcessor RequestPostProcessor
	// The timeout for requests.
	//
	// Optional. If not set, a default timeout of 30 seconds will be used.
	RequestTimeout time.Duration

	// If true, the requester will use RabbitMQ's 'Direct-Reply-To' feature.
	// See: https://www.rabbitmq.com/direct-reply-to.html
	DirectReplyTo bool
}

RequesterOptions is a struct that contains the options for the RPC client. It is used to configure the RPC client.

type Responder added in v0.4.0

type Responder interface {
	// Close the RPC server and its underlying resources.
	Close(context.Context) error
	// Pause the server to stop receiving messages.
	Pause()
	// Unpause the server so that it receives messages again.
	Unpause() error

	// GetRequestQueue presumably returns the address of the queue this responder
	// consumes requests from (cf. ResponderOptions.RequestQueue) — TODO confirm.
	GetRequestQueue() (string, error)
}

Responder is a Remote Procedure Call (RPC) server that receives request messages, processes them, and sends responses.

type ResponderHandler added in v0.4.0

type ResponderHandler func(ctx context.Context, request *amqp.Message) (*amqp.Message, error)

ResponderHandler is a function that processes a request message and returns a response message. If the server wants to send a response to the client, it must return a response message. If the function returns nil, the server will not send a response. If the server does not send a response message, this high level RPC server doesn't make much sense, and it is better to use a normal AMQP 1.0 consumer.

The server handler blocks until this function returns. It is highly recommended to use functions that process and return quickly.

Example:

func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
	return amqp.NewMessage([]byte(fmt.Sprintf("Pong: %s", request.GetData()))), nil
}

type ResponderOptions added in v0.4.0

type ResponderOptions struct {
	// RequestQueue is the name of the queue to subscribe to. This queue must be pre-declared.
	// The RPC server does not declare the queue, it is the responsibility of the caller to declare the queue.
	//
	// Mandatory.
	RequestQueue string
	// Handler is a function to process the request message. If the server wants to send a response to
	// the client, it must return a response message. If the function returns nil, the server will not send a response.
	//
	// It is encouraged to initialise the response message properties in the handler. If the handler returns a non-nil
	// error, the server will discard the request message and log an error.
	//
	// The server handler blocks until this function returns. It is highly recommended to use functions that process
	// and return quickly. If you need to perform a long running operation, it's advisable to dispatch the operation
	// to another queue.
	//
	// Example:
	// 		func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
	// 			return amqp.NewMessage([]byte(fmt.Sprintf("Pong: %s", request.GetData()))), nil
	// 		}
	//
	// Mandatory.
	Handler ResponderHandler
	// CorrelationIdExtractor is a function that extracts a correlation ID from the request message.
	// The returned value should be an AMQP type that can be binary encoded.
	//
	// This field is optional. If not provided, the server will use the `MessageID` as the correlation ID.
	//
	// Example:
	// 		func(message *amqp.Message) any {
	// 			return message.Properties.MessageID
	// 		}
	//
	// The default correlation ID extractor also handles nil cases.
	//
	// Optional.
	CorrelationIdExtractor CorrelationIdExtractor
	// ReplyPostProcessor is a function that receives the reply message and the extracted correlation ID, just before
	// the reply is sent. It can be used to modify the reply message before it is sent.
	//
	// The post processor must set the correlation ID in the reply message properties.
	//
	// This field is optional. If not provided, the server will set the correlation ID in the reply message properties, using
	// the correlation ID extracted from the CorrelationIdExtractor.
	//
	// Example:
	// 		func(reply *amqp.Message, correlationID any) *amqp.Message {
	// 			reply.Properties.CorrelationID = correlationID
	// 			return reply
	// 		}
	//
	// The default post processor also handles nil cases.
	//
	// Optional.
	ReplyPostProcessor ReplyPostProcessor
}

type StateAccepted

type StateAccepted = amqp.StateAccepted

type StateChanged

type StateChanged struct {
	From ILifeCycleState
	To   ILifeCycleState
}

func (StateChanged) String

func (s StateChanged) String() string

type StateClosed

type StateClosed struct {
	// contains filtered or unexported fields
}

func (*StateClosed) GetError

func (c *StateClosed) GetError() error

type StateClosing

type StateClosing struct {
}

type StateModified

type StateModified = amqp.StateModified

type StateOpen

type StateOpen struct {
}

type StateReconnecting

type StateReconnecting struct {
}

type StateRejected

type StateRejected = amqp.StateRejected

type StateReleased

type StateReleased = amqp.StateReleased

type StreamConsumerOptions

type StreamConsumerOptions struct {
	//ReceiverLinkName: see the IConsumerOptions interface
	ReceiverLinkName string
	//InitialCredits: see the IConsumerOptions interface
	InitialCredits int32
	// The offset specification for the stream consumer
	// see the interface implementations
	Offset              IOffsetSpecification
	StreamFilterOptions *StreamFilterOptions
	Id                  string
}

StreamConsumerOptions represents the options for consuming from stream queues. It is mandatory when creating a stream consumer.

type StreamFilterOptions

type StreamFilterOptions struct {
	// Filter values.
	Values []string
	// NOTE(review): presumably controls whether messages published without a
	// filter value are also delivered to the consumer — confirm against the
	// RabbitMQ stream filtering documentation.
	MatchUnfiltered bool

	// Filter the data based on Application Property
	ApplicationProperties map[string]any

	// Filter the data based on Message Properties
	Properties *amqp.MessageProperties

	/* SQL: a SQL filter expression.
	Documentation: https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions
	It requires RabbitMQ 4.2 or later.

	Example:
	<code>
	Define and publish a message like:
	msg := NewMessage([]byte(..))
	msg.Properties = &amqp.MessageProperties{Subject: ptr("mySubject"), To: ptr("To")}
	msg.ApplicationProperties = map[string]any{"filter_key": "filter_value"}

	publisher.Publish(context.Background(), msg)

	Then you can create a consumer with a SQL filter like:
	consumer, err := connection.NewConsumer(context.Background(), "myQueue", &StreamConsumerOptions{
			InitialCredits: 200,
			Offset:         &OffsetFirst{},
			StreamFilterOptions: &StreamFilterOptions{
				SQL: "properties.subject LIKE '%mySubject%' AND properties.to = 'To' AND filter_key = 'filter_value'",
			},
		})
	</code>
	*/
	SQL string
}

StreamFilterOptions represents the options that can be used to filter the stream data. It is used in the StreamConsumerOptions. See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions/

type StreamQueueSpecification

type StreamQueueSpecification struct {
	Name               string
	MaxLengthBytes     int64
	InitialClusterSize int
	Arguments          map[string]any
}

type TEndPointStrategy

type TEndPointStrategy int
const (
	StrategyRandom     TEndPointStrategy = iota
	StrategySequential TEndPointStrategy = iota
)

type TExchangeType

type TExchangeType string

TExchangeType represents the type of exchange

const (
	Direct  TExchangeType = "direct"
	Topic   TExchangeType = "topic"
	FanOut  TExchangeType = "fanout"
	Headers TExchangeType = "headers"
)

type TQueueType

type TQueueType string
const (
	Quorum  TQueueType = "quorum"
	Classic TQueueType = "classic"
	Stream  TQueueType = "stream"
)

type TopicExchangeSpecification

type TopicExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type TopologyRecoveryOptions added in v0.3.0

type TopologyRecoveryOptions byte

TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection. See TopologyRecoveryDisabled, TopologyRecoveryOnlyTransient, and TopologyRecoveryAllEnabled for more information.

const (
	// TopologyRecoveryOnlyTransient recovers only queues declared as exclusive and/or auto delete, and
	// related bindings. Exchanges are not recovered.
	TopologyRecoveryOnlyTransient TopologyRecoveryOptions = iota
	// TopologyRecoveryDisabled disables the topology recovery.
	TopologyRecoveryDisabled
	// TopologyRecoveryAllEnabled recovers all the topology. All exchanges, queues, and bindings are recovered.
	TopologyRecoveryAllEnabled
)

type URI

type URI struct {
	Scheme   string
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

URI represents a parsed AMQP URI string.

func ParseURI

func ParseURI(uri string) (URI, error)

ParseURI attempts to parse the given AMQP URI according to the spec. See http://www.rabbitmq.com/uri-spec.html.

Default values for the fields are:

Scheme: amqp
Host: localhost
Port: 5672
Username: guest
Password: guest
Vhost: /

type Version

type Version struct {
	Major int
	Minor int
	Patch int
}

func (Version) Compare

func (v Version) Compare(other Version) int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL