redisqueue

package module
v0.0.0-...-ba30077 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2026 License: MIT Imports: 10 Imported by: 0

README

Smart Redis Queue

Очередь задач на Redis с поддержкой партиций, приоритетов, отложенного выполнения и строгих гарантий порядка.

Возможности

  • Гарантия порядка выполнения — ordered-партиции (префикс !) обрабатываются строго последовательно одним консьюмером; при reject порядок сохраняется
  • Отложенные сообщения — задачи с полем Scheduled становятся доступны только после указанного времени
  • Чистка мёртвых консьюмеров — heartbeat и автоматическое возвращение задач при падении воркера
  • Приоритет выполнения — в рамках партиции задачи с большим Priority обрабатываются первыми
  • Батч-добавление — атомарная публикация нескольких задач за один вызов Publish
  • Prefetch с сохранением порядка — для ordered-партиций при reject первой задачи в батче остальные тоже reject'ятся и возвращаются в правильном порядке
  • Rate limitingRejectWithDelay для ordered-партиций ставит TTL-блок, партиция не берётся до истечения задержки
  • Идемпотентность — добавление по ID (NX), дубликаты отклоняются

Требования

  • Go 1.22+
  • Redis 6+

Установка

go get github.com/Rinsvent/smart-redis-queue

Быстрый старт

package main

import (
    "context"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
    "github.com/Rinsvent/smart-redis-queue"
)

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    defer rdb.Close()

    ctx := context.Background()
    producer := redisqueue.NewProducer(rdb, "my-queue")
    consumer := redisqueue.NewConsumer(rdb, "my-queue", "")
    defer consumer.Close()

    // Публикуем задачу
    err := producer.Publish(ctx, &redisqueue.Task{
        ID:        "task-1",
        Payload:   []byte(`{"action": "send_email"}`),
        Scheduled: time.Now(),
    })
    if err != nil {
        log.Fatal(err)
    }

    // Обрабатываем
    consumer.Consume(ctx, func(task *redisqueue.Task) error {
        log.Printf("Обработано: %s, payload: %s", task.ID, string(task.Payload))
        return nil // nil = Ack, ошибка = Reject
    })
}

Примеры

Отложенные сообщения
// Задача станет доступна через 5 минут
producer.Publish(ctx, &redisqueue.Task{
    ID:        "delayed-task",
    Payload:   []byte("data"),
    Scheduled: time.Now().Add(5 * time.Minute),
})
Приоритеты
// Задачи с большим Priority обрабатываются первыми
producer.Publish(ctx,
    &redisqueue.Task{ID: "low", Partition: "p1", Priority: 1, Payload: []byte("low"), Scheduled: time.Now()},
    &redisqueue.Task{ID: "high", Partition: "p1", Priority: 10, Payload: []byte("high"), Scheduled: time.Now()},
)
// Порядок: high, low
Ordered-партиции (гарантия порядка)
// Партиция с префиксом "!" — только один консьюмер, порядок строго сохраняется
producer.Publish(ctx,
    &redisqueue.Task{ID: "1", Partition: "!user-123", Payload: []byte("a"), Scheduled: time.Now()},
    &redisqueue.Task{ID: "2", Partition: "!user-123", Payload: []byte("b"), Scheduled: time.Now()},
)
// Всегда обработаются по порядку: 1, 2
Батч-добавление
tasks := make([]*redisqueue.Task, 100)
for i := range tasks {
    tasks[i] = &redisqueue.Task{
        ID:        fmt.Sprintf("task-%d", i),
        Payload:   []byte(fmt.Sprintf("payload-%d", i)),
        Scheduled: time.Now(),
    }
}
err := producer.Publish(ctx, tasks...)
// Атомарно: либо все добавлены, либо ошибка (в т.ч. при дубликатах)
Reject с задержкой (rate limit)
consumer.Consume(ctx, func(task *redisqueue.Task) error {
    if rateLimited {
        // Партиция ! не будет браться 60 секунд
        return redisqueue.NewRejectWithDelay(errors.New("rate limit"), 60)
    }
    return nil
})
Пул консьюмеров
pool := redisqueue.NewConsumerPool(rdb, "my-queue")
pool.SetCount(5)
pool.SetPrefetchCount(10)
pool.SetPollInterval(500 * time.Millisecond)

pool.Consume(ctx, func(task *redisqueue.Task) error {
    return process(task)
})
Ручное управление (Get / Ack / Reject)
ch := consumer.GetChan(ctx)
for task := range ch {
    if err := handle(task); err != nil {
        consumer.Reject(ctx, task.ID, 0) // вернуть в очередь
    } else {
        consumer.Ack(ctx, task.ID)
    }
}

Конфигурация консьюмера

consumer.SetPollInterval(2 * time.Second) // интервал при пустой очереди
consumer.SetPrefetchCount(10)             // задач за один Get (по умолчанию 5)

Docker

docker compose up -d
# Redis на localhost:6379

Запуск примера

# Redis должен быть запущен (docker compose up -d)
go run ./examples/basic

CLI Manager

Утилита для обслуживания очередей:

make manager
./bin/manager --help
Команды
Команда Описание
info (alias: list, ls) Информация об очередях: партиции, pending, in-progress, консьюмеры
purge Очистить очередь (или только указанную партицию)
retry Вернуть in-progress задачи в очередь (для зависших после падения консьюмера)
Примеры
# Все очереди
./bin/manager info

# Конкретная очередь
./bin/manager info -q my-queue

# С фильтром по партиции
./bin/manager info -q my-queue -p "!user-123"

# Очистить очередь
./bin/manager purge -q my-queue

# Очистить только партицию
./bin/manager purge -q my-queue -p base

# Вернуть зависшие задачи
./bin/manager retry -q my-queue

Глобальные флаги: -a (addr), -P (password), --db. Переменные: REDIS_ADDR, REDIS_PASSWORD.

Тесты

# Требуется Redis на localhost:6379
make test

# Короткие тесты (без long-running)
make test-short

# Покрытие
make test-coverage

API

Тип Описание
Producer Публикация задач
Consumer Один консьюмер (Get/Ack/Reject, Consume, GetChan)
ConsumerPool Пул консьюмеров
Admin Обслуживание: Inspect, Purge, Retry
Task Задача: ID, Partition, Priority, Payload, Scheduled
RejectWithDelay Ошибка для отложенного reject (rate limit)

Ключи Redis

Очередь использует префикс queue:{queueName}::

  • queue:{name}:partitions — множество партиций
  • queue:{name}:partition:{code}:{priority} — ZSET задач по партиции и приоритету
  • queue:{name}:consumers — множество консьюмеров
  • queue:{name}:consumer:{id} — heartbeat консьюмера (TTL 120 сек)

Contributing

Приветствуются pull request'ы! Подробнее: CONTRIBUTING.md

Перед отправкой:

  1. Запустите make test
  2. Добавьте тесты для новой функциональности
  3. Соблюдайте стиль кода (gofmt)
Как внести вклад

Лицензия

MIT — см. LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Admin

type Admin struct {
	// contains filtered or unexported fields
}

Admin предоставляет операции для обслуживания очередей

func NewAdmin

func NewAdmin(redisClient *redis.Client) *Admin

NewAdmin создаёт админ для управления очередями

func (*Admin) Inspect

func (a *Admin) Inspect(ctx context.Context, queueFilter, partitionFilter string) ([]*QueueStats, error)

Inspect возвращает статистику по очередям с фильтрами

func (*Admin) ListQueues

func (a *Admin) ListQueues(ctx context.Context) ([]string, error)

ListQueues возвращает имена всех очередей (обнаруживает по ключам queue:*:partitions)

func (*Admin) Purge

func (a *Admin) Purge(ctx context.Context, queueName, partition string) (int, error)

Purge удаляет все задачи из очереди. Если partition != "", удаляет только указанную партицию.

func (*Admin) Retry

func (a *Admin) Retry(ctx context.Context, queueName string) (int, error)

Retry возвращает все in-progress задачи обратно в очередь (для "зависших" после падения консьюмера)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer получает и обрабатывает задачи из очереди

func NewConsumer

func NewConsumer(redisClient *redis.Client, queueName string, consumerID string) *Consumer

NewConsumer создает нового консьюмера и запускает ping горутину. consumerID — идентификатор консьюмера; если пустая строка — генерируется автоматически.

func (*Consumer) Ack

func (c *Consumer) Ack(ctx context.Context, taskID string) error

Ack подтверждает обработку задачи

func (*Consumer) Close

func (c *Consumer) Close()

Close останавливает ping горутину

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler func(*Task) error) error

Consume запускает вечный цикл: Get → обработка handler → Ack при успехе, Reject при ошибке. Гарантирует последовательную обработку задач в рамках одного консьюмера. Для ordered-партиций (!): при reject первой задачи остальные из этой партиции в текущем prefetch-батче также reject'ятся, чтобы сохранить порядок при возврате в очередь. При ошибке Ack или Reject консьюмер завершает работу и возвращает ошибку — остальные сообщения вернутся в очередь при чистке мёртвых консьюмеров, что важно для ordered-партиций: не брать следующую пачку тем же консьюмером и не нарушать порядок.

func (*Consumer) ConsumerID

func (c *Consumer) ConsumerID() string

ConsumerID возвращает идентификатор консьюмера

func (*Consumer) Get

func (c *Consumer) Get(ctx context.Context) ([]*Task, error)

Get получает до prefetchCount задач из очереди и возвращает их разом

func (*Consumer) GetChan

func (c *Consumer) GetChan(ctx context.Context) <-chan *Task

GetChan запускает цикл чтения очереди и возвращает канал с задачами. Останавливается при отмене контекста или вызове Close.

func (*Consumer) Reject

func (c *Consumer) Reject(ctx context.Context, taskID string, waitTime int) error

Reject отклоняет задачу и возвращает её обратно в очередь. waitTime — в секундах; для ordered-партиций (!) при waitTime > 0 ставится TTL-блок, партиция не берётся до истечения (кейс ratelimit: нет смысла гонять одно сообщение туда-обратно).

func (*Consumer) SetPollInterval

func (c *Consumer) SetPollInterval(d time.Duration)

SetPollInterval задает интервал ожидания при отсутствии задач в очереди

func (*Consumer) SetPrefetchCount

func (c *Consumer) SetPrefetchCount(n int)

SetPrefetchCount задает количество задач для предзагрузки (по умолчанию 5)

type ConsumerPool

type ConsumerPool struct {
	// contains filtered or unexported fields
}

ConsumerPool пул консьюмеров, обрабатывающих очередь параллельно

func NewConsumerPool

func NewConsumerPool(redisClient *redis.Client, queueName string) *ConsumerPool

NewConsumerPool создает пул консьюмеров

func (*ConsumerPool) Consume

func (p *ConsumerPool) Consume(ctx context.Context, handler func(*Task) error)

Consume запускает count консьюмеров, блокируется до отмены контекста. При отмене контекста все консьюмеры останавливаются, метод возвращает управление. Если консьюмер завершается из-за ошибки Ack/Reject, запускается новый на его место — число активных консьюмеров остаётся постоянным.

func (*ConsumerPool) SetCount

func (p *ConsumerPool) SetCount(n int)

SetCount задает количество консьюмеров в пуле

func (*ConsumerPool) SetPollInterval

func (p *ConsumerPool) SetPollInterval(d time.Duration)

SetPollInterval задает интервал ожидания при отсутствии задач

func (*ConsumerPool) SetPrefetchCount

func (p *ConsumerPool) SetPrefetchCount(n int)

SetPrefetchCount задает количество задач для предзагрузки

type PartitionStats

type PartitionStats struct {
	Partition  string
	Pending    int64
	InProgress int64
	Priorities []string
	Locked     bool
	Blocked    bool
}

PartitionStats статистика по партиции

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer отправляет задачи в очередь

func NewProducer

func NewProducer(redisClient *redis.Client, queueName string) *Producer

NewProducer создает нового продюсера

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, tasks ...*Task) error

Publish добавляет одну или несколько задач в очередь атомарно, сохраняя порядок

type QueueStats

type QueueStats struct {
	QueueName       string
	Partitions      []PartitionStats
	TotalPending    int64
	TotalInProgress int64
	Consumers       []string
}

QueueStats статистика по очереди

type RejectWithDelay

type RejectWithDelay struct {
	Err   error
	Delay int // секунды
}

RejectWithDelay — ошибка, при которой задача возвращается в очередь с задержкой. Delay задаётся в секундах; для ordered-партиций (!) при waitTime > 0 ставится TTL-блок. Используется для ratelimit: партиция не берётся до истечения задержки.

func NewRejectWithDelay

func NewRejectWithDelay(err error, delaySeconds int) *RejectWithDelay

NewRejectWithDelay создаёт ошибку с задержкой для Reject.

func (*RejectWithDelay) Error

func (e *RejectWithDelay) Error() string

func (*RejectWithDelay) Unwrap

func (e *RejectWithDelay) Unwrap() error

type Task

type Task struct {
	ID          string    `json:"id"`
	Partition   string    `json:"partition,omitempty"`
	Priority    int       `json:"priority,omitempty"`
	Payload     []byte    `json:"-"`
	Scheduled   time.Time `json:"scheduled"`
	RejectCount int       `json:"rejectCount,omitempty"` // кол-во reject для расчёта задержки
}

Task задача в очереди

Directories

Path Synopsis
cmd
manager command
examples
basic command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL