operator

package
v1.1.0-beta.0...-a7bc5df
Published: Dec 27, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Compose

func Compose[T any](op1 WithSink[T], op2 WithSource[T])

Compose sets the sink of op1 and the source of op2.
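The wiring that Compose describes can be sketched as follows. This is a minimal illustration, not the package's implementation: the interfaces are local stand-ins that mirror the ones documented on this page, and `simpleChan`, `compose`, `producer`, `consumer`, and `run` are hypothetical names. The sketch also assumes that Compose creates the shared channel itself.

```go
package main

import "fmt"

// Local stand-ins mirroring the interfaces documented on this page.
type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

type WithSink[T any] interface{ SetSink(ch DataChannel[T]) }
type WithSource[T any] interface{ SetSource(ch DataChannel[T]) }

// simpleChan is a hypothetical DataChannel backed by a plain Go channel.
type simpleChan[T any] struct{ ch chan T }

func (s *simpleChan[T]) Channel() chan T { return s.ch }
func (s *simpleChan[T]) Finish()         { close(s.ch) }

// compose is an assumed implementation: it creates one channel and hands it
// to op1 as its sink and to op2 as its source.
func compose[T any](op1 WithSink[T], op2 WithSource[T]) {
	ch := &simpleChan[T]{ch: make(chan T, 4)}
	op1.SetSink(ch)
	op2.SetSource(ch)
}

// producer and consumer are hypothetical operators used only for this demo.
type producer struct{ sink DataChannel[int] }

func (p *producer) SetSink(ch DataChannel[int]) { p.sink = ch }

type consumer struct{ source DataChannel[int] }

func (c *consumer) SetSource(ch DataChannel[int]) { c.source = ch }

// run wires a producer to a consumer and returns the sum the consumer sees.
func run(values []int) int {
	p, c := &producer{}, &consumer{}
	compose[int](p, c)
	go func() {
		for _, v := range values {
			p.sink.Channel() <- v
		}
		p.sink.Finish() // the upstream operator finishes its sink when done
	}()
	sum := 0
	for v := range c.source.Channel() {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(run([]int{1, 2, 3}))
}
```

Because both operators hold the same channel, everything the producer writes to its sink is read by the consumer from its source.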

Types

type AsyncOperator

type AsyncOperator[T workerpool.TaskMayPanic, R any] struct {
	// contains filtered or unexported fields
}

AsyncOperator processes data asynchronously.

For example, if the sink of AsyncOperator op1 and the source of op2 use the same channel, op2's workers handle the results produced by op1.

func NewAsyncOperator

func NewAsyncOperator[T workerpool.TaskMayPanic, R any](ctx *workerpool.Context, pool *workerpool.WorkerPool[T, R]) *AsyncOperator[T, R]

NewAsyncOperator creates an AsyncOperator. To catch errors and shut down the whole pipeline, pass the same context to every operator in the pipeline.

func NewAsyncOperatorWithTransform

func NewAsyncOperatorWithTransform[T workerpool.TaskMayPanic, R any](
	ctx *workerpool.Context,
	name string,
	workerNum int,
	transform func(T) R,
) *AsyncOperator[T, R]

NewAsyncOperatorWithTransform creates an AsyncOperator with a transform function.
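The idea of an operator built from a transform function can be sketched with plain goroutines. This is an assumption-laden illustration rather than the package's code: `runTransform` and `squareAll` are hypothetical names, plain Go channels stand in for DataChannel, and the `workerpool.TaskMayPanic` constraint is replaced by `any`.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runTransform is a hypothetical stand-in for a transform-based operator:
// workerNum goroutines read from source, apply transform, and write to sink.
// The sink is closed once every worker has exited.
func runTransform[T, R any](workerNum int, source <-chan T, sink chan<- R, transform func(T) R) {
	var wg sync.WaitGroup
	for i := 0; i < workerNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range source {
				sink <- transform(task)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(sink)
	}()
}

// squareAll feeds inputs through the sketch and returns the sorted results.
func squareAll(inputs []int, workerNum int) []int {
	source := make(chan int)
	sink := make(chan int)
	runTransform[int, int](workerNum, source, sink, func(v int) int { return v * v })
	go func() {
		for _, v := range inputs {
			source <- v
		}
		close(source)
	}()
	var out []int
	for r := range sink {
		out = append(out, r)
	}
	sort.Ints(out) // workers run concurrently, so arrival order is not fixed
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 3))
}
```

Results arrive in no particular order when more than one worker is running, which is why the sketch sorts them before returning.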

func (*AsyncOperator[T, R]) Close

func (c *AsyncOperator[T, R]) Close() error

Close implements the Close method of the Operator interface.

func (*AsyncOperator[T, R]) GetWorkerPoolSize

func (c *AsyncOperator[T, R]) GetWorkerPoolSize() int32

GetWorkerPoolSize returns the worker pool size.

func (*AsyncOperator[T, R]) Open

func (c *AsyncOperator[T, R]) Open() error

Open implements the Open method of the Operator interface.

func (*AsyncOperator[T, R]) SetSink

func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R])

SetSink sets the sink channel.

func (*AsyncOperator[T, R]) SetSource

func (c *AsyncOperator[T, R]) SetSource(ch DataChannel[T])

SetSource sets the source channel.

func (*AsyncOperator[T, R]) String

func (*AsyncOperator[T, R]) String() string

String returns the name of the operator.

func (*AsyncOperator[T, R]) TuneWorkerPoolSize

func (c *AsyncOperator[T, R]) TuneWorkerPoolSize(workerNum int32, wait bool)

TuneWorkerPoolSize tunes the worker pool size.

type AsyncPipeline

type AsyncPipeline struct {
	// contains filtered or unexported fields
}

AsyncPipeline wraps a list of Operators. Data flows from the first operator to the last.

func NewAsyncPipeline

func NewAsyncPipeline(ops ...Operator) *AsyncPipeline

NewAsyncPipeline creates a new AsyncPipeline.

func (*AsyncPipeline) Close

func (p *AsyncPipeline) Close() error

Close waits for all tasks to finish.

func (*AsyncPipeline) Execute

func (p *AsyncPipeline) Execute() error

Execute opens all operators. It returns without waiting for them to finish; the operators run asynchronously.
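The Execute and Close lifecycle can be illustrated with a simplified stand-in. This is a sketch under stated assumptions, not the package's implementation: `pipeline`, `recorder`, and `lifecycle` are hypothetical names, and the order in which operators are opened and closed is assumed to follow the order they were passed in.

```go
package main

import (
	"fmt"
	"strings"
)

// Operator mirrors the interface documented on this page.
type Operator interface {
	Open() error
	Close() error
	String() string
}

// pipeline is a hypothetical, simplified stand-in for AsyncPipeline.
type pipeline struct {
	ops     []Operator
	started bool
}

// Execute opens every operator in order and stops at the first error.
func (p *pipeline) Execute() error {
	for _, op := range p.ops {
		if err := op.Open(); err != nil {
			return err
		}
	}
	p.started = true
	return nil
}

// Close closes every operator in order and returns the first error seen.
func (p *pipeline) Close() error {
	var firstErr error
	for _, op := range p.ops {
		if err := op.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// recorder is a hypothetical operator that logs its lifecycle calls.
type recorder struct {
	name string
	log  *[]string
}

func (r *recorder) Open() error    { *r.log = append(*r.log, "open "+r.name); return nil }
func (r *recorder) Close() error   { *r.log = append(*r.log, "close "+r.name); return nil }
func (r *recorder) String() string { return r.name }

// lifecycle runs a two-operator pipeline and returns the recorded calls.
func lifecycle() string {
	var log []string
	p := &pipeline{ops: []Operator{
		&recorder{name: "reader", log: &log},
		&recorder{name: "writer", log: &log},
	}}
	if err := p.Execute(); err != nil {
		return err.Error()
	}
	if err := p.Close(); err != nil {
		return err.Error()
	}
	return strings.Join(log, ", ")
}

func main() {
	fmt.Println(lifecycle())
}
```

In this sketch the operators do no background work, so Close returns immediately; in a real pipeline Close is the point where the caller waits for in-flight tasks.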

func (*AsyncPipeline) GetReaderAndWriter

func (p *AsyncPipeline) GetReaderAndWriter() (operator1, operator2 TunableOperator)

GetReaderAndWriter returns the reader and writer in this pipeline. Currently this can only be used in readIndexStepExecutor.

func (*AsyncPipeline) IsStarted

func (p *AsyncPipeline) IsStarted() bool

IsStarted returns whether the pipeline is started.

func (*AsyncPipeline) String

func (p *AsyncPipeline) String() string

String shows the pipeline.

type DataChannel

type DataChannel[T any] interface {
	Channel() chan T
	Finish()
}

DataChannel is a channel that can be used to transfer data between operators.

type Operator

type Operator interface {
	Open() error
	// Close waits for the tasks to finish and closes the operator.
	// TODO: the wait part should be separated from the close part.
	Close() error
	String() string
}

Operator is the basic operation unit in the task execution.

type SimpleDataChannel

type SimpleDataChannel[T any] struct {
	// contains filtered or unexported fields
}

SimpleDataChannel is a simple implementation of DataChannel.

func NewSimpleDataChannel

func NewSimpleDataChannel[T any](ch chan T) *SimpleDataChannel[T]

NewSimpleDataChannel creates a new SimpleDataChannel.

func (*SimpleDataChannel[T]) Channel

func (s *SimpleDataChannel[T]) Channel() chan T

Channel returns the underlying channel of the SimpleDataChannel.

func (*SimpleDataChannel[T]) Finish

func (s *SimpleDataChannel[T]) Finish()

Finish closes the underlying channel of the SimpleDataChannel.
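The behavior described for SimpleDataChannel (Channel returns the wrapped channel, Finish closes it) can be sketched in a few lines. The type and function names below are hypothetical stand-ins written for this example; the real type's unexported fields are not shown on this page.

```go
package main

import "fmt"

// dataChannel is a hypothetical stand-in for SimpleDataChannel.
type dataChannel[T any] struct{ ch chan T }

// newDataChannel wraps an existing Go channel.
func newDataChannel[T any](ch chan T) *dataChannel[T] {
	return &dataChannel[T]{ch: ch}
}

// Channel returns the underlying channel.
func (s *dataChannel[T]) Channel() chan T { return s.ch }

// Finish closes the underlying channel so that readers stop ranging over it.
func (s *dataChannel[T]) Finish() { close(s.ch) }

// drain writes values into a buffered channel, finishes it, and reads
// everything back in order.
func drain(values []string) []string {
	dc := newDataChannel(make(chan string, len(values)))
	for _, v := range values {
		dc.Channel() <- v
	}
	dc.Finish()
	var out []string
	for v := range dc.Channel() {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drain([]string{"a", "b", "c"}))
}
```

Values already buffered in the channel remain readable after Finish; only further sends are disallowed, as with any closed Go channel.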

type SimpleDataSource

type SimpleDataSource[T workerpool.TaskMayPanic] struct {
	// contains filtered or unexported fields
}

SimpleDataSource is a simple operator that uses the given input slice as its data source.

func NewSimpleDataSource

func NewSimpleDataSource[T workerpool.TaskMayPanic](
	ctx *workerpool.Context,
	inputs []T,
) *SimpleDataSource[T]

NewSimpleDataSource creates a new SimpleDataSource with the given inputs. The workerpool.Context passed in is used to stop this operator. Sharing the same context with the downstream operators ensures that this operator quits when any of them encounters an error or panics.

func (*SimpleDataSource[T]) Close

func (s *SimpleDataSource[T]) Close() error

Close implements the Operator interface.

func (*SimpleDataSource[T]) Open

func (s *SimpleDataSource[T]) Open() error

Open implements the Operator interface.

func (*SimpleDataSource[T]) SetSink

func (s *SimpleDataSource[T]) SetSink(ch DataChannel[T])

SetSink implements the WithSink interface.

func (*SimpleDataSource[T]) String

func (*SimpleDataSource[T]) String() string

String implements the Operator interface.

type TunableOperator

type TunableOperator interface {
	TuneWorkerPoolSize(workerNum int32, wait bool)
	GetWorkerPoolSize() int32
}

TunableOperator is an operator that supports changing its worker pool size.

type WithSink

type WithSink[T any] interface {
	// SetSink sets the sink of the operator.
	// Operator implementations should call the Finish method of the sink when they are done.
	SetSink(channel DataChannel[T])
}

WithSink is an interface that can be used to set the sink of an operator.

type WithSource

type WithSource[T any] interface {
	SetSource(channel DataChannel[T])
}

WithSource is an interface that can be used to set the source of an operator.
