subscriber

package
v0.0.0-...-7593c98 Latest
Warning: This package is not in the latest version of its module.
Published: Aug 27, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrClosed = errors.New("subscriber is closed")

Functions

This section is empty.

Types

type AckNackFn

type AckNackFn[T any] func(ctx context.Context, msgImpl T) error

type ChannelSubscriber

type ChannelSubscriber struct {
	// contains filtered or unexported fields
}

func NewChannelSubscriber

func NewChannelSubscriber(inputCh chan *message.Message, maxConcurrent int) *ChannelSubscriber

func (*ChannelSubscriber) Close

func (c *ChannelSubscriber) Close() error

func (*ChannelSubscriber) Subscribe

func (c *ChannelSubscriber) Subscribe(ctx context.Context) (<-chan *message.Message, error)

type HandlerBuilder

type HandlerBuilder struct {
	// contains filtered or unexported fields
}

func (*HandlerBuilder) AddMiddleware

func (hb *HandlerBuilder) AddMiddleware(m middleware.Middleware) *HandlerBuilder

func (*HandlerBuilder) Handle

func (hb *HandlerBuilder) Handle(handler message.HandlerFunc)

type MessageProcessor

type MessageProcessor[T any] struct {
	Ack  AckNackFn[T]
	Nack AckNackFn[T]

	MessageUnmarshall func(ctx context.Context, msgImpl T) *message.Message

	ProcessingTimeout   time.Duration
	OnProcessingTimeout func(ctx context.Context, msgImpl T)

	OnAckError  OnAckNackErrorFn[T]
	OnNackError OnAckNackErrorFn[T]
}

MessageProcessor is a helper struct that simplifies writing Subscriber implementations. You supply the functions that act on the underlying message type of your implementation.

func (MessageProcessor[T]) ProcessMessage

func (p MessageProcessor[T]) ProcessMessage(ctx context.Context, msgImpl T, outputCh chan *message.Message)

type MessageSubscriber

type MessageSubscriber[T any] struct {
	// InitializeFn is a function that will be called when the subscriber is initialized. The context passed is a
	// cancel enabled context that will be cancelled when the subscriber is closed.
	InitializeFn func(ctx context.Context, outputCh chan *message.Message) error
	// contains filtered or unexported fields
}

MessageSubscriber is a helper struct to facilitate the different implementations of Subscribers.

func (*MessageSubscriber[T]) Close

func (p *MessageSubscriber[T]) Close() error

func (*MessageSubscriber[T]) Subscribe

func (p *MessageSubscriber[T]) Subscribe(ctx context.Context) (chan *message.Message, error)

type OnAckNackErrorFn

type OnAckNackErrorFn[T any] func(ctx context.Context, msgImpl T, ack bool, ackNackFn AckNackFn[T], err error)

OnAckNackErrorFn is an error handler called when Ack or Nack fails. The `ack` bool indicates whether the error occurred for Ack (true) or Nack (false). `ackNackFn` is the internal function that performs the Ack or Nack; it is provided so that the operation can be retried. `err` is the original error that caused the Ack or Nack to fail.

func BackoffRetryOnAckNackError

func BackoffRetryOnAckNackError[T any](
	retryAttempts uint,
	minDelay, maxDelay time.Duration,
	onRetryFail OnAckNackErrorFn[T]) OnAckNackErrorFn[T]

BackoffRetryOnAckNackError is a helper function that creates an OnAckNackErrorFn which retries the AckNackFn on error. You must provide a fallback, `onRetryFail`, which is called if all retries fail.

type Option

type Option func(*SubscriptionRouterOptions)

func WithAckOnUnknownRoute

func WithAckOnUnknownRoute() Option

WithAckOnUnknownRoute makes the router ack messages that have no route handler defined. It is mutually exclusive with the default route handler: if a default route handler is registered, this option has no effect.
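
The precedence described above can be sketched with a small stand-in router. Everything here is hypothetical: `router`, `routerOptions`, `dispatch`, and the `ackOnUnknownRoute` field only mirror the functional-options shape of `Option` and `SubscriptionRouterOptions`, whose real fields are unexported. The final "unhandled" branch is a placeholder, because the documentation does not say what the real router does when neither a handler nor the option applies.

```go
package main

import "fmt"

// routerOptions and option mirror the functional-options pattern used by
// SubscriptionRouterOptions and Option; the field name is an assumption.
type routerOptions struct {
	ackOnUnknownRoute bool
}

type option func(*routerOptions)

func withAckOnUnknownRoute() option {
	return func(o *routerOptions) { o.ackOnUnknownRoute = true }
}

// router is a hypothetical stand-in for SubscriptionRouter. Handlers return a
// label so the outcome of a dispatch is easy to observe.
type router struct {
	opts           routerOptions
	handlers       map[string]func() string
	defaultHandler func() string
}

func newRouter(opts ...option) *router {
	r := &router{handlers: map[string]func() string{}}
	for _, apply := range opts {
		apply(&r.opts)
	}
	return r
}

// dispatch reports what happens to a message carrying the given routing key.
func (r *router) dispatch(key string) string {
	if h, ok := r.handlers[key]; ok {
		return h()
	}
	if r.defaultHandler != nil {
		return r.defaultHandler() // a default handler takes precedence over the option
	}
	if r.opts.ackOnUnknownRoute {
		return "acked"
	}
	return "unhandled" // placeholder: the real behavior is not documented
}

func main() {
	r := newRouter(withAckOnUnknownRoute())
	r.handlers["order.created"] = func() string { return "handled order.created" }

	fmt.Println(r.dispatch("order.created"))
	fmt.Println(r.dispatch("order.deleted"))

	// Once a default handler exists, the option no longer has any effect.
	r.defaultHandler = func() string { return "default handler" }
	fmt.Println(r.dispatch("order.deleted"))
}
```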

type RoutingKey

type RoutingKey string

type RoutingKeyGenerator

type RoutingKeyGenerator = func(msg *message.Message) RoutingKey

func RouteFromMetadataKey

func RouteFromMetadataKey(metadataKey string) RoutingKeyGenerator

type Subscriber

type Subscriber interface {
	// Subscribe returns a channel of messages. The channel is closed when the subscriber is closed.
	// Be advised that a direct receive from the closed channel yields nil; a range loop over the channel simply ends.
	Subscribe(ctx context.Context) (<-chan *message.Message, error)
	Close() error
}
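
How a closed subscription channel looks to a consumer depends on how it is read. The sketch below uses a plain Go channel and a hypothetical stand-in `Message` type (the real `message.Message` is not reproduced here) to show both cases: a range loop stops when the channel is closed, while a direct receive on the closed channel returns nil.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for message.Message.
type Message struct {
	Payload string
}

// drain ranges over ch until it is closed and returns the payloads it saw.
// The loop ends when the channel is closed and drained.
func drain(ch <-chan *Message) []string {
	var payloads []string
	for msg := range ch {
		payloads = append(payloads, msg.Payload)
	}
	return payloads
}

func main() {
	ch := make(chan *Message, 2)
	ch <- &Message{Payload: "a"}
	ch <- &Message{Payload: "b"}
	close(ch) // stands in for the subscriber being closed

	fmt.Println(drain(ch))

	// A direct receive on the closed, drained channel returns the zero value
	// (nil for a pointer type); ok is false to signal the closure.
	msg, ok := <-ch
	fmt.Println(msg == nil, ok)
}
```

Consumers that read inside a `select` can use the two-value receive form to tell a closed channel apart from a live message before dereferencing it.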

type SubscriptionEngine

type SubscriptionEngine struct {
	// contains filtered or unexported fields
}

func NewSubscriptionEngine

func NewSubscriptionEngine(sub Subscriber, router SubscriptionRouter) *SubscriptionEngine

func (*SubscriptionEngine) AddMiddleware

func (*SubscriptionEngine) Start

func (e *SubscriptionEngine) Start(ctx context.Context) error

type SubscriptionRouter

type SubscriptionRouter struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(generator RoutingKeyGenerator, opts ...Option) *SubscriptionRouter

func (*SubscriptionRouter) AddDefaultHandler

func (d *SubscriptionRouter) AddDefaultHandler(handler message.HandlerFunc)

func (*SubscriptionRouter) AddHandler

func (d *SubscriptionRouter) AddHandler(value string) *HandlerBuilder

func (*SubscriptionRouter) HandlerFunc

func (d *SubscriptionRouter) HandlerFunc() message.HandlerFunc

type SubscriptionRouterOptions

type SubscriptionRouterOptions struct {
	// contains filtered or unexported fields
}

type TypedMessageHandler

type TypedMessageHandler[T any] interface {
	OnMessage(ctx context.Context, event *T) error
}
