shuttle

package module
v2.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 20, 2025 License: MIT Imports: 19 Imported by: 9

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetCorrelationId

func SetCorrelationId(correlationId *string) func(msg *azservicebus.Message) error

SetCorrelationId sets the ServiceBus message's correlation ID to a user-specified value

func SetLogHandler added in v2.7.1

func SetLogHandler(handler slog.Handler)

SetLogHandler allows to set a custom slog.Handler to be used by the go-shuttle logger. If handler is nil, the default slog handler will be used.

func SetLoggerFunc deprecated added in v2.4.0

func SetLoggerFunc(fn func(ctx context.Context) Logger)

Deprecated: Use SetLogHandler instead to adapt slog. SetLoggerFunc sets the function to be used to acquire a logger when go-shuttle logs.

func SetMessageDelay

func SetMessageDelay(delay time.Duration) func(msg *azservicebus.Message) error

SetMessageDelay schedules a message in the future

func SetMessageId

func SetMessageId(messageId *string) func(msg *azservicebus.Message) error

SetMessageId sets the ServiceBus message's ID to a user-specified value

func SetMessageTTL

func SetMessageTTL(ttl time.Duration) func(msg *azservicebus.Message) error

SetMessageTTL sets the ServiceBus message's TimeToLive to a user-specified value

func SetScheduleAt

func SetScheduleAt(t time.Time) func(msg *azservicebus.Message) error

SetScheduleAt schedules a message to be enqueued in the future

func SetSubject added in v2.8.0

func SetSubject(subject string) func(msg *azservicebus.Message) error

SetSubject sets the ServiceBus message's Subject property to a user-specified value

func SetTo added in v2.8.0

func SetTo(to string) func(msg *azservicebus.Message) error

SetTo sets the ServiceBus message's To property to a user-specified value

func WithReceiverSpanNameFormatter added in v2.6.0

func WithReceiverSpanNameFormatter(format func(defaultSpanName string, message *azservicebus.ReceivedMessage) string) func(t *TracingHandlerOpts)

WithReceiverSpanNameFormatter allows formatting name of the span started by the tracing handler in NewTracingHandler.

func WithSpanStartOptions added in v2.6.0

func WithSpanStartOptions(options []trace.SpanStartOption) func(t *TracingHandlerOpts)

WithSpanStartOptions allows setting custom span start options for the tracing handler in NewTracingHandler.

func WithTracePropagation

func WithTracePropagation(ctx context.Context) func(msg *azservicebus.Message) error

WithTracePropagation is a sender option to inject the trace context into the message

func WithTraceProvider added in v2.6.0

func WithTraceProvider(tp trace.TracerProvider) func(t *TracingHandlerOpts)

WithTraceProvider allows setting a custom trace provider for the tracing handler in NewTracingHandler.

Types

type Abandon

type Abandon struct {
	// contains filtered or unexported fields
}

Abandon settlement will cause a message to be available again from the queue or subscription. This will increment its delivery count, and potentially cause it to be dead-lettered depending on your queue or subscription's configuration.

func (*Abandon) Settle

func (a *Abandon) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type AzServiceBusSender

type AzServiceBusSender interface {
	SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
	SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error
	NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
	Close(ctx context.Context) error
}

AzServiceBusSender is satisfied by *azservicebus.Sender

type Complete

type Complete struct {
	// contains filtered or unexported fields
}

Complete settlement completes a message, deleting it from the queue or subscription.

func (*Complete) Settle

func (a *Complete) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type ConstantDelayStrategy

type ConstantDelayStrategy struct {
	Delay time.Duration
}

ConstantDelayStrategy delays the message retry by the given duration

func (*ConstantDelayStrategy) GetDelay

func (s *ConstantDelayStrategy) GetDelay(_ uint32) time.Duration

type DeadLetter

type DeadLetter struct {
	// contains filtered or unexported fields
}

DeadLetter settlement moves the message to the dead letter queue for a queue or subscription. To process deadlettered messages, create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.

func (*DeadLetter) Settle

func (a *DeadLetter) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type DefaultJSONMarshaller

type DefaultJSONMarshaller struct {
}

DefaultJSONMarshaller is the default marshaller for JSON messages

func (*DefaultJSONMarshaller) ContentType

func (j *DefaultJSONMarshaller) ContentType() string

ContentType returns the content type for the JSON marshaller

func (*DefaultJSONMarshaller) Marshal

Marshal marshals the user-input struct into a JSON string and returns a new message with the JSON string as the body

func (*DefaultJSONMarshaller) Unmarshal

Unmarshal unmarshals the message body from a JSON string into the user-input struct

type DefaultProtoMarshaller

type DefaultProtoMarshaller struct {
}

DefaultProtoMarshaller is the default marshaller for protobuf messages

func (*DefaultProtoMarshaller) ContentType

func (p *DefaultProtoMarshaller) ContentType() string

ContentType returns teh contentType for the protobuf marshaller

func (*DefaultProtoMarshaller) Marshal

Marshal marshals the user-input struct into a protobuf message and returns a new ServiceBus message with the protofbuf message as the body

func (*DefaultProtoMarshaller) Unmarshal

Unmarshal unmarshalls the protobuf message from the ServiceBus message into the user-input struct

type Defer

type Defer struct {
	// contains filtered or unexported fields
}

Defer settlement will cause a message to be deferred. Deferred messages are moved to ta deferred queue. They can only be received using `Receiver.ReceiveDeferredMessages`.

func (*Defer) Settle

func (a *Defer) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type Handler

type Handler interface {
	Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

HandlerFunc is a func to handle the message received from a subscription

func NewLockRenewalHandler deprecated added in v2.4.0

func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc

Deprecated: use NewRenewLockHandler NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.

func NewPanicHandler

func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc

NewPanicHandler recovers panics from downstream handlers

func NewRenewLockHandler

func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFunc

NewRenewLockHandler returns a middleware handler that will renew the lock on the message at the specified interval.

func NewSettlementHandler

func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc

NewSettlementHandler creates a middleware to use the Settlement api in the message handler implementation.

Example
package main

import (
	"context"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"

	"github.com/Azure/go-shuttle/v2"
)

func main() {
	tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}
	client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil)
	if err != nil {
		panic(err)
	}
	receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
	if err != nil {
		panic(err)
	}
	lockRenewalInterval := 10 * time.Second
	p := shuttle.NewProcessor(receiver,
		shuttle.NewPanicHandler(nil,
			shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
				shuttle.NewSettlementHandler(nil, mySettlingHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10})

	ctx, cancel := context.WithCancel(context.Background())
	err = p.Start(ctx)
	if err != nil {
		panic(err)
	}
	cancel()
}

func mySettlingHandler() shuttle.Settler {
	return func(ctx context.Context, message *azservicebus.ReceivedMessage) shuttle.Settlement {
		return &shuttle.Complete{}
	}
}

func NewTracingHandler

func NewTracingHandler(next Handler, options ...func(t *TracingHandlerOpts)) HandlerFunc

NewTracingHandler is a shuttle middleware that extracts the context from the message Application property if available, or from the existing context if not, and starts a span.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type HealthCheckable added in v2.7.1

type HealthCheckable interface {
	HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error
}

HealthCheckable is an interface for performing health checks on azservicebus.Sender and azservicebus.Receiver.

type HealthChecker added in v2.7.1

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker performs periodic health checks on the Service Bus Senders and Receivers.

func NewHealthChecker added in v2.7.1

func NewHealthChecker(clients map[string]*azservicebus.Client, options *HealthCheckerOptions) *HealthChecker

NewHealthChecker creates a new HealthChecker with the provided clients. clients is a map of namespaceName name to azservicebus.Client.

func (*HealthChecker) StartPeriodicHealthCheck added in v2.7.1

func (h *HealthChecker) StartPeriodicHealthCheck(ctx context.Context, hc HealthCheckable)

StartPeriodicHealthCheck starts the periodic health check for the provided HealthCheckable. The health check will run on each client in the HealthChecker. Stops when the context is canceled.

type HealthCheckerOptions added in v2.7.1

type HealthCheckerOptions struct {
	// HealthCheckInterval is the time between health checks.
	HealthCheckInterval time.Duration
	// HealthCheckTimeout is the context timeout for each health check
	HealthCheckTimeout time.Duration
}

HealthCheckerOptions configures the HealthChecker. HealthCheckInterval defaults to 1 minute if not set or set to <= 0. HealthCheckTimeout defaults to HealthChecker.interval if not set or set to 0 or set to be larger than interval.

type LockRenewalOptions added in v2.4.0

type LockRenewalOptions struct {
	// Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds.
	Interval *time.Duration
	// LockRenewalTimeout is the timeout value used on the context when sending RenewMessageLock() request.
	// Defaults to 5 seconds if not set or 0. Defaults to Lock Expiry time if set to a negative value.
	LockRenewalTimeout *time.Duration
	// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
	// Defaults to true.
	CancelMessageContextOnStop *bool
	// MetricRecorder allows to pass a custom metric recorder for the LockRenewer.
	// Defaults to processor.Metric instance.
	MetricRecorder processor.Recorder
}

LockRenewalOptions configures the lock renewal.

type LockRenewer

type LockRenewer interface {
	RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

LockRenewer abstracts the servicebus receiver client to only expose lock renewal

type Logger

type Logger interface {
	Info(s string)
	Warn(s string)
	Error(s string)
}

type ManagedSettler

type ManagedSettler struct {
	// contains filtered or unexported fields
}

ManagedSettler is a middleware that allows to reduce the message handler signature to ManagedSettlingFunc

func NewManagedSettlingHandler

func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingHandler) *ManagedSettler

NewManagedSettlingHandler allows to configure Retry decision logic and delay strategy. It also adapts the handler to let the user return an error from the handler, instead of a settlement. the settlement is inferred from the handler's return value. error -> abandon nil -> complete the RetryDecision can be overridden and can inspect the error returned to decide to retry the message or not. this allows to define error types that shouldn't be retried (and moved directly to the deadletter queue)

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"

	"github.com/Azure/go-shuttle/v2"
)

func main() {
	tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}
	client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil)
	if err != nil {
		panic(err)
	}
	receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
	if err != nil {
		panic(err)
	}
	lockRenewalInterval := 10 * time.Second
	p := shuttle.NewProcessor(receiver,
		shuttle.NewPanicHandler(nil,
			shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
				shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{
					RetryDecision:      &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2},
					RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second},
					OnAbandoned: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) {
						fmt.Printf("message abandoned due to error: %s\n", err)
					},
					OnDeadLettered: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) {
						fmt.Printf("message deadlettered due to error: %s\n", err)
					},
				}, myManagedSettlementHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10})

	ctx, cancel := context.WithCancel(context.Background())
	err = p.Start(ctx)
	if err != nil {
		panic(err)
	}
	cancel()
}

func myManagedSettlementHandler() shuttle.ManagedSettlingHandler {
	count := 0
	return shuttle.ManagedSettlingFunc(func(ctx context.Context, message *azservicebus.ReceivedMessage) error {
		count++
		if count == 0 {
			// this will abandon the message
			return fmt.Errorf("this will abandon the message, and eventually move it to DLQ")
		}
		return nil // this will complete the message
	})
}

func (*ManagedSettler) Handle

func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type ManagedSettlingFunc

type ManagedSettlingFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error

ManagedSettlingFunc allows to convert a function with the signature func(context.Context, *azservicebus.ReceivedMessage) error to the ManagedSettlingHandler interface.

func (ManagedSettlingFunc) Handle added in v2.3.0

type ManagedSettlingHandler added in v2.3.0

type ManagedSettlingHandler interface {
	Handle(context.Context, *azservicebus.ReceivedMessage) error
}

ManagedSettlingHandler is the message Handler interface for the ManagedSettler.

type ManagedSettlingOptions

type ManagedSettlingOptions struct {
	// Allows to override the built-in error handling logic.
	// OnError is called before any message settling action is taken.
	// the ManagedSettlingOptions struct is passed as an argument so that the configuration
	// like RetryDecision, RetryDelayStrategy and the post-settlement hooks can be reused and composed differently
	OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error)
	// RetryDecision is invoked to decide whether an error should be retried.
	// the default is to retry 5 times before moving the message to the deadletter.
	RetryDecision RetryDecision
	// RetryDelayStrategy is invoked when a message handling does not complete successfully
	// and the RetryDecision decides to retry the message.
	// The handler will sleep for the time calculated by the delayStrategy before Abandoning the message.
	RetryDelayStrategy RetryDelayStrategy
	// OnAbandoned is invoked when the handler returns an error. It is invoked after the message is abandoned.
	OnAbandoned func(context.Context, *azservicebus.ReceivedMessage, error)
	// OnDeadLettered is invoked after the ManagedSettling dead-letters a message.
	// this occurs when the RetryDecision.CanRetry implementation returns false following an error returned by the handler
	// It is invoked after the message is dead-lettered.
	OnDeadLettered func(context.Context, *azservicebus.ReceivedMessage, error)
	// OnCompleted is a func that is invoked when the handler does not return any error. it is invoked after the message is completed.
	OnCompleted func(context.Context, *azservicebus.ReceivedMessage)
}

ManagedSettlingOptions allows to configure the ManagedSettling middleware

type Marshaller

type Marshaller interface {
	Marshal(mb MessageBody) (*azservicebus.Message, error)
	Unmarshal(msg *azservicebus.Message, mb MessageBody) error
	ContentType() string
}

type MaxAttemptsRetryDecision

type MaxAttemptsRetryDecision struct {
	MaxAttempts uint32
}

MaxAttemptsRetryDecision defines how many delivery the handler allows before explicitly moving the message to the deadletter queue. This requires the MaxDeliveryCount from the queue or subscription to be higher than the MaxAttempts property. If the queue or subscription's MaxDeliveryCount is lower than MaxAttempts, service bus will move the message to the DLQ before the handler reaches the MaxAttempts.

func (*MaxAttemptsRetryDecision) CanRetry

type MessageBody

type MessageBody any

MessageBody is a type to represent that an input message body can be of any type

type MessageSettler

type MessageSettler interface {
	AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error
	CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error
	DeadLetterMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) error
	DeferMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeferMessageOptions) error
	RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

MessageSettler is passed to the handlers. it exposes the message settling functionality from the receiver needed within the handler.

type NoOp

type NoOp struct {
}

NoOp settlement exits the handler without taking an action, letting the message's peek lock expire before incrementing the delivery count, or moving it to the deadletter, depending on the queue or subscription's configuration

func (*NoOp) Settle

func (a *NoOp) Settle(ctx context.Context, _ MessageSettler, message *azservicebus.ReceivedMessage)

type PanicHandlerOptions

type PanicHandlerOptions struct {
	OnPanicRecovered func(
		ctx context.Context,
		settler MessageSettler,
		message *azservicebus.ReceivedMessage,
		recovered any)
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor encapsulates the message pump and concurrency handling of servicebus. it exposes a handler API to provides a middleware based message processing pipeline.

Example
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
	panic(err)
}
client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil)
if err != nil {
	panic(err)
}
receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
if err != nil {
	panic(err)
}
lockRenewalInterval := 10 * time.Second
lockRenewalOptions := &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}
p := shuttle.NewProcessor(receiver,
	shuttle.NewPanicHandler(nil,
		shuttle.NewRenewLockHandler(lockRenewalOptions,
			MyHandler(0*time.Second))),
	&shuttle.ProcessorOptions{
		MaxConcurrency:  10,
		StartMaxAttempt: 5,
	},
)

ctx, cancel := context.WithCancel(context.Background())
err = p.Start(ctx)
if err != nil {
	panic(err)
}
cancel()
Example (MultiProcessor)
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
	panic(err)
}
client, err := azservicebus.NewClient("myservicebus-1.servicebus.windows.net", tokenCredential, nil)
if err != nil {
	panic(err)
}
receiver1, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
if err != nil {
	panic(err)
}
client, err = azservicebus.NewClient("myservicebus-2.servicebus.windows.net", tokenCredential, nil)
if err != nil {
	panic(err)
}
receiver2, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
if err != nil {
	panic(err)
}
receivers := []*shuttle.ReceiverEx{
	shuttle.NewReceiverEx("receiver1", receiver1),
	shuttle.NewReceiverEx("receiver2", receiver2),
}
lockRenewalInterval := 10 * time.Second
lockRenewalOptions := &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}
p := shuttle.NewMultiProcessor(receivers,
	shuttle.NewPanicHandler(nil,
		shuttle.NewRenewLockHandler(lockRenewalOptions,
			MyHandler(0*time.Second))),
	&shuttle.ProcessorOptions{
		MaxConcurrency:  10,
		StartMaxAttempt: 5,
	},
)

ctx, cancel := context.WithCancel(context.Background())
err = p.Start(ctx)
if err != nil {
	panic(err)
}
cancel()

func NewMultiProcessor added in v2.7.1

func NewMultiProcessor(receiversEx []*ReceiverEx, handler HandlerFunc, options *ProcessorOptions) *Processor

NewMultiProcessor creates a new processor with a list of receivers and a handler.

func NewProcessor

func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor

NewProcessor creates a new processor with the provided receiver and handler.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts processing on all the receivers of the processor and blocks until all processors are stopped or the context is canceled. It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy. Returns a combined list of errors encountered during each processor start.

type ProcessorOptions

type ProcessorOptions struct {
	// MaxConcurrency is the maximum number of concurrent messages to process at a time.
	MaxConcurrency int

	// MaxReceiveCount is the maximum number of messages to receive at a time. This value is passed to the receiver.
	MaxReceiveCount int

	// ReceiveInterval is the interval between each receive call.
	ReceiveInterval *time.Duration

	// StartMaxAttempt is the maximum number of attempts to start the processor.
	StartMaxAttempt int

	// StartRetryDelay is the delay between each start attempt.
	StartRetryDelayStrategy RetryDelayStrategy
}

ProcessorOptions configures the processor MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value will fallback to the default. MaxReceiveCount defaults to MaxConcurrency if not set. Not setting MaxReceiveCount, or setting it to 0 or a negative value will fallback to the default. Setting it to a value greater than MaxConcurrency will also fallback to the default. ReceiveInterval defaults to 2 seconds if not set. StartMaxAttempt defaults to 1 if not set (no retries). Not setting StartMaxAttempt, or setting it to non-positive value will fallback to the default. StartRetryDelayStrategy defaults to a fixed 5-second delay if not set.

type Receiver

type Receiver interface {
	ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
	MessageSettler
}

type ReceiverEx added in v2.7.1

type ReceiverEx struct {
	// contains filtered or unexported fields
}

func NewReceiverEx added in v2.7.1

func NewReceiverEx(name string, sbReceiver Receiver) *ReceiverEx

type ReceiverHealthChecker added in v2.7.1

type ReceiverHealthChecker struct {
	EntityName       string
	SubscriptionName string
}

ReceiverHealthChecker performs health checks on azservicebus.Receiver.

func (*ReceiverHealthChecker) HealthCheck added in v2.7.1

func (r *ReceiverHealthChecker) HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error

HealthCheck performs a health check on the azservicebus.Receiver by peeking a message.

type RetryDecision

type RetryDecision interface {
	// CanRetry inspects the error returned from the message handler, and the message itself to decide if it should be retried or not.
	CanRetry(err error, message *azservicebus.ReceivedMessage) bool
}

RetryDecision allows to provide custom retry decision.

type RetryDelayStrategy

type RetryDelayStrategy interface {
	GetDelay(attempt uint32) time.Duration
}

RetryDelayStrategy can be implemented to provide custom delay retry strategies.

type SendAsBatchOptions added in v2.9.0

type SendAsBatchOptions struct {
	// AllowMultipleBatch when true, allows splitting large message arrays into multiple batches.
	// When false, behaves like the original SendMessageBatch method.
	// Default: false
	AllowMultipleBatch bool
}

SendAsBatchOptions contains options for the SendAsBatch method

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender contains an SBSender used to send the message to the ServiceBus queue and a Marshaller used to marshal any struct into a ServiceBus message

func NewSender

func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender

NewSender takes in a Sender and a Marshaller to create a new object that can send messages to the ServiceBus queue

func (*Sender) AzSender added in v2.2.0

func (d *Sender) AzSender() AzServiceBusSender

AzSender returns the underlying azservicebus.Sender instance.

func (*Sender) FailOver deprecated added in v2.7.1

func (d *Sender) FailOver(sender AzServiceBusSender)

Deprecated: use SetAzSender. FailOver sets the underlying azservicebus.Sender instance to the provided one.

func (*Sender) SendAsBatch added in v2.9.0

func (d *Sender) SendAsBatch(ctx context.Context, messages []*azservicebus.Message, options *SendAsBatchOptions) error

SendAsBatch sends the array of azservicebus messages as batches. When options.AllowMultipleBatch is true, large message arrays are split into multiple batches. When options.AllowMultipleBatch is false, behaves like SendMessageBatch (fails if messages don't fit in single batch).

func (*Sender) SendMessage

func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error

SendMessage sends a payload on the bus. the MessageBody is marshalled and set as the message body.

func (*Sender) SendMessageBatch added in v2.2.0

func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error

SendMessageBatch sends the array of azservicebus messages as a batch. Deprecated: Use SendAsBatch instead. This method will be removed in a future version.

func (*Sender) SetAzSender added in v2.7.2

func (d *Sender) SetAzSender(sender AzServiceBusSender)

SetAzSender sets the underlying azservicebus.Sender instance to the provided one. All ongoing send operations will continue to use the old sender instance, while new send operations will use the new sender instance.

func (*Sender) ToServiceBusMessage added in v2.2.0

func (d *Sender) ToServiceBusMessage(
	ctx context.Context,
	mb MessageBody,
	options ...func(msg *azservicebus.Message) error) (*azservicebus.Message, error)

ToServiceBusMessage transform a MessageBody into an azservicebus.Message. It marshals the body using the sender's configured marshaller, and set the bytes as the message.Body. the sender's configured options are applied to the azservicebus.Message before returning it.

type SenderHealthChecker added in v2.7.1

type SenderHealthChecker struct {
	EntityName string
}

SenderHealthChecker performs health checks on azservicebus.Sender.

func (*SenderHealthChecker) HealthCheck added in v2.7.1

func (s *SenderHealthChecker) HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error

HealthCheck performs a health check on azservicebus.Sender by creating a new message batch.

type SenderOptions

type SenderOptions struct {
	// Marshaller will be used to marshall the messageBody to the azservicebus.Message Body property
	// defaults to DefaultJSONMarshaller
	Marshaller Marshaller
	// EnableTracingPropagation automatically applies WithTracePropagation option on all message sent through this sender
	EnableTracingPropagation bool
	// SendTimeout is the timeout value used on the context that sends messages
	// Defaults to 30 seconds if not set or 0
	// Disabled when set to a negative value
	SendTimeout time.Duration
}

type Settlement

type Settlement interface {
	Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}

Settlement represents an action to take on a message. Abandon, Complete, DeadLetter, Defer, NoOp

type SettlementHandlerOptions

type SettlementHandlerOptions struct {
	// OnNilSettlement is a func that allows to handle cases where the downstream handler returns nil.
	// the default behavior is to panic.
	OnNilSettlement func() Settlement
}

SettlementHandlerOptions allows to configure the SettleHandler

type Settler

type Settler func(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement

func (Settler) Handle

type TracingHandlerOpts added in v2.6.0

type TracingHandlerOpts struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
Package metrics allows to configure, record and read go-shuttle metrics
Package metrics allows to configure, record and read go-shuttle metrics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL