parquet

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Overview

Package parquet provides a snapshot.Source that reads Apache Parquet from any random-access reader (an io.ReaderAt that also implements io.Seeker; see Config.Reader): local files, S3 objects fetched into memory, in-memory bytes from tests, and so on. The canonical use case is bootstrapping a Murmur pipeline from a Spark-produced backfill written to S3.

Parquet is the recommended interchange format when the upstream is Spark (or any columnar engine): 5–20x smaller than gzipped JSON Lines, predicate-pushdown-friendly, and native to every modern distributed query engine. JSON Lines (pkg/source/snapshot/jsonl) is still the right choice for ad-hoc / hand-built dumps and database exports.

This package pairs the single-file reader, Source, with the S3 prefix scanner, S3Source (pkg/source/snapshot/parquet#S3Source). S3Source is the multi-file, Hive-partition-aware discoverer that most production callers want; Source decodes one Parquet object.

Compression

Snappy, gzip, zstd, and LZ4 are handled transparently by the Arrow Parquet library, which reads the codec from each column chunk's metadata. No caller-side decompression is required.

Decoding

Parquet is columnar, but bootstrap.Run consumes one Record[T] at a time. This source reads the file one row group at a time, materializes rows as arrow.Records (Config.BatchSize rows each), and walks each row into T via a user-supplied Decoder[T]. The Decoder receives an arrow.Record and a row index, so it can pull only the columns it needs by name.
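The row-at-a-time walk can be illustrated without Arrow. In this sketch a hypothetical `batch` type with named columns stands in for arrow.Record, `decoder` mirrors the Decoder[T] shape, and `walk` plays the role of the source's inner loop: one Decoder call per row, with failures routed to an OnDecodeError-style callback instead of aborting. None of these names are the package's API.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for arrow.Record: equal-length
// columns addressed by name.
type batch struct {
	numRows int
	int64s  map[string][]int64
	strs    map[string][]string
}

// decoder mirrors the Decoder[T] shape: (record, row index) -> (T, error).
type decoder[T any] func(b *batch, row int) (T, error)

type countEvent struct {
	EntityID string
	Count    int64
}

// decodeCount looks up only the two columns it needs, by name.
func decodeCount(b *batch, row int) (countEvent, error) {
	ids, ok := b.strs["entity_id"]
	if !ok {
		return countEvent{}, fmt.Errorf("missing column %q", "entity_id")
	}
	counts, ok := b.int64s["count"]
	if !ok {
		return countEvent{}, fmt.Errorf("missing column %q", "count")
	}
	if ids[row] == "" {
		return countEvent{}, fmt.Errorf("row %d: empty entity_id", row)
	}
	return countEvent{EntityID: ids[row], Count: counts[row]}, nil
}

// walk decodes every row of one batch. Failures go to onErr with the
// file-wide ordinal (firstOrdinal + row) and do not stop the loop.
func walk[T any](b *batch, firstOrdinal int, dec decoder[T], emit func(T), onErr func(int, error)) {
	for row := 0; row < b.numRows; row++ {
		v, err := dec(b, row)
		if err != nil {
			if onErr != nil {
				onErr(firstOrdinal+row, err)
			}
			continue
		}
		emit(v)
	}
}
```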

S3 Prefix Scanning

S3Source is an S3 prefix scanner with Hive-style partition discovery and partition pruning (predicate pushdown at the object level).

Spark writes partitioned datasets as

s3://bucket/prefix/year=2026/month=05/day=08/hour=14/part-00000.parquet

where the partition columns are embedded in the key path (Hive convention). This source lists all `.parquet` objects under a prefix, parses the partition columns out of each key, optionally drops whole partitions via a caller-supplied predicate, and decodes the remaining objects through pkg/source/snapshot/parquet#Source.

Concurrency

Object fetches run in parallel, up to MaxConcurrency at a time (default 4). Decoded records from all workers share the single out channel, so records from different objects interleave in completion order rather than being sorted by partition. Bootstrap callers should not depend on arrival order; the downstream monoid combine is commutative.
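A bounded worker pool feeding one shared channel is a common way to get this behavior; the sketch below is an assumption about the shape, not the package's code. `scanAll` and the `decodeObject` callback are hypothetical; the worker count follows the documented default of 4, capped at the number of keys, and results are collected into a slice (rather than forwarded to a caller's channel) to keep the example testable.

```go
package main

import "sync"

// scanAll decodes each key on at most maxConc workers and fans every
// record into one channel, so records from different keys interleave
// in completion order.
func scanAll(keys []string, maxConc int, decodeObject func(key string, out chan<- string)) []string {
	if maxConc <= 0 {
		maxConc = 4 // documented default
	}
	if maxConc > len(keys) {
		maxConc = len(keys) // no idle workers
	}
	work := make(chan string)
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < maxConc; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for k := range work {
				decodeObject(k, out)
			}
		}()
	}
	go func() { // feed keys, then signal no more work
		for _, k := range keys {
			work <- k
		}
		close(work)
	}()
	go func() { // close the shared channel once every worker is done
		wg.Wait()
		close(out)
	}()
	var got []string
	for r := range out {
		got = append(got, r)
	}
	return got
}
```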

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	// Reader is the Parquet object. parquet.ReaderAtSeeker is
	// satisfied by *os.File and by *bytes.Reader (for example, an
	// S3 object body read fully into memory).
	// The Source does not own its lifecycle; close it externally
	// (or use NewSourceClosing to delegate).
	Reader parquet.ReaderAtSeeker

	// Name is a stable identifier for logging / metrics. Typically the
	// S3 key or the file path.
	Name string

	// Decode converts an Arrow row into T. Required — no
	// default schema-introspection exists; callers know their schema.
	Decode Decoder[T]

	// EventID derives the dedup key per record. Optional; see EventIDFn.
	EventID EventIDFn[T]

	// EventTime extracts the upstream event time. Optional; default
	// is time.Now at decode.
	EventTime EventTimeFn[T]

	// OnDecodeError, when non-nil, is invoked for rows that fail to
	// decode. Default behavior: drop silently and continue. Wire to a
	// DLQ producer or logger to surface poison rows.
	OnDecodeError func(rowOrdinal int, err error)

	// BatchSize is the number of rows materialized per arrow.Record
	// pulled out of Parquet. Defaults to 4096. Lower for large rows
	// (less peak memory), higher for tiny rows (less per-batch
	// overhead).
	BatchSize int64

	// Allocator is the Arrow memory allocator. Defaults to
	// memory.DefaultAllocator. Override for tracked allocators in
	// tests or for arena-style reuse.
	Allocator memory.Allocator

	// HandoffToken, when non-nil, is what CaptureHandoff returns.
	// Parquet snapshots have no inherent live-source resume position;
	// the caller's deployment system is responsible for capturing one
	// before the Parquet file was generated (typically a Kinesis shard
	// timestamp or Kafka offset).
	HandoffToken snapshot.HandoffToken
}

Config configures a single-Parquet-file snapshot Source.

type Decoder

type Decoder[T any] func(rec arrow.Record, row int) (T, error)

Decoder converts a single Parquet row (an arrow.Record plus a row index) into T. Implementations should pull only the columns they need from the arrow.Record by name. The Record may be released or reused once the call returns (rows are streamed in record batches), so a Decoder must not retain references into it: copy any value that T keeps and that may alias the Record's buffers, such as binary or string data.
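Why the copy matters: a value that aliases a batch buffer changes when that buffer is reused. This sketch simulates the reused buffer with a plain []byte (a stand-in for Arrow's internals, not its actual memory layout); `decodeAliased` and `decodeCopied` are hypothetical, and bytes.Clone requires Go 1.20+.

```go
package main

import "bytes"

// decodeAliased keeps a sub-slice of the shared buffer: unsafe once the
// buffer is overwritten by the next batch.
func decodeAliased(buf []byte, start, end int) []byte {
	return buf[start:end]
}

// decodeCopied copies the value out, so it survives buffer reuse.
func decodeCopied(buf []byte, start, end int) []byte {
	return bytes.Clone(buf[start:end])
}
```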

type EventIDFn

type EventIDFn[T any] func(decoded T, rowOrdinal int) string

EventIDFn extracts a stable per-record dedup key from the decoded value. When unset, the source emits a synthetic "<reader-name>:<row-ordinal>" key, which is unique within a single run but not stable across re-runs. For idempotent re-runs, return the natural primary key (entity_id + bucket).
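The difference between the fallback key and a natural key fits in a few lines. `defaultEventID` reproduces the documented "<reader-name>:<row-ordinal>" format; `rollup` is a hypothetical decoded type, and joining entity_id and bucket with ":" is an assumed encoding.

```go
package main

import "strconv"

// rollup is a hypothetical decoded row.
type rollup struct {
	EntityID string
	Bucket   int64 // e.g. an hour bucket in Unix seconds
}

// defaultEventID mirrors the documented fallback: unique within one run,
// but tied to row position rather than row content.
func defaultEventID(readerName string, rowOrdinal int) string {
	return readerName + ":" + strconv.Itoa(rowOrdinal)
}

// naturalEventID has the EventIDFn shape and ignores the ordinal, so the
// same logical row keeps its key even if the file is rewritten in a
// different order.
func naturalEventID(r rollup, _ int) string {
	return r.EntityID + ":" + strconv.FormatInt(r.Bucket, 10)
}
```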

type EventTimeFn

type EventTimeFn[T any] func(decoded T) time.Time

EventTimeFn extracts the upstream event time. When unset, the source uses processing time at decode (time.Now). For the canonical CountEvent schema, a typical implementation returns time.UnixMilli(occurredAtUnixMs).

type Partition

type Partition struct {
	// Key is the full S3 object key.
	Key string

	// Values maps each `name=value` segment found in the key path
	// (relative to Config.Prefix) to its value.
	Values map[string]string
}

Partition is the set of Hive-style partition columns parsed from a single Parquet object key. Only columns present in the key path are populated; absent columns have no map entry, so looking one up yields the empty string. Values are kept as strings; the caller's PartitionFilter does its own type conversion.

Multi-segment keys (e.g. `events/year=2026/month=05/day=08/hour=14/part.parquet`) yield {Key: "events/.../part.parquet", Values: {"year": "2026", "month": "05", "day": "08", "hour": "14"}}.

type PartitionFilter

type PartitionFilter func(p Partition) bool

PartitionFilter, when set on S3Config, is invoked once per discovered object. Return false to skip the object entirely (no GetObject, no decode). Use it to prune whole partitions by time range without fetching them. Listing still happens: S3 ListObjectsV2 can only filter by key prefix, so finer-grained Hive pruning (for example, an hour range within one day) is applied to the listed keys client-side.

type S3Config

type S3Config[T any] struct {
	// Client is an SDK v2 S3 client. The Source does not own its
	// lifecycle.
	Client *awss3.Client

	// Bucket is the S3 bucket to scan.
	Bucket string

	// Prefix narrows the scan, e.g. "events/". Empty means the entire
	// bucket — usually undesirable. Hive-style partition segments
	// (`year=...`) under this prefix are auto-discovered.
	Prefix string

	// Decode is the per-row Arrow decoder. Required; see
	// parquet.Decoder.
	Decode Decoder[T]

	// EventID derives the dedup key per record. Receives the object
	// key + row ordinal within that object. Default:
	// "<key>:<row-ordinal>".
	EventID func(decoded T, key string, rowOrdinal int) string

	// EventTime extracts the upstream event time from a decoded row.
	// Optional; default is processing time at decode.
	EventTime EventTimeFn[T]

	// PartitionFilter, when non-nil, is invoked once per discovered
	// object. Return false to skip — no GetObject, no decode. Use
	// for time-range pruning under a static prefix.
	PartitionFilter PartitionFilter

	// KeyFilter, when non-nil, is a low-level pre-PartitionFilter
	// hook. Receives the raw key and must return true to keep it.
	// Use this to drop control objects like `_SUCCESS` markers or
	// `_committed_*` files that Spark writes alongside the data.
	// Defaults to skipping anything that doesn't end in `.parquet`.
	KeyFilter func(key string) bool

	// OnDecodeError is forwarded to the underlying Parquet source for
	// each object. Receives the object key + row ordinal + error.
	OnDecodeError func(key string, rowOrdinal int, err error)

	// MaxConcurrency caps the number of concurrent object reads.
	// Default 4. Capped at len(matched keys).
	MaxConcurrency int

	// BatchSize is forwarded to the per-file source as the Arrow
	// record-batch size. Defaults to 4096.
	BatchSize int64

	// HandoffToken is what CaptureHandoff returns. Same pattern as
	// the other snapshot sources: caller is responsible for capturing
	// it before the Parquet dataset was generated.
	HandoffToken snapshot.HandoffToken

	// OpenObject, when non-nil, overrides the default GetObject path.
	// Use for tests that want to inject deterministic file bodies
	// without a live S3 client. The returned bytes must be the
	// complete Parquet file (the Arrow library needs ReaderAt + seek,
	// so partial / streaming bodies aren't supported).
	OpenObject func(ctx context.Context, key string) ([]byte, error)
}

S3Config configures an S3-prefix Parquet snapshot Source.

type S3Source

type S3Source[T any] struct {
	// contains filtered or unexported fields
}

S3Source implements snapshot.Source[T] over an S3 prefix of Hive-partitioned Parquet objects.

func NewS3Source

func NewS3Source[T any](cfg S3Config[T]) (*S3Source[T], error)

NewS3Source constructs the S3Source. Validates required fields.

func (*S3Source[T]) CaptureHandoff

func (s *S3Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)

CaptureHandoff returns the configured handoff token, or nil.

func (*S3Source[T]) Close

func (s *S3Source[T]) Close() error

Close is a no-op; the underlying S3 client is owned by the caller.

func (*S3Source[T]) Name

func (s *S3Source[T]) Name() string

Name returns "parquet-s3:<bucket>/<prefix>".

func (*S3Source[T]) Resume

func (s *S3Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error

Resume ignores the resume token and restarts the scan from the beginning. At-least-once dedup at bootstrap.Run absorbs the duplicate emissions.

func (*S3Source[T]) Scan

func (s *S3Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error

Scan lists the prefix, applies KeyFilter and PartitionFilter, and decodes each surviving Parquet object in parallel (bounded by MaxConcurrency). Per-object errors abort the scan; per-row decode errors fire OnDecodeError without aborting.

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source implements snapshot.Source[T] over a single Parquet file.

func NewSource

func NewSource[T any](cfg Config[T]) (*Source[T], error)

NewSource constructs the Source. Validates required fields and applies defaults.

func NewSourceClosing

func NewSourceClosing[T any](cfg Config[T]) (*Source[T], error)

NewSourceClosing is the convenience variant for callers that want snapshot.Source.Close to also close the underlying Reader. The Reader must implement io.Closer; otherwise NewSource is the right choice.

func (*Source[T]) CaptureHandoff

func (s *Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)

CaptureHandoff returns the configured handoff token, or nil if none. See Config.HandoffToken.

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close releases resources. If NewSourceClosing was used, also closes the underlying Reader. Idempotent.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "parquet:<configured-name>".

func (*Source[T]) Resume

func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error

Resume ignores the resume token and re-scans the file from the start: Parquet sources don't inherently support mid-stream resumption, because the file is read top to bottom. At-least-once dedup absorbs the duplicate emissions. Mid-stream resumption would require persisting (row-group-index, row-offset) and is left as future work.
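Why a full re-scan is safe: if the consumer deduplicates by EventID, records emitted before the interruption are dropped the second time around. The sketch below models that with a hypothetical in-memory `seen` set and a minimal `record` stand-in for source.Record[T]; bootstrap.Run's actual dedup store is not described in this documentation.

```go
package main

// record is a minimal stand-in for source.Record[T].
type record struct {
	EventID string
	Value   int
}

// dedupSum applies each record at most once per EventID, so replaying a
// prefix of the stream (as Resume does) does not double-count.
func dedupSum(seen map[string]bool, total int, recs []record) int {
	for _, r := range recs {
		if seen[r.EventID] {
			continue // duplicate emission from the re-scan
		}
		seen[r.EventID] = true
		total += r.Value
	}
	return total
}
```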

func (*Source[T]) Scan

func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error

Scan reads the Parquet file row-group-by-row-group, decoding each row into T and emitting a source.Record. Returns nil on EOF; non-nil on a fatal reader error.

Per-row decode errors fire OnDecodeError but do NOT abort.

func (*Source[T]) Stats

func (s *Source[T]) Stats() Stats

Stats returns the live counters. Safe for concurrent reads.

type Stats

type Stats struct {
	RowsScanned int64
	RowsDecoded int64
}

Stats reports rows scanned vs successfully decoded.
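Counters that are safe to read while the scan runs are naturally atomics. This sketch assumes sync/atomic's Int64 type (Go 1.19+) behind a hypothetical `counters` type; only the two Stats fields come from the documentation. Presumably every scanned row either decodes or fails, so RowsScanned minus RowsDecoded counts the rows that went to OnDecodeError (or were dropped, if it is unset).

```go
package main

import "sync/atomic"

// Stats mirrors the documented type.
type Stats struct {
	RowsScanned int64
	RowsDecoded int64
}

// counters is updated by the scanning goroutine and read by anyone.
type counters struct {
	scanned atomic.Int64
	decoded atomic.Int64
}

// observe records one row; scanned is always incremented first.
func (c *counters) observe(decodedOK bool) {
	c.scanned.Add(1)
	if decodedOK {
		c.decoded.Add(1)
	}
}

// snapshot loads decoded before scanned. Because scanned is incremented
// first and only grows, this order guarantees RowsDecoded <= RowsScanned
// in every snapshot, even while the scan is running.
func (c *counters) snapshot() Stats {
	d := c.decoded.Load()
	s := c.scanned.Load()
	return Stats{RowsScanned: s, RowsDecoded: d}
}
```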
