impl

package
v0.3.6 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 13, 2026 License: LGPL-2.1, LGPL-2.1-or-later Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRedisStreamClient

func NewRedisStreamClient(redisClient redis.UniversalClient, serviceName string,
	opts ...RecoverableRedisOption) (types.RedisStreamClient, error)

NewRedisStreamClient creates a new RedisStreamClient

This function creates a new RedisStreamClient with the given Redis client and stream name. Stream is the name of the stream to read from, where the actual data is transmitted.

Types

type RecoverableRedisOption added in v0.1.5

type RecoverableRedisOption func(*RecoverableRedisStreamClient) error

func WithForceConfigOverride added in v0.3.0

func WithForceConfigOverride() RecoverableRedisOption

WithForceConfigOverride, when set, overrides the Redis configuration for keyspace notifications

func WithKspChanSize added in v0.3.0

func WithKspChanSize(size int) RecoverableRedisOption

WithKspChanSize sets the size of the ksp channel, which corresponds to the number of pub/sub notifications that can be received from Redis

func WithKspChanTimeout added in v0.3.0

func WithKspChanTimeout(timeout time.Duration) RecoverableRedisOption

WithKspChanTimeout sets the duration after which an outstanding Redis pub/sub message is dropped from the channel

func WithLBSIdleTime added in v0.1.5

func WithLBSIdleTime(idleTime time.Duration) RecoverableRedisOption

WithLBSIdleTime sets the time after which a message is considered idle and will be recovered

func WithLBSRecoveryCount added in v0.1.5

func WithLBSRecoveryCount(count int) RecoverableRedisOption

WithLBSRecoveryCount sets the number of messages to fetch at a time during recovery

func WithLogger added in v0.3.5

func WithLogger(logger *slog.Logger) RecoverableRedisOption

WithLogger allows clients to provide their own logger implementation based on slog.Logger

func WithOutputChanSize added in v0.3.2

func WithOutputChanSize(size int) RecoverableRedisOption

WithOutputChanSize lets clients set the size of the output channel, on which the different notifications are sent

func WithRetryConfig added in v0.3.4

func WithRetryConfig(config RetryConfig) RecoverableRedisOption

WithRetryConfig configures retry-related settings

type RecoverableRedisStreamClient

type RecoverableRedisStreamClient struct {
	// contains filtered or unexported fields
}

RecoverableRedisStreamClient is an implementation of the RedisStreamClient interface

func (*RecoverableRedisStreamClient) Claim

Claim claims pending messages from a stream

func (*RecoverableRedisStreamClient) Done

Done marks the end of processing for a client

Note that Done is called when the client is shutting down and is not expected to be called again. It cleans up all the streams handled by the client. To clean up a specific stream, use DoneStream.

func (*RecoverableRedisStreamClient) DoneStream added in v0.1.5

func (r *RecoverableRedisStreamClient) DoneStream(ctx context.Context, dataStreamName string) error

DoneStream marks the end of processing for a particular stream

This function is used to mark the end of processing for a particular stream. It unlocks the stream, acknowledges the message, and deletes the message from the stream.

func (*RecoverableRedisStreamClient) ID

ID returns the consumer name that uniquely identifies the consumer

func (*RecoverableRedisStreamClient) Init

Init initializes the RedisStreamClient

This function initializes the RedisStreamClient by enabling keyspace notifications for expired events, subscribing to expired events, and starting a blocking read on the LBS stream. It returns a channel from which to read messages from the LBS stream. The client should read from this channel and process the messages.

type RetryConfig added in v0.3.4

type RetryConfig struct {
	// MaxRetries is the maximum number of retry attempts
	// -1 => unlimited retries
	// 0 => no retries, fail immediately
	// >0 => specific number of retry attempts
	MaxRetries int

	// InitialRetryDelay is the initial delay before the first retry attempt
	InitialRetryDelay time.Duration

	// MaxRetryDelay is the maximum delay between retries (exponential backoff cap)
	MaxRetryDelay time.Duration
}

RetryConfig holds all retry-related configuration

func DefaultRetryConfig added in v0.3.4

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns the default retry configuration

func (RetryConfig) Validate added in v0.3.4

func (rc RetryConfig) Validate() error

Validate checks if the retry configuration is valid

type StreamLocksInfo added in v0.2.0

type StreamLocksInfo struct {
	LBSInfo        notifs.LBSInfo
	Mutex          *redsync.Mutex
	AdditionalInfo map[string]any
}

StreamLocksInfo holds the information needed to operate on data streams and to manage their synchronization
