Documentation ¶
Index ¶
- Variables
- type MessageHandler
- type Metrics
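- type MultiSubscriber
- func NewMultiSubscriber(name string, redisClient goredis.UniversalClient, consumerGroup string, opts ...SubscriberOption) *MultiSubscriber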
- func (m *MultiSubscriber) IsHealthy() bool
- func (m *MultiSubscriber) Name() string
- func (m *MultiSubscriber) Start(ctx context.Context) error
- func (m *MultiSubscriber) Stop() error
- func (m *MultiSubscriber) Subscribe(topic string, messageHandler MessageHandler) error
- func (m *MultiSubscriber) SubscriberCount() int
- type RetryConfig
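- type Subscriber
- func NewSubscriber(redisClient goredis.UniversalClient, consumerGroup, topic string, messageHandler MessageHandler, opts ...SubscriberOption) (*Subscriber, error)
- func (s *Subscriber) ConsumerGroup() string
- func (s *Subscriber) IsHealthy() bool
- func (s *Subscriber) Name() string
- func (s *Subscriber) Stop() error
- func (s *Subscriber) Topic() string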
- type SubscriberConfig
- type SubscriberOption
- func WithBlockTime(d time.Duration) SubscriberOption
- func WithClaimInterval(d time.Duration) SubscriberOption
- func WithExecTimeout(d time.Duration) SubscriberOption
- func WithMaxIdleTime(d time.Duration) SubscriberOption
- func WithMetrics(m Metrics) SubscriberOption
- func WithRetry(maxRetries int, retryDelay time.Duration, dlqTopic string) SubscriberOption
- func WithShutdownTimeout(d time.Duration) SubscriberOption
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ErrEmptyTopic                    = errors.New("multi_subscriber: topic cannot be empty")
	ErrSubscriberCreation            = errors.New("multi_subscriber: failed to create subscriber")
	ErrClosingSubscribers            = errors.New("multi_subscriber: errors while closing subscribers")
	ErrNoSubscribers                 = errors.New("multi_subscriber: no subscribers registered")
	ErrMultiSubscriberAlreadyRunning = errors.New("multi_subscriber: already running")
)
View Source
var (
	ErrNilRedisClient           = errors.New("subscriber: redis client cannot be nil")
	ErrEmptyConsumerGroup       = errors.New("subscriber: consumer group name cannot be empty")
	ErrEmptyTopicName           = errors.New("subscriber: topic name cannot be empty")
	ErrNilMessageHandler        = errors.New("subscriber: message handler cannot be nil")
	ErrMessageHandlerNotDefined = errors.New("subscriber: message handler is not defined")
	ErrMaxRetriesExceeded       = errors.New("subscriber: max retries exceeded")
	ErrExecTimeout              = errors.New("subscriber: message handler execution timed out")
	ErrAlreadyRunning           = errors.New("subscriber: already running")
)
Functions ¶
This section is empty.
Types ¶
type MultiSubscriber ¶
type MultiSubscriber struct {
// contains filtered or unexported fields
}
func NewMultiSubscriber ¶
func NewMultiSubscriber(
	name string,
	redisClient goredis.UniversalClient,
	consumerGroup string,
	opts ...SubscriberOption,
) *MultiSubscriber
func (*MultiSubscriber) IsHealthy ¶
func (m *MultiSubscriber) IsHealthy() bool
func (*MultiSubscriber) Name ¶
func (m *MultiSubscriber) Name() string
func (*MultiSubscriber) Start ¶
func (m *MultiSubscriber) Start(ctx context.Context) error
func (*MultiSubscriber) Stop ¶
func (m *MultiSubscriber) Stop() error
func (*MultiSubscriber) Subscribe ¶
func (m *MultiSubscriber) Subscribe(topic string, messageHandler MessageHandler) error
func (*MultiSubscriber) SubscriberCount ¶
func (m *MultiSubscriber) SubscriberCount() int
type RetryConfig ¶
type Subscriber ¶
type Subscriber struct {
*redisstream.Subscriber
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(
	redisClient goredis.UniversalClient,
	consumerGroup, topic string,
	messageHandler MessageHandler,
	opts ...SubscriberOption,
) (*Subscriber, error)
func (*Subscriber) ConsumerGroup ¶
func (s *Subscriber) ConsumerGroup() string
func (*Subscriber) IsHealthy ¶
func (s *Subscriber) IsHealthy() bool
func (*Subscriber) Name ¶
func (s *Subscriber) Name() string
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop() error
func (*Subscriber) Topic ¶
func (s *Subscriber) Topic() string
type SubscriberConfig ¶
type SubscriberConfig struct {
BlockTime time.Duration // How long to block waiting for messages
ClaimInterval time.Duration // Interval for claiming pending messages
MaxIdleTime time.Duration // Max idle time before message can be claimed
ShutdownTimeout time.Duration // Maximum time to wait for graceful shutdown before force closing
ExecTimeout time.Duration // Maximum time allowed for message handler execution
Metrics Metrics
Retry *RetryConfig
}
type SubscriberOption ¶
type SubscriberOption func(*SubscriberConfig)
func WithBlockTime ¶
func WithBlockTime(d time.Duration) SubscriberOption
func WithClaimInterval ¶
func WithClaimInterval(d time.Duration) SubscriberOption
func WithExecTimeout ¶
func WithExecTimeout(d time.Duration) SubscriberOption
func WithMaxIdleTime ¶
func WithMaxIdleTime(d time.Duration) SubscriberOption
func WithMetrics ¶
func WithMetrics(m Metrics) SubscriberOption
func WithRetry ¶
func WithRetry(maxRetries int, retryDelay time.Duration, dlqTopic string) SubscriberOption
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) SubscriberOption
Click to show internal directories.
Click to hide internal directories.