Package q

v1.3.14 (Latest)
Warning

This package is not in the latest version of its module.

Published: Dec 5, 2025 License: MIT Imports: 5 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	// ErrClosed is returned when attempting to operate on a closed queue
	ErrClosed = errors.New("pipe.q.closed")

	// ErrQueueFull is returned when attempting to push to a full queue
	ErrQueueFull = errors.New("pipe.q.full")
)
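Callers usually need to tell these two sentinels apart: a full queue can be retried, a closed one cannot. The sketch below shows one way to do that with errors.Is. It redeclares the two errors locally (errClosed, errQueueFull) only so that it compiles on its own; classify and wrap are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors, redeclared here only so
// the sketch is self-contained. Real code would compare against
// ErrClosed and ErrQueueFull from this package.
var (
	errClosed    = errors.New("pipe.q.closed")
	errQueueFull = errors.New("pipe.q.full")
)

// classify maps an error to a coarse outcome. errors.Is also matches
// sentinels that a caller has wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errQueueFull):
		return "retry"
	case errors.Is(err, errClosed):
		return "stop"
	default:
		return "unknown"
	}
}

// wrap adds context while keeping the sentinel matchable.
func wrap(err error) error {
	return fmt.Errorf("enqueue job: %w", err)
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(wrap(errQueueFull)))
	fmt.Println(classify(errClosed))
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer wraps the error.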

Functions

This section is empty.

Types

type ITaskItem added in v1.3.2

type ITaskItem interface {
	// Do executes the task. This method should be idempotent and thread-safe.
	// Any errors that occur during execution should be handled internally.
	Do()
}

ITaskItem represents a task that can be executed by the SimpleTaskProcessor. Implementations should ensure that the Do method is safe to call concurrently and handles any errors internally, as no error is returned.
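Because Do returns nothing, a task that can fail has to keep its own outcome. The following is a minimal sketch of one possible pattern, assuming a mutex-guarded result field. The taskItem interface mirrors ITaskItem and is redeclared only for self-containment; parseTask, Err, and runAll are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// taskItem mirrors the documented ITaskItem interface; it is redeclared
// here only to keep the sketch self-contained.
type taskItem interface {
	Do()
}

// parseTask is a hypothetical task. Because Do returns nothing, the
// task stores its own outcome behind a mutex.
type parseTask struct {
	input string

	mu   sync.Mutex
	done bool
	err  error
}

// Do is idempotent: repeated or concurrent calls do the work once.
func (t *parseTask) Do() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.done {
		return
	}
	t.done = true
	if t.input == "" {
		t.err = errors.New("empty input")
	}
}

// Err reports the error recorded by Do, if any.
func (t *parseTask) Err() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.err
}

// runAll calls Do on every task from its own goroutine and waits.
func runAll(tasks ...taskItem) {
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(it taskItem) {
			defer wg.Done()
			it.Do()
		}(task)
	}
	wg.Wait()
}

func main() {
	good := &parseTask{input: "hello"}
	bad := &parseTask{}
	runAll(good, bad, bad) // bad is passed twice on purpose
	fmt.Println(good.Err(), bad.Err())
}
```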

type Q

type Q[T any] struct {
	// contains filtered or unexported fields
}

Q is a thread-safe queue with dynamic capacity, backed by a linked list. It provides the same interface as SliceQueue but uses container/list for storage.

func NewQ

func NewQ[T any](capacity int) *Q[T]

NewQ creates a new list-based queue with the specified capacity. If capacity is 0, the queue has unlimited capacity.
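To make the capacity rule concrete, here is a minimal sketch of a list-backed queue in which a capacity of 0 disables the bound. It is an illustration under assumptions, not this package's implementation: listQueue, newListQueue, fill, and errFull are hypothetical names, and only Push and Len are modelled.

```go
package main

import (
	"container/list"
	"errors"
	"fmt"
	"sync"
)

var errFull = errors.New("queue full") // stand-in for ErrQueueFull

// listQueue is a hypothetical, simplified stand-in for Q.
type listQueue[T any] struct {
	mu       sync.Mutex
	items    *list.List
	capacity int // 0 means unlimited
}

func newListQueue[T any](capacity int) *listQueue[T] {
	return &listQueue[T]{items: list.New(), capacity: capacity}
}

// Push appends item, or fails when a positive capacity is reached.
func (q *listQueue[T]) Push(item T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.capacity > 0 && q.items.Len() >= q.capacity {
		return errFull
	}
	q.items.PushBack(item)
	return nil
}

// Len returns the number of stored items.
func (q *listQueue[T]) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.items.Len()
}

// fill pushes n integers and returns how many were accepted.
func fill(q *listQueue[int], n int) int {
	accepted := 0
	for i := 0; i < n; i++ {
		if q.Push(i) == nil {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(fill(newListQueue[int](2), 5)) // bounded: stops at capacity
	fmt.Println(fill(newListQueue[int](0), 5)) // unlimited: accepts all
}
```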

func (*Q[T]) Cap added in v1.2.11

func (q *Q[T]) Cap() int

Cap returns the maximum capacity of the queue (0 means unlimited)

func (*Q[T]) Close

func (q *Q[T]) Close()

Close closes the queue. All subsequent operations will return ErrClosed.

func (*Q[T]) IsClosed added in v1.2.11

func (q *Q[T]) IsClosed() bool

IsClosed returns true if the queue is closed

func (*Q[T]) IsEmpty added in v1.2.11

func (q *Q[T]) IsEmpty() bool

IsEmpty returns true if the queue has no items

func (*Q[T]) IsFull added in v1.2.11

func (q *Q[T]) IsFull() bool

IsFull returns true if the queue is at capacity (always false for unlimited capacity)

func (*Q[T]) IsUnlimited added in v1.3.2

func (q *Q[T]) IsUnlimited() bool

IsUnlimited returns true if the queue has unlimited capacity

func (*Q[T]) Len added in v1.2.11

func (q *Q[T]) Len() int

Len returns the current number of items in the queue

func (*Q[T]) Peek added in v1.3.2

func (q *Q[T]) Peek() T

Peek returns the item at the front of the queue without removing it. It returns the zero value if the queue is empty.

func (*Q[T]) Pop

func (q *Q[T]) Pop() (T, error)

Pop removes and returns the item at the front of the queue. It blocks while the queue is empty, until an item becomes available or the queue is closed. Important: once the queue is closed, Pop returns ErrClosed immediately, even if items remain. No further data can be consumed after Close, which is useful for graceful shutdown scenarios.
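The close-before-drain behaviour described above is easy to misread, so here is a minimal sketch of a queue with the same documented semantics, built on sync.Cond. It is not this package's implementation: condQueue, popAfterClose, closeWakesPop, and errClosed are hypothetical names, and a slice stands in for the real storage.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue closed") // stand-in for ErrClosed

// condQueue is a hypothetical, simplified stand-in for Q.
type condQueue[T any] struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []T
	closed bool
}

func newCondQueue[T any]() *condQueue[T] {
	q := &condQueue[T]{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Push appends item and wakes one blocked Pop.
func (q *condQueue[T]) Push(item T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	q.items = append(q.items, item)
	q.cond.Signal()
	return nil
}

// Pop blocks while the queue is empty and open. Once the queue is
// closed it returns errClosed, even if items are still buffered.
func (q *condQueue[T]) Pop() (T, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	var zero T
	if q.closed {
		return zero, errClosed
	}
	item := q.items[0]
	q.items[0] = zero // release the reference
	q.items = q.items[1:]
	return item, nil
}

// Close marks the queue closed and releases every blocked Pop.
func (q *condQueue[T]) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// popAfterClose pushes two items, pops one, closes, and pops again.
func popAfterClose() (first int, firstErr, secondErr error) {
	q := newCondQueue[int]()
	q.Push(1)
	q.Push(2)
	first, firstErr = q.Pop()
	q.Close()
	_, secondErr = q.Pop() // item 2 is still buffered but is not returned
	return first, firstErr, secondErr
}

// closeWakesPop starts a Pop on an empty queue and reports the error it
// receives once Close is called.
func closeWakesPop() error {
	q := newCondQueue[int]()
	done := make(chan error, 1)
	go func() {
		_, err := q.Pop()
		done <- err
	}()
	q.Close()
	return <-done
}

func main() {
	fmt.Println(popAfterClose())
	fmt.Println(closeWakesPop())
}
```

A consumer that must not lose buffered items therefore has to drain the queue before calling Close.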

func (*Q[T]) Push added in v1.2.11

func (q *Q[T]) Push(item T) error

Push adds an item to the end of the queue. It returns ErrQueueFull if the queue is at capacity (when capacity > 0).

func (*Q[T]) PushBlocking added in v1.3.2

func (q *Q[T]) PushBlocking(item T) error

PushBlocking adds an item to the end of the queue. It blocks while the queue is at capacity, until space becomes available or the queue is closed.

func (*Q[T]) Reset added in v1.2.11

func (q *Q[T]) Reset()

Reset clears all items from the queue (useful for reusing the queue)

type Queue added in v1.2.11

type Queue[T any] interface {
	// Push adds an item to the queue
	Push(item T) error

	// PushBlocking adds an item to the end of the queue
	// Blocks if queue is at capacity until space is available or queue is closed
	PushBlocking(item T) error

	// Pop removes and returns an item from the queue
	// Blocks if queue is empty until an item is available or queue is closed
	Pop() (T, error)

	// Peek returns the item at the front of the queue without removing it
	// Returns zero value if the queue is empty
	Peek() T

	// Close closes the queue
	Close()

	// Len returns the current number of items in the queue
	Len() int

	// Cap returns the maximum capacity of the queue
	Cap() int

	// IsUnlimited returns true if the queue has unlimited capacity
	IsUnlimited() bool

	// IsClosed returns true if the queue is closed
	IsClosed() bool

	// IsFull returns true if the queue is at capacity
	IsFull() bool

	// IsEmpty returns true if the queue has no items
	IsEmpty() bool

	// Reset clears all items from the queue
	Reset()
}

Queue defines the common interface for all queue implementations

type RingQ added in v1.2.11

type RingQ[T any] struct {
	// contains filtered or unexported fields
}

RingQ is a thread-safe ring buffer queue with fixed capacity, backed by a pre-allocated slice. The slice is allocated at creation time with the specified capacity.

func NewRingQ added in v1.2.11

func NewRingQ[T any](capacity int) *RingQ[T]

NewRingQ creates a new slice-based ring queue with fixed capacity. The slice is pre-allocated with the specified capacity.
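For readers unfamiliar with ring buffers, the sketch below shows the usual index arithmetic over a slice that is allocated once. It is an assumption about the general technique, not a copy of RingQ: ring, newRing, and wrapAround are hypothetical names, and locking is deliberately left out even though RingQ is documented as thread-safe.

```go
package main

import "fmt"

// ring is a hypothetical, unsynchronized sketch of a fixed-capacity
// ring buffer. Locking is omitted to keep the index arithmetic visible.
type ring[T any] struct {
	buf   []T // allocated once, never grown
	head  int // index of the oldest item
	count int // number of stored items
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// push stores item at the tail and reports false when the ring is full.
func (r *ring[T]) push(item T) bool {
	if r.count == len(r.buf) {
		return false
	}
	tail := (r.head + r.count) % len(r.buf)
	r.buf[tail] = item
	r.count++
	return true
}

// pop removes the oldest item and reports false when the ring is empty.
func (r *ring[T]) pop() (T, bool) {
	var zero T
	if r.count == 0 {
		return zero, false
	}
	item := r.buf[r.head]
	r.buf[r.head] = zero // release the reference
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return item, true
}

// wrapAround fills a ring of capacity 3, frees one slot, and pushes
// again so that the new item lands in slot 0. It then drains the ring.
func wrapAround() []int {
	r := newRing[int](3)
	for i := 1; i <= 3; i++ {
		r.push(i)
	}
	r.pop()   // removes 1, head moves to slot 1
	r.push(4) // tail wraps around to slot 0
	var out []int
	for {
		v, ok := r.pop()
		if !ok {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	fmt.Println(wrapAround())
}
```

Items come back in insertion order even though the newest one is physically stored before the older ones.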

func (*RingQ[T]) Cap added in v1.2.11

func (q *RingQ[T]) Cap() int

Cap returns the maximum capacity of the queue

func (*RingQ[T]) Close added in v1.2.11

func (q *RingQ[T]) Close()

Close closes the queue. All subsequent operations will return ErrClosed.

func (*RingQ[T]) IsClosed added in v1.2.11

func (q *RingQ[T]) IsClosed() bool

IsClosed returns true if the queue is closed

func (*RingQ[T]) IsEmpty added in v1.2.11

func (q *RingQ[T]) IsEmpty() bool

IsEmpty returns true if the queue has no items

func (*RingQ[T]) IsFull added in v1.2.11

func (q *RingQ[T]) IsFull() bool

IsFull returns true if the queue is at capacity

func (*RingQ[T]) IsUnlimited added in v1.3.2

func (q *RingQ[T]) IsUnlimited() bool

IsUnlimited returns true if the queue has unlimited capacity

func (*RingQ[T]) Len added in v1.2.11

func (q *RingQ[T]) Len() int

Len returns the current number of items in the queue

func (*RingQ[T]) Peek added in v1.3.2

func (q *RingQ[T]) Peek() T

Peek returns the item at the front of the queue without removing it. It returns the zero value if the queue is empty.

func (*RingQ[T]) Pop added in v1.2.11

func (q *RingQ[T]) Pop() (T, error)

Pop removes and returns the item at the front of the queue. It blocks while the queue is empty, until an item becomes available or the queue is closed. Important: once the queue is closed, Pop returns ErrClosed immediately, even if items remain. No further data can be consumed after Close, which is useful for graceful shutdown scenarios.

func (*RingQ[T]) Push added in v1.2.11

func (q *RingQ[T]) Push(item T) error

Push adds an item to the end of the queue. It returns ErrQueueFull if the queue is at capacity.

func (*RingQ[T]) PushBlocking added in v1.3.2

func (q *RingQ[T]) PushBlocking(item T) error

PushBlocking adds an item to the end of the queue. It blocks while the queue is at capacity, until space becomes available or the queue is closed.

func (*RingQ[T]) Reset added in v1.2.11

func (q *RingQ[T]) Reset()

Reset clears all items from the queue (useful for reusing the queue)

type SimpleTaskProcessor added in v1.3.2

type SimpleTaskProcessor struct {
	// contains filtered or unexported fields
}

SimpleTaskProcessor is a high-performance, concurrent task processor that uses an unbounded queue to manage task execution. It provides a simple interface for submitting tasks and automatically distributes them across multiple worker goroutines for parallel execution.

Key features:

  • Unbounded task queue (no capacity limit)
  • Configurable number of worker goroutines
  • Automatic task distribution and load balancing
  • Graceful shutdown with proper cleanup
  • Thread-safe operations

Example usage:

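type MyTask struct {
	data string
}

// Do implements ITaskItem.
func (t *MyTask) Do() {
	fmt.Println("Processing:", t.data)
}

processor := NewSimpleTaskProcessor(4) // 4 workers
if err := processor.Submit(&MyTask{data: "hello"}); err != nil {
	// Submit returns ErrClosed once the processor has been shut down.
	fmt.Println("submit failed:", err)
}
processor.Shutdown() // Clean shutdown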

The processor is safe for concurrent use by multiple goroutines.

func NewSimpleTaskProcessor added in v1.3.2

func NewSimpleTaskProcessor(workerCount int) *SimpleTaskProcessor

NewSimpleTaskProcessor creates and starts a new task processor with the specified number of worker goroutines.

Parameters:

  • workerCount: Number of worker goroutines to spawn. Must be >= 0. If 0, tasks will be queued but not processed until shutdown.

Returns:

  • *SimpleTaskProcessor: A running task processor ready to accept tasks.

The processor starts immediately and begins listening for tasks. Worker goroutines will block waiting for tasks until the processor is shut down.

func (*SimpleTaskProcessor) Shutdown added in v1.3.2

func (x *SimpleTaskProcessor) Shutdown()

Shutdown gracefully stops the task processor. It closes the task queue, which signals all worker goroutines to finish processing their current tasks and then exit.

After calling Shutdown:

  • No new tasks can be submitted (Submit will return ErrClosed)
  • Workers will finish processing any tasks already in progress
  • Workers will exit after processing remaining queued tasks
  • The processor cannot be restarted

This method is thread-safe and can be called multiple times safely. Subsequent calls to Shutdown have no effect.

func (*SimpleTaskProcessor) Submit added in v1.3.2

func (x *SimpleTaskProcessor) Submit(task ITaskItem) error

Submit adds a task to the processing queue. The task will be executed asynchronously by one of the available worker goroutines.

Parameters:

  • task: The task to be executed. Must implement ITaskItem interface.

Returns:

  • error: ErrClosed if the processor has been shut down, nil otherwise.

This method is thread-safe and can be called concurrently by multiple goroutines. Tasks are dequeued in FIFO order by the available workers; with more than one worker they run concurrently, so completion order is not guaranteed.
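The Submit and Shutdown contract can be sketched with a small worker pool. This is an illustration under stated assumptions, not the processor's implementation: pool, newPool, submit, shutdown, and runLifecycle are hypothetical names; a buffered channel replaces the unbounded queue; and this shutdown waits for the workers to drain the channel, which the documentation above does not promise for Shutdown.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errClosed = errors.New("processor closed") // stand-in for ErrClosed

// pool is a hypothetical, simplified stand-in for SimpleTaskProcessor.
// It uses a buffered channel, so unlike the documented processor its
// queue is bounded.
type pool struct {
	mu     sync.RWMutex
	closed bool
	tasks  chan func()
	wg     sync.WaitGroup
}

func newPool(workers, buffer int) *pool {
	p := &pool{tasks: make(chan func(), buffer)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// The loop ends once tasks is closed and drained.
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// submit enqueues task, or returns errClosed after shutdown.
func (p *pool) submit(task func()) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return errClosed
	}
	p.tasks <- task
	return nil
}

// shutdown is idempotent. It closes the channel once and waits for the
// workers to finish the tasks that were already queued.
func (p *pool) shutdown() {
	p.mu.Lock()
	if !p.closed {
		p.closed = true
		close(p.tasks)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

// runLifecycle submits n tasks, shuts down twice, then submits once
// more. It returns how many tasks ran and the error from the late call.
func runLifecycle(n int) (ran int64, lateErr error) {
	p := newPool(4, n)
	var counter int64
	for i := 0; i < n; i++ {
		p.submit(func() { atomic.AddInt64(&counter, 1) })
	}
	p.shutdown()
	p.shutdown() // second call has no effect
	lateErr = p.submit(func() { atomic.AddInt64(&counter, 1) })
	return atomic.LoadInt64(&counter), lateErr
}

func main() {
	fmt.Println(runLifecycle(10))
}
```

Guarding the closed flag with a lock is what lets submit return an error after shutdown instead of panicking on a send to a closed channel.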
