repeater

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2026 License: MIT Imports: 14 Imported by: 0

README

Compogo Repeater 🔁

Repeater — это планировщик периодических задач для Compogo, построенный поверх Runner. Поддерживает два режима работы (эксклюзивный и параллельный), имеет собственные middleware и полностью интегрируется с жизненным циклом Compogo.

🚀 Установка

go get github.com/Compogo/repeater
📦 Быстрый старт
package main

import (
    "context"
    "time"

    "github.com/Compogo/compogo"
    "github.com/Compogo/runner"
    "github.com/Compogo/repeater"
)

func main() {
    app := compogo.NewApp("myapp",
        compogo.WithOsSignalCloser(),
        runner.WithRunner(),
        repeater.WithRepeater(),
        compogo.WithComponents(
            myWorkerComponent,
        ),
    )

    if err := app.Serve(); err != nil {
        panic(err)
    }
}

var myWorkerComponent = &component.Component{
    Dependencies: component.Components{runner.Component, repeater.Component},
    Run: component.StepFunc(func(c container.Container) error {
        return c.Invoke(func(r repeater.Repeater) {
            // Задача, которая выполняется не чаще раза в 5 секунд
            task := repeater.NewTaskWithLock(
                "cleanup",
                runner.ProcessFunc(func(ctx context.Context) error {
                    return doCleanup()
                }),
                5 * time.Second,
            )
            
            r.AddTask(task)
        })
    }),
}
🎯 Две стратегии выполнения
Lock — эксклюзивный режим
// Только один экземпляр задачи может выполняться одновременно
task := repeater.NewTaskWithLock(
    "db-cleanup",
    cleanupProcess,
    1 * time.Hour,
)

Если задача ещё работает, следующий запуск пропускается.

Unlock — параллельный режим
// Можно запускать сколько угодно экземпляров
task := repeater.NewTaskWithUnlock(
    "queue-worker",
    workerProcess,
    10 * time.Second,
)

Каждый запуск получает уникальное имя: queue-worker_1, queue-worker_2 и т.д.

⚙️ Конфигурация

Repeater добавляет флаг --repeater.delay — как часто проверять, какие задачи пора запускать.

./myapp --repeater.delay=100ms

По умолчанию — 1/60 секунды (~16.6мс).

🧩 Опции задач
SkipFirstRun — пропустить первый запуск
task := repeater.NewTaskWithLock(
    "daily-report",
    reportProcess,
    24 * time.Hour,
    repeater.SkipFirstRun,  // первый запуск через 24 часа, а не сразу
)
🔌 Middleware

Repeater поддерживает свои middleware, которые работают только с периодическими задачами:

type LoggingMiddleware struct{}

func (m *LoggingMiddleware) Middleware(task *repeater.Task, next runner.Process) runner.Process {
    return runner.ProcessFunc(func(ctx context.Context) error {
        log.Printf("starting periodic task: %s", task.Name())
        err := next.Process(ctx)
        log.Printf("finished periodic task: %s, err=%v", task.Name(), err)
        return err
    })
}

// Использование
r.Use(&LoggingMiddleware{})
📊 Мониторинг

У каждой задачи есть счётчик выполненных запусков:

count := task.RunNumbers()  // сколько раз уже запускалась
🧹 Graceful shutdown

При остановке приложения Repeater:

  • Получает сигнал через Close()
  • Завершает основной цикл
  • Автоматически останавливает все запущенные экземпляры задач через Runner

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Component = &component.Component{
	Dependencies: component.Components{
		runner.Component,
		restore.ComponentNoRegistration,
		config.Component,
	},
	Init: component.StepFunc(func(container container.Container) error {
		return container.Provides(
			newRepeater,
			func(r *repeater) Repeater { return r },
		)
	}),
	Execute: component.StepFunc(func(container container.Container) error {
		return container.Invoke(func(r runner.Runner, repeater Repeater, logger logger.Logger) error {
			return r.RunProcess(runner.NewTask(repeater.Name(), repeater.Process, restore.NewRestore(&restore.Config{Delay: time.Microsecond}, r, logger)))
		})
	}),
	Stop: component.StepFunc(func(container container.Container) error {
		return container.Invoke(func(repeater Repeater) error {
			return repeater.Close()
		})
	}),
}

Functions

This section is empty.

Types

type Process added in v0.0.5

type Process interface {
	runner.Process
	Delay() time.Duration
	RunNumbers() uint64
}

type Repeater

type Repeater interface {
	runner.Process

	io.Closer
	AddProcess(Process) error
	AddProcesses(...Process) error
	StopProcess(Process) error
	StopProcessByName(string) error
	HasProcess(Process) bool
	HasProcessByName(string) bool
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(name string, delay time.Duration, processFunc runner.ProcessFunc, middlewares ...runner.Middleware) *Task

func (*Task) Close added in v0.0.5

func (task *Task) Close() error

func (*Task) Delay

func (task *Task) Delay() time.Duration

func (*Task) Name

func (task *Task) Name() string

func (*Task) Process added in v0.0.5

func (task *Task) Process(ctx context.Context) error

func (*Task) RunNumbers

func (task *Task) RunNumbers() uint64

type UnlockTask added in v0.0.6

type UnlockTask struct {
	*Task
}

func NewUnlockTask added in v0.0.6

func NewUnlockTask(name string, delay time.Duration, processFunc runner.ProcessFunc, middlewares ...runner.Middleware) *UnlockTask

func (*UnlockTask) Name added in v0.0.6

func (task *UnlockTask) Name() string

Directories

Path Synopsis
infrastructure

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL