sazanami

package module
v0.1.1
Published: Nov 10, 2025 License: MIT Imports: 7 Imported by: 0

README

Sazanami 🌊


Overview

Sazanami (“ripples” in Japanese) is a minimal pipeline library for Go. It lets you compose concurrent stages with type-safe handlers, buffering, and error policies—ideal for ETL flows, streaming jobs, or any workload that benefits from backpressure-aware pipelines. The design favors explicit, readable code: generics for safety, zero external dependencies, and a fluent builder that feels at home in Go.

⚠️ Status: Early-stage (v0.1.x). The core pipeline features are stable, but APIs may change before v1.0.0. Feedback and contributions are welcome!

Installation

go get github.com/UUGTech/sazanami

Quickstart

package main

import (
	"context"
	"fmt"

	"github.com/UUGTech/sazanami"
)

func main() {
	src := make(chan int, 4)
	go func() {
		defer close(src)
		for _, v := range []int{1, 2, 3, 4} {
			src <- v
		}
	}()

	ctx := context.Background()

	p := sazanami.From(src)
	p = sazanami.AddStage(p, "double",
		sazanami.Map(func(_ context.Context, v int) (int, error) {
			return v * 2, nil
		}),
	)
	p = sazanami.AddStage(p, "filter-multiples",
		sazanami.Filter(func(_ context.Context, v int) (bool, error) {
			return v%4 == 0, nil
		}),
	)

	for v := range p.Run(ctx) {
		fmt.Println(v) // 4, 8
	}
}

Features

  • Fluent builder with typed handlers (From, AddStage, Run)
  • Concurrency controls: per-stage Parallel and Buffer
  • Error handling via Drop, Retry, Collect/Drain (channel or handler variants)
  • Hooks for stage/item lifecycle metrics; testkit.StageRecorder for assertions
  • Built-in stage helpers: Map, Filter, Reduce, ForEach, Batch, plus lightweight testkit
  • Fan-out / Fan-in helpers (FanOutBy, FanIn)
  • Zero external dependencies; standard library only

Built-in Handlers

Sazanami ships with adapters for the most common stage shapes so you rarely touch raw channels:

  • Map(func(ctx, in) (out, error)) – transform each item (returning an error drops into the policy path)
  • Filter(func(ctx, in) (keep bool, error)) – keep a subset without manual continue
  • Reduce(seed, func(ctx, acc, in) (acc, error)) – accumulate until upstream closes
  • ForEach(func(ctx, in) error) – side effects while forwarding the original item
  • Batch(size, dur) – emit slices when the size cap or the deadline is reached, whichever comes first

Each helper returns a standard Handler, so you can mix them with custom stages freely.

Need a long-lived stage that ranges over the channel? Mark it with sazanami.WithStreaming() when you add the stage—the handler will receive the real input/output streams (parallelism is forced to 1, and retries operate at the stage level).

Example: ETL Pipeline

ctx := context.Background()
var storeFailures int32 = 1 // failure budget consumed by storeBatches (defined elsewhere) to exercise Retry

p := sazanami.From(lines)

p = sazanami.AddStage(p, "parse",  parseLogs,
    sazanami.WithTags("ingest","json"),
    sazanami.WithParallel(2),
)

p = sazanami.AddStage(p, "filter",
    sazanami.Filter(func(_ context.Context, e entry) (bool, error) {
        return e.Level == "warn" || e.Level == "error", nil
    }),
    sazanami.WithTags("filter"),
    sazanami.WithBuffer(4),
)

p = sazanami.AddStage(p, "batch",  sazanami.Batch[entry](2, time.Second),
    sazanami.WithTags("batch"),
    sazanami.WithAttr("size","2"),
    sazanami.WithBuffer(1),
)

p = sazanami.AddStage(p, "store",  storeBatches(&storeFailures),
    sazanami.WithTags("sink"),
    sazanami.WithErrorPolicy(sazanami.Retry(2, func(i int) time.Duration {
        if i < 0 {
            i = 0
        }
        return time.Duration(1<<i) * 200 * time.Millisecond
    })),
    sazanami.WithTimeout(500*time.Millisecond),
    sazanami.WithBuffer(1),
)

out := p.Run(ctx)
for range out {
	// drain until the pipeline finishes
}

Philosophy

Keep the surface area small, the behavior predictable, and the code idiomatic. Sazanami embraces Go’s bias toward explicit composition, relies only on the standard library, and aims to stay approachable for both quick scripts and production services.

License

MIT

PRs welcome. See CONTRIBUTING.md for more details.

Documentation

Index

Constants

This section is empty.

Variables

var ErrDrained = errors.New("sazanami: item drained by policy")

Functions

func Abort

func Abort(err error) error

Abort marks an error as fatal for the stage, bypassing item-level policies.

func FanIn

func FanIn[T any](ctx context.Context, inputs ...<-chan T) <-chan T

FanIn merges multiple channels into a single output channel.

func FanOutBy

func FanOutBy[T any](src <-chan T, selector func(T) string, branches ...string) map[string]*Builder[T, T]

FanOutBy routes each item from src to a named branch based on selector. The selector returns the branch name for the given item.

Types

type Backoff

type Backoff func(attempt int) time.Duration

Backoff computes the delay before the next retry attempt.

func ConstantBackoff

func ConstantBackoff(d time.Duration) Backoff

ConstantBackoff waits for a fixed duration between retries.

func ExponentialBackoff

func ExponentialBackoff(base time.Duration) Backoff

ExponentialBackoff doubles the delay each attempt starting from base.

type Builder

type Builder[I any, O any] struct {
	// contains filtered or unexported fields
}

Builder wires stages together in a fluent fashion.

func AddStage

func AddStage[I any, O any, N any](b *Builder[I, O], name string, h Handler[O, N], opts ...StageOption) *Builder[I, N]

AddStage appends a stage and applies StageOptions to its configuration. If name is empty, an automatic stage identifier is assigned.

func From

func From[I any](src <-chan I) *Builder[I, I]

From bootstraps a builder from a source channel.

func (*Builder[I, O]) Buffer

func (b *Builder[I, O]) Buffer(size int) *Builder[I, O]

Buffer configures the buffer size between this stage and the next.

func (*Builder[I, O]) OnError

func (b *Builder[I, O]) OnError(p Policy) *Builder[I, O]

OnError sets the error policy for the most recent stage.

func (*Builder[I, O]) Parallel

func (b *Builder[I, O]) Parallel(n int) *Builder[I, O]

Parallel configures worker concurrency for the most recent stage.

func (*Builder[I, O]) Run

func (b *Builder[I, O]) Run(ctx context.Context) <-chan O

Run executes the pipeline and returns a channel streaming the terminal stage outputs.

func (*Builder[I, O]) WithAttr

func (b *Builder[I, O]) WithAttr(k, v string) *Builder[I, O]

WithAttr attaches a single attribute key/value pair to the most recent stage.

func (*Builder[I, O]) WithHooks

func (b *Builder[I, O]) WithHooks(h Hooks) *Builder[I, O]

WithHooks installs hooks that observe pipeline execution.

func (*Builder[I, O]) WithTags

func (b *Builder[I, O]) WithTags(tags ...string) *Builder[I, O]

WithTags attaches tags to the most recent stage.

type Failure

type Failure struct {
	Stage    StageInfo
	Item     any
	Err      error
	Attempt  int
	ItemMeta ItemInfo
}

Failure describes a handler failure for a specific item.

type Handler

type Handler[I any, O any] func(ctx context.Context, in <-chan I, out chan<- O) error

Handler represents a stage function that consumes values from in and emits to out. The handler should return when the input channel is closed or the context is done.

func Batch

func Batch[T any](size int, dur time.Duration) Handler[T, []T]

Batch groups items into slices of up to size or until dur elapses, whichever comes first.

func Filter

func Filter[T any](fn func(context.Context, T) (bool, error)) Handler[T, T]

Filter keeps only the items for which fn returns true.

func ForEach

func ForEach[T any](fn func(context.Context, T) error) Handler[T, T]

ForEach runs fn for every item and forwards the original item downstream.

func Map

func Map[I any, O any](fn func(context.Context, I) (O, error)) Handler[I, O]

Map wraps a simple transform function into a streaming handler. The provided fn is invoked for every item and its result is forwarded downstream.

func Reduce

func Reduce[I any, O any](seed func() O, fn func(context.Context, O, I) (O, error)) Handler[I, O]

Reduce accumulates items using fn and emits the final accumulator once the input closes.

type Hooks

type Hooks struct {
	StageStart    func(StageInfo)
	StageComplete func(StageInfo)
	StageError    func(StageInfo, error)

	ItemStart    func(StageInfo, ItemInfo)
	ItemComplete func(StageInfo, ItemInfo)
	ItemError    func(StageInfo, ItemInfo, error)
}

Hooks captures lifecycle callbacks for observability.

type ItemInfo

type ItemInfo struct {
	Sequence uint64
	Worker   int
	Attempt  int
}

ItemInfo describes an item processed within a stage.

type Policy

type Policy interface {
	Decide(ctx context.Context, failure Failure) Result
}

Policy decides how to react when a handler reports a failure.

func Chain

func Chain(policies ...Policy) Policy

Chain evaluates policies in order until one returns a definitive action.

func CollectFailures

func CollectFailures(ch chan<- Failure) Policy

CollectFailures routes failed items to the provided channel while continuing.

func CollectFailuresFunc

func CollectFailuresFunc(fn func(Failure)) Policy

CollectFailuresFunc invokes the provided handler for each failure while continuing.

func CollectFailuresFuncAs

func CollectFailuresFuncAs[T any](fn func(T)) Policy

CollectFailuresFuncAs invokes the handler when the failure item matches the desired type.

func DrainFailures

func DrainFailures(ch chan<- Failure) Policy

DrainFailures routes the failing item and all remaining input to the channel and stops the stage.

func DrainFailuresFunc

func DrainFailuresFunc(fn func(Failure)) Policy

DrainFailuresFunc invokes the handler with the failing item and drains the remaining input.

func DrainFailuresFuncAs

func DrainFailuresFuncAs[T any](fn func(T)) Policy

DrainFailuresFuncAs invokes the handler when the failure item matches the desired type and drains the remaining input.

func Drop

func Drop() Policy

Drop ignores the failing item and continues processing.

func Retry

func Retry(max int, backoff Backoff) Policy

Retry retries the failing item up to max attempts using the given backoff schedule.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result indicates how a policy wants to handle a failure.

type StageInfo

type StageInfo struct {
	Index      int
	Name       string
	Tags       []string
	Attributes map[string]string
}

StageInfo describes a stage during execution.

type StageOption

type StageOption func(*stageConfig)

StageOption mutates stage configuration during declarative assembly.

func WithAttr

func WithAttr(k, v string) StageOption

WithAttr returns a StageOption that sets a single attribute key/value.

func WithBuffer

func WithBuffer(size int) StageOption

WithBuffer returns a StageOption that configures the channel buffer size.

func WithErrorPolicy

func WithErrorPolicy(p Policy) StageOption

WithErrorPolicy returns a StageOption that sets the stage error policy.

func WithParallel

func WithParallel(n int) StageOption

WithParallel returns a StageOption that configures worker concurrency.

func WithStreaming added in v0.1.1

func WithStreaming() StageOption

WithStreaming marks the handler as a long-lived stream processor. Streaming stages ignore per-item retry semantics and run with parallelism set to 1.

func WithTags

func WithTags(tags ...string) StageOption

WithTags returns a StageOption that appends tags to the stage configuration.

func WithTimeout

func WithTimeout(d time.Duration) StageOption

WithTimeout configures a per-item processing deadline for the stage.

Directories

Path Synopsis
examples
batch_flush command
etl_basic command
failure_tap command
fanout command
hooks_metrics command
reduce_summary command
retry_chain command
timeout_guard command
