package jsonl
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package jsonl provides a snapshot.Source that reads JSON Lines from any io.Reader — local files, S3 objects, gzip streams, etc. The canonical use case: bootstrap a Murmur pipeline from an S3-archived DDB export, a daily database dump, a Firehose archive, or any other "one JSON object per line" data set.

It pairs with pkg/replay/s3, the LIVE-mode replay counterpart that drives the same JSON Lines through replay.Run. This package is the BOOTSTRAP-mode counterpart: it drives them through bootstrap.Run and captures a handoff token for the bootstrap → live transition.

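Common ways for the caller to wire the Reader:

  • Local file (development):

        f, err := os.Open("orders.jsonl")
        if err != nil {
            return err
        }
        defer f.Close()
        reader := f

  • S3 object:

        resp, err := s3client.GetObject(ctx, &s3.GetObjectInput{...})
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        reader := resp.Body

  • Gzipped S3 object (gzip.Reader.Close does not close resp.Body, so close both):

        resp, err := s3client.GetObject(...)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        gz, err := gzip.NewReader(resp.Body)
        if err != nil {
            return err
        }
        defer gz.Close()
        reader := gz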

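For multi-object S3 prefixes (a partitioned export with many .jsonl files under one prefix), either concatenate the objects with io.MultiReader, or run bootstrap.Run once per object, each with its own Source, sharing one Deduper. io.MultiReader joins the byte streams as-is, so an object whose last line lacks a trailing newline would fuse that line with the next object's first line; make sure every object ends with a newline before concatenating.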


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	// Reader is the line source. The Source consumes it sequentially;
	// the caller manages its lifecycle (typically defer Close after
	// bootstrap.Run returns).
	Reader io.Reader

	// Name is a stable identifier for logging / metrics. Typically the
	// S3 key, the file path, or "stdin".
	Name string

	// Decode converts a JSON line to T. Defaults to DefaultDecoder[T].
	Decode Decoder[T]

	// EventID derives the dedup key per record. Optional; see EventIDFn.
	EventID EventIDFn[T]

	// EventTime, when non-nil, derives the per-record EventTime used
	// for window bucket assignment. Defaults to time.Now() when unset;
	// backfill of windowed counters must wire this to the record's
	// source-of-truth timestamp (e.g., the bucket-mid `occurred_at`
	// field) or every row will land in the same bucket.
	EventTime EventTimeFn[T]

	// OnDecodeError, when non-nil, is invoked for lines that fail to
	// decode. Default behavior: drop silently and continue. Wire to a
	// DLQ producer or logger to surface poison lines.
	OnDecodeError func(line []byte, lineNum int, err error)

	// MaxLineSize caps the bufio.Scanner buffer per line. Defaults to
	// 1 MB. Lines longer than this are reported via OnDecodeError and
	// skipped.
	MaxLineSize int

	// HandoffToken, when non-nil, is what CaptureHandoff returns. The
	// JSON-Lines source has no inherent live-source resume position —
	// the caller is responsible for capturing one externally
	// (typically a Kinesis shard timestamp or Kafka offset taken
	// before the snapshot was generated) and passing it through here.
	HandoffToken snapshot.HandoffToken
}

Config configures a JSON-Lines snapshot Source.

type Decoder

type Decoder[T any] func(line []byte) (T, error)

Decoder converts a single JSON-Lines line to T. The default, DefaultDecoder, json.Unmarshals the line into T. Override it when the line's shape doesn't map directly onto T (e.g., a DynamoDB export's wrapped attribute-value shape, where every attribute sits inside a type descriptor such as {"S": "..."}, needs custom decoding to flatten the values).

func DefaultDecoder

func DefaultDecoder[T any]() Decoder[T]

DefaultDecoder returns a Decoder that does json.Unmarshal into T.

type EventIDFn

type EventIDFn[T any] func(decoded T, lineNum int) string

EventIDFn extracts a stable per-record dedup key from the decoded value. When unset, the source emits a synthetic "<reader-name>:<line-number>" key, which is unique within a single run but NOT across re-runs (line numbers can shift if upstream re-exports). For idempotent re-runs, return the record's natural primary key (a Mongo `_id`, or a DDB table's partition key plus its sort key, if it has one).
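The difference between the synthetic default and a natural key shows up when the same rows are re-exported in a different order. A minimal sketch with hypothetical types: orderRow models a DynamoDB row keyed by partition key CustomerID and sort key OrderID, syntheticID imitates the documented "<reader-name>:<line-number>" default, and EventIDFn is redeclared locally with the documented signature.

```go
package main

import "fmt"

// EventIDFn is redeclared with the documented signature so this compiles alone.
type EventIDFn[T any] func(decoded T, lineNum int) string

// orderRow is a hypothetical DynamoDB row with a composite primary key.
type orderRow struct {
	CustomerID string // partition key
	OrderID    string // sort key
}

// naturalKeyID derives the dedup key from the full primary key and ignores
// the line number, so re-exports in any order yield the same IDs.
func naturalKeyID(r orderRow, _ int) string {
	return r.CustomerID + "#" + r.OrderID
}

var _ EventIDFn[orderRow] = naturalKeyID

// syntheticID imitates the documented default key "<reader-name>:<line-number>".
func syntheticID(name string) EventIDFn[orderRow] {
	return func(_ orderRow, lineNum int) string {
		return fmt.Sprintf("%s:%d", name, lineNum)
	}
}

func main() {
	run1 := []orderRow{{"c1", "o1"}, {"c2", "o2"}}
	run2 := []orderRow{{"c2", "o2"}, {"c1", "o1"}} // same rows, exported in a different order
	synth := syntheticID("orders.jsonl")
	// Row {c1 o1} is line 1 in run1 but line 2 in run2.
	fmt.Println(synth(run1[0], 1), synth(run2[1], 2))               // orders.jsonl:1 orders.jsonl:2
	fmt.Println(naturalKeyID(run1[0], 1), naturalKeyID(run2[1], 2)) // c1#o1 c1#o1
}
```

The "#" separator is an assumption; pick one that cannot occur inside the key parts, or the concatenated key could become ambiguous.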

type EventTimeFn

type EventTimeFn[T any] func(decoded T) time.Time

EventTimeFn extracts the upstream EventTime from a decoded record. When set, the returned time is used as Record.EventTime, which the bootstrap runtime uses for window bucket assignment. When unset, the source emits time.Now(): correct for non-windowed counters, but wrong when backfilling windowed counters, because every row would land in the same bucket. Pre-aggregated Spark rows must wire this so each row lands in its source-of-truth hour.

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source implements snapshot.Source[T] over a single io.Reader of JSON Lines.

func NewSource

func NewSource[T any](cfg Config[T]) (*Source[T], error)

NewSource constructs the Source. Validates required fields and applies defaults.

func NewSourceClosing

func NewSourceClosing[T any](cfg Config[T]) (*Source[T], error)

NewSourceClosing is the convenience variant for callers that want snapshot.Source.Close to also close the underlying reader. The Reader must implement io.Closer; if it doesn't, or if the caller manages the reader's lifecycle itself, use NewSource.

func (*Source[T]) CaptureHandoff

func (s *Source[T]) CaptureHandoff(_ context.Context) (snapshot.HandoffToken, error)

CaptureHandoff returns the configured handoff token, or nil if none was configured. The JSON-Lines source has no inherent live-source resume position, so the caller's deployment system is responsible for capturing one before the snapshot is generated and passing it in through Config.HandoffToken.

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close releases resources. If NewSourceClosing was used, also closes the underlying reader. Idempotent.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "jsonl:<configured-name>".

func (*Source[T]) Resume

func (s *Source[T]) Resume(ctx context.Context, _ []byte, out chan<- source.Record[T]) error

Resume currently ignores the resume position and re-Scans from the start, because the JSON-Lines source doesn't inherently support mid-stream resumption (the Reader is one-shot). At-least-once dedup absorbs the duplicate emissions. Production callers that need mid-stream resume should either persist a per-line byte offset and seek the underlying file before resuming (the reader must implement io.Seeker, as *os.File does), or restart from the next archive object (S3 source).

func (*Source[T]) Scan

func (s *Source[T]) Scan(ctx context.Context, out chan<- source.Record[T]) error

Scan consumes the configured Reader line-by-line, decoding each into T and emitting a source.Record. Returns nil when the reader returns io.EOF; non-nil only on a fatal scanner error (e.g., line larger than MaxLineSize that cannot be skipped).

func (*Source[T]) Stats

func (s *Source[T]) Stats() Stats

Stats returns the live counters. Safe for concurrent reads.

type Stats

type Stats struct {
	LinesScanned int64
	LinesDecoded int64
}

Stats reports the lines scanned vs successfully decoded — useful for surfacing "what fraction of the input was actually usable" after a large bootstrap.
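After a large bootstrap, the two counters reduce to the usable fraction mentioned above. A minimal sketch, with Stats redeclared locally and a hypothetical requireUsable gate that fails a run when too much input was rejected; the threshold is an operational choice, not something the package prescribes.

```go
package main

import "fmt"

// Stats mirrors the documented struct so this compiles alone.
type Stats struct {
	LinesScanned int64
	LinesDecoded int64
}

// usableFraction reports the share of scanned lines that decoded, returning
// 0 for empty input instead of dividing by zero.
func usableFraction(s Stats) float64 {
	if s.LinesScanned == 0 {
		return 0
	}
	return float64(s.LinesDecoded) / float64(s.LinesScanned)
}

// requireUsable (hypothetical) fails when the usable fraction is below minFrac.
func requireUsable(s Stats, minFrac float64) error {
	if f := usableFraction(s); f < minFrac {
		return fmt.Errorf("only %.1f%% of %d lines decoded (minimum %.1f%%)",
			100*f, s.LinesScanned, 100*minFrac)
	}
	return nil
}

func main() {
	s := Stats{LinesScanned: 1000, LinesDecoded: 990}
	fmt.Printf("usable: %.1f%%\n", 100*usableFraction(s)) // usable: 99.0%
	fmt.Println(requireUsable(s, 0.95))                   // <nil>
}
```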
