worker

package
v0.19.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 11, 2026 License: MIT Imports: 19 Imported by: 0

Documentation


Constants

const TaskIDHeaderKey = "task_id"

TaskIDHeaderKey is the key for the task ID in the headers

Variables

This section is empty.

Functions

func SerializeTaskData

func SerializeTaskData[T any](
	encoder encoding.EncoderFunc,
	task T,
) ([]byte, error)

SerializeTaskData serializes task data using the provided encoder

Types

type BinaryMessageFormatOption added in v0.13.0

type BinaryMessageFormatOption func(*binaryMessageFormat)

BinaryMessageFormatOption configures binaryMessageFormat.

func WithEncoderDecoderPair added in v0.13.0

func WithEncoderDecoderPair(enc encoding.EncoderFunc, dec encoding.DecoderFunc) BinaryMessageFormatOption

WithEncoderDecoderPair sets custom encoder and decoder for headers serialization.

type Consumer

type Consumer interface {
	Consume(handler TaskHandlerFunc) error
}

Consumer defines an interface for task consumers that pass each consumed task to the supplied handler
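
A minimal sketch of how the Consumer interface could be satisfied by an in-memory source. The Task, TaskHandlerFunc and Consumer declarations are local copies of the signatures documented on this page, so the example compiles without importing the package. sliceConsumer and consumeAll are hypothetical names used only for illustration, and stopping at the first handler error is a choice made for this sketch, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

type TaskHandlerFunc func(task *Task) error

type Consumer interface {
	Consume(handler TaskHandlerFunc) error
}

// sliceConsumer is a hypothetical Consumer that replays a fixed list of tasks.
type sliceConsumer struct {
	tasks []*Task
}

// Consume passes each task to the handler in order and stops at the first error.
func (c *sliceConsumer) Consume(handler TaskHandlerFunc) error {
	for _, t := range c.tasks {
		if err := handler(t); err != nil {
			return fmt.Errorf("task %q: %w", t.Type, err)
		}
	}
	return nil
}

// consumeAll runs the consumer and returns the task types seen by the handler.
func consumeAll(c Consumer) ([]string, error) {
	var seen []string
	err := c.Consume(func(t *Task) error {
		if t.Type == "" {
			return errors.New("empty task type")
		}
		seen = append(seen, t.Type)
		return nil
	})
	return seen, err
}

func main() {
	c := &sliceConsumer{tasks: []*Task{{Type: "resize"}, {Type: "email"}}}
	seen, err := consumeAll(c)
	fmt.Println(seen, err)
}
```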

type Error

type Error struct {
	Type ErrorType
	Err  error
}

Error is an error that occurs during task processing and must be handled in one of the predefined ways. We do not allow returning the generic error interface, as it leaves too much flexibility to the caller. Middleware authors may introduce their own types, which must, however, be handled in the middleware itself.

func NewError

func NewError(typ ErrorType, err error) *Error

NewError wraps a standard error with a type that can be used to determine how to handle it

func (*Error) Error

func (e *Error) Error() string

type ErrorType

type ErrorType string

ErrorType is the type of transformation error

const (
	// ErrTypeDroppable is the type of transformation error when a hit should be dropped
	ErrTypeDroppable ErrorType = "drop"
	// ErrTypeRetryable is the type of transformation error when a hit can be retried
	ErrTypeRetryable ErrorType = "retry"
)
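
The sketch below illustrates how a caller might branch on the two predefined error types. Error, ErrorType and NewError are local copies of the declarations above. The message format produced by Error() and the decide helper with its action names ("ack", "requeue", "unhandled") are assumptions made for this example and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type ErrorType string

const (
	ErrTypeDroppable ErrorType = "drop"
	ErrTypeRetryable ErrorType = "retry"
)

type Error struct {
	Type ErrorType
	Err  error
}

func NewError(typ ErrorType, err error) *Error {
	return &Error{Type: typ, Err: err}
}

// Error formats the message; the exact format used here is an assumption.
func (e *Error) Error() string {
	return fmt.Sprintf("%s: %v", e.Type, e.Err)
}

// decide is a hypothetical helper that maps a processing result to an action.
func decide(e *Error) string {
	if e == nil {
		return "ack"
	}
	switch e.Type {
	case ErrTypeDroppable:
		return "drop"
	case ErrTypeRetryable:
		return "requeue"
	default:
		// Types introduced by middleware should have been handled there.
		return "unhandled"
	}
}

func main() {
	fmt.Println(decide(nil))
	fmt.Println(decide(NewError(ErrTypeDroppable, errors.New("malformed payload"))))
	fmt.Println(decide(NewError(ErrTypeRetryable, errors.New("upstream timeout"))))
}
```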

type FilesystemDirectoryConsumer added in v0.13.0

type FilesystemDirectoryConsumer struct {
	// contains filtered or unexported fields
}

FilesystemDirectoryConsumer reads tasks from files in a directory, ordered by timestamp.

func NewFilesystemDirectoryConsumer added in v0.13.0

func NewFilesystemDirectoryConsumer(
	ctx context.Context,
	dir string,
	format MessageFormat,
) (*FilesystemDirectoryConsumer, error)

NewFilesystemDirectoryConsumer creates a consumer that reads tasks from timestamped files.

func (*FilesystemDirectoryConsumer) Consume added in v0.13.0

Consume implements Consumer.

type FilesystemDirectoryPublisher added in v0.13.0

type FilesystemDirectoryPublisher struct {
	// contains filtered or unexported fields
}

FilesystemDirectoryPublisher writes tasks to timestamped files in a directory.

func NewFilesystemDirectoryPublisher added in v0.13.0

func NewFilesystemDirectoryPublisher(dir string, format MessageFormat) (*FilesystemDirectoryPublisher, error)

NewFilesystemDirectoryPublisher creates a publisher that writes tasks to timestamped files.

func (*FilesystemDirectoryPublisher) Publish added in v0.13.0

func (p *FilesystemDirectoryPublisher) Publish(task *Task) error

Publish implements Publisher.

type MessageFormat

type MessageFormat interface {
	// Serialize converts a task into binary format
	Serialize(t *Task) ([]byte, error)
	// Deserialize converts binary data into a task
	Deserialize(data []byte) (*Task, error)
}

MessageFormat defines the interface for serializing and deserializing tasks. Consumer and Publisher implementations can optionally use it, but it is not a requirement.
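
As an illustration of the interface, here is a sketch of a custom format built on the standard encoding/json package. Task and MessageFormat are local copies of the documented declarations. jsonMessageFormat and roundTrip are hypothetical names, and this is unrelated to how the package's binary format encodes tasks.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented declarations.
type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

type MessageFormat interface {
	Serialize(t *Task) ([]byte, error)
	Deserialize(data []byte) (*Task, error)
}

// jsonMessageFormat is a hypothetical MessageFormat backed by encoding/json.
type jsonMessageFormat struct{}

// Serialize encodes the task as a JSON object; Body is base64-encoded by encoding/json.
func (jsonMessageFormat) Serialize(t *Task) ([]byte, error) {
	return json.Marshal(t)
}

// Deserialize decodes a JSON object produced by Serialize back into a task.
func (jsonMessageFormat) Deserialize(data []byte) (*Task, error) {
	var t Task
	if err := json.Unmarshal(data, &t); err != nil {
		return nil, fmt.Errorf("decode task: %w", err)
	}
	return &t, nil
}

// roundTrip serializes a task and reads it back through the same format.
func roundTrip(f MessageFormat, t *Task) (*Task, error) {
	data, err := f.Serialize(t)
	if err != nil {
		return nil, err
	}
	return f.Deserialize(data)
}

func main() {
	in := &Task{Type: "email", Headers: map[string]string{"task_id": "abc"}, Body: []byte("hello")}
	out, err := roundTrip(jsonMessageFormat{}, in)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Type, out.Headers["task_id"], string(out.Body))
}
```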

func NewBinaryMessageFormat

func NewBinaryMessageFormat(opts ...BinaryMessageFormatOption) MessageFormat

NewBinaryMessageFormat creates a new binary message format implementation

type Middleware

type Middleware interface {
	Handle(task *Task, next func() *Error) *Error
}

Middleware defines an interface for task processing middleware
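
The following sketch shows one possible way to write a middleware and to compose several of them around a final handler call. Task, Error and Middleware are local copies of the documented declarations. recorder and chain are hypothetical, and the ordering used here (first element outermost) is an assumption of this example rather than documented Worker behavior.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

type ErrorType string

type Error struct {
	Type ErrorType
	Err  error
}

type Middleware interface {
	Handle(task *Task, next func() *Error) *Error
}

// recorder is a hypothetical middleware that records when it runs.
type recorder struct {
	name string
	log  *[]string
}

// Handle records an entry before and after delegating to the rest of the chain.
func (r recorder) Handle(task *Task, next func() *Error) *Error {
	*r.log = append(*r.log, r.name+":before")
	err := next()
	*r.log = append(*r.log, r.name+":after")
	return err
}

// chain wraps final with the middleware so that mws[0] runs outermost.
func chain(task *Task, mws []Middleware, final func() *Error) *Error {
	next := final
	for i := len(mws) - 1; i >= 0; i-- {
		mw, inner := mws[i], next
		next = func() *Error { return mw.Handle(task, inner) }
	}
	return next()
}

func main() {
	var log []string
	mws := []Middleware{
		recorder{name: "outer", log: &log},
		recorder{name: "inner", log: &log},
	}
	err := chain(&Task{Type: "email"}, mws, func() *Error {
		log = append(log, "handler")
		return nil
	})
	fmt.Println(log, err == nil)
}
```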

type Publisher

type Publisher interface {
	Publish(task *Task) error
}

Publisher defines an interface for task publishers that can publish tasks

func NewMonitoringPublisher added in v0.13.0

func NewMonitoringPublisher(publisher Publisher) Publisher

NewMonitoringPublisher creates a new publisher decorator that tracks metrics.
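
The decorator pattern mentioned here can be sketched as follows. Task and Publisher are local copies of the documented declarations. memoryPublisher and countingPublisher are hypothetical types. Which metrics the real monitoring publisher tracks is not stated on this page, so the success and failure counters are purely illustrative, and this version is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

type Publisher interface {
	Publish(task *Task) error
}

// memoryPublisher is a hypothetical Publisher that keeps tasks in memory.
type memoryPublisher struct {
	tasks []*Task
}

func (m *memoryPublisher) Publish(task *Task) error {
	if task == nil {
		return errors.New("nil task")
	}
	m.tasks = append(m.tasks, task)
	return nil
}

// countingPublisher is a hypothetical decorator that counts publish outcomes.
type countingPublisher struct {
	inner      Publisher
	ok, failed int
}

// Publish delegates to the wrapped publisher and updates the counters.
func (c *countingPublisher) Publish(task *Task) error {
	if err := c.inner.Publish(task); err != nil {
		c.failed++
		return err
	}
	c.ok++
	return nil
}

func main() {
	mem := &memoryPublisher{}
	pub := &countingPublisher{inner: mem}
	_ = pub.Publish(&Task{Type: "email"})
	_ = pub.Publish(nil)
	fmt.Println(pub.ok, pub.failed, len(mem.tasks))
}
```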

type Task

type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

Task represents a unit of work with type, headers and data

func NewTask

func NewTask(taskType string, headers map[string]string, data []byte) *Task

NewTask creates a new task with a random ID

func (*Task) ID

func (t *Task) ID() string

ID returns the ID of the task

type TaskHandler

type TaskHandler interface {
	TaskType() string
	Process(map[string]string, []byte) *Error
}

TaskHandler is an interface for processing tasks
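
A hand-written TaskHandler might look like the sketch below. Error, ErrorType, NewError and TaskHandler are local copies of the documented declarations. greetHandler, its "greet" task type, and the choice to treat an empty body as droppable are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type ErrorType string

const (
	ErrTypeDroppable ErrorType = "drop"
	ErrTypeRetryable ErrorType = "retry"
)

type Error struct {
	Type ErrorType
	Err  error
}

func NewError(typ ErrorType, err error) *Error {
	return &Error{Type: typ, Err: err}
}

type TaskHandler interface {
	TaskType() string
	Process(map[string]string, []byte) *Error
}

// greetHandler is a hypothetical handler for tasks of type "greet".
type greetHandler struct {
	greeted []string
}

func (h *greetHandler) TaskType() string { return "greet" }

// Process treats an empty body as a droppable error; retrying would not help.
func (h *greetHandler) Process(headers map[string]string, body []byte) *Error {
	if len(body) == 0 {
		return NewError(ErrTypeDroppable, errors.New("empty body"))
	}
	h.greeted = append(h.greeted, "hello "+string(body))
	return nil
}

func main() {
	h := &greetHandler{}
	var th TaskHandler = h

	ok := th.Process(nil, []byte("gopher")) == nil
	fmt.Println(th.TaskType(), ok, h.greeted)

	if e := th.Process(nil, nil); e != nil {
		fmt.Println(e.Type, e.Err)
	}
}
```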

func NewGenericTaskHandler

func NewGenericTaskHandler[T any](
	theType string,
	decoder encoding.DecoderFunc,
	processor func(headers map[string]string, data *T) *Error,
) TaskHandler

NewGenericTaskHandler creates a new generic task handler for a specific task type

type TaskHandlerFunc

type TaskHandlerFunc func(task *Task) error

TaskHandlerFunc is a function that handles a task

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a task processing worker that can handle multiple task types

func NewWorker

func NewWorker(handlers []TaskHandler, middleware []Middleware) *Worker

NewWorker creates a new worker instance with the specified handlers and middleware

func (*Worker) Process

func (w *Worker) Process(task *Task) error

Process handles a task by applying middleware and passing it to appropriate handlers
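
To make the interplay of handlers and middleware concrete, here is a rough sketch of a dispatcher with the same method shape as Worker.Process. All declarations are local copies of the documented ones. miniWorker, newMiniWorker and echoHandler are hypothetical, and this is not the package's implementation. In particular, returning a droppable error for an unknown task type and running middleware with the first element outermost are assumptions of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
type Task struct {
	Type    string
	Headers map[string]string
	Body    []byte
}

type ErrorType string

const ErrTypeDroppable ErrorType = "drop"

type Error struct {
	Type ErrorType
	Err  error
}

// Error formats the message; the exact format used here is an assumption.
func (e *Error) Error() string { return string(e.Type) + ": " + e.Err.Error() }

type TaskHandler interface {
	TaskType() string
	Process(map[string]string, []byte) *Error
}

type Middleware interface {
	Handle(task *Task, next func() *Error) *Error
}

// miniWorker is a hypothetical dispatcher keyed by task type.
type miniWorker struct {
	handlers   map[string]TaskHandler
	middleware []Middleware
}

func newMiniWorker(handlers []TaskHandler, middleware []Middleware) *miniWorker {
	byType := make(map[string]TaskHandler, len(handlers))
	for _, h := range handlers {
		byType[h.TaskType()] = h
	}
	return &miniWorker{handlers: byType, middleware: middleware}
}

// Process looks up the handler for the task type and runs it inside the middleware.
func (w *miniWorker) Process(task *Task) error {
	h, ok := w.handlers[task.Type]
	if !ok {
		// Assumption: unknown task types are reported as droppable.
		return &Error{Type: ErrTypeDroppable, Err: errors.New("no handler for " + task.Type)}
	}
	next := func() *Error { return h.Process(task.Headers, task.Body) }
	for i := len(w.middleware) - 1; i >= 0; i-- {
		mw, inner := w.middleware[i], next
		next = func() *Error { return mw.Handle(task, inner) }
	}
	if e := next(); e != nil {
		return e
	}
	// Return an untyped nil so that callers' err != nil checks behave as expected.
	return nil
}

// echoHandler is a hypothetical handler that records task bodies.
type echoHandler struct{ got []string }

func (h *echoHandler) TaskType() string { return "echo" }

func (h *echoHandler) Process(_ map[string]string, body []byte) *Error {
	h.got = append(h.got, string(body))
	return nil
}

func main() {
	h := &echoHandler{}
	w := newMiniWorker([]TaskHandler{h}, nil)
	err := w.Process(&Task{Type: "echo", Body: []byte("hi")})
	fmt.Println(err, h.got)
	fmt.Println(w.Process(&Task{Type: "unknown"}))
}
```

The explicit nil return at the end of Process matters because a nil *Error converted to the error interface compares as non-nil.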
