Documentation
Constants ¶
This section is empty.
Variables ¶
var ErrFilteredOut = errors.New("filtered out")
ErrFilteredOut is returned by a processor when a token should be filtered out of the pipeline.
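The following is a minimal sketch of how a caller might tell a filtered token apart from a real failure by using errors.Is. The processor signature, dropNegative, and applyAll are hypothetical names introduced for illustration; the actual DataProcessor method set is not shown in this documentation, and ErrFilteredOut is redeclared locally only to keep the example self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrFilteredOut mirrors the sentinel declared by the package.
var ErrFilteredOut = errors.New("filtered out")

// dropNegative is a hypothetical processor function that filters out
// negative numbers by returning (a wrapped) ErrFilteredOut.
func dropNegative(n int) (int, error) {
	if n < 0 {
		return 0, fmt.Errorf("value %d: %w", n, ErrFilteredOut)
	}
	return n, nil
}

// applyAll is a hypothetical helper that runs p over every token.
// Filtered tokens are skipped; any other error stops processing.
func applyAll[T any](p func(T) (T, error), tokens []T) ([]T, error) {
	out := make([]T, 0, len(tokens))
	for _, t := range tokens {
		v, err := p(t)
		if errors.Is(err, ErrFilteredOut) {
			continue // a filtered token is dropped, not treated as a failure
		}
		if err != nil {
			return nil, err
		}
		out = append(out, v)
	}
	return out, nil
}

func main() {
	kept, err := applyAll(dropNegative, []int{1, -2, 3})
	fmt.Println(kept, err) // prints "[1 3] <nil>"
}
```

Comparing with errors.Is rather than == keeps the check working when a processor wraps the sentinel with extra context.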
Functions ¶
This section is empty.
Types ¶
type DataProcessor ¶ added in v0.2.0
DataProcessor is an interface for processing data.
type DataWriter ¶
DataWriter is an interface for writing data to a destination.
type Mode ¶ added in v0.4.0
type Mode int
const (
	// ModeSingle is the default pipeline mode, in which all workers of each stage
	// are linked to the workers of the other stage with one channel.
	ModeSingle Mode = iota
	// ModeParallel is an advanced pipeline mode, in which each worker in one stage
	// is linked to each worker of another stage with one channel.
	ModeParallel
	// ModeSingleParallel is an advanced pipeline mode, in which the first and second
	// stages are linked in single mode, and the second and last stages are linked in parallel.
	// SplitFunction must be set to use this mode.
	ModeSingleParallel
)
type Pipeline ¶
type Pipeline[T any] struct {
	// contains filtered or unexported fields
}
Pipeline runs a series of workers in parallel. All workers run at the same time in separate goroutines. Each stage of workers is connected to the next by a single channel whose buffer size equals the number of workers in the stage. The pipeline stops when a worker returns an error or when all workers are done. Workers should not close the send channel; the pipeline stages handle that. Pipelines can be chained together by using them as workers.
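The wiring described above can be sketched with plain goroutines and channels. This is not the package's implementation: runStage and sumPipeline are hypothetical helpers, error propagation and context cancellation are left out, and the example only illustrates a per-stage channel buffered to the worker count that is closed by the stage rather than by the workers.

```go
package main

import (
	"fmt"
	"sync"
)

// runStage starts n workers that read from in, apply fn, and write to a
// shared output channel buffered to n. The stage, not the workers,
// closes the output once every worker has returned.
func runStage(n int, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumPipeline feeds values through two stages and sums the results.
func sumPipeline(values []int) int {
	src := make(chan int, len(values))
	for _, v := range values {
		src <- v
	}
	close(src)

	doubled := runStage(3, src, func(v int) int { return v * 2 })
	incremented := runStage(2, doubled, func(v int) int { return v + 1 })

	total := 0
	for v := range incremented {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumPipeline([]int{1, 2, 3, 4})) // prints 24
}
```

Because several workers share one channel, the order of results is not guaranteed; the sketch sums them so that the outcome does not depend on ordering.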
func NewPipeline ¶
func NewPipeline[T any](mode Mode, sf splitFunc[T], workGroups ...[]Worker[T]) (*Pipeline[T], error)
NewPipeline creates a new Pipeline.
func (*Pipeline[T]) GetMetrics ¶ added in v0.4.0
GetMetrics returns stats: reader, writer.
type ProcessorWorker ¶ added in v0.2.0
type ProcessorWorker[T any] struct {
	// contains filtered or unexported fields
}
ProcessorWorker implements the pipeline.Worker interface. It wraps a DataProcessor and processes data with it.
func NewProcessorWorker ¶ added in v0.2.0
func NewProcessorWorker[T any](processor DataProcessor[T]) *ProcessorWorker[T]
NewProcessorWorker creates a new ProcessorWorker.
func (*ProcessorWorker[T]) GetMetrics ¶ added in v0.4.0
func (w *ProcessorWorker[T]) GetMetrics() (in, out int)
GetMetrics returns stats of received and sent messages.
func (*ProcessorWorker[T]) Run ¶ added in v0.2.0
func (w *ProcessorWorker[T]) Run(ctx context.Context) error
Run starts the ProcessorWorker.
func (*ProcessorWorker[T]) SetReceiveChan ¶ added in v0.2.0
func (w *ProcessorWorker[T]) SetReceiveChan(c <-chan T)
SetReceiveChan sets the receive channel for the ProcessorWorker.
func (*ProcessorWorker[T]) SetSendChan ¶ added in v0.2.0
func (w *ProcessorWorker[T]) SetSendChan(c chan<- T)
SetSendChan sets the send channel for the ProcessorWorker.
type Route ¶ added in v0.4.0
type Route[T any] struct {
	// contains filtered or unexported fields
}
Route describes how workers communicate between stages. Each stage must have a corresponding route. A Route consists of an input rule and an output rule.
func NewParallelRoutes ¶ added in v0.4.0
NewParallelRoutes is a helper function that initializes parallel (sync) mode routes.
func NewSingleParallelRoutes ¶ added in v0.4.0
NewSingleParallelRoutes is a helper function that initializes routes using single mode between the first and second stages and parallel mode between the second and third.
func NewSingleRoutes ¶ added in v0.4.0
NewSingleRoutes is a helper function that initializes simple single mode routes.
type RouteRule ¶ added in v0.4.0
type RouteRule[T any] struct {
	// contains filtered or unexported fields
}
RouteRule describes exactly how stages communicate with each other.
func NewRouteRuleParallel ¶ added in v0.4.0
NewRouteRuleParallel returns a new route rule for parallel mode communication.
func NewRouteRuleSingle ¶ added in v0.4.0
NewRouteRuleSingle returns a new route rule for single mode communication.
type Worker ¶
type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
	GetMetrics() (int, int)
}
Worker is an interface for a pipeline item. Each worker has send and receive channels that connect it to the previous and next stage in the pipeline. The Run method starts the worker.
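As an illustration, a custom type could satisfy this interface roughly as follows. The Worker interface is copied from the declaration above so that the example compiles on its own; mapWorker and runOnce are hypothetical names, and the received/sent interpretation of GetMetrics is an assumption based on the ProcessorWorker documentation. In a real pipeline the stage, not the caller, would set and close the channels.

```go
package main

import (
	"context"
	"fmt"
)

// Worker is copied from the interface declaration in this package.
type Worker[T any] interface {
	SetSendChan(chan<- T)
	SetReceiveChan(<-chan T)
	Run(context.Context) error
	GetMetrics() (int, int)
}

// mapWorker is a hypothetical worker that applies fn to every message.
// Its counters are plain ints, so read them only after Run has returned.
type mapWorker[T any] struct {
	fn      func(T) T
	receive <-chan T
	send    chan<- T
	in, out int
}

// Compile-time check that mapWorker satisfies Worker.
var _ Worker[int] = (*mapWorker[int])(nil)

func (w *mapWorker[T]) SetSendChan(c chan<- T)    { w.send = c }
func (w *mapWorker[T]) SetReceiveChan(c <-chan T) { w.receive = c }
func (w *mapWorker[T]) GetMetrics() (int, int)    { return w.in, w.out }

// Run forwards messages until the receive channel is closed or ctx is
// cancelled. It never closes the send channel.
func (w *mapWorker[T]) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-w.receive:
			if !ok {
				return nil
			}
			w.in++
			select {
			case <-ctx.Done():
				return ctx.Err()
			case w.send <- w.fn(v):
				w.out++
			}
		}
	}
}

// runOnce drives a single worker by hand, playing the role of a stage.
func runOnce(values []int) (results []int, in, out int, err error) {
	recv := make(chan int, len(values))
	send := make(chan int, len(values))
	for _, v := range values {
		recv <- v
	}
	close(recv)

	w := &mapWorker[int]{fn: func(v int) int { return v * v }}
	w.SetReceiveChan(recv)
	w.SetSendChan(send)
	err = w.Run(context.Background())
	close(send) // closed by the caller, standing in for the pipeline stage
	for v := range send {
		results = append(results, v)
	}
	in, out = w.GetMetrics()
	return results, in, out, err
}

func main() {
	fmt.Println(runOnce([]int{1, 2, 3})) // prints "[1 4 9] 3 3 <nil>"
}
```

Checking ctx.Done() on both the receive and the send side keeps the worker from blocking forever if a downstream stage stops reading.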
func NewReadWorker ¶
NewReadWorker creates a new ReadWorker.
func NewWriteWorker ¶
func NewWriteWorker[T any](writer DataWriter[T], limiter *rate.Limiter) Worker[T]
NewWriteWorker creates a new Worker.