Documentation
Overview ¶
Package parquet provides a snapshot.Source that reads Apache Parquet from any parquet.ReaderAtSeeker (an io.ReaderAt that can also Seek): local files, S3 objects fetched into memory, in-memory bytes from tests, and so on. The canonical use case is bootstrapping a Murmur pipeline from a Spark-produced backfill written to S3.
Parquet is the recommended interchange format when the upstream is Spark (or any columnar engine): 5–20x smaller than gzipped JSON Lines, predicate-pushdown-friendly, and native to every modern distributed query engine. JSON Lines (pkg/source/snapshot/jsonl) is still the right choice for ad-hoc / hand-built dumps and database exports.
Pairs with the S3 prefix scanner (pkg/source/snapshot/parquet#S3Source), the multi-file, partition-aware, Hive-style discoverer that most production callers want. Source itself is the single-file reader that decodes one Parquet object.
Compression ¶
Snappy / gzip / zstd / lz4 are handled transparently by the Arrow Parquet library based on the per-column-chunk metadata. No caller-side compression handling is required.
Decoding ¶
Parquet is columnar, but bootstrap.Run consumes one Record[T] at a time. This source streams the file as arrow.Record batches of up to Config.BatchSize rows (default 4096) and walks each row into T via a user-supplied Decoder[T]. The Decoder receives an arrow.Record plus a row index, so it can pull only the columns it needs, by name.
S3 Prefix Scanner ¶
S3Source is an S3 prefix scanner with Hive-style partition discovery and partition-level predicate pushdown.
Spark writes partitioned datasets as
s3://bucket/prefix/year=2026/month=05/day=08/hour=14/part-00000.parquet
where the partition columns are embedded in the key path (Hive convention). This source lists all `.parquet` objects under a prefix, parses the partition columns out of each key, optionally drops whole partitions via a caller-supplied predicate, and decodes the remaining objects through pkg/source/snapshot/parquet#Source.
Concurrency ¶
Object fetches run in parallel, up to MaxConcurrency at a time (default 4). Decoded records from all workers share the single out channel and interleave in completion order; they are not sorted by partition. Bootstrap callers should not depend on arrival order: the downstream monoid combine is commutative, so order does not change the result.
Index ¶
- type Config
- type Decoder
- type EventIDFn
- type EventTimeFn
- type Partition
- type PartitionFilter
- type S3Config
- type S3Source
- func (s *S3Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)
- func (s *S3Source[T]) Close() error
- func (s *S3Source[T]) Name() string
- func (s *S3Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error
- func (s *S3Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error
- type Source
- func (s *Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)
- func (s *Source[T]) Close() error
- func (s *Source[T]) Name() string
- func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error
- func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error
- func (s *Source[T]) Stats() Stats
- type Stats
Types ¶
type Config ¶
type Config[T any] struct {
	// Reader is the Parquet object. parquet.ReaderAtSeeker is
	// satisfied by *os.File and by aws.ReadSeekCloser bodies.
	// The Source does not own its lifecycle; close it externally
	// (or use NewSourceClosing to delegate).
	Reader parquet.ReaderAtSeeker
	// Name is a stable identifier for logging / metrics. Typically the
	// S3 key or the file path.
	Name string
	// Decode converts an Arrow row into T. Required; no default
	// schema-introspection exists, since callers know their schema.
	Decode Decoder[T]
	// EventID derives the dedup key per record. Optional; see EventIDFn.
	EventID EventIDFn[T]
	// EventTime extracts the upstream event time. Optional; default
	// is time.Now at decode.
	EventTime EventTimeFn[T]
	// OnDecodeError, when non-nil, is invoked for rows that fail to
	// decode. Default behavior: drop silently and continue. Wire to a
	// DLQ producer or logger to surface poison rows.
	OnDecodeError func(rowOrdinal int, err error)
	// BatchSize is the number of rows materialized per arrow.Record
	// pulled out of Parquet. Defaults to 4096. Lower for large rows
	// (less peak memory), higher for tiny rows (less per-batch
	// overhead).
	BatchSize int64
	// Allocator is the Arrow memory allocator. Defaults to
	// memory.DefaultAllocator. Override for tracked allocators in
	// tests or for arena-style reuse.
	Allocator memory.Allocator
	// HandoffToken, when non-nil, is what CaptureHandoff returns.
	// Parquet snapshots have no inherent live-source resume position;
	// the caller's deployment system is responsible for capturing one
	// before the Parquet file was generated (typically a Kinesis shard
	// timestamp or Kafka offset).
	HandoffToken snapshot.HandoffToken
}
Config configures a single-Parquet-file snapshot Source.
type Decoder ¶
Decoder converts a single Parquet row (an arrow.Record plus a row index) into T. Implementations should pull only the columns they need from the arrow.Record by name. Anything the returned T keeps beyond the call must be copied by the implementation, because the Record is reused after the call returns when streaming via record batches.
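The reuse hazard can be demonstrated with the standard library alone. In Arrow Go, accessors such as a binary column's Value typically return slices that alias the column buffer rather than copies. The sketch below assumes a hypothetical reusedBatch whose buffer is overwritten by the next batch, and contrasts a decoder that retains the aliased slice with one that copies via bytes.Clone (Go 1.20+). decodeUnsafe and decodeSafe are illustrative names only.

```go
package main

import "bytes"

// reusedBatch simulates a record whose backing buffer is overwritten by the
// next batch, as when record batches are reused while streaming.
type reusedBatch struct {
	buf []byte
}

// fill overwrites the shared buffer in place with the next batch's bytes.
func (b *reusedBatch) fill(p []byte) { b.buf = append(b.buf[:0], p...) }

// value returns a slice that aliases the shared buffer (no copy).
func (b *reusedBatch) value() []byte { return b.buf }

// decodeUnsafe retains the aliased slice, so its contents change when the
// batch is refilled.
func decodeUnsafe(b *reusedBatch) []byte { return b.value() }

// decodeSafe copies before returning, so the result outlives the batch.
func decodeSafe(b *reusedBatch) []byte { return bytes.Clone(b.value()) }
```

Copying costs an allocation per retained field, which is why the documentation leaves the choice to the Decoder rather than copying every column up front.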
type EventIDFn ¶
EventIDFn extracts a stable per-record dedup key from the decoded value. When unset, the source emits a synthetic "<reader-name>:<row-ordinal>" key, unique within a single run but not stable across re-runs. For idempotent re-runs, return the natural primary key (entity_id + bucket).
type EventTimeFn ¶
EventTimeFn extracts the upstream event time. When unset, the source uses processing time at decode (time.Now). For the canonical CountEvent schema, a typical impl returns time.UnixMilli(occurredAtUnixMs).
type Partition ¶
type Partition struct {
// Key is the full S3 object key.
Key string
// Values maps each `name=value` segment found in the key path
// (relative to S3Config.Prefix) to its value.
Values map[string]string
}
Partition is the set of Hive-style partition columns parsed from a single Parquet object key. Only columns present in the key path are populated; absent columns are missing from Values, so looking them up yields the empty string. Values are kept as strings; the caller's PartitionFilter does its own type conversion.
Multi-segment keys (e.g. `events/year=2026/month=05/day=08/hour=14/part.parquet`) yield {Key: "events/.../part.parquet", Values: {"year": "2026", "month": "05", "day": "08", "hour": "14"}}.
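The key-to-Partition mapping above can be sketched with the standard library. parsePartition is a hypothetical helper that assumes the rules described here: only `name=value` directory segments below the prefix count, and the final segment (the file name) is never treated as a partition. It does not URL-decode values, which Hive-style writers may escape.

```go
package main

import "strings"

// Partition mirrors the documented struct.
type Partition struct {
	Key    string
	Values map[string]string
}

// parsePartition extracts Hive-style name=value directory segments from key,
// relative to prefix.
func parsePartition(prefix, key string) Partition {
	p := Partition{Key: key, Values: map[string]string{}}
	segs := strings.Split(strings.TrimPrefix(key, prefix), "/")
	// Skip the last segment: it is the object's file name.
	for _, seg := range segs[:len(segs)-1] {
		name, value, ok := strings.Cut(seg, "=")
		if !ok || name == "" {
			continue // plain directory name, not a partition column
		}
		p.Values[name] = value
	}
	return p
}
```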
type PartitionFilter ¶
PartitionFilter, when non-nil on S3Config, is invoked once per discovered object. Return false to skip the object entirely (no GetObject, no decode). Use this to prune whole partitions by time range without fetching them from S3: ListObjectsV2 already filters by prefix, but Hive partitioning typically needs finer-grained pruning.
type S3Config ¶
type S3Config[T any] struct {
	// Client is an SDK v2 S3 client. The Source does not own its
	// lifecycle.
	Client *awss3.Client
	// Bucket is the S3 bucket to scan.
	Bucket string
	// Prefix narrows the scan, e.g. "events/". Empty means the entire
	// bucket, which is usually undesirable. Hive-style partition segments
	// (`year=...`) under this prefix are auto-discovered.
	Prefix string
	// Decode is the per-row Arrow decoder. Required; see
	// parquet.Decoder.
	Decode Decoder[T]
	// EventID derives the dedup key per record. Receives the object
	// key + row ordinal within that object. Default:
	// "<key>:<row-ordinal>".
	EventID func(decoded T, key string, rowOrdinal int) string
	// EventTime extracts the upstream event time from a decoded row.
	// Optional; default is processing time at decode.
	EventTime EventTimeFn[T]
	// PartitionFilter, when non-nil, is invoked once per discovered
	// object. Return false to skip (no GetObject, no decode). Use
	// for time-range pruning under a static prefix.
	PartitionFilter PartitionFilter
	// KeyFilter, when non-nil, is a low-level pre-PartitionFilter
	// hook. Receives the raw key and must return true to keep it.
	// Use this to drop control objects like `_SUCCESS` markers or
	// `_committed_*` files that Spark writes alongside the data.
	// Defaults to skipping anything that doesn't end in `.parquet`.
	KeyFilter func(key string) bool
	// OnDecodeError is forwarded to the underlying Parquet source for
	// each object. Receives the object key + row ordinal + error.
	OnDecodeError func(key string, rowOrdinal int, err error)
	// MaxConcurrency caps the number of concurrent object reads.
	// Default 4. Capped at len(matched keys).
	MaxConcurrency int
	// BatchSize is forwarded to the per-file source as the Arrow
	// record-batch size. Defaults to 4096.
	BatchSize int64
	// HandoffToken is what CaptureHandoff returns. Same pattern as
	// the other snapshot sources: caller is responsible for capturing
	// it before the Parquet dataset was generated.
	HandoffToken snapshot.HandoffToken
	// OpenObject, when non-nil, overrides the default GetObject path.
	// Use for tests that want to inject deterministic file bodies
	// without a live S3 client. The returned bytes must be the
	// complete Parquet file (the Arrow library needs ReaderAt + seek,
	// so partial / streaming bodies aren't supported).
	OpenObject func(ctx context.Context, key string) ([]byte, error)
}
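The documented KeyFilter default, and one way a caller might tighten it, can be sketched as plain predicates. defaultKeyFilter reproduces the stated default (keep only keys ending in `.parquet`); hiddenAwareKeyFilter and selectKeys are hypothetical. The stricter filter additionally rejects any key with a path segment starting with `_` or `.`, on the assumption that such paths (for example Spark's `_temporary/` attempt directories) hold uncommitted or auxiliary files even when they end in `.parquet`.

```go
package main

import "strings"

// defaultKeyFilter matches the documented default: keep only .parquet keys.
// _SUCCESS markers and _committed_* files fail this check already.
func defaultKeyFilter(key string) bool { return strings.HasSuffix(key, ".parquet") }

// hiddenAwareKeyFilter keeps .parquet keys but also rejects any key with a
// path segment starting with "_" or "." (for example "_temporary/").
func hiddenAwareKeyFilter(key string) bool {
	if !defaultKeyFilter(key) {
		return false
	}
	for _, seg := range strings.Split(key, "/") {
		if strings.HasPrefix(seg, "_") || strings.HasPrefix(seg, ".") {
			return false
		}
	}
	return true
}

// selectKeys applies a KeyFilter to a listing, preserving order.
func selectKeys(keys []string, keep func(string) bool) []string {
	var out []string
	for _, k := range keys {
		if keep(k) {
			out = append(out, k)
		}
	}
	return out
}
```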
S3Config configures an S3-prefix Parquet snapshot Source.
type S3Source ¶
type S3Source[T any] struct {
	// contains filtered or unexported fields
}
S3Source implements snapshot.Source[T] over an S3 prefix of Hive-partitioned Parquet objects.
func NewS3Source ¶
NewS3Source constructs the S3Source. Validates required fields.
func (*S3Source[T]) CaptureHandoff ¶
CaptureHandoff returns the configured handoff token, or nil.
func (*S3Source[T]) Resume ¶
Resume restarts the scan from the beginning. At-least-once dedup at bootstrap.Run absorbs duplicate emissions.
type Source ¶
type Source[T any] struct {
	// contains filtered or unexported fields
}
Source implements snapshot.Source[T] over a single Parquet file.
func NewSourceClosing ¶
NewSourceClosing is the convenience variant for callers that want snapshot.Source.Close to also close the underlying Reader. The Reader must implement io.Closer; otherwise NewSource is the right choice.
func (*Source[T]) CaptureHandoff ¶
CaptureHandoff returns the configured handoff token, or nil if none. See Config.HandoffToken.
func (*Source[T]) Close ¶
Close releases resources. If NewSourceClosing was used, also closes the underlying Reader. Idempotent.
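The ownership split between NewSource and NewSourceClosing, combined with an idempotent Close, can be sketched with sync.Once. This is a hypothetical illustration, not the package's code: closingSource, newSource, newSourceClosing, and the countingReader test double are invented names. Requiring a readerAtCloser in the closing constructor mirrors the documented rule that the Reader must implement io.Closer.

```go
package main

import (
	"io"
	"sync"
)

// readerAtCloser is what the closing constructor requires.
type readerAtCloser interface {
	io.ReaderAt
	io.Closer
}

// closingSource closes the underlying reader only when it owns it.
type closingSource struct {
	reader     io.ReaderAt
	ownsReader bool
	once       sync.Once
	closeErr   error
}

func newSource(r io.ReaderAt) *closingSource { return &closingSource{reader: r} }

func newSourceClosing(r readerAtCloser) *closingSource {
	return &closingSource{reader: r, ownsReader: true}
}

// Close is idempotent: the body runs once, and later calls return the
// same result.
func (s *closingSource) Close() error {
	s.once.Do(func() {
		// Releasing decoder/Arrow state would happen here (omitted).
		if s.ownsReader {
			if c, ok := s.reader.(io.Closer); ok {
				s.closeErr = c.Close()
			}
		}
	})
	return s.closeErr
}

// countingReader is a test double that records Close calls.
type countingReader struct{ closes int }

func (c *countingReader) ReadAt(p []byte, off int64) (int, error) { return 0, io.EOF }
func (c *countingReader) Close() error                            { c.closes++; return nil }
```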
func (*Source[T]) Resume ¶
Resume re-scans the file from the start. Parquet sources don't inherently support mid-stream resumption, because the file is read top to bottom; at-least-once dedup absorbs the duplicate emissions. Mid-stream resumption would require persisting (row-group-index, row-offset) and is left as future work.
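Why a full re-scan is safe can be shown with a toy dedup fold. The sketch assumes a hypothetical record type and a seen-set keyed by EventID; it is not bootstrap.Run's actual dedup mechanism. A partial scan is interrupted, Resume replays the file from row 0, and rows whose EventIDs were already seen are skipped instead of double-counted. This only works when EventIDs are identical across both scans.

```go
package main

// record is a hypothetical (EventID, value) pair as emitted by a source.
type record struct {
	EventID string
	Value   int64
}

// dedupSum folds records into a running sum, skipping EventIDs already seen.
func dedupSum(seen map[string]struct{}, sum int64, recs []record) int64 {
	for _, r := range recs {
		if _, dup := seen[r.EventID]; dup {
			continue // re-emitted by a replay; already counted
		}
		seen[r.EventID] = struct{}{}
		sum += r.Value
	}
	return sum
}
```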