Documentation
¶
Index ¶
- func SetCorrelationId(correlationId *string) func(msg *azservicebus.Message) error
- func SetLogHandler(handler slog.Handler)
- func SetLoggerFunc(fn func(ctx context.Context) Logger)deprecated
- func SetMessageDelay(delay time.Duration) func(msg *azservicebus.Message) error
- func SetMessageId(messageId *string) func(msg *azservicebus.Message) error
- func SetMessageTTL(ttl time.Duration) func(msg *azservicebus.Message) error
- func SetScheduleAt(t time.Time) func(msg *azservicebus.Message) error
- func SetSubject(subject string) func(msg *azservicebus.Message) error
- func SetTo(to string) func(msg *azservicebus.Message) error
- func WithReceiverSpanNameFormatter(...) func(t *TracingHandlerOpts)
- func WithSpanStartOptions(options []trace.SpanStartOption) func(t *TracingHandlerOpts)
- func WithTracePropagation(ctx context.Context) func(msg *azservicebus.Message) error
- func WithTraceProvider(tp trace.TracerProvider) func(t *TracingHandlerOpts)
- type Abandon
- type AzServiceBusSender
- type Complete
- type ConstantDelayStrategy
- type DeadLetter
- type DefaultJSONMarshaller
- type DefaultProtoMarshaller
- type Defer
- type Handler
- type HandlerFunc
- func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFuncdeprecated
- func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc
- func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFunc
- func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc
- func NewTracingHandler(next Handler, options ...func(t *TracingHandlerOpts)) HandlerFunc
- type HealthCheckable
- type HealthChecker
- type HealthCheckerOptions
- type LockRenewalOptions
- type LockRenewer
- type Logger
- type ManagedSettler
- type ManagedSettlingFunc
- type ManagedSettlingHandler
- type ManagedSettlingOptions
- type Marshaller
- type MaxAttemptsRetryDecision
- type MessageBody
- type MessageSettler
- type NoOp
- type PanicHandlerOptions
- type Processor
- type ProcessorOptions
- type Receiver
- type ReceiverEx
- type ReceiverHealthChecker
- type RetryDecision
- type RetryDelayStrategy
- type SendAsBatchOptions
- type Sender
- func (d *Sender) AzSender() AzServiceBusSender
- func (d *Sender) FailOver(sender AzServiceBusSender)deprecated
- func (d *Sender) SendAsBatch(ctx context.Context, messages []*azservicebus.Message, ...) error
- func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, ...) error
- func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error
- func (d *Sender) SetAzSender(sender AzServiceBusSender)
- func (d *Sender) ToServiceBusMessage(ctx context.Context, mb MessageBody, ...) (*azservicebus.Message, error)
- type SenderHealthChecker
- type SenderOptions
- type Settlement
- type SettlementHandlerOptions
- type Settler
- type TracingHandlerOpts
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetCorrelationId ¶
func SetCorrelationId(correlationId *string) func(msg *azservicebus.Message) error
SetCorrelationId sets the ServiceBus message's correlation ID to a user-specified value
func SetLogHandler ¶ added in v2.7.1
SetLogHandler allows to set a custom slog.Handler to be used by the go-shuttle logger. If handler is nil, the default slog handler will be used.
func SetLoggerFunc
deprecated
added in
v2.4.0
func SetMessageDelay ¶
func SetMessageDelay(delay time.Duration) func(msg *azservicebus.Message) error
SetMessageDelay schedules a message in the future
func SetMessageId ¶
func SetMessageId(messageId *string) func(msg *azservicebus.Message) error
SetMessageId sets the ServiceBus message's ID to a user-specified value
func SetMessageTTL ¶
func SetMessageTTL(ttl time.Duration) func(msg *azservicebus.Message) error
SetMessageTTL sets the ServiceBus message's TimeToLive to a user-specified value
func SetScheduleAt ¶
func SetScheduleAt(t time.Time) func(msg *azservicebus.Message) error
SetScheduleAt schedules a message to be enqueued in the future
func SetSubject ¶ added in v2.8.0
func SetSubject(subject string) func(msg *azservicebus.Message) error
SetSubject sets the ServiceBus message's Subject property to a user-specified value
func SetTo ¶ added in v2.8.0
func SetTo(to string) func(msg *azservicebus.Message) error
SetTo sets the ServiceBus message's To property to a user-specified value
func WithReceiverSpanNameFormatter ¶ added in v2.6.0
func WithReceiverSpanNameFormatter(format func(defaultSpanName string, message *azservicebus.ReceivedMessage) string) func(t *TracingHandlerOpts)
WithReceiverSpanNameFormatter allows formatting name of the span started by the tracing handler in NewTracingHandler.
func WithSpanStartOptions ¶ added in v2.6.0
func WithSpanStartOptions(options []trace.SpanStartOption) func(t *TracingHandlerOpts)
WithSpanStartOptions allows setting custom span start options for the tracing handler in NewTracingHandler.
func WithTracePropagation ¶
func WithTracePropagation(ctx context.Context) func(msg *azservicebus.Message) error
WithTracePropagation is a sender option to inject the trace context into the message
func WithTraceProvider ¶ added in v2.6.0
func WithTraceProvider(tp trace.TracerProvider) func(t *TracingHandlerOpts)
WithTraceProvider allows setting a custom trace provider for the tracing handler in NewTracingHandler.
Types ¶
type Abandon ¶
type Abandon struct {
// contains filtered or unexported fields
}
Abandon settlement will cause a message to be available again from the queue or subscription. This will increment its delivery count, and potentially cause it to be dead-lettered depending on your queue or subscription's configuration.
func (*Abandon) Settle ¶
func (a *Abandon) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type AzServiceBusSender ¶
type AzServiceBusSender interface { SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) Close(ctx context.Context) error }
AzServiceBusSender is satisfied by *azservicebus.Sender
type Complete ¶
type Complete struct {
// contains filtered or unexported fields
}
Complete settlement completes a message, deleting it from the queue or subscription.
func (*Complete) Settle ¶
func (a *Complete) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type ConstantDelayStrategy ¶
ConstantDelayStrategy delays the message retry by the given duration
type DeadLetter ¶
type DeadLetter struct {
// contains filtered or unexported fields
}
DeadLetter settlement moves the message to the dead letter queue for a queue or subscription. To process deadlettered messages, create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
func (*DeadLetter) Settle ¶
func (a *DeadLetter) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type DefaultJSONMarshaller ¶
type DefaultJSONMarshaller struct { }
DefaultJSONMarshaller is the default marshaller for JSON messages
func (*DefaultJSONMarshaller) ContentType ¶
func (j *DefaultJSONMarshaller) ContentType() string
ContentType returns the content type for the JSON marshaller
func (*DefaultJSONMarshaller) Marshal ¶
func (j *DefaultJSONMarshaller) Marshal(mb MessageBody) (*azservicebus.Message, error)
Marshal marshals the user-input struct into a JSON string and returns a new message with the JSON string as the body
func (*DefaultJSONMarshaller) Unmarshal ¶
func (j *DefaultJSONMarshaller) Unmarshal(msg *azservicebus.Message, mb MessageBody) error
Unmarshal unmarshals the message body from a JSON string into the user-input struct
type DefaultProtoMarshaller ¶
type DefaultProtoMarshaller struct { }
DefaultProtoMarshaller is the default marshaller for protobuf messages
func (*DefaultProtoMarshaller) ContentType ¶
func (p *DefaultProtoMarshaller) ContentType() string
ContentType returns teh contentType for the protobuf marshaller
func (*DefaultProtoMarshaller) Marshal ¶
func (p *DefaultProtoMarshaller) Marshal(mb MessageBody) (*azservicebus.Message, error)
Marshal marshals the user-input struct into a protobuf message and returns a new ServiceBus message with the protofbuf message as the body
func (*DefaultProtoMarshaller) Unmarshal ¶
func (p *DefaultProtoMarshaller) Unmarshal(msg *azservicebus.Message, mb MessageBody) error
Unmarshal unmarshalls the protobuf message from the ServiceBus message into the user-input struct
type Defer ¶
type Defer struct {
// contains filtered or unexported fields
}
Defer settlement will cause a message to be deferred. Deferred messages are moved to ta deferred queue. They can only be received using `Receiver.ReceiveDeferredMessages`.
func (*Defer) Settle ¶
func (a *Defer) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type Handler ¶
type Handler interface {
Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
HandlerFunc is a func to handle the message received from a subscription
func NewLockRenewalHandler
deprecated
added in
v2.4.0
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc
Deprecated: use NewRenewLockHandler NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewPanicHandler ¶
func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc
NewPanicHandler recovers panics from downstream handlers
func NewRenewLockHandler ¶
func NewRenewLockHandler(options *LockRenewalOptions, handler Handler) HandlerFunc
NewRenewLockHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewSettlementHandler ¶
func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc
NewSettlementHandler creates a middleware to use the Settlement api in the message handler implementation.
Example ¶
package main import ( "context" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/go-shuttle/v2" ) func main() { tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewSettlementHandler(nil, mySettlingHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10}) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel() } func mySettlingHandler() shuttle.Settler { return func(ctx context.Context, message *azservicebus.ReceivedMessage) shuttle.Settlement { return &shuttle.Complete{} } }
func NewTracingHandler ¶
func NewTracingHandler(next Handler, options ...func(t *TracingHandlerOpts)) HandlerFunc
NewTracingHandler is a shuttle middleware that extracts the context from the message Application property if available, or from the existing context if not, and starts a span.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type HealthCheckable ¶ added in v2.7.1
type HealthCheckable interface {
HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error
}
HealthCheckable is an interface for performing health checks on azservicebus.Sender and azservicebus.Receiver.
type HealthChecker ¶ added in v2.7.1
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker performs periodic health checks on the Service Bus Senders and Receivers.
func NewHealthChecker ¶ added in v2.7.1
func NewHealthChecker(clients map[string]*azservicebus.Client, options *HealthCheckerOptions) *HealthChecker
NewHealthChecker creates a new HealthChecker with the provided clients. clients is a map of namespaceName name to azservicebus.Client.
func (*HealthChecker) StartPeriodicHealthCheck ¶ added in v2.7.1
func (h *HealthChecker) StartPeriodicHealthCheck(ctx context.Context, hc HealthCheckable)
StartPeriodicHealthCheck starts the periodic health check for the provided HealthCheckable. The health check will run on each client in the HealthChecker. Stops when the context is canceled.
type HealthCheckerOptions ¶ added in v2.7.1
type HealthCheckerOptions struct { // HealthCheckInterval is the time between health checks. HealthCheckInterval time.Duration // HealthCheckTimeout is the context timeout for each health check HealthCheckTimeout time.Duration }
HealthCheckerOptions configures the HealthChecker. HealthCheckInterval defaults to 1 minute if not set or set to <= 0. HealthCheckTimeout defaults to HealthChecker.interval if not set or set to 0 or set to be larger than interval.
type LockRenewalOptions ¶ added in v2.4.0
type LockRenewalOptions struct { // Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds. Interval *time.Duration // LockRenewalTimeout is the timeout value used on the context when sending RenewMessageLock() request. // Defaults to 5 seconds if not set or 0. Defaults to Lock Expiry time if set to a negative value. LockRenewalTimeout *time.Duration // CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped. // Defaults to true. CancelMessageContextOnStop *bool // MetricRecorder allows to pass a custom metric recorder for the LockRenewer. // Defaults to processor.Metric instance. MetricRecorder processor.Recorder }
LockRenewalOptions configures the lock renewal.
type LockRenewer ¶
type LockRenewer interface {
RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}
LockRenewer abstracts the servicebus receiver client to only expose lock renewal
type ManagedSettler ¶
type ManagedSettler struct {
// contains filtered or unexported fields
}
ManagedSettler is a middleware that allows to reduce the message handler signature to ManagedSettlingFunc
func NewManagedSettlingHandler ¶
func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingHandler) *ManagedSettler
NewManagedSettlingHandler allows to configure Retry decision logic and delay strategy. It also adapts the handler to let the user return an error from the handler, instead of a settlement. the settlement is inferred from the handler's return value. error -> abandon nil -> complete the RetryDecision can be overridden and can inspect the error returned to decide to retry the message or not. this allows to define error types that shouldn't be retried (and moved directly to the deadletter queue)
Example ¶
package main import ( "context" "fmt" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/go-shuttle/v2" ) func main() { tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, shuttle.NewRenewLockHandler(&shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{ RetryDecision: &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2}, RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second}, OnAbandoned: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) { fmt.Printf("message abandoned due to error: %s\n", err) }, OnDeadLettered: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) { fmt.Printf("message deadlettered due to error: %s\n", err) }, }, myManagedSettlementHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10}) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel() } func myManagedSettlementHandler() shuttle.ManagedSettlingHandler { count := 0 return shuttle.ManagedSettlingFunc(func(ctx context.Context, message *azservicebus.ReceivedMessage) error { count++ if count == 0 { // this will abandon the message return fmt.Errorf("this will abandon the message, and eventually move it to DLQ") } return nil // this will complete the message }) }
func (*ManagedSettler) Handle ¶
func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type ManagedSettlingFunc ¶
type ManagedSettlingFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error
ManagedSettlingFunc allows to convert a function with the signature func(context.Context, *azservicebus.ReceivedMessage) error to the ManagedSettlingHandler interface.
func (ManagedSettlingFunc) Handle ¶ added in v2.3.0
func (f ManagedSettlingFunc) Handle(ctx context.Context, message *azservicebus.ReceivedMessage) error
type ManagedSettlingHandler ¶ added in v2.3.0
type ManagedSettlingHandler interface {
Handle(context.Context, *azservicebus.ReceivedMessage) error
}
ManagedSettlingHandler is the message Handler interface for the ManagedSettler.
type ManagedSettlingOptions ¶
type ManagedSettlingOptions struct { // Allows to override the built-in error handling logic. // OnError is called before any message settling action is taken. // the ManagedSettlingOptions struct is passed as an argument so that the configuration // like RetryDecision, RetryDelayStrategy and the post-settlement hooks can be reused and composed differently OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) // RetryDecision is invoked to decide whether an error should be retried. // the default is to retry 5 times before moving the message to the deadletter. RetryDecision RetryDecision // RetryDelayStrategy is invoked when a message handling does not complete successfully // and the RetryDecision decides to retry the message. // The handler will sleep for the time calculated by the delayStrategy before Abandoning the message. RetryDelayStrategy RetryDelayStrategy // OnAbandoned is invoked when the handler returns an error. It is invoked after the message is abandoned. OnAbandoned func(context.Context, *azservicebus.ReceivedMessage, error) // OnDeadLettered is invoked after the ManagedSettling dead-letters a message. // this occurs when the RetryDecision.CanRetry implementation returns false following an error returned by the handler // It is invoked after the message is dead-lettered. OnDeadLettered func(context.Context, *azservicebus.ReceivedMessage, error) // OnCompleted is a func that is invoked when the handler does not return any error. it is invoked after the message is completed. OnCompleted func(context.Context, *azservicebus.ReceivedMessage) }
ManagedSettlingOptions allows to configure the ManagedSettling middleware
type Marshaller ¶
type Marshaller interface { Marshal(mb MessageBody) (*azservicebus.Message, error) Unmarshal(msg *azservicebus.Message, mb MessageBody) error ContentType() string }
type MaxAttemptsRetryDecision ¶
type MaxAttemptsRetryDecision struct {
MaxAttempts uint32
}
MaxAttemptsRetryDecision defines how many delivery the handler allows before explicitly moving the message to the deadletter queue. This requires the MaxDeliveryCount from the queue or subscription to be higher than the MaxAttempts property. If the queue or subscription's MaxDeliveryCount is lower than MaxAttempts, service bus will move the message to the DLQ before the handler reaches the MaxAttempts.
func (*MaxAttemptsRetryDecision) CanRetry ¶
func (d *MaxAttemptsRetryDecision) CanRetry(_ error, message *azservicebus.ReceivedMessage) bool
type MessageBody ¶
type MessageBody any
MessageBody is a type to represent that an input message body can be of any type
type MessageSettler ¶
type MessageSettler interface { AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error DeadLetterMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) error DeferMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeferMessageOptions) error RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error }
MessageSettler is passed to the handlers. it exposes the message settling functionality from the receiver needed within the handler.
type NoOp ¶
type NoOp struct { }
NoOp settlement exits the handler without taking an action, letting the message's peek lock expire before incrementing the delivery count, or moving it to the deadletter, depending on the queue or subscription's configuration
func (*NoOp) Settle ¶
func (a *NoOp) Settle(ctx context.Context, _ MessageSettler, message *azservicebus.ReceivedMessage)
type PanicHandlerOptions ¶
type PanicHandlerOptions struct { OnPanicRecovered func( ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, recovered any) }
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor encapsulates the message pump and concurrency handling of servicebus. it exposes a handler API to provides a middleware based message processing pipeline.
Example ¶
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } lockRenewalInterval := 10 * time.Second lockRenewalOptions := &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval} p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, shuttle.NewRenewLockHandler(lockRenewalOptions, MyHandler(0*time.Second))), &shuttle.ProcessorOptions{ MaxConcurrency: 10, StartMaxAttempt: 5, }, ) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel()
Example (MultiProcessor) ¶
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus-1.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver1, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } client, err = azservicebus.NewClient("myservicebus-2.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver2, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } receivers := []*shuttle.ReceiverEx{ shuttle.NewReceiverEx("receiver1", receiver1), shuttle.NewReceiverEx("receiver2", receiver2), } lockRenewalInterval := 10 * time.Second lockRenewalOptions := &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval} p := shuttle.NewMultiProcessor(receivers, shuttle.NewPanicHandler(nil, shuttle.NewRenewLockHandler(lockRenewalOptions, MyHandler(0*time.Second))), &shuttle.ProcessorOptions{ MaxConcurrency: 10, StartMaxAttempt: 5, }, ) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel()
func NewMultiProcessor ¶ added in v2.7.1
func NewMultiProcessor(receiversEx []*ReceiverEx, handler HandlerFunc, options *ProcessorOptions) *Processor
NewMultiProcessor creates a new processor with a list of receivers and a handler.
func NewProcessor ¶
func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor
NewProcessor creates a new processor with the provided receiver and handler.
func (*Processor) Start ¶
Start starts processing on all the receivers of the processor and blocks until all processors are stopped or the context is canceled. It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy. Returns a combined list of errors encountered during each processor start.
type ProcessorOptions ¶
type ProcessorOptions struct { // MaxConcurrency is the maximum number of concurrent messages to process at a time. MaxConcurrency int // MaxReceiveCount is the maximum number of messages to receive at a time. This value is passed to the receiver. MaxReceiveCount int // ReceiveInterval is the interval between each receive call. ReceiveInterval *time.Duration // StartMaxAttempt is the maximum number of attempts to start the processor. StartMaxAttempt int // StartRetryDelay is the delay between each start attempt. StartRetryDelayStrategy RetryDelayStrategy }
ProcessorOptions configures the processor MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value will fallback to the default. MaxReceiveCount defaults to MaxConcurrency if not set. Not setting MaxReceiveCount, or setting it to 0 or a negative value will fallback to the default. Setting it to a value greater than MaxConcurrency will also fallback to the default. ReceiveInterval defaults to 2 seconds if not set. StartMaxAttempt defaults to 1 if not set (no retries). Not setting StartMaxAttempt, or setting it to non-positive value will fallback to the default. StartRetryDelayStrategy defaults to a fixed 5-second delay if not set.
type Receiver ¶
type Receiver interface { ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) MessageSettler }
type ReceiverEx ¶ added in v2.7.1
type ReceiverEx struct {
// contains filtered or unexported fields
}
func NewReceiverEx ¶ added in v2.7.1
func NewReceiverEx(name string, sbReceiver Receiver) *ReceiverEx
type ReceiverHealthChecker ¶ added in v2.7.1
ReceiverHealthChecker performs health checks on azservicebus.Receiver.
func (*ReceiverHealthChecker) HealthCheck ¶ added in v2.7.1
func (r *ReceiverHealthChecker) HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error
HealthCheck performs a health check on the azservicebus.Receiver by peeking a message.
type RetryDecision ¶
type RetryDecision interface { // CanRetry inspects the error returned from the message handler, and the message itself to decide if it should be retried or not. CanRetry(err error, message *azservicebus.ReceivedMessage) bool }
RetryDecision allows to provide custom retry decision.
type RetryDelayStrategy ¶
RetryDelayStrategy can be implemented to provide custom delay retry strategies.
type SendAsBatchOptions ¶ added in v2.9.0
type SendAsBatchOptions struct { // AllowMultipleBatch when true, allows splitting large message arrays into multiple batches. // When false, behaves like the original SendMessageBatch method. // Default: false AllowMultipleBatch bool }
SendAsBatchOptions contains options for the SendAsBatch method
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender contains an SBSender used to send the message to the ServiceBus queue and a Marshaller used to marshal any struct into a ServiceBus message
func NewSender ¶
func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender
NewSender takes in a Sender and a Marshaller to create a new object that can send messages to the ServiceBus queue
func (*Sender) AzSender ¶ added in v2.2.0
func (d *Sender) AzSender() AzServiceBusSender
AzSender returns the underlying azservicebus.Sender instance.
func (*Sender) FailOver
deprecated
added in
v2.7.1
func (d *Sender) FailOver(sender AzServiceBusSender)
Deprecated: use SetAzSender. FailOver sets the underlying azservicebus.Sender instance to the provided one.
func (*Sender) SendAsBatch ¶ added in v2.9.0
func (d *Sender) SendAsBatch(ctx context.Context, messages []*azservicebus.Message, options *SendAsBatchOptions) error
SendAsBatch sends the array of azservicebus messages as batches. When options.AllowMultipleBatch is true, large message arrays are split into multiple batches. When options.AllowMultipleBatch is false, behaves like SendMessageBatch (fails if messages don't fit in single batch).
func (*Sender) SendMessage ¶
func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error
SendMessage sends a payload on the bus. the MessageBody is marshalled and set as the message body.
func (*Sender) SendMessageBatch ¶ added in v2.2.0
SendMessageBatch sends the array of azservicebus messages as a batch. Deprecated: Use SendAsBatch instead. This method will be removed in a future version.
func (*Sender) SetAzSender ¶ added in v2.7.2
func (d *Sender) SetAzSender(sender AzServiceBusSender)
SetAzSender sets the underlying azservicebus.Sender instance to the provided one. All ongoing send operations will continue to use the old sender instance, while new send operations will use the new sender instance.
func (*Sender) ToServiceBusMessage ¶ added in v2.2.0
func (d *Sender) ToServiceBusMessage( ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) (*azservicebus.Message, error)
ToServiceBusMessage transform a MessageBody into an azservicebus.Message. It marshals the body using the sender's configured marshaller, and set the bytes as the message.Body. the sender's configured options are applied to the azservicebus.Message before returning it.
type SenderHealthChecker ¶ added in v2.7.1
type SenderHealthChecker struct {
EntityName string
}
SenderHealthChecker performs health checks on azservicebus.Sender.
func (*SenderHealthChecker) HealthCheck ¶ added in v2.7.1
func (s *SenderHealthChecker) HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error
HealthCheck performs a health check on azservicebus.Sender by creating a new message batch.
type SenderOptions ¶
type SenderOptions struct { // Marshaller will be used to marshall the messageBody to the azservicebus.Message Body property // defaults to DefaultJSONMarshaller Marshaller Marshaller // EnableTracingPropagation automatically applies WithTracePropagation option on all message sent through this sender EnableTracingPropagation bool // SendTimeout is the timeout value used on the context that sends messages // Defaults to 30 seconds if not set or 0 // Disabled when set to a negative value SendTimeout time.Duration }
type Settlement ¶
type Settlement interface {
Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}
Settlement represents an action to take on a message. Abandon, Complete, DeadLetter, Defer, NoOp
type SettlementHandlerOptions ¶
type SettlementHandlerOptions struct { // OnNilSettlement is a func that allows to handle cases where the downstream handler returns nil. // the default behavior is to panic. OnNilSettlement func() Settlement }
SettlementHandlerOptions allows to configure the SettleHandler
type Settler ¶
type Settler func(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement
func (Settler) Handle ¶
func (s Settler) Handle(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement
type TracingHandlerOpts ¶ added in v2.6.0
type TracingHandlerOpts struct {
// contains filtered or unexported fields
}