queue

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ValkeyUsernameForNamespace added in v0.3.1

func ValkeyUsernameForNamespace(namespace string) string

ValkeyUsernameForNamespace returns the Valkey ACL username for a given namespace.

Types

type PendingEntry added in v0.2.6

type PendingEntry struct {
	ID       string
	Consumer string
	IdleMs   int64
}

PendingEntry represents a pending message with its consumer and idle time.

type ValkeyClient

type ValkeyClient struct {
	// contains filtered or unexported fields
}

ValkeyClient wraps Valkey stream operations for pipeline runs

func NewValkeyClient

func NewValkeyClient(addr, password string) (*ValkeyClient, error)

NewValkeyClient creates a new Valkey client from connection options

func (*ValkeyClient) AckMessage

func (v *ValkeyClient) AckMessage(ctx context.Context, streamKey, groupName, messageID string) error

AckMessage acknowledges a message in the stream

func (*ValkeyClient) AddToDLQ

func (v *ValkeyClient) AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error

AddToDLQ adds a failed message to the dead letter queue

func (*ValkeyClient) AutoClaim

func (v *ValkeyClient) AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]XMessage, error)

AutoClaim reclaims pending messages that have been idle for too long Returns the reclaimed messages

func (*ValkeyClient) ClaimMessages added in v0.2.6

func (v *ValkeyClient) ClaimMessages(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]XMessage, error)

ClaimMessages transfers ownership of specific messages to a new consumer using XCLAIM.

func (*ValkeyClient) Close

func (v *ValkeyClient) Close()

Close closes the Valkey client connection

func (*ValkeyClient) CreateStreamAndGroup

func (v *ValkeyClient) CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error

CreateStreamAndGroup creates a Valkey stream and consumer group for a pipeline run If the stream doesn't exist, it creates it with MKSTREAM Then creates the consumer group starting from the beginning of the stream

func (*ValkeyClient) DeleteACLUser added in v0.3.1

func (v *ValkeyClient) DeleteACLUser(ctx context.Context, username string) error

DeleteACLUser removes a Valkey ACL user.

func (*ValkeyClient) DeleteMessages

func (v *ValkeyClient) DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error

DeleteMessages removes messages from a stream.

func (*ValkeyClient) EnqueueFileWithAttempts

func (v *ValkeyClient) EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)

EnqueueFileWithAttempts adds a file to the work stream with a specific attempts count Message format: {run: runId, file: filepath, attempts: attemptsCount}

func (*ValkeyClient) EnsureACLUser added in v0.3.1

func (v *ValkeyClient) EnsureACLUser(ctx context.Context, username, password, namespace string) error

EnsureACLUser creates or updates a Valkey ACL user scoped to a namespace key prefix. The user gets minimal permissions: ping and xreadgroup on keys matching ns:<namespace>:*

func (*ValkeyClient) GetConsumerGroupLag

func (v *ValkeyClient) GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)

GetConsumerGroupLag returns the lag (unread messages) for a consumer group

func (*ValkeyClient) GetPendingCount

func (v *ValkeyClient) GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)

GetPendingCount returns the number of pending messages in a consumer group

func (*ValkeyClient) GetPendingEntryDetails added in v0.2.6

func (v *ValkeyClient) GetPendingEntryDetails(ctx context.Context, streamKey, groupName string, minIdleTime int64, count int64) ([]PendingEntry, error)

GetPendingEntryDetails returns pending entries with consumer names and idle times. It uses XPENDING with a range to get details about pending entries whose idle time exceeds minIdleTime milliseconds.

func (*ValkeyClient) GetPendingForConsumer

func (v *ValkeyClient) GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)

GetPendingForConsumer returns up to 'count' pending message IDs for a specific consumer in the given stream and consumer group. It executes: XPENDING <stream> <group> <start> <end> <count> <consumer>

func (*ValkeyClient) GetStreamLength

func (v *ValkeyClient) GetStreamLength(ctx context.Context, streamKey string) (int64, error)

GetStreamLength returns the number of messages in a stream

func (*ValkeyClient) ReadRange

func (v *ValkeyClient) ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]XMessage, error)

ReadRange reads a range of messages from a stream without consuming them.

type XMessage

type XMessage struct {
	ID     string
	Values map[string]string
}

XMessage represents a message from the stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL