Documentation ¶
Overview ¶
Package stream provides support for streaming contiguous entries from logs.
Index ¶
- func Entries[T any](bundles iter.Seq2[Bundle, error], bundleFn func([]byte) ([]T, error)) iter.Seq2[Entry[T], error]
- func EntryBundles(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, ...) iter.Seq2[Bundle, error]
- type Bundle
- type Entry
- type Follower
- type GetBundleFn
- type GetTreeSizeFn
- type Streamer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Entries ¶
func Entries[T any](bundles iter.Seq2[Bundle, error], bundleFn func([]byte) ([]T, error)) iter.Seq2[Entry[T], error]
Entries creates a new stream reader which uses the provided bundleFn to turn each bundle into entries of type T.
Different bundleFn implementations can be provided to return raw entry bytes, parsed entry structs, or derivations of entries (e.g. hashes) as needed.
func EntryBundles ¶
func EntryBundles(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, getBundle GetBundleFn, fromEntry uint64, N uint64) iter.Seq2[Bundle, error]
EntryBundles produces an iterator which returns a stream of Bundle structs which cover the requested range of entries in their natural order in the log.
If the adaptor encounters an error while reading an entry bundle, the encountered error will be returned via the iterator.
This adaptor is optimised for the case where calling getBundle has some appreciable latency, and works around that by maintaining a read-ahead cache of subsequent bundles, which is populated by a number of parallel requests to getBundle. The request parallelism is set by the numWorkers parameter, which can be tuned to balance throughput against resource consumption. Such tuning needs to take into account the nature of the source infrastructure and how concurrent requests affect its performance (e.g. GCS buckets vs. files on a single disk).
Types ¶
type Bundle ¶
type Bundle struct {
	// RangeInfo describes which of the entries in this bundle are relevant.
	RangeInfo layout.RangeInfo
	// Data is the raw serialised bundle, as fetched from the log.
	Data []byte
}
Bundle represents an entry bundle in a log, along with some metadata about which parts of the bundle are relevant.
type Entry ¶
type Entry[T any] struct {
	// Index is the index of the entry in the log.
	Index uint64
	// Entry is the entry from the log.
	Entry T
}
Entry represents a single leaf in a log.
type Follower ¶
type Follower interface {
	// Name returns a human readable name for this follower.
	Name() string

	// Follow should be implemented so as to visit entries in the log in order, using the provided
	// Streamer to access the entry bundles which contain them.
	//
	// Implementations should keep track of their progress such that they can pick up where they left off
	// if e.g. the binary is restarted.
	Follow(context.Context, Streamer)

	// EntriesProcessed reports the progress of the follower, returning the total number of log entries
	// successfully seen/processed.
	EntriesProcessed(context.Context) (uint64, error)
}
Follower describes the contract of something which is required to track the contents of the local log.
type GetBundleFn ¶
GetBundleFn is a function which knows how to fetch a single entry bundle from the specified address.
type GetTreeSizeFn ¶
GetTreeSizeFn is a function which knows how to return a tree size.
type Streamer ¶
type Streamer interface {
	// IntegratedSize returns the current size of the integrated tree.
	//
	// This tree will have in place all the static resources the returned size implies, but
	// there may not yet be a checkpoint for this size signed, witnessed, or published.
	//
	// It's ONLY safe to use this value for processes internal to the operation of the log (e.g.
	// populating antispam data structures); it MUST NOT be used as a substitute for
	// reading the checkpoint when only data which has been publicly committed to by the
	// log should be used. If in doubt, use ReadCheckpoint instead.
	IntegratedSize(ctx context.Context) (uint64, error)

	// StreamEntries returns an iterator over the range of requested entries [startEntryIdx, startEntryIdx+N).
	//
	// The iterator will yield either a Bundle struct or an error. If an error is returned the caller should
	// stop consuming from the iterator as it's unlikely that a partial stream of entries from a transparency log
	// is useful.
	//
	// The returned Bundle contains the raw serialised form of the entry bundle, along with a layout.RangeInfo
	// struct that describes which of the entries in the entry bundle are part of the requested range.
	StreamEntries(ctx context.Context, startEntryIdx, N uint64) iter.Seq2[Bundle, error]
}