Documentation
¶
Index ¶
- func Compose[T any](op1 WithSink[T], op2 WithSource[T])
- type AsyncOperator
- func NewAsyncOperator[T workerpool.TaskMayPanic, R any](ctx *workerpool.Context, pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R]
- func NewAsyncOperatorWithTransform[T workerpool.TaskMayPanic, R any](ctx *workerpool.Context, name string, workerNum int, transform func(T) R) *AsyncOperator[T, R]
- func (c *AsyncOperator[T, R]) Close() error
- func (c *AsyncOperator[T, R]) GetWorkerPoolSize() int32
- func (c *AsyncOperator[T, R]) Open() error
- func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R])
- func (c *AsyncOperator[T, R]) SetSource(ch DataChannel[T])
- func (*AsyncOperator[T, R]) String() string
- func (c *AsyncOperator[T, R]) TuneWorkerPoolSize(workerNum int32, wait bool)
- type AsyncPipeline
- type DataChannel
- type Operator
- type SimpleDataChannel
- type SimpleDataSource
- type TunableOperator
- type WithSink
- type WithSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Compose ¶
func Compose[T any](op1 WithSink[T], op2 WithSource[T])
Compose sets the sink of op1 and the source of op2.
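The wiring that Compose describes can be sketched without importing the package. In the sketch below the interfaces are re-declared locally, and `chanData`, `producer`, `consumer`, `compose`, and `runPair` are hypothetical names; connecting both operators through one shared unbuffered channel is an assumption, not something this page states.

```go
package main

import "fmt"

// DataChannel, WithSink and WithSource mirror the interfaces documented on this page.
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

type WithSink[T any] interface{ SetSink(ch DataChannel[T]) }
type WithSource[T any] interface{ SetSource(ch DataChannel[T]) }

// chanData is a hypothetical DataChannel backed by a plain Go channel.
type chanData[T any] struct{ ch chan T }

func (c *chanData[T]) Channel() chan T { return c.ch }
func (c *chanData[T]) Finish()         { close(c.ch) }

// producer and consumer are hypothetical operators used only in this sketch.
type producer struct{ sink DataChannel[int] }

func (p *producer) SetSink(ch DataChannel[int]) { p.sink = ch }

type consumer struct{ source DataChannel[int] }

func (c *consumer) SetSource(ch DataChannel[int]) { c.source = ch }

// compose is an assumed equivalent of Compose: both operators share one channel.
func compose[T any](op1 WithSink[T], op2 WithSource[T]) {
	ch := &chanData[T]{ch: make(chan T)}
	op1.SetSink(ch)
	op2.SetSource(ch)
}

// runPair pushes inputs through the producer and sums them on the consumer side.
func runPair(inputs []int) int {
	p, c := &producer{}, &consumer{}
	compose[int](p, c)
	go func() {
		for _, v := range inputs {
			p.sink.Channel() <- v
		}
		p.sink.Finish() // the upstream side signals that it is done
	}()
	sum := 0
	for v := range c.source.Channel() {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(runPair([]int{1, 2, 3}))
}
```

The type argument is spelled out in `compose[int](p, c)` so the example does not rely on inferring T from the operators' method sets.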
Types ¶
type AsyncOperator ¶
type AsyncOperator[T workerpool.TaskMayPanic, R any] struct {
	// contains filtered or unexported fields
}
AsyncOperator processes data asynchronously.
For example, if the sink of AsyncOperator op1 and the source of op2 use the same channel, op2's workers handle the results produced by op1.
func NewAsyncOperator ¶
func NewAsyncOperator[T workerpool.TaskMayPanic, R any](ctx *workerpool.Context, pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R]
NewAsyncOperator creates an AsyncOperator. To catch errors and close the whole pipeline, pass the same context to every operator in the pipeline.
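The idea of sharing one context so that a failure in any stage stops the others can be illustrated with the standard library alone. This is a sketch under assumptions: `context.WithCancelCause` (Go 1.20 or later) stands in for `workerpool.Context`, whose API is not shown on this page, and `runStages` and `errBadTask` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errBadTask is a hypothetical error raised by the downstream stage.
var errBadTask = errors.New("bad task")

// runStages runs two stages that share one context. The downstream stage
// cancels the context with a cause when it sees a negative value, and the
// upstream stage stops sending as soon as the context is done.
func runStages(inputs []int) error {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	ch := make(chan int)
	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // upstream stage
		defer wg.Done()
		defer close(ch)
		for _, v := range inputs {
			select {
			case ch <- v:
			case <-ctx.Done():
				return
			}
		}
	}()

	go func() { // downstream stage
		defer wg.Done()
		for v := range ch {
			if v < 0 {
				cancel(errBadTask) // record the error and stop every stage
				return
			}
		}
	}()

	wg.Wait()
	// Cause is nil when no stage cancelled the context.
	return context.Cause(ctx)
}

func main() {
	fmt.Println(runStages([]int{1, 2, 3}))
	fmt.Println(runStages([]int{1, -1, 2}))
}
```

Because both goroutines observe the same context, the caller needs only one place to look for the first error.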
func NewAsyncOperatorWithTransform ¶
func NewAsyncOperatorWithTransform[T workerpool.TaskMayPanic, R any](
	ctx *workerpool.Context,
	name string,
	workerNum int,
	transform func(T) R,
) *AsyncOperator[T, R]
NewAsyncOperatorWithTransform creates an AsyncOperator with a transform function.
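As a rough picture of what an operator built around a transform function does, the sketch below uses plain goroutines and channels rather than the workerpool package. `transformAll` and `squares` are hypothetical names, and closing the sink once all workers return is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// transformAll is a hypothetical stand-in for a transform-based operator:
// workerNum goroutines read from source, apply transform, and write to sink.
// The sink is closed once every worker has returned.
func transformAll[T, R any](source <-chan T, sink chan<- R, workerNum int, transform func(T) R) {
	var wg sync.WaitGroup
	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range source {
				sink <- transform(task)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(sink)
	}()
}

// squares feeds inputs through transformAll and returns the sorted results.
func squares(inputs []int, workerNum int) []int {
	source := make(chan int)
	sink := make(chan int)
	transformAll[int, int](source, sink, workerNum, func(v int) int { return v * v })

	go func() {
		for _, v := range inputs {
			source <- v
		}
		close(source)
	}()

	var out []int
	for r := range sink {
		out = append(out, r)
	}
	sort.Ints(out) // results arrive in a nondeterministic order
	return out
}

func main() {
	fmt.Println(squares([]int{1, 2, 3, 4}, 2))
}
```

Results are sorted before printing because with more than one worker the arrival order on the sink is not guaranteed.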
func (*AsyncOperator[T, R]) Close ¶
func (c *AsyncOperator[T, R]) Close() error
Close implements the Close method of the Operator interface.
func (*AsyncOperator[T, R]) GetWorkerPoolSize ¶
func (c *AsyncOperator[T, R]) GetWorkerPoolSize() int32
GetWorkerPoolSize returns the worker pool size.
func (*AsyncOperator[T, R]) Open ¶
func (c *AsyncOperator[T, R]) Open() error
Open implements the Open method of the Operator interface.
func (*AsyncOperator[T, R]) SetSink ¶
func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R])
SetSink sets the sink channel.
func (*AsyncOperator[T, R]) SetSource ¶
func (c *AsyncOperator[T, R]) SetSource(ch DataChannel[T])
SetSource sets the source channel.
func (*AsyncOperator[T, R]) String ¶
func (*AsyncOperator[T, R]) String() string
String returns the name of the operator.
func (*AsyncOperator[T, R]) TuneWorkerPoolSize ¶
func (c *AsyncOperator[T, R]) TuneWorkerPoolSize(workerNum int32, wait bool)
TuneWorkerPoolSize tunes the worker pool size.
type AsyncPipeline ¶
type AsyncPipeline struct {
	// contains filtered or unexported fields
}
AsyncPipeline wraps a list of Operators. Data flows from the first operator to the last.
func NewAsyncPipeline ¶
func NewAsyncPipeline(ops ...Operator) *AsyncPipeline
NewAsyncPipeline creates a new AsyncPipeline.
func (*AsyncPipeline) Execute ¶
func (p *AsyncPipeline) Execute() error
Execute opens all operators. The pipeline then runs asynchronously.
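A minimal sketch of opening a list of operators in order is shown below. The Operator interface is re-declared locally, `namedOp` and `openAll` are hypothetical, and stopping at the first Open error is an assumption of the sketch rather than documented behavior of Execute.

```go
package main

import (
	"errors"
	"fmt"
)

// Operator mirrors the interface documented on this page.
type Operator interface {
	Open() error
	Close() error
	String() string
}

// namedOp is a hypothetical operator that records when it is opened.
type namedOp struct {
	name   string
	fail   bool
	opened *[]string
}

func (o *namedOp) Open() error {
	if o.fail {
		return errors.New("open failed")
	}
	*o.opened = append(*o.opened, o.name)
	return nil
}

func (o *namedOp) Close() error   { return nil }
func (o *namedOp) String() string { return o.name }

// openAll opens operators from first to last and stops at the first error.
// Stopping early is an assumption of this sketch.
func openAll(ops ...Operator) error {
	for _, op := range ops {
		if err := op.Open(); err != nil {
			return fmt.Errorf("operator %s: %w", op, err)
		}
	}
	return nil
}

func main() {
	var opened []string
	err := openAll(
		&namedOp{name: "reader", opened: &opened},
		&namedOp{name: "writer", opened: &opened},
	)
	fmt.Println(opened, err)
}
```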
func (*AsyncPipeline) GetReaderAndWriter ¶
func (p *AsyncPipeline) GetReaderAndWriter() (operator1, operator2 TunableOperator)
GetReaderAndWriter returns the reader and writer in this pipeline. Currently this can only be used in readIndexStepExecutor.
func (*AsyncPipeline) IsStarted ¶
func (p *AsyncPipeline) IsStarted() bool
IsStarted returns whether the pipeline is started.
type DataChannel ¶
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}
DataChannel is a channel that can be used to transfer data between operators.
type Operator ¶
type Operator interface {
	Open() error
	// Close waits for the tasks to finish and closes the operator.
	// TODO: the wait part should be separated from the close part.
	Close() error
	String() string
}
Operator is the basic operation unit in the task execution.
type SimpleDataChannel ¶
type SimpleDataChannel[T any] struct {
	// contains filtered or unexported fields
}
SimpleDataChannel is a simple implementation of DataChannel.
func NewSimpleDataChannel ¶
func NewSimpleDataChannel[T any](ch chan T) *SimpleDataChannel[T]
NewSimpleDataChannel creates a new SimpleDataChannel.
func (*SimpleDataChannel[T]) Channel ¶
func (s *SimpleDataChannel[T]) Channel() chan T
Channel returns the underlying channel of the SimpleDataChannel.
func (*SimpleDataChannel[T]) Finish ¶
func (s *SimpleDataChannel[T]) Finish()
Finish closes the underlying channel of the SimpleDataChannel.
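The Channel and Finish behavior described above can be sketched with a small local type. `simpleChan`, `newSimpleChan`, and `drain` are hypothetical stand-ins that follow the documented descriptions; they are not the package's implementation.

```go
package main

import "fmt"

// simpleChan is a hypothetical stand-in for SimpleDataChannel.
type simpleChan[T any] struct{ ch chan T }

func newSimpleChan[T any](ch chan T) *simpleChan[T] { return &simpleChan[T]{ch: ch} }

// Channel returns the underlying channel.
func (s *simpleChan[T]) Channel() chan T { return s.ch }

// Finish closes the underlying channel, as the documentation describes.
func (s *simpleChan[T]) Finish() { close(s.ch) }

// drain collects values until the channel has been closed and emptied.
func drain[T any](s *simpleChan[T]) []T {
	var out []T
	for v := range s.Channel() {
		out = append(out, v)
	}
	return out
}

func main() {
	dc := newSimpleChan(make(chan string, 2))
	dc.Channel() <- "a"
	dc.Channel() <- "b"
	dc.Finish() // buffered values remain readable after the close
	fmt.Println(drain(dc))
}
```

Since closing a Go channel twice panics, a type like this should have Finish called exactly once, by the side that writes to it.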
type SimpleDataSource ¶
type SimpleDataSource[T workerpool.TaskMayPanic] struct {
	// contains filtered or unexported fields
}
SimpleDataSource is a simple operator that uses the given input slice as its data source.
func NewSimpleDataSource ¶
func NewSimpleDataSource[T workerpool.TaskMayPanic](
	ctx *workerpool.Context,
	inputs []T,
) *SimpleDataSource[T]
NewSimpleDataSource creates a new SimpleDataSource with the given inputs. The input workerpool.Context is used to quit this operator. Sharing the same context with the downstream operators ensures that this operator quits when any of them encounters an error or panics.
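The quit-on-context behavior of a slice-backed source might look like the following sketch. It assumes the standard `context.Context` in place of `workerpool.Context`, and `emit` and `takeAndCancel` are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// emit sends each input to sink and returns early when ctx is cancelled.
// The sink is always closed so that readers are released.
func emit[T any](ctx context.Context, inputs []T, sink chan<- T) {
	defer close(sink)
	for _, v := range inputs {
		select {
		case sink <- v:
		case <-ctx.Done():
			return
		}
	}
}

// takeAndCancel reads up to n values, cancels the context, and waits for
// the source goroutine to exit before returning what it received.
func takeAndCancel(inputs []int, n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sink := make(chan int)
	done := make(chan struct{})
	go func() {
		defer close(done)
		emit[int](ctx, inputs, sink)
	}()

	var got []int
	for len(got) < n {
		v, ok := <-sink
		if !ok {
			break // the source ran out of inputs
		}
		got = append(got, v)
	}
	cancel()
	<-done // the source has quit, even with inputs left unsent
	return got
}

func main() {
	fmt.Println(takeAndCancel([]int{1, 2, 3, 4, 5}, 2))
}
```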
func (*SimpleDataSource[T]) Close ¶
func (s *SimpleDataSource[T]) Close() error
Close implements the Operator interface.
func (*SimpleDataSource[T]) Open ¶
func (s *SimpleDataSource[T]) Open() error
Open implements the Operator interface.
func (*SimpleDataSource[T]) SetSink ¶
func (s *SimpleDataSource[T]) SetSink(ch DataChannel[T])
SetSink implements the WithSink interface.
func (*SimpleDataSource[T]) String ¶
func (*SimpleDataSource[T]) String() string
String implements the Operator interface.
type TunableOperator ¶
type TunableOperator interface {
	TuneWorkerPoolSize(workerNum int32, wait bool)
	GetWorkerPoolSize() int32
}
TunableOperator is an operator that supports changing its worker pool size.
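Code that adjusts pool sizes can be written against the interface alone. In the sketch below, `fixedSizeOp` and `resizeIfNeeded` are hypothetical; the stand-in operator only records the requested size and ignores the wait flag, whose exact meaning this page does not spell out. The example needs Go 1.19 or later for `atomic.Int32`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// TunableOperator mirrors the interface documented on this page.
type TunableOperator interface {
	TuneWorkerPoolSize(workerNum int32, wait bool)
	GetWorkerPoolSize() int32
}

// fixedSizeOp is a hypothetical operator that only records the requested size.
// A real operator would start or stop workers here; the wait flag is ignored.
type fixedSizeOp struct{ size atomic.Int32 }

func (o *fixedSizeOp) TuneWorkerPoolSize(workerNum int32, _ bool) { o.size.Store(workerNum) }
func (o *fixedSizeOp) GetWorkerPoolSize() int32                   { return o.size.Load() }

// resizeIfNeeded tunes the pool only when the target differs from the
// current size and reports whether a change was requested.
func resizeIfNeeded(op TunableOperator, target int32) bool {
	if op.GetWorkerPoolSize() == target {
		return false
	}
	op.TuneWorkerPoolSize(target, true)
	return true
}

func main() {
	op := &fixedSizeOp{}
	op.size.Store(4)
	fmt.Println(resizeIfNeeded(op, 8), op.GetWorkerPoolSize())
	fmt.Println(resizeIfNeeded(op, 8), op.GetWorkerPoolSize())
}
```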
type WithSink ¶
type WithSink[T any] interface {
	// SetSink sets the sink of the operator.
	// Operator implementations should call the Finish method of the sink when they are done.
	SetSink(channel DataChannel[T])
}
WithSink is an interface that can be used to set the sink of an operator.
type WithSource ¶
type WithSource[T any] interface {
	SetSource(channel DataChannel[T])
}
WithSource is an interface that can be used to set the source of an operator.