s3

package
v0.1.0
Published: May 12, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package s3 provides a snapshot.Source that scans every JSON-Lines object under an S3 prefix and emits records for bootstrap. The canonical use case is bootstrapping a Murmur pipeline from a partitioned archive, such as a Firehose-archived stream, a daily DynamoDB export, or a Hive-style partitioned dump.

The package composes pkg/source/snapshot/jsonl (which decodes a single io.Reader of JSON Lines) with the S3 ListObjectsV2 + GetObject + gzip pattern. Most users want this package rather than the bare jsonl source: the prefix scan plus per-object gzip handling is what makes the S3 case operationally usable.

Key ordering

S3 ListObjectsV2 returns keys in lexicographic order. For Firehose archives partitioned as `prefix/year=2026/month=05/day=08/`, the resulting scan order is chronological because the month and day fields are zero-padded, and chronological order is the natural order for replay. For other partitioning schemes, callers can pre-filter the key list via Config.KeyFilter.
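As an illustration of the ordering argument above, the following sketch sorts a few Hive-style keys byte-wise and builds a date-range predicate with the same `func(key string) bool` shape as Config.KeyFilter. The helpers `dayOf` and `dateRangeFilter` and the sample keys are hypothetical and not part of this package; the sketch assumes the `year=/month=/day=` layout shown above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// dayOf (hypothetical helper) extracts "YYYY-MM-DD" from a Hive-style key
// such as "events/year=2026/month=05/day=08/part-0000.jsonl.gz".
// It reports false when any of the three partitions is missing.
func dayOf(key string) (string, bool) {
	var y, m, d string
	for _, seg := range strings.Split(key, "/") {
		switch {
		case strings.HasPrefix(seg, "year="):
			y = strings.TrimPrefix(seg, "year=")
		case strings.HasPrefix(seg, "month="):
			m = strings.TrimPrefix(seg, "month=")
		case strings.HasPrefix(seg, "day="):
			d = strings.TrimPrefix(seg, "day=")
		}
	}
	if y == "" || m == "" || d == "" {
		return "", false
	}
	return y + "-" + m + "-" + d, true
}

// dateRangeFilter (hypothetical helper) returns a predicate with the shape
// of Config.KeyFilter that accepts keys whose day lies in [from, to].
// Plain string comparison is valid only because the fields are zero-padded.
func dateRangeFilter(from, to string) func(key string) bool {
	return func(key string) bool {
		day, ok := dayOf(key)
		return ok && day >= from && day <= to
	}
}

func main() {
	keys := []string{
		"events/year=2026/month=05/day=10/b.jsonl.gz",
		"events/year=2026/month=05/day=08/a.jsonl.gz",
		"events/year=2026/month=04/day=30/z.jsonl.gz",
	}
	sort.Strings(keys) // byte-wise order, so zero-padded dates sort chronologically
	keep := dateRangeFilter("2026-05-01", "2026-05-09")
	for _, k := range keys {
		fmt.Println(k, keep(k))
	}
}
```

A predicate built this way could be assigned to Config.KeyFilter to restrict a scan to a date window without changing Config.Prefix.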

Gzipped objects

Keys ending in `.gz` are auto-decompressed via gzip.NewReader before being passed through to the JSON Lines decoder. Other compressions (snappy, zstd) require a custom Config.OpenObject hook.

Bounded concurrency

Set Config.Concurrency to fetch and decode N objects in parallel. The default is 1 (sequential, which preserves S3's lexicographic key order in the emitted record stream). Raising it to 4–16 is the usual operational move when the prefix contains many small objects and GetObject latency dominates; downstream bootstrap dedup tolerates the non-deterministic record ordering that parallel mode produces.
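The trade-off between ordering and parallelism can be seen in a minimal bounded-worker sketch. It does not reproduce the package's internals: `scanAll` and its `scanOne` callback are hypothetical, and strings stand in for decoded records.

```go
package main

import (
	"fmt"
	"sync"
)

// scanAll (hypothetical) processes keys with at most n objects in flight
// and funnels every emitted record into a single channel.
func scanAll(keys []string, n int, scanOne func(key string, emit func(string))) []string {
	if n < 1 {
		n = 1
	}
	out := make(chan string)
	sem := make(chan struct{}, n) // counting semaphore bounding parallelism
	var wg sync.WaitGroup

	go func() {
		for _, k := range keys {
			sem <- struct{}{} // blocks while n objects are already in flight
			wg.Add(1)
			go func(k string) {
				defer wg.Done()
				defer func() { <-sem }()
				scanOne(k, func(rec string) { out <- rec })
			}(k)
		}
		wg.Wait()
		close(out)
	}()

	var got []string
	for rec := range out {
		got = append(got, rec)
	}
	return got
}

func main() {
	twoLines := func(key string, emit func(string)) {
		emit(key + ":1")
		emit(key + ":2")
	}
	keys := []string{"a", "b", "c"}
	fmt.Println(scanAll(keys, 1, twoLines)) // key order preserved
	fmt.Println(scanAll(keys, 4, twoLines)) // order across keys may vary
}
```

With n set to 1 the semaphore admits one object at a time, so records come out in key order. With a larger n, records from different keys interleave, while lines within one key stay in file order.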

Types

type Config

type Config[T any] struct {
	// Client is an SDK v2 S3 client. The Source does not own its
	// lifecycle.
	Client *awss3.Client

	// Bucket is the S3 bucket to scan.
	Bucket string

	// Prefix narrows the scan (e.g. "events/year=2026/month=05/").
	// Empty means the entire bucket — usually undesirable.
	Prefix string

	// Decode is the per-line JSON-Lines decoder. Defaults to
	// jsonl.DefaultDecoder[T].
	Decode jsonl.Decoder[T]

	// EventID derives the dedup key per record. Receives the source
	// object key + line number for context. Default: "<key>:<line>".
	EventID func(decoded T, key string, lineNum int) string

	// EventTime, when non-nil, derives the per-record EventTime used
	// for window bucket assignment. Defaults to time.Now() when unset;
	// backfill of windowed counters must wire this to the record's
	// source-of-truth timestamp (e.g., the bucket-mid `occurred_at`
	// field on a Spark-aggregated row) or every row will land in the
	// same bucket.
	EventTime func(decoded T) time.Time

	// KeyFilter, when non-nil, is called for every listed object key
	// and must return true for keys that should be scanned. Use this
	// to skip non-data objects (manifests, _SUCCESS markers) or to
	// restrict the scan to a date range without changing the prefix.
	KeyFilter func(key string) bool

	// OnDecodeError is forwarded to the underlying jsonl source.
	OnDecodeError func(key string, lineNum int, line []byte, err error)

	// HandoffToken is what CaptureHandoff returns. Same caller-supplied
	// pattern as jsonl: the live-source resume position is captured
	// externally (e.g., when generating the S3 archive).
	HandoffToken snapshot.HandoffToken

	// MaxLineSize is forwarded to the jsonl source. Defaults to 1 MB.
	MaxLineSize int

	// OpenObject, when non-nil, overrides the default GetObject + gzip
	// handling. Use for custom compression (snappy, zstd) or for tests
	// that want to inject a static body. The returned Closer is closed
	// after the per-object scan completes.
	OpenObject func(ctx context.Context, key string) (io.ReadCloser, error)

	// ListKeys, when non-nil, overrides the default ListObjectsV2 paged
	// scan. Use for tests that want a fixed key set, or for callers
	// that maintain an external manifest of archive keys (e.g., a
	// `_manifest.json` written alongside the data files). When nil,
	// the source uses Client.ListObjectsV2 paged over the configured
	// Bucket + Prefix.
	ListKeys func(ctx context.Context) ([]string, error)

	// Concurrency bounds the number of objects fetched and decoded in
	// parallel. Default 1 (sequential, preserves lexicographic key
	// order for the emitted records). Set higher (typically 4–16) when
	// the prefix contains many small objects and the per-object
	// `GetObject` latency dominates. With Concurrency > 1 the record
	// emission order is non-deterministic across keys; per-record
	// EventID derivation handles dedup so this is safe under the
	// at-least-once contract.
	Concurrency int
}

Config configures an S3 prefix-scanning snapshot Source.
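The callback fields are easiest to understand from small functions with matching signatures. The sketch below is standalone and does not import this package: the `Event` type, its fields, the `.jsonl` and `.jsonl.gz` suffix convention, and the three function names are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"path"
	"strings"
	"time"
)

// Event is a hypothetical record type standing in for T.
type Event struct {
	ID         string    `json:"id"`
	OccurredAt time.Time `json:"occurred_at"`
}

// skipMarkers has the shape of Config.KeyFilter. It rejects marker and
// manifest objects and accepts only keys that look like data files
// (the suffix convention here is an assumption).
func skipMarkers(key string) bool {
	base := path.Base(key)
	if base == "_SUCCESS" || base == "_manifest.json" {
		return false
	}
	return strings.HasSuffix(base, ".jsonl") || strings.HasSuffix(base, ".jsonl.gz")
}

// eventID has the shape of Config.EventID. It prefers a stable ID carried
// by the record and falls back to the documented default "<key>:<line>".
func eventID(e Event, key string, lineNum int) string {
	if e.ID != "" {
		return e.ID
	}
	return fmt.Sprintf("%s:%d", key, lineNum)
}

// eventTime has the shape of Config.EventTime. It returns the record's own
// timestamp so that backfilled rows land in their original window bucket.
func eventTime(e Event) time.Time {
	return e.OccurredAt
}

func main() {
	ts := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)
	e := Event{OccurredAt: ts}
	key := "events/year=2026/month=05/day=08/part-0000.jsonl.gz"
	fmt.Println(skipMarkers(key), skipMarkers("events/_SUCCESS"))
	fmt.Println(eventID(e, key, 17))
	fmt.Println(eventTime(e).Format(time.RFC3339))
}
```

Functions like these would be assigned to the KeyFilter, EventID, and EventTime fields of a Config[Event] before calling NewSource.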

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source implements snapshot.Source[T] over an S3 prefix.

func NewSource

func NewSource[T any](cfg Config[T]) (*Source[T], error)

NewSource constructs the Source and validates required fields.

func (*Source[T]) CaptureHandoff

func (s *Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)

CaptureHandoff returns the configured handoff token, or nil.

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close is a no-op; the underlying S3 client is owned by the caller.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "s3:<bucket>/<prefix>".

func (*Source[T]) Resume

func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error

Resume restarts the scan from the beginning. At-least-once dedup at bootstrap.Run absorbs duplicate emissions; mid-prefix resumption would require persisting a per-key checkpoint and is left as future work.
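To see why restarting from the beginning is safe under at-least-once delivery, consider a consumer that applies each EventID at most once. The sketch below is a stand-in for downstream dedup, not the actual bootstrap.Run logic: the `record` and `counter` types are hypothetical.

```go
package main

import "fmt"

// record is a hypothetical stand-in for an emitted source record.
type record struct {
	EventID string
	Value   int
}

// counter applies each EventID at most once, which is the property that
// lets a full rescan repeat records without double counting.
type counter struct {
	seen  map[string]struct{}
	total int
}

func newCounter() *counter {
	return &counter{seen: make(map[string]struct{})}
}

// apply reports whether the record was new and therefore counted.
func (c *counter) apply(r record) bool {
	if _, dup := c.seen[r.EventID]; dup {
		return false
	}
	c.seen[r.EventID] = struct{}{}
	c.total += r.Value
	return true
}

func main() {
	archive := []record{{"k:1", 5}, {"k:2", 7}, {"k:3", 1}}
	c := newCounter()

	// First attempt is interrupted after two records.
	for _, r := range archive[:2] {
		c.apply(r)
	}
	// The restart rescans everything; the first two records are duplicates.
	for _, r := range archive {
		c.apply(r)
	}
	fmt.Println(c.total) // prints 13
}
```

The guarantee depends on EventID being stable across scans, which holds for the default "<key>:<line>" form as long as the archived objects do not change between attempts.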

func (*Source[T]) Scan

func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error

Scan lists the prefix and scans each object's lines through the configured decoder, emitting records into out. Returns nil when every object has been consumed; non-nil on the first fatal error.

Per-object decode errors fire OnDecodeError but do NOT abort — a single poison line in a 100-object archive shouldn't fail the bootstrap.

With Config.Concurrency > 1, up to N objects are fetched and decoded in parallel; records from any worker funnel into the single `out` channel. The emission order across keys is non-deterministic in that mode (within a single key, lines remain in file order). Sequential (Concurrency <= 1) preserves the lexicographic key order S3 returns.
