rabbitmqamqp

package
v0.1.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 5, 2025 License: Apache-2.0 Imports: 17 Imported by: 1

Documentation

Index

Constants

View Source
const (
	UnitMb string = "mb"
	UnitKb string = "kb"
	UnitGb string = "gb"
	UnitTb string = "tb"
)
View Source
const AtLeastOnce = 1
View Source
const AtMostOnce = 0
View Source
const StreamFilterValue = "x-stream-filter-value"

Variables

View Source
var ErrDoesNotExist = errors.New("does not exist")
View Source
var ErrPreconditionFailed = errors.New("precondition Failed")

Functions

func CapacityBytes

func CapacityBytes(value int64) int64

func CapacityFrom

func CapacityFrom(value string) (int64, error)

func CapacityGB

func CapacityGB(value int64) int64

func CapacityKB

func CapacityKB(value int64) int64

func CapacityMB

func CapacityMB(value int64) int64

func CapacityTB

func CapacityTB(value int64) int64

func Debug

func Debug(msg string, args ...any)

func Error

func Error(msg string, args ...any)

func ExtractWithoutPassword

func ExtractWithoutPassword(addr string) string

func Info

func Info(msg string, args ...any)

func MessagePropertyToAddress

func MessagePropertyToAddress(msgRef *amqp.Message, target ITargetAddress) error

MessagePropertyToAddress sets the To property of the message to the address of the target. The target must be a QueueAddress or an ExchangeAddress. Note: The field msgRef.Properties.To will be overwritten if it is already set.

func NewMessage

func NewMessage(body []byte) *amqp.Message

NewMessage creates a new AMQP 1.0 message with the given payload.

func NewMessageWithAddress

func NewMessageWithAddress(body []byte, target ITargetAddress) (*amqp.Message, error)

NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target. The target must be a QueueAddress or an ExchangeAddress. This function is a helper that combines NewMessage and MessagePropertyToAddress.

func NewMessageWithFilter

func NewMessageWithFilter(body []byte, filter string) *amqp.Message

NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the StreamFilterValue property to the filter value.

func Warn

func Warn(msg string, args ...any)

Types

type AMQPBinding

type AMQPBinding struct {
	// contains filtered or unexported fields
}

func (*AMQPBinding) Bind

func (b *AMQPBinding) Bind(ctx context.Context) (string, error)

Bind creates a binding between an exchange and a queue or exchange with the specified binding key. Returns the binding path that can be used to unbind the binding. Given a virtual host, the binding path is unique.

func (*AMQPBinding) BindingKey

func (b *AMQPBinding) BindingKey(bindingKey string)

func (*AMQPBinding) Destination

func (b *AMQPBinding) Destination(name string, isQueue bool)

func (*AMQPBinding) SourceExchange

func (b *AMQPBinding) SourceExchange(sourceName string)

func (*AMQPBinding) Unbind

func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error

Unbind removes a binding between an exchange and a queue or exchange with the specified binding key. The bindingPath is the unique path that was returned when the binding was created.

type AMQPBindingInfo

type AMQPBindingInfo struct {
}

type AmqpAddress

type AmqpAddress struct {
	// the address of the AMQP server
	// it is in the form of amqp://<host>:<port>
	// or amqps://<host>:<port>
	// the port is optional
	// the default port is 5672
	// the default protocol is amqp
	// the default host is localhost
	// the default virtual host is "/"
	// the default user is guest
	// the default password is guest
	// the default SASL type is SASLTypeAnonymous
	Address string
	// Options: Additional options for the connection
	Options *AmqpConnOptions
}

type AmqpConnOptions

type AmqpConnOptions struct {
	// wrapper for amqp.ConnOptions
	ContainerID string

	// wrapper for amqp.ConnOptions
	HostName string
	// wrapper for amqp.ConnOptions
	IdleTimeout time.Duration

	// wrapper for amqp.ConnOptions
	MaxFrameSize uint32

	// wrapper for amqp.ConnOptions
	MaxSessions uint16

	// wrapper for amqp.ConnOptions
	Properties map[string]any

	// wrapper for amqp.ConnOptions
	SASLType amqp.SASLType

	// wrapper for amqp.ConnOptions
	TLSConfig *tls.Config

	// wrapper for amqp.ConnOptions
	WriteTimeout time.Duration

	// RecoveryConfiguration is used to configure the recovery behavior of the connection.
	// when the connection is closed unexpectedly.
	RecoveryConfiguration *RecoveryConfiguration

	// The OAuth2Options is used to configure the connection with OAuth2 token.
	OAuth2Options *OAuth2Options

	// Local connection identifier (not sent to the server)
	// if not provided, a random UUID is generated
	Id string
}

func (*AmqpConnOptions) Clone

func (a *AmqpConnOptions) Clone() *AmqpConnOptions

type AmqpConnection

type AmqpConnection struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error)

Dial connect to the AMQP 1.0 server using the provided connectionSettings Returns a pointer to the new AmqpConnection if successful else an error.

func (*AmqpConnection) Close

func (a *AmqpConnection) Close(ctx context.Context) error

Close closes the connection to the AMQP 1.0 server and the management interface. All the publishers and consumers are closed as well.

func (*AmqpConnection) Id

func (a *AmqpConnection) Id() string

func (*AmqpConnection) Management

func (a *AmqpConnection) Management() *AmqpManagement

Management returns the management interface for the connection. The management interface is used to declare and delete exchanges, queues, and bindings.

func (*AmqpConnection) NewConsumer

func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options IConsumerOptions) (*Consumer, error)

NewConsumer creates a new Consumer that listens to the provided Queue

func (*AmqpConnection) NewPublisher

func (a *AmqpConnection) NewPublisher(ctx context.Context, destination ITargetAddress, options IPublisherOptions) (*Publisher, error)

NewPublisher creates a new Publisher that sends messages to the provided destination. The destination is a ITargetAddress that can be a Queue or an Exchange with a routing key. options is an IPublisherOptions that can be used to configure the publisher. See QueueAddress and ExchangeAddress for more information.

func (*AmqpConnection) NotifyStatusChange

func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged)

NotifyStatusChange registers a channel to receive getState change notifications from the connection.

func (*AmqpConnection) Properties

func (a *AmqpConnection) Properties() map[string]any

func (*AmqpConnection) RefreshToken

func (a *AmqpConnection) RefreshToken(background context.Context, token string) error

func (*AmqpConnection) State

func (a *AmqpConnection) State() ILifeCycleState

type AmqpExchange

type AmqpExchange struct {
	// contains filtered or unexported fields
}

func (*AmqpExchange) AutoDelete

func (e *AmqpExchange) AutoDelete(isAutoDelete bool)

func (*AmqpExchange) Declare

func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error)

func (*AmqpExchange) Delete

func (e *AmqpExchange) Delete(ctx context.Context) error

func (*AmqpExchange) ExchangeType

func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType)

func (*AmqpExchange) GetExchangeType

func (e *AmqpExchange) GetExchangeType() TExchangeType

func (*AmqpExchange) IsAutoDelete

func (e *AmqpExchange) IsAutoDelete() bool

func (*AmqpExchange) Name

func (e *AmqpExchange) Name() string

type AmqpExchangeInfo

type AmqpExchangeInfo struct {
	// contains filtered or unexported fields
}

func (*AmqpExchangeInfo) Name

func (a *AmqpExchangeInfo) Name() string

type AmqpManagement

type AmqpManagement struct {
	// contains filtered or unexported fields
}

AmqpManagement is the interface to the RabbitMQ /management endpoint The management interface is used to declare/delete exchanges, queues, and bindings

func (*AmqpManagement) Bind

func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBindingSpecification) (string, error)

func (*AmqpManagement) Close

func (a *AmqpManagement) Close(ctx context.Context) error

func (*AmqpManagement) DeclareExchange

func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification IExchangeSpecification) (*AmqpExchangeInfo, error)

func (*AmqpManagement) DeclareQueue

func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification IQueueSpecification) (*AmqpQueueInfo, error)

func (*AmqpManagement) DeleteExchange

func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error

func (*AmqpManagement) DeleteQueue

func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error

func (*AmqpManagement) NotifyStatusChange

func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged)

func (*AmqpManagement) Open

func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error

func (*AmqpManagement) PurgeQueue

func (a *AmqpManagement) PurgeQueue(ctx context.Context, name string) (int, error)

PurgeQueue purges the queue returns the number of messages purged

func (*AmqpManagement) QueueInfo

func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error)

func (*AmqpManagement) Request

func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string,
	expectedResponseCodes []int) (map[string]any, error)

Request sends a request to the /management endpoint. It is a generic method that can be used to send any request to the management endpoint. In most of the cases you don't need to use this method directly, instead use the standard methods

func (*AmqpManagement) State

func (a *AmqpManagement) State() ILifeCycleState

func (*AmqpManagement) Unbind

func (a *AmqpManagement) Unbind(ctx context.Context, path string) error

type AmqpQueue

type AmqpQueue struct {
	// contains filtered or unexported fields
}

func (*AmqpQueue) AutoDelete

func (a *AmqpQueue) AutoDelete(isAutoDelete bool)

func (*AmqpQueue) Declare

func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error)

func (*AmqpQueue) Delete

func (a *AmqpQueue) Delete(ctx context.Context) error

func (*AmqpQueue) Exclusive

func (a *AmqpQueue) Exclusive(isExclusive bool)

func (*AmqpQueue) IsAutoDelete

func (a *AmqpQueue) IsAutoDelete() bool

func (*AmqpQueue) IsExclusive

func (a *AmqpQueue) IsExclusive() bool

func (*AmqpQueue) Name

func (a *AmqpQueue) Name(queueName string)

func (*AmqpQueue) Purge

func (a *AmqpQueue) Purge(ctx context.Context) (int, error)

func (*AmqpQueue) QueueType

func (a *AmqpQueue) QueueType(queueType QueueType)

func (*AmqpQueue) SetArguments

func (a *AmqpQueue) SetArguments(arguments map[string]any)

type AmqpQueueInfo

type AmqpQueueInfo struct {
	// contains filtered or unexported fields
}

func (*AmqpQueueInfo) Arguments

func (a *AmqpQueueInfo) Arguments() map[string]any

func (*AmqpQueueInfo) ConsumerCount

func (a *AmqpQueueInfo) ConsumerCount() uint32

func (*AmqpQueueInfo) IsAutoDelete

func (a *AmqpQueueInfo) IsAutoDelete() bool

func (*AmqpQueueInfo) IsDurable

func (a *AmqpQueueInfo) IsDurable() bool

func (*AmqpQueueInfo) IsExclusive

func (a *AmqpQueueInfo) IsExclusive() bool

func (*AmqpQueueInfo) Leader

func (a *AmqpQueueInfo) Leader() string

func (*AmqpQueueInfo) Members

func (a *AmqpQueueInfo) Members() []string

func (*AmqpQueueInfo) MessageCount

func (a *AmqpQueueInfo) MessageCount() uint64

func (*AmqpQueueInfo) Name

func (a *AmqpQueueInfo) Name() string

func (*AmqpQueueInfo) Type

func (a *AmqpQueueInfo) Type() TQueueType

type AutoGeneratedQueueSpecification

type AutoGeneratedQueueSpecification struct {
	IsAutoDelete   bool
	IsExclusive    bool
	MaxLength      int64
	MaxLengthBytes int64
}

AutoGeneratedQueueSpecification represents the specification of the auto-generated queue. It is a classic queue with auto-generated name. It is useful in context like RPC or when you need a temporary queue.

type BalancedLeaderLocator

type BalancedLeaderLocator struct {
}

type ClassicQueueSpecification

type ClassicQueueSpecification struct {
	Name                 string
	IsAutoDelete         bool
	IsExclusive          bool
	AutoExpire           int64
	MessageTTL           int64
	OverflowStrategy     IOverflowStrategy
	SingleActiveConsumer bool
	DeadLetterExchange   string
	DeadLetterRoutingKey string
	MaxLength            int64
	MaxLengthBytes       int64
	MaxPriority          int64
	LeaderLocator        ILeaderLocator
}

ClassicQueueSpecification represents the specification of the classic queue

type ClientLocalLeaderLocator

type ClientLocalLeaderLocator struct {
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

func (*Consumer) Id

func (c *Consumer) Id() string

func (*Consumer) Receive

func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error)

type ConsumerOptions

type ConsumerOptions struct {
	//ReceiverLinkName: see the IConsumerOptions interface
	ReceiverLinkName string
	//InitialCredits: see the IConsumerOptions interface
	InitialCredits int32
	// The id of the consumer
	Id string
}

ConsumerOptions represents the options for quorum and classic queues

type CustomExchangeSpecification

type CustomExchangeSpecification struct {
	Name             string
	IsAutoDelete     bool
	ExchangeTypeName string
	Arguments        map[string]any
}

type DeliveryContext

type DeliveryContext struct {
	// contains filtered or unexported fields
}

func (*DeliveryContext) Accept

func (dc *DeliveryContext) Accept(ctx context.Context) error

func (*DeliveryContext) Discard

func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error

func (*DeliveryContext) DiscardWithAnnotations

func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error

func (*DeliveryContext) Message

func (dc *DeliveryContext) Message() *amqp.Message

func (*DeliveryContext) Requeue

func (dc *DeliveryContext) Requeue(ctx context.Context) error

func (*DeliveryContext) RequeueWithAnnotations

func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error

type DeliveryState

type DeliveryState = amqp.DeliveryState

type DirectExchangeSpecification

type DirectExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type DropHeadOverflowStrategy

type DropHeadOverflowStrategy struct {
}

type Endpoint

type Endpoint struct {
	Address string
	Options *AmqpConnOptions
}

func DefaultEndpoints

func DefaultEndpoints() []Endpoint

type Environment

type Environment struct {
	EndPointStrategy TEndPointStrategy
	// contains filtered or unexported fields
}

func NewClusterEnvironment

func NewClusterEnvironment(endPoints []Endpoint) *Environment

func NewClusterEnvironmentWithStrategy

func NewClusterEnvironmentWithStrategy(endPoints []Endpoint, strategy TEndPointStrategy) *Environment

func NewEnvironment

func NewEnvironment(address string, options *AmqpConnOptions) *Environment

func (*Environment) CloseConnections

func (e *Environment) CloseConnections(ctx context.Context) error

CloseConnections closes all the connections in the environment with all the publishers and consumers.

func (*Environment) Connections

func (e *Environment) Connections() []*AmqpConnection

Connections gets the active connections in the environment

func (*Environment) NewConnection

func (e *Environment) NewConnection(ctx context.Context) (*AmqpConnection, error)

NewConnection get a new connection from the environment. It picks an endpoint from the list of endpoints, based on EndPointStrategy, and tries to open a connection. It fails if all the endpoints are not reachable. The Environment will keep track of the connection and close it when the environment is closed.

type ExchangeAddress

type ExchangeAddress struct {
	Exchange string // The name of the exchange
	Key      string // The routing key. Can be empty
}

ExchangeAddress represents the address of an exchange with a routing key.

type ExchangeToExchangeBindingSpecification

type ExchangeToExchangeBindingSpecification struct {
	SourceExchange      string
	DestinationExchange string
	BindingKey          string
}

type ExchangeToQueueBindingSpecification

type ExchangeToQueueBindingSpecification struct {
	SourceExchange   string
	DestinationQueue string
	BindingKey       string
}

type ExchangeType

type ExchangeType struct {
	Type TExchangeType
}

func (ExchangeType) String

func (e ExchangeType) String() string

type FanOutExchangeSpecification

type FanOutExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type HeadersExchangeSpecification

type HeadersExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type IBindingSpecification

type IBindingSpecification interface {
	// contains filtered or unexported methods
}

type IConsumerOptions

type IConsumerOptions interface {
	// contains filtered or unexported methods
}

type IExchangeSpecification

type IExchangeSpecification interface {
	// contains filtered or unexported methods
}

IExchangeSpecification represents the specification of an exchange

type ILeaderLocator

type ILeaderLocator interface {
	// contains filtered or unexported methods
}

type ILifeCycleState

type ILifeCycleState interface {
	// contains filtered or unexported methods
}

type IOffsetSpecification

type IOffsetSpecification interface {
	// contains filtered or unexported methods
}

type IOverflowStrategy

type IOverflowStrategy interface {
	// contains filtered or unexported methods
}

type IPublisherOptions

type IPublisherOptions interface {
	// contains filtered or unexported methods
}

type IQueueSpecification

type IQueueSpecification interface {
	// contains filtered or unexported methods
}

IQueueSpecification represents the specification of a queue

type ITargetAddress

type ITargetAddress interface {
	// contains filtered or unexported methods
}

ITargetAddress is an interface that represents an address that can be used to send messages to. It can be either a Queue or an Exchange with a routing key.

type LifeCycle

type LifeCycle struct {
	// contains filtered or unexported fields
}

func NewLifeCycle

func NewLifeCycle() *LifeCycle

func (*LifeCycle) SetState

func (l *LifeCycle) SetState(value ILifeCycleState)

func (*LifeCycle) State

func (l *LifeCycle) State() ILifeCycleState

type OAuth2Options

type OAuth2Options struct {
	Token string
}

func (OAuth2Options) Clone

func (o OAuth2Options) Clone() *OAuth2Options

type OffsetFirst

type OffsetFirst struct {
}

type OffsetLast

type OffsetLast struct {
}

type OffsetNext

type OffsetNext struct {
}

type OffsetValue

type OffsetValue struct {
	Offset uint64
}

type PublishResult

type PublishResult struct {
	Outcome DeliveryState
	Message *amqp.Message
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a publisher that sends messages to a specific destination address.

func (*Publisher) Close

func (m *Publisher) Close(ctx context.Context) error

Close closes the publisher.

func (*Publisher) Id

func (m *Publisher) Id() string

func (*Publisher) Publish

func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error)

Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.

The message is sent and the outcome of the operation is returned. The outcome is a DeliveryState that indicates if the message was accepted or rejected. RabbitMQ supports the following DeliveryState types:

Note: If the destination address is not defined during the creation, the message must have a TO property set. You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: <code>

publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{
			Exchange: "myExchangeName",
			Key:      "myRoutingKey",
		}

.. publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))

</code> Create a new publisher that sends messages based on message destination address: <code>

publisher, err := connection.NewPublisher(context.Background(), nil, "test")
msg := amqp.NewMessage([]byte("hello"))
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
..:= publisher.Publish(context.Background(), msg)

</code>

type PublisherOptions

type PublisherOptions struct {
	Id             string
	SenderLinkName string
}

type QueueAddress

type QueueAddress struct {
	Queue      string // The name of the queue
	Parameters string // Additional parameters not related to the queue. Most of the time it is empty
}

QueueAddress represents the address of a queue.

type QueueType

type QueueType struct {
	Type TQueueType
}

func (QueueType) String

func (e QueueType) String() string

type QuorumQueueSpecification

type QuorumQueueSpecification struct {
	Name                   string
	AutoExpire             int64
	MessageTTL             int64
	OverflowStrategy       IOverflowStrategy
	SingleActiveConsumer   bool
	DeadLetterExchange     string
	DeadLetterRoutingKey   string
	MaxLength              int64
	MaxLengthBytes         int64
	DeliveryLimit          int64
	TargetClusterSize      int64
	LeaderLocator          ILeaderLocator
	QuorumInitialGroupSize int
}

type RecoveryConfiguration

type RecoveryConfiguration struct {
	/*
		ActiveRecovery Define if the recovery is activated.
		If is not activated the connection will not try to createSender.
	*/
	ActiveRecovery bool

	/*
		BackOffReconnectInterval The time to wait before trying to createSender after a connection is closed.
		time will be increased exponentially with each attempt.
		Default is 5 seconds, each attempt will double the time.
		The minimum value is 1 second. Avoid setting a value low values since it can cause a high
		number of reconnection attempts.
	*/
	BackOffReconnectInterval time.Duration

	/*
		MaxReconnectAttempts The maximum number of reconnection attempts.
		Default is 5.
		The minimum value is 1.
	*/
	MaxReconnectAttempts int
}

func NewRecoveryConfiguration

func NewRecoveryConfiguration() *RecoveryConfiguration

func (*RecoveryConfiguration) Clone

type RejectPublishDlxOverflowStrategy

type RejectPublishDlxOverflowStrategy struct {
}

type RejectPublishOverflowStrategy

type RejectPublishOverflowStrategy struct {
}

type StateAccepted

type StateAccepted = amqp.StateAccepted

type StateChanged

type StateChanged struct {
	From ILifeCycleState
	To   ILifeCycleState
}

func (StateChanged) String

func (s StateChanged) String() string

type StateClosed

type StateClosed struct {
	// contains filtered or unexported fields
}

func (*StateClosed) GetError

func (c *StateClosed) GetError() error

type StateClosing

type StateClosing struct {
}

type StateModified

type StateModified = amqp.StateModified

type StateOpen

type StateOpen struct {
}

type StateReconnecting

type StateReconnecting struct {
}

type StateRejected

type StateRejected = amqp.StateRejected

type StateReleased

type StateReleased = amqp.StateReleased

type StreamConsumerOptions

type StreamConsumerOptions struct {
	//ReceiverLinkName: see the IConsumerOptions interface
	ReceiverLinkName string
	//InitialCredits: see the IConsumerOptions interface
	InitialCredits int32
	// The offset specification for the stream consumer
	// see the interface implementations
	Offset              IOffsetSpecification
	StreamFilterOptions *StreamFilterOptions

	Id string
}

StreamConsumerOptions represents the options for stream queues It is mandatory in case of creating a stream consumer.

type StreamFilterOptions

type StreamFilterOptions struct {
	// Filter values.
	Values []string
	//
	MatchUnfiltered bool

	// Filter the data based on Application Property
	ApplicationProperties map[string]any

	// Filter the data based on Message Properties
	Properties *amqp.MessageProperties
}

StreamFilterOptions represents the options that can be used to filter the stream data. It is used in the StreamConsumerOptions. See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions/

type StreamQueueSpecification

type StreamQueueSpecification struct {
	Name               string
	MaxLengthBytes     int64
	InitialClusterSize int
}

type TEndPointStrategy

type TEndPointStrategy int
const (
	StrategyRandom     TEndPointStrategy = iota
	StrategySequential TEndPointStrategy = iota
)

type TExchangeType

type TExchangeType string

TExchangeType represents the type of exchange

const (
	Direct  TExchangeType = "direct"
	Topic   TExchangeType = "topic"
	FanOut  TExchangeType = "fanout"
	Headers TExchangeType = "headers"
)

type TQueueType

type TQueueType string
const (
	Quorum  TQueueType = "quorum"
	Classic TQueueType = "classic"
	Stream  TQueueType = "stream"
)

type TopicExchangeSpecification

type TopicExchangeSpecification struct {
	Name         string
	IsAutoDelete bool
	Arguments    map[string]any
}

type URI

type URI struct {
	Scheme   string
	Host     string
	Port     int
	Username string
	Password string
	Vhost    string
}

URI represents a parsed AMQP URI string.

func ParseURI

func ParseURI(uri string) (URI, error)

ParseURI attempts to parse the given AMQP URI according to the spec. See http://www.rabbitmq.com/uri-spec.html.

Default values for the fields are:

Scheme: amqp
Host: localhost
Port: 5672
Username: guest
Password: guest
Vhost: /

type Version

type Version struct {
	Major int
	Minor int
	Patch int
}

func (Version) Compare

func (v Version) Compare(other Version) int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL