Documentation
¶
Index ¶
- Constants
- Variables
- func CapacityBytes(value int64) int64
- func CapacityFrom(value string) (int64, error)
- func CapacityGB(value int64) int64
- func CapacityKB(value int64) int64
- func CapacityMB(value int64) int64
- func CapacityTB(value int64) int64
- func Debug(msg string, args ...any)
- func Error(msg string, args ...any)
- func ExtractWithoutPassword(addr string) string
- func Info(msg string, args ...any)
- func MessagePropertyToAddress(msgRef *amqp.Message, target TargetAddress) error
- func NewMessage(body []byte) *amqp.Message
- func NewMessageWithAddress(body []byte, target TargetAddress) (*amqp.Message, error)
- func NewMessageWithFilter(body []byte, filter string) *amqp.Message
- func Warn(msg string, args ...any)
- type AMQPBinding
- func (b *AMQPBinding) Bind(ctx context.Context) (string, error)
- func (b *AMQPBinding) BindingKey(bindingKey string)
- func (b *AMQPBinding) Destination(name string, isQueue bool)
- func (b *AMQPBinding) SourceExchange(sourceName string)
- func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error
- type AMQPBindingInfo
- type AMQPConsumerOptions
- type AMQPProducerOptions
- type AmqpConnOptions
- type AmqpConnection
- func (a *AmqpConnection) Close(ctx context.Context) error
- func (a *AmqpConnection) Id() string
- func (a *AmqpConnection) Management() *AmqpManagement
- func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error)
- func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAddress, linkName string) (*Publisher, error)
- func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)
- func (a *AmqpConnection) State() LifeCycleState
- type AmqpExchange
- func (e *AmqpExchange) AutoDelete(isAutoDelete bool)
- func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)
- func (e *AmqpExchange) Delete(ctx context.Context) error
- func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType)
- func (e *AmqpExchange) GetExchangeType() TExchangeType
- func (e *AmqpExchange) IsAutoDelete() bool
- func (e *AmqpExchange) Name() string
- type AmqpExchangeInfo
- type AmqpManagement
- func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification BindingSpecification) (string, error)
- func (a *AmqpManagement) Close(ctx context.Context) error
- func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification ExchangeSpecification) (*AmqpExchangeInfo, error)
- func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification QueueSpecification) (*AmqpQueueInfo, error)
- func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
- func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error
- func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)
- func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error
- func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int, error)
- func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)
- func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, ...) (map[string]any, error)
- func (a *AmqpManagement) State() LifeCycleState
- func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error
- type AmqpQueue
- func (a *AmqpQueue) AutoDelete(isAutoDelete bool)
- func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)
- func (a *AmqpQueue) Delete(ctx context.Context) error
- func (a *AmqpQueue) Exclusive(isExclusive bool)
- func (a *AmqpQueue) IsAutoDelete() bool
- func (a *AmqpQueue) IsExclusive() bool
- func (a *AmqpQueue) Name(queueName string)
- func (a *AmqpQueue) Purge(ctx context.Context) (int, error)
- func (a *AmqpQueue) QueueType(queueType QueueType)
- func (a *AmqpQueue) SetArguments(arguments map[string]any)
- type AmqpQueueInfo
- func (a *AmqpQueueInfo) Arguments() map[string]any
- func (a *AmqpQueueInfo) IsAutoDelete() bool
- func (a *AmqpQueueInfo) IsDurable() bool
- func (a *AmqpQueueInfo) IsExclusive() bool
- func (a *AmqpQueueInfo) Leader() string
- func (a *AmqpQueueInfo) Members() []string
- func (a *AmqpQueueInfo) Name() string
- func (a *AmqpQueueInfo) Type() TQueueType
- type AutoGeneratedQueueSpecification
- type BalancedLeaderLocator
- type BindingSpecification
- type ClassicQueueSpecification
- type ClientLocalLeaderLocator
- type Consumer
- type ConsumerOptions
- type DeliveryContext
- func (dc *DeliveryContext) Accept(ctx context.Context) error
- func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error
- func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
- func (dc *DeliveryContext) Message() *amqp.Message
- func (dc *DeliveryContext) Requeue(ctx context.Context) error
- func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
- type DeliveryState
- type DirectExchangeSpecification
- type DropHeadOverflowStrategy
- type Environment
- type ExchangeAddress
- type ExchangeSpecification
- type ExchangeToExchangeBindingSpecification
- type ExchangeToQueueBindingSpecification
- type ExchangeType
- type FanOutExchangeSpecification
- type LeaderLocator
- type LifeCycle
- type LifeCycleState
- type OffsetFirst
- type OffsetLast
- type OffsetNext
- type OffsetSpecification
- type OffsetValue
- type OverflowStrategy
- type ProducerOptions
- type PublishResult
- type Publisher
- type QueueAddress
- type QueueSpecification
- type QueueType
- type QuorumQueueSpecification
- type RecoveryConfiguration
- type RejectPublishDlxOverflowStrategy
- type RejectPublishOverflowStrategy
- type StateAccepted
- type StateChanged
- type StateClosed
- type StateClosing
- type StateModified
- type StateOpen
- type StateReconnecting
- type StateRejected
- type StateReleased
- type StreamConsumerOptions
- type StreamQueueSpecification
- type TExchangeType
- type TQueueType
- type TargetAddress
- type TopicExchangeSpecification
- type URI
Constants ¶
const ( UnitMb string = "mb" UnitKb string = "kb" UnitGb string = "gb" UnitTb string = "tb" )
const AtLeastOnce = 1
const AtMostOnce = 0
const StreamFilterValue = "x-stream-filter-value"
Variables ¶
var ErrDoesNotExist = errors.New("does not exist")
var ErrPreconditionFailed = errors.New("precondition Failed")
Functions ¶
func CapacityBytes ¶
func CapacityFrom ¶
func CapacityGB ¶
func CapacityKB ¶
func CapacityMB ¶
func CapacityTB ¶
func ExtractWithoutPassword ¶
func MessagePropertyToAddress ¶
func MessagePropertyToAddress(msgRef *amqp.Message, target TargetAddress) error
MessagePropertyToAddress sets the To property of the message to the address of the target. The target must be a QueueAddress or an ExchangeAddress. Note: The field msgRef.Properties.To will be overwritten if it is already set.
func NewMessage ¶
NewMessage creates a new AMQP 1.0 message with the given payload.
func NewMessageWithAddress ¶
func NewMessageWithAddress(body []byte, target TargetAddress) (*amqp.Message, error)
NewMessageWithAddress creates a new AMQP 1.0 message with the given payload and sets the To property to the address of the target. The target must be a QueueAddress or an ExchangeAddress. This function is a helper that combines NewMessage and MessagePropertyToAddress.
func NewMessageWithFilter ¶
NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the StreamFilterValue property to the filter value.
Types ¶
type AMQPBinding ¶
type AMQPBinding struct {
// contains filtered or unexported fields
}
func (*AMQPBinding) Bind ¶
func (b *AMQPBinding) Bind(ctx context.Context) (string, error)
Bind creates a binding between an exchange and a queue or exchange with the specified binding key. Returns the binding path that can be used to unbind the binding. Given a virtual host, the binding path is unique.
func (*AMQPBinding) BindingKey ¶
func (b *AMQPBinding) BindingKey(bindingKey string)
func (*AMQPBinding) Destination ¶
func (b *AMQPBinding) Destination(name string, isQueue bool)
func (*AMQPBinding) SourceExchange ¶
func (b *AMQPBinding) SourceExchange(sourceName string)
type AMQPBindingInfo ¶
type AMQPBindingInfo struct {
}
type AMQPConsumerOptions ¶
type AMQPProducerOptions ¶
type AMQPProducerOptions struct {
SenderLinkName string
}
type AmqpConnOptions ¶
type AmqpConnOptions struct {
// wrapper for amqp.ConnOptions
ContainerID string
// wrapper for amqp.ConnOptions
HostName string
// wrapper for amqp.ConnOptions
IdleTimeout time.Duration
// wrapper for amqp.ConnOptions
MaxFrameSize uint32
// wrapper for amqp.ConnOptions
MaxSessions uint16
// wrapper for amqp.ConnOptions
Properties map[string]any
// wrapper for amqp.ConnOptions
SASLType amqp.SASLType
// wrapper for amqp.ConnOptions
TLSConfig *tls.Config
// wrapper for amqp.ConnOptions
WriteTimeout time.Duration
// RecoveryConfiguration is used to configure the recovery behavior of the connection
// when the connection is closed unexpectedly.
RecoveryConfiguration *RecoveryConfiguration
// contains filtered or unexported fields
}
type AmqpConnection ¶
type AmqpConnection struct {
// contains filtered or unexported fields
}
func Dial ¶
func Dial(ctx context.Context, addresses []string, connOptions *AmqpConnOptions, args ...string) (*AmqpConnection, error)
Dial connects to the AMQP 1.0 server using the provided connection options (connOptions). It returns a pointer to the new AmqpConnection if successful, otherwise an error. addresses is a list of addresses to connect to. It picks one randomly. It is enough that one of the addresses is reachable.
func (*AmqpConnection) Close ¶
func (a *AmqpConnection) Close(ctx context.Context) error
Close closes the connection to the AMQP 1.0 server and the management interface. All the publishers and consumers are closed as well.
func (*AmqpConnection) Id ¶
func (a *AmqpConnection) Id() string
func (*AmqpConnection) Management ¶
func (a *AmqpConnection) Management() *AmqpManagement
Management returns the management interface for the connection. The management interface is used to declare and delete exchanges, queues, and bindings.
func (*AmqpConnection) NewConsumer ¶
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error)
NewConsumer creates a new Consumer that listens to the provided queue. The queue is identified by its name (queueName).
func (*AmqpConnection) NewPublisher ¶
func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAddress, linkName string) (*Publisher, error)
NewPublisher creates a new Publisher that sends messages to the provided destination. The destination is a TargetAddress that can be a Queue or an Exchange with a routing key. See QueueAddress and ExchangeAddress for more information.
func (*AmqpConnection) NotifyStatusChange ¶
func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)
NotifyStatusChange registers a channel to receive state change notifications from the connection.
func (*AmqpConnection) State ¶
func (a *AmqpConnection) State() LifeCycleState
type AmqpExchange ¶
type AmqpExchange struct {
// contains filtered or unexported fields
}
func (*AmqpExchange) AutoDelete ¶
func (e *AmqpExchange) AutoDelete(isAutoDelete bool)
func (*AmqpExchange) Declare ¶
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)
func (*AmqpExchange) ExchangeType ¶
func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType)
func (*AmqpExchange) GetExchangeType ¶
func (e *AmqpExchange) GetExchangeType() TExchangeType
func (*AmqpExchange) IsAutoDelete ¶
func (e *AmqpExchange) IsAutoDelete() bool
func (*AmqpExchange) Name ¶
func (e *AmqpExchange) Name() string
type AmqpExchangeInfo ¶
type AmqpExchangeInfo struct {
// contains filtered or unexported fields
}
func (*AmqpExchangeInfo) Name ¶
func (a *AmqpExchangeInfo) Name() string
type AmqpManagement ¶
type AmqpManagement struct {
// contains filtered or unexported fields
}
AmqpManagement is the interface to the RabbitMQ /management endpoint. The management interface is used to declare/delete exchanges, queues, and bindings.
func NewAmqpManagement ¶
func NewAmqpManagement() *AmqpManagement
func (*AmqpManagement) Bind ¶
func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification BindingSpecification) (string, error)
func (*AmqpManagement) DeclareExchange ¶
func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification ExchangeSpecification) (*AmqpExchangeInfo, error)
func (*AmqpManagement) DeclareQueue ¶
func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification QueueSpecification) (*AmqpQueueInfo, error)
func (*AmqpManagement) DeleteExchange ¶
func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
func (*AmqpManagement) DeleteQueue ¶
func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error
func (*AmqpManagement) NotifyStatusChange ¶
func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)
func (*AmqpManagement) Open ¶
func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error
func (*AmqpManagement) PurgeQueue ¶
func (*AmqpManagement) QueueInfo ¶
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)
func (*AmqpManagement) Request ¶
func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, expectedResponseCodes []int) (map[string]any, error)
Request sends a request to the /management endpoint. It is a generic method that can be used to send any request to the management endpoint. In most cases you don't need to use this method directly; use the standard methods instead.
func (*AmqpManagement) State ¶
func (a *AmqpManagement) State() LifeCycleState
type AmqpQueue ¶
type AmqpQueue struct {
// contains filtered or unexported fields
}
func (*AmqpQueue) AutoDelete ¶
func (*AmqpQueue) Declare ¶
func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)
func (*AmqpQueue) IsAutoDelete ¶
func (*AmqpQueue) IsExclusive ¶
func (*AmqpQueue) SetArguments ¶
type AmqpQueueInfo ¶
type AmqpQueueInfo struct {
// contains filtered or unexported fields
}
func (*AmqpQueueInfo) Arguments ¶
func (a *AmqpQueueInfo) Arguments() map[string]any
func (*AmqpQueueInfo) IsAutoDelete ¶
func (a *AmqpQueueInfo) IsAutoDelete() bool
func (*AmqpQueueInfo) IsDurable ¶
func (a *AmqpQueueInfo) IsDurable() bool
func (*AmqpQueueInfo) IsExclusive ¶
func (a *AmqpQueueInfo) IsExclusive() bool
func (*AmqpQueueInfo) Leader ¶
func (a *AmqpQueueInfo) Leader() string
func (*AmqpQueueInfo) Members ¶
func (a *AmqpQueueInfo) Members() []string
func (*AmqpQueueInfo) Name ¶
func (a *AmqpQueueInfo) Name() string
func (*AmqpQueueInfo) Type ¶
func (a *AmqpQueueInfo) Type() TQueueType
type AutoGeneratedQueueSpecification ¶
type AutoGeneratedQueueSpecification struct {
IsAutoDelete bool
IsExclusive bool
MaxLength int64
MaxLengthBytes int64
}
AutoGeneratedQueueSpecification represents the specification of the auto-generated queue. It is a classic queue with an auto-generated name. It is useful in contexts like RPC or when you need a temporary queue.
type BalancedLeaderLocator ¶
type BalancedLeaderLocator struct {
}
type BindingSpecification ¶
type BindingSpecification interface {
// contains filtered or unexported methods
}
type ClassicQueueSpecification ¶
type ClassicQueueSpecification struct {
Name string
IsAutoDelete bool
IsExclusive bool
AutoExpire int64
MessageTTL int64
OverflowStrategy OverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
MaxPriority int64
LeaderLocator LeaderLocator
}
ClassicQueueSpecification represents the specification of the classic queue
type ClientLocalLeaderLocator ¶
type ClientLocalLeaderLocator struct {
}
type ConsumerOptions ¶
type ConsumerOptions interface {
// contains filtered or unexported methods
}
type DeliveryContext ¶
type DeliveryContext struct {
// contains filtered or unexported fields
}
func (*DeliveryContext) DiscardWithAnnotations ¶
func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
func (*DeliveryContext) Message ¶
func (dc *DeliveryContext) Message() *amqp.Message
func (*DeliveryContext) RequeueWithAnnotations ¶
func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error
type DeliveryState ¶
type DeliveryState = amqp.DeliveryState
type DropHeadOverflowStrategy ¶
type DropHeadOverflowStrategy struct {
}
type Environment ¶
type Environment struct {
// contains filtered or unexported fields
}
func NewEnvironment ¶
func NewEnvironment(addresses []string, connOptions *AmqpConnOptions) *Environment
func (*Environment) CloseConnections ¶
func (e *Environment) CloseConnections(ctx context.Context) error
CloseConnections closes all the connections in the environment with all the publishers and consumers.
func (*Environment) Connections ¶
func (e *Environment) Connections() []*AmqpConnection
func (*Environment) NewConnection ¶
func (e *Environment) NewConnection(ctx context.Context, args ...string) (*AmqpConnection, error)
NewConnection gets a new connection from the environment. If the connection id is provided, it will be used as the connection id. If the connection id is not provided, a new connection id will be generated. The connection id is unique in the environment. The Environment will keep track of the connection and close it when the environment is closed.
type ExchangeAddress ¶
type ExchangeAddress struct {
Exchange string // The name of the exchange
Key string // The routing key. Can be empty
}
ExchangeAddress represents the address of an exchange with a routing key.
type ExchangeSpecification ¶
type ExchangeSpecification interface {
// contains filtered or unexported methods
}
ExchangeSpecification represents the specification of an exchange
type ExchangeType ¶
type ExchangeType struct {
Type TExchangeType
}
func (ExchangeType) String ¶
func (e ExchangeType) String() string
type LeaderLocator ¶
type LeaderLocator interface {
// contains filtered or unexported methods
}
type LifeCycle ¶
type LifeCycle struct {
// contains filtered or unexported fields
}
func NewLifeCycle ¶
func NewLifeCycle() *LifeCycle
func (*LifeCycle) SetState ¶
func (l *LifeCycle) SetState(value LifeCycleState)
func (*LifeCycle) State ¶
func (l *LifeCycle) State() LifeCycleState
type LifeCycleState ¶
type LifeCycleState interface {
// contains filtered or unexported methods
}
type OffsetFirst ¶
type OffsetFirst struct {
}
type OffsetLast ¶
type OffsetLast struct {
}
type OffsetNext ¶
type OffsetNext struct {
}
type OffsetSpecification ¶
type OffsetSpecification interface {
// contains filtered or unexported methods
}
type OffsetValue ¶
type OffsetValue struct {
Offset uint64
}
type OverflowStrategy ¶
type OverflowStrategy interface {
// contains filtered or unexported methods
}
type ProducerOptions ¶
type ProducerOptions interface {
// contains filtered or unexported methods
}
type PublishResult ¶
type PublishResult struct {
Outcome DeliveryState
Message *amqp.Message
}
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher is a publisher that sends messages to a specific destination address.
func (*Publisher) Publish ¶
Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.
The message is sent and the outcome of the operation is returned. The outcome is a DeliveryState that indicates if the message was accepted or rejected. RabbitMQ supports the following DeliveryState types:
- StateAccepted
- StateReleased
- StateRejected
See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
Note: If the destination address is not defined during the creation, the message must have a TO property set. You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: <code>
publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{
Exchange: "myExchangeName",
Key: "myRoutingKey",
}, "myLinkName")
.. publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
</code> Create a new publisher that sends messages based on message destination address: <code>
publisher, err := connection.NewPublisher(context.Background(), nil, "test")
msg := amqp.NewMessage([]byte("hello"))
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
..:= publisher.Publish(context.Background(), msg)
</code>
type QueueAddress ¶
type QueueAddress struct {
Queue string // The name of the queue
Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}
QueueAddress represents the address of a queue.
type QueueSpecification ¶
type QueueSpecification interface {
// contains filtered or unexported methods
}
QueueSpecification represents the specification of a queue
type QueueType ¶
type QueueType struct {
Type TQueueType
}
type QuorumQueueSpecification ¶
type QuorumQueueSpecification struct {
Name string
AutoExpire int64
MessageTTL int64
OverflowStrategy OverflowStrategy
SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
MaxLength int64
MaxLengthBytes int64
DeliveryLimit int64
TargetClusterSize int64
LeaderLocator LeaderLocator
}
type RecoveryConfiguration ¶
type RecoveryConfiguration struct {
/*
ActiveRecovery defines whether the recovery is activated.
If it is not activated, the connection will not try to reconnect.
*/
ActiveRecovery bool
/*
BackOffReconnectInterval The time to wait before trying to reconnect after a connection is closed.
The time will be increased exponentially with each attempt.
Default is 5 seconds, each attempt will double the time.
The minimum value is 1 second. Avoid setting low values since they can cause a high
number of reconnection attempts.
*/
BackOffReconnectInterval time.Duration
/*
MaxReconnectAttempts The maximum number of reconnection attempts.
Default is 5.
The minimum value is 1.
*/
MaxReconnectAttempts int
}
func NewRecoveryConfiguration ¶
func NewRecoveryConfiguration() *RecoveryConfiguration
type RejectPublishDlxOverflowStrategy ¶
type RejectPublishDlxOverflowStrategy struct {
}
type RejectPublishOverflowStrategy ¶
type RejectPublishOverflowStrategy struct {
}
type StateAccepted ¶
type StateAccepted = amqp.StateAccepted
type StateChanged ¶
type StateChanged struct {
From LifeCycleState
To LifeCycleState
}
func (StateChanged) String ¶
func (s StateChanged) String() string
type StateClosed ¶
type StateClosed struct {
// contains filtered or unexported fields
}
func (*StateClosed) GetError ¶
func (c *StateClosed) GetError() error
type StateClosing ¶
type StateClosing struct {
}
type StateModified ¶
type StateModified = amqp.StateModified
type StateReconnecting ¶
type StateReconnecting struct {
}
type StateRejected ¶
type StateRejected = amqp.StateRejected
type StateReleased ¶
type StateReleased = amqp.StateReleased
type StreamConsumerOptions ¶
type StreamConsumerOptions struct {
//ReceiverLinkName: see the ConsumerOptions interface
ReceiverLinkName string
//InitialCredits: see the ConsumerOptions interface
InitialCredits int32
// The offset specification for the stream consumer
// see the interface implementations
Offset OffsetSpecification
// Filter values.
// See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions for more details
Filters []string
//
FilterMatchUnfiltered bool
}
StreamConsumerOptions represents the options that can be used to create a stream consumer. It is mandatory in case of creating a stream consumer.
type TExchangeType ¶
type TExchangeType string
TExchangeType represents the type of exchange
const ( Direct TExchangeType = "direct" Topic TExchangeType = "topic" FanOut TExchangeType = "fanout" )
type TQueueType ¶
type TQueueType string
const ( Quorum TQueueType = "quorum" Classic TQueueType = "classic" Stream TQueueType = "stream" )
type TargetAddress ¶
type TargetAddress interface {
// contains filtered or unexported methods
}
TargetAddress is an interface that represents an address that can be used to send messages to. It can be either a Queue or an Exchange with a routing key.