Documentation ¶
Index ¶
- Constants
- func SerializeTaskData[T any](encoder encoding.EncoderFunc, task T) ([]byte, error)
- type BinaryMessageFormatOption
- type Consumer
- type Error
- type ErrorType
- type FilesystemDirectoryConsumer
- type FilesystemDirectoryPublisher
- type MessageFormat
- type Middleware
- type Publisher
- type Task
- type TaskHandler
- type TaskHandlerFunc
- type Worker
Constants ¶
const TaskIDHeaderKey = "task_id"
TaskIDHeaderKey is the key for the task ID in the headers
Variables ¶
This section is empty.
Functions ¶
func SerializeTaskData ¶
func SerializeTaskData[T any](encoder encoding.EncoderFunc, task T) ([]byte, error)
SerializeTaskData serializes task data using the provided encoder
Types ¶
type BinaryMessageFormatOption ¶ added in v0.13.0
type BinaryMessageFormatOption func(*binaryMessageFormat)
BinaryMessageFormatOption configures binaryMessageFormat.
func WithEncoderDecoderPair ¶ added in v0.13.0
func WithEncoderDecoderPair(enc encoding.EncoderFunc, dec encoding.DecoderFunc) BinaryMessageFormatOption
WithEncoderDecoderPair sets a custom encoder and decoder for header serialization.
type Consumer ¶
type Consumer interface {
Consume(handler TaskHandlerFunc) error
}
Consumer defines an interface for task consumers, which pass received tasks to the handler given to Consume.
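To illustrate the interface, here is a sketch of an in-memory consumer backed by a slice. Only the Consume method signature comes from this page. The Task, Error, and TaskHandlerFunc declarations below are local stand-ins whose fields and signatures are assumptions, and sliceConsumer is a hypothetical type.

```go
package main

import "fmt"

// Stand-in types: the real Task, Error, and TaskHandlerFunc are not
// shown on this page, so these shapes are assumptions.
type Task struct {
	Type    string
	Headers map[string]string
	Data    []byte
}

type Error struct{ Reason string }

type TaskHandlerFunc func(task *Task) *Error

// Consumer is copied from the documented interface.
type Consumer interface {
	Consume(handler TaskHandlerFunc) error
}

// sliceConsumer is a hypothetical consumer that replays a fixed list of tasks.
type sliceConsumer struct {
	tasks []*Task
}

// Consume passes each task to the handler in order and stops at the first failure.
func (c *sliceConsumer) Consume(handler TaskHandlerFunc) error {
	for _, task := range c.tasks {
		if taskErr := handler(task); taskErr != nil {
			return fmt.Errorf("task %q failed: %s", task.Type, taskErr.Reason)
		}
	}
	return nil
}

func main() {
	var consumer Consumer = &sliceConsumer{tasks: []*Task{{Type: "email"}, {Type: "report"}}}
	err := consumer.Consume(func(task *Task) *Error {
		fmt.Println("handling", task.Type)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```

Stopping at the first failed task is a choice made for this sketch; a real consumer might instead retry, skip, or dead-letter the task.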
type Error ¶
Error is an error that occurs during task processing and must be handled in one of the predefined ways. We do not allow returning the generic error interface, as it leaves too much flexibility to the caller. Middleware authors may introduce their own error types, but those must be handled in the middleware itself.
type FilesystemDirectoryConsumer ¶ added in v0.13.0
type FilesystemDirectoryConsumer struct {
// contains filtered or unexported fields
}
FilesystemDirectoryConsumer reads tasks from files in a directory, ordered by timestamp.
func NewFilesystemDirectoryConsumer ¶ added in v0.13.0
func NewFilesystemDirectoryConsumer(ctx context.Context, dir string, format MessageFormat) (*FilesystemDirectoryConsumer, error)
NewFilesystemDirectoryConsumer creates a consumer that reads tasks from timestamped files.
func (*FilesystemDirectoryConsumer) Consume ¶ added in v0.13.0
func (c *FilesystemDirectoryConsumer) Consume(handler TaskHandlerFunc) error
Consume implements Consumer.
type FilesystemDirectoryPublisher ¶ added in v0.13.0
type FilesystemDirectoryPublisher struct {
// contains filtered or unexported fields
}
FilesystemDirectoryPublisher writes tasks to timestamped files in a directory.
func NewFilesystemDirectoryPublisher ¶ added in v0.13.0
func NewFilesystemDirectoryPublisher(dir string, format MessageFormat) (*FilesystemDirectoryPublisher, error)
NewFilesystemDirectoryPublisher creates a publisher that writes tasks to timestamped files.
func (*FilesystemDirectoryPublisher) Publish ¶ added in v0.13.0
func (p *FilesystemDirectoryPublisher) Publish(task *Task) error
Publish implements Publisher.
type MessageFormat ¶
type MessageFormat interface {
// Serialize converts a task into binary format
Serialize(t *Task) ([]byte, error)
// Deserialize converts binary data into a task
Deserialize(data []byte) (*Task, error)
}
MessageFormat defines the interface for serializing and deserializing tasks. Consumer and Publisher implementations can optionally use it, but it is not a requirement.
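As an illustration of a custom format, the sketch below implements the two documented methods with encoding/json and round-trips a task. The Task struct here is a local stand-in: the description of Task mentions a type, headers, and data, but the field names and types used below are assumptions, and jsonMessageFormat is a hypothetical implementation rather than anything shipped by the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is a local stand-in; the real field names and types are assumptions.
type Task struct {
	Type    string            `json:"type"`
	Headers map[string]string `json:"headers"`
	Data    []byte            `json:"data"`
}

// MessageFormat is copied from the documented interface.
type MessageFormat interface {
	Serialize(t *Task) ([]byte, error)
	Deserialize(data []byte) (*Task, error)
}

// jsonMessageFormat is a hypothetical JSON-based format.
type jsonMessageFormat struct{}

// Serialize encodes the task as a JSON object.
func (jsonMessageFormat) Serialize(t *Task) ([]byte, error) {
	if t == nil {
		return nil, fmt.Errorf("task must not be nil")
	}
	return json.Marshal(t)
}

// Deserialize decodes a JSON object back into a task.
func (jsonMessageFormat) Deserialize(data []byte) (*Task, error) {
	var t Task
	if err := json.Unmarshal(data, &t); err != nil {
		return nil, fmt.Errorf("decode task: %w", err)
	}
	return &t, nil
}

func main() {
	var format MessageFormat = jsonMessageFormat{}
	raw, err := format.Serialize(&Task{
		Type:    "email",
		Headers: map[string]string{"task_id": "42"},
		Data:    []byte("payload"),
	})
	if err != nil {
		panic(err)
	}
	task, err := format.Deserialize(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(task.Type, task.Headers["task_id"], string(task.Data))
}
```

The "task_id" header key matches the TaskIDHeaderKey constant documented above.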
func NewBinaryMessageFormat ¶
func NewBinaryMessageFormat(opts ...BinaryMessageFormatOption) MessageFormat
NewBinaryMessageFormat creates a new binary message format implementation
type Middleware ¶
Middleware defines an interface for task processing middleware
type Publisher ¶
Publisher defines an interface for task publishers that can publish tasks
func NewMonitoringPublisher ¶ added in v0.13.0
NewMonitoringPublisher creates a new publisher decorator that tracks metrics.
type Task ¶
Task represents a unit of work with type, headers and data
type TaskHandler ¶
TaskHandler is an interface for processing tasks
func NewGenericTaskHandler ¶
func NewGenericTaskHandler[T any](theType string, decoder encoding.DecoderFunc, processor func(headers map[string]string, data *T) *Error) TaskHandler
NewGenericTaskHandler creates a new generic task handler for a specific task type
type TaskHandlerFunc ¶
TaskHandlerFunc is a function that handles a task
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a task processing worker that can handle multiple task types
func NewWorker ¶
func NewWorker(handlers []TaskHandler, middleware []Middleware) *Worker
NewWorker creates a new worker instance with the specified handlers and middleware