redissub

package
v1.3.8 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2026 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors whose messages carry the "multi_subscriber:" prefix.
// Callers should match them with errors.Is rather than comparing strings.
//
// NOTE(review): ErrEmptyTopic is a distinct value from ErrEmptyTopicName
// (the "subscriber:"-prefixed sentinel); errors.Is will not treat one as
// the other. Which MultiSubscriber methods return each value is not visible
// in this listing — confirm against the implementation.
var (
	ErrEmptyTopic                    = errors.New("multi_subscriber: topic cannot be empty")
	ErrSubscriberCreation            = errors.New("multi_subscriber: failed to create subscriber")
	ErrClosingSubscribers            = errors.New("multi_subscriber: errors while closing subscribers")
	ErrNoSubscribers                 = errors.New("multi_subscriber: no subscribers registered")
	ErrMultiSubscriberAlreadyRunning = errors.New("multi_subscriber: already running")
)
View Source
// Sentinel errors whose messages carry the "subscriber:" prefix.
// Callers should match them with errors.Is rather than comparing strings.
//
// NOTE(review): ErrNilMessageHandler ("cannot be nil") and
// ErrMessageHandlerNotDefined ("is not defined") have closely related
// messages; the situations that distinguish them are not visible in this
// listing — confirm against the implementation.
var (
	ErrNilRedisClient           = errors.New("subscriber: redis client cannot be nil")
	ErrEmptyConsumerGroup       = errors.New("subscriber: consumer group name cannot be empty")
	ErrEmptyTopicName           = errors.New("subscriber: topic name cannot be empty")
	ErrNilMessageHandler        = errors.New("subscriber: message handler cannot be nil")
	ErrMessageHandlerNotDefined = errors.New("subscriber: message handler is not defined")
	ErrMaxRetriesExceeded       = errors.New("subscriber: max retries exceeded")
	ErrExecTimeout              = errors.New("subscriber: message handler execution timed out")
	ErrAlreadyRunning           = errors.New("subscriber: already running")
)

Functions

This section is empty.

Types

type MessageHandler

// MessageHandler is the callback type accepted by NewSubscriber and
// MultiSubscriber.Subscribe. It receives a context and a message.Payload and
// returns an error.
//
// NOTE(review): how a non-nil error is treated (retry, nack, dead-letter
// queue) is not visible in this listing — confirm against the implementation.
type MessageHandler func(ctx context.Context, payload message.Payload) error

type Metrics

// Metrics is the instrumentation interface used as the type of the
// SubscriberConfig.Metrics field and of the argument to WithMetrics.
// Every method takes the topic name as its first argument.
//
// NOTE(review): the exact point in the message lifecycle at which each
// method is invoked is not visible in this listing; the per-method notes
// below are read off the method names and must be confirmed.
type Metrics interface {
	// MessageReceived — presumably invoked when a message arrives on topic.
	MessageReceived(topic string)
	// MessageProcessed — presumably invoked after the handler ran; duration
	// and err look like the handler's elapsed time and result (TODO confirm).
	MessageProcessed(topic string, duration time.Duration, err error)
	// MessageAcked — presumably invoked when a message is acknowledged.
	MessageAcked(topic string)
	// MessageNacked — presumably invoked when a message is negatively acknowledged.
	MessageNacked(topic string)
	// MessageSentToDLQ — presumably invoked when a message is forwarded to
	// the dead letter queue (see RetryConfig.DLQTopic).
	MessageSentToDLQ(topic string)
}

type MultiSubscriber

// MultiSubscriber is built by NewMultiSubscriber from a name, a Redis
// client, a consumer group name and optional SubscriberOptions. Its
// Subscribe method takes a topic together with a MessageHandler, and
// SubscriberCount reports an int count.
//
// NOTE(review): presumably it manages one Subscriber per subscribed topic;
// the fields are hidden in this listing — confirm against the implementation.
type MultiSubscriber struct {
	// contains filtered or unexported fields
}

func NewMultiSubscriber

func NewMultiSubscriber(
	name string,
	redisClient goredis.UniversalClient,
	consumerGroup string,
	opts ...SubscriberOption,
) *MultiSubscriber

func (*MultiSubscriber) IsHealthy

func (m *MultiSubscriber) IsHealthy() bool

func (*MultiSubscriber) Name

func (m *MultiSubscriber) Name() string

func (*MultiSubscriber) Start

func (m *MultiSubscriber) Start(ctx context.Context) error

func (*MultiSubscriber) Stop

func (m *MultiSubscriber) Stop() error

func (*MultiSubscriber) Subscribe

func (m *MultiSubscriber) Subscribe(topic string, messageHandler MessageHandler) error

func (*MultiSubscriber) SubscriberCount

func (m *MultiSubscriber) SubscriberCount() int

type RetryConfig

// RetryConfig holds the retry and dead-letter settings referenced by
// SubscriberConfig.Retry.
//
// NOTE(review): the fields mirror the parameters of WithRetry (maxRetries,
// retryDelay, dlqTopic); presumably WithRetry populates this struct — confirm.
type RetryConfig struct {
	MaxRetries int           // Maximum number of retries (0 = no retries)
	RetryDelay time.Duration // Delay between retries
	DLQTopic   string        // Dead letter queue topic (empty = no DLQ)
}

type Subscriber

// Subscriber is built by NewSubscriber from a Redis client, a consumer
// group, a topic, a MessageHandler and optional SubscriberOptions.
//
// It embeds *redisstream.Subscriber, so the embedded type's exported methods
// are promoted onto Subscriber unless Subscriber declares a method of the
// same name itself.
type Subscriber struct {
	*redisstream.Subscriber
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(
	redisClient goredis.UniversalClient,
	consumerGroup,
	topic string,
	messageHandler MessageHandler,
	opts ...SubscriberOption,
) (*Subscriber, error)

func (*Subscriber) ConsumerGroup

func (s *Subscriber) ConsumerGroup() string

func (*Subscriber) IsHealthy

func (s *Subscriber) IsHealthy() bool

func (*Subscriber) Name

func (s *Subscriber) Name() string

func (*Subscriber) Start

func (s *Subscriber) Start(ctx context.Context) error

func (*Subscriber) Stop

func (s *Subscriber) Stop() error

func (*Subscriber) Topic

func (s *Subscriber) Topic() string

type SubscriberConfig

// SubscriberConfig is the settings struct that every SubscriberOption
// receives a pointer to and may modify.
//
// NOTE(review): default values for these fields are not visible in this
// listing — confirm against the constructor before relying on zero values.
type SubscriberConfig struct {
	BlockTime       time.Duration // How long to block waiting for messages
	ClaimInterval   time.Duration // Interval for claiming pending messages
	MaxIdleTime     time.Duration // Max idle time before message can be claimed
	ShutdownTimeout time.Duration // Maximum time to wait for graceful shutdown before force closing
	ExecTimeout     time.Duration // Maximum time allowed for message handler execution
	Metrics         Metrics       // Instrumentation hooks (see Metrics); nil handling not visible here — TODO confirm
	Retry           *RetryConfig  // Retry / dead-letter settings (see RetryConfig); presumably nil disables retry — TODO confirm
}

type SubscriberOption

// SubscriberOption is a functional option: a function that receives a
// *SubscriberConfig and may modify it. NewSubscriber and NewMultiSubscriber
// accept any number of them as their trailing variadic argument, and the
// With* functions in this package each return one.
type SubscriberOption func(*SubscriberConfig)

func WithBlockTime

func WithBlockTime(d time.Duration) SubscriberOption

func WithClaimInterval

func WithClaimInterval(d time.Duration) SubscriberOption

func WithExecTimeout

func WithExecTimeout(d time.Duration) SubscriberOption

func WithMaxIdleTime

func WithMaxIdleTime(d time.Duration) SubscriberOption

func WithMetrics

func WithMetrics(m Metrics) SubscriberOption

func WithRetry

func WithRetry(maxRetries int, retryDelay time.Duration, dlqTopic string) SubscriberOption

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) SubscriberOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL