Documentation ¶
Index ¶
- Variables
- type AckNackFn
- type ChannelSubscriber
- type HandlerBuilder
- type MessageProcessor
- type MessageSubscriber
- type OnAckNackErrorFn
- type Option
- type RoutingKey
- type RoutingKeyGenerator
- type Subscriber
- type SubscriptionEngine
- type SubscriptionRouter
- type SubscriptionRouterOptions
- type TypedMessageHandler
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("subscriber is closed")
Functions ¶
This section is empty.
Types ¶
type ChannelSubscriber ¶
type ChannelSubscriber struct {
// contains filtered or unexported fields
}
func NewChannelSubscriber ¶
func NewChannelSubscriber(inputCh chan *message.Message, maxConcurrent int) *ChannelSubscriber
func (*ChannelSubscriber) Close ¶
func (c *ChannelSubscriber) Close() error
type HandlerBuilder ¶
type HandlerBuilder struct {
// contains filtered or unexported fields
}
func (*HandlerBuilder) AddMiddleware ¶
func (hb *HandlerBuilder) AddMiddleware(m middleware.Middleware) *HandlerBuilder
func (*HandlerBuilder) Handle ¶
func (hb *HandlerBuilder) Handle(handler message.HandlerFunc)
type MessageProcessor ¶
type MessageProcessor[T any] struct {
	Ack                 AckNackFn[T]
	Nack                AckNackFn[T]
	MessageUnmarshall   func(ctx context.Context, msgImpl T) *message.Message
	ProcessingTimeout   time.Duration
	OnProcessingTimeout func(ctx context.Context, msgImpl T)
	OnAckError          OnAckNackErrorFn[T]
	OnNackError         OnAckNackErrorFn[T]
}
MessageProcessor is a helper struct to facilitate the different implementations of Subscribers. You just need to provide the different functions to act on the underlying message of your implementation.
func (MessageProcessor[T]) ProcessMessage ¶
func (p MessageProcessor[T]) ProcessMessage(ctx context.Context, msgImpl T, outputCh chan *message.Message)
type MessageSubscriber ¶
type MessageSubscriber[T any] struct {
	// InitializeFn is a function that will be called when the subscriber is initialized. The context passed is a
	// cancel enabled context that will be cancelled when the subscriber is closed.
	InitializeFn func(ctx context.Context, outputCh chan *message.Message) error
	// contains filtered or unexported fields
}
MessageSubscriber is a helper struct to facilitate the different implementations of Subscribers.
func (*MessageSubscriber[T]) Close ¶
func (p *MessageSubscriber[T]) Close() error
type OnAckNackErrorFn ¶
type OnAckNackErrorFn[T any] func(ctx context.Context, msgImpl T, ack bool, ackNackFn AckNackFn[T], err error)
OnAckNackErrorFn is an error handler called when an Ack or Nack fails. The `ack` bool indicates whether the error occurred for an Ack (true) or a Nack (false). The `ackNackFn` is the internal function that performs the Ack or Nack; it is provided so that the operation can be retried. `err` is the original error that caused the Ack or Nack to fail.
func BackoffRetryOnAckNackError ¶
func BackoffRetryOnAckNackError[T any](
	retryAttempts uint,
	minDelay, maxDelay time.Duration,
	onRetryFail OnAckNackErrorFn[T],
) OnAckNackErrorFn[T]
BackoffRetryOnAckNackError is a helper function that creates an OnAckNackErrorFn which retries the AckNackFn on error. You need to provide a fallback handler, `onRetryFail`, which is called if all retries fail.
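The retry behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the shape of `AckNackFn` is an assumption (its definition is not shown on this page), `retryWithBackoff` and `runDemo` are hypothetical names, and the doubling delay capped at `maxDelay` is only one plausible backoff policy.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// AckNackFn is an ASSUMED shape; the real definition is not shown on this page.
type AckNackFn[T any] func(ctx context.Context, msgImpl T) error

// OnAckNackErrorFn mirrors the documented signature.
type OnAckNackErrorFn[T any] func(ctx context.Context, msgImpl T, ack bool, ackNackFn AckNackFn[T], err error)

// retryWithBackoff is a hypothetical stand-in for BackoffRetryOnAckNackError.
// It retries ackNackFn up to `attempts` times, doubling the delay from minDelay
// up to maxDelay, and calls onRetryFail if every retry fails or ctx is cancelled.
func retryWithBackoff[T any](attempts uint, minDelay, maxDelay time.Duration, onRetryFail OnAckNackErrorFn[T]) OnAckNackErrorFn[T] {
	return func(ctx context.Context, msgImpl T, ack bool, ackNackFn AckNackFn[T], err error) {
		delay := minDelay
		for i := uint(0); i < attempts; i++ {
			select {
			case <-ctx.Done():
				onRetryFail(ctx, msgImpl, ack, ackNackFn, ctx.Err())
				return
			case <-time.After(delay):
			}
			if err = ackNackFn(ctx, msgImpl); err == nil {
				return // the retry succeeded
			}
			delay *= 2
			if delay > maxDelay {
				delay = maxDelay
			}
		}
		onRetryFail(ctx, msgImpl, ack, ackNackFn, err)
	}
}

// runDemo retries an ack whose first `failures` attempts fail. It reports how
// many times the ack function was called and whether the fallback fired.
func runDemo(failures int, attempts uint) (calls int, gaveUp bool) {
	ackFn := func(ctx context.Context, msg string) error {
		calls++
		if calls <= failures {
			return errors.New("transient ack failure")
		}
		return nil
	}
	onRetryFail := func(ctx context.Context, msg string, ack bool, _ AckNackFn[string], err error) {
		gaveUp = true
	}
	handler := retryWithBackoff[string](attempts, time.Millisecond, 4*time.Millisecond, onRetryFail)
	handler(context.Background(), "msg-1", true, ackFn, errors.New("initial ack failure"))
	return calls, gaveUp
}

func main() {
	calls, gaveUp := runDemo(2, 5)
	fmt.Println(calls, gaveUp) // prints "3 false"
}
```

With the real package, a handler built this way would presumably be assigned to the `OnAckError` or `OnNackError` field of a `MessageProcessor`.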
type Option ¶
type Option func(*SubscriptionRouterOptions)
func WithAckOnUnknownRoute ¶
func WithAckOnUnknownRoute() Option
WithAckOnUnknownRoute makes the router ack messages that do not have a route handler defined. This is mutually exclusive with the default route handler: if you have registered a default route handler, this option has no effect.
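The interaction between this option and a default handler can be sketched with a small stand-in router. Everything below is hypothetical: the real SubscriptionRouterOptions fields are unexported and not shown here, so `routerOptions`, `ackOnUnknownRoute`, `router` and `dispatch` are illustrative names only, and the "unrouted" outcome is an assumption about what happens when neither the option nor a default handler is set.

```go
package main

import "fmt"

// routerOptions is a hypothetical stand-in for SubscriptionRouterOptions.
type routerOptions struct {
	ackOnUnknownRoute bool
}

// option mirrors the documented functional-option shape.
type option func(*routerOptions)

// withAckOnUnknownRoute is a stand-in for WithAckOnUnknownRoute.
func withAckOnUnknownRoute() option {
	return func(o *routerOptions) { o.ackOnUnknownRoute = true }
}

type handlerFunc func(payload string) error

type router struct {
	opts           routerOptions
	handlers       map[string]handlerFunc
	defaultHandler handlerFunc
}

func newRouter(opts ...option) *router {
	r := &router{handlers: map[string]handlerFunc{}}
	for _, apply := range opts {
		apply(&r.opts)
	}
	return r
}

// dispatch reports which path a message took: "handled", "default",
// "acked" (unknown route, option set) or "unrouted" (assumed fallback).
func (r *router) dispatch(key, payload string) string {
	if h, ok := r.handlers[key]; ok {
		_ = h(payload)
		return "handled"
	}
	if r.defaultHandler != nil {
		// A default handler takes precedence, so the option has no effect.
		_ = r.defaultHandler(payload)
		return "default"
	}
	if r.opts.ackOnUnknownRoute {
		return "acked"
	}
	return "unrouted"
}

func main() {
	r := newRouter(withAckOnUnknownRoute())
	fmt.Println(r.dispatch("missing", "x")) // prints "acked"

	r.defaultHandler = func(string) error { return nil }
	fmt.Println(r.dispatch("missing", "x")) // prints "default"
}
```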
type RoutingKey ¶
type RoutingKey string
type RoutingKeyGenerator ¶
type RoutingKeyGenerator = func(msg *message.Message) RoutingKey
func RouteFromMetadataKey ¶
func RouteFromMetadataKey(metadataKey string) RoutingKeyGenerator
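RouteFromMetadataKey is undocumented here; judging by its name and signature, it likely returns a generator that reads the routing key from a message's metadata. The sketch below illustrates that idea under stated assumptions: `Message` is a hypothetical stand-in for message.Message (its `Metadata` field is assumed), and `routeFromMetadataKey` is not the package's code.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for message.Message.
// The Metadata field is an assumption made for this sketch.
type Message struct {
	Metadata map[string]string
	Payload  []byte
}

type RoutingKey string

// RoutingKeyGenerator mirrors the documented alias, using the stand-in Message.
type RoutingKeyGenerator = func(msg *Message) RoutingKey

// routeFromMetadataKey returns a generator that uses the value stored under
// metadataKey as the routing key. A missing key or nil message yields "".
func routeFromMetadataKey(metadataKey string) RoutingKeyGenerator {
	return func(msg *Message) RoutingKey {
		if msg == nil {
			return ""
		}
		return RoutingKey(msg.Metadata[metadataKey])
	}
}

func main() {
	gen := routeFromMetadataKey("event_type")
	msg := &Message{Metadata: map[string]string{"event_type": "order.created"}}
	fmt.Println(gen(msg)) // prints "order.created"
}
```

A generator of this kind would be passed to NewRouter, with handlers registered under the matching values via AddHandler.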
type Subscriber ¶
type Subscriber interface {
// Subscribe will return a channel of messages. The channel will be closed when the subscriber is closed.
// Be advised that once the channel is closed, a plain receive (msg := <-ch) yields 'nil'; ranging over the channel simply ends the loop.
Subscribe(ctx context.Context) (<-chan *message.Message, error)
Close() error
}
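Because the channel returned by Subscribe is closed when the subscriber closes, consumers should detect closure rather than handle a nil message. A minimal sketch, using a hypothetical `Message` stand-in for message.Message and an illustrative `drain` helper:

```go
package main

import "fmt"

// Message is a hypothetical stand-in for message.Message.
type Message struct{ Payload string }

// drain consumes messages until the channel is closed. The two-value receive
// reports closure explicitly, so a nil message is never dereferenced.
func drain(ch <-chan *Message) []string {
	var got []string
	for {
		msg, ok := <-ch
		if !ok {
			return got // channel closed: the subscriber was shut down
		}
		got = append(got, msg.Payload)
	}
}

func main() {
	ch := make(chan *Message, 2)
	ch <- &Message{Payload: "a"}
	ch <- &Message{Payload: "b"}
	close(ch)

	fmt.Println(drain(ch))   // prints "[a b]"
	fmt.Println(<-ch == nil) // prints "true": a plain receive on a closed channel yields nil
}
```

A `for msg := range ch` loop is equivalent to `drain` here, since it exits once the channel is closed and emptied.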
type SubscriptionEngine ¶
type SubscriptionEngine struct {
// contains filtered or unexported fields
}
func NewSubscriptionEngine ¶
func NewSubscriptionEngine(sub Subscriber, router SubscriptionRouter) *SubscriptionEngine
func (*SubscriptionEngine) AddMiddleware ¶
func (p *SubscriptionEngine) AddMiddleware(m middleware.Middleware) *SubscriptionEngine
type SubscriptionRouter ¶
type SubscriptionRouter struct {
// contains filtered or unexported fields
}
func NewRouter ¶
func NewRouter(generator RoutingKeyGenerator, opts ...Option) *SubscriptionRouter
func (*SubscriptionRouter) AddDefaultHandler ¶
func (d *SubscriptionRouter) AddDefaultHandler(handler message.HandlerFunc)
func (*SubscriptionRouter) AddHandler ¶
func (d *SubscriptionRouter) AddHandler(value string) *HandlerBuilder
func (*SubscriptionRouter) HandlerFunc ¶
func (d *SubscriptionRouter) HandlerFunc() message.HandlerFunc
type SubscriptionRouterOptions ¶
type SubscriptionRouterOptions struct {
// contains filtered or unexported fields
}