pipeline

package
v0.0.0-...-ffc4fba

Warning: This package is not in the latest version of its module.
Published: Mar 13, 2026 | License: Apache-2.0 | Imports: 6 | Imported by: 0

Documentation

Overview

Package pipeline provides configuration option types for analysis pipeline items and composable building blocks for concurrent pipeline construction: RunPC (producer-consumer goroutine skeleton), Phase/RunPhases (chain-of-responsibility), Batcher (batching strategies), DispatchFunc (dispatch strategy), and Fetcher (cache decorator pattern).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunPhases

func RunPhases[S any](ctx context.Context, s S, phases ...Phase[S]) (S, error)

RunPhases executes phases sequentially, threading state through each one. Returns immediately on the first error, preserving the partial state. Returns the input state unchanged when no phases are provided.

func SignalOnDrain

func SignalOnDrain[T any](src <-chan T) (forwarded <-chan T, drained <-chan struct{})

SignalOnDrain forwards items from src to the returned forwarded channel and closes the returned drained channel once src is exhausted. This enables ending pipeline stage spans independently.
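One way such a forwarding stage could look is sketched below. signalOnDrain is a hypothetical stand-in written from the doc comment; the unbuffered forwarded channel and the order in which the two channels are closed are assumptions, not documented behavior.

```go
package main

import "fmt"

// signalOnDrain is a hypothetical stand-in for SignalOnDrain.
func signalOnDrain[T any](src <-chan T) (<-chan T, <-chan struct{}) {
	forwarded := make(chan T) // assumption: unbuffered
	drained := make(chan struct{})
	go func() {
		defer close(forwarded)
		for v := range src {
			forwarded <- v
		}
		// src is exhausted; the upstream stage is done even if
		// downstream consumers are still processing.
		close(drained)
	}()
	return forwarded, drained
}

func demo() int {
	src := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		src <- i
	}
	close(src)

	forwarded, drained := signalOnDrain[int](src)
	sum := 0
	for v := range forwarded {
		sum += v
	}
	<-drained // closed before forwarded, so this does not block
	return sum
}

func main() {
	fmt.Println(demo()) // 6
}
```

A caller that traces stages separately could end the upstream span when drained closes and the downstream span when its own loop over forwarded finishes.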

Types

type Batcher

type Batcher[In, Batch any] interface {
	// Add adds an item. Returns true if the batch is ready to flush.
	Add(In) bool

	// Flush returns the current batch and resets. Returns false if empty.
	Flush() (Batch, bool)
}

Batcher accumulates input items and produces batches.
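The Add/Flush contract suggests a driver loop along these lines: flush whenever Add reports readiness, then flush once more for the remainder. In this sketch the Batcher interface is re-declared locally, and both countBatcher and collect are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// Batcher mirrors the documented interface.
type Batcher[In, Batch any] interface {
	Add(In) bool
	Flush() (Batch, bool)
}

// countBatcher is a hypothetical count-based implementation.
type countBatcher[T any] struct {
	limit int
	items []T
}

func (b *countBatcher[T]) Add(item T) bool {
	b.items = append(b.items, item)
	return len(b.items) >= b.limit
}

func (b *countBatcher[T]) Flush() ([]T, bool) {
	if len(b.items) == 0 {
		return nil, false
	}
	out := b.items
	b.items = nil // start a fresh batch
	return out, true
}

// collect drives any Batcher over a slice of inputs.
func collect[In, Batch any](b Batcher[In, Batch], items []In) []Batch {
	var batches []Batch
	for _, it := range items {
		if b.Add(it) {
			if batch, ok := b.Flush(); ok {
				batches = append(batches, batch)
			}
		}
	}
	// Flush whatever is left once the input ends.
	if batch, ok := b.Flush(); ok {
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	batches := collect[int, []int](&countBatcher[int]{limit: 2}, []int{1, 2, 3, 4, 5})
	fmt.Println(batches) // [[1 2] [3 4] [5]]
}
```

Because the loop depends only on the interface, swapping in a different batching strategy does not change the driver.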

type ConfigurationOption

type ConfigurationOption struct {
	// Default is the initial value of the configuration option.
	Default any
	// Name identifies the configuration option in facts.
	Name string
	// Description represents the help text about the configuration option.
	Description string
	// Flag corresponds to the CLI token with "--" prepended.
	Flag string
	// Type specifies the kind of the configuration option's value.
	Type ConfigurationOptionType
}

ConfigurationOption provides a unified, introspectable way to set up PipelineItems.

func (ConfigurationOption) FormatDefault

func (opt ConfigurationOption) FormatDefault() string

FormatDefault converts the default value of ConfigurationOption to string. Used in the command line interface to show the argument's default value.

type ConfigurationOptionType

type ConfigurationOptionType int

ConfigurationOptionType represents the possible types of a ConfigurationOption's value.

const (
	// BoolConfigurationOption reflects the boolean value type.
	BoolConfigurationOption ConfigurationOptionType = iota
	// IntConfigurationOption reflects the integer value type.
	IntConfigurationOption
	// StringConfigurationOption reflects the string value type.
	StringConfigurationOption
	// FloatConfigurationOption reflects a floating point value type.
	FloatConfigurationOption
	// StringsConfigurationOption reflects the array of strings value type.
	StringsConfigurationOption
	// PathConfigurationOption reflects the file system path value type.
	PathConfigurationOption
)

func (ConfigurationOptionType) String

func (opt ConfigurationOptionType) String() string

String returns an empty string for the boolean type, "int" for integers and "string" for strings. It is used in the command line interface to show the argument's type.

type DispatchFunc

type DispatchFunc[Req any] func(ctx context.Context, req Req) error

DispatchFunc sends a request to a worker pool. The worker channel is captured in the closure, keeping the dispatch strategy decoupled from request semantics.
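A closure of the kind described might be built like this. DispatchFunc is re-declared locally, and newChanDispatch is a hypothetical constructor, not part of the documented API. Returning ctx.Err() when the send cannot complete is also an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// DispatchFunc mirrors the documented type.
type DispatchFunc[Req any] func(ctx context.Context, req Req) error

// newChanDispatch is a hypothetical constructor: it captures the worker
// channel so callers only see "send this request".
func newChanDispatch[Req any](workers chan<- Req) DispatchFunc[Req] {
	return func(ctx context.Context, req Req) error {
		select {
		case workers <- req:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func demo() (first, second error) {
	workers := make(chan string, 1)
	dispatch := newChanDispatch[string](workers)

	first = dispatch(context.Background(), "a") // fits in the buffer

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	second = dispatch(ctx, "b") // buffer is full and ctx is canceled
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // <nil> context canceled
}
```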

type Fetcher

type Fetcher[Req, Resp any] interface {
	Fetch(ctx context.Context, req Req) (Resp, error)
}

Fetcher retrieves a response for a given request. It serves as the base interface for the cache decorator pattern: wrap a Fetcher with "check cache → fetch misses → update cache" logic.

type FetcherFunc

type FetcherFunc[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)

FetcherFunc adapts a plain function to the Fetcher interface.

func (FetcherFunc[Req, Resp]) Fetch

func (f FetcherFunc[Req, Resp]) Fetch(ctx context.Context, req Req) (Resp, error)

Fetch calls the underlying function.
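The cache decorator mentioned for Fetcher could be sketched as below. Fetcher and FetcherFunc are re-declared locally; cachingFetcher is a hypothetical wrapper, and its map-plus-mutex cache is only one of several reasonable choices.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Fetcher and FetcherFunc mirror the documented declarations.
type Fetcher[Req, Resp any] interface {
	Fetch(ctx context.Context, req Req) (Resp, error)
}

type FetcherFunc[Req, Resp any] func(ctx context.Context, req Req) (Resp, error)

func (f FetcherFunc[Req, Resp]) Fetch(ctx context.Context, req Req) (Resp, error) {
	return f(ctx, req)
}

// cachingFetcher is a hypothetical decorator: check cache, fetch on miss,
// update cache.
type cachingFetcher[Req comparable, Resp any] struct {
	mu    sync.Mutex
	cache map[Req]Resp
	next  Fetcher[Req, Resp]
}

func (c *cachingFetcher[Req, Resp]) Fetch(ctx context.Context, req Req) (Resp, error) {
	c.mu.Lock()
	resp, ok := c.cache[req]
	c.mu.Unlock()
	if ok {
		return resp, nil
	}
	resp, err := c.next.Fetch(ctx, req)
	if err != nil {
		return resp, err // errors are not cached in this sketch
	}
	c.mu.Lock()
	c.cache[req] = resp
	c.mu.Unlock()
	return resp, nil
}

func demo() (calls int, last string) {
	origin := FetcherFunc[string, string](func(_ context.Context, key string) (string, error) {
		calls++
		return "value-of-" + key, nil
	})
	cached := &cachingFetcher[string, string]{cache: map[string]string{}, next: origin}
	for _, key := range []string{"a", "a", "b"} {
		last, _ = cached.Fetch(context.Background(), key)
	}
	return calls, last
}

func main() {
	fmt.Println(demo()) // 2 value-of-b
}
```

The lock is released during the inner fetch, so two concurrent misses for the same key may both reach the origin; deduplicating in-flight requests would need extra bookkeeping.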

type PassthroughBatcher

type PassthroughBatcher[T any] struct {
	// contains filtered or unexported fields
}

PassthroughBatcher wraps each input item as a single-element batch. Add always returns true, meaning every item is immediately ready.

func (*PassthroughBatcher[T]) Add

func (b *PassthroughBatcher[T]) Add(item T) bool

Add stores the item and returns true (always ready).

func (*PassthroughBatcher[T]) Flush

func (b *PassthroughBatcher[T]) Flush() ([]T, bool)

Flush returns the stored item as a single-element slice and resets. Returns false if Add was not called since the last flush.

type Phase

type Phase[S any] interface {
	Run(ctx context.Context, s S) (S, error)
}

Phase represents a single processing stage that transforms state S.

type PhaseFunc

type PhaseFunc[S any] func(ctx context.Context, s S) (S, error)

PhaseFunc adapts a plain function to the Phase interface.

func (PhaseFunc[S]) Run

func (f PhaseFunc[S]) Run(ctx context.Context, s S) (S, error)

Run executes the phase function.

type RunPC

type RunPC[In, Out, Job any] struct {
	// Buffer sets the capacity of the internal jobs channel.
	// Values below 1 are clamped to 1.
	Buffer int

	// Produce reads the input and sends work items on the jobs channel.
	Produce func(ctx context.Context, in In, jobs chan<- Job)

	// Consume reads work items from jobs and sends results on out.
	Consume func(ctx context.Context, jobs <-chan Job, out chan<- Out)
}

RunPC is a producer-consumer micro-skeleton that owns the goroutine topology: channel creation, goroutine spawning, and orderly shutdown.

Type parameters:

  • In: the input consumed by the producer
  • Out: the output emitted by the consumer
  • Job: the internal work item flowing from producer to consumer

The Produce function reads from in and writes jobs to the jobs channel. The Consume function reads jobs and writes results to the out channel. Neither function should close its output channel; RunPC handles that.

func (RunPC[In, Out, Job]) Run

func (r RunPC[In, Out, Job]) Run(ctx context.Context, in In) <-chan Out

Run starts the producer and consumer goroutines and returns the output channel. The jobs channel is closed after Produce returns. The output channel is closed after Consume returns.
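The topology that Run owns might look roughly like the sketch below. runPC and its run method are hypothetical stand-ins that copy the documented field names; the unbuffered output channel is an assumption, since only the jobs buffer is documented.

```go
package main

import (
	"context"
	"fmt"
)

// runPC is a hypothetical stand-in for RunPC.
type runPC[In, Out, Job any] struct {
	Buffer  int
	Produce func(ctx context.Context, in In, jobs chan<- Job)
	Consume func(ctx context.Context, jobs <-chan Job, out chan<- Out)
}

func (r runPC[In, Out, Job]) run(ctx context.Context, in In) <-chan Out {
	buf := r.Buffer
	if buf < 1 {
		buf = 1 // values below 1 are clamped, as documented
	}
	jobs := make(chan Job, buf)
	out := make(chan Out) // assumption: unbuffered

	go func() {
		defer close(jobs) // closed after Produce returns
		r.Produce(ctx, in, jobs)
	}()
	go func() {
		defer close(out) // closed after Consume returns
		r.Consume(ctx, jobs, out)
	}()
	return out
}

func demo() int {
	pc := runPC[[]string, int, string]{
		Buffer: 2,
		Produce: func(_ context.Context, words []string, jobs chan<- string) {
			for _, w := range words {
				jobs <- w
			}
		},
		Consume: func(_ context.Context, jobs <-chan string, out chan<- int) {
			for w := range jobs {
				out <- len(w)
			}
		},
	}
	total := 0
	for n := range pc.run(context.Background(), []string{"go", "pipeline", "job"}) {
		total += n
	}
	return total
}

func main() {
	fmt.Println(demo()) // 13
}
```

Neither callback closes a channel, which matches the documented rule that the skeleton handles shutdown.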

type SharedResponse

type SharedResponse[T any] struct {
	// contains filtered or unexported fields
}

SharedResponse evaluates a computation exactly once and caches the result for concurrent access by multiple goroutines. The computation receives a context.Context for cancellation support.

func NewSharedResponse

func NewSharedResponse[T any](compute func(context.Context) (T, error)) *SharedResponse[T]

NewSharedResponse creates a SharedResponse that will evaluate compute on the first call to SharedResponse.Get. The compute function must not be nil.

func (*SharedResponse[T]) Get

func (s *SharedResponse[T]) Get(ctx context.Context) (T, error)

Get evaluates the compute function exactly once (via sync.Once) and returns the cached (result, error) pair. Subsequent calls return the same values without re-evaluation, regardless of the context passed.
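The once-only evaluation can be illustrated with a small stand-in built on sync.Once, which the doc comment names. sharedResponse, newSharedResponse, and get are hypothetical lower-case equivalents; the field layout is an assumption because the real fields are unexported.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedResponse is a hypothetical stand-in for SharedResponse.
type sharedResponse[T any] struct {
	once    sync.Once
	compute func(context.Context) (T, error)
	val     T
	err     error
}

func newSharedResponse[T any](compute func(context.Context) (T, error)) *sharedResponse[T] {
	return &sharedResponse[T]{compute: compute}
}

func (s *sharedResponse[T]) get(ctx context.Context) (T, error) {
	// Only the first caller's ctx reaches compute; later callers block
	// until it finishes and then read the cached pair.
	s.once.Do(func() { s.val, s.err = s.compute(ctx) })
	return s.val, s.err
}

func demo() (calls int32, allSame bool) {
	shared := newSharedResponse(func(context.Context) (int, error) {
		atomic.AddInt32(&calls, 1)
		return 42, nil
	})

	results := make([]int, 8)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i], _ = shared.get(context.Background())
		}(i)
	}
	wg.Wait()

	allSame = true
	for _, r := range results {
		if r != 42 {
			allSame = false
		}
	}
	return atomic.LoadInt32(&calls), allSame
}

func main() {
	fmt.Println(demo()) // 1 true
}
```

One consequence of this design is that a failed first evaluation is cached too, so a compute that fails because the first caller's context was canceled would return that error to every later caller.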

type ThresholdBatcher

type ThresholdBatcher[T any] struct {
	// contains filtered or unexported fields
}

ThresholdBatcher accumulates items into a slice until the count reaches the configured threshold, at which point Add returns true.

func NewThresholdBatcher

func NewThresholdBatcher[T any](threshold int) *ThresholdBatcher[T]

NewThresholdBatcher creates a batcher that signals readiness after threshold items. Threshold values below 1 are clamped to 1.

func (*ThresholdBatcher[T]) Add

func (b *ThresholdBatcher[T]) Add(item T) bool

Add appends an item. Returns true when the batch reaches the threshold.

func (*ThresholdBatcher[T]) Flush

func (b *ThresholdBatcher[T]) Flush() ([]T, bool)

Flush returns the accumulated items and resets the internal buffer. Returns false if no items have been added since the last flush.

type WorkerPool

type WorkerPool[T any] struct {
	// MaxParallel is the maximum number of concurrent goroutines.
	// Zero defaults to runtime.NumCPU().
	MaxParallel int
	// Work processes a single item. Must not be nil.
	Work func(ctx context.Context, item T) error
}

WorkerPool runs Work on each item with at most MaxParallel goroutines. Returns the first non-nil error encountered, or nil. Remaining goroutines observe context cancellation on first error.

func (WorkerPool[T]) Run

func (p WorkerPool[T]) Run(ctx context.Context, items []T) error

Run processes all items with bounded concurrency. If any Work call returns a non-nil error, the derived context is canceled and Run returns that error after all goroutines finish.

func (WorkerPool[T]) RunChan

func (p WorkerPool[T]) RunChan(ctx context.Context, ch <-chan T) error

RunChan processes items from a channel with bounded concurrency. Semantics match Run but items arrive via channel instead of slice. A nil or already-closed channel returns nil immediately.
