Documentation
¶
Index ¶
- type ValkeyClient
- func (v *ValkeyClient) AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
- func (v *ValkeyClient) AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, ...) error
- func (v *ValkeyClient) AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, ...) ([]XMessage, error)
- func (v *ValkeyClient) Close()
- func (v *ValkeyClient) CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
- func (v *ValkeyClient) DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
- func (v *ValkeyClient) EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
- func (v *ValkeyClient) GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
- func (v *ValkeyClient) GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
- func (v *ValkeyClient) GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
- func (v *ValkeyClient) GetStreamLength(ctx context.Context, streamKey string) (int64, error)
- func (v *ValkeyClient) ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]XMessage, error)
- type XMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ValkeyClient ¶
type ValkeyClient struct {
// contains filtered or unexported fields
}
ValkeyClient wraps Valkey stream operations for pipeline runs
func NewValkeyClient ¶
func NewValkeyClient(addr, password string) (*ValkeyClient, error)
NewValkeyClient creates a new Valkey client from connection options
func (*ValkeyClient) AckMessage ¶
func (v *ValkeyClient) AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
AckMessage acknowledges a message in the stream
func (*ValkeyClient) AddToDLQ ¶
func (v *ValkeyClient) AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error
AddToDLQ adds a failed message to the dead letter queue
func (*ValkeyClient) AutoClaim ¶
func (v *ValkeyClient) AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]XMessage, error)
AutoClaim reclaims pending messages that have been idle for too long Returns the reclaimed messages
func (*ValkeyClient) Close ¶
func (v *ValkeyClient) Close()
Close closes the Valkey client connection
func (*ValkeyClient) CreateStreamAndGroup ¶
func (v *ValkeyClient) CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
CreateStreamAndGroup creates a Valkey stream and consumer group for a pipeline run If the stream doesn't exist, it creates it with MKSTREAM Then creates the consumer group starting from the beginning of the stream
func (*ValkeyClient) DeleteMessages ¶
func (v *ValkeyClient) DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
DeleteMessages removes messages from a stream.
func (*ValkeyClient) EnqueueFileWithAttempts ¶
func (v *ValkeyClient) EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
EnqueueFileWithAttempts adds a file to the work stream with a specific attempts count Message format: {run: runId, file: filepath, attempts: attemptsCount}
func (*ValkeyClient) GetConsumerGroupLag ¶
func (v *ValkeyClient) GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
GetConsumerGroupLag returns the lag (unread messages) for a consumer group
func (*ValkeyClient) GetPendingCount ¶
func (v *ValkeyClient) GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
GetPendingCount returns the number of pending messages in a consumer group
func (*ValkeyClient) GetPendingForConsumer ¶
func (v *ValkeyClient) GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
GetPendingForConsumer returns up to 'count' pending message IDs for a specific consumer in the given stream and consumer group. It executes: XPENDING <stream> <group> <start> <end> <count> <consumer>
func (*ValkeyClient) GetStreamLength ¶
GetStreamLength returns the number of messages in a stream