repeater

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2026 License: MIT Imports: 14 Imported by: 0

README

Compogo Repeater 🔁

Repeater — это планировщик периодических задач для Compogo, построенный поверх Runner. Поддерживает два режима работы (эксклюзивный и параллельный), имеет собственные middleware и полностью интегрируется с жизненным циклом Compogo.

🚀 Установка

go get github.com/Compogo/repeater
📦 Быстрый старт
package main

import (
    "context"
    "time"

    "github.com/Compogo/compogo"
    "github.com/Compogo/runner"
    "github.com/Compogo/repeater"
)

func main() {
    app := compogo.NewApp("myapp",
        compogo.WithOsSignalCloser(),
        runner.WithRunner(),
        repeater.WithRepeater(),
        compogo.WithComponents(
            myWorkerComponent,
        ),
    )

    if err := app.Serve(); err != nil {
        panic(err)
    }
}

var myWorkerComponent = &component.Component{
    Dependencies: component.Components{runner.Component, repeater.Component},
    Run: component.StepFunc(func(c container.Container) error {
        return c.Invoke(func(r repeater.Repeater) {
            // Задача, которая выполняется не чаще раза в 5 секунд
            task := repeater.NewTaskWithLock(
                "cleanup",
                runner.ProcessFunc(func(ctx context.Context) error {
                    return doCleanup()
                }),
                5 * time.Second,
            )
            
            r.AddTask(task)
        })
    }),
}
🎯 Две стратегии выполнения
Lock — эксклюзивный режим
// Только один экземпляр задачи может выполняться одновременно
task := repeater.NewTaskWithLock(
    "db-cleanup",
    cleanupProcess,
    1 * time.Hour,
)

Если задача ещё работает, следующий запуск пропускается.

Unlock — параллельный режим
// Можно запускать сколько угодно экземпляров
task := repeater.NewTaskWithUnlock(
    "queue-worker",
    workerProcess,
    10 * time.Second,
)

Каждый запуск получает уникальное имя: queue-worker_1, queue-worker_2 и т.д.

⚙️ Конфигурация

Repeater добавляет флаг --repeater.delay — как часто проверять, какие задачи пора запускать.

./myapp --repeater.delay=100ms

По умолчанию — 1/60 секунды (~16.6мс).

🧩 Опции задач
SkipFirstRun — пропустить первый запуск
task := repeater.NewTaskWithLock(
    "daily-report",
    reportProcess,
    24 * time.Hour,
    repeater.SkipFirstRun,  // первый запуск через 24 часа, а не сразу
)
🔌 Middleware

Repeater поддерживает свои middleware, которые работают только с периодическими задачами:

type LoggingMiddleware struct{}

func (m *LoggingMiddleware) Middleware(task *repeater.Task, next runner.Process) runner.Process {
    return runner.ProcessFunc(func(ctx context.Context) error {
        log.Printf("starting periodic task: %s", task.Name())
        err := next.Process(ctx)
        log.Printf("finished periodic task: %s, err=%v", task.Name(), err)
        return err
    })
}

// Использование
r.Use(&LoggingMiddleware{})
📊 Мониторинг

У каждой задачи есть счётчик выполненных запусков:

count := task.RunNumbers()  // сколько раз уже запускалась
🧹 Graceful shutdown

При остановке приложения Repeater:

  • Получает сигнал через Close()
  • Завершает основной цикл
  • Автоматически останавливает все запущенные экземпляры задач через Runner

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	Component = &component.Component{
		Dependencies: component.Components{
			runner.Component,
			restore.ComponentNoRegistration,
			config.Component,
		},
		Init: component.StepFunc(func(container container.Container) error {
			return container.Provides(
				newRepeater,
				func(r *repeater) Repeater { return r },
			)
		}),
		Execute: component.StepFunc(func(container container.Container) error {
			return container.Invoke(func(r runner.Runner, restore *restore.Restore, repeater Repeater) error {
				return r.RunProcess(runner.NewTask(repeater.Name(), repeater.Process, restore))
			})
		}),
		Stop: component.StepFunc(func(container container.Container) error {
			return container.Invoke(func(repeater Repeater) error {
				return repeater.Close()
			})
		}),
	}

	ComponentNoRestore = &component.Component{
		Dependencies: component.Components{
			runner.Component,
			config.Component,
		},
		Init: component.StepFunc(func(container container.Container) error {
			return container.Provides(
				newRepeater,
				func(r *repeater) Repeater { return r },
			)
		}),
		Execute: component.StepFunc(func(container container.Container) error {
			return container.Invoke(func(r runner.Runner, repeater Repeater) error {
				return r.RunProcess(repeater)
			})
		}),
		Stop: component.StepFunc(func(container container.Container) error {
			return container.Invoke(func(repeater Repeater) error {
				return repeater.Close()
			})
		}),
	}
)

Functions

This section is empty.

Types

type Process added in v0.0.5

type Process interface {
	runner.Process
	Delay() time.Duration
	RunNumbers() uint64
}

type Repeater

type Repeater interface {
	runner.Process

	io.Closer
	AddProcess(Process) error
	AddProcesses(...Process) error
	StopProcess(Process) error
	StopProcessByName(string) error
	HasProcess(Process) bool
	HasProcessByName(string) bool
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(name string, delay time.Duration, processFunc runner.ProcessFunc, middlewares ...runner.Middleware) *Task

func (*Task) Close added in v0.0.5

func (task *Task) Close() error

func (*Task) Delay

func (task *Task) Delay() time.Duration

func (*Task) Name

func (task *Task) Name() string

func (*Task) Process added in v0.0.5

func (task *Task) Process(ctx context.Context) error

func (*Task) RunNumbers

func (task *Task) RunNumbers() uint64

Directories

Path Synopsis
infrastructure

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL