servicebus

package
v1.14.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2024 License: Apache-2.0 Imports: 27 Imported by: 3

Documentation

Index

Constants

View Source
const (
	// MessageKeyMessageID defines the metadata key for the message id.
	MessageKeyMessageID = "MessageId" // read, write.
	// MessageKeyMessageIDAlias is an alias for "MessageId" for write only, for backwards-compatibility
	MessageKeyMessageIDAlias = "id"

	// MessageKeyCorrelationID defines the metadata key for the correlation id.
	MessageKeyCorrelationID = "CorrelationId" // read, write.
	// MessageKeyCorrelationIDAlias is an alias for "CorrelationId" for write only, for backwards-compatibility
	MessageKeyCorrelationIDAlias = "correlationID"

	// MessageKeySessionID defines the metadata key for the session id.
	MessageKeySessionID = "SessionId" // read, write.

	// MessageKeyLabel defines the metadata key for the label.
	MessageKeyLabel = "Label" // read, write.

	// MessageKeyReplyTo defines the metadata key for the reply to value.
	MessageKeyReplyTo = "ReplyTo" // read, write.

	// MessageKeyTo defines the metadata key for the to value.
	MessageKeyTo = "To" // read, write.

	// MessageKeyPartitionKey defines the metadata key for the partition key.
	MessageKeyPartitionKey = "PartitionKey" // read, write.

	// MessageKeyContentType defines the metadata key for the content type.
	MessageKeyContentType = "ContentType" // read, write.

	// MessageKeyDeliveryCount defines the metadata key for the delivery count.
	MessageKeyDeliveryCount = "DeliveryCount" // read.

	// MessageKeyLockedUntilUtc defines the metadata key for the locked until utc value.
	MessageKeyLockedUntilUtc = "LockedUntilUtc" // read.

	// MessageKeyLockToken defines the metadata key for the lock token.
	MessageKeyLockToken = "LockToken" // read.

	// MessageKeyEnqueuedTimeUtc defines the metadata key for the enqueued time utc value.
	MessageKeyEnqueuedTimeUtc = "EnqueuedTimeUtc" // read.

	// MessageKeySequenceNumber defines the metadata key for the sequence number.
	MessageKeySequenceNumber = "SequenceNumber" // read.

	// MessageKeyScheduledEnqueueTimeUtc defines the metadata key for the scheduled enqueue time utc value.
	MessageKeyScheduledEnqueueTimeUtc = "ScheduledEnqueueTimeUtc" // read, write.

	// MessageKeyReplyToSessionID defines the metadata key for the reply to session id.
	// Currently unused.
	MessageKeyReplyToSessionID = "ReplyToSessionId" // read, write.
)
View Source
const (
	MetadataModeBinding byte = 1 << iota
	MetadataModeTopics

	MetadataModeQueues byte = 0
)

Modes for ParseMetadata.

View Source
const (
	RequireSessionsMetadataKey       = "requireSessions"
	SessionIdleTimeoutMetadataKey    = "sessionIdleTimeoutInSec"
	MaxConcurrentSessionsMetadataKey = "maxConcurrentSessions"

	DefaultSesssionIdleTimeoutInSec = 60
	DefaultMaxConcurrentSessions    = 8
)

Variables

This section is empty.

Functions

func IsLockLostError

func IsLockLostError(err error) bool

IsLockLostError returns true if the error is "locklost".

func IsNetworkError

func IsNetworkError(err error) bool

IsNetworkError returns true if the error returned by Service Bus is a network-level one, which would require reconnecting.

func IsRetriableAMQPError

func IsRetriableAMQPError(err error) bool

IsRetriableAMQPError returns true if the error returned by Service Bus is a retriable error from AMQP, which doesn't require reconnecting.

func NewASBMessageFromBulkMessageEntry

func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error)

NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry.

func NewASBMessageFromInvokeRequest

func NewASBMessageFromInvokeRequest(req *bindings.InvokeRequest) (*azservicebus.Message, error)

NewASBMessageFromInvokeRequest builds a new Azure Service Bus message from a binding's Invoke request.

func NewASBMessageFromPubsubRequest

func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error)

NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest.

func NewBulkMessageEntryFromASBMessage

func NewBulkMessageEntryFromASBMessage(asbMsg *azservicebus.ReceivedMessage) (pubsub.BulkMessageEntry, error)

NewBulkMessageEntryFromASBMessage returns a pubsub.NewMessageEntry from a bulk message received from ASB.

func NewPubsubMessageFromASBMessage

func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error)

NewPubsubMessageFromASBMessage returns a pubsub.NewMessage from a message received from ASB.

func UpdateASBBatchMessageWithBulkPublishRequest

func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error

UpdateASBBatchMessageWithBulkPublishRequest updates the batch message with messages from the bulk publish request.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client contains the clients for Service Bus and methods to get senders and to create topics, subscriptions, queues.

func NewClient

func NewClient(metadata *Metadata, rawMetadata map[string]string) (*Client, error)

NewClient creates a new Client object.

func (*Client) Close

func (c *Client) Close(log logger.Logger)

Close the client and every sender or consumer created by the connnection.

func (*Client) CloseAllSenders

func (c *Client) CloseAllSenders(log logger.Logger)

CloseAllSenders closes all sender connections.

func (*Client) CloseSender

func (c *Client) CloseSender(queueOrTopic string, log logger.Logger)

CloseSender closes a sender for a queue or topic.

func (*Client) EnsureQueue

func (c *Client) EnsureQueue(ctx context.Context, queue string) error

EnsureTopic creates the queue if it doesn't exist. Returns with nil error if the admin client doesn't exist.

func (*Client) EnsureSubscription

func (c *Client) EnsureSubscription(ctx context.Context, name string, topic string, opts SubscribeOptions) error

EnsureSubscription creates the topic subscription if it doesn't exist. Returns with nil error if the admin client doesn't exist.

func (*Client) EnsureTopic

func (c *Client) EnsureTopic(ctx context.Context, topic string) error

EnsureTopic creates the topic if it doesn't exist. Returns with nil error if the admin client doesn't exist.

func (*Client) GetClient

func (c *Client) GetClient() *servicebus.Client

GetClient returns the azservicebus.Client object.

func (*Client) GetSender

func (c *Client) GetSender(ctx context.Context, queueOrTopic string, ensureFn ensureFn) (*servicebus.Sender, error)

GetSenderForTopic returns the sender for a queue or topic, or creates a new one if it doesn't exist

func (*Client) PublishBinding

func (c *Client) PublishBinding(ctx context.Context, req *bindings.InvokeRequest, queueOrTopic string, log logger.Logger) (*bindings.InvokeResponse, error)

PublishBinding is used by binding components to publish messages. It includes a retry logic that can also cause reconnections. Note this doesn't invoke "EnsureQueue" or "EnsureTopic" because bindings don't do that on publishing.

func (*Client) PublishPubSub

func (c *Client) PublishPubSub(ctx context.Context, req *pubsub.PublishRequest, ensureFn ensureFn, log logger.Logger) error

PublishPubSub is used by PubSub components to publish messages. It includes a retry logic that can also cause reconnections.

func (*Client) PublishPubSubBulk

func (c *Client) PublishPubSubBulk(ctx context.Context, req *pubsub.BulkPublishRequest, ensureFn ensureFn, log logger.Logger) (pubsub.BulkPublishResponse, error)

PublishPubSubBulk is used by PubSub components to publush bulk messages.

func (*Client) ReconnectionBackoff

func (c *Client) ReconnectionBackoff() backoff.BackOff

ReconnectionBackoff returns the backoff for reconnecting in a subscription.

type HandlerFn

type HandlerFn func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]HandlerResponseItem, error)

HandlerFn is the type for handlers that receive messages

func GetBulkPubSubHandlerFunc

func GetBulkPubSubHandlerFunc(topic string, handler pubsub.BulkHandler, log logger.Logger, timeout time.Duration) HandlerFn

GetPubSubHandlerFunc returns the handler function for bulk pubsub messages.

func GetPubSubHandlerFunc

func GetPubSubHandlerFunc(topic string, handler pubsub.Handler, log logger.Logger, timeout time.Duration) HandlerFn

GetPubSubHandlerFunc returns the handler function for pubsub messages.

type HandlerResponseItem

type HandlerResponseItem struct {
	EntryId string //nolint:stylecheck
	Error   error
}

HandlerResponseItem represents a response from the handler for each message.

type MessageReceiver

type MessageReceiver struct {
	*azservicebus.Receiver
}

func NewMessageReceiver

func NewMessageReceiver(r *azservicebus.Receiver) *MessageReceiver

type Metadata

type Metadata struct {
	/** For bindings and pubsubs **/
	ConnectionString                string `mapstructure:"connectionString"`
	ConsumerID                      string `mapstructure:"consumerID"` // Only topics
	TimeoutInSec                    int    `mapstructure:"timeoutInSec"`
	HandlerTimeoutInSec             int    `mapstructure:"handlerTimeoutInSec"`
	LockRenewalInSec                int    `mapstructure:"lockRenewalInSec"`
	MaxActiveMessages               int    `mapstructure:"maxActiveMessages"`
	MaxConnectionRecoveryInSec      int    `mapstructure:"maxConnectionRecoveryInSec"`
	MinConnectionRecoveryInSec      int    `mapstructure:"minConnectionRecoveryInSec"`
	DisableEntityManagement         bool   `mapstructure:"disableEntityManagement"`
	MaxRetriableErrorsPerSec        int    `mapstructure:"maxRetriableErrorsPerSec"`
	MaxDeliveryCount                *int32 `mapstructure:"maxDeliveryCount"`              // Only used during subscription creation - default is set by the server (10)
	LockDurationInSec               *int   `mapstructure:"lockDurationInSec"`             // Only used during subscription creation - default is set by the server (60s)
	DefaultMessageTimeToLiveInSec   *int   `mapstructure:"defaultMessageTimeToLiveInSec"` // Only used during subscription creation - default is set by the server (depends on the tier)
	AutoDeleteOnIdleInSec           *int   `mapstructure:"autoDeleteOnIdleInSec"`         // Only used during subscription creation - default is set by the server (disabled)
	MaxConcurrentHandlers           int    `mapstructure:"maxConcurrentHandlers"`
	PublishMaxRetries               int    `mapstructure:"publishMaxRetries"`
	PublishInitialRetryIntervalInMs int    `mapstructure:"publishInitialRetryIntervalInMs"`
	NamespaceName                   string `mapstructure:"namespaceName"` // Only for Azure AD

	/** For bindings only **/
	QueueName string `mapstructure:"queueName" mdonly:"bindings"` // Only queues
}

Metadata options for Service Bus components. Note: AzureAD-related keys are handled separately.

func ParseMetadata

func ParseMetadata(md map[string]string, logger logger.Logger, mode byte) (m *Metadata, err error)

ParseMetadata parses metadata keys that are common to all Service Bus components

func (Metadata) CreateQueueProperties

func (a Metadata) CreateQueueProperties() *sbadmin.QueueProperties

CreateQueueProperties returns the QueueProperties object to create new Queues in Service Bus.

func (Metadata) CreateSubscriptionProperties

func (a Metadata) CreateSubscriptionProperties(opts SubscribeOptions) *sbadmin.SubscriptionProperties

CreateSubscriptionProperties returns the SubscriptionProperties object to create new Subscriptions to Service Bus topics.

type Receiver

type Receiver interface {
	ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
	CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.CompleteMessageOptions) error
	AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.AbandonMessageOptions) error
	Close(ctx context.Context) error
}

type SessionReceiver

type SessionReceiver struct {
	*azservicebus.SessionReceiver
}

func (*SessionReceiver) RenewSessionLocks

func (s *SessionReceiver) RenewSessionLocks(ctx context.Context, timeout time.Duration) error

type SubscribeOptions

type SubscribeOptions struct {
	RequireSessions      bool
	MaxConcurrentSesions int
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.

func NewSubscription

func NewSubscription(opts SubscriptionOptions, logger logger.Logger) *Subscription

NewBulkSubscription returns a new Subscription object. Parameter "entity" is usually in the format "topic <topicname>" or "queue <queuename>" and it's only used for logging.

func (*Subscription) AbandonMessage

func (s *Subscription) AbandonMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)

AbandonMessage marks a messsage as abandoned.

func (*Subscription) CompleteMessage

func (s *Subscription) CompleteMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)

CompleteMessage marks a message as complete.

func (*Subscription) Connect

func (s *Subscription) Connect(ctx context.Context, newReceiverFunc func() (Receiver, error)) (Receiver, error)

Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).

func (*Subscription) ReceiveBlocking

func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler HandlerFn, receiver Receiver, onFirstSuccess func(), logMsg string) error

ReceiveBlocking is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.

type SubscriptionOptions

type SubscriptionOptions struct {
	MaxActiveMessages     int
	TimeoutInSec          int
	MaxBulkSubCount       *int
	MaxRetriableEPS       int
	MaxConcurrentHandlers int
	Entity                string
	LockRenewalInSec      int
	RequireSessions       bool
	SessionIdleTimeout    time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL