Documentation
¶
Index ¶
- Constants
- Variables
- func CapacityBytes(value int64) int64
- func CapacityFrom(value string) (int64, error)
- func CapacityGB(value int64) int64
- func CapacityKB(value int64) int64
- func CapacityMB(value int64) int64
- func CapacityTB(value int64) int64
- func Debug(msg string, args ...any)
- func Error(msg string, args ...any)
- func ExtractWithoutPassword(addr string) string
- func Info(msg string, args ...any)
- func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error
- func NewMessage(body []byte) *amqp.Message
- func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error)
- func NewMessageWithFilter(body []byte, filter string) *amqp.Message
- func Warn(msg string, args ...any)
- type AMQPBinding
- func (b *AMQPBinding) Bind(ctx context.Context) (string, error)
- func (b *AMQPBinding) BindingKey(bindingKey string)
- func (b *AMQPBinding) Destination(name string, isQueue bool)
- func (b *AMQPBinding) SourceExchange(sourceName string)
- func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error
- type AMQPBindingInfo
- type AmqpAddress
- type AmqpConnOptions
- type AmqpConnection
- func (a *AmqpConnection) Close(ctx context.Context) error
- func (a *AmqpConnection) Id() string
- func (a *AmqpConnection) Management() *AmqpManagement
- func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error)
- func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error)
- func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)
- func (a *AmqpConnection) Properties() map[string]any
- func (a *AmqpConnection) RefreshToken(background context.Context, token string) error
- func (a *AmqpConnection) State() ILifeCycleState
- type AmqpExchange
- func (e *AmqpExchange) AutoDelete(isAutoDelete bool)
- func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)
- func (e *AmqpExchange) Delete(ctx context.Context) error
- func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType)
- func (e *AmqpExchange) GetExchangeType() TExchangeType
- func (e *AmqpExchange) IsAutoDelete() bool
- func (e *AmqpExchange) Name() string
- type AmqpExchangeInfo
- type AmqpManagement
- func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error)
- func (a *AmqpManagement) Close(ctx context.Context) error
- func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error)
- func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error)
- func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
- func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error
- func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)
- func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error
- func (a *AmqpManagement) PurgeQueue(ctx context.Context, name string) (int, error)
- func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)
- func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, ...) (map[string]any, error)
- func (a *AmqpManagement) State() ILifeCycleState
- func (a *AmqpManagement) Unbind(ctx context.Context, path string) error
- type AmqpQueue
- func (a *AmqpQueue) AutoDelete(isAutoDelete bool)
- func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)
- func (a *AmqpQueue) Delete(ctx context.Context) error
- func (a *AmqpQueue) Exclusive(isExclusive bool)
- func (a *AmqpQueue) IsAutoDelete() bool
- func (a *AmqpQueue) IsExclusive() bool
- func (a *AmqpQueue) Name(queueName string)
- func (a *AmqpQueue) Purge(ctx context.Context) (int, error)
- func (a *AmqpQueue) QueueType(queueType QueueType)
- func (a *AmqpQueue) SetArguments(arguments map[string]any)
- type AmqpQueueInfo
- func (a *AmqpQueueInfo) Arguments() map[string]any
- func (a *AmqpQueueInfo) ConsumerCount() uint32
- func (a *AmqpQueueInfo) IsAutoDelete() bool
- func (a *AmqpQueueInfo) IsDurable() bool
- func (a *AmqpQueueInfo) IsExclusive() bool
- func (a *AmqpQueueInfo) Leader() string
- func (a *AmqpQueueInfo) Members() []string
- func (a *AmqpQueueInfo) MessageCount() uint64
- func (a *AmqpQueueInfo) Name() string
- func (a *AmqpQueueInfo) Type() TQueueType
- type AutoGeneratedQueueSpecification
- type BalancedLeaderLocator
- type ClassicQueueSpecification
- type ClientLocalLeaderLocator
- type Consumer
- type ConsumerOptions
- type CustomExchangeSpecification
- type DeliveryContext
- func (dc *DeliveryContext) Accept(ctx context.Context) error
- func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error
- func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
- func (dc *DeliveryContext) Message() *amqp.Message
- func (dc *DeliveryContext) Requeue(ctx context.Context) error
- func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
- type DeliveryState
- type DirectExchangeSpecification
- type DropHeadOverflowStrategy
- type Endpoint
- type Environment
- type ExchangeAddress
- type ExchangeToExchangeBindingSpecification
- type ExchangeToQueueBindingSpecification
- type ExchangeType
- type FanOutExchangeSpecification
- type HeadersExchangeSpecification
- type IBindingSpecification
- type IConsumerOptions
- type IExchangeSpecification
- type ILeaderLocator
- type ILifeCycleState
- type IOffsetSpecification
- type IOverflowStrategy
- type IPublisherOptions
- type IQueueSpecification
- type ITargetAddress
- type LifeCycle
- type OAuth2Options
- type OffsetFirst
- type OffsetLast
- type OffsetNext
- type OffsetValue
- type PublishResult
- type Publisher
- type PublisherOptions
- type QueueAddress
- type QueueType
- type QuorumQueueSpecification
- type RecoveryConfiguration
- type RejectPublishDlxOverflowStrategy
- type RejectPublishOverflowStrategy
- type StateAccepted
- type StateChanged
- type StateClosed
- type StateClosing
- type StateModified
- type StateOpen
- type StateReconnecting
- type StateRejected
- type StateReleased
- type StreamConsumerOptions
- type StreamFilterOptions
- type StreamQueueSpecification
- type TEndPointStrategy
- type TExchangeType
- type TQueueType
- type TopicExchangeSpecification
- type URI
- type Version
Constants ¶
const ( UnitMb string = "mb" UnitKb string = "kb" UnitGb string = "gb" UnitTb string = "tb" )
const AtLeastOnce = 1
const AtMostOnce = 0
const StreamFilterValue = "x-stream-filter-value"
Variables ¶
var ErrDoesNotExist = errors.New("does not exist")
var ErrPreconditionFailed = errors.New("precondition Failed")
Functions ¶
func CapacityBytes ¶
func CapacityFrom ¶
func CapacityGB ¶
func CapacityKB ¶
func CapacityMB ¶
func CapacityTB ¶
func ExtractWithoutPassword ¶
func MessagePropertyToAddress ¶
func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error
MessagePropertyToAddress sets the To property of the message to the address of the target. The target must be a QueueAddress or an ExchangeAddress. Note: The field msgRef.Properties.To will be overwritten if it is already set.
func NewMessage ¶
NewMessage creates a new AMQP 1.0 message with the given payload.
func NewMessageWithAddress ¶
func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error)
NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target. The target must be a QueueAddress or an ExchangeAddress. This function is a helper that combines NewMessage and MessagePropertyToAddress.
func NewMessageWithFilter ¶
NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the StreamFilterValue property to the filter value.
Types ¶
type AMQPBinding ¶
type AMQPBinding struct {
// contains filtered or unexported fields
}
func (*AMQPBinding) Bind ¶
func (b *AMQPBinding) Bind(ctx context.Context) (string, error)
Bind creates a binding between an exchange and a queue or exchange with the specified binding key. Returns the binding path that can be used to unbind the binding. Given a virtual host, the binding path is unique.
func (*AMQPBinding) BindingKey ¶
func (b *AMQPBinding) BindingKey(bindingKey string)
func (*AMQPBinding) Destination ¶
func (b *AMQPBinding) Destination(name string, isQueue bool)
func (*AMQPBinding) SourceExchange ¶
func (b *AMQPBinding) SourceExchange(sourceName string)
type AMQPBindingInfo ¶
type AMQPBindingInfo struct {
}
type AmqpAddress ¶
type AmqpAddress struct {
// the address of the AMQP server
// it is in the form of amqp://<host>:<port>
// or amqps://<host>:<port>
// the port is optional
// the default port is 5672
// the default protocol is amqp
// the default host is localhost
// the default virtual host is "/"
// the default user is guest
// the default password is guest
// the default SASL type is SASLTypeAnonymous
Address string
// Options: Additional options for the connection
Options *AmqpConnOptions
}
type AmqpConnOptions ¶
type AmqpConnOptions struct {
// wrapper for amqp.ConnOptions
ContainerID string
// wrapper for amqp.ConnOptions
HostName string
// wrapper for amqp.ConnOptions
IdleTimeout time.Duration
// wrapper for amqp.ConnOptions
MaxFrameSize uint32
// wrapper for amqp.ConnOptions
MaxSessions uint16
// wrapper for amqp.ConnOptions
Properties map[string]any
// wrapper for amqp.ConnOptions
SASLType amqp.SASLType
// wrapper for amqp.ConnOptions
TLSConfig *tls.Config
// wrapper for amqp.ConnOptions
WriteTimeout time.Duration
// RecoveryConfiguration is used to configure the recovery behavior of the connection.
// when the connection is closed unexpectedly.
RecoveryConfiguration *RecoveryConfiguration
// The OAuth2Options is used to configure the connection with OAuth2 token.
OAuth2Options *OAuth2Options
// Local connection identifier (not sent to the server)
// if not provided, a random UUID is generated
Id string
}
func (*AmqpConnOptions) Clone ¶
func (a *AmqpConnOptions) Clone() *AmqpConnOptions
type AmqpConnection ¶
type AmqpConnection struct {
// contains filtered or unexported fields
}
func Dial ¶
func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error)
Dial connect to the AMQP 1.0 server using the provided connectionSettings Returns a pointer to the new AmqpConnection if successful else an error.
func (*AmqpConnection) Close ¶
func (a *AmqpConnection) Close(ctx context.Context) error
Close closes the connection to the AMQP 1.0 server and the management interface. All the publishers and consumers are closed as well.
func (*AmqpConnection) Id ¶
func (a *AmqpConnection) Id() string
func (*AmqpConnection) Management ¶
func (a *AmqpConnection) Management() *AmqpManagement
Management returns the management interface for the connection. The management interface is used to declare and delete exchanges, queues, and bindings.
func (*AmqpConnection) NewConsumer ¶
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error)
NewConsumer creates a new Consumer that listens to the provided Queue
func (*AmqpConnection) NewPublisher ¶
func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error)
NewPublisher creates a new Publisher that sends messages to the provided destination. The destination is a ITargetAddress that can be a Queue or an Exchange with a routing key. options is an IPublisherOptions that can be used to configure the publisher. See QueueAddress and ExchangeAddress for more information.
func (*AmqpConnection) NotifyStatusChange ¶
func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)
NotifyStatusChange registers a channel to receive getState change notifications from the connection.
func (*AmqpConnection) Properties ¶
func (a *AmqpConnection) Properties() map[string]any
func (*AmqpConnection) RefreshToken ¶
func (a *AmqpConnection) RefreshToken(background context.Context, token string) error
func (*AmqpConnection) State ¶
func (a *AmqpConnection) State() ILifeCycleState
type AmqpExchange ¶
type AmqpExchange struct {
// contains filtered or unexported fields
}
func (*AmqpExchange) AutoDelete ¶
func (e *AmqpExchange) AutoDelete(isAutoDelete bool)
func (*AmqpExchange) Declare ¶
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)
func (*AmqpExchange) ExchangeType ¶
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType)
func (*AmqpExchange) GetExchangeType ¶
func (e *AmqpExchange) GetExchangeType() TExchangeType
func (*AmqpExchange) IsAutoDelete ¶
func (e *AmqpExchange) IsAutoDelete() bool
func (*AmqpExchange) Name ¶
func (e *AmqpExchange) Name() string
type AmqpExchangeInfo ¶
type AmqpExchangeInfo struct {
// contains filtered or unexported fields
}
func (*AmqpExchangeInfo) Name ¶
func (a *AmqpExchangeInfo) Name() string
type AmqpManagement ¶
type AmqpManagement struct {
// contains filtered or unexported fields
}
AmqpManagement is the interface to the RabbitMQ /management endpoint The management interface is used to declare/delete exchanges, queues, and bindings
func (*AmqpManagement) Bind ¶
func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error)
func (*AmqpManagement) DeclareExchange ¶
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error)
func (*AmqpManagement) DeclareQueue ¶
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error)
func (*AmqpManagement) DeleteExchange ¶
func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
func (*AmqpManagement) DeleteQueue ¶
func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error
func (*AmqpManagement) NotifyStatusChange ¶
func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)
func (*AmqpManagement) Open ¶
func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error
func (*AmqpManagement) PurgeQueue ¶
PurgeQueue purges the queue returns the number of messages purged
func (*AmqpManagement) QueueInfo ¶
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)
func (*AmqpManagement) Request ¶
func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, expectedResponseCodes []int) (map[string]any, error)
Request sends a request to the /management endpoint. It is a generic method that can be used to send any request to the management endpoint. In most of the cases you don't need to use this method directly, instead use the standard methods
func (*AmqpManagement) State ¶
func (a *AmqpManagement) State() ILifeCycleState
type AmqpQueue ¶
type AmqpQueue struct {
// contains filtered or unexported fields
}
func (*AmqpQueue) AutoDelete ¶
func (*AmqpQueue) Declare ¶
func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)
func (*AmqpQueue) IsAutoDelete ¶
func (*AmqpQueue) IsExclusive ¶
func (*AmqpQueue) SetArguments ¶
type AmqpQueueInfo ¶
type AmqpQueueInfo struct {
// contains filtered or unexported fields
}
func (*AmqpQueueInfo) Arguments ¶
func (a *AmqpQueueInfo) Arguments() map[string]any
func (*AmqpQueueInfo) ConsumerCount ¶
func (a *AmqpQueueInfo) ConsumerCount() uint32
func (*AmqpQueueInfo) IsAutoDelete ¶
func (a *AmqpQueueInfo) IsAutoDelete() bool
func (*AmqpQueueInfo) IsDurable ¶
func (a *AmqpQueueInfo) IsDurable() bool
func (*AmqpQueueInfo) IsExclusive ¶
func (a *AmqpQueueInfo) IsExclusive() bool
func (*AmqpQueueInfo) Leader ¶
func (a *AmqpQueueInfo) Leader() string
func (*AmqpQueueInfo) Members ¶
func (a *AmqpQueueInfo) Members() []string
func (*AmqpQueueInfo) MessageCount ¶
func (a *AmqpQueueInfo) MessageCount() uint64
func (*AmqpQueueInfo) Name ¶
func (a *AmqpQueueInfo) Name() string
func (*AmqpQueueInfo) Type ¶
func (a *AmqpQueueInfo) Type() TQueueType
type AutoGeneratedQueueSpecification ¶
type AutoGeneratedQueueSpecification struct {
IsAutoDelete bool
IsExclusive bool
MaxLength int64
MaxLengthBytes int64
}
AutoGeneratedQueueSpecification represents the specification of the auto-generated queue. It is a classic queue with auto-generated name. It is useful in context like RPC or when you need a temporary queue.
type BalancedLeaderLocator ¶
type BalancedLeaderLocator struct {
}
type ClassicQueueSpecification ¶
type ClassicQueueSpecification struct {
Name string
IsAutoDelete bool
IsExclusive bool
AutoExpire int64
MessageTTL int64
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
MaxPriority int64
LeaderLocator ILeaderLocator
}
ClassicQueueSpecification represents the specification of the classic queue
type ClientLocalLeaderLocator ¶
type ClientLocalLeaderLocator struct {
}
type ConsumerOptions ¶
type ConsumerOptions struct {
//ReceiverLinkName: see the IConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the IConsumerOptions interface
InitialCredits int32
// The id of the consumer
Id string
}
ConsumerOptions represents the options for quorum and classic queues
type DeliveryContext ¶
type DeliveryContext struct {
// contains filtered or unexported fields
}
func (*DeliveryContext) DiscardWithAnnotations ¶
func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
func (*DeliveryContext) Message ¶
func (dc *DeliveryContext) Message() *amqp.Message
func (*DeliveryContext) RequeueWithAnnotations ¶
func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
type DeliveryState ¶
type DeliveryState = amqp.DeliveryState
type DropHeadOverflowStrategy ¶
type DropHeadOverflowStrategy struct {
}
type Endpoint ¶
type Endpoint struct {
Address string
Options *AmqpConnOptions
}
func DefaultEndpoints ¶
func DefaultEndpoints() []Endpoint
type Environment ¶
type Environment struct {
EndPointStrategy TEndPointStrategy
// contains filtered or unexported fields
}
func NewClusterEnvironment ¶
func NewClusterEnvironment(endPoints []Endpoint) *Environment
func NewClusterEnvironmentWithStrategy ¶
func NewClusterEnvironmentWithStrategy(endPoints []Endpoint, strategy TEndPointStrategy) *Environment
func NewEnvironment ¶
func NewEnvironment(address string, options *AmqpConnOptions) *Environment
func (*Environment) CloseConnections ¶
func (e *Environment) CloseConnections(ctx context.Context) error
CloseConnections closes all the connections in the environment with all the publishers and consumers.
func (*Environment) Connections ¶
func (e *Environment) Connections() []*AmqpConnection
Connections gets the active connections in the environment
func (*Environment) NewConnection ¶
func (e *Environment) NewConnection(ctx context.Context) (*AmqpConnection, error)
NewConnection get a new connection from the environment. It picks an endpoint from the list of endpoints, based on EndPointStrategy, and tries to open a connection. It fails if all the endpoints are not reachable. The Environment will keep track of the connection and close it when the environment is closed.
type ExchangeAddress ¶
type ExchangeAddress struct {
Exchange string // The name of the exchange
Key string // The routing key. Can be empty
}
ExchangeAddress represents the address of an exchange with a routing key.
type ExchangeType ¶
type ExchangeType struct {
Type TExchangeType
}
func (ExchangeType) String ¶
func (e ExchangeType) String() string
type IBindingSpecification ¶
type IBindingSpecification interface {
// contains filtered or unexported methods
}
type IConsumerOptions ¶
type IConsumerOptions interface {
// contains filtered or unexported methods
}
type IExchangeSpecification ¶
type IExchangeSpecification interface {
// contains filtered or unexported methods
}
IExchangeSpecification represents the specification of an exchange
type ILeaderLocator ¶
type ILeaderLocator interface {
// contains filtered or unexported methods
}
type ILifeCycleState ¶
type ILifeCycleState interface {
// contains filtered or unexported methods
}
type IOffsetSpecification ¶
type IOffsetSpecification interface {
// contains filtered or unexported methods
}
type IOverflowStrategy ¶
type IOverflowStrategy interface {
// contains filtered or unexported methods
}
type IPublisherOptions ¶
type IPublisherOptions interface {
// contains filtered or unexported methods
}
type IQueueSpecification ¶
type IQueueSpecification interface {
// contains filtered or unexported methods
}
IQueueSpecification represents the specification of a queue
type ITargetAddress ¶
type ITargetAddress interface {
// contains filtered or unexported methods
}
ITargetAddress is an interface that represents an address that can be used to send messages to. It can be either a Queue or an Exchange with a routing key.
type LifeCycle ¶
type LifeCycle struct {
// contains filtered or unexported fields
}
func NewLifeCycle ¶
func NewLifeCycle() *LifeCycle
func (*LifeCycle) SetState ¶
func (l *LifeCycle) SetState(value ILifeCycleState)
func (*LifeCycle) State ¶
func (l *LifeCycle) State() ILifeCycleState
type OAuth2Options ¶
type OAuth2Options struct {
Token string
}
func (OAuth2Options) Clone ¶
func (o OAuth2Options) Clone() *OAuth2Options
type OffsetFirst ¶
type OffsetFirst struct {
}
type OffsetLast ¶
type OffsetLast struct {
}
type OffsetNext ¶
type OffsetNext struct {
}
type OffsetValue ¶
type OffsetValue struct {
Offset uint64
}
type PublishResult ¶
type PublishResult struct {
Outcome DeliveryState
Message *amqp.Message
}
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher is a publisher that sends messages to a specific destination address.
func (*Publisher) Publish ¶
Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.
The message is sent and the outcome of the operation is returned. The outcome is a DeliveryState that indicates if the message was accepted or rejected. RabbitMQ supports the following DeliveryState types:
- StateAccepted
- StateReleased
- StateRejected See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
Note: If the destination address is not defined during the creation, the message must have a TO property set. You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: <code>
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{
Exchange: "myExchangeName",
Key: "myRoutingKey",
}
.. publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
</code> Create a new publisher that sends messages based on message destination address: <code>
publisher, err := connection.NewPublisher(context.Background(), nil, "test")
msg := amqp.NewMessage([]byte("hello"))
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
..:= publisher.Publish(context.Background(), msg)
</code>
type PublisherOptions ¶
type QueueAddress ¶
type QueueAddress struct {
Queue string // The name of the queue
Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}
QueueAddress represents the address of a queue.
type QueueType ¶
type QueueType struct {
Type TQueueType
}
type QuorumQueueSpecification ¶
type QuorumQueueSpecification struct {
Name string
AutoExpire int64
MessageTTL int64
OverflowStrategy IOverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
DeliveryLimit int64
TargetClusterSize int64
LeaderLocator ILeaderLocator
QuorumInitialGroupSize int
}
type RecoveryConfiguration ¶
type RecoveryConfiguration struct {
/*
ActiveRecovery Define if the recovery is activated.
If is not activated the connection will not try to createSender.
*/
ActiveRecovery bool
/*
BackOffReconnectInterval The time to wait before trying to createSender after a connection is closed.
time will be increased exponentially with each attempt.
Default is 5 seconds, each attempt will double the time.
The minimum value is 1 second. Avoid setting a value low values since it can cause a high
number of reconnection attempts.
*/
BackOffReconnectInterval time.Duration
/*
MaxReconnectAttempts The maximum number of reconnection attempts.
Default is 5.
The minimum value is 1.
*/
MaxReconnectAttempts int
}
func NewRecoveryConfiguration ¶
func NewRecoveryConfiguration() *RecoveryConfiguration
func (*RecoveryConfiguration) Clone ¶
func (c *RecoveryConfiguration) Clone() *RecoveryConfiguration
type RejectPublishDlxOverflowStrategy ¶
type RejectPublishDlxOverflowStrategy struct {
}
type RejectPublishOverflowStrategy ¶
type RejectPublishOverflowStrategy struct {
}
type StateAccepted ¶
type StateAccepted = amqp.StateAccepted
type StateChanged ¶
type StateChanged struct {
From ILifeCycleState
To ILifeCycleState
}
func (StateChanged) String ¶
func (s StateChanged) String() string
type StateClosed ¶
type StateClosed struct {
// contains filtered or unexported fields
}
func (*StateClosed) GetError ¶
func (c *StateClosed) GetError() error
type StateClosing ¶
type StateClosing struct {
}
type StateModified ¶
type StateModified = amqp.StateModified
type StateReconnecting ¶
type StateReconnecting struct {
}
type StateRejected ¶
type StateRejected = amqp.StateRejected
type StateReleased ¶
type StateReleased = amqp.StateReleased
type StreamConsumerOptions ¶
type StreamConsumerOptions struct {
//ReceiverLinkName: see the IConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the IConsumerOptions interface
InitialCredits int32
// The offset specification for the stream consumer
// see the interface implementations
Offset IOffsetSpecification
StreamFilterOptions *StreamFilterOptions
Id string
}
StreamConsumerOptions represents the options for stream queues It is mandatory in case of creating a stream consumer.
type StreamFilterOptions ¶
type StreamFilterOptions struct {
// Filter values.
Values []string
//
MatchUnfiltered bool
// Filter the data based on Application Property
ApplicationProperties map[string]any
// Filter the data based on Message Properties
Properties *amqp.MessageProperties
}
StreamFilterOptions represents the options that can be used to filter the stream data. It is used in the StreamConsumerOptions. See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions/
type TEndPointStrategy ¶
type TEndPointStrategy int
const ( StrategyRandom TEndPointStrategy = iota StrategySequential TEndPointStrategy = iota )
type TExchangeType ¶
type TExchangeType string
TExchangeType represents the type of exchange
const ( Direct TExchangeType = "direct" Topic TExchangeType = "topic" FanOut TExchangeType = "fanout" Headers TExchangeType = "headers" )
type TQueueType ¶
type TQueueType string
const ( Quorum TQueueType = "quorum" Classic TQueueType = "classic" Stream TQueueType = "stream" )
Source Files
¶
- address.go
- amqp_binding.go
- amqp_connection.go
- amqp_connection_recovery.go
- amqp_consumer.go
- amqp_environment.go
- amqp_exchange.go
- amqp_management.go
- amqp_publisher.go
- amqp_queue.go
- amqp_types.go
- amqp_utils.go
- common.go
- converters.go
- entities.go
- features_available.go
- life_cycle.go
- log.go
- messages_helper.go
- test_utils.go
- uri.go