concurrency

package
v1.51.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2025 License: MIT Imports: 6 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelWithResult

type ChannelWithResult[ResultType any] interface {
	GetResult() ResultType
}

ChannelWithResult is an interface that should be implemented by the result channel

type ConcurrentExecutor

type ConcurrentExecutor[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any] struct {
	// contains filtered or unexported fields
}

ConcurrentExecutor is a utility to execute tasks concurrently

func NewConcurrentExecutor

func NewConcurrentExecutor[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any](logger zerolog.Logger, opts ...ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]) *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]

NewConcurrentExecutor creates a new ConcurrentExecutor

func (*ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) Execute

func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) Execute(concurrency int, payload []TaskType, processorFn TaskProcessorFn[ResultChannelType, TaskType]) ([]ResultType, error)

Execute executes a task that requires a payload. It is executed with given concurrency. The processorFn is the function that processes the task. Executor will attempt to distribute the tasks evenly among the executors.

func (*ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) ExecuteSimple

func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) ExecuteSimple(concurrency int, repeatTimes int, simpleProcessorFn SimpleTaskProcessorFn[ResultChannelType]) ([]ResultType, error)

ExecuteSimple executes a task that doesn't require a payload. It is executed repeatTimes times with given concurrency. The simpleProcessorFn is the function that processes the task. Executor will attempt to distribute the tasks evenly among the executors.

func (*ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) GetErrors

func (e *ConcurrentExecutor[ResultType, ResultChannelType, TaskType]) GetErrors() []error

GetErrors returns all errors that occurred during processing

type ConcurrentExecutorOpt

type ConcurrentExecutorOpt[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any] func(c *ConcurrentExecutor[ResultType, ResultChannelType, TaskType])

func WithContext

func WithContext[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any](context context.Context) ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]

/ WithContext sets the context for the executor, if not set it defaults to context.Background()

func WithoutFailFast

func WithoutFailFast[ResultType any, ResultChannelType ChannelWithResult[ResultType], TaskType any]() ConcurrentExecutorOpt[ResultType, ResultChannelType, TaskType]

WithoutFailFast disables fail fast. Executor will wait for all tasks to finish even if some of them fail.

type NoTaskType

type NoTaskType struct{}

NoTaskType is a dummy type to be used when no task type is needed

type SimpleTaskProcessorFn

type SimpleTaskProcessorFn[ResultChannelType any] func(resultCh chan ResultChannelType, errorCh chan error, executorNum int)

SimpleTaskProcessorFn is a function that processes a task that doesn't require a payload. It should send the result to the resultCh and any error to the errorCh. It should never send to both channels. The executorNum is the index of the executor that is processing the task.

type TaskProcessorFn

type TaskProcessorFn[ResultChannelType, TaskType any] func(resultCh chan ResultChannelType, errorCh chan error, executorNum int, payload TaskType)

TaskProcessorFn is a function that processes a task that requires a payload. It should send the result to the resultCh and any error to the errorCh. It should never send to both channels. The executorNum is the index of the executor that is processing the task. The payload is the task's payload. If task doesn't require one use SimpleTaskProcessorFn instead.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL