Documentation ¶
Index ¶
- Variables
- func Abort(err error) error
- func FanIn[T any](ctx context.Context, inputs ...<-chan T) <-chan T
- func FanOutBy[T any](src <-chan T, selector func(T) string, branches ...string) map[string]*Builder[T, T]
- type Backoff
- type Builder
- func (b *Builder[I, O]) Buffer(size int) *Builder[I, O]
- func (b *Builder[I, O]) OnError(p Policy) *Builder[I, O]
- func (b *Builder[I, O]) Parallel(n int) *Builder[I, O]
- func (b *Builder[I, O]) Run(ctx context.Context) <-chan O
- func (b *Builder[I, O]) WithAttr(k, v string) *Builder[I, O]
- func (b *Builder[I, O]) WithHooks(h Hooks) *Builder[I, O]
- func (b *Builder[I, O]) WithTags(tags ...string) *Builder[I, O]
- type Failure
- type Handler
- func Batch[T any](size int, dur time.Duration) Handler[T, []T]
- func Filter[T any](fn func(context.Context, T) (bool, error)) Handler[T, T]
- func ForEach[T any](fn func(context.Context, T) error) Handler[T, T]
- func Map[I any, O any](fn func(context.Context, I) (O, error)) Handler[I, O]
- func Reduce[I any, O any](seed func() O, fn func(context.Context, O, I) (O, error)) Handler[I, O]
- type Hooks
- type ItemInfo
- type Policy
- func Chain(policies ...Policy) Policy
- func CollectFailures(ch chan<- Failure) Policy
- func CollectFailuresFunc(fn func(Failure)) Policy
- func CollectFailuresFuncAs[T any](fn func(T)) Policy
- func DrainFailures(ch chan<- Failure) Policy
- func DrainFailuresFunc(fn func(Failure)) Policy
- func DrainFailuresFuncAs[T any](fn func(T)) Policy
- func Drop() Policy
- func Retry(max int, backoff Backoff) Policy
- type Result
- type StageInfo
- type StageOption
Constants ¶
This section is empty.
Variables ¶
var ErrDrained = errors.New("sazanami: item drained by policy")
Functions ¶
Types ¶
type Backoff ¶
Backoff computes the delay before the next retry attempt.
func ConstantBackoff ¶
ConstantBackoff waits for a fixed duration between retries.
func ExponentialBackoff ¶
ExponentialBackoff doubles the delay each attempt starting from base.
type Builder ¶
Builder wires stages together in a fluent fashion.
func AddStage ¶
func AddStage[I any, O any, N any](b *Builder[I, O], name string, h Handler[O, N], opts ...StageOption) *Builder[I, N]
AddStage appends a stage and applies StageOptions to its configuration. If name is empty, an automatic stage identifier is assigned.
func (*Builder[I, O]) Run ¶
Run executes the pipeline and returns a channel streaming the terminal stage outputs.
func (*Builder[I, O]) WithAttr ¶
WithAttr attaches a single attribute key/value pair to the most recent stage.
type Handler ¶
Handler represents a stage function that consumes values from in and emits to out. The handler should return when the input channel is closed or the context is done.
func Batch ¶
Batch groups items into slices of at most size items, emitting a smaller slice early if dur elapses first.
type Hooks ¶
type Hooks struct {
StageStart func(StageInfo)
StageComplete func(StageInfo)
StageError func(StageInfo, error)
ItemStart func(StageInfo, ItemInfo)
ItemComplete func(StageInfo, ItemInfo)
ItemError func(StageInfo, ItemInfo, error)
}
Hooks captures lifecycle callbacks for observability.
type Policy ¶
Policy decides how to react when a handler reports a failure.
func CollectFailures ¶
CollectFailures routes failed items to the provided channel while continuing.
func CollectFailuresFunc ¶
CollectFailuresFunc invokes the provided handler for each failure while continuing.
func CollectFailuresFuncAs ¶
CollectFailuresFuncAs invokes the handler only when the failed item is of type T; processing continues either way.
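The type-matching behaviour can be sketched with a generic adapter around a type assertion. The `failure` struct and `onItemAs` helper are hypothetical stand-ins; the fields of the package's Failure type are not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// failure is a hypothetical stand-in for the package's Failure type.
type failure struct {
	Item any
	Err  error
}

// onItemAs adapts a typed callback to an untyped failure callback: fn runs
// only when the failed item has dynamic type T, otherwise the failure is
// ignored by this callback.
func onItemAs[T any](fn func(T)) func(failure) {
	return func(f failure) {
		if v, ok := f.Item.(T); ok {
			fn(v)
		}
	}
}

func main() {
	var seen []string
	handle := onItemAs(func(s string) { seen = append(seen, s) })

	handle(failure{Item: "order-17", Err: errors.New("boom")})
	handle(failure{Item: 42, Err: errors.New("boom")}) // skipped: not a string

	fmt.Println(seen)
}
```

The adapter keeps the callback free of manual assertions, at the cost of silently skipping items of other types.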
func DrainFailures ¶
DrainFailures routes the failing item and all remaining input to the channel and stops the stage.
func DrainFailuresFunc ¶
DrainFailuresFunc invokes the handler with the failing item and drains the remaining input.
func DrainFailuresFuncAs ¶
DrainFailuresFuncAs invokes the handler when the failed item is of type T, then drains the remaining input.
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
Result indicates how a policy wants to handle a failure.
type StageOption ¶
type StageOption func(*stageConfig)
StageOption mutates stage configuration during declarative assembly.
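StageOption follows the functional options pattern: each option is a function that mutates a config struct. A minimal sketch of that pattern is below. The lowercase names and the `stageConfig` fields are hypothetical, since the real struct is unexported and its fields are not documented.

```go
package main

import "fmt"

// stageConfig is a hypothetical stand-in for the package's unexported
// configuration struct; the field names are assumptions.
type stageConfig struct {
	parallel int
	buffer   int
	tags     []string
}

// stageOption mirrors the shape of StageOption: a function that mutates
// the configuration in place.
type stageOption func(*stageConfig)

func withParallel(n int) stageOption {
	return func(c *stageConfig) { c.parallel = n }
}

func withBuffer(size int) stageOption {
	return func(c *stageConfig) { c.buffer = size }
}

// withTags appends rather than replaces, so repeated use accumulates tags.
func withTags(tags ...string) stageOption {
	return func(c *stageConfig) { c.tags = append(c.tags, tags...) }
}

// newStageConfig starts from defaults and applies each option in order,
// so later options override earlier ones for scalar fields.
func newStageConfig(opts ...stageOption) stageConfig {
	cfg := stageConfig{parallel: 1}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newStageConfig(
		withParallel(4),
		withBuffer(16),
		withTags("etl"),
		withTags("nightly"),
	)
	fmt.Printf("%+v\n", cfg)
}
```

The pattern lets a constructor such as AddStage accept any number of settings without a growing parameter list, and omitted options simply leave the defaults in place.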
func WithAttr ¶
func WithAttr(k, v string) StageOption
WithAttr returns a StageOption that sets a single attribute key/value.
func WithBuffer ¶
func WithBuffer(size int) StageOption
WithBuffer returns a StageOption that configures the channel buffer size.
func WithErrorPolicy ¶
func WithErrorPolicy(p Policy) StageOption
WithErrorPolicy returns a StageOption that sets the stage error policy.
func WithParallel ¶
func WithParallel(n int) StageOption
WithParallel returns a StageOption that configures worker concurrency.
func WithStreaming ¶ added in v0.1.1
func WithStreaming() StageOption
WithStreaming marks the handler as a long-lived stream processor. Streaming stages ignore per-item retry semantics and run with parallelism set to 1.
func WithTags ¶
func WithTags(tags ...string) StageOption
WithTags returns a StageOption that appends tags to the stage configuration.
func WithTimeout ¶
func WithTimeout(d time.Duration) StageOption
WithTimeout configures a per-item processing deadline for the stage.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| examples/batch_flush (command) | |
| examples/etl_basic (command) | |
| examples/failure_tap (command) | |
| examples/fanout (command) | |
| examples/hooks_metrics (command) | |
| examples/reduce_summary (command) | |
| examples/retry_chain (command) | |
| examples/timeout_guard (command) | |