wqueue

package
v0.9.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 24, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package wqueue provides a write queue implementation for managing data shards.

Constants

This section is empty.

Variables

var ErrUnknownShard = errors.New("unknown shard")

ErrUnknownShard indicates that the shard is not found.
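
Because ErrUnknownShard is a sentinel error, callers would normally test for it with errors.Is rather than comparing strings. The sketch below uses a local copy of the variable and a hypothetical lookupShard helper; the documentation above does not say which wqueue functions return this error, so treat the helper as an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrUnknownShard is a local stand-in for wqueue.ErrUnknownShard.
var ErrUnknownShard = errors.New("unknown shard")

// lookupShard is a hypothetical helper, not part of wqueue. It knows only
// shards 0 and 1 and wraps the sentinel with context for any other ID.
func lookupShard(id uint32) error {
	if id > 1 {
		return fmt.Errorf("lookup shard %d: %w", id, ErrUnknownShard)
	}
	return nil
}

// isUnknownShard reports whether err is, or wraps, ErrUnknownShard.
func isUnknownShard(err error) bool {
	return errors.Is(err, ErrUnknownShard)
}

func main() {
	err := lookupShard(7)
	if isUnknownShard(err) {
		fmt.Println("shard is missing:", err)
	}
}
```

Matching with errors.Is keeps working when intermediate layers add context with %w, which a direct == comparison would not.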

Functions

This section is empty.

Types

type Metrics

type Metrics interface {
	// DeleteAll deletes all metrics.
	DeleteAll()
}

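Metrics is the interface that a metrics implementation passed to the queue must satisfy. Its only method, DeleteAll, deletes all metrics.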
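
As an illustration of how small an implementation can be, the sketch below satisfies a local copy of the Metrics interface with a map of named gauges. The gaugeSet type and its Set and Len methods are hypothetical and exist only for this example; the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// Metrics mirrors the interface shown above.
type Metrics interface {
	DeleteAll()
}

// gaugeSet is a hypothetical implementation that keeps named gauges in a map.
type gaugeSet struct {
	gauges map[string]float64
}

func newGaugeSet() *gaugeSet {
	return &gaugeSet{gauges: make(map[string]float64)}
}

// Set records the latest value for a gauge.
func (g *gaugeSet) Set(name string, v float64) { g.gauges[name] = v }

// Len returns the number of gauges currently held.
func (g *gaugeSet) Len() int { return len(g.gauges) }

// DeleteAll removes every gauge, satisfying Metrics.
func (g *gaugeSet) DeleteAll() {
	for name := range g.gauges {
		delete(g.gauges, name)
	}
}

// Compile-time check that *gaugeSet implements Metrics.
var _ Metrics = (*gaugeSet)(nil)

func main() {
	g := newGaugeSet()
	g.Set("pending_writes", 12)
	var m Metrics = g
	m.DeleteAll()
	fmt.Println(g.Len())
}
```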

type Opts

type Opts[S SubQueue, O any] struct {
	SubQueueCreator SubQueueCreator[S, O]
	GetNodes        func(common.ShardID) []string
	Metrics         Metrics
	Option          O
	Group           string
	Location        string
	SegmentInterval storage.IntervalRule
	ShardNum        uint32
}

Opts contains configuration options for creating a queue.

type Queue

type Queue[S SubQueue, O any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Queue represents a write queue that manages multiple shards.

func Open

func Open[S SubQueue, O any](ctx context.Context, opts Opts[S, O], _ string) (*Queue[S, O], error)

Open creates and initializes a new queue with the given options. The trailing string parameter is declared with the blank identifier and is therefore unused.

func (*Queue[S, O]) Close

func (q *Queue[S, O]) Close() error

Close closes the queue and all its shards.

func (*Queue[S, O]) GetNodes

func (q *Queue[S, O]) GetNodes(shardID common.ShardID) []string

GetNodes returns the nodes for the given shard ID.

func (*Queue[S, O]) GetOrCreateShard

func (q *Queue[S, O]) GetOrCreateShard(shardID common.ShardID) (*Shard[S], error)

GetOrCreateShard returns the shard with the given ShardID, creating it if necessary. An existing shard is returned without locking; a missing shard is created with proper locking.
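
One common way to get a lock-free read path with a locked creation path is double-checked creation over a copy-on-write slice. The sketch below shows that pattern under stated assumptions: the registry and shard types are stand-ins, and nothing here is taken from the actual wqueue implementation, whose internals are unexported. It needs Go 1.19 or later for atomic.Pointer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// shard is a stand-in for wqueue's Shard type.
type shard struct{ id uint32 }

// registry is a hypothetical, simplified queue: a copy-on-write slice of
// shards indexed by shard ID and published through an atomic pointer.
type registry struct {
	mu      sync.Mutex // serializes creation only
	shards  atomic.Pointer[[]*shard]
	created int // number of shards created; guarded by mu
}

// load is the lock-free fast path.
func (r *registry) load(id uint32) *shard {
	p := r.shards.Load()
	if p == nil || int(id) >= len(*p) {
		return nil
	}
	return (*p)[id]
}

func (r *registry) getOrCreate(id uint32) *shard {
	if s := r.load(id); s != nil {
		return s // existing shard: no lock taken
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if s := r.load(id); s != nil {
		return s // another goroutine created it while we waited
	}
	var old []*shard
	if p := r.shards.Load(); p != nil {
		old = *p
	}
	n := len(old)
	if int(id) >= n {
		n = int(id) + 1
	}
	next := make([]*shard, n) // copy, then publish; readers never see a partial write
	copy(next, old)
	next[id] = &shard{id: id}
	r.created++
	r.shards.Store(&next)
	return next[id]
}

// concurrentCreates asks for the same shard from many goroutines and
// returns how many shards were actually created.
func concurrentCreates(workers int) int {
	r := &registry{}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.getOrCreate(3)
		}()
	}
	wg.Wait()
	return r.created
}

func main() {
	fmt.Println(concurrentCreates(16))
}
```

The second check under the mutex is what prevents two goroutines that both missed on the fast path from each creating the same shard.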

func (*Queue[S, O]) GetTimeRange

func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange

GetTimeRange returns a valid time range based on an input timestamp. It uses the Queue's SegmentInterval to generate the time range.
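
To make the idea of deriving a range from a timestamp concrete, the sketch below aligns a timestamp to a fixed-length segment. It is an assumption-laden illustration: the interval is modeled as a positive time.Duration rather than storage.IntervalRule, the local timeRange type stands in for timestamp.TimeRange, alignment is done in UTC, and the range is treated as half-open. The real method may differ on any of these points.

```go
package main

import (
	"fmt"
	"time"
)

// timeRange is a stand-in for timestamp.TimeRange; Start is inclusive and
// End is exclusive in this sketch.
type timeRange struct {
	Start, End time.Time
}

func (r timeRange) String() string {
	return fmt.Sprintf("[%s, %s)", r.Start.Format(time.RFC3339), r.End.Format(time.RFC3339))
}

// segmentRange returns the interval-aligned segment that contains ts.
// interval is assumed to be positive.
func segmentRange(ts time.Time, interval time.Duration) timeRange {
	start := ts.UTC().Truncate(interval) // round down to a multiple of interval
	return timeRange{Start: start, End: start.Add(interval)}
}

// exampleRange computes the daily segment for a fixed timestamp.
func exampleRange() timeRange {
	ts := time.Date(2025, time.November, 24, 13, 45, 0, 0, time.UTC)
	return segmentRange(ts, 24*time.Hour)
}

func main() {
	fmt.Println(exampleRange()) // [2025-11-24T00:00:00Z, 2025-11-25T00:00:00Z)
}
```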

func (*Queue[S, O]) UpdateOptions

func (q *Queue[S, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts)

UpdateOptions updates the queue options with new resource options.

type Shard

type Shard[S SubQueue] struct {
	// contains filtered or unexported fields
}

Shard represents a data shard containing a sub-queue and associated metadata.

func (Shard[S]) Close

func (s Shard[S]) Close() error

Close closes the shard and its underlying sub-queue.

func (Shard[S]) SubQueue

func (s Shard[S]) SubQueue() S

SubQueue returns the underlying sub-queue of the shard.

type SubQueue

type SubQueue interface {
	io.Closer
}

SubQueue represents a sub-queue interface that can be closed.

type SubQueueCreator

type SubQueueCreator[S SubQueue, O any] func(fileSystem fs.FileSystem, root string, position common.Position,
	l *logger.Logger, option O, metrics any, group string, shardID common.ShardID, getNodes func() []string) (S, error)

SubQueueCreator is a function type that creates a sub-queue with the given parameters.
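
The sketch below shows the general shape of a creator function paired with a SubQueue implementation. It deliberately uses a simplified, hypothetical creator type that keeps the two type parameters and the (S, error) result but drops the fs.FileSystem, common.Position, logger, metrics, and group parameters, so that the example compiles on its own. The memQueue and memOption types and the shard-%d path layout are likewise assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// SubQueue mirrors the interface shown above.
type SubQueue interface{ io.Closer }

// creator is a simplified, hypothetical stand-in for SubQueueCreator.
type creator[S SubQueue, O any] func(root string, option O, shardID uint32, getNodes func() []string) (S, error)

// memOption plays the role of the O type parameter.
type memOption struct{ capacity int }

// memQueue is a hypothetical in-memory sub-queue.
type memQueue struct {
	root   string
	buf    []string
	nodes  func() []string
	closed bool
}

// Close satisfies io.Closer and rejects a second call.
func (q *memQueue) Close() error {
	if q.closed {
		return errors.New("sub-queue already closed")
	}
	q.closed = true
	return nil
}

// newMemQueue builds one sub-queue per shard under root.
func newMemQueue(root string, opt memOption, shardID uint32, getNodes func() []string) (*memQueue, error) {
	if opt.capacity <= 0 {
		return nil, errors.New("capacity must be positive")
	}
	return &memQueue{
		root:  fmt.Sprintf("%s/shard-%d", root, shardID),
		buf:   make([]string, 0, opt.capacity),
		nodes: getNodes,
	}, nil
}

// Compile-time check that newMemQueue fits the creator type.
var _ creator[*memQueue, memOption] = newMemQueue

func main() {
	nodes := func() []string { return []string{"node-a", "node-b"} }
	q, err := newMemQueue("/tmp/wq", memOption{capacity: 8}, 2, nodes)
	if err != nil {
		fmt.Println("create failed:", err)
		return
	}
	defer q.Close()
	fmt.Println(q.root, q.nodes())
}
```

Passing the option as a type parameter lets each sub-queue implementation define its own configuration struct without the queue needing to know its fields.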
