Documentation
¶
Overview ¶
Example ¶
package main
import (
"bufio"
"context"
"fmt"
"os"
"os/signal"
"github.com/Azure/go-amqp"
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
)
type echoRpcServer struct {
conn *rabbitmqamqp.AmqpConnection
server rabbitmqamqp.Responder
}
func (s *echoRpcServer) stop(ctx context.Context) {
s.server.Close(ctx)
s.conn.Close(ctx)
}
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
Name: rpcServerQueueName,
})
srv, err := conn.NewResponder(context.TODO(), rabbitmqamqp.ResponderOptions{
RequestQueue: rpcServerQueueName,
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
fmt.Printf("echo: %s\n", request.GetData())
return request, nil
},
})
if err != nil {
panic(err)
}
return &echoRpcServer{
conn: conn,
server: srv,
}
}
const rpcServerQueueName = "rpc-queue"
func main() {
// Dial rabbit for RPC server connection
srvConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
if err != nil {
panic(err)
}
srv := newEchoRpcServer(srvConn)
// Dial rabbit for RPC client connection
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
if err != nil {
panic(err)
}
rpcClient, err := clientConn.NewRequester(context.TODO(), &rabbitmqamqp.RequesterOptions{
RequestQueueName: rpcServerQueueName,
})
if err != nil {
panic(err)
}
// Set up a channel to listen for OS signals
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) // Listen for Ctrl+C
// Goroutine to handle graceful shutdown
go func() {
<-sigs // Wait for Ctrl+C
fmt.Println("\nReceived Ctrl+C, gracefully shutting down...")
srv.stop(context.TODO())
_ = clientConn.Close(context.TODO())
_ = srvConn.Close(context.TODO())
os.Exit(0)
}()
reader := bufio.NewReader(os.Stdin)
fmt.Println("Type a message and press Enter to send (Ctrl+C to quit):")
for {
fmt.Print("Enter message: ")
input, _ := reader.ReadString('\n')
// Remove newline character from input
message := input[:len(input)-1]
if message == "" {
continue
}
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
if err != nil {
fmt.Printf("Error calling RPC: %v\n", err)
continue
}
m, ok := <-resp
if !ok {
fmt.Println("timed out waiting for response")
continue
}
fmt.Printf("response: %s\n", m.GetData())
}
}
Index ¶
- Constants
- Variables
- func CapacityBytes(value int64) int64
- func CapacityFrom(value string) (int64, error)
- func CapacityGB(value int64) int64
- func CapacityKB(value int64) int64
- func CapacityMB(value int64) int64
- func CapacityTB(value int64) int64
- func Debug(msg string, args ...any)
- func Error(msg string, args ...any)
- func ExtractWithoutPassword(addr string) string
- func Info(msg string, args ...any)
- func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error
- func NewGinkgoHandler(level slog.Level, writer io.Writer) slog.Handler
- func NewMessage(body []byte) *amqp.Message
- func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error)
- func NewMessageWithFilter(body []byte, filter string) *amqp.Message
- func SetSlogHandler(handler slog.Handler)
- func Warn(msg string, args ...any)
- type AMQPBinding
- func (b *AMQPBinding) Arguments(arguments map[string]any)
- func (b *AMQPBinding) Bind(ctx context.Context) (string, error)
- func (b *AMQPBinding) BindingKey(bindingKey string)
- func (b *AMQPBinding) Destination(name string, isQueue bool)
- func (b *AMQPBinding) SourceExchange(sourceName string)
- func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error
- type AMQPBindingInfo
- type AmqpAddress
- type AmqpConnOptions
- type AmqpConnection
- func (a *AmqpConnection) Close(ctx context.Context) error
- func (a *AmqpConnection) Id() string
- func (a *AmqpConnection) Management() *AmqpManagement
- func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error)
- func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error)
- func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (Requester, error)
- func (a *AmqpConnection) NewResponder(ctx context.Context, options ResponderOptions) (Responder, error)
- func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)
- func (a *AmqpConnection) Properties() map[string]any
- func (a *AmqpConnection) RefreshToken(background context.Context, token string) error
- func (a *AmqpConnection) State() ILifeCycleState
- type AmqpExchange
- func (e *AmqpExchange) Arguments(arguments map[string]any)
- func (e *AmqpExchange) AutoDelete(isAutoDelete bool)
- func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)
- func (e *AmqpExchange) Delete(ctx context.Context) error
- func (e *AmqpExchange) ExchangeType(exchangeType TExchangeType)
- func (e *AmqpExchange) GetExchangeType() TExchangeType
- func (e *AmqpExchange) IsAutoDelete() bool
- func (e *AmqpExchange) Name() string
- type AmqpExchangeInfo
- type AmqpManagement
- func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error)
- func (a *AmqpManagement) Close(ctx context.Context) error
- func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error)
- func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error)
- func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
- func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error
- func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)
- func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error
- func (a *AmqpManagement) PurgeQueue(ctx context.Context, name string) (int, error)
- func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)
- func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, ...) (map[string]any, error)
- func (a *AmqpManagement) State() ILifeCycleState
- func (a *AmqpManagement) Unbind(ctx context.Context, path string) error
- type AmqpQueue
- func (a *AmqpQueue) Arguments(arguments map[string]any)
- func (a *AmqpQueue) AutoDelete(isAutoDelete bool)
- func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)
- func (a *AmqpQueue) Delete(ctx context.Context) error
- func (a *AmqpQueue) Exclusive(isExclusive bool)
- func (a *AmqpQueue) IsAutoDelete() bool
- func (a *AmqpQueue) IsExclusive() bool
- func (a *AmqpQueue) Name(queueName string)
- func (a *AmqpQueue) Purge(ctx context.Context) (int, error)
- func (a *AmqpQueue) QueueType(queueType TQueueType)
- type AmqpQueueInfo
- func (a *AmqpQueueInfo) Arguments() map[string]any
- func (a *AmqpQueueInfo) ConsumerCount() uint32
- func (a *AmqpQueueInfo) IsAutoDelete() bool
- func (a *AmqpQueueInfo) IsDurable() bool
- func (a *AmqpQueueInfo) IsExclusive() bool
- func (a *AmqpQueueInfo) Leader() string
- func (a *AmqpQueueInfo) Members() []string
- func (a *AmqpQueueInfo) MessageCount() uint64
- func (a *AmqpQueueInfo) Name() string
- func (a *AmqpQueueInfo) Type() TQueueType
- type AutoGeneratedQueueSpecification
- type BalancedLeaderLocator
- type ClassicQueueSpecification
- type ClientLocalLeaderLocator
- type Consumer
- type ConsumerOptions
- type CorrelationIdExtractor
- type CorrelationIdSupplier
- type CustomExchangeSpecification
- type DeliveryContext
- func (dc *DeliveryContext) Accept(ctx context.Context) error
- func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error
- func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
- func (dc *DeliveryContext) Message() *amqp.Message
- func (dc *DeliveryContext) Requeue(ctx context.Context) error
- func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
- type DeliveryState
- type DirectExchangeSpecification
- type DropHeadOverflowStrategy
- type Endpoint
- type Environment
- type ExchangeAddress
- type ExchangeToExchangeBindingSpecification
- type ExchangeToQueueBindingSpecification
- type FanOutExchangeSpecification
- type GinkgoLogHandler
- type HeadersExchangeSpecification
- type IBindingSpecification
- type IConsumerOptions
- type IExchangeSpecification
- type ILeaderLocator
- type ILifeCycleState
- type IOffsetSpecification
- type IOverflowStrategy
- type IPublisherOptions
- type IQueueSpecification
- type ITargetAddress
- type LifeCycle
- type OAuth2Options
- type OffsetFirst
- type OffsetLast
- type OffsetNext
- type OffsetValue
- type PublishResult
- type Publisher
- type PublisherOptions
- type QueueAddress
- type QuorumQueueSpecification
- type RecoveryConfiguration
- type RejectPublishDlxOverflowStrategy
- type RejectPublishOverflowStrategy
- type ReplyPostProcessor
- type RequestPostProcessor
- type Requester
- type RequesterOptions
- type Responder
- type ResponderHandler
- type ResponderOptions
- type StateAccepted
- type StateChanged
- type StateClosed
- type StateClosing
- type StateModified
- type StateOpen
- type StateReconnecting
- type StateRejected
- type StateReleased
- type StreamConsumerOptions
- type StreamFilterOptions
- type StreamQueueSpecification
- type TEndPointStrategy
- type TExchangeType
- type TQueueType
- type TopicExchangeSpecification
- type TopologyRecoveryOptions
- type URI
- type Version
Examples ¶
Constants ¶
const ( UnitMb string = "mb" UnitKb string = "kb" UnitGb string = "gb" UnitTb string = "tb" )
const AMQPS = "amqps"
const AtLeastOnce = 1
const AtMostOnce = 0
const DescriptorCodeSqlFilter = 0x120
DescriptorCodeSqlFilter see: https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/amqp10_common/include/amqp10_filter.hrl see DESCRIPTOR_CODE_SQL_FILTER in rabbitmq-server DESCRIPTOR_CODE_SQL_FILTER is the uint64 code for amqpSqlFilter = "amqp:sql-filter"
const StreamFilterValue = "x-stream-filter-value"
Variables ¶
var DefaultRpcRequestTimeout = 30 * time.Second
var ErrConnectionClosed = errors.New("connection is closed")
var ErrDoesNotExist = errors.New("does not exist")
var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered")
ErrMaxReconnectAttemptsReached typed error when the MaxReconnectAttempts is reached
var ErrPreconditionFailed = errors.New("precondition Failed")
Functions ¶
func CapacityBytes ¶
func CapacityFrom ¶
func CapacityGB ¶
func CapacityKB ¶
func CapacityMB ¶
func CapacityTB ¶
func ExtractWithoutPassword ¶
func MessagePropertyToAddress ¶
func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error
MessagePropertyToAddress sets the To property of the message to the address of the target. The target must be a QueueAddress or an ExchangeAddress. Note: The field msgRef.Properties.To will be overwritten if it is already set.
func NewGinkgoHandler ¶ added in v0.2.0
func NewMessage ¶
NewMessage creates a new AMQP 1.0 message with the given payload.
func NewMessageWithAddress ¶
func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error)
NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target. The target must be a QueueAddress or an ExchangeAddress. This function is a helper that combines NewMessage and MessagePropertyToAddress.
func NewMessageWithFilter ¶
NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the StreamFilterValue property to the filter value.
func SetSlogHandler ¶ added in v0.2.0
Types ¶
type AMQPBinding ¶
type AMQPBinding struct {
// contains filtered or unexported fields
}
func (*AMQPBinding) Arguments ¶ added in v0.2.0
func (b *AMQPBinding) Arguments(arguments map[string]any)
func (*AMQPBinding) Bind ¶
func (b *AMQPBinding) Bind(ctx context.Context) (string, error)
Bind creates a binding between an exchange and a queue or exchange with the specified binding key. Returns the binding path that can be used to unbind the binding. Given a virtual host, the binding path is unique.
func (*AMQPBinding) BindingKey ¶
func (b *AMQPBinding) BindingKey(bindingKey string)
func (*AMQPBinding) Destination ¶
func (b *AMQPBinding) Destination(name string, isQueue bool)
func (*AMQPBinding) SourceExchange ¶
func (b *AMQPBinding) SourceExchange(sourceName string)
type AMQPBindingInfo ¶
type AMQPBindingInfo struct {
}
type AmqpAddress ¶
type AmqpAddress struct {
// the address of the AMQP server
// it is in the form of amqp://<host>:<port>
// or amqps://<host>:<port>
// the port is optional
// the default port is 5672
// the default protocol is amqp
// the default host is localhost
// the default virtual host is "/"
// the default user is guest
// the default password is guest
// the default SASL type is SASLTypeAnonymous
Address string
// Options: Additional options for the connection
Options *AmqpConnOptions
}
type AmqpConnOptions ¶
type AmqpConnOptions struct {
// wrapper for amqp.ConnOptions
ContainerID string
// wrapper for amqp.ConnOptions
HostName string
// wrapper for amqp.ConnOptions
IdleTimeout time.Duration
// wrapper for amqp.ConnOptions
MaxFrameSize uint32
// wrapper for amqp.ConnOptions
MaxSessions uint16
// wrapper for amqp.ConnOptions
Properties map[string]any
// wrapper for amqp.ConnOptions
SASLType amqp.SASLType
// wrapper for amqp.ConnOptions
TLSConfig *tls.Config
// wrapper for amqp.ConnOptions
WriteTimeout time.Duration
// RecoveryConfiguration is used to configure the recovery behavior of the connection.
// when the connection is closed unexpectedly.
RecoveryConfiguration *RecoveryConfiguration
// TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection.
TopologyRecoveryOptions TopologyRecoveryOptions
// The OAuth2Options is used to configure the connection with OAuth2 token.
OAuth2Options *OAuth2Options
// Local connection identifier (not sent to the server)
// if not provided, a random UUID is generated
Id string
}
func (*AmqpConnOptions) Clone ¶
func (a *AmqpConnOptions) Clone() *AmqpConnOptions
type AmqpConnection ¶
type AmqpConnection struct {
// contains filtered or unexported fields
}
func Dial ¶
func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error)
Dial connect to the AMQP 1.0 server using the provided connectionSettings Returns a pointer to the new AmqpConnection if successful else an error.
func (*AmqpConnection) Close ¶
func (a *AmqpConnection) Close(ctx context.Context) error
Close closes the connection to the AMQP 1.0 server and the management interface. All the publishers and consumers are closed as well.
func (*AmqpConnection) Id ¶
func (a *AmqpConnection) Id() string
func (*AmqpConnection) Management ¶
func (a *AmqpConnection) Management() *AmqpManagement
Management returns the management interface for the connection. The management interface is used to declare and delete exchanges, queues, and bindings.
func (*AmqpConnection) NewConsumer ¶
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error)
NewConsumer creates a new Consumer that listens to the provided Queue
func (*AmqpConnection) NewPublisher ¶
func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error)
NewPublisher creates a new Publisher that sends messages to the provided destination. The destination is a ITargetAddress that can be a Queue or an Exchange with a routing key. options is an IPublisherOptions that can be used to configure the publisher. See QueueAddress and ExchangeAddress for more information.
func (*AmqpConnection) NewRequester ¶ added in v0.4.0
func (a *AmqpConnection) NewRequester(ctx context.Context, options *RequesterOptions) (Requester, error)
NewRequester creates a new RPC client that sends requests to the specified queue and receives replies on a dynamically created reply queue.
func (*AmqpConnection) NewResponder ¶ added in v0.4.0
func (a *AmqpConnection) NewResponder(ctx context.Context, options ResponderOptions) (Responder, error)
NewResponder creates a new RPC server that processes requests from the specified queue. The requestQueue in options is mandatory, while other fields are optional and will use defaults if not provided.
func (*AmqpConnection) NotifyStatusChange ¶
func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)
NotifyStatusChange registers a channel to receive getState change notifications from the connection.
func (*AmqpConnection) Properties ¶
func (a *AmqpConnection) Properties() map[string]any
func (*AmqpConnection) RefreshToken ¶
func (a *AmqpConnection) RefreshToken(background context.Context, token string) error
func (*AmqpConnection) State ¶
func (a *AmqpConnection) State() ILifeCycleState
type AmqpExchange ¶
type AmqpExchange struct {
// contains filtered or unexported fields
}
func (*AmqpExchange) Arguments ¶ added in v0.2.0
func (e *AmqpExchange) Arguments(arguments map[string]any)
func (*AmqpExchange) AutoDelete ¶
func (e *AmqpExchange) AutoDelete(isAutoDelete bool)
func (*AmqpExchange) Declare ¶
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)
func (*AmqpExchange) ExchangeType ¶
func (e *AmqpExchange) ExchangeType(exchangeType TExchangeType)
func (*AmqpExchange) GetExchangeType ¶
func (e *AmqpExchange) GetExchangeType() TExchangeType
func (*AmqpExchange) IsAutoDelete ¶
func (e *AmqpExchange) IsAutoDelete() bool
func (*AmqpExchange) Name ¶
func (e *AmqpExchange) Name() string
type AmqpExchangeInfo ¶
type AmqpExchangeInfo struct {
// contains filtered or unexported fields
}
func (*AmqpExchangeInfo) Name ¶
func (a *AmqpExchangeInfo) Name() string
type AmqpManagement ¶
type AmqpManagement struct {
// contains filtered or unexported fields
}
AmqpManagement is the interface to the RabbitMQ /management endpoint The management interface is used to declare/delete exchanges, queues, and bindings
func (*AmqpManagement) Bind ¶
func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error)
func (*AmqpManagement) DeclareExchange ¶
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error)
func (*AmqpManagement) DeclareQueue ¶
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error)
func (*AmqpManagement) DeleteExchange ¶
func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
func (*AmqpManagement) DeleteQueue ¶
func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error
func (*AmqpManagement) NotifyStatusChange ¶
func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)
func (*AmqpManagement) Open ¶
func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error
func (*AmqpManagement) PurgeQueue ¶
PurgeQueue purges the queue returns the number of messages purged
func (*AmqpManagement) QueueInfo ¶
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)
func (*AmqpManagement) Request ¶
func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, expectedResponseCodes []int) (map[string]any, error)
Request sends a request to the /management endpoint. It is a generic method that can be used to send any request to the management endpoint. In most of the cases you don't need to use this method directly, instead use the standard methods
func (*AmqpManagement) State ¶
func (a *AmqpManagement) State() ILifeCycleState
type AmqpQueue ¶
type AmqpQueue struct {
// contains filtered or unexported fields
}
func (*AmqpQueue) AutoDelete ¶
func (*AmqpQueue) Declare ¶
func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)
func (*AmqpQueue) IsAutoDelete ¶
func (*AmqpQueue) IsExclusive ¶
func (*AmqpQueue) QueueType ¶
func (a *AmqpQueue) QueueType(queueType TQueueType)
type AmqpQueueInfo ¶
type AmqpQueueInfo struct {
// contains filtered or unexported fields
}
func (*AmqpQueueInfo) Arguments ¶
func (a *AmqpQueueInfo) Arguments() map[string]any
func (*AmqpQueueInfo) ConsumerCount ¶
func (a *AmqpQueueInfo) ConsumerCount() uint32
func (*AmqpQueueInfo) IsAutoDelete ¶
func (a *AmqpQueueInfo) IsAutoDelete() bool
func (*AmqpQueueInfo) IsDurable ¶
func (a *AmqpQueueInfo) IsDurable() bool
func (*AmqpQueueInfo) IsExclusive ¶
func (a *AmqpQueueInfo) IsExclusive() bool
func (*AmqpQueueInfo) Leader ¶
func (a *AmqpQueueInfo) Leader() string
func (*AmqpQueueInfo) Members ¶
func (a *AmqpQueueInfo) Members() []string
func (*AmqpQueueInfo) MessageCount ¶
func (a *AmqpQueueInfo) MessageCount() uint64
func (*AmqpQueueInfo) Name ¶
func (a *AmqpQueueInfo) Name() string
func (*AmqpQueueInfo) Type ¶
func (a *AmqpQueueInfo) Type() TQueueType
type AutoGeneratedQueueSpecification ¶
type AutoGeneratedQueueSpecification struct {
IsAutoDelete bool
IsExclusive bool
MaxLength int64
MaxLengthBytes int64
Arguments map[string]any
// contains filtered or unexported fields
}
AutoGeneratedQueueSpecification represents the specification of the auto-generated queue. It is a classic queue with auto-generated name. It is useful in context like RPC or when you need a temporary queue.
type BalancedLeaderLocator ¶
type BalancedLeaderLocator struct {
}
type ClassicQueueSpecification ¶
type ClassicQueueSpecification struct {
Name string
IsAutoDelete bool
IsExclusive bool
AutoExpire int64
MessageTTL int64
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
MaxPriority int64
LeaderLocator ILeaderLocator
Arguments map[string]any
}
ClassicQueueSpecification represents the specification of the classic queue
type ClientLocalLeaderLocator ¶
type ClientLocalLeaderLocator struct {
}
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
type ConsumerOptions ¶
type ConsumerOptions struct {
//ReceiverLinkName: see the IConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the IConsumerOptions interface
InitialCredits int32
// The id of the consumer
Id string
//
DirectReplyTo bool
}
ConsumerOptions represents the options for quorum and classic queues
type CorrelationIdExtractor ¶ added in v0.2.0
CorrelationIdExtractor defines the signature for a function that extracts the correlation ID from an AMQP message. Then returned value must be a valid AMQP type that can be binary encoded.
type CorrelationIdSupplier ¶ added in v0.2.0
type CorrelationIdSupplier interface {
Get() any
}
CorrelationIdSupplier is an interface for providing correlation IDs for RPC requests. Implementations should generate unique identifiers for each request. The returned value from `Get()` should be an AMQP type, or a type that can be encoded into an AMQP message property (e.g., string, int, []byte, etc.).
type DeliveryContext ¶
type DeliveryContext struct {
// contains filtered or unexported fields
}
func (*DeliveryContext) DiscardWithAnnotations ¶
func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
func (*DeliveryContext) Message ¶
func (dc *DeliveryContext) Message() *amqp.Message
func (*DeliveryContext) RequeueWithAnnotations ¶
func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
type DeliveryState ¶
type DeliveryState = amqp.DeliveryState
type DropHeadOverflowStrategy ¶
type DropHeadOverflowStrategy struct {
}
type Endpoint ¶
type Endpoint struct {
Address string
Options *AmqpConnOptions
}
func DefaultEndpoints ¶
func DefaultEndpoints() []Endpoint
type Environment ¶
type Environment struct {
EndPointStrategy TEndPointStrategy
// contains filtered or unexported fields
}
func NewClusterEnvironment ¶
func NewClusterEnvironment(endPoints []Endpoint) *Environment
func NewClusterEnvironmentWithStrategy ¶
func NewClusterEnvironmentWithStrategy(endPoints []Endpoint, strategy TEndPointStrategy) *Environment
func NewEnvironment ¶
func NewEnvironment(address string, options *AmqpConnOptions) *Environment
func (*Environment) CloseConnections ¶
func (e *Environment) CloseConnections(ctx context.Context) error
CloseConnections closes all the connections in the environment with all the publishers and consumers.
func (*Environment) Connections ¶
func (e *Environment) Connections() []*AmqpConnection
Connections gets the active connections in the environment
func (*Environment) NewConnection ¶
func (e *Environment) NewConnection(ctx context.Context) (*AmqpConnection, error)
NewConnection get a new connection from the environment. It picks an endpoint from the list of endpoints, based on EndPointStrategy, and tries to open a connection. It fails if all the endpoints are not reachable. The Environment will keep track of the connection and close it when the environment is closed.
type ExchangeAddress ¶
type ExchangeAddress struct {
Exchange string // The name of the exchange
Key string // The routing key. Can be empty
}
ExchangeAddress represents the address of an exchange with a routing key.
type GinkgoLogHandler ¶ added in v0.2.0
type IBindingSpecification ¶
type IBindingSpecification interface {
// contains filtered or unexported methods
}
type IConsumerOptions ¶
type IConsumerOptions interface {
// contains filtered or unexported methods
}
type IExchangeSpecification ¶
type IExchangeSpecification interface {
// contains filtered or unexported methods
}
IExchangeSpecification represents the specification of an exchange
type ILeaderLocator ¶
type ILeaderLocator interface {
// contains filtered or unexported methods
}
type ILifeCycleState ¶
type ILifeCycleState interface {
// contains filtered or unexported methods
}
type IOffsetSpecification ¶
type IOffsetSpecification interface {
// contains filtered or unexported methods
}
type IOverflowStrategy ¶
type IOverflowStrategy interface {
// contains filtered or unexported methods
}
type IPublisherOptions ¶
type IPublisherOptions interface {
// contains filtered or unexported methods
}
type IQueueSpecification ¶
type IQueueSpecification interface {
// contains filtered or unexported methods
}
IQueueSpecification represents the specification of a queue
type ITargetAddress ¶
type ITargetAddress interface {
// contains filtered or unexported methods
}
ITargetAddress is an interface that represents an address that can be used to send messages to. It can be either a Queue or an Exchange with a routing key.
type LifeCycle ¶
type LifeCycle struct {
// contains filtered or unexported fields
}
func NewLifeCycle ¶
func NewLifeCycle() *LifeCycle
func (*LifeCycle) SetState ¶
func (l *LifeCycle) SetState(value ILifeCycleState)
func (*LifeCycle) State ¶
func (l *LifeCycle) State() ILifeCycleState
type OAuth2Options ¶
type OAuth2Options struct {
Token string
}
func (OAuth2Options) Clone ¶
func (o OAuth2Options) Clone() *OAuth2Options
type OffsetFirst ¶
type OffsetFirst struct {
}
type OffsetLast ¶
type OffsetLast struct {
}
type OffsetNext ¶
type OffsetNext struct {
}
type OffsetValue ¶
type OffsetValue struct {
Offset uint64
}
type PublishResult ¶
type PublishResult struct {
Outcome DeliveryState
Message *amqp.Message
}
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher is a publisher that sends messages to a specific destination address.
func (*Publisher) Publish ¶
Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.
The message is sent and the outcome of the operation is returned. The outcome is a DeliveryState that indicates if the message was accepted or rejected. RabbitMQ supports the following DeliveryState types:
- StateAccepted
- StateReleased
- StateRejected See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
If the destination address is not defined during the creation, the message must have a TO property set. You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: <code>
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{
Exchange: "myExchangeName",
Key: "myRoutingKey",
}
.. publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
</code> Create a new publisher that sends messages based on message destination address: <code>
publisher, err := connection.NewPublisher(context.Background(), nil, "test")
msg := amqp.NewMessage([]byte("hello"))
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
..:= publisher.Publish(context.Background(), msg)
</code>
The message is persistent by default by setting the Header.Durable to true when Header is nil. You can set the message to be non-persistent by setting the Header.Durable to false. Note: When you use the `Header` is up to you to set the message properties, You need set the `Header.Durable` to true or false.
<code>
</code>
type PublisherOptions ¶
type QueueAddress ¶
type QueueAddress struct {
Queue string // The name of the queue
Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}
QueueAddress represents the address of a queue.
type QuorumQueueSpecification ¶
type QuorumQueueSpecification struct {
Name string
AutoExpire int64
MessageTTL int64
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
DeliveryLimit int64
TargetClusterSize int64
LeaderLocator ILeaderLocator
QuorumInitialGroupSize int
Arguments map[string]any
}
type RecoveryConfiguration ¶
type RecoveryConfiguration struct {
/*
ActiveRecovery Define if the recovery is activated.
If is not activated the connection will not try to createSender.
*/
ActiveRecovery bool
/*
BackOffReconnectInterval The time to wait before trying to createSender after a connection is closed.
time will be increased exponentially with each attempt.
Default is 5 seconds, each attempt will double the time.
The minimum value is 1 second. Avoid setting a value low values since it can cause a high
number of reconnection attempts.
*/
BackOffReconnectInterval time.Duration
/*
MaxReconnectAttempts The maximum number of reconnection attempts.
Default is 5.
The minimum value is 1.
*/
MaxReconnectAttempts int
}
func NewRecoveryConfiguration ¶
func NewRecoveryConfiguration() *RecoveryConfiguration
func (*RecoveryConfiguration) Clone ¶
func (c *RecoveryConfiguration) Clone() *RecoveryConfiguration
type RejectPublishDlxOverflowStrategy ¶
type RejectPublishDlxOverflowStrategy struct {
}
type RejectPublishOverflowStrategy ¶
type RejectPublishOverflowStrategy struct {
}
type ReplyPostProcessor ¶ added in v0.2.0
ReplyPostProcessor is a function that is called after the request handler has processed the request. It can be used to modify the reply message before it is sent.
type RequestPostProcessor ¶ added in v0.2.0
RequestPostProcessor is a function that modifies an AMQP message before it is sent as an RPC request. It receives the message about to be sent and the correlation ID generated for the request. Implementations must assign the correlation ID to a message property (e.g., `MessageID` or `CorrelationID`) and set the `ReplyTo` address for the reply queue. The function must return the modified message.
The default `RequestPostProcessor` implementation (used when `RequestPostProcessor` is not explicitly set in `RequesterOptions`) performs the following:
- Assigns the `correlationID` to the `MessageID` property of the `amqp.Message`.
- Sets the `ReplyTo` message property to a client-generated exclusive auto-delete queue.
type Requester ¶ added in v0.4.0
type Requester interface {
Close(context.Context) error
Message(body []byte) *amqp.Message
Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)
GetReplyQueue() (string, error)
}
Requester is an interface for making RPC (Remote Procedure Call) requests over AMQP. Implementations of this interface should handle the sending of requests and the receiving of corresponding replies, managing correlation IDs and timeouts.
The default implementation provides the following behaviour:
- Requests are published to a specified request queue. This queue must be pre-declared.
- Replies are consumed from a dedicated reply-to queue. This queue is dynamically created by the client.
- Correlation IDs are used to match requests with replies. The default implementation uses a random UUID as prefix and an auto-incrementing counter as suffix. The UUIDs are set as MessageID in the request message.
- A request timeout mechanism is in place to handle unacknowledged replies.
- Messages are pre-processed before publishing. The default implementation assigns the correlation ID to the MessageID property of the request message.
- Replies are simply sent over the "callback" channel.
Implementers should ensure that:
- `Close` properly shuts down underlying resources like publishers and consumers.
- `Message` provides a basic AMQP message structure for RPC requests.
- `Publish` sends the request message and returns a channel that will receive the reply message, or be closed if a timeout occurs or the client is closed.
- `GetReplyQueue` returns the address of the reply queue used by the requester.
Example ¶
// open a connection
conn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", &rabbitmqamqp.AmqpConnOptions{
Properties: map[string]any{"connection_name": "example rpc client"},
})
if err != nil {
panic(err)
}
defer conn.Close(context.TODO())
// Create RPC client options
// RequestQueueName is mandatory. The queue must exist.
options := rabbitmqamqp.RequesterOptions{
RequestQueueName: "rpc-queue",
}
// Create a new RPC client
requester, err := conn.NewRequester(context.TODO(), &options)
if err != nil {
panic(err)
}
defer requester.Close(context.TODO())
// Create an AMQP message with some initial data
msg := requester.Message([]byte("hello world"))
// Add some application properties to the message
msg.ApplicationProperties = map[string]any{"example": "rpc"}
// Send the message to the server
pendingRequestCh, err := requester.Publish(context.TODO(), msg)
if err != nil {
panic(err)
}
// Wait for the reply from the server
replyFromServer := <-pendingRequestCh
// Print the reply from the server
// This example assumes that the server is an "echo" server, that just returns the message it received.
fmt.Printf("application property 'example': %s\n", replyFromServer.ApplicationProperties["example"])
fmt.Printf("reply correlation ID: %s\n", replyFromServer.Properties.CorrelationID)
Example (CustomCorrelationID) ¶
// type fooCorrelationIdSupplier struct {
// count int
// }
//
// func (c *fooCorrelationIdSupplier) Get() any {
// c.count++
// return fmt.Sprintf("foo-%d", c.count)
// }
// Connection setup
conn, _ := rabbitmqamqp.Dial(context.TODO(), "amqp://", nil)
defer conn.Close(context.TODO())
// Create RPC client options
options := rabbitmqamqp.RequesterOptions{
RequestQueueName: "rpc-queue", // the queue must exist
CorrelationIdSupplier: &fooCorrelationIdSupplier{},
}
// Create a new RPC client
rpcClient, _ := conn.NewRequester(context.TODO(), &options)
pendingRequestCh, _ := rpcClient.Publish(context.TODO(), rpcClient.Message([]byte("hello world")))
replyFromServer := <-pendingRequestCh
fmt.Printf("reply correlation ID: %s\n", replyFromServer.Properties.CorrelationID)
// Should print: foo-1
type RequesterOptions ¶ added in v0.4.0
type RequesterOptions struct {
// The name of the queue to send requests to. This queue must exist.
//
// Mandatory.
RequestQueueName string
// The name of the queue to receive replies from.
//
// Optional. If not set, a dedicated reply-to queue will be created for each request.
ReplyToQueueName string
// Generator of correlation IDs for requests. Each correlationID generated must be unique.
//
// Optional. If not set, a random UUID will be used as prefix and an auto-incrementing counter as suffix.
CorrelationIdSupplier CorrelationIdSupplier
// Function to extract correlation IDs from replies.
//
// Optional. If not set, the `CorrelationID` message property will be used.
CorrelationIdExtractor CorrelationIdExtractor
// Function to modify requests before they are sent.
//
// Optional. If not set, the default `RequestPostProcessor` assigns the correlation ID to the `MessageID` property.
RequestPostProcessor RequestPostProcessor
// The timeout for requests.
//
// Optional. If not set, a default timeout of 30 seconds will be used.
RequestTimeout time.Duration
// If true, the requester will set the 'Direct-Reply-To' feature for RabbitMQ.
// see: https://www.rabbitmq.com/direct-reply-to.html
DirectReplyTo bool
}
RequesterOptions is a struct that contains the options for the RPC client. It is used to configure the RPC client.
type Responder ¶ added in v0.4.0
type Responder interface {
// Close the RPC server and its underlying resources.
Close(context.Context) error
// Pause the server to stop receiving messages.
Pause()
// Unpause requests to receive messages again.
Unpause() error
GetRequestQueue() (string, error)
}
Responder is Remote Procedure Call server that receives a message, process them, and sends a response.
type ResponderHandler ¶ added in v0.4.0
ResponderHandler is a function that processes a request message and returns a response message. If the server wants to send a response to the client, it must return a response message. If the function returns nil, the server will not send a response. If the server does not send a response message, this high level RPC server doesn't make much sense, and it is better to use a normal AMQP 1.0 consumer.
The server handler blocks until this function returns. It is highly recommended to use functions that process and return quickly.
Example:
func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
return amqp.NewMessage([]byte(fmt.Sprintf("Pong: %s", request.GetData()))), nil
}
type ResponderOptions ¶ added in v0.4.0
type ResponderOptions struct {
// RequestQueue is the name of the queue to subscribe to. This queue must be pre-declared.
// The RPC server does not declare the queue, it is the responsibility of the caller to declare the queue.
//
// Mandatory.
RequestQueue string
// Handler is a function to process the request message. If the server wants to send a response to
// the client, it must return a response message. If the function returns nil, the server will not send a response.
//
// It is encouraged to initialise the response message properties in the handler. If the handler returns a non-nil
// error, the server will discard the request message and log an error.
//
// The server handler blocks until this function returns. It is highly recommended to functions that process and return quickly.
// If you need to perform a long running operation, it's advisable to dispatch the operation to another queue.
//
// Example:
// func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
// return amqp.NewMessage([]byte(fmt.Sprintf("Pong: %s", request.GetData()))), nil
// }
//
// Mandatory.
Handler ResponderHandler
// CorrectionIdExtractor is a function that extracts a correction ID from the request message.
// The returned value should be an AMQP type that can be binary encoded.
//
// This field is optional. If not provided, the server will use the `MessageID` as the correlation ID.
//
// Example:
// func(message *amqp.Message) any {
// return message.Properties.MessageID
// }
//
// The default correlation ID extractor also handles nil cases.
//
// Optional.
CorrelationIdExtractor CorrelationIdExtractor
// PostProcessor is a function that receives the reply message and the extracted correlation ID, just before the reply is sent.
// It can be used to modify the reply message before it is sent.
//
// The post processor must set the correlation ID in the reply message properties.
//
// This field is optional. If not provided, the server will set the correlation ID in the reply message properties, using
// the correlation ID extracted from the CorrelationIdExtractor.
//
// Example:
// func(reply *amqp.Message, correlationID any) *amqp.Message {
// reply.Properties.CorrelationID = correlationID
// return reply
// }
//
// The default post processor also handles nil cases.
//
// Optional.
ReplyPostProcessor ReplyPostProcessor
}
type StateAccepted ¶
type StateAccepted = amqp.StateAccepted
type StateChanged ¶
type StateChanged struct {
From ILifeCycleState
To ILifeCycleState
}
func (StateChanged) String ¶
func (s StateChanged) String() string
type StateClosed ¶
type StateClosed struct {
// contains filtered or unexported fields
}
func (*StateClosed) GetError ¶
func (c *StateClosed) GetError() error
type StateClosing ¶
type StateClosing struct {
}
type StateModified ¶
type StateModified = amqp.StateModified
type StateReconnecting ¶
type StateReconnecting struct {
}
type StateRejected ¶
type StateRejected = amqp.StateRejected
type StateReleased ¶
type StateReleased = amqp.StateReleased
type StreamConsumerOptions ¶
type StreamConsumerOptions struct {
//ReceiverLinkName: see the IConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the IConsumerOptions interface
InitialCredits int32
// The offset specification for the stream consumer
// see the interface implementations
Offset IOffsetSpecification
StreamFilterOptions *StreamFilterOptions
Id string
}
StreamConsumerOptions represents the options for stream queues It is mandatory in case of creating a stream consumer.
type StreamFilterOptions ¶
type StreamFilterOptions struct {
// Filter values.
Values []string
//
MatchUnfiltered bool
// Filter the data based on Application Property
ApplicationProperties map[string]any
// Filter the data based on Message Properties
Properties *amqp.MessageProperties
/* SQLFilter: documentation https://www.rabbitmq.com/docs/next/stream-filtering#sql-filter-expressions
It requires RabbitMQ 4.2 or later
Example:
<code>
Define a message like:
var msg := NewMessage([]byte(..))
msg.Properties = &amqp.MessageProperties{Subject: ptr("mySubject"), To: ptr("To")}
msg.ApplicationProperties = map[string]interface{}{"filter_key": "filter_value"}
publisher.Publish(context.Background(), msg)
Then you can create a consumer with a SQL filter like:
consumer, err := connection.NewConsumer(context.Background(), "myQueue", &StreamConsumerOptions{
InitialCredits: 200,
Offset: &OffsetFirst{},
StreamFilterOptions: &StreamFilterOptions{
SQL: "properties.subject LIKE '%mySubject%' AND properties.to = 'To' AND filter_key = 'filter_value'",
},
})
</code>
*/
SQL string
}
StreamFilterOptions represents the options that can be used to filter the stream data. It is used in the StreamConsumerOptions. See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions/
type TEndPointStrategy ¶
type TEndPointStrategy int
const ( StrategyRandom TEndPointStrategy = iota StrategySequential TEndPointStrategy = iota )
type TExchangeType ¶
type TExchangeType string
TExchangeType represents the type of exchange
const ( Direct TExchangeType = "direct" Topic TExchangeType = "topic" FanOut TExchangeType = "fanout" Headers TExchangeType = "headers" )
type TQueueType ¶
type TQueueType string
const ( Quorum TQueueType = "quorum" Classic TQueueType = "classic" Stream TQueueType = "stream" )
type TopologyRecoveryOptions ¶ added in v0.3.0
type TopologyRecoveryOptions byte
TopologyRecoveryOptions is used to configure the topology recovery behavior of the connection. See TopologyRecoveryDisabled, TopologyRecoveryOnlyTransient, and TopologyRecoveryAllEnabled for more information.
const ( // TopologyRecoveryOnlyTransient recovers only queues declared as exclusive and/or auto delete, and // related bindings. Exchanges are not recovered. TopologyRecoveryOnlyTransient TopologyRecoveryOptions = iota // TopologyRecoveryDisabled disables the topology recovery. TopologyRecoveryDisabled // TopologyRecoveryAllEnabled recovers all the topology. All exchanges, queues, and bindings are recovered. TopologyRecoveryAllEnabled )
Source Files
¶
- address.go
- amqp_binding.go
- amqp_connection.go
- amqp_connection_recovery.go
- amqp_consumer.go
- amqp_environment.go
- amqp_exchange.go
- amqp_management.go
- amqp_publisher.go
- amqp_queue.go
- amqp_types.go
- amqp_utils.go
- common.go
- converters.go
- entities.go
- features_available.go
- life_cycle.go
- log.go
- messages_helper.go
- requester.go
- responder.go
- test_utils.go
- uri.go