iterator

package
v1.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrHeadNotSupportedFilterIterator = errors.New("head() not supported on filter iterator")
View Source
var ErrHeadNotSupportedMergedIterator = errors.New("head() not supported on merged iterator")

Functions

func Concat added in v1.12.0

func Concat[T any](iter1, iter2 storage.Iterator[T]) storage.Iterator[T]

Concat returns an iterator that first yields all items from iter1, then all items from iter2. It exhausts iter1 completely before moving to iter2.

This iterator is not thread-safe and should only be consumed by a single goroutine.

func Drain added in v1.8.13

func Drain(ch <-chan *Msg) *sync.WaitGroup

func Error added in v1.11.4

func Error[T any](err error) storage.Iterator[T]

func FanInIteratorChannels added in v1.8.15

func FanInIteratorChannels(ctx context.Context, chans []<-chan *Msg) <-chan *Msg

func FromChannel added in v1.12.0

func FromChannel(in chan *Msg) storage.Iterator[string]

func Merge added in v1.12.0

func Merge[T any](iter1, iter2 storage.Iterator[T], compareFn func(a, b T) int) storage.Iterator[T]

func NewFilteredIterator added in v1.12.0

func NewFilteredIterator[T any](iter storage.Iterator[T], filters ...FilterFunc[T]) storage.Iterator[T]

func NextItemInSliceStreams

func NextItemInSliceStreams(ctx context.Context, streamSlices []*Stream, streamToProcess []int) (string, error)

NextItemInSliceStreams will advance all streamSlices specified in streamToProcess and return the item advanced. Assumption is that the stream slices first item is identical, and we want to advance all these streams.

func SkipTo added in v1.12.0

func SkipTo(ctx context.Context, iter storage.Iterator[string], target string) error

func ToChannel added in v1.12.0

func ToChannel[T any](ctx context.Context, iter storage.Iterator[T], batchSize int) chan ValueMsg[T]

func Validate added in v1.11.4

func Validate[T any](base storage.Iterator[T], fn func(T) (bool, error)) storage.Iterator[T]

Types

type FilterFunc added in v1.12.0

type FilterFunc[T any] func(T) (bool, error)

FilterFunc is a function that determines whether an item should be included in the iterator results. It returns true if the item passes the filter, false otherwise. If an error occurs during filtering, it should be returned.

type MergedIterator added in v1.12.0

type MergedIterator[T any] struct {
	// contains filtered or unexported fields
}

func (*MergedIterator[T]) Head added in v1.12.0

func (m *MergedIterator[T]) Head(ctx context.Context) (T, error)

func (*MergedIterator[T]) Next added in v1.12.0

func (m *MergedIterator[T]) Next(ctx context.Context) (T, error)

func (*MergedIterator[T]) Stop added in v1.12.0

func (m *MergedIterator[T]) Stop()

type Msg

type Msg struct {
	Iter storage.Iterator[string] // NOTE: This could be made completely generic if needed
	Err  error
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream aggregates multiple iterators that are sent to a source channel into one iterator.

func NewStream

func NewStream(idx int, source chan *Msg) *Stream

func (*Stream) Drain

func (s *Stream) Drain(ctx context.Context) ([]string, error)

Drain all item in the stream's buffer and return these items.

func (*Stream) Head

func (s *Stream) Head(ctx context.Context) (string, error)

Head returns the first item in the buffer. If the Head is sourceIsClosed or cancelled, it will stop the buffer and set the buffer to nil.

func (*Stream) Idx

func (s *Stream) Idx() int

func (*Stream) Next

func (s *Stream) Next(ctx context.Context) (string, error)

func (*Stream) SkipToTargetObject

func (s *Stream) SkipToTargetObject(ctx context.Context, target string) error

SkipToTargetObject moves the buffer until the buffer's head object is >= target object. If the buffer is drained and no more items, it will set to stop and buffer will be nil.

func (*Stream) Stop

func (s *Stream) Stop()

type Streams

type Streams struct {
	// contains filtered or unexported fields
}

func NewStreams

func NewStreams(streams []*Stream) *Streams

func (*Streams) CleanDone

func (s *Streams) CleanDone(ctx context.Context) ([]*Stream, error)

CleanDone will clean up the sourceIsClosed iterator streams and return a list of the remaining active streams. To be considered active your source channel must still be open.

func (*Streams) GetActiveStreamsCount

func (s *Streams) GetActiveStreamsCount() int

GetActiveStreamsCount will return the active streams from the last time CleanDone was called.

func (*Streams) Stop

func (s *Streams) Stop()

Stop will Drain all streams completely to avoid leaving dangling resources NOTE: caller should consider running this in a goroutine to not block.

type ValueMsg added in v1.12.0

type ValueMsg[T any] struct {
	Value T
	Err   error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL