Documentation
¶
Index ¶
- func ValkeyUsernameForNamespace(namespace string) string
- type PendingEntry
- type ValkeyClient
- func (v *ValkeyClient) AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
- func (v *ValkeyClient) AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, ...) error
- func (v *ValkeyClient) AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, ...) ([]XMessage, error)
- func (v *ValkeyClient) ClaimMessages(ctx context.Context, streamKey, groupName, consumerName string, ...) ([]XMessage, error)
- func (v *ValkeyClient) Close()
- func (v *ValkeyClient) CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
- func (v *ValkeyClient) DeleteACLUser(ctx context.Context, username string) error
- func (v *ValkeyClient) DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
- func (v *ValkeyClient) EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
- func (v *ValkeyClient) EnsureACLUser(ctx context.Context, username, password, namespace string) error
- func (v *ValkeyClient) GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
- func (v *ValkeyClient) GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
- func (v *ValkeyClient) GetPendingEntryDetails(ctx context.Context, streamKey, groupName string, minIdleTime int64, ...) ([]PendingEntry, error)
- func (v *ValkeyClient) GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
- func (v *ValkeyClient) GetStreamLength(ctx context.Context, streamKey string) (int64, error)
- func (v *ValkeyClient) ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]XMessage, error)
- type XMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ValkeyUsernameForNamespace ¶ added in v0.3.1
ValkeyUsernameForNamespace returns the Valkey ACL username for a given namespace.
Types ¶
type PendingEntry ¶ added in v0.2.6
PendingEntry represents a pending message with its consumer and idle time.
type ValkeyClient ¶
type ValkeyClient struct {
// contains filtered or unexported fields
}
ValkeyClient wraps Valkey stream operations for pipeline runs
func NewValkeyClient ¶
func NewValkeyClient(addr, password string) (*ValkeyClient, error)
NewValkeyClient creates a new Valkey client from connection options
func (*ValkeyClient) AckMessage ¶
func (v *ValkeyClient) AckMessage(ctx context.Context, streamKey, groupName, messageID string) error
AckMessage acknowledges a message in the stream
func (*ValkeyClient) AddToDLQ ¶
func (v *ValkeyClient) AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error
AddToDLQ adds a failed message to the dead letter queue
func (*ValkeyClient) AutoClaim ¶
func (v *ValkeyClient) AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]XMessage, error)
AutoClaim reclaims pending messages that have been idle for too long Returns the reclaimed messages
func (*ValkeyClient) ClaimMessages ¶ added in v0.2.6
func (v *ValkeyClient) ClaimMessages(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]XMessage, error)
ClaimMessages transfers ownership of specific messages to a new consumer using XCLAIM.
func (*ValkeyClient) Close ¶
func (v *ValkeyClient) Close()
Close closes the Valkey client connection
func (*ValkeyClient) CreateStreamAndGroup ¶
func (v *ValkeyClient) CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error
CreateStreamAndGroup creates a Valkey stream and consumer group for a pipeline run If the stream doesn't exist, it creates it with MKSTREAM Then creates the consumer group starting from the beginning of the stream
func (*ValkeyClient) DeleteACLUser ¶ added in v0.3.1
func (v *ValkeyClient) DeleteACLUser(ctx context.Context, username string) error
DeleteACLUser removes a Valkey ACL user.
func (*ValkeyClient) DeleteMessages ¶
func (v *ValkeyClient) DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error
DeleteMessages removes messages from a stream.
func (*ValkeyClient) EnqueueFileWithAttempts ¶
func (v *ValkeyClient) EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)
EnqueueFileWithAttempts adds a file to the work stream with a specific attempts count Message format: {run: runId, file: filepath, attempts: attemptsCount}
func (*ValkeyClient) EnsureACLUser ¶ added in v0.3.1
func (v *ValkeyClient) EnsureACLUser(ctx context.Context, username, password, namespace string) error
EnsureACLUser creates or updates a Valkey ACL user scoped to a namespace key prefix. The user gets minimal permissions: ping and xreadgroup on keys matching ns:<namespace>:*
func (*ValkeyClient) GetConsumerGroupLag ¶
func (v *ValkeyClient) GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)
GetConsumerGroupLag returns the lag (unread messages) for a consumer group
func (*ValkeyClient) GetPendingCount ¶
func (v *ValkeyClient) GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)
GetPendingCount returns the number of pending messages in a consumer group
func (*ValkeyClient) GetPendingEntryDetails ¶ added in v0.2.6
func (v *ValkeyClient) GetPendingEntryDetails(ctx context.Context, streamKey, groupName string, minIdleTime int64, count int64) ([]PendingEntry, error)
GetPendingEntryDetails returns pending entries with consumer names and idle times. It uses XPENDING with a range to get details about pending entries whose idle time exceeds minIdleTime milliseconds.
func (*ValkeyClient) GetPendingForConsumer ¶
func (v *ValkeyClient) GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)
GetPendingForConsumer returns up to 'count' pending message IDs for a specific consumer in the given stream and consumer group. It executes: XPENDING <stream> <group> <start> <end> <count> <consumer>
func (*ValkeyClient) GetStreamLength ¶
GetStreamLength returns the number of messages in a stream