servicebus

package
v1.12.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MessageKeyMessageID defines the metadata key for the message id.
	MessageKeyMessageID = "MessageId" // read, write.
	// MessageKeyMessageIDAlias is an alias for "MessageId" for write only, for backwards-compatibility
	MessageKeyMessageIDAlias = "id"

	// MessageKeyCorrelationID defines the metadata key for the correlation id.
	MessageKeyCorrelationID = "CorrelationId" // read, write.
	// MessageKeyCorrelationIDAlias is an alias for "CorrelationId" for write only, for backwards-compatibility
	MessageKeyCorrelationIDAlias = "correlationID"

	// MessageKeySessionID defines the metadata key for the session id.
	MessageKeySessionID = "SessionId" // read, write.

	// MessageKeyLabel defines the metadata key for the label.
	MessageKeyLabel = "Label" // read, write.

	// MessageKeyReplyTo defines the metadata key for the reply to value.
	MessageKeyReplyTo = "ReplyTo" // read, write.

	// MessageKeyTo defines the metadata key for the to value.
	MessageKeyTo = "To" // read, write.

	// MessageKeyPartitionKey defines the metadata key for the partition key.
	MessageKeyPartitionKey = "PartitionKey" // read, write.

	// MessageKeyContentType defines the metadata key for the content type.
	MessageKeyContentType = "ContentType" // read, write.

	// MessageKeyDeliveryCount defines the metadata key for the delivery count.
	MessageKeyDeliveryCount = "DeliveryCount" // read.

	// MessageKeyLockedUntilUtc defines the metadata key for the locked until utc value.
	MessageKeyLockedUntilUtc = "LockedUntilUtc" // read.

	// MessageKeyLockToken defines the metadata key for the lock token.
	MessageKeyLockToken = "LockToken" // read.

	// MessageKeyEnqueuedTimeUtc defines the metadata key for the enqueued time utc value.
	MessageKeyEnqueuedTimeUtc = "EnqueuedTimeUtc" // read.

	// MessageKeySequenceNumber defines the metadata key for the sequence number.
	MessageKeySequenceNumber = "SequenceNumber" // read.

	// MessageKeyScheduledEnqueueTimeUtc defines the metadata key for the scheduled enqueue time utc value.
	MessageKeyScheduledEnqueueTimeUtc = "ScheduledEnqueueTimeUtc" // read, write.

	// MessageKeyReplyToSessionID defines the metadata key for the reply to session id.
	// Currently unused.
	MessageKeyReplyToSessionID = "ReplyToSessionId" // read, write.
)
View Source
const (
	MetadataModeBinding byte = 1 << iota
	MetadataModeTopics

	MetadataModeQueues byte = 0
)

Modes for ParseMetadata.

View Source
const (
	RequireSessionsMetadataKey       = "requireSessions"
	SessionIdleTimeoutMetadataKey    = "sessionIdleTimeoutInSec"
	MaxConcurrentSessionsMetadataKey = "maxConcurrentSessions"

	DefaultSesssionIdleTimeoutInSec = 60
	DefaultMaxConcurrentSessions    = 8
)

Variables

This section is empty.

Functions

func IsLockLostError added in v1.10.1

func IsLockLostError(err error) bool

IsLockLostError returns true if the error is "locklost".

func IsNetworkError added in v1.9.0

func IsNetworkError(err error) bool

IsNetworkError returns true if the error returned by Service Bus is a network-level one, which would require reconnecting.

func IsRetriableAMQPError added in v1.10.1

func IsRetriableAMQPError(err error) bool

IsRetriableAMQPError returns true if the error returned by Service Bus is a retriable error from AMQP, which doesn't require reconnecting.

func NewASBMessageFromBulkMessageEntry added in v1.10.1

func NewASBMessageFromBulkMessageEntry(entry pubsub.BulkMessageEntry) (*azservicebus.Message, error)

NewASBMessageFromBulkMessageEntry builds a new Azure Service Bus message from a BulkMessageEntry.

func NewASBMessageFromInvokeRequest added in v1.10.1

func NewASBMessageFromInvokeRequest(req *bindings.InvokeRequest) (*azservicebus.Message, error)

NewASBMessageFromInvokeRequest builds a new Azure Service Bus message from a binding's Invoke request.

func NewASBMessageFromPubsubRequest added in v1.10.1

func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error)

NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest.

func NewBulkMessageEntryFromASBMessage added in v1.10.1

func NewBulkMessageEntryFromASBMessage(asbMsg *azservicebus.ReceivedMessage) (pubsub.BulkMessageEntry, error)

NewBulkMessageEntryFromASBMessage returns a pubsub.NewMessageEntry from a bulk message received from ASB.

func NewPubsubMessageFromASBMessage added in v1.10.1

func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error)

NewPubsubMessageFromASBMessage returns a pubsub.NewMessage from a message received from ASB.

func UpdateASBBatchMessageWithBulkPublishRequest added in v1.10.1

func UpdateASBBatchMessageWithBulkPublishRequest(asbMsgBatch *azservicebus.MessageBatch, req *pubsub.BulkPublishRequest) error

UpdateASBBatchMessageWithBulkPublishRequest updates the batch message with messages from the bulk publish request.

Types

type Client added in v1.10.1

type Client struct {
	// contains filtered or unexported fields
}

Client contains the clients for Service Bus and methods to get senders and to create topics, subscriptions, queues.

func NewClient added in v1.10.1

func NewClient(metadata *Metadata, rawMetadata map[string]string) (*Client, error)

NewClient creates a new Client object.

func (*Client) Close added in v1.10.1

func (c *Client) Close(log logger.Logger)

Close the client and every sender or consumer created by the connnection.

func (*Client) CloseAllSenders added in v1.10.1

func (c *Client) CloseAllSenders(log logger.Logger)

CloseAllSenders closes all sender connections.

func (*Client) CloseSender added in v1.10.1

func (c *Client) CloseSender(queueOrTopic string, log logger.Logger)

CloseSender closes a sender for a queue or topic.

func (*Client) EnsureQueue added in v1.10.1

func (c *Client) EnsureQueue(ctx context.Context, queue string) error

EnsureTopic creates the queue if it doesn't exist. Returns with nil error if the admin client doesn't exist.

func (*Client) EnsureSubscription added in v1.10.1

func (c *Client) EnsureSubscription(ctx context.Context, name string, topic string, opts SubscribeOptions) error

EnsureSubscription creates the topic subscription if it doesn't exist. Returns with nil error if the admin client doesn't exist.

func (*Client) EnsureTopic added in v1.10.1

func (c *Client) EnsureTopic(ctx context.Context, topic string) error

EnsureTopic creates the topic if it doesn't exist. Returns with nil error if the admin client doesn't exist.

func (*Client) GetClient added in v1.10.1

func (c *Client) GetClient() *servicebus.Client

GetClient returns the azservicebus.Client object.

func (*Client) GetSender added in v1.10.1

func (c *Client) GetSender(ctx context.Context, queueOrTopic string, ensureFn ensureFn) (*servicebus.Sender, error)

GetSenderForTopic returns the sender for a queue or topic, or creates a new one if it doesn't exist

func (*Client) PublishBinding added in v1.10.1

func (c *Client) PublishBinding(ctx context.Context, req *bindings.InvokeRequest, queueOrTopic string, log logger.Logger) (*bindings.InvokeResponse, error)

PublishBinding is used by binding components to publish messages. It includes a retry logic that can also cause reconnections. Note this doesn't invoke "EnsureQueue" or "EnsureTopic" because bindings don't do that on publishing.

func (*Client) PublishPubSub added in v1.10.1

func (c *Client) PublishPubSub(ctx context.Context, req *pubsub.PublishRequest, ensureFn ensureFn, log logger.Logger) error

PublishPubSub is used by PubSub components to publish messages. It includes a retry logic that can also cause reconnections.

func (*Client) PublishPubSubBulk added in v1.10.1

func (c *Client) PublishPubSubBulk(ctx context.Context, req *pubsub.BulkPublishRequest, ensureFn ensureFn, log logger.Logger) (pubsub.BulkPublishResponse, error)

PublishPubSubBulk is used by PubSub components to publush bulk messages.

func (*Client) ReconnectionBackoff added in v1.10.1

func (c *Client) ReconnectionBackoff() backoff.BackOff

ReconnectionBackoff returns the backoff for reconnecting in a subscription.

type HandlerFn added in v1.10.1

type HandlerFn func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]HandlerResponseItem, error)

HandlerFn is the type for handlers that receive messages

func GetBulkPubSubHandlerFunc added in v1.10.1

func GetBulkPubSubHandlerFunc(topic string, handler pubsub.BulkHandler, log logger.Logger, timeout time.Duration) HandlerFn

GetPubSubHandlerFunc returns the handler function for bulk pubsub messages.

func GetPubSubHandlerFunc added in v1.10.1

func GetPubSubHandlerFunc(topic string, handler pubsub.Handler, log logger.Logger, timeout time.Duration) HandlerFn

GetPubSubHandlerFunc returns the handler function for pubsub messages.

type HandlerResponseItem added in v1.10.1

type HandlerResponseItem struct {
	EntryId string //nolint:stylecheck
	Error   error
}

HandlerResponseItem represents a response from the handler for each message.

type MessageReceiver added in v1.10.1

type MessageReceiver struct {
	*azservicebus.Receiver
}

func NewMessageReceiver added in v1.10.1

func NewMessageReceiver(r *azservicebus.Receiver) *MessageReceiver

type Metadata added in v1.10.1

type Metadata struct {
	/** For bindings and pubsubs **/
	ConnectionString                string `mapstructure:"connectionString"`
	ConsumerID                      string `mapstructure:"consumerID"` // Only topics
	TimeoutInSec                    int    `mapstructure:"timeoutInSec"`
	HandlerTimeoutInSec             int    `mapstructure:"handlerTimeoutInSec"`
	LockRenewalInSec                int    `mapstructure:"lockRenewalInSec"`
	MaxActiveMessages               int    `mapstructure:"maxActiveMessages"`
	MaxConnectionRecoveryInSec      int    `mapstructure:"maxConnectionRecoveryInSec"`
	MinConnectionRecoveryInSec      int    `mapstructure:"minConnectionRecoveryInSec"`
	DisableEntityManagement         bool   `mapstructure:"disableEntityManagement"`
	MaxRetriableErrorsPerSec        int    `mapstructure:"maxRetriableErrorsPerSec"`
	MaxDeliveryCount                *int32 `mapstructure:"maxDeliveryCount"`              // Only used during subscription creation - default is set by the server (10)
	LockDurationInSec               *int   `mapstructure:"lockDurationInSec"`             // Only used during subscription creation - default is set by the server (60s)
	DefaultMessageTimeToLiveInSec   *int   `mapstructure:"defaultMessageTimeToLiveInSec"` // Only used during subscription creation - default is set by the server (depends on the tier)
	AutoDeleteOnIdleInSec           *int   `mapstructure:"autoDeleteOnIdleInSec"`         // Only used during subscription creation - default is set by the server (disabled)
	MaxConcurrentHandlers           int    `mapstructure:"maxConcurrentHandlers"`
	PublishMaxRetries               int    `mapstructure:"publishMaxRetries"`
	PublishInitialRetryIntervalInMs int    `mapstructure:"publishInitialRetryIntervalInMs"`
	NamespaceName                   string `mapstructure:"namespaceName"` // Only for Azure AD

	/** For bindings only **/
	QueueName string `mapstructure:"queueName" mdonly:"bindings"` // Only queues
}

Metadata options for Service Bus components. Note: AzureAD-related keys are handled separately.

func ParseMetadata added in v1.10.1

func ParseMetadata(md map[string]string, logger logger.Logger, mode byte) (m *Metadata, err error)

ParseMetadata parses metadata keys that are common to all Service Bus components

func (Metadata) CreateQueueProperties added in v1.10.1

func (a Metadata) CreateQueueProperties() *sbadmin.QueueProperties

CreateQueueProperties returns the QueueProperties object to create new Queues in Service Bus.

func (Metadata) CreateSubscriptionProperties added in v1.10.1

func (a Metadata) CreateSubscriptionProperties(opts SubscribeOptions) *sbadmin.SubscriptionProperties

CreateSubscriptionProperties returns the SubscriptionProperties object to create new Subscriptions to Service Bus topics.

type Receiver added in v1.10.1

type Receiver interface {
	ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
	CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.CompleteMessageOptions) error
	AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage, opts *azservicebus.AbandonMessageOptions) error
	Close(ctx context.Context) error
}

type SessionReceiver added in v1.10.1

type SessionReceiver struct {
	*azservicebus.SessionReceiver
}

func NewSessionReceiver added in v1.10.1

func NewSessionReceiver(r *azservicebus.SessionReceiver) *SessionReceiver

func (*SessionReceiver) RenewSessionLocks added in v1.10.1

func (s *SessionReceiver) RenewSessionLocks(ctx context.Context, timeout time.Duration) error

type SubscribeOptions added in v1.10.1

type SubscribeOptions struct {
	RequireSessions      bool
	MaxConcurrentSesions int
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.

func NewSubscription

func NewSubscription(opts SubscriptionOptions, logger logger.Logger) *Subscription

NewBulkSubscription returns a new Subscription object. Parameter "entity" is usually in the format "topic <topicname>" or "queue <queuename>" and it's only used for logging.

func (*Subscription) AbandonMessage

func (s *Subscription) AbandonMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)

AbandonMessage marks a messsage as abandoned.

func (*Subscription) CompleteMessage

func (s *Subscription) CompleteMessage(ctx context.Context, receiver Receiver, m *azservicebus.ReceivedMessage)

CompleteMessage marks a message as complete.

func (*Subscription) Connect

func (s *Subscription) Connect(ctx context.Context, newReceiverFunc func() (Receiver, error)) (Receiver, error)

Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).

func (*Subscription) ReceiveBlocking added in v1.10.1

func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler HandlerFn, receiver Receiver, onFirstSuccess func(), logMsg string) error

ReceiveBlocking is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.

type SubscriptionOptions added in v1.10.1

type SubscriptionOptions struct {
	MaxActiveMessages     int
	TimeoutInSec          int
	MaxBulkSubCount       *int
	MaxRetriableEPS       int
	MaxConcurrentHandlers int
	Entity                string
	LockRenewalInSec      int
	RequireSessions       bool
	SessionIdleTimeout    time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL