rabbitmqamqp

package
v0.1.0-alpha.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2025 License: Apache-2.0 Imports: 17 Imported by: 1

Documentation

Index

Constants

View Source
const (
	UnitMb string = "mb"
	UnitKb string = "kb"
	UnitGb string = "gb"
	UnitTb string = "tb"
)
View Source
const AtLeastOnce = 1
View Source
const AtMostOnce = 0
View Source
const StreamFilterValue = "x-stream-filter-value"

Variables

View Source
var ErrDoesNotExist = errors.New("does not exist")
View Source
var ErrPreconditionFailed = errors.New("precondition Failed")

Functions

func CapacityBytes

func CapacityBytes(value int64) int64

func CapacityFrom

func CapacityFrom(value string) (int64, error)

func CapacityGB

func CapacityGB(value int64) int64

func CapacityKB

func CapacityKB(value int64) int64

func CapacityMB

func CapacityMB(value int64) int64

func CapacityTB

func CapacityTB(value int64) int64

func Debug

func Debug(msg string, args ...any)

func Error

func Error(msg string, args ...any)

func ExtractWithoutPassword

func ExtractWithoutPassword(addr string) string

func Info

func Info(msg string, args ...any)

func MessagePropertyToAddress

func MessagePropertyToAddress(msgRef *amqp.Message, target TargetAddress) error

MessagePropertyToAddress sets the To property of the message to the address of the target. The target must be a QueueAddress or an ExchangeAddress. Note: The field msgRef.Properties.To will be overwritten if it is already set.

func NewMessage

func NewMessage(body []byte) *amqp.Message

NewMessage creates a new AMQP 1.0 message with the given payload.

func NewMessageWithAddress

func NewMessageWithAddress(body []byte, target TargetAddress) (*amqp.Message, error)

NewMessageWithAddress creates a new AMQP 1.0 message with the given payload and sets the To property to the address of the target. The target must be a QueueAddress or an ExchangeAddress. This function is a helper that combines NewMessage and MessagePropertyToAddress.

func NewMessageWithFilter

func NewMessageWithFilter(body []byte, filter string) *amqp.Message

NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the StreamFilterValue property to the filter value.

func Warn

func Warn(msg string, args ...any)

Types

type AMQPBinding

type AMQPBinding struct {
	// contains filtered or unexported fields
}

func (*AMQPBinding) Bind

func (b *AMQPBinding) Bind(ctx context.Context) (string, error)

Bind creates a binding between an exchange and a queue or exchange with the specified binding key. Returns the binding path that can be used to unbind the binding. Given a virtual host, the binding path is unique.

func (*AMQPBinding) BindingKey

func (b *AMQPBinding) BindingKey(bindingKey string)

func (*AMQPBinding) Destination

func (b *AMQPBinding) Destination(name string, isQueue bool)

func (*AMQPBinding) SourceExchange

func (b *AMQPBinding) SourceExchange(sourceName string)

func (*AMQPBinding) Unbind

func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error

Unbind removes a binding between an exchange and a queue or exchange with the specified binding key. The bindingPath is the unique path that was returned when the binding was created.

type AMQPBindingInfo

type AMQPBindingInfo struct {
}

type AMQPConsumerOptions

type AMQPConsumerOptions struct {
	//ReceiverLinkName: see the ConsumerOptions interface
	ReceiverLinkName string
	//InitialCredits: see the ConsumerOptions interface
	InitialCredits int32
}

type AMQPProducerOptions

type AMQPProducerOptions struct {
	SenderLinkName string
}

type AmqpConnOptions

type AmqpConnOptions struct {
	// wrapper for amqp.ConnOptions
	ContainerID string
	// wrapper for amqp.ConnOptions
	HostName string
	// wrapper for amqp.ConnOptions
	IdleTimeout time.Duration

	// wrapper for amqp.ConnOptions
	MaxFrameSize uint32

	// wrapper for amqp.ConnOptions
	MaxSessions uint16

	// wrapper for amqp.ConnOptions
	Properties map[string]any

	// wrapper for amqp.ConnOptions
	SASLType amqp.SASLType

	// wrapper for amqp.ConnOptions
	TLSConfig *tls.Config

	// wrapper for amqp.ConnOptions
	WriteTimeout time.Duration

	// RecoveryConfiguration is used to configure the recovery behavior of the connection
	// when the connection is closed unexpectedly.
	RecoveryConfiguration *RecoveryConfiguration
	// contains filtered or unexported fields
}

type AmqpConnection

type AmqpConnection struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(ctx context.Context, addresses []string, connOptions *AmqpConnOptions, args ...string) (*AmqpConnection, error)

Dial connects to the AMQP 1.0 server using the provided connection options (connOptions). It returns a pointer to the new AmqpConnection if successful, otherwise an error. addresses is a list of addresses to connect to. It picks one randomly. It is enough that one of the addresses is reachable.

func (*AmqpConnection) Close

func (a *AmqpConnection) Close(ctx context.Context) error

Close closes the connection to the AMQP 1.0 server and the management interface. All the publishers and consumers are closed as well.

func (*AmqpConnection) Id

func (a *AmqpConnection) Id() string

func (*AmqpConnection) Management

func (a *AmqpConnection) Management() *AmqpManagement

Management returns the management interface for the connection. The management interface is used to declare and delete exchanges, queues, and bindings.

func (*AmqpConnection) NewConsumer

func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error)

NewConsumer creates a new Consumer that listens to the queue identified by queueName.

func (*AmqpConnection) NewPublisher

func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAddress, linkName string) (*Publisher, error)

NewPublisher creates a new Publisher that sends messages to the provided destination. The destination is a TargetAddress that can be a Queue or an Exchange with a routing key. See QueueAddress and ExchangeAddress for more information.

func (*AmqpConnection) NotifyStatusChange

func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)

NotifyStatusChange registers a channel to receive state change notifications from the connection.

func (*AmqpConnection) State

func (a *AmqpConnection) State() LifeCycleState

type AmqpExchange

type AmqpExchange struct {
	// contains filtered or unexported fields
}

func (*AmqpExchange) AutoDelete

func (e *AmqpExchange) AutoDelete(isAutoDelete bool)

func (*AmqpExchange) Declare

func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)

func (*AmqpExchange) Delete

func (e *AmqpExchange) Delete(ctx context.Context) error

func (*AmqpExchange) ExchangeType

func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType)

func (*AmqpExchange) GetExchangeType

func (e *AmqpExchange) GetExchangeType() TExchangeType

func (*AmqpExchange) IsAutoDelete

func (e *AmqpExchange) IsAutoDelete() bool

func (*AmqpExchange) Name

func (e *AmqpExchange) Name() string

type AmqpExchangeInfo

type AmqpExchangeInfo struct {
	// contains filtered or unexported fields
}

func (*AmqpExchangeInfo) Name

func (a *AmqpExchangeInfo) Name() string

type AmqpManagement

type AmqpManagement struct {
	// contains filtered or unexported fields
}

AmqpManagement is the interface to the RabbitMQ /management endpoint. The management interface is used to declare/delete exchanges, queues, and bindings.

func NewAmqpManagement

func NewAmqpManagement() *AmqpManagement

func (*AmqpManagement) Bind

func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification BindingSpecification) (string, error)

func (*AmqpManagement) Close

func (a *AmqpManagement) Close(ctx context.Context) error

func (*AmqpManagement) DeclareExchange

func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification ExchangeSpecification) (*AmqpExchangeInfo, error)

func (*AmqpManagement) DeclareQueue

func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification QueueSpecification) (*AmqpQueueInfo, error)

func (*AmqpManagement) DeleteExchange

func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error

func (*AmqpManagement) DeleteQueue

func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error

func (*AmqpManagement) NotifyStatusChange

func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)

func (*AmqpManagement) Open

func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error

func (*AmqpManagement) PurgeQueue

func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int, error)

func (*AmqpManagement) QueueInfo

func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)

func (*AmqpManagement) Request

func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string,
	expectedResponseCodes []int) (map[string]any, error)

Request sends a request to the /management endpoint. It is a generic method that can be used to send any request to the management endpoint. In most cases you don't need to use this method directly; use the standard methods instead.

func (*AmqpManagement) State

func (a *AmqpManagement) State() LifeCycleState

func (*AmqpManagement) Unbind

func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error

type AmqpQueue

type AmqpQueue struct {
	// contains filtered or unexported fields
}

func (*AmqpQueue) AutoDelete

func (a *AmqpQueue) AutoDelete(isAutoDelete bool)

func (*AmqpQueue) Declare

func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)

func (*AmqpQueue) Delete

func (a *AmqpQueue) Delete(ctx context.Context) error

func (*AmqpQueue) Exclusive

func (a *AmqpQueue) Exclusive(isExclusive bool)

func (*AmqpQueue) IsAutoDelete

func (a *AmqpQueue) IsAutoDelete() bool

func (*AmqpQueue) IsExclusive

func (a *AmqpQueue) IsExclusive() bool

func (*AmqpQueue) Name

func (a *AmqpQueue) Name(queueName string)

func (*AmqpQueue) Purge

func (a *AmqpQueue) Purge(ctx context.Context) (int, error)

func (*AmqpQueue) QueueType

func (a *AmqpQueue) QueueType(queueType QueueType)

func (*AmqpQueue) SetArguments

func (a *AmqpQueue) SetArguments(arguments map[string]any)

type AmqpQueueInfo

type AmqpQueueInfo struct {
	// contains filtered or unexported fields
}

func (*AmqpQueueInfo) Arguments

func (a *AmqpQueueInfo) Arguments() map[string]any

func (*AmqpQueueInfo) IsAutoDelete

func (a *AmqpQueueInfo) IsAutoDelete() bool

func (*AmqpQueueInfo) IsDurable

func (a *AmqpQueueInfo) IsDurable() bool

func (*AmqpQueueInfo) IsExclusive

func (a *AmqpQueueInfo) IsExclusive() bool

func (*AmqpQueueInfo) Leader

func (a *AmqpQueueInfo) Leader() string

func (*AmqpQueueInfo) Members

func (a *AmqpQueueInfo) Members() []string

func (*AmqpQueueInfo) Name

func (a *AmqpQueueInfo) Name() string

func (*AmqpQueueInfo) Type

func (a *AmqpQueueInfo) Type() TQueueType

type AutoGeneratedQueueSpecification

type AutoGeneratedQueueSpecification struct {
	IsAutoDelete   bool
	IsExclusive    bool
	MaxLength      int64
	MaxLengthBytes int64
}

AutoGeneratedQueueSpecification represents the specification of the auto-generated queue. It is a classic queue with an auto-generated name. It is useful in contexts like RPC or when you need a temporary queue.

type BalancedLeaderLocator

type BalancedLeaderLocator struct {
}

type BindingSpecification

type BindingSpecification interface {
	// contains filtered or unexported methods
}

type ClassicQueueSpecification

type ClassicQueueSpecification struct {
	Name                 string
	IsAutoDelete         bool
	IsExclusive          bool
	AutoExpire           int64
	MessageTTL           int64
	OverflowStrategy     OverflowStrategy
	SingleActiveConsumer bool
	DeadLetterExchange   string
	DeadLetterRoutingKey string
	MaxLength            int64
	MaxLengthBytes       int64
	MaxPriority          int64
	LeaderLocator        LeaderLocator
}

ClassicQueueSpecification represents the specification of the classic queue

type ClientLocalLeaderLocator

type ClientLocalLeaderLocator struct {
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) Id

func (c *Consumer) Id() string

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error)

type ConsumerOptions

type ConsumerOptions interface {
	// contains filtered or unexported methods
}

type DeliveryContext

type DeliveryContext struct {
	// contains filtered or unexported fields
}

func (*DeliveryContext) Accept

func (dc *DeliveryContext) Accept(ctx context.Context) error

func (*DeliveryContext) Discard

func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error

func (*DeliveryContext) DiscardWithAnnotations

func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error

func (*DeliveryContext) Message

func (dc *DeliveryContext) Message() *amqp.Message

func (*DeliveryContext) Requeue

func (dc *DeliveryContext) Requeue(ctx context.Context) error

func (*DeliveryContext) RequeueWithAnnotations

func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error

type DeliveryState

type DeliveryState = amqp.DeliveryState

type DirectExchangeSpecification

type DirectExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
}

type DropHeadOverflowStrategy

type DropHeadOverflowStrategy struct {
}

type Environment

type Environment struct {
	// contains filtered or unexported fields
}

func NewEnvironment

func NewEnvironment(addresses []string, connOptions *AmqpConnOptions) *Environment

func (*Environment) CloseConnections

func (e *Environment) CloseConnections(ctx context.Context) error

CloseConnections closes all the connections in the environment with all the publishers and consumers.

func (*Environment) Connections

func (e *Environment) Connections() []*AmqpConnection

func (*Environment) NewConnection

func (e *Environment) NewConnection(ctx context.Context, args ...string) (*AmqpConnection, error)

NewConnection gets a new connection from the environment. If the connection id is provided, it will be used as the connection id. If the connection id is not provided, a new connection id will be generated. The connection id is unique in the environment. The Environment will keep track of the connection and close it when the environment is closed.

type ExchangeAddress

type ExchangeAddress struct {
	Exchange string // The name of the exchange
	Key      string // The routing key. Can be empty
}

ExchangeAddress represents the address of an exchange with a routing key.

type ExchangeSpecification

type ExchangeSpecification interface {
	// contains filtered or unexported methods
}

ExchangeSpecification represents the specification of an exchange

type ExchangeToExchangeBindingSpecification

type ExchangeToExchangeBindingSpecification struct {
	SourceExchange      string
	DestinationExchange string
	BindingKey          string
}

type ExchangeToQueueBindingSpecification

type ExchangeToQueueBindingSpecification struct {
	SourceExchange   string
	DestinationQueue string
	BindingKey       string
}

type ExchangeType

type ExchangeType struct {
	Type TExchangeType
}

func (ExchangeType) String

func (e ExchangeType) String() string

type FanOutExchangeSpecification

type FanOutExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
}

type LeaderLocator

type LeaderLocator interface {
	// contains filtered or unexported methods
}

type LifeCycle

type LifeCycle struct {
	// contains filtered or unexported fields
}

func NewLifeCycle

func NewLifeCycle() *LifeCycle

func (*LifeCycle) SetState

func (l *LifeCycle) SetState(value LifeCycleState)

func (*LifeCycle) State

func (l *LifeCycle) State() LifeCycleState

type LifeCycleState

type LifeCycleState interface {
	// contains filtered or unexported methods
}

type OffsetFirst

type OffsetFirst struct {
}

type OffsetLast

type OffsetLast struct {
}

type OffsetNext

type OffsetNext struct {
}

type OffsetSpecification

type OffsetSpecification interface {
	// contains filtered or unexported methods
}

type OffsetValue

type OffsetValue struct {
	Offset uint64
}

type OverflowStrategy

type OverflowStrategy interface {
	// contains filtered or unexported methods
}

type ProducerOptions

type ProducerOptions interface {
	// contains filtered or unexported methods
}

type PublishResult

type PublishResult struct {
	Outcome DeliveryState
	Message *amqp.Message
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a publisher that sends messages to a specific destination address.

func (*Publisher) Close

func (m *Publisher) Close(ctx context.Context) error

Close closes the publisher.

func (*Publisher) Id

func (m *Publisher) Id() string

func (*Publisher) Publish

func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error)

Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.

The message is sent and the outcome of the operation is returned. The outcome is a DeliveryState that indicates if the message was accepted or rejected. RabbitMQ supports the following DeliveryState types: StateAccepted, StateReleased, and StateRejected.

Note: If the destination address is not defined when the publisher is created, the message must have the To property set. You can use the helper "MessagePropertyToAddress" to set the destination address on the message. See the examples: Create a new publisher that sends messages to a specific destination address: <code>

publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{
			Exchange: "myExchangeName",
			Key:      "myRoutingKey",
		}, "myLinkName")

.. publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))

</code> Create a new publisher that sends messages based on message destination address: <code>

publisher, err := connection.NewPublisher(context.Background(), nil, "test")
msg := amqp.NewMessage([]byte("hello"))
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
..:= publisher.Publish(context.Background(), msg)

</code>

type QueueAddress

type QueueAddress struct {
	Queue      string // The name of the queue
	Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}

QueueAddress represents the address of a queue.

type QueueSpecification

type QueueSpecification interface {
	// contains filtered or unexported methods
}

QueueSpecification represents the specification of a queue

type QueueType

type QueueType struct {
	Type TQueueType
}

func (QueueType) String

func (e QueueType) String() string

type QuorumQueueSpecification

type QuorumQueueSpecification struct {
	Name                 string
	AutoExpire           int64
	MessageTTL           int64
	OverflowStrategy     OverflowStrategy
	SingleActiveConsumer bool
	DeadLetterExchange   string
	DeadLetterRoutingKey string
	MaxLength            int64
	MaxLengthBytes       int64
	DeliveryLimit        int64
	TargetClusterSize    int64
	LeaderLocator        LeaderLocator
}

type RecoveryConfiguration

type RecoveryConfiguration struct {
	/*
		ActiveRecovery defines whether the recovery is activated.
		If it is not activated, the connection will not try to reconnect.
	*/
	ActiveRecovery bool

	/*
		BackOffReconnectInterval is the time to wait before trying to reconnect after a connection is closed.
		The time is increased exponentially with each attempt.
		Default is 5 seconds; each attempt doubles the time.
		The minimum value is 1 second. Avoid setting low values, since they can cause a high
		number of reconnection attempts.
	*/
	BackOffReconnectInterval time.Duration

	/*
		MaxReconnectAttempts The maximum number of reconnection attempts.
		Default is 5.
		The minimum value is 1.
	*/
	MaxReconnectAttempts int
}

func NewRecoveryConfiguration

func NewRecoveryConfiguration() *RecoveryConfiguration

type RejectPublishDlxOverflowStrategy

type RejectPublishDlxOverflowStrategy struct {
}

type RejectPublishOverflowStrategy

type RejectPublishOverflowStrategy struct {
}

type StateAccepted

type StateAccepted = amqp.StateAccepted

type StateChanged

type StateChanged struct {
	From LifeCycleState
	To   LifeCycleState
}

func (StateChanged) String

func (s StateChanged) String() string

type StateClosed

type StateClosed struct {
	// contains filtered or unexported fields
}

func (*StateClosed) GetError

func (c *StateClosed) GetError() error

type StateClosing

type StateClosing struct {
}

type StateModified

type StateModified = amqp.StateModified

type StateOpen

type StateOpen struct {
}

type StateReconnecting

type StateReconnecting struct {
}

type StateRejected

type StateRejected = amqp.StateRejected

type StateReleased

type StateReleased = amqp.StateReleased

type StreamConsumerOptions

type StreamConsumerOptions struct {
	//ReceiverLinkName: see the ConsumerOptions interface
	ReceiverLinkName string
	//InitialCredits: see the ConsumerOptions interface
	InitialCredits int32
	// The offset specification for the stream consumer
	// see the interface implementations
	Offset OffsetSpecification
	// Filter values.
	// See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions for more details
	Filters []string
	//
	FilterMatchUnfiltered bool
}

StreamConsumerOptions represents the options that can be used to create a stream consumer. It is mandatory in case of creating a stream consumer.

type StreamQueueSpecification

type StreamQueueSpecification struct {
	Name               string
	MaxLengthBytes     int64
	InitialClusterSize int
}

type TExchangeType

type TExchangeType string

TExchangeType represents the type of exchange

const (
	Direct TExchangeType = "direct"
	Topic  TExchangeType = "topic"
	FanOut TExchangeType = "fanout"
)

type TQueueType

type TQueueType string
const (
	Quorum  TQueueType = "quorum"
	Classic TQueueType = "classic"
	Stream  TQueueType = "stream"
)

type TargetAddress

type TargetAddress interface {
	// contains filtered or unexported methods
}

TargetAddress is an interface that represents an address that can be used to send messages to. It can be either a Queue or an Exchange with a routing key.

type TopicExchangeSpecification

type TopicExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
}

type URI

type URI struct {
	Scheme   string
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

URI represents a parsed AMQP URI string.

func ParseURI

func ParseURI(uri string) (URI, error)

ParseURI attempts to parse the given AMQP URI according to the spec. See http://www.rabbitmq.com/uri-spec.html.

Default values for the fields are:

Scheme: amqp
Host: localhost
Port: 5672
Username: guest
Password: guest
Vhost: /

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL