queue

package
v0.1.6
Warning

This package is not in the latest version of its module.

Published: Oct 30, 2025 | License: Apache-2.0 | Imports: 4 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ValkeyClient

type ValkeyClient struct {
	// contains filtered or unexported fields
}

ValkeyClient wraps Valkey stream operations for pipeline runs.

func NewValkeyClient

func NewValkeyClient(addr, password string) (*ValkeyClient, error)

NewValkeyClient creates a new Valkey client from the given server address and password.

func (*ValkeyClient) AckMessage

func (v *ValkeyClient) AckMessage(ctx context.Context, streamKey, groupName, messageID string) error

AckMessage acknowledges a message in the stream on behalf of the given consumer group.

func (*ValkeyClient) AddToDLQ

func (v *ValkeyClient) AddToDLQ(ctx context.Context, dlqKey, runID, filepath string, attempts int, reason string) error

AddToDLQ adds a failed message to the dead letter queue.

func (*ValkeyClient) AutoClaim

func (v *ValkeyClient) AutoClaim(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, count int64) ([]XMessage, error)

AutoClaim reclaims pending messages that have been idle for at least minIdleTime, assigning up to count of them to consumerName. It returns the reclaimed messages.

func (*ValkeyClient) Close

func (v *ValkeyClient) Close()

Close closes the Valkey client connection.

func (*ValkeyClient) CreateStreamAndGroup

func (v *ValkeyClient) CreateStreamAndGroup(ctx context.Context, streamKey, groupName string) error

CreateStreamAndGroup creates a Valkey stream and consumer group for a pipeline run. If the stream does not exist, it is created with MKSTREAM. The consumer group is then created starting from the beginning of the stream.

func (*ValkeyClient) DeleteMessages

func (v *ValkeyClient) DeleteMessages(ctx context.Context, streamKey string, messageIDs ...string) error

DeleteMessages removes messages from a stream.

func (*ValkeyClient) EnqueueFileWithAttempts

func (v *ValkeyClient) EnqueueFileWithAttempts(ctx context.Context, streamKey, runID, filepath string, attempts int) (string, error)

EnqueueFileWithAttempts adds a file to the work stream with a specific attempts count. Message format: {run: runID, file: filepath, attempts: attempts}
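The message format above can be sketched as a round trip between a typed value and the string map carried by XMessage. The `fileTask` type and the `encodeTask`/`decodeTask` helpers are hypothetical, and storing attempts as a decimal string is an assumption that follows from Values being a map[string]string. XMessage is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"strconv"
)

// XMessage mirrors the documented type so this sketch is self-contained.
type XMessage struct {
	ID     string
	Values map[string]string
}

// fileTask is a hypothetical typed view of one work-stream entry.
type fileTask struct {
	RunID    string
	Filepath string
	Attempts int
}

// encodeTask builds the field map using the documented field names.
func encodeTask(t fileTask) map[string]string {
	return map[string]string{
		"run":      t.RunID,
		"file":     t.Filepath,
		"attempts": strconv.Itoa(t.Attempts),
	}
}

// decodeTask validates a message and converts it back to a fileTask.
func decodeTask(m XMessage) (fileTask, error) {
	run, ok := m.Values["run"]
	if !ok {
		return fileTask{}, fmt.Errorf("message %s: missing run field", m.ID)
	}
	file, ok := m.Values["file"]
	if !ok {
		return fileTask{}, fmt.Errorf("message %s: missing file field", m.ID)
	}
	attempts, err := strconv.Atoi(m.Values["attempts"])
	if err != nil {
		return fileTask{}, fmt.Errorf("message %s: bad attempts value: %w", m.ID, err)
	}
	return fileTask{RunID: run, Filepath: file, Attempts: attempts}, nil
}

func main() {
	msg := XMessage{ID: "1-0", Values: encodeTask(fileTask{RunID: "r1", Filepath: "data/a.csv", Attempts: 2})}
	task, err := decodeTask(msg)
	fmt.Println(task, err)
}
```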

func (*ValkeyClient) GetConsumerGroupLag

func (v *ValkeyClient) GetConsumerGroupLag(ctx context.Context, streamKey, groupName string) (int64, error)

GetConsumerGroupLag returns the lag (the number of unread messages) for a consumer group.

func (*ValkeyClient) GetPendingCount

func (v *ValkeyClient) GetPendingCount(ctx context.Context, streamKey, groupName string) (int64, error)

GetPendingCount returns the number of pending messages (delivered but not yet acknowledged) in a consumer group.

func (*ValkeyClient) GetPendingForConsumer

func (v *ValkeyClient) GetPendingForConsumer(ctx context.Context, streamKey, groupName, consumer string, count int64) ([]string, error)

GetPendingForConsumer returns up to 'count' pending message IDs for a specific consumer in the given stream and consumer group. It executes: XPENDING <stream> <group> <start> <end> <count> <consumer>

func (*ValkeyClient) GetStreamLength

func (v *ValkeyClient) GetStreamLength(ctx context.Context, streamKey string) (int64, error)

GetStreamLength returns the number of messages in a stream.

func (*ValkeyClient) ReadRange

func (v *ValkeyClient) ReadRange(ctx context.Context, streamKey, start, end string, count int64) ([]XMessage, error)

ReadRange reads a range of messages from a stream without consuming them.

type XMessage

type XMessage struct {
	ID     string
	Values map[string]string
}

XMessage represents a message from the stream.
