extractor

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 1, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package extractor is a parallel tile-based ETL framework: split a large input into units, dispatch across a worker pool, track progress, and skip already-completed units on resume.

Three composable layers:

  • Worker[T,R] — bounded-concurrency worker pool with cancellation and per-error recovery policy.
  • Progress — atomic counters + JSON heartbeat snapshots.
  • Checkpoint — completed-unit ledger (file or DB) for resumption.

A Pipeline[T,R] composes all three; each layer is independently usable.

Index

Constants

This section is empty.

Variables

var (
	ErrAborted         = errors.New("extractor: worker aborted by OnError policy")
	ErrProcessRequired = errors.New("extractor: WorkerConfig.Process is required")
)

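Errors surfaced by Worker. ErrProcessRequired is returned when WorkerConfig.Process is nil; ErrAborted wraps the first unit error when OnError returns Abort.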

var ErrUnitIDRequired = errors.New("extractor: PipelineConfig.UnitID is required when Checkpoint is set")

ErrUnitIDRequired is returned by NewPipeline when a Checkpoint is configured but UnitID is nil.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint interface {
	Done(unit string) (bool, error)
	Mark(unit string) error
	Reset(unit string) error
	All() ([]string, error)
}

Checkpoint is the ledger of completed units, keyed by unit id. Backends: MemoryCheckpoint (in-process, for tests), FileCheckpoint (a JSON sidecar file), and DBCheckpoint (planned). The interface is deliberately minimal so that any key-value store can back it.
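A backend only needs set semantics over unit ids. The following is a minimal sketch of a mutex-guarded map that satisfies the interface; mapCheckpoint and newMapCheckpoint are hypothetical names rather than the package's MemoryCheckpoint, and Reset deleting the unit is an assumption drawn from the method name.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Checkpoint mirrors the documented interface.
type Checkpoint interface {
	Done(unit string) (bool, error)
	Mark(unit string) error
	Reset(unit string) error
	All() ([]string, error)
}

// mapCheckpoint is a hypothetical in-process backend: a set guarded by a mutex.
type mapCheckpoint struct {
	mu   sync.Mutex
	done map[string]struct{}
}

func newMapCheckpoint() *mapCheckpoint {
	return &mapCheckpoint{done: make(map[string]struct{})}
}

func (m *mapCheckpoint) Done(unit string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.done[unit]
	return ok, nil
}

func (m *mapCheckpoint) Mark(unit string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.done[unit] = struct{}{}
	return nil
}

// Reset forgets a unit so a later run processes it again (assumed semantics).
func (m *mapCheckpoint) Reset(unit string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.done, unit)
	return nil
}

// All returns the recorded units, sorted for deterministic output.
func (m *mapCheckpoint) All() ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]string, 0, len(m.done))
	for u := range m.done {
		out = append(out, u)
	}
	sort.Strings(out)
	return out, nil
}

// Compile-time check that mapCheckpoint satisfies Checkpoint.
var _ Checkpoint = (*mapCheckpoint)(nil)

func main() {
	cp := newMapCheckpoint()
	_ = cp.Mark("z10/x1/y2")
	_ = cp.Mark("z10/x1/y3")
	_ = cp.Reset("z10/x1/y3")
	all, _ := cp.All()
	fmt.Println(all) // [z10/x1/y2]
}
```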

func FileCheckpoint

func FileCheckpoint(path string) (Checkpoint, error)

FileCheckpoint loads (or creates) a JSON ledger at path.

type MemoryCheckpoint

type MemoryCheckpoint struct {
	// contains filtered or unexported fields
}

MemoryCheckpoint is an in-process Checkpoint suited for tests.

func NewMemoryCheckpoint

func NewMemoryCheckpoint() *MemoryCheckpoint

NewMemoryCheckpoint constructs an empty MemoryCheckpoint.

func (*MemoryCheckpoint) All

func (m *MemoryCheckpoint) All() ([]string, error)

All implements Checkpoint.

func (*MemoryCheckpoint) Done

func (m *MemoryCheckpoint) Done(unit string) (bool, error)

Done implements Checkpoint.

func (*MemoryCheckpoint) Mark

func (m *MemoryCheckpoint) Mark(unit string) error

Mark implements Checkpoint.

func (*MemoryCheckpoint) Reset

func (m *MemoryCheckpoint) Reset(unit string) error

Reset implements Checkpoint.

type Pipeline

type Pipeline[T, R any] struct {
	// contains filtered or unexported fields
}

Pipeline composes a worker, a progress meter, and a checkpoint behind a single entry point.

func NewPipeline

func NewPipeline[T, R any](cfg PipelineConfig[T, R]) (*Pipeline[T, R], error)

NewPipeline constructs a Pipeline. It returns ErrUnitIDRequired if cfg.Checkpoint is set and cfg.UnitID is nil.

func (*Pipeline[T, R]) Run

func (p *Pipeline[T, R]) Run(ctx context.Context, units []T) error

Run processes every unit, skipping those the Checkpoint already records as done. Progress, if configured, is updated after each successful unit.

type PipelineConfig

type PipelineConfig[T, R any] struct {
	Worker     WorkerConfig[T, R]
	Progress   *Progress
	Checkpoint Checkpoint
	// UnitID derives the checkpoint key for a unit. Required when
	// Checkpoint is non-nil.
	UnitID func(T) string
}

PipelineConfig wires together a worker configuration, an optional progress meter, an optional checkpoint, and UnitID, the function that derives the stable unit id used as the checkpoint key.
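UnitID must return the same key for the same unit on every run, or resumption cannot match units to ledger entries. A sketch for a hypothetical Tile type that keys on its coordinates (Tile and tileID are illustrative, not part of the package):

```go
package main

import "fmt"

// Tile is a hypothetical unit type for a tile-based extraction job.
type Tile struct {
	Z, X, Y int
}

// tileID depends only on the tile's coordinates, so the same tile maps to
// the same checkpoint key on every run.
func tileID(t Tile) string {
	return fmt.Sprintf("z%d/x%d/y%d", t.Z, t.X, t.Y)
}

func main() {
	fmt.Println(tileID(Tile{Z: 12, X: 2048, Y: 1361})) // z12/x2048/y1361
}
```

In a PipelineConfig[Tile, R] this would be passed as UnitID: tileID. Keys derived from slice positions or pointer addresses change between runs and would defeat the ledger.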

type Progress

type Progress struct {
	// contains filtered or unexported fields
}

Progress is a goroutine-safe counter meter for long-running batches.

func NewProgress

func NewProgress(name string, total int64) *Progress

NewProgress constructs a Progress meter named `name` with `total` expected units.

func (*Progress) Error

func (p *Progress) Error()

Error increments the error counter.

func (*Progress) Inc

func (p *Progress) Inc(by int64)

Inc adds `by` to the done count.

func (*Progress) IncBytes

func (p *Progress) IncBytes(by int64)

IncBytes adds `by` to the byte counter, for content or extraction pipelines that track throughput.

func (*Progress) Snapshot

func (p *Progress) Snapshot() Snapshot

Snapshot returns a point-in-time copy of the meter's current state.

func (*Progress) Stream

func (p *Progress) Stream(ctx context.Context, every time.Duration, out io.Writer) error

Stream writes a JSON-encoded snapshot to `out` once per `every` interval until ctx is cancelled, which makes it suitable for CI heartbeats.

type Recovery

type Recovery uint8

Recovery tells the worker how to handle a per-unit error.

const (
	// Continue keeps processing the remaining units. The error is
	// reported to OnError (where the caller can log it) but the unit is
	// not retried; retrying is left to the caller.
	Continue Recovery = iota
	// Skip drops the unit and moves on.
	Skip
	// Abort cancels the worker pool with the unit's error.
	Abort
)

type Snapshot

type Snapshot struct {
	Name    string        `json:"name"`
	Total   int64         `json:"total"`
	Done    int64         `json:"done"`
	Errors  int64         `json:"errors"`
	Bytes   int64         `json:"bytes"`
	Started time.Time     `json:"started"`
	Elapsed time.Duration `json:"elapsed_ms"`
	ETA     time.Duration `json:"eta_ms"`
	Pct     float32       `json:"pct"`
}

Snapshot is a point-in-time view of a Progress meter.

type Worker

type Worker[T, R any] struct {
	// contains filtered or unexported fields
}

Worker is a bounded-concurrency processor for a slice of units.

func NewWorker

func NewWorker[T, R any](cfg WorkerConfig[T, R]) (*Worker[T, R], error)

NewWorker constructs a Worker. It returns ErrProcessRequired if cfg.Process is nil.

func (*Worker[T, R]) Run

func (w *Worker[T, R]) Run(ctx context.Context, units []T) error

Run processes every unit in `units` concurrently, with at most Concurrency units in flight. It returns ctx.Err() on cancellation, or the first unit error wrapped in ErrAborted if OnError returned Abort.

type WorkerConfig

type WorkerConfig[T, R any] struct {
	// Concurrency is the max number of in-flight units. Defaults to 1.
	Concurrency int
	// Process is the per-unit work. Required.
	Process func(ctx context.Context, unit T) (R, error)
	// OnResult fires after a successful Process; receives the unit
	// and its result. Optional.
	OnResult func(unit T, result R)
	// OnError fires when Process returns an error and returns the
	// Recovery to apply. When nil, errors are handled as Continue (the
	// error is logged and processing carries on).
	OnError func(unit T, err error) Recovery
}

WorkerConfig configures a Worker.
