Documentation ¶
Index ¶
- Variables
- type ITaskItem
- type Q
- func (q *Q[T]) Cap() int
- func (q *Q[T]) Close()
- func (q *Q[T]) IsClosed() bool
- func (q *Q[T]) IsEmpty() bool
- func (q *Q[T]) IsFull() bool
- func (q *Q[T]) IsUnlimited() bool
- func (q *Q[T]) Len() int
- func (q *Q[T]) Peek() T
- func (q *Q[T]) Pop() (T, error)
- func (q *Q[T]) Push(item T) error
- func (q *Q[T]) PushBlocking(item T) error
- func (q *Q[T]) Reset()
- type Queue
- type RingQ
- func (q *RingQ[T]) Cap() int
- func (q *RingQ[T]) Close()
- func (q *RingQ[T]) IsClosed() bool
- func (q *RingQ[T]) IsEmpty() bool
- func (q *RingQ[T]) IsFull() bool
- func (q *RingQ[T]) IsUnlimited() bool
- func (q *RingQ[T]) Len() int
- func (q *RingQ[T]) Peek() T
- func (q *RingQ[T]) Pop() (T, error)
- func (q *RingQ[T]) Push(item T) error
- func (q *RingQ[T]) PushBlocking(item T) error
- func (q *RingQ[T]) Reset()
- type SimpleTaskProcessor
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrClosed is returned when attempting to operate on a closed queue
	ErrClosed = errors.New("pipe.q.closed")
	// ErrQueueFull is returned when attempting to push to a full queue
	ErrQueueFull = errors.New("pipe.q.full")
)
Functions ¶
This section is empty.
Types ¶
type ITaskItem ¶ added in v1.3.2
type ITaskItem interface {
	// Do executes the task. This method should be idempotent and thread-safe.
	// Any errors that occur during execution should be handled internally.
	Do()
}
ITaskItem represents a task that can be executed by the SimpleTaskProcessor. Implementations should ensure that the Do method is safe to call concurrently and handles any errors internally, as no error is returned.
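One way to meet both requirements (safe concurrent calls, errors kept internal) is to guard the work with sync.Once and store the error behind a mutex. This is only a sketch: the interface is redeclared locally so the example compiles standalone, and onceTask and its Err method are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// ITaskItem mirrors the documented interface; redeclared for the sketch.
type ITaskItem interface {
	Do()
}

// onceTask is a hypothetical task that runs its work at most once and
// records the resulting error instead of returning it.
type onceTask struct {
	once sync.Once
	work func() error

	mu  sync.Mutex
	err error
}

// Do is idempotent: repeated or concurrent calls run work a single time.
func (t *onceTask) Do() {
	t.once.Do(func() {
		err := t.work()
		t.mu.Lock()
		t.err = err
		t.mu.Unlock()
	})
}

// Err reports the error recorded by Do, if any.
func (t *onceTask) Err() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.err
}

func main() {
	calls := 0
	task := &onceTask{work: func() error {
		calls++ // only ever executed inside once.Do
		return fmt.Errorf("disk full")
	}}
	var item ITaskItem = task

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			item.Do()
		}()
	}
	wg.Wait()
	fmt.Println(calls, task.Err())
}
```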
type Q ¶
type Q[T any] struct {
	// contains filtered or unexported fields
}
Q represents a thread-safe queue with dynamic capacity, backed by a linked list. It provides the same interface as SliceQueue but uses container/list for storage.
func NewQ ¶
NewQ creates a new list-based queue with the specified capacity. If capacity is 0, the queue has unlimited capacity.
func (*Q[T]) Cap ¶ added in v1.2.11
Cap returns the maximum capacity of the queue (0 means unlimited).
func (*Q[T]) Close ¶
func (q *Q[T]) Close()
Close closes the queue. All subsequent operations return ErrClosed.
func (*Q[T]) IsFull ¶ added in v1.2.11
IsFull returns true if the queue is at capacity (always false for an unlimited queue).
func (*Q[T]) IsUnlimited ¶ added in v1.3.2
IsUnlimited returns true if the queue has unlimited capacity
func (*Q[T]) Peek ¶ added in v1.3.2
func (q *Q[T]) Peek() T
Peek returns the item at the front of the queue without removing it. It returns the zero value of T if the queue is empty.
func (*Q[T]) Pop ¶
Pop removes and returns an item from the front of the queue. It blocks while the queue is empty, until an item is available or the queue is closed. Important: if the queue is closed, Pop immediately returns ErrClosed regardless of whether there are items left. This ensures that once a queue is closed, no further data can be consumed, which is useful for graceful shutdown scenarios.
func (*Q[T]) Push ¶ added in v1.2.11
Push adds an item to the end of the queue. It returns ErrQueueFull if the queue is at capacity (when capacity > 0).
func (*Q[T]) PushBlocking ¶ added in v1.3.2
PushBlocking adds an item to the end of the queue. It blocks while the queue is at capacity, until space is available or the queue is closed.
type Queue ¶ added in v1.2.11
type Queue[T any] interface {
	// Push adds an item to the queue
	Push(item T) error
	// PushBlocking adds an item to the end of the queue
	// Blocks if queue is at capacity until space is available or queue is closed
	PushBlocking(item T) error
	// Pop removes and returns an item from the queue
	// Blocks if queue is empty until an item is available or queue is closed
	Pop() (T, error)
	// Peek returns the item at the front of the queue without removing it
	// Returns zero value if the queue is empty
	Peek() T
	// Close closes the queue
	Close()
	// Len returns the current number of items in the queue
	Len() int
	// Cap returns the maximum capacity of the queue
	Cap() int
	// IsUnlimited returns true if the queue has unlimited capacity
	IsUnlimited() bool
	// IsClosed returns true if the queue is closed
	IsClosed() bool
	// IsFull returns true if the queue is at capacity
	IsFull() bool
	// IsEmpty returns true if the queue has no items
	IsEmpty() bool
	// Reset clears all items from the queue
	Reset()
}
Queue defines the common interface for all queue implementations.
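Code written against the interface can treat ErrClosed as the normal end of a consumer loop. The sketch below depends only on the Pop method, so it declares a narrower local interface; popper, consume, and the scripted test double are hypothetical names, and ErrClosed is redeclared as a stand-in.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrClosed = errors.New("pipe.q.closed") // stand-in for the package variable

// popper is the subset of Queue that a consumer needs.
type popper[T any] interface {
	Pop() (T, error)
}

// consume calls handle for every item until Pop reports ErrClosed.
// It returns the number of items handled and any other error from Pop.
func consume[T any](q popper[T], handle func(T)) (int, error) {
	n := 0
	for {
		item, err := q.Pop()
		if errors.Is(err, ErrClosed) {
			return n, nil // closed queue: normal termination
		}
		if err != nil {
			return n, err
		}
		handle(item)
		n++
	}
}

// scripted is a test double that yields fixed items and then ErrClosed.
type scripted[T any] struct{ items []T }

func (s *scripted[T]) Pop() (T, error) {
	if len(s.items) == 0 {
		var zero T
		return zero, ErrClosed
	}
	item := s.items[0]
	s.items = s.items[1:]
	return item, nil
}

func main() {
	q := &scripted[string]{items: []string{"a", "b", "c"}}
	n, err := consume[string](q, func(s string) { fmt.Println("got", s) })
	fmt.Println(n, err)
}
```

Accepting the narrow interface keeps the consumer testable without a real queue; any type with a matching Pop method, including one that satisfies the full Queue interface, can be passed in.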
type RingQ ¶ added in v1.2.11
type RingQ[T any] struct {
	// contains filtered or unexported fields
}
RingQ represents a thread-safe ring buffer queue with fixed capacity, backed by a pre-allocated slice. The slice is allocated at creation time with the specified capacity.
func NewRingQ ¶ added in v1.2.11
NewRingQ creates a new slice-based ring queue with fixed capacity. The slice is pre-allocated with the specified capacity.
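For readers unfamiliar with ring buffers, the index arithmetic over a pre-allocated slice can be sketched as follows. This is a generic illustration, not RingQ's source: ring, newRing, and errFull are hypothetical names, and locking is left out to keep the wrap-around logic visible.

```go
package main

import (
	"errors"
	"fmt"
)

var errFull = errors.New("full") // stand-in for ErrQueueFull

// ring is an illustrative fixed-capacity FIFO without locking.
type ring[T any] struct {
	buf   []T // allocated once, never resized
	head  int // index of the oldest item
	count int // number of stored items
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// push stores v in the next free slot, wrapping around the end of buf.
func (r *ring[T]) push(v T) error {
	if r.count == len(r.buf) {
		return errFull
	}
	r.buf[(r.head+r.count)%len(r.buf)] = v
	r.count++
	return nil
}

// pop removes the oldest item; ok is false when the ring is empty.
func (r *ring[T]) pop() (v T, ok bool) {
	var zero T
	if r.count == 0 {
		return zero, false
	}
	v = r.buf[r.head]
	r.buf[r.head] = zero // drop the reference so it can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return v, true
}

func main() {
	r := newRing[int](2)
	fmt.Println(r.push(1), r.push(2), r.push(3)) // third push hits capacity
	v, _ := r.pop()
	fmt.Println(v)
	fmt.Println(r.push(3)) // reuses the slot freed by pop
}
```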
func (*RingQ[T]) Close ¶ added in v1.2.11
func (q *RingQ[T]) Close()
Close closes the queue. All subsequent operations return ErrClosed.
func (*RingQ[T]) IsUnlimited ¶ added in v1.3.2
IsUnlimited returns true if the queue has unlimited capacity
func (*RingQ[T]) Peek ¶ added in v1.3.2
func (q *RingQ[T]) Peek() T
Peek returns the item at the front of the queue without removing it. It returns the zero value of T if the queue is empty.
func (*RingQ[T]) Pop ¶ added in v1.2.11
Pop removes and returns an item from the front of the queue. It blocks while the queue is empty, until an item is available or the queue is closed. Important: if the queue is closed, Pop immediately returns ErrClosed regardless of whether there are items left. This ensures that once a queue is closed, no further data can be consumed, which is useful for graceful shutdown scenarios.
func (*RingQ[T]) Push ¶ added in v1.2.11
Push adds an item to the end of the queue. It returns ErrQueueFull if the queue is at capacity.
func (*RingQ[T]) PushBlocking ¶ added in v1.3.2
PushBlocking adds an item to the end of the queue. It blocks while the queue is at capacity, until space is available or the queue is closed.
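The blocking behavior can be pictured as a producer waiting on a condition variable until a consumer frees a slot or the queue is closed. The sketch below assumes that design; bounded, newBounded, tryPop, closeQ, and errClosed are hypothetical names and do not reflect the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("closed") // stand-in for ErrClosed

// bounded is an illustrative capacity-limited FIFO.
type bounded[T any] struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []T
	limit  int
	closed bool
}

func newBounded[T any](limit int) *bounded[T] {
	b := &bounded[T]{limit: limit}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// pushBlocking waits while the queue is full and still open.
func (b *bounded[T]) pushBlocking(v T) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	for !b.closed && len(b.items) >= b.limit {
		b.cond.Wait()
	}
	if b.closed {
		return errClosed
	}
	b.items = append(b.items, v)
	return nil
}

// tryPop removes the oldest item without blocking.
func (b *bounded[T]) tryPop() (T, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	var zero T
	if len(b.items) == 0 {
		return zero, false
	}
	v := b.items[0]
	b.items = b.items[1:]
	b.cond.Broadcast() // wake producers waiting for space
	return v, true
}

// closeQ marks the queue closed and releases every blocked producer.
func (b *bounded[T]) closeQ() {
	b.mu.Lock()
	b.closed = true
	b.mu.Unlock()
	b.cond.Broadcast()
}

func main() {
	q := newBounded[string](1)
	q.pushBlocking("a") // fills the only slot

	done := make(chan error, 1)
	go func() { done <- q.pushBlocking("b") }() // must wait for space

	v, _ := q.tryPop() // frees the slot
	fmt.Println(v, <-done)

	q.closeQ()
	fmt.Println(q.pushBlocking("c")) // rejected after close
}
```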
type SimpleTaskProcessor ¶ added in v1.3.2
type SimpleTaskProcessor struct {
	// contains filtered or unexported fields
}
SimpleTaskProcessor is a high-performance, concurrent task processor that uses an unbounded queue to manage task execution. It provides a simple interface for submitting tasks and automatically distributes them across multiple worker goroutines for parallel execution.
Key features:
- Unbounded task queue (no capacity limit)
- Configurable number of worker goroutines
- Automatic task distribution and load balancing
- Graceful shutdown with proper cleanup
- Thread-safe operations
Example usage:
	type MyTask struct {
		data string
	}

	func (t *MyTask) Do() {
		fmt.Println("Processing:", t.data)
	}

	processor := NewSimpleTaskProcessor(4) // 4 workers
	processor.Submit(&MyTask{data: "hello"})
	processor.Shutdown() // Clean shutdown
The processor is safe for concurrent use by multiple goroutines.
func NewSimpleTaskProcessor ¶ added in v1.3.2
func NewSimpleTaskProcessor(workerCount int) *SimpleTaskProcessor
NewSimpleTaskProcessor creates and starts a new task processor with the specified number of worker goroutines.
Parameters:
- workerCount: Number of worker goroutines to spawn. Must be >= 0. If 0, tasks will be queued but not processed until shutdown.
Returns:
- *SimpleTaskProcessor: A running task processor ready to accept tasks.
The processor starts immediately and begins listening for tasks. Worker goroutines will block waiting for tasks until the processor is shut down.
func (*SimpleTaskProcessor) Shutdown ¶ added in v1.3.2
func (x *SimpleTaskProcessor) Shutdown()
Shutdown gracefully stops the task processor. It closes the task queue, which signals all worker goroutines to finish processing their current tasks and then exit.
After calling Shutdown:
- No new tasks can be submitted (Submit will return ErrClosed)
- Workers will finish processing any tasks already in progress
- Workers will exit after processing remaining queued tasks
- The processor cannot be restarted
This method is thread-safe and can be called multiple times safely. Subsequent calls to Shutdown have no effect.
func (*SimpleTaskProcessor) Submit ¶ added in v1.3.2
func (x *SimpleTaskProcessor) Submit(task ITaskItem) error
Submit adds a task to the processing queue. The task will be executed asynchronously by one of the available worker goroutines.
Parameters:
- task: The task to be executed. Must implement ITaskItem interface.
Returns:
- error: ErrClosed if the processor has been shut down, nil otherwise.
This method is thread-safe and can be called concurrently by multiple goroutines. Tasks are dequeued in FIFO order by the available workers; with more than one worker, the order in which tasks complete is not guaranteed.