backfill

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: May 12, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

README

backfill-from-spark

This example shows how to seed a Murmur pipeline from a Spark-produced backfill written to S3.

Two interchange formats are supported. Pick the right one for the data size:

Format      Source                                  Best for
JSON Lines  pkg/source/snapshot/s3 (gzip-aware)     Hand-built dumps, DB exports, small backfills
Parquet     pkg/source/snapshot/parquet (S3Source)  Spark-produced backfills; 5–20× smaller, columnar, partition-prunable

Both sources implement snapshot.Source[T] and plug directly into bootstrap.Run. The Parquet path is recommended whenever Spark is the producer: Parquet is Spark's native output format, and using it avoids a wasteful round-trip through .write.json(...) with gzip compression.

Redshift / warehouse → Spark aggregation → S3 → bootstrap.Run → DDB Sum store
                                                       ▲
                                                       │
                                        live Kafka/Kinesis worker
                                        (takes over from HandoffToken)

Because the Murmur Sum monoid is associative, a pre-aggregated row with count=42 produces the same bucket value as forty-two streaming events with count=1. The pipeline definition is identical; only the source differs.
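To make the equivalence concrete, here is a minimal sketch of a Sum monoid in Go. The sumMonoid type and its Identity/Combine methods are hypothetical stand-ins, not Murmur's monoid API; the point is only that folding forty-two deltas of 1 and folding a single delta of 42 yield the same bucket value.

```go
package main

import "fmt"

// sumMonoid is a hypothetical stand-in for a Sum monoid over int64:
// an identity element plus an associative Combine.
type sumMonoid struct{}

func (sumMonoid) Identity() int64          { return 0 }
func (sumMonoid) Combine(a, b int64) int64 { return a + b }

// fold reduces a stream of deltas into one bucket value.
func fold(m sumMonoid, deltas []int64) int64 {
	acc := m.Identity()
	for _, d := range deltas {
		acc = m.Combine(acc, d)
	}
	return acc
}

func main() {
	var m sumMonoid

	// Forty-two streaming events, each carrying count=1.
	streaming := make([]int64, 42)
	for i := range streaming {
		streaming[i] = 1
	}

	// One pre-aggregated backfill row carrying count=42.
	backfilled := []int64{42}

	fmt.Println(fold(m, streaming), fold(m, backfilled)) // 42 42
}
```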

Canonical Parquet schema

Spark jobs targeting Murmur should write rows in this shape:

message CountEvent {
  required binary entity_id  (STRING);
  required int64  count;
  required int64  occurred_at_unix_ms;
  required int32  year;
  required int32  month;
  required int32  day;
  required int32  hour;
}

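The year / month / day / hour columns are meant to live in two places:

  1. As columns inside every row (so the data is self-describing without needing the S3 key).
  2. As Hive-style partition segments in the object key, e.g. events/year=2026/month=5/day=8/hour=14/part-00000.parquet.

The key-path copy is what makes PartitionFilter pruning viable: the S3Source can skip entire .parquet objects, without reading them, based on the partition values parsed from the key path. Two Spark behaviors are worth knowing here. First, partitionBy moves the partition columns out of the data files and into the key path, so with the writer below the rows themselves carry only entity_id, count, and occurred_at_unix_ms (which is all the Go decoder reads). Second, Spark writes integer partition values without zero-padding (month=5, not month=05).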
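Partition pruning boils down to two steps: parse the key=value segments out of an object key, then decide whether to fetch the object at all. The sketch below is in the spirit of the PartitionFilter used in the Go bootstrap example; parsePartition and keep are hypothetical helpers, since the real S3Source does its own parsing and hands the result to PartitionFilter as a parquet.Partition. The filter compares numerically so that both month=5 and month=05 match.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePartition extracts Hive-style key=value segments from an S3 key,
// e.g. "events/year=2026/month=5/part-0.parquet" -> {year:"2026", month:"5"}.
// Segments without "=" (prefixes, file names, _SUCCESS) are ignored.
func parsePartition(key string) map[string]string {
	values := map[string]string{}
	for _, seg := range strings.Split(key, "/") {
		if k, v, ok := strings.Cut(seg, "="); ok {
			values[k] = v
		}
	}
	return values
}

// keep admits only May 2026. Numeric comparison accepts "5" and "05";
// keys with missing or non-numeric values are rejected.
func keep(values map[string]string) bool {
	y, errY := strconv.Atoi(values["year"])
	m, errM := strconv.Atoi(values["month"])
	return errY == nil && errM == nil && y == 2026 && m == 5
}

func main() {
	keys := []string{
		"events/year=2026/month=5/day=8/hour=14/part-00000.parquet",
		"events/year=2026/month=4/day=30/hour=23/part-00000.parquet",
		"events/_SUCCESS",
	}
	for _, k := range keys {
		fmt.Println(keep(parsePartition(k)), k)
	}
}
```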

Spark writer (PySpark)

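from pyspark.sql.functions import dayofmonth, hour, month, unix_timestamp, year

# year()/month()/dayofmonth()/hour() evaluate in the session time zone;
# pin it to UTC so partitions line up with UTC hour buckets.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# `events` is a DataFrame with entity_id, count, and an occurred_at timestamp.
(events
  .withColumn("year",  year("occurred_at"))
  .withColumn("month", month("occurred_at"))
  .withColumn("day",   dayofmonth("occurred_at"))
  .withColumn("hour",  hour("occurred_at"))
  # unix_timestamp() has whole-second precision; enough for hourly buckets.
  .withColumn("occurred_at_unix_ms", (unix_timestamp("occurred_at") * 1000).cast("long"))
  .select("entity_id", "count", "occurred_at_unix_ms", "year", "month", "day", "hour")
  .write
  .partitionBy("year", "month", "day", "hour")
  .mode("overwrite")
  .parquet("s3://bucket/murmur/page_views/v2/"))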

Spark defaults to Snappy compression — leave it. The Parquet snapshot source handles Snappy / gzip / zstd / lz4 transparently.

Go bootstrap (Parquet)

import (
    "context"
    "fmt"
    "strconv"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    awss3 "github.com/aws/aws-sdk-go-v2/service/s3"

    "github.com/gallowaysoftware/murmur/pkg/source/snapshot/parquet"
)

type CountEvent struct {
    EntityID         string
    Count            int64
    OccurredAtUnixMs int64
}

// column returns the named column, or nil if the schema lacks it.
func column(rec arrow.Record, name string) arrow.Array {
    if idx := rec.Schema().FieldIndices(name); len(idx) > 0 {
        return rec.Column(idx[0])
    }
    return nil
}

// decode reads one row. A missing column or an unexpected Arrow type
// returns an error instead of panicking on an index or type assertion.
func decode(rec arrow.Record, row int) (CountEvent, error) {
    ids, ok1 := column(rec, "entity_id").(*array.String)
    counts, ok2 := column(rec, "count").(*array.Int64)
    ts, ok3 := column(rec, "occurred_at_unix_ms").(*array.Int64)
    if !ok1 || !ok2 || !ok3 {
        return CountEvent{}, fmt.Errorf("decode: unexpected schema: %s", rec.Schema())
    }
    return CountEvent{
        EntityID:         ids.Value(row),
        Count:            counts.Value(row),
        OccurredAtUnixMs: ts.Value(row),
    }, nil
}

func newSource(ctx context.Context, client *awss3.Client) (*parquet.S3Source[CountEvent], error) {
    return parquet.NewS3Source(parquet.S3Config[CountEvent]{
        Client:         client,
        Bucket:         "my-bucket",
        Prefix:         "murmur/page_views/v2/",
        Decode:         decode,
        MaxConcurrency: 8,

        // Optional: prune entire partitions. Spark may have written
        // historical hours we don't want to replay. Spark writes integer
        // partition values unpadded (month=5), so compare numerically.
        PartitionFilter: func(p parquet.Partition) bool {
            y, _ := strconv.Atoi(p.Values["year"])
            m, _ := strconv.Atoi(p.Values["month"])
            return y == 2026 && m == 5
        },

        // Optional: use the natural key for dedup so re-runs are
        // idempotent under at-least-once.
        EventID: func(e CountEvent, _ string, _ int) string {
            return e.EntityID + "|" + strconv.FormatInt(e.OccurredAtUnixMs, 10)
        },
    })
}

Wire the returned *parquet.S3Source into bootstrap.Run exactly as you would the JSON-Lines variant — the Source contract is identical.

JSON Lines variant

For ad hoc backfills, or for sources where Spark isn't the producer, pkg/source/snapshot/s3 is the right choice. The runnable binary in cmd/backfill/ exercises this path end-to-end.

The schema mirrors the Parquet shape but JSON-encoded one event per line:

Field                      Type                  Required       Purpose
entity_id                  string                yes            The keying dimension. Maps to the pipeline's Key/KeyByMany output.
count                      int64                 yes            The pre-aggregated count for this (entity_id, bucket).
occurred_at                string (RFC3339 UTC)  yes            Bucket-mid timestamp; the source emits Record.EventTime from this so windowed counters bucket correctly.
year / month / day / hour  int                   informational  Partition components, redundant with occurred_at but useful for ad hoc SQL over the archive.
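A decoder for this schema needs nothing beyond encoding/json, because time.Time unmarshals RFC 3339 strings natively. The sketch below mirrors the CountEvent struct documented further down but is an assumption about how such a decoder could look, not the package's DecodeJSONL; the required-field checks are illustrative. The sample row uses 14:30:00Z, the midpoint of the 14:00 hour, as its bucket-mid timestamp.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// CountEvent mirrors the documented JSON-Lines schema.
type CountEvent struct {
	EntityID   string    `json:"entity_id"`
	Count      int64     `json:"count"`
	OccurredAt time.Time `json:"occurred_at"` // RFC 3339, bucket-mid
	Year       int       `json:"year"`
	Month      int       `json:"month"`
	Day        int       `json:"day"`
	Hour       int       `json:"hour"`
}

// decodeLine is a hypothetical decoder for one line. It rejects rows
// missing the fields the pipeline keys and buckets on.
func decodeLine(line []byte) (CountEvent, error) {
	var e CountEvent
	if err := json.Unmarshal(line, &e); err != nil {
		return e, err
	}
	if e.EntityID == "" {
		return e, errors.New("missing entity_id")
	}
	if e.OccurredAt.IsZero() {
		return e, errors.New("missing occurred_at")
	}
	return e, nil
}

func main() {
	line := []byte(`{"entity_id":"bot-7","count":42,"occurred_at":"2026-05-08T14:30:00Z","year":2026,"month":5,"day":8,"hour":14}`)
	e, err := decodeLine(line)
	fmt.Println(e.EntityID, e.Count, e.OccurredAt.UTC().Format(time.RFC3339), err)
}
```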

File layout — Hive-style partitioned, gzipped JSON-Lines:

s3://my-bucket/counters/<counter-name>/year=2026/month=05/day=08/hour=14/part-00000.jsonl.gz
s3://my-bucket/counters/<counter-name>/year=2026/month=05/day=08/hour=15/part-00000.jsonl.gz
…

S3 lists keys in lexicographic (UTF-8 binary) order, and because the partition values above are zero-padded, the Hive layout scans chronologically. For a backfill the order doesn't matter, since Sum is commutative, but it keeps the operator-side log lines readable while a 40-day backfill walks forward.
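Why the padding matters: byte-wise ordering is chronological only when every numeric segment has a fixed width. The sketch uses sort.Strings, which compares byte-wise like S3's listing order; firstListed is a hypothetical helper for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// firstListed returns the key a byte-wise listing would return first.
func firstListed(keys []string) string {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)
	return sorted[0]
}

func main() {
	padded := []string{
		"year=2026/month=10/day=01/hour=00",
		"year=2026/month=09/day=30/hour=23",
	}
	unpadded := []string{
		"year=2026/month=10/day=1/hour=0",
		"year=2026/month=9/day=30/hour=23",
	}
	fmt.Println(firstListed(padded))   // year=2026/month=09/day=30/hour=23 (September first)
	fmt.Println(firstListed(unpadded)) // year=2026/month=10/day=1/hour=0 (October first)
}
```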

Gzip is auto-detected by .gz suffix. For Snappy / Zstd, supply a custom OpenObject hook (see the s3 source's OpenObject field).

Usage
import (
    backfill "github.com/gallowaysoftware/murmur/examples/backfill-from-spark"
    "github.com/gallowaysoftware/murmur/pkg/exec/bootstrap"
    "github.com/gallowaysoftware/murmur/pkg/source/snapshot/s3"
)

src, err := s3.NewSource(s3.Config[backfill.CountEvent]{
    Client:      s3Client,
    Bucket:      "my-bucket",
    Prefix:      "counters/bot_interaction/",
    Concurrency: 8,                      // parallel fetches
    Decode:      backfill.DecodeJSONL,   // canonical decoder
    EventID:     backfill.StableEventID, // (entity, hour) hash for dedup
})
if err != nil { /* ... */ }

token, err := bootstrap.Run(ctx, pipe, src,
    bootstrap.WithDedup(deduper), // idempotent re-runs
)
if err != nil { /* ... */ }
_ = token // nil for S3; see "Handoff to live mode" below

Run the included binary:

go run ./examples/backfill-from-spark/cmd/backfill \
    -bucket=my-bucket \
    -prefix=counters/bot_interaction/ \
    -table=bot_interaction_counts \
    -name=bot_interaction \
    -concurrency=8 \
    -retention=720h        # 30 days

The pipeline shape is intentionally minimal — Key + Value + Sum over a Daily window — to keep the example focused on the source. Real counters add KeyByMany for scope-key fan-out (lifetime:<id>, hourly:<id>:<bucket>, trailing_7d:<id>), a cache layer, and a state.Deduper.

Bounded concurrency

s3.Config.Concurrency caps the number of objects fetched and decoded in parallel. Default 1 (sequential, preserves S3 lexicographic order). Bump to 4–16 when:

  • The prefix has many small objects (each GetObject round-trip is a fixed cost).
  • The DDB-side write throughput exceeds what a single goroutine can drive.

With Concurrency > 1, record emission order is non-deterministic across objects; within a single object, lines are emitted in file order. The dedup contract handles the rest: EventID is derived from the row payload, not the file position, so reordering across objects doesn't change the dedup decision.

Files

File                  Purpose
event.go              CountEvent struct, DecodeJSONL, and the StableEventID extractor. Wired into s3.Config.Decode / s3.Config.EventID.
cmd/backfill/main.go  Minimal runnable binary: flags → pipeline → bootstrap.Run. Loads AWS config from the ambient environment.

What's NOT in the JSONL example
  • A Spark job. Lives in the consumer repository (each counter has its own aggregation SQL). This example documents the schema the Spark job must produce.
  • A state.Deduper. Wire bootstrap.WithDedup(...) in main.go when you need re-run idempotence; left out here to keep the example single-step.
  • Custom partition pruning. s3.Config.KeyFilter is the seam — wire it to skip partitions outside the target backfill window or to exclude _SUCCESS markers.
  • Handoff to live mode. bootstrap.Run returns a HandoffToken (nil for S3, which has no live-mode resume position); a production wrapper persists the token alongside its progress and starts the live Kafka/Kinesis worker pointed at the same DDB table.

Picking between the two

  • Spark is the producer → Parquet (5–20× smaller, no extra job to serialize as JSON).
  • DB export, Firehose archive, hand-written dump → JSON Lines (zero schema setup, every tool can read it).
  • Mixed → wire both, run bootstrap twice with a shared Deduper.
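Why a shared Deduper makes the mixed case safe: when both runs consult the same set of seen EventIDs, a row present in both inputs is applied once, and a full re-run changes nothing. This only holds if both sources derive the same EventID for the same logical row. memDeduper and run are hypothetical in-memory stand-ins for state.Deduper and bootstrap.Run, for illustration only.

```go
package main

import "fmt"

type row struct {
	EventID string
	Entity  string
	Count   int64
}

// memDeduper is a hypothetical in-memory stand-in for a state.Deduper:
// FirstSeen reports true only the first time an EventID is offered.
type memDeduper struct{ seen map[string]bool }

func (d *memDeduper) FirstSeen(id string) bool {
	if d.seen[id] {
		return false
	}
	d.seen[id] = true
	return true
}

// run applies each not-yet-seen row to the Sum store (a plain map here).
func run(rows []row, d *memDeduper, store map[string]int64) {
	for _, r := range rows {
		if d.FirstSeen(r.EventID) {
			store[r.Entity] += r.Count
		}
	}
}

func main() {
	parquetRows := []row{{"a|14", "a", 42}, {"b|14", "b", 7}}
	jsonlRows := []row{{"a|14", "a", 42}, {"a|15", "a", 3}} // overlaps on a|14

	d := &memDeduper{seen: map[string]bool{}}
	store := map[string]int64{}
	run(parquetRows, d, store)
	run(jsonlRows, d, store)
	run(jsonlRows, d, store) // a full re-run is a no-op

	fmt.Println(store["a"], store["b"]) // 45 7
}
```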

Documentation

Overview

Package backfill defines the canonical S3 JSON-Lines schema that upstream Spark jobs emit for Murmur backfill, plus a decoder and EventID extractor wired to make `bootstrap.Run` idempotent across re-runs.

Schema (one JSON object per line):

{
  "entity_id":   "<string>",        // the keying dimension
  "count":       <int64>,           // pre-aggregated count for the bucket
  "occurred_at": "<RFC3339 UTC>",   // bucket-mid timestamp (see README)
  "year":        <int>,             // partition components (informational)
  "month":       <int>,
  "day":         <int>,
  "hour":        <int>
}

File layout: gzipped JSON-Lines (`*.jsonl.gz`) under an S3 prefix partitioned Hive-style, e.g.:

s3://my-bucket/counters/<counter-name>/year=2026/month=05/day=08/hour=14/part-00000.jsonl.gz

See ./README.md for the full operator-facing description.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StableEventID

func StableEventID(e CountEvent, _ string, _ int) string

StableEventID hashes (entity_id, hour-bucket) into a deterministic dedup key. Use it as `s3.Config.EventID` so re-runs of the same Spark output land idempotently against `bootstrap.Run`'s deduper:

pipeline definition + Spark output + bootstrap.Run with Dedup →
    re-running the bootstrap is a no-op

Two rows for the same (entity, hour) collapse to one dedup entry, which is what we want when the Spark job re-emits a partition.

Types

type CountEvent

type CountEvent struct {
	EntityID   string    `json:"entity_id"`
	Count      int64     `json:"count"`
	OccurredAt time.Time `json:"occurred_at"`

	// Year/Month/Day/Hour are informational — the partition components
	// the upstream Spark job wrote the row under. The pipeline derives
	// its bucket from OccurredAt; these fields are kept on the struct
	// so they survive a round-trip through the decoder and are
	// available for logs or operator-facing tooling.
	Year  int `json:"year"`
	Month int `json:"month"`
	Day   int `json:"day"`
	Hour  int `json:"hour"`
}

CountEvent is the canonical event shape backfill rows decode into. It matches what a streaming source would produce when a Spark job pre-aggregates raw events into hourly bucket summaries: one event stands in for many, with Count carrying the pre-aggregated delta. Because the downstream Sum monoid is associative, "+1 forty-two times" and "+42 once" produce the same bucket value.

func DecodeJSONL

func DecodeJSONL(line []byte) (CountEvent, error)

DecodeJSONL decodes one JSON-Lines record. Wire it as the `jsonl.Config.Decode` (or `s3.Config.Decode`) when the rows on disk match the canonical schema verbatim.

Directories

Path          Synopsis
cmd/backfill  Command backfill drives a Murmur counter pipeline from Spark-aggregated S3 JSON-Lines into a DynamoDB Sum-monoid store.
