threading

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GoSafely

func GoSafely(f func() error) <-chan error

GoSafely executes the given function in a goroutine that will never panic. If the given function panics, GoSafely will recover and return a special error (PanicError)

func Parallel

func Parallel[In any, Out any](items []In, worker func(arg In, index int) ([]Out, error)) ([]Out, error)

func ProcessInWorkQueue

func ProcessInWorkQueue[TIn, TOut any](in []TIn, workerFunc func(TIn) ([]TOut, error), numWorkers int, bufferSize int, maxRetries int) ([]TOut, error)

func RunTasks

func RunTasks[T any](tasks ...*Task[T]) ([]T, error)

func StartJobs

func StartJobs(jobs []BackgroundJob)

func WaitGroup

func WaitGroup(copies int, worker func() error) error

Types

type BackgroundJob

type BackgroundJob interface {
	Start() <-chan error
	Stop()
	Name() string
}

type ErrorCallback

type ErrorCallback[T any] func(T, error, int)

type Handler

type Handler[T any] func(T) error

type PanicError

type PanicError struct {
	// contains filtered or unexported fields
}

func (*PanicError) Error

func (p *PanicError) Error() string

func (*PanicError) Stack

func (p *PanicError) Stack() string

type Partitionable

type Partitionable interface {
	PartitionKey() string
}

Partitionable is an optional interface that work items can implement to provide partition keys for ordered processing within partitions. If a type implements this interface, items with the same partition key will always be processed by the same worker in FIFO order.

type RetryPolicy

type RetryPolicy int32
const RetryPolicyAfterDuration RetryPolicy = 1
const RetryPolicyImmediately RetryPolicy = 2
const RetryPolicyNever RetryPolicy = 0

type Task

type Task[T any] struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask[T any](worker func() (T, error)) *Task[T]

func (*Task[T]) Await

func (t *Task[T]) Await() (T, error)

func (*Task[T]) Run

func (t *Task[T]) Run()

type TaskResult

type TaskResult[T any] struct {
	Value T
	Error error
}

type WorkItem

type WorkItem[T any] struct {
	Item    T
	Retries int
}

type WorkQueue

type WorkQueue[T any] struct {
	StopOnError        bool
	RetryPolicy        RetryPolicy
	RetryAfterDuration time.Duration
	MaxRetries         int
	ErrorCallback      ErrorCallback[T]

	// Partitioning support (optional)
	// PartitionFunc is an optional function to extract partition keys from items.
	// When set (or when items implement Partitionable), items with the same
	// partition key will be processed by the same worker in FIFO order.
	PartitionFunc func(T) string
	// contains filtered or unexported fields
}

func NewWorkQueue

func NewWorkQueue[T any](handler Handler[T], bufferSize int, workerCount int) *WorkQueue[T]

func (*WorkQueue[T]) AwaitCompletion

func (q *WorkQueue[T]) AwaitCompletion()

func (*WorkQueue[T]) Enqueue

func (q *WorkQueue[T]) Enqueue(items ...T)

func (*WorkQueue[T]) Error

func (q *WorkQueue[T]) Error() error

func (*WorkQueue[T]) IsRunning

func (q *WorkQueue[T]) IsRunning() bool

func (*WorkQueue[T]) Start

func (q *WorkQueue[T]) Start()

func (*WorkQueue[T]) Stop

func (q *WorkQueue[T]) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL