Documentation ¶
Index ¶
- func NewRedisStreamClient(redisClient redis.UniversalClient, serviceName string, ...) (types.RedisStreamClient, error)
- type RecoverableRedisOption
- func WithForceConfigOverride() RecoverableRedisOption
- func WithKspChanSize(size int) RecoverableRedisOption
- func WithKspChanTimeout(timeout time.Duration) RecoverableRedisOption
- func WithLBSIdleTime(idleTime time.Duration) RecoverableRedisOption
- func WithLBSRecoveryCount(count int) RecoverableRedisOption
- func WithLogger(logger *slog.Logger) RecoverableRedisOption
- func WithOutputChanSize(size int) RecoverableRedisOption
- func WithRetryConfig(config RetryConfig) RecoverableRedisOption
- type RecoverableRedisStreamClient
- func (r *RecoverableRedisStreamClient) Claim(ctx context.Context, lbsInfo notifs.LBSInfo) error
- func (r *RecoverableRedisStreamClient) Done(ctx context.Context) error
- func (r *RecoverableRedisStreamClient) DoneStream(ctx context.Context, dataStreamName string) error
- func (r *RecoverableRedisStreamClient) ID() string
- func (r *RecoverableRedisStreamClient) Init(ctx context.Context) (<-chan notifs.RecoverableRedisNotification, error)
- type RetryConfig
- type StreamLocksInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRedisStreamClient ¶
func NewRedisStreamClient(redisClient redis.UniversalClient, serviceName string, opts ...RecoverableRedisOption) (types.RedisStreamClient, error)
NewRedisStreamClient creates a new RedisStreamClient
This function creates a new RedisStreamClient with the given Redis client and stream name. Stream is the name of the stream to read from, where the actual data is transmitted.
Types ¶
type RecoverableRedisOption ¶ added in v0.1.5
type RecoverableRedisOption func(*RecoverableRedisStreamClient) error
func WithForceConfigOverride ¶ added in v0.3.0
func WithForceConfigOverride() RecoverableRedisOption
WithForceConfigOverride, when set, overrides the Redis configuration for keyspace notifications
func WithKspChanSize ¶ added in v0.3.0
func WithKspChanSize(size int) RecoverableRedisOption
WithKspChanSize sets the size of the ksp channel, which corresponds to the number of pub/sub notifications that can be received from Redis
func WithKspChanTimeout ¶ added in v0.3.0
func WithKspChanTimeout(timeout time.Duration) RecoverableRedisOption
WithKspChanTimeout sets the duration after which an outstanding pub/sub message from Redis is dropped from the channel
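The interaction between a bounded channel and a drop timeout can be pictured with a small sketch. This is not the library's code: `sendOrDrop` is a hypothetical helper, and the assumption is only that a message which cannot be placed on a full channel within the timeout is discarded.

```go
package main

import (
	"fmt"
	"time"
)

// sendOrDrop is a hypothetical helper (not part of this package).
// It tries to deliver msg on ch and gives up once timeout has elapsed.
// It reports whether the message was delivered.
func sendOrDrop(ch chan<- string, msg string, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- msg:
		return true
	case <-timer.C:
		return false // nobody made room in time, so the message is dropped
	}
}

func main() {
	// The capacity plays the role of the value passed to WithKspChanSize.
	ch := make(chan string, 1)

	// The first message fits in the buffer; the second finds it full and times out.
	fmt.Println(sendOrDrop(ch, "first", 10*time.Millisecond))
	fmt.Println(sendOrDrop(ch, "second", 10*time.Millisecond))
}
```

A larger channel absorbs bursts of notifications, while a longer timeout tolerates a slow reader at the cost of blocking the sender for longer.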
func WithLBSIdleTime ¶ added in v0.1.5
func WithLBSIdleTime(idleTime time.Duration) RecoverableRedisOption
WithLBSIdleTime sets the time after which a message is considered idle and will be recovered
func WithLBSRecoveryCount ¶ added in v0.1.5
func WithLBSRecoveryCount(count int) RecoverableRedisOption
WithLBSRecoveryCount sets the number of messages to fetch at a time during recovery
func WithLogger ¶ added in v0.3.5
func WithLogger(logger *slog.Logger) RecoverableRedisOption
WithLogger allows clients to provide their own logger implementation based on slog.Logger
func WithOutputChanSize ¶ added in v0.3.2
func WithOutputChanSize(size int) RecoverableRedisOption
WithOutputChanSize lets clients set the size of the output channel on which notifications are sent
func WithRetryConfig ¶ added in v0.3.4
func WithRetryConfig(config RetryConfig) RecoverableRedisOption
WithRetryConfig configures retry-related settings
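The options above follow Go's functional-options pattern: each option is a function that mutates the client and may return an error. The sketch below illustrates that shape with stand-in names. `client`, `option`, `withIdleTime`, `withOutputChanSize`, `newClient`, the default values, and the validation rules are all hypothetical; the real struct's fields are unexported and not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// client is a hypothetical stand-in for RecoverableRedisStreamClient.
type client struct {
	lbsIdleTime    time.Duration
	outputChanSize int
}

// option mirrors the documented shape func(*RecoverableRedisStreamClient) error.
type option func(*client) error

// withIdleTime is a hypothetical analogue of WithLBSIdleTime.
func withIdleTime(d time.Duration) option {
	return func(c *client) error {
		if d <= 0 {
			return errors.New("idle time must be positive")
		}
		c.lbsIdleTime = d
		return nil
	}
}

// withOutputChanSize is a hypothetical analogue of WithOutputChanSize.
func withOutputChanSize(n int) option {
	return func(c *client) error {
		if n < 0 {
			return errors.New("channel size must not be negative")
		}
		c.outputChanSize = n
		return nil
	}
}

// newClient starts from assumed defaults, applies the options in order,
// and stops at the first option that returns an error.
func newClient(opts ...option) (*client, error) {
	c := &client{lbsIdleTime: 30 * time.Second, outputChanSize: 100}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newClient(withIdleTime(5*time.Second), withOutputChanSize(10))
	if err != nil {
		fmt.Println("configuration failed:", err)
		return
	}
	fmt.Println(c.lbsIdleTime, c.outputChanSize)
}
```

Because each option returns an error, a constructor built this way can reject an invalid setting before any connection work starts, which matches the `(types.RedisStreamClient, error)` return of NewRedisStreamClient.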
type RecoverableRedisStreamClient ¶
type RecoverableRedisStreamClient struct {
// contains filtered or unexported fields
}
RecoverableRedisStreamClient is an implementation of the RedisStreamClient interface
func (*RecoverableRedisStreamClient) Done ¶
func (r *RecoverableRedisStreamClient) Done(ctx context.Context) error
Done marks the end of processing for a client
Note that Done is called when the client is shutting down and is not expected to be called again. It cleans up all the streams handled by the client. To clean up a specific stream, use DoneStream.
func (*RecoverableRedisStreamClient) DoneStream ¶ added in v0.1.5
func (r *RecoverableRedisStreamClient) DoneStream(ctx context.Context, dataStreamName string) error
DoneStream marks end of processing for a particular stream
This function marks the end of processing for a particular stream. It unlocks the stream, acknowledges the message, and deletes the message from the stream.
func (*RecoverableRedisStreamClient) ID ¶
func (r *RecoverableRedisStreamClient) ID() string
ID returns the consumer name that uniquely identifies the consumer
func (*RecoverableRedisStreamClient) Init ¶
func (r *RecoverableRedisStreamClient) Init(ctx context.Context) (<-chan notifs.RecoverableRedisNotification, error)
Init initializes the RedisStreamClient
This function initializes the RedisStreamClient by enabling keyspace notifications for expired events, subscribing to expired events, and starting a blocking read on the LBS stream. It returns a channel that delivers messages from the LBS stream; the client should read from this channel and process the messages.
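A consumer of the channel returned by Init typically runs a receive loop that also watches its context. The sketch below shows one such loop under stated assumptions: `notification` is a hypothetical stand-in for notifs.RecoverableRedisNotification (whose fields are not documented here), and `consume` and `demo` are illustrative helpers, not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// notification is a hypothetical stand-in for notifs.RecoverableRedisNotification.
type notification struct {
	Stream string
}

// consume reads from ch until the channel is closed or ctx is cancelled.
// It returns the number of notifications passed to handle.
func consume(ctx context.Context, ch <-chan notification, handle func(notification)) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case n, ok := <-ch:
			if !ok {
				return handled // channel closed by the producer
			}
			handle(n)
			handled++
		}
	}
}

// demo feeds two notifications through consume and returns how many were handled.
func demo() int {
	ch := make(chan notification, 2)
	ch <- notification{Stream: "orders"}
	ch <- notification{Stream: "payments"}
	close(ch)
	return consume(context.Background(), ch, func(n notification) {
		fmt.Println("processing", n.Stream)
	})
}

func main() {
	fmt.Println("handled", demo())
}
```

In a real service the handler would process the stream named in the notification and then call DoneStream for it; that wiring is omitted because it depends on types not shown in this section.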
type RetryConfig ¶ added in v0.3.4
type RetryConfig struct {
// MaxRetries is the maximum number of retry attempts
// -1 => unlimited retries
// 0 => no retries, fail immediately
// >0 => specific number of retry attempts
MaxRetries int
// InitialRetryDelay is the initial delay before the first retry attempt
InitialRetryDelay time.Duration
// MaxRetryDelay is the maximum delay between retries (exponential backoff cap)
MaxRetryDelay time.Duration
}
RetryConfig holds all retry-related configuration
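The field comments describe capped exponential backoff with three MaxRetries modes. The sketch below works through those semantics. The struct copies the documented fields; `shouldRetry` and `backoffDelay` are hypothetical helpers, and the doubling factor is an assumption, since the documentation states only that MaxRetryDelay caps the backoff.

```go
package main

import (
	"fmt"
	"time"
)

// RetryConfig copies the fields documented above.
type RetryConfig struct {
	MaxRetries        int
	InitialRetryDelay time.Duration
	MaxRetryDelay     time.Duration
}

// shouldRetry applies the documented MaxRetries semantics.
// attempt is the number of retries already made.
func shouldRetry(rc RetryConfig, attempt int) bool {
	if rc.MaxRetries == -1 { // -1 => unlimited retries
		return true
	}
	return attempt < rc.MaxRetries // 0 => never retry
}

// backoffDelay returns the wait before retry number attempt (0-based).
// The delay doubles on each attempt (assumed factor) and is capped at MaxRetryDelay.
func backoffDelay(rc RetryConfig, attempt int) time.Duration {
	d := rc.InitialRetryDelay
	for i := 0; i < attempt; i++ {
		if d >= rc.MaxRetryDelay {
			break // already at the cap; further doubling is pointless
		}
		d *= 2
	}
	if d > rc.MaxRetryDelay {
		d = rc.MaxRetryDelay
	}
	return d
}

func main() {
	cfg := RetryConfig{
		MaxRetries:        5,
		InitialRetryDelay: 100 * time.Millisecond,
		MaxRetryDelay:     time.Second,
	}
	for attempt := 0; shouldRetry(cfg, attempt); attempt++ {
		fmt.Printf("retry %d after %v\n", attempt+1, backoffDelay(cfg, attempt))
	}
}
```

With these values the delays grow from 100ms to 800ms and then stay at the 1s cap, which keeps a long outage from producing arbitrarily long waits.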
func DefaultRetryConfig ¶ added in v0.3.4
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns the default retry configuration
func (RetryConfig) Validate ¶ added in v0.3.4
func (rc RetryConfig) Validate() error
Validate checks if the retry configuration is valid
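The documentation does not list the rules Validate enforces. As an illustration only, the sketch below shows checks that would be consistent with the field comments on RetryConfig. The `validate` function and every rule in it are assumptions, not the package's actual behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// RetryConfig copies the fields documented above.
type RetryConfig struct {
	MaxRetries        int
	InitialRetryDelay time.Duration
	MaxRetryDelay     time.Duration
}

// validate is a hypothetical stand-in for RetryConfig.Validate.
// All three rules are assumptions derived from the field comments.
func validate(rc RetryConfig) error {
	if rc.MaxRetries < -1 {
		return errors.New("MaxRetries must be -1, 0, or a positive number")
	}
	if rc.InitialRetryDelay < 0 || rc.MaxRetryDelay < 0 {
		return errors.New("retry delays must not be negative")
	}
	if rc.InitialRetryDelay > rc.MaxRetryDelay {
		return errors.New("InitialRetryDelay must not exceed MaxRetryDelay")
	}
	return nil
}

func main() {
	good := RetryConfig{MaxRetries: -1, InitialRetryDelay: time.Second, MaxRetryDelay: time.Minute}
	bad := RetryConfig{MaxRetries: 3, InitialRetryDelay: time.Minute, MaxRetryDelay: time.Second}
	fmt.Println("good:", validate(good))
	fmt.Println("bad:", validate(bad))
}
```

Checking a configuration before handing it to WithRetryConfig surfaces mistakes at startup rather than during the first failed Redis call.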