Documentation ¶
Index ¶
- Variables
- func Concat[T any](iter1, iter2 storage.Iterator[T]) storage.Iterator[T]
- func Drain(ch <-chan *Msg) *sync.WaitGroup
- func Error[T any](err error) storage.Iterator[T]
- func FanInIteratorChannels(ctx context.Context, chans []<-chan *Msg) <-chan *Msg
- func FromChannel(in chan *Msg) storage.Iterator[string]
- func Merge[T any](iter1, iter2 storage.Iterator[T], compareFn func(a, b T) int) storage.Iterator[T]
- func NewFilteredIterator[T any](iter storage.Iterator[T], filters ...FilterFunc[T]) storage.Iterator[T]
- func NextItemInSliceStreams(ctx context.Context, streamSlices []*Stream, streamToProcess []int) (string, error)
- func SkipTo(ctx context.Context, iter storage.Iterator[string], target string) error
- func ToChannel[T any](ctx context.Context, iter storage.Iterator[T], batchSize int) chan ValueMsg[T]
- func Validate[T any](base storage.Iterator[T], fn func(T) (bool, error)) storage.Iterator[T]
- type FilterFunc
- type MergedIterator
- type Msg
- type Stream
- func (s *Stream) Drain(ctx context.Context) ([]string, error)
- func (s *Stream) Head(ctx context.Context) (string, error)
- func (s *Stream) Idx() int
- func (s *Stream) Next(ctx context.Context) (string, error)
- func (s *Stream) SkipToTargetObject(ctx context.Context, target string) error
- func (s *Stream) Stop()
- type Streams
- type ValueMsg
Constants ¶
This section is empty.
Variables ¶
var ErrHeadNotSupportedFilterIterator = errors.New("head() not supported on filter iterator")
var ErrHeadNotSupportedMergedIterator = errors.New("head() not supported on merged iterator")
Functions ¶
func Concat ¶ added in v1.12.0
Concat returns an iterator that first yields all items from iter1, then all items from iter2. It exhausts iter1 completely before moving to iter2.
This iterator is not thread-safe and should only be consumed by a single goroutine.
func FanInIteratorChannels ¶ added in v1.8.15
func NewFilteredIterator ¶ added in v1.12.0
func NextItemInSliceStreams ¶
func NextItemInSliceStreams(ctx context.Context, streamSlices []*Stream, streamToProcess []int) (string, error)
NextItemInSliceStreams advances every stream in streamSlices that is selected by streamToProcess and returns the item that was advanced past. It assumes that the first item of each selected stream is identical, so that all of them can be advanced together.
Types ¶
type FilterFunc ¶ added in v1.12.0
FilterFunc is a function that determines whether an item should be included in the iterator results. It returns true if the item passes the filter, false otherwise. If an error occurs during filtering, it should be returned.
type MergedIterator ¶ added in v1.12.0
type MergedIterator[T any] struct {
// contains filtered or unexported fields
}
func (*MergedIterator[T]) Head ¶ added in v1.12.0
func (m *MergedIterator[T]) Head(ctx context.Context) (T, error)
func (*MergedIterator[T]) Next ¶ added in v1.12.0
func (m *MergedIterator[T]) Next(ctx context.Context) (T, error)
func (*MergedIterator[T]) Stop ¶ added in v1.12.0
func (m *MergedIterator[T]) Stop()
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream aggregates multiple iterators that are sent to a source channel into one iterator.
func (*Stream) Head ¶
Head returns the first item in the buffer. If the source is closed or the context is cancelled, it stops the stream and sets the buffer to nil.
func (*Stream) SkipToTargetObject ¶
SkipToTargetObject advances the buffer until its head object is >= the target object. If the buffer is drained and no items remain, the stream is stopped and the buffer is set to nil.
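The skipping rule can be sketched on an in-memory buffer. This is a simplified illustration, not the package's code: the `stream` type, its `buffer` and `stopped` fields, and `skipTo` are hypothetical, items are assumed to arrive in ascending order, and comparison is assumed to be plain lexicographic string comparison. The real method also takes a context and refills from a source channel, which is omitted here.

```go
package main

import "fmt"

// stream is a hypothetical stand-in holding already-buffered items.
type stream struct {
	buffer  []string
	stopped bool
}

// skipTo drops items until the head is >= target.
// If nothing remains, the stream is marked stopped and the buffer becomes nil.
func (s *stream) skipTo(target string) {
	for len(s.buffer) > 0 && s.buffer[0] < target {
		s.buffer = s.buffer[1:]
	}
	if len(s.buffer) == 0 {
		s.buffer = nil
		s.stopped = true
	}
}

func main() {
	s := &stream{buffer: []string{"doc:1", "doc:3", "doc:5"}}

	s.skipTo("doc:2")
	fmt.Println(s.buffer, s.stopped) // prints "[doc:3 doc:5] false"

	s.skipTo("doc:9")
	fmt.Println(s.buffer == nil, s.stopped) // prints "true true"
}
```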
type Streams ¶
type Streams struct {
// contains filtered or unexported fields
}
func NewStreams ¶
func (*Streams) CleanDone ¶
CleanDone removes the iterator streams whose source is closed and returns a list of the remaining active streams. A stream is considered active only while its source channel is still open.
func (*Streams) GetActiveStreamsCount ¶
GetActiveStreamsCount returns the number of active streams as of the last time CleanDone was called.
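The relationship between the two methods can be sketched as follows. All names here (`stream`, `streams`, `cleanDone`, `activeCount`, the `closed` flag) are hypothetical stand-ins; in particular, a boolean replaces the real "source channel is closed" check. The point of the sketch is that the count is only refreshed by the clean-up call.

```go
package main

import "fmt"

// stream is a hypothetical stand-in; closed models a closed source channel.
type stream struct {
	idx    int
	closed bool
}

// streams holds the streams that were active at the last clean-up.
type streams struct {
	all []*stream
}

// cleanDone drops closed streams and returns those that remain active.
func (s *streams) cleanDone() []*stream {
	active := s.all[:0] // filter in place, reusing the backing array
	for _, st := range s.all {
		if !st.closed {
			active = append(active, st)
		}
	}
	s.all = active
	return active
}

// activeCount reports the count as of the last cleanDone call.
func (s *streams) activeCount() int { return len(s.all) }

func main() {
	ss := &streams{all: []*stream{{idx: 0}, {idx: 1, closed: true}, {idx: 2}}}

	ss.cleanDone()
	fmt.Println(ss.activeCount()) // prints "2"

	// A stream closes after the clean-up; the count is stale until the next one.
	ss.all[0].closed = true
	fmt.Println(ss.activeCount()) // prints "2"

	ss.cleanDone()
	fmt.Println(ss.activeCount()) // prints "1"
}
```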