runner

package module
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2026 License: MIT Imports: 11 Imported by: 2

README

Compogo Runner 🏃

Compogo Runner — это компонент для управления конкурентными задачами (воркерами, серверами, фоновыми процессами) с автоматическим graceful shutdown. Интегрируется с Compogo одной строкой и берёт на себя всю работу по запуску, мониторингу и остановке задач.

🚀 Установка

go get github.com/Compogo/runner

📦 Быстрый старт

// Пример компонента, использующего раннер
var myWorkerComponent = &component.Component{
    Dependencies: component.Components{runner.Component},
    Run: component.StepFunc(func(c container.Container) error {
        return c.Invoke(func(r runner.Runner) {
            // Создаём задачу из обычной функции
            task := runner.NewTask("worker", runner.ProcessFunc(func(ctx context.Context) error {
                ticker := time.NewTicker(1 * time.Second)
                defer ticker.Stop()
                
                for {
                    select {
                    case <-ctx.Done():
                        return nil  // graceful shutdown
                    case <-ticker.C:
                        doWork()
                    }
                }
            }))
            
            r.RunTask(task)  // ← раннер сам управляет жизнью задачи
        })
    }),
}

✨ Возможности

🎯 Простой интерфейс задачи

Любая задача должна реализовывать Process — всего один метод:

type Process interface {
    Process(ctx context.Context) error
}

А для функций есть адаптер ProcessFunc:

task := runner.NewTask("worker", runner.ProcessFunc(func(ctx context.Context) error {
    // ваша логика
    return nil
}))
🛡️ Безопасность из коробки
  • Panic-защита — если задача паникует, раннер ловит панику, логирует её и продолжает работу
  • Детект дубликатов — нельзя запустить одну и ту же задачу дважды, без предварительной остановки.
  • Потокобезопасность — все операции защищены мьютексами
🔌 Интеграция с жизненным циклом Compogo
  • Контекст задачи привязан к closer — при получении сигнала все задачи получают ctx.Done()
  • В Stop-фазе раннер автоматически останавливает все задачи и ждёт их завершения
📋 Управление задачами
r.RunTask(task)              // запустить одну задачу
r.RunTasks(task1, task2)     // запустить несколько
r.StopTask(task)             // остановить по ссылке
r.StopTaskByName("worker")    // остановить по имени
r.Close()                     // остановить всё и подождать

🎨 Примеры использования

HTTP-сервер как задача
task := runner.NewTask("http-server", runner.ProcessFunc(func(ctx context.Context) error {
    server := &http.Server{Addr: ":8080", Handler: router}
    
    go func() {
        <-ctx.Done()
        server.Shutdown(context.Background())
    }()
    
    return server.ListenAndServe()
}))
Пул воркеров
for i := 0; i < 10; i++ {
    workerID := i
    task := runner.NewTask(fmt.Sprintf("worker-%d", i), runner.ProcessFunc(func(ctx context.Context) error {
        for {
            select {
            case <-ctx.Done():
                return nil
            case job := <-jobQueue:
                processJob(job)
            }
        }
    }))
    r.RunTask(task)
}
Периодическая задача
task := runner.NewTask("ticker", runner.ProcessFunc(func(ctx context.Context) error {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            cleanupOldData()
        }
    }
}))

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// TaskAlreadyExistsError is returned when attempting to run a task that is
	// already registered and running.
	TaskAlreadyExistsError = errors.New("task already exists")

	// TaskUndefinedError is returned when attempting to stop a task that is not
	// registered or has already completed.
	TaskUndefinedError = errors.New("task is undefined")
)
View Source
var Component = &component.Component{
	Name: "runner",
	Init: component.StepFunc(func(container container.Container) error {
		return container.Provides(
			newRunner,
			func(r *runner) Runner { return r },
		)
	}),
	Stop: component.StepFunc(func(container container.Container) error {
		return container.Invoke(func(d *runner) error {
			return d.Close()
		})
	}),
}

Functions

This section is empty.

Types

type Middleware

type Middleware interface {
	Middleware(Process, ProcessFunc) ProcessFunc
}

type MiddlewareFunc

type MiddlewareFunc func(Process, ProcessFunc) ProcessFunc

func (MiddlewareFunc) Middleware

func (m MiddlewareFunc) Middleware(p Process, next ProcessFunc) ProcessFunc

type Process

type Process interface {
	io.Closer
	Process(ctx context.Context) error
	Name() string
}

type ProcessFunc

type ProcessFunc func(ctx context.Context) error

type Runner

type Runner interface {
	io.Closer
	RunProcess(Process) error
	RunProcesses(...Process) error
	StopProcess(Process) error
	StopProcessByName(string) error
	HasProcess(Process) bool
	HasProcessByName(string) bool
	Use(middlewares ...Middleware)
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(name string, processFunc ProcessFunc, middlewares ...Middleware) *Task

func (*Task) Close added in v0.0.10

func (task *Task) Close() error

func (*Task) Name

func (task *Task) Name() string

func (*Task) Process added in v0.0.10

func (task *Task) Process(ctx context.Context) error

Directories

Path Synopsis
middleware

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL