Overview ¶
Package s3 provides a snapshot.Source that scans every JSON-Lines object under an S3 prefix and emits records for bootstrap. The canonical use case: bootstrap a Murmur pipeline from a partitioned archive — a Firehose-archived stream, a daily DDB export, or a Hive-style partitioned dump.
It composes pkg/source/snapshot/jsonl (which decodes one io.Reader of JSON Lines) with the S3 ListObjectsV2 + GetObject + gzip pattern. Most users want this rather than the bare jsonl source: the prefix scan plus per-object gzip handling is what makes the S3 case operationally usable.
Key ordering ¶
S3 ListObjectsV2 returns keys in lexicographic order. For Firehose archives partitioned as `prefix/year=2026/month=05/day=08/`, the resulting scan order is chronological, which is the natural order for replay. For other partitioning schemes, callers can pre-filter the key list via Config.KeyFilter.
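As an illustration of the date-range filtering mentioned above, here is a minimal sketch of a predicate with the same shape as Config.KeyFilter. The helper name dateRangeFilter and the key layout are assumptions made for the example, not part of this package. It relies on the partition values being zero-padded, so plain string comparison agrees with chronological order.

```go
package main

import (
	"fmt"
	"strings"
)

// dateRangeFilter is a hypothetical helper. It returns a predicate with the
// shape of Config.KeyFilter that keeps keys whose partition path (the part
// between prefix and the object's file name) lies in [from, to] inclusive.
// Zero-padded Hive-style partitions compare chronologically as strings.
func dateRangeFilter(prefix, from, to string) func(key string) bool {
	return func(key string) bool {
		if !strings.HasPrefix(key, prefix) {
			return false
		}
		rest := strings.TrimPrefix(key, prefix)
		// Everything before the last "/" is the partition path.
		i := strings.LastIndex(rest, "/")
		if i < 0 {
			return false
		}
		part := rest[:i]
		return part >= from && part <= to
	}
}

func main() {
	keep := dateRangeFilter("events/",
		"year=2026/month=05/day=01",
		"year=2026/month=05/day=08")
	for _, k := range []string{
		"events/year=2026/month=04/day=30/part-0.jsonl.gz",
		"events/year=2026/month=05/day=08/part-0.jsonl.gz",
		"events/year=2026/month=05/day=09/part-0.jsonl.gz",
	} {
		fmt.Println(k, keep(k))
	}
}
```

A predicate like this could be assigned to Config.KeyFilter to narrow a scan without changing Config.Prefix.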
Gzipped objects ¶
Keys ending in `.gz` are auto-decompressed via gzip.NewReader before being passed to the JSON Lines decoder. Other compression formats (snappy, zstd) require a custom Config.OpenObject hook.
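The suffix-based behaviour described above can be sketched with the standard library alone. This is an illustration of the technique, not the package's actual code: wrapByKey, gzipReadCloser, gzipBytes, and decodeObject are hypothetical names. A custom Config.OpenObject hook for another format would follow the same shape, with a different decompressor wrapped around the object body.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// gzipReadCloser closes both the gzip stream and the underlying body.
type gzipReadCloser struct {
	*gzip.Reader
	body io.Closer
}

func (g gzipReadCloser) Close() error {
	zerr := g.Reader.Close()
	berr := g.body.Close()
	if zerr != nil {
		return zerr
	}
	return berr
}

// wrapByKey (hypothetical) decompresses body when key ends in ".gz" and
// returns body unchanged otherwise.
func wrapByKey(key string, body io.ReadCloser) (io.ReadCloser, error) {
	if !strings.HasSuffix(key, ".gz") {
		return body, nil
	}
	zr, err := gzip.NewReader(body)
	if err != nil {
		body.Close()
		return nil, err
	}
	return gzipReadCloser{Reader: zr, body: body}, nil
}

// gzipBytes compresses s in memory; it stands in for an S3 object body.
func gzipBytes(s string) []byte {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(s)); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}
	return buf.Bytes()
}

// decodeObject reads a whole object through wrapByKey.
func decodeObject(key string, raw []byte) (string, error) {
	rc, err := wrapByKey(key, io.NopCloser(bytes.NewReader(raw)))
	if err != nil {
		return "", err
	}
	defer rc.Close()
	data, err := io.ReadAll(rc)
	return string(data), err
}

func main() {
	out, err := decodeObject("events/part-0.jsonl.gz", gzipBytes("{\"id\":1}\n"))
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```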
Bounded concurrency ¶
Set Config.Concurrency to fetch and decode N objects in parallel. The default is 1 (sequential, which preserves S3's lexicographic key order in the emitted record stream). Raising it to 4–16 is the usual operational move when the prefix contains many small objects and GetObject latency dominates; downstream bootstrap dedup handles the non-deterministic record ordering that parallel mode produces.
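The bounded fan-out described above can be sketched as a fixed pool of workers that pull keys from one channel and write records into another. This is an assumption about the general pattern, not a copy of the package's implementation; scanKeys and collect are hypothetical names, and records are plain strings to keep the example small.

```go
package main

import (
	"fmt"
	"sync"
)

// scanKeys (hypothetical) runs scanOne for every key with at most n keys in
// flight and funnels all emitted records into out. With n == 1 a single
// worker handles keys in slice order, so the output order is deterministic.
func scanKeys(keys []string, n int, scanOne func(key string, emit func(string)), out chan<- string) {
	if n < 1 {
		n = 1
	}
	work := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range work {
				scanOne(key, func(rec string) { out <- rec })
			}
		}()
	}
	for _, k := range keys {
		work <- k
	}
	close(work)
	wg.Wait()
}

// collect drains out while scanKeys runs, using a fake two-line object
// per key in place of a real GetObject call.
func collect(keys []string, n int) []string {
	out := make(chan string)
	go func() {
		defer close(out)
		scanKeys(keys, n, func(key string, emit func(string)) {
			for line := 1; line <= 2; line++ {
				emit(fmt.Sprintf("%s:%d", key, line))
			}
		}, out)
	}()
	var got []string
	for rec := range out {
		got = append(got, rec)
	}
	return got
}

func main() {
	fmt.Println(collect([]string{"a", "b"}, 1))
	// With n > 1 the interleaving across keys varies from run to run.
	fmt.Println(len(collect([]string{"a", "b", "c", "d"}, 4)))
}
```

Within one key the lines stay in file order in either mode, because a single worker owns the whole object.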
Index ¶
- type Config
- type Source
- func (s *Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)
- func (s *Source[T]) Close() error
- func (s *Source[T]) Name() string
- func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error
- func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error
Types ¶
type Config ¶
type Config[T any] struct {
	// Client is an SDK v2 S3 client. The Source does not own its
	// lifecycle.
	Client *awss3.Client

	// Bucket is the S3 bucket to scan.
	Bucket string

	// Prefix narrows the scan (e.g. "events/year=2026/month=05/").
	// Empty means the entire bucket — usually undesirable.
	Prefix string

	// Decode is the per-line JSON-Lines decoder. Defaults to
	// jsonl.DefaultDecoder[T].
	Decode jsonl.Decoder[T]

	// EventID derives the dedup key per record. Receives the source
	// object key + line number for context. Default: "<key>:<line>".
	EventID func(decoded T, key string, lineNum int) string

	// EventTime, when non-nil, derives the per-record EventTime used
	// for window bucket assignment. Defaults to time.Now() when unset;
	// backfill of windowed counters must wire this to the record's
	// source-of-truth timestamp (e.g., the bucket-mid `occurred_at`
	// field on a Spark-aggregated row) or every row will land in the
	// same bucket.
	EventTime func(decoded T) time.Time

	// KeyFilter, when non-nil, is called for every listed object key
	// and must return true for keys that should be scanned. Use this
	// to skip non-data objects (manifests, _SUCCESS markers) or to
	// restrict the scan to a date range without changing the prefix.
	KeyFilter func(key string) bool

	// OnDecodeError is forwarded to the underlying jsonl source.
	OnDecodeError func(key string, lineNum int, line []byte, err error)

	// HandoffToken is what CaptureHandoff returns. Same caller-supplied
	// pattern as jsonl: the live-source resume position is captured
	// externally (e.g., when generating the S3 archive).
	HandoffToken snapshot.HandoffToken

	// MaxLineSize is forwarded to the jsonl source. Defaults to 1 MB.
	MaxLineSize int

	// OpenObject, when non-nil, overrides the default GetObject + gzip
	// handling. Use for custom compression (snappy, zstd) or for tests
	// that want to inject a static body. The returned Closer is closed
	// after the per-object scan completes.
	OpenObject func(ctx context.Context, key string) (io.ReadCloser, error)

	// ListKeys, when non-nil, overrides the default ListObjectsV2 paged
	// scan. Use for tests that want a fixed key set, or for callers
	// that maintain an external manifest of archive keys (e.g., a
	// `_manifest.json` written alongside the data files). When nil,
	// the source uses Client.ListObjectsV2 paged over the configured
	// Bucket + Prefix.
	ListKeys func(ctx context.Context) ([]string, error)

	// Concurrency bounds the number of objects fetched and decoded in
	// parallel. Default 1 (sequential, preserves lexicographic key
	// order for the emitted records). Set higher (typically 4–16) when
	// the prefix contains many small objects and the per-object
	// `GetObject` latency dominates. With Concurrency > 1 the record
	// emission order is non-deterministic across keys; per-record
	// EventID derivation handles dedup so this is safe under the
	// at-least-once contract.
	Concurrency int
}
Config configures an S3 prefix-scanning snapshot Source.
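To make the EventID and EventTime hooks concrete, here is a minimal sketch of functions with the documented signatures. The Event type and its fields are hypothetical; only the "<key>:<line>" default format and the function shapes come from the Config comments above.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a hypothetical record type for illustration only.
type Event struct {
	ID         string    `json:"id"`
	OccurredAt time.Time `json:"occurred_at"`
}

// positionalEventID reproduces the documented default format "<key>:<line>".
func positionalEventID(_ Event, key string, lineNum int) string {
	return fmt.Sprintf("%s:%d", key, lineNum)
}

// naturalEventID prefers the record's own ID, so the dedup key stays stable
// even if the archive is rewritten, and falls back to the positional form.
func naturalEventID(e Event, key string, lineNum int) string {
	if e.ID != "" {
		return e.ID
	}
	return positionalEventID(e, key, lineNum)
}

// eventTime returns the record's own timestamp, so a backfill assigns rows
// to their original windows rather than to the time of the scan.
func eventTime(e Event) time.Time {
	return e.OccurredAt
}

func main() {
	e := Event{ID: "evt-42", OccurredAt: time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)}
	fmt.Println(naturalEventID(e, "events/part-0.jsonl.gz", 7))
	fmt.Println(naturalEventID(Event{}, "events/part-0.jsonl.gz", 7))
	fmt.Println(eventTime(e).Format(time.RFC3339))
}
```

Functions like naturalEventID and eventTime would be assigned to the EventID and EventTime fields of a Config[Event].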
type Source ¶
type Source[T any] struct {
	// contains filtered or unexported fields
}
Source implements snapshot.Source[T] over an S3 prefix.
func (*Source[T]) CaptureHandoff ¶
CaptureHandoff returns the configured handoff token, or nil.
func (*Source[T]) Resume ¶
Resume restarts the scan from the beginning. At-least-once dedup at bootstrap.Run absorbs duplicate emissions; mid-prefix resumption would require persisting a per-key checkpoint and is left as future work.
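Why a restart from the beginning is safe can be shown with a toy dedup step keyed on EventID. This is only a sketch of the at-least-once idea: the record type, applyOnce, and sumAfterReplay are hypothetical, and the real dedup inside bootstrap.Run may work differently.

```go
package main

import "fmt"

// record is a hypothetical stand-in for an emitted source record.
type record struct {
	EventID string
	Value   int
}

// applyOnce applies each record whose EventID has not been seen before and
// silently skips the rest.
func applyOnce(seen map[string]struct{}, recs []record, apply func(record)) {
	for _, r := range recs {
		if _, dup := seen[r.EventID]; dup {
			continue
		}
		seen[r.EventID] = struct{}{}
		apply(r)
	}
}

// sumAfterReplay simulates a scan that stopped after emitting partial,
// followed by a resume that re-emits the whole archive.
func sumAfterReplay(partial, full []record) int {
	seen := map[string]struct{}{}
	total := 0
	add := func(r record) { total += r.Value }
	applyOnce(seen, partial, add)
	applyOnce(seen, full, add)
	return total
}

func main() {
	archive := []record{
		{EventID: "k:1", Value: 10},
		{EventID: "k:2", Value: 20},
		{EventID: "k:3", Value: 30},
	}
	// The first two records are emitted twice but counted once.
	fmt.Println(sumAfterReplay(archive[:2], archive))
}
```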
func (*Source[T]) Scan ¶
Scan lists the prefix and scans each object's lines through the configured decoder, emitting records into out. Returns nil when every object has been consumed; non-nil on the first fatal error.
Per-line decode errors within an object invoke OnDecodeError but do NOT abort the scan: a single poison line in a 100-object archive shouldn't fail the bootstrap.
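The skip-and-report behaviour for bad lines can be sketched with bufio.Scanner and encoding/json. This is an illustration under assumptions, not the jsonl package's code: Event, scanLines, and scanString are hypothetical names, and the callback mirrors the OnDecodeError signature from Config.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Event is a hypothetical record type for illustration only.
type Event struct {
	ID string `json:"id"`
}

// scanLines decodes one JSON value per line. A line that fails to decode is
// reported through onDecodeError and skipped; only reader or line-size
// failures are returned as errors.
func scanLines(key string, r io.Reader, maxLineSize int, emit func(Event, int),
	onDecodeError func(key string, lineNum int, line []byte, err error)) error {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), maxLineSize)
	lineNum := 0
	for sc.Scan() {
		lineNum++
		line := sc.Bytes() // only valid until the next call to Scan
		if len(line) == 0 {
			continue
		}
		var e Event
		if err := json.Unmarshal(line, &e); err != nil {
			if onDecodeError != nil {
				onDecodeError(key, lineNum, line, err)
			}
			continue
		}
		emit(e, lineNum)
	}
	return sc.Err()
}

// scanString runs scanLines over an in-memory body and returns the decoded
// IDs plus the line numbers that failed to decode.
func scanString(key, body string) (ids []string, badLines []int, err error) {
	err = scanLines(key, strings.NewReader(body), 1<<20,
		func(e Event, _ int) { ids = append(ids, e.ID) },
		func(_ string, lineNum int, _ []byte, _ error) { badLines = append(badLines, lineNum) })
	return ids, badLines, err
}

func main() {
	ids, bad, err := scanString("events/part-0.jsonl",
		"{\"id\":\"a\"}\nnot json\n{\"id\":\"b\"}\n")
	fmt.Println(ids, bad, err)
}
```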
With Config.Concurrency > 1, up to N objects are fetched and decoded in parallel; records from any worker funnel into the single `out` channel. The emission order across keys is non-deterministic in that mode (within a single key, lines remain in file order). Sequential (Concurrency <= 1) preserves the lexicographic key order S3 returns.