pipeline

package
v0.4.0

Warning: This package is not in the latest version of its module.
Published: May 14, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var ErrFilteredOut = errors.New("filtered out")

ErrFilteredOut is returned by a processor when a token should be filtered out of the pipeline.
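Because ErrFilteredOut is a sentinel error, code that inspects processor errors can tell a filtered value apart from a genuine failure with errors.Is, which also matches the sentinel when it has been wrapped. A minimal sketch; the variable is redeclared locally so the example compiles on its own, and the classify helper is hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrFilteredOut is a local stand-in for pipeline.ErrFilteredOut (same message).
var ErrFilteredOut = errors.New("filtered out")

// classify is a hypothetical helper that separates "drop this value" from a
// real failure. errors.Is also matches ErrFilteredOut when it has been wrapped.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrFilteredOut):
		return "filtered"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(nil))                                     // ok
	fmt.Println(classify(fmt.Errorf("token: %w", ErrFilteredOut))) // filtered
	fmt.Println(classify(errors.New("disk full")))                 // failed
}
```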

Functions

This section is empty.

Types

type DataProcessor added in v0.2.0

type DataProcessor[T any] interface {
	Process(T) (T, error)
}

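DataProcessor is an interface for processing data. Process receives a single value and returns the processed value; a processor can return ErrFilteredOut to signal that the value should be filtered out of the pipeline.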
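A DataProcessor only needs a Process method. The sketch below is a hypothetical processor over strings (the normalizer type is an illustration, not part of the package) that trims and lower-cases each value and drops blank ones by returning ErrFilteredOut. The interface and the sentinel are redeclared locally so the example compiles without the package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local copies of the documented declarations.
var ErrFilteredOut = errors.New("filtered out")

type DataProcessor[T any] interface {
	Process(T) (T, error)
}

// normalizer is a hypothetical DataProcessor[string].
type normalizer struct{}

// Process trims and lower-cases s; blank input is filtered out.
func (normalizer) Process(s string) (string, error) {
	s = strings.ToLower(strings.TrimSpace(s))
	if s == "" {
		return "", ErrFilteredOut
	}
	return s, nil
}

// Compile-time check that normalizer satisfies the interface.
var _ DataProcessor[string] = normalizer{}

func main() {
	for _, in := range []string{"  Hello ", "   ", "WORLD"} {
		out, err := normalizer{}.Process(in)
		if errors.Is(err, ErrFilteredOut) {
			fmt.Printf("%q filtered out\n", in)
			continue
		}
		fmt.Printf("%q -> %q\n", in, out)
	}
}
```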

type DataWriter

type DataWriter[T any] interface {
	Write(T) (n int, err error)
	Close() (err error)
}

DataWriter is an interface for writing data to a destination.
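A sketch of a minimal in-memory DataWriter, useful as a test destination. The sliceWriter type is hypothetical, and the package does not document what the returned n counts; this sketch assumes it is the number of values stored (always 1 per call). Close is made idempotent, and writes after Close fail.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented interface.
type DataWriter[T any] interface {
	Write(T) (n int, err error)
	Close() (err error)
}

var errClosed = errors.New("sliceWriter: write after Close")

// sliceWriter is a hypothetical in-memory DataWriter that collects values.
// Assumption: n reports the number of values stored by the call.
type sliceWriter[T any] struct {
	items  []T
	closed bool
}

func (w *sliceWriter[T]) Write(v T) (int, error) {
	if w.closed {
		return 0, errClosed
	}
	w.items = append(w.items, v)
	return 1, nil
}

// Close is idempotent; later writes fail with errClosed.
func (w *sliceWriter[T]) Close() error {
	w.closed = true
	return nil
}

var _ DataWriter[int] = (*sliceWriter[int])(nil)

func main() {
	w := &sliceWriter[int]{}
	for i := 1; i <= 3; i++ {
		if _, err := w.Write(i * 10); err != nil {
			panic(err)
		}
	}
	_ = w.Close()
	_, err := w.Write(40)
	fmt.Println(w.items, err) // [10 20 30] sliceWriter: write after Close
}
```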

type Mode added in v0.4.0

type Mode int
const (
	// ModeSingle is the default pipeline mode: all workers of each stage are
	// linked to the workers of the next stage through one shared channel.
	ModeSingle Mode = iota
	// ModeParallel is an advanced pipeline mode: each worker in one stage is
	// linked to each worker of the next stage by its own channel.
	ModeParallel
	// ModeSingleParallel is an advanced pipeline mode: the first and second
	// stages are linked in single mode, and the second and last stages are
	// linked in parallel mode. A split function must be set to use this mode.
	ModeSingleParallel
)

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline runs a series of workers in parallel. All workers run at the same time in separate goroutines. Each stage is connected to the next by a single channel whose buffer size equals the number of workers in the stage. The pipeline stops when a worker returns an error or when all workers are done. Workers should not close their send channel; the pipeline stages handle that. Pipelines can be chained together by using them as workers.
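The single-channel wiring described above is the standard Go fan-out/fan-in pattern. The following is a standalone sketch of that pattern, not the package's code: the runStage and fanInSum helpers are hypothetical. Several producers share one channel buffered to the producer count, and the stage (not the producers) closes the channel once every producer has returned, which is why workers must not close their send channel themselves.

```go
package main

import (
	"fmt"
	"sync"
)

// runStage is a hypothetical helper: it starts n producer goroutines that send
// into one shared channel buffered to n, and closes that channel only after
// every producer has returned, so producers never close it themselves.
func runStage(n int, produce func(id int, out chan<- int)) <-chan int {
	out := make(chan int, n)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(id int) {
			defer wg.Done()
			produce(id, out)
		}(i)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanInSum drains the channel with m consumer goroutines sharing it.
func fanInSum(in <-chan int, m int) int {
	var (
		mu    sync.Mutex
		total int
		wg    sync.WaitGroup
	)
	wg.Add(m)
	for i := 0; i < m; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return total
}

func main() {
	// 3 producers each send 1..4; 2 consumers share the same channel.
	src := runStage(3, func(_ int, out chan<- int) {
		for v := 1; v <= 4; v++ {
			out <- v
		}
	})
	fmt.Println(fanInSum(src, 2)) // 30
}
```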

func NewPipeline

func NewPipeline[T any](mode Mode, sf splitFunc[T], workGroups ...[]Worker[T]) (*Pipeline[T], error)

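NewPipeline creates a new Pipeline. mode selects how stages are connected, sf is the split function (required for ModeSingleParallel), and each element of workGroups is one stage of workers.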

func (*Pipeline[T]) GetMetrics added in v0.4.0

func (dp *Pipeline[T]) GetMetrics() (in, out int)

GetMetrics returns the pipeline statistics: in is the reader's count and out is the writer's count.

func (*Pipeline[T]) Run

func (dp *Pipeline[T]) Run(ctx context.Context) error

Run starts the pipeline. The pipeline stops when a worker returns an error, all workers are done, or the context is canceled.
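The three stop conditions of Run can be modelled with a shared cancellable context. This is a sketch of the idea under stated assumptions, not the package's implementation: runAll, demo and errBoom are hypothetical, the first worker error cancels everyone else, and returning the parent context's error on cancellation is a choice made here, not documented behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll is a hypothetical sketch of the documented stop conditions: it starts
// every worker function in its own goroutine, cancels the shared context on the
// first error, and returns once all workers have returned.
func runAll(ctx context.Context, workers ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, w := range workers {
		wg.Add(1)
		go func(run func(context.Context) error) {
			defer wg.Done()
			if err := run(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel() // tell the remaining workers to stop
				})
			}
		}(w)
	}
	wg.Wait()
	if firstErr != nil {
		return firstErr
	}
	// No worker failed: nil, or the parent's error if it was canceled.
	return ctx.Err()
}

var errBoom = errors.New("boom")

// demo runs one worker that waits for cancellation and one that fails at once.
func demo() error {
	return runAll(context.Background(),
		func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() },
		func(context.Context) error { return errBoom },
	)
}

func main() {
	fmt.Println(demo()) // boom
}
```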

type ProcessorWorker added in v0.2.0

type ProcessorWorker[T any] struct {
	// contains filtered or unexported fields
}

ProcessorWorker implements the pipeline.Worker interface. It wraps a DataProcessor and processes data with it.

func NewProcessorWorker added in v0.2.0

func NewProcessorWorker[T any](processor DataProcessor[T]) *ProcessorWorker[T]

NewProcessorWorker creates a new ProcessorWorker.

func (*ProcessorWorker[T]) GetMetrics added in v0.4.0

func (w *ProcessorWorker[T]) GetMetrics() (in, out int)

GetMetrics returns stats of received and sent messages.

func (*ProcessorWorker[T]) Run added in v0.2.0

func (w *ProcessorWorker[T]) Run(ctx context.Context) error

Run starts the ProcessorWorker.

func (*ProcessorWorker[T]) SetReceiveChan added in v0.2.0

func (w *ProcessorWorker[T]) SetReceiveChan(c <-chan T)

SetReceiveChan sets the receive channel for the ProcessorWorker.

func (*ProcessorWorker[T]) SetSendChan added in v0.2.0

func (w *ProcessorWorker[T]) SetSendChan(c chan<- T)

SetSendChan sets the send channel for the ProcessorWorker.
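Since ProcessorWorker's fields are unexported, the following is only a sketch of how such a worker loop might look; procWorker, procFunc and demo are hypothetical. It assumes that a value rejected with ErrFilteredOut is counted as received but not sent, that any other processor error stops the worker, and that the worker leaves its send channel open for the pipeline to close. None of these details is confirmed by the documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync/atomic"
)

var ErrFilteredOut = errors.New("filtered out")

type DataProcessor[T any] interface{ Process(T) (T, error) }

// procFunc adapts a plain function to DataProcessor.
type procFunc[T any] func(T) (T, error)

func (f procFunc[T]) Process(v T) (T, error) { return f(v) }

// procWorker is a hypothetical re-implementation of ProcessorWorker's behaviour.
type procWorker[T any] struct {
	proc      DataProcessor[T]
	in        <-chan T
	out       chan<- T
	nIn, nOut atomic.Int64 // safe to read from GetMetrics while Run is active
}

func (w *procWorker[T]) SetReceiveChan(c <-chan T) { w.in = c }
func (w *procWorker[T]) SetSendChan(c chan<- T)    { w.out = c }
func (w *procWorker[T]) GetMetrics() (in, out int) {
	return int(w.nIn.Load()), int(w.nOut.Load())
}

// Run processes values until the receive channel is closed or ctx is canceled.
func (w *procWorker[T]) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-w.in:
			if !ok {
				return nil // upstream finished; the send channel stays open
			}
			w.nIn.Add(1)
			res, err := w.proc.Process(v)
			if errors.Is(err, ErrFilteredOut) {
				continue // drop the value silently
			}
			if err != nil {
				return err
			}
			select {
			case w.out <- res:
				w.nOut.Add(1)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

func demo() (got []string, in, out int) {
	src, dst := make(chan string, 3), make(chan string, 3)
	for _, s := range []string{"a", "", "b"} {
		src <- s
	}
	close(src)
	w := &procWorker[string]{proc: procFunc[string](func(s string) (string, error) {
		if s == "" {
			return "", ErrFilteredOut
		}
		return strings.ToUpper(s), nil
	})}
	w.SetReceiveChan(src)
	w.SetSendChan(dst)
	if err := w.Run(context.Background()); err != nil {
		panic(err)
	}
	close(dst) // in the package, the pipeline closes stage channels
	for s := range dst {
		got = append(got, s)
	}
	in, out = w.GetMetrics()
	return got, in, out
}

func main() {
	fmt.Println(demo()) // [A B] 3 2
}
```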

type Route added in v0.4.0

type Route[T any] struct {
	// contains filtered or unexported fields
}

Route describes how workers communicate across stages. Each stage must have a corresponding route. A Route consists of an input rule and an output rule.

func NewParallelRoutes added in v0.4.0

func NewParallelRoutes[T any](stagesNum int) []Route[T]

NewParallelRoutes is a helper function that initializes routes for parallel (sync) mode.

func NewSingleParallelRoutes added in v0.4.0

func NewSingleParallelRoutes[T any](sf splitFunc[T]) []Route[T]

NewSingleParallelRoutes is a helper function that initializes routes using single mode between the first and second stages and parallel mode between the second and third stages.

func NewSingleRoutes added in v0.4.0

func NewSingleRoutes[T any](stagesNum int) []Route[T]

NewSingleRoutes is a helper function that initializes simple single mode routes.

type RouteRule added in v0.4.0

type RouteRule[T any] struct {
	// contains filtered or unexported fields
}

RouteRule describes exactly how stages communicate with each other.

func NewRouteRuleParallel added in v0.4.0

func NewRouteRuleParallel[T any](bufferSize int, sf splitFunc[T]) *RouteRule[T]

NewRouteRuleParallel returns a new route rule for parallel mode communication.

func NewRouteRuleSingle added in v0.4.0

func NewRouteRuleSingle[T any](bufferSize int, sf splitFunc[T]) *RouteRule[T]

NewRouteRuleSingle returns a new route rule for single mode communication.
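The route constructors take a split function whose type, splitFunc[T], is unexported, so its real signature is not documented. The sketch below assumes, purely for illustration, a function of the form func(v T, n int) int that picks one of n downstream channels. The byKey and route helpers are hypothetical; byKey hashes with FNV-1a so that equal values always reach the same worker, which is a common reason to route in parallel mode.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// byKey is a hypothetical split function: it maps a string to one of n
// channels by FNV-1a hash, so equal values always go to the same worker.
func byKey(v string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(v)) // writes to a hash.Hash never return an error
	return int(h.Sum32() % uint32(n))
}

// route sends each value to the channel chosen by split, then closes all
// outputs. The split parameter stands in for the package's splitFunc[T].
func route[T any](values []T, outs []chan T, split func(v T, n int) int) {
	for _, v := range values {
		outs[split(v, len(outs))] <- v
	}
	for _, c := range outs {
		close(c)
	}
}

func main() {
	const workers = 3
	outs := make([]chan string, workers)
	for i := range outs {
		outs[i] = make(chan string, 8) // buffered so route never blocks in this demo
	}
	route([]string{"a", "b", "a", "c", "a"}, outs, byKey)
	for i, c := range outs {
		var got []string
		for v := range c {
			got = append(got, v)
		}
		fmt.Println(i, got)
	}
}
```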

type Worker

type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
	GetMetrics() (int, int)
}

Worker is an interface for a pipeline item. Each worker has a receive channel that connects it to the previous stage and a send channel that connects it to the next stage. The Run method starts the worker.
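Any type with these four methods can act as a stage. As a sketch, here is a hypothetical first-stage worker (sliceSource is not part of the package) that emits a fixed slice. It has no upstream, so SetReceiveChan is a no-op, and it assumes GetMetrics reports (received, sent), which matches ProcessorWorker's description but is not stated for the interface itself. As the Pipeline documentation requires, Run leaves the send channel open.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// Local copy of the documented Worker interface.
type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
	GetMetrics() (int, int)
}

// sliceSource is a hypothetical first-stage Worker that emits a fixed slice.
type sliceSource[T any] struct {
	items []T
	out   chan<- T
	sent  atomic.Int64
}

func (s *sliceSource[T]) SetSendChan(c chan<- T)  { s.out = c }
func (s *sliceSource[T]) SetReceiveChan(<-chan T) {} // no upstream
func (s *sliceSource[T]) GetMetrics() (int, int)  { return 0, int(s.sent.Load()) }

// Run sends every item, stopping early if ctx is canceled.
func (s *sliceSource[T]) Run(ctx context.Context) error {
	for _, v := range s.items {
		select {
		case s.out <- v:
			s.sent.Add(1)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// Compile-time check that sliceSource satisfies Worker.
var _ Worker[int] = (*sliceSource[int])(nil)

func demo() (buffered, in, out int) {
	ch := make(chan int, 4)
	src := &sliceSource[int]{items: []int{1, 2, 3}}
	src.SetSendChan(ch)
	if err := src.Run(context.Background()); err != nil {
		panic(err)
	}
	in, out = src.GetMetrics()
	return len(ch), in, out
}

func main() {
	fmt.Println(demo()) // 3 0 3
}
```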

func NewReadWorker

func NewReadWorker[T any](reader dataReader[T]) Worker[T]

NewReadWorker creates a new Worker that reads values from reader.

func NewWriteWorker

func NewWriteWorker[T any](writer DataWriter[T], limiter *rate.Limiter) Worker[T]

NewWriteWorker creates a new Worker that writes values to writer, using limiter to limit the write rate.
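The limiter parameter is a *rate.Limiter from golang.org/x/time/rate, whose Wait(ctx) method blocks until the limiter allows another event. To stay free of external modules, this sketch accepts any value with that Wait method, which *rate.Limiter satisfies. The writeLoop helper and the test doubles are hypothetical, and waiting once per value before writing it is an assumption about how the real write worker uses the limiter.

```go
package main

import (
	"context"
	"fmt"
)

// waiter is satisfied by *rate.Limiter (golang.org/x/time/rate).
type waiter interface {
	Wait(ctx context.Context) error
}

// Local copy of the documented interface.
type DataWriter[T any] interface {
	Write(T) (n int, err error)
	Close() (err error)
}

// writeLoop is a hypothetical write stage: it drains in, waits on lim before
// each write (pass an untyped nil to disable limiting), and closes w when done.
func writeLoop[T any](ctx context.Context, in <-chan T, w DataWriter[T], lim waiter) (written int, err error) {
	defer func() {
		if cerr := w.Close(); err == nil {
			err = cerr
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return written, ctx.Err()
		case v, ok := <-in:
			if !ok {
				return written, nil
			}
			if lim != nil {
				if err = lim.Wait(ctx); err != nil {
					return written, err
				}
			}
			if _, err = w.Write(v); err != nil {
				return written, err
			}
			written++
		}
	}
}

// Test doubles: a writer that counts calls and a limiter that never blocks.
type countingWriter struct {
	n      int
	closed bool
}

func (c *countingWriter) Write(int) (int, error) { c.n++; return 1, nil }
func (c *countingWriter) Close() error           { c.closed = true; return nil }

type noWait struct{ calls int }

func (n *noWait) Wait(context.Context) error { n.calls++; return nil }

func demo() (written, calls int, closed bool) {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)
	w, lim := &countingWriter{}, &noWait{}
	n, err := writeLoop[int](context.Background(), in, w, lim)
	if err != nil {
		panic(err)
	}
	return n, lim.calls, w.closed
}

func main() {
	fmt.Println(demo()) // 3 3 true
}
```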
