Documentation
¶
Overview ¶
Package messagequeue is a generated GoMock package.
Package messagequeue is a generated GoMock package.
Index ¶
- func AddWorker(wg *sync.WaitGroup, pool chan<- *Worker)
- type Chunker
- type ChunkerOptions
- type EnqueueMessageResult
- type Enqueuer
- type EnqueuerOptions
- type HandleFailedEnqueue
- type HandleMessage
- type Handler
- type Job
- type Message
- type MessageQueue
- type MockEnqueuer
- type MockEnqueuerMockRecorder
- type MockMessageQueue
- func (m *MockMessageQueue) Delete(arg0 context.Context, arg1 *Message) error
- func (m *MockMessageQueue) Dequeue(ctx context.Context, count int, wait, visibilityTimeout time.Duration) ([]*Message, error)
- func (m *MockMessageQueue) EXPECT() *MockMessageQueueMockRecorder
- func (m *MockMessageQueue) EnqueueBatch(arg0 context.Context, arg1 []*Message) ([]*EnqueueMessageResult, error)
- type MockMessageQueueMockRecorder
- type Poller
- type PollerOptions
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Chunker ¶ added in v2.12.0
type Chunker[T any] interface { // Start the chunker Start(buffer <-chan T, handler Handler[T]) error // Stop the chunker and clean up any resources Stop() error }
Chunker interface for 'chunking' entries on a buffer into slices of a desired size.
func NewChunker ¶ added in v2.12.0
func NewChunker[T any](options *ChunkerOptions) Chunker[T]
NewChunker with the passed buffer, handler and options. Chunker will create slices of a specified size from the passed buffer and pass them to the handler. Entries retrieved from the buffer are guaranteed to be delivered to the handler within the configured ChunkElementExpiry duration.
type ChunkerOptions ¶ added in v2.12.0
type ChunkerOptions struct {
// ChunkSize determines the desired chunk size for entries to be returned
// from the buffer.
ChunkSize uint
// MaxElementWait determines the maximum time an entry will wait to be chunked.
// Smaller durations may result in chunks that are less than the desired size.
MaxElementWait time.Duration
}
ChunkerOptions for configuring a Chunker instance
func NewChunkerOptions ¶ added in v2.12.0
func NewChunkerOptions() *ChunkerOptions
NewChunkerOptions creates a ChunkerOptions with default values that can be used to create a NewChunker
func (*ChunkerOptions) Validate ¶ added in v2.12.0
func (o *ChunkerOptions) Validate() error
Validate that the values contained in this ChunkerOptions are complete and within the bounds necessary for operation.
type EnqueueMessageResult ¶
type EnqueueMessageResult struct {
// Message this result is for
*Message
// Success indicates whether the message was successfully enqueued
Success bool
// SenderFault when success is false, indicates that the enqueue failed due
// to a malformed message
SenderFault bool
// Error that occurred when enqueueing the message
Error string
}
EnqueueMessageResult is returned on for each message that is enqueued
type Enqueuer ¶ added in v2.12.0
type Enqueuer interface {
// Start validates configurations and creates the resources necessary to
// handle enqueuing messages.
Start(messageQueue MessageQueue) error
// Enqueue the passed message. This operation may be
// buffered and errors enqueueing will not be available for immediate handling.
Enqueue(messages *Message) error
// Stop all active workers, drain queues, and free resources.
Stop() error
}
Enqueuer for inserting tasks into a MessageQueue
func New ¶
func New(options *EnqueuerOptions) Enqueuer
New enqueuer for inserting messages into a MessageQueue
type EnqueuerOptions ¶ added in v2.12.0
type EnqueuerOptions struct {
*ChunkerOptions
// Logger for the enqueuer to use
Logger log.Logger
// BufferSize to use for forming batches, must be greater than BatchSize
BufferSize uint
// FailedBufferSize for messages that fail to enqueue. This should be at
// least equal to BatchSize to avoid blocking.
FailedBufferSize uint
// FailureHandler receives messages that failed to enqueue, optional.
FailureHandler HandleFailedEnqueue
}
func NewEnqueuerOptions ¶ added in v2.12.0
func NewEnqueuerOptions() *EnqueuerOptions
NewEnqueuerOptions with default values
func (*EnqueuerOptions) Validate ¶ added in v2.12.0
func (eo *EnqueuerOptions) Validate() error
Validate that the values contained in this Options are complete and within the bounds necessary for operation.
type HandleFailedEnqueue ¶ added in v2.12.0
type HandleFailedEnqueue func(Enqueuer, *EnqueueMessageResult)
HandleFailedEnqueue is called when a message fails to enqueue
type HandleMessage ¶ added in v2.12.0
HandleMessage returning a boolean indicating if the message was successfully processed. If this function returns true the message will be deleted from the queue, otherwise it will become available for other handlers after it's visibility timeout expires.
type Handler ¶ added in v2.12.0
type Handler[T any] func(chunk []T)
Handler receives chunks that are created by the chunker
type Job ¶ added in v2.12.0
type Job interface {
Work() bool
}
Job done by a worker, returns true if the worker should continue, or false if the worker should exit
type Message ¶
type Message struct {
// ID uniquely identifies this message
ID string
// External field used by the sdk
External string
// Trace field for telemetry
Trace string
// Delay before this message becomes visible after being enqueued
Delay time.Duration
// Service this message is for
Service string
// Method that should be invoked to process this message
Method string
// Body can contain any structured (JSON, XML) or unstructured text
// limitations are determined by the implementation
Body string
// Deadline for processing this message
Deadline time.Time
}
Message that can be enqueued in a MessageQueue
type MessageQueue ¶
type MessageQueue interface {
// Enqueue all the passed messages as a batch
EnqueueBatch(context.Context, []*Message) ([]*EnqueueMessageResult, error)
// Dequeue up to the passed count of messages waiting up to the passed
// duration
Dequeue(ctx context.Context, count int, wait, visibilityTimeout time.Duration) ([]*Message, error)
// Delete the passed message from the queue so that it is not processed by
// other workers
// TODO: [COR-553] Batch delete messages
Delete(context.Context, *Message) error
}
MessageQueue for enqueueing and dequeueing messages
type MockEnqueuer ¶ added in v2.27.0
type MockEnqueuer struct {
// contains filtered or unexported fields
}
MockEnqueuer is a mock of Enqueuer interface.
func NewMockEnqueuer ¶ added in v2.27.0
func NewMockEnqueuer(ctrl *gomock.Controller) *MockEnqueuer
NewMockEnqueuer creates a new mock instance.
func (*MockEnqueuer) EXPECT ¶ added in v2.27.0
func (m *MockEnqueuer) EXPECT() *MockEnqueuerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEnqueuer) Enqueue ¶ added in v2.27.0
func (m *MockEnqueuer) Enqueue(messages *Message) error
Enqueue mocks base method.
func (*MockEnqueuer) Start ¶ added in v2.27.0
func (m *MockEnqueuer) Start(messageQueue MessageQueue) error
Start mocks base method.
func (*MockEnqueuer) Stop ¶ added in v2.27.0
func (m *MockEnqueuer) Stop() error
Stop mocks base method.
type MockEnqueuerMockRecorder ¶ added in v2.27.0
type MockEnqueuerMockRecorder struct {
// contains filtered or unexported fields
}
MockEnqueuerMockRecorder is the mock recorder for MockEnqueuer.
func (*MockEnqueuerMockRecorder) Enqueue ¶ added in v2.27.0
func (mr *MockEnqueuerMockRecorder) Enqueue(messages any) *gomock.Call
Enqueue indicates an expected call of Enqueue.
func (*MockEnqueuerMockRecorder) Start ¶ added in v2.27.0
func (mr *MockEnqueuerMockRecorder) Start(messageQueue any) *gomock.Call
Start indicates an expected call of Start.
func (*MockEnqueuerMockRecorder) Stop ¶ added in v2.27.0
func (mr *MockEnqueuerMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type MockMessageQueue ¶ added in v2.12.0
type MockMessageQueue struct {
// contains filtered or unexported fields
}
MockMessageQueue is a mock of MessageQueue interface.
func NewMockMessageQueue ¶ added in v2.12.0
func NewMockMessageQueue(ctrl *gomock.Controller) *MockMessageQueue
NewMockMessageQueue creates a new mock instance.
func (*MockMessageQueue) Delete ¶ added in v2.12.0
func (m *MockMessageQueue) Delete(arg0 context.Context, arg1 *Message) error
Delete mocks base method.
func (*MockMessageQueue) Dequeue ¶ added in v2.12.0
func (m *MockMessageQueue) Dequeue(ctx context.Context, count int, wait, visibilityTimeout time.Duration) ([]*Message, error)
Dequeue mocks base method.
func (*MockMessageQueue) EXPECT ¶ added in v2.12.0
func (m *MockMessageQueue) EXPECT() *MockMessageQueueMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockMessageQueue) EnqueueBatch ¶ added in v2.12.0
func (m *MockMessageQueue) EnqueueBatch(arg0 context.Context, arg1 []*Message) ([]*EnqueueMessageResult, error)
EnqueueBatch mocks base method.
type MockMessageQueueMockRecorder ¶ added in v2.12.0
type MockMessageQueueMockRecorder struct {
// contains filtered or unexported fields
}
MockMessageQueueMockRecorder is the mock recorder for MockMessageQueue.
func (*MockMessageQueueMockRecorder) Delete ¶ added in v2.12.0
func (mr *MockMessageQueueMockRecorder) Delete(arg0, arg1 any) *gomock.Call
Delete indicates an expected call of Delete.
func (*MockMessageQueueMockRecorder) Dequeue ¶ added in v2.12.0
func (mr *MockMessageQueueMockRecorder) Dequeue(ctx, count, wait, visibilityTimeout any) *gomock.Call
Dequeue indicates an expected call of Dequeue.
func (*MockMessageQueueMockRecorder) EnqueueBatch ¶ added in v2.12.0
func (mr *MockMessageQueueMockRecorder) EnqueueBatch(arg0, arg1 any) *gomock.Call
EnqueueBatch indicates an expected call of EnqueueBatch.
type Poller ¶ added in v2.12.0
type Poller interface {
// Poll for messages on the passed queue
Poll(HandleMessage, MessageQueue) error
// Stop polling for messages
Stop() error
}
Poller retrieves batches of messages from a message queue and handles them using provided functions.
func NewPoller ¶ added in v2.12.0
func NewPoller(options *PollerOptions) Poller
NewPoller with the passed options, if options are nil, default options will be used.
type PollerOptions ¶ added in v2.12.0
type PollerOptions struct {
// Logger to use for reporting errors
Logger log.Logger
// ConcurrentMessageHandlers that should be running at any given time
ConcurrentMessageHandlers int
// WaitForBatch the specified duration before prematurely returning with less
// than the desired number of messages.
WaitForBatch time.Duration
// DequeueCount is the number of messages to attempt to dequeue per request.
// maximum will vary by implementation
DequeueCount int
// QueueOperationTimeout
QueueOperationTimeout time.Duration
// VisibilityTimeout is the amount of time a message is hidden from other
// consumers after it has been received by a message queue client.
VisibilityTimeout time.Duration
}
func NewPollerOptions ¶ added in v2.12.0
func NewPollerOptions() *PollerOptions
NewPollerOptions with valid values that can be used to initialize a new Poller
func (*PollerOptions) Validate ¶ added in v2.12.0
func (po *PollerOptions) Validate() error
Validate that the values contained in this Options are complete and within the bounds necessary for operation.
type Worker ¶ added in v2.12.0
type Worker struct {
// contains filtered or unexported fields
}
Worker can be used to asynchronously call work added via AddWork until work returns false.