Documentation ¶
Overview ¶
Package redissub provides a Redis Stream subscriber with automatic retry, timeout, and lifecycle management.
This package wraps Watermill's Redis Stream subscriber to consume events from Redis Streams with configurable retry logic, execution timeouts, and graceful shutdown. It supports consumer groups for distributed message processing.
Basic usage:
handler := func(ctx context.Context, msg *message.Message) error {
	var event MyEvent
	if err := json.Unmarshal(msg.Payload, &event); err != nil {
		return err
	}
	// process event
	return nil
}
subscriber, err := redissub.New(redisClient, "my-consumer-group", &redissub.Config{
	ExecTimeout:     30 * time.Second,
	MaxRetries:      3,
	ShutdownTimeout: 20 * time.Second,
})
if err != nil {
	log.Fatal(err)
}
err = subscriber.Subscribe(ctx, "events-topic", handler)
if err != nil {
	log.Fatal(err)
}
Messages are acknowledged only after successful processing. Failed messages are retried up to MaxRetries times before being nacked. Timeouts on message handlers are enforced via the ExecTimeout configuration.
Index ¶
- Variables
- type MessageHandler
- type Metrics
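- type MultiSubscriber
- func NewMultiSubscriber(name string, redisClient goredis.UniversalClient, consumerGroup string, opts ...SubscriberOption) *MultiSubscriber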
- func (m *MultiSubscriber) IsHealthy() bool
- func (m *MultiSubscriber) Name() string
- func (m *MultiSubscriber) Start(ctx context.Context) error
- func (m *MultiSubscriber) Stop() error
- func (m *MultiSubscriber) Subscribe(topic string, messageHandler MessageHandler) error
- func (m *MultiSubscriber) SubscriberCount() int
- type RetryConfig
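- type Subscriber
- func NewSubscriber(redisClient goredis.UniversalClient, consumerGroup, topic string, messageHandler MessageHandler, opts ...SubscriberOption) (*Subscriber, error)
- func (s *Subscriber) ConsumerGroup() string
- func (s *Subscriber) IsHealthy() bool
- func (s *Subscriber) Name() string
- func (s *Subscriber) Stop() error
- func (s *Subscriber) Topic() string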
- type SubscriberConfig
- type SubscriberOption
- func WithBlockTime(d time.Duration) SubscriberOption
- func WithClaimInterval(d time.Duration) SubscriberOption
- func WithExecTimeout(d time.Duration) SubscriberOption
- func WithMaxIdleTime(d time.Duration) SubscriberOption
- func WithMetrics(m Metrics) SubscriberOption
- func WithRetry(maxRetries int, retryDelay time.Duration, dlqTopic string) SubscriberOption
- func WithShutdownTimeout(d time.Duration) SubscriberOption
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ErrEmptyTopic                    = errors.New("multi_subscriber: topic cannot be empty")
	ErrSubscriberCreation            = errors.New("multi_subscriber: failed to create subscriber")
	ErrClosingSubscribers            = errors.New("multi_subscriber: errors while closing subscribers")
	ErrNoSubscribers                 = errors.New("multi_subscriber: no subscribers registered")
	ErrMultiSubscriberAlreadyRunning = errors.New("multi_subscriber: already running")
)
View Source
var (
	ErrNilRedisClient           = errors.New("subscriber: redis client cannot be nil")
	ErrEmptyConsumerGroup       = errors.New("subscriber: consumer group name cannot be empty")
	ErrEmptyTopicName           = errors.New("subscriber: topic name cannot be empty")
	ErrNilMessageHandler        = errors.New("subscriber: message handler cannot be nil")
	ErrMessageHandlerNotDefined = errors.New("subscriber: message handler is not defined")
	ErrMaxRetriesExceeded       = errors.New("subscriber: max retries exceeded")
	ErrExecTimeout              = errors.New("subscriber: message handler execution timed out")
	ErrAlreadyRunning           = errors.New("subscriber: already running")
)
Functions ¶
This section is empty.
Types ¶
type MultiSubscriber ¶
type MultiSubscriber struct {
// contains filtered or unexported fields
}
func NewMultiSubscriber ¶
func NewMultiSubscriber(
	name string,
	redisClient goredis.UniversalClient,
	consumerGroup string,
	opts ...SubscriberOption,
) *MultiSubscriber
func (*MultiSubscriber) IsHealthy ¶
func (m *MultiSubscriber) IsHealthy() bool
func (*MultiSubscriber) Name ¶
func (m *MultiSubscriber) Name() string
func (*MultiSubscriber) Start ¶
func (m *MultiSubscriber) Start(ctx context.Context) error
func (*MultiSubscriber) Stop ¶
func (m *MultiSubscriber) Stop() error
func (*MultiSubscriber) Subscribe ¶
func (m *MultiSubscriber) Subscribe(topic string, messageHandler MessageHandler) error
func (*MultiSubscriber) SubscriberCount ¶
func (m *MultiSubscriber) SubscriberCount() int
type RetryConfig ¶
type Subscriber ¶
type Subscriber struct {
*redisstream.Subscriber
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(
	redisClient goredis.UniversalClient,
	consumerGroup, topic string,
	messageHandler MessageHandler,
	opts ...SubscriberOption,
) (*Subscriber, error)
func (*Subscriber) ConsumerGroup ¶
func (s *Subscriber) ConsumerGroup() string
func (*Subscriber) IsHealthy ¶
func (s *Subscriber) IsHealthy() bool
func (*Subscriber) Name ¶
func (s *Subscriber) Name() string
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop() error
func (*Subscriber) Topic ¶
func (s *Subscriber) Topic() string
type SubscriberConfig ¶
type SubscriberConfig struct {
	BlockTime       time.Duration // How long to block waiting for messages
	ClaimInterval   time.Duration // Interval for claiming pending messages
	MaxIdleTime     time.Duration // Max idle time before message can be claimed
	ShutdownTimeout time.Duration // Maximum time to wait for graceful shutdown before force closing
	ExecTimeout     time.Duration // Maximum time allowed for message handler execution
	Metrics         Metrics
	Retry           *RetryConfig
}
type SubscriberOption ¶
type SubscriberOption func(*SubscriberConfig)
func WithBlockTime ¶
func WithBlockTime(d time.Duration) SubscriberOption
func WithClaimInterval ¶
func WithClaimInterval(d time.Duration) SubscriberOption
func WithExecTimeout ¶
func WithExecTimeout(d time.Duration) SubscriberOption
func WithMaxIdleTime ¶
func WithMaxIdleTime(d time.Duration) SubscriberOption
func WithMetrics ¶
func WithMetrics(m Metrics) SubscriberOption
func WithRetry ¶
func WithRetry(maxRetries int, retryDelay time.Duration, dlqTopic string) SubscriberOption
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) SubscriberOption
Click to show internal directories.
Click to hide internal directories.