Documentation
Overview
Package extractor is a parallel, tile-based ETL framework. It splits a large input into units, dispatches them across a worker pool, tracks progress, and, on resume, skips units that are already complete.
Three composable layers:
- Worker[T,R] — bounded-concurrency worker pool with cancellation and per-error recovery policy.
- Progress — atomic counters + JSON heartbeat snapshots.
- Checkpoint — completed-unit ledger (file or DB) for resumption.
A Pipeline[T,R] composes all three; each layer is independently usable.
Constants
This section is empty.
Variables
var (
ErrAborted = errors.New("extractor: worker aborted by OnError policy")
ErrProcessRequired = errors.New("extractor: WorkerConfig.Process is required")
)
Errors surfaced by Worker.
var ErrUnitIDRequired = errors.New("extractor: PipelineConfig.UnitID is required when Checkpoint is set")
ErrUnitIDRequired is returned by NewPipeline when a Checkpoint is configured but UnitID is nil.
Functions
This section is empty.
Types
type Checkpoint
type Checkpoint interface {
Done(unit string) (bool, error)
Mark(unit string) error
Reset(unit string) error
All() ([]string, error)
}
Checkpoint is the persistent ledger of completed units. Backends: MemoryCheckpoint (for tests), FileCheckpoint (a JSON sidecar file), and DBCheckpoint (planned). The interface is kept minimal so that any key-value backend can implement it.
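Because resumption only needs Done, skip logic can be written against the interface alone. pending and mapCheckpoint are hypothetical; mapCheckpoint is not safe for concurrent use and exists only to make the sketch runnable.

```go
package main

import (
	"fmt"
	"sort"
)

// Checkpoint mirrors the interface documented above.
type Checkpoint interface {
	Done(unit string) (bool, error)
	Mark(unit string) error
	Reset(unit string) error
	All() ([]string, error)
}

// mapCheckpoint is a hypothetical, non-concurrent backend for this sketch.
type mapCheckpoint map[string]struct{}

func (m mapCheckpoint) Done(u string) (bool, error) {
	_, ok := m[u]
	return ok, nil
}

func (m mapCheckpoint) Mark(u string) error  { m[u] = struct{}{}; return nil }
func (m mapCheckpoint) Reset(u string) error { delete(m, u); return nil }

func (m mapCheckpoint) All() ([]string, error) {
	out := make([]string, 0, len(m))
	for u := range m {
		out = append(out, u)
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out, nil
}

// pending returns the units not yet recorded in cp, preserving input order.
// It depends only on the Checkpoint interface, so any backend works.
func pending(cp Checkpoint, units []string) ([]string, error) {
	var out []string
	for _, u := range units {
		done, err := cp.Done(u)
		if err != nil {
			return nil, fmt.Errorf("checkpoint lookup %q: %w", u, err)
		}
		if !done {
			out = append(out, u)
		}
	}
	return out, nil
}

func main() {
	cp := mapCheckpoint{}
	_ = cp.Mark("tile-1")
	todo, _ := pending(cp, []string{"tile-0", "tile-1", "tile-2"})
	fmt.Println(todo) // [tile-0 tile-2]
}
```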
func FileCheckpoint
func FileCheckpoint(path string) (Checkpoint, error)
FileCheckpoint loads (or creates) a JSON ledger at path.
type MemoryCheckpoint
type MemoryCheckpoint struct {
// contains filtered or unexported fields
}
MemoryCheckpoint is an in-process Checkpoint suited for tests.
func NewMemoryCheckpoint
func NewMemoryCheckpoint() *MemoryCheckpoint
NewMemoryCheckpoint constructs an empty MemoryCheckpoint.
func (*MemoryCheckpoint) All
func (m *MemoryCheckpoint) All() ([]string, error)
All implements Checkpoint.
func (*MemoryCheckpoint) Done
func (m *MemoryCheckpoint) Done(unit string) (bool, error)
Done implements Checkpoint.
func (*MemoryCheckpoint) Mark
func (m *MemoryCheckpoint) Mark(unit string) error
Mark implements Checkpoint.
func (*MemoryCheckpoint) Reset
func (m *MemoryCheckpoint) Reset(unit string) error
Reset implements Checkpoint.
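A goroutine-safe set is enough to satisfy the interface. memCheckpoint is a hypothetical stand-in; whether MemoryCheckpoint itself locks internally is not stated above, though concurrent Mark calls from a worker pool would need some such guard.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// memCheckpoint is a hypothetical set of unit ids behind an RWMutex.
type memCheckpoint struct {
	mu    sync.RWMutex
	units map[string]struct{}
}

func newMemCheckpoint() *memCheckpoint {
	return &memCheckpoint{units: make(map[string]struct{})}
}

func (m *memCheckpoint) Done(unit string) (bool, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, ok := m.units[unit]
	return ok, nil
}

func (m *memCheckpoint) Mark(unit string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.units[unit] = struct{}{}
	return nil
}

func (m *memCheckpoint) Reset(unit string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.units, unit)
	return nil
}

// All returns a sorted copy so callers never alias the internal map.
func (m *memCheckpoint) All() ([]string, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make([]string, 0, len(m.units))
	for u := range m.units {
		out = append(out, u)
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	cp := newMemCheckpoint()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = cp.Mark(fmt.Sprintf("unit-%03d", i))
		}(i)
	}
	wg.Wait()
	all, _ := cp.All()
	fmt.Println(len(all), all[0]) // 100 unit-000
}
```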
type Pipeline
type Pipeline[T, R any] struct {
// contains filtered or unexported fields
}
Pipeline composes a Worker, an optional Progress meter, and an optional Checkpoint behind a single entry point.
func NewPipeline
func NewPipeline[T, R any](cfg PipelineConfig[T, R]) (*Pipeline[T, R], error)
NewPipeline constructs a Pipeline from cfg. It returns ErrUnitIDRequired if cfg.Checkpoint is non-nil and cfg.UnitID is nil.
type PipelineConfig
type PipelineConfig[T, R any] struct {
Worker WorkerConfig[T, R]
Progress *Progress
Checkpoint Checkpoint
// UnitID derives the checkpoint key for a unit. Required when
// Checkpoint is non-nil.
UnitID func(T) string
}
PipelineConfig wires a worker, optional progress meter, optional checkpoint, and the function that derives a stable unit id used for checkpointing.
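UnitID must return the same key for the same unit on every run; otherwise a resumed run cannot find earlier marks. A sketch for a hypothetical map-tile unit type (tile and tileID are illustrations, not part of the package):

```go
package main

import "fmt"

// tile is a hypothetical unit type; the package does not define one.
type tile struct {
	Z, X, Y int
}

// tileID is a candidate PipelineConfig.UnitID: it depends only on the unit's
// own fields, so the same tile maps to the same checkpoint key on every run,
// regardless of input order or process restarts.
func tileID(t tile) string {
	return fmt.Sprintf("z%d/x%d/y%d", t.Z, t.X, t.Y)
}

func main() {
	fmt.Println(tileID(tile{Z: 12, X: 655, Y: 1583})) // z12/x655/y1583
}
```

Keys derived from slice positions or pointer addresses would break this property, since neither is stable across restarts or re-ordered inputs.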
type Progress
type Progress struct {
// contains filtered or unexported fields
}
Progress is a goroutine-safe counter meter for long-running batches.
func NewProgress
NewProgress constructs a Progress meter named `name` with `total` expected units.
func (*Progress) IncBytes
IncBytes adds `by` to the byte counter; it is intended for content-extraction pipelines that track throughput.
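A Progress-style meter reduces to a few atomic counters. progress, IncDone, and IncError are hypothetical; only the name/total constructor arguments and IncBytes(by) come from the docs above, and their types (string and int64) are assumed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// progress is a hypothetical stand-in for Progress; each counter is an
// atomic.Int64 (Go 1.19+), so workers can update it without a mutex.
type progress struct {
	name    string
	total   int64
	started time.Time
	done    atomic.Int64
	errs    atomic.Int64
	bytes   atomic.Int64
}

func newProgress(name string, total int64) *progress {
	return &progress{name: name, total: total, started: time.Now()}
}

func (p *progress) IncDone()          { p.done.Add(1) }
func (p *progress) IncError()         { p.errs.Add(1) }
func (p *progress) IncBytes(by int64) { p.bytes.Add(by) }

func main() {
	p := newProgress("tiles", 8)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.IncDone()
			p.IncBytes(1024) // e.g. size of the extracted payload
		}()
	}
	wg.Wait()
	fmt.Println(p.done.Load(), p.bytes.Load()) // 8 8192
}
```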
type Snapshot
type Snapshot struct {
Name string `json:"name"`
Total int64 `json:"total"`
Done int64 `json:"done"`
Errors int64 `json:"errors"`
Bytes int64 `json:"bytes"`
Started time.Time `json:"started"`
Elapsed time.Duration `json:"elapsed_ms"`
ETA time.Duration `json:"eta_ms"`
Pct float32 `json:"pct"`
}
Snapshot is a point-in-time view of a Progress meter.
type Worker
type Worker[T, R any] struct {
// contains filtered or unexported fields
}
Worker is a bounded-concurrency processor for a slice of units.
type WorkerConfig
type WorkerConfig[T, R any] struct {
// Concurrency is the max number of in-flight units. Defaults to 1.
Concurrency int
// Process is the per-unit work. Required.
Process func(ctx context.Context, unit T) (R, error)
// OnResult fires after a successful Process; receives the unit
// and its result. Optional.
OnResult func(unit T, result R)
// OnError fires when Process returns an error and decides how to
// recover. Defaults to Continue (log + carry on) when nil.
OnError func(unit T, err error) Recovery
}
WorkerConfig configures a Worker.