package s3
Version: v0.1.0 (Latest)
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package s3 provides a replay.Driver that reads JSON-Lines event archives from S3 (or any S3-compatible store, such as MinIO) and feeds the records into the pipeline.

This is the standard "kappa replay" path for Kinesis-backed pipelines whose live retention is shorter than the desired backfill window. Events are typically archived to S3 by Kinesis Data Firehose with date-partitioning (e.g. year=/month=/day=/), or by Kafka Connect's S3 sink for Kafka topics.

Phase 1 supports only JSON-Lines (one JSON object per line). Parquet support is planned for Phase 2 if there is real demand for it.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	Client *s3.Client
	Bucket string
	// Prefix narrows the object scan (e.g. "events/year=2026/month=05/"). Empty means
	// the entire bucket.
	Prefix string
	// Decode converts a single JSON-Lines line to T.
	Decode Decoder[T]
	// OnDecodeError, if non-nil, is called for every line whose Decode returned an
	// error. If nil, undecodable lines are dropped silently.
	OnDecodeError func(key string, lineNum int, raw []byte, err error)
}

Config configures an S3 replay driver.

type Decoder

type Decoder[T any] func([]byte) (T, error)

Decoder converts a single JSON line into a value of type T, which the driver emits as a source.Record[T].
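For most archives a Decoder is just encoding/json applied to each line. A minimal sketch, assuming a hypothetical `JSONDecoder` constructor and an illustrative `Event` type (neither is provided by this package):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder mirrors this package's Decoder[T] type so the sketch is self-contained.
type Decoder[T any] func([]byte) (T, error)

// JSONDecoder is a hypothetical helper, not part of this package: it returns a
// Decoder[T] that unmarshals each line into a fresh T with encoding/json.
func JSONDecoder[T any]() Decoder[T] {
	return func(line []byte) (T, error) {
		var v T
		if err := json.Unmarshal(line, &v); err != nil {
			var zero T
			return zero, err // discard any partially populated value
		}
		return v, nil
	}
}

// Event is an illustrative payload type; real field names depend on your archive.
type Event struct {
	ID   string `json:"id"`
	Type string `json:"type"`
}

func main() {
	dec := JSONDecoder[Event]()
	ev, err := dec([]byte(`{"id":"e1","type":"click"}`))
	fmt.Println(ev.ID, ev.Type, err)

	_, err = dec([]byte(`{"id":`))
	fmt.Println("truncated line rejected:", err != nil)
}
```

If such a helper lives in your own code, its result can be assigned directly to the Decode field of Config, for example `Decode: JSONDecoder[Event]()`.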

type Driver

type Driver[T any] struct {
	// contains filtered or unexported fields
}

Driver implements replay.Driver for S3-archived JSON-Lines events.

func NewDriver

func NewDriver[T any](cfg Config[T]) (*Driver[T], error)

NewDriver returns a Driver. The client is owned by the caller.

func (*Driver[T]) Close

func (d *Driver[T]) Close() error

Close is a no-op; the underlying client is owned by the caller.

func (*Driver[T]) Name

func (d *Driver[T]) Name() string

Name returns "s3:<bucket>/<prefix>".

func (*Driver[T]) Replay

func (d *Driver[T]) Replay(ctx context.Context, out chan<- source.Record[T]) error

Replay enumerates all objects under (Bucket, Prefix), reads each as JSON-Lines, and emits one source.Record per line. Returns when all objects have been consumed or ctx is canceled. Object enumeration is paged via ListObjectsV2.
