Documentation
¶
Index ¶
- type Admin
- func (a *Admin) Inspect(ctx context.Context, queueFilter, partitionFilter string) ([]*QueueStats, error)
- func (a *Admin) ListQueues(ctx context.Context) ([]string, error)
- func (a *Admin) Purge(ctx context.Context, queueName, partition string) (int, error)
- func (a *Admin) Retry(ctx context.Context, queueName string) (int, error)
- type Consumer
- func (c *Consumer) Ack(ctx context.Context, taskID string) error
- func (c *Consumer) Close()
- func (c *Consumer) Consume(ctx context.Context, handler func(*Task) error) error
- func (c *Consumer) ConsumerID() string
- func (c *Consumer) Get(ctx context.Context) ([]*Task, error)
- func (c *Consumer) GetChan(ctx context.Context) <-chan *Task
- func (c *Consumer) Reject(ctx context.Context, taskID string, waitTime int) error
- func (c *Consumer) SetPollInterval(d time.Duration)
- func (c *Consumer) SetPrefetchCount(n int)
- type ConsumerPool
- type PartitionStats
- type Producer
- type QueueStats
- type RejectWithDelay
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Admin ¶
type Admin struct {
// contains filtered or unexported fields
}
Admin предоставляет операции для обслуживания очередей
func (*Admin) Inspect ¶
func (a *Admin) Inspect(ctx context.Context, queueFilter, partitionFilter string) ([]*QueueStats, error)
Inspect возвращает статистику по очередям с фильтрами
func (*Admin) ListQueues ¶
ListQueues возвращает имена всех очередей (обнаруживает по ключам queue:*:partitions)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer получает и обрабатывает задачи из очереди
func NewConsumer ¶
NewConsumer создает нового консьюмера и запускает ping горутину. consumerID — идентификатор консьюмера; если пустая строка — генерируется автоматически.
func (*Consumer) Consume ¶
Consume запускает вечный цикл: Get → обработка handler → Ack при успехе, Reject при ошибке. Гарантирует последовательную обработку задач в рамках одного консьюмера. Для ordered-партиций (!): при reject первой задачи остальные из этой партиции в текущем prefetch-батче также reject'ятся, чтобы сохранить порядок при возврате в очередь. При ошибке Ack или Reject консьюмер завершает работу и возвращает ошибку — остальные сообщения вернутся в очередь при чистке мёртвых консьюмеров, что важно для ordered-партиций: не брать следующую пачку тем же консьюмером и не нарушать порядок.
func (*Consumer) ConsumerID ¶
ConsumerID возвращает идентификатор консьюмера
func (*Consumer) GetChan ¶
GetChan запускает цикл чтения очереди и возвращает канал с задачами. Останавливается при отмене контекста или вызове Close.
func (*Consumer) Reject ¶
Reject отклоняет задачу и возвращает её обратно в очередь. waitTime — в секундах; для ordered-партиций (!) при waitTime > 0 ставится TTL-блок, партиция не берётся до истечения (кейс ratelimit: нет смысла гонять одно сообщение туда-обратно).
func (*Consumer) SetPollInterval ¶
SetPollInterval задает интервал ожидания при отсутствии задач в очереди
func (*Consumer) SetPrefetchCount ¶
SetPrefetchCount задает количество задач для предзагрузки (по умолчанию 5)
type ConsumerPool ¶
type ConsumerPool struct {
// contains filtered or unexported fields
}
ConsumerPool пул консьюмеров, обрабатывающих очередь параллельно
func NewConsumerPool ¶
func NewConsumerPool(redisClient *redis.Client, queueName string) *ConsumerPool
NewConsumerPool создает пул консьюмеров
func (*ConsumerPool) Consume ¶
func (p *ConsumerPool) Consume(ctx context.Context, handler func(*Task) error)
Consume запускает count консьюмеров, блокируется до отмены контекста. При отмене контекста все консьюмеры останавливаются, метод возвращает управление. Если консьюмер завершается из-за ошибки Ack/Reject, запускается новый на его место — число активных консьюмеров остаётся постоянным.
func (*ConsumerPool) SetCount ¶
func (p *ConsumerPool) SetCount(n int)
SetCount задает количество консьюмеров в пуле
func (*ConsumerPool) SetPollInterval ¶
func (p *ConsumerPool) SetPollInterval(d time.Duration)
SetPollInterval задает интервал ожидания при отсутствии задач
func (*ConsumerPool) SetPrefetchCount ¶
func (p *ConsumerPool) SetPrefetchCount(n int)
SetPrefetchCount задает количество задач для предзагрузки
type PartitionStats ¶
type PartitionStats struct {
Partition string
Pending int64
InProgress int64
Priorities []string
Locked bool
Blocked bool
}
PartitionStats статистика по партиции
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer отправляет задачи в очередь
func NewProducer ¶
NewProducer создает нового продюсера
type QueueStats ¶
type QueueStats struct {
QueueName string
Partitions []PartitionStats
TotalPending int64
TotalInProgress int64
Consumers []string
}
QueueStats статистика по очереди
type RejectWithDelay ¶
RejectWithDelay — ошибка, при которой задача возвращается в очередь с задержкой. Delay задаётся в секундах; для ordered-партиций (!) при waitTime > 0 ставится TTL-блок. Используется для ratelimit: партиция не берётся до истечения задержки.
func NewRejectWithDelay ¶
func NewRejectWithDelay(err error, delaySeconds int) *RejectWithDelay
NewRejectWithDelay создаёт ошибку с задержкой для Reject.
func (*RejectWithDelay) Error ¶
func (e *RejectWithDelay) Error() string
func (*RejectWithDelay) Unwrap ¶
func (e *RejectWithDelay) Unwrap() error
type Task ¶
type Task struct {
ID string `json:"id"`
Partition string `json:"partition,omitempty"`
Priority int `json:"priority,omitempty"`
Payload []byte `json:"-"`
Scheduled time.Time `json:"scheduled"`
RejectCount int `json:"rejectCount,omitempty"` // кол-во reject для расчёта задержки
}
Task задача в очереди