runner

package module
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: MIT Imports: 10 Imported by: 2

README

Compogo Runner 🏃

Compogo Runner — это компонент для управления конкурентными задачами (воркерами, серверами, фоновыми процессами) с автоматическим graceful shutdown. Интегрируется с Compogo одной строкой и берёт на себя всю работу по запуску, мониторингу и остановке задач.

🚀 Установка

go get github.com/Compogo/runner

📦 Быстрый старт

// Пример компонента, использующего раннер
var myWorkerComponent = &component.Component{
    Dependencies: component.Components{runner.Component},
    Run: component.StepFunc(func(c container.Container) error {
        return c.Invoke(func(r runner.Runner) {
            // Создаём задачу из обычной функции
            task := runner.NewTask("worker", runner.ProcessFunc(func(ctx context.Context) error {
                ticker := time.NewTicker(1 * time.Second)
                defer ticker.Stop()
                
                for {
                    select {
                    case <-ctx.Done():
                        return nil  // graceful shutdown
                    case <-ticker.C:
                        doWork()
                    }
                }
            }))
            
            r.RunTask(task)  // ← раннер сам управляет жизнью задачи
        })
    }),
}

✨ Возможности

🎯 Простой интерфейс задачи

Любая задача должна реализовывать Process — всего один метод:

type Process interface {
    Process(ctx context.Context) error
}

А для функций есть адаптер ProcessFunc:

task := runner.NewTask("worker", runner.ProcessFunc(func(ctx context.Context) error {
    // ваша логика
    return nil
}))
🛡️ Безопасность из коробки
  • Panic-защита — если задача паникует, раннер ловит панику, логирует её и продолжает работу
  • Детект дубликатов — нельзя запустить одну и ту же задачу дважды, без предварительной остановки.
  • Потокобезопасность — все операции защищены мьютексами
🔌 Интеграция с жизненным циклом Compogo
  • Контекст задачи привязан к closer — при получении сигнала все задачи получают ctx.Done()
  • В Stop-фазе раннер автоматически останавливает все задачи и ждёт их завершения
📋 Управление задачами
r.RunTask(task)              // запустить одну задачу
r.RunTasks(task1, task2)     // запустить несколько
r.StopTask(task)             // остановить по ссылке
r.StopTaskByName("worker")    // остановить по имени
r.Close()                     // остановить всё и подождать

🎨 Примеры использования

HTTP-сервер как задача
task := runner.NewTask("http-server", runner.ProcessFunc(func(ctx context.Context) error {
    server := &http.Server{Addr: ":8080", Handler: router}
    
    go func() {
        <-ctx.Done()
        server.Shutdown(context.Background())
    }()
    
    return server.ListenAndServe()
}))
Пул воркеров
for i := 0; i < 10; i++ {
    workerID := i
    task := runner.NewTask(fmt.Sprintf("worker-%d", i), runner.ProcessFunc(func(ctx context.Context) error {
        for {
            select {
            case <-ctx.Done():
                return nil
            case job := <-jobQueue:
                processJob(job)
            }
        }
    }))
    r.RunTask(task)
}
Периодическая задача
task := runner.NewTask("ticker", runner.ProcessFunc(func(ctx context.Context) error {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return nil
        case <-ticker.C:
            cleanupOldData()
        }
    }
}))

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// TaskAlreadyExistsError is returned when attempting to run a task that is
	// already registered and running.
	TaskAlreadyExistsError = errors.New("task already exists")

	// TaskUndefinedError is returned when attempting to stop a task that is not
	// registered or has already completed.
	TaskUndefinedError = errors.New("task is undefined")
)
View Source
var Component = &component.Component{
	Name: "runner",
	Init: component.StepFunc(func(container container.Container) error {
		return container.Provide(NewRunner)
	}),
	Stop: component.StepFunc(func(container container.Container) error {
		return container.Invoke(func(r Runner) error {
			return r.Close()
		})
	}),
}

Component is a ready-to-use Compogo component that provides the Runner. It automatically:

  • Registers NewRunner in the DI container
  • Calls Close() on the runner during the Stop phase

Usage:

compogo.WithComponents(runner.Component)

Functions

This section is empty.

Types

type Middleware

type Middleware interface {
	// Middleware wraps a task's process function.
	// It receives the task being executed and the next Process in the chain,
	// and returns a new Process that will be executed instead.
	//
	// The returned Process can:
	//   - Perform setup before calling next.Process(ctx)
	//   - Call next.Process(ctx) to continue the chain
	//   - Perform cleanup after next.Process(ctx) returns
	//   - Short-circuit the chain by returning early without calling next
	Middleware(task *Task, next Process) Process
}

Middleware defines the interface for task middleware. Middlewares can wrap a task's Process function to add cross-cutting concerns such as logging, metrics, tracing, panic recovery, or authentication.

Middlewares are applied in a chain, where each middleware receives the task and the next Process in the chain, and returns a new Process that may perform operations before and/or after calling the next one.

type MiddlewareFunc

type MiddlewareFunc func(task *Task, next Process) Process

MiddlewareFunc is a function adapter that allows ordinary functions to be used as Middleware implementations.

Example:

var loggingMiddleware = MiddlewareFunc(func(task *Task, next Process) Process {
    return ProcessFunc(func(ctx context.Context) error {
        log.Info("starting task", "name", task.Name())
        err := next.Process(ctx)
        log.Info("finished task", "name", task.Name(), "error", err)
        return err
    })
})

func (MiddlewareFunc) Middleware

func (m MiddlewareFunc) Middleware(task *Task, next Process) Process

Middleware implements the Middleware interface by calling the underlying function.

type Process

type Process interface {
	// Process executes the task logic. The provided context is canceled
	// when the task needs to stop (graceful shutdown). The method should
	// block until the task completes or the context is done.
	Process(ctx context.Context) error
}

Process defines the interface for any executable unit that can be run as a task. The Process method receives a context that is canceled when the task should stop (e.g., during application shutdown).

Example:

type Server struct {}

func (s *Server) Process(ctx context.Context) error {
    return s.httpServer.ListenAndServe()
}

type ProcessFunc

type ProcessFunc func(ctx context.Context) error

ProcessFunc is a function adapter that allows ordinary functions to be used as Process implementations.

Example:

task := runner.NewTask("worker", runner.ProcessFunc(func(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
            doWork()
        }
    }
}))

func (ProcessFunc) Process

func (p ProcessFunc) Process(ctx context.Context) error

Process implements the Process interface by calling the underlying function.

type Runner

type Runner interface {
	// Closer stops all running tasks and waits for their completion.
	// It implements io.Closer for integration with Compogo's lifecycle.
	io.Closer

	// RunTask starts a single task. Returns an error if the task is already running.
	RunTask(task *Task) error

	// RunTasks starts multiple tasks sequentially. If any task fails to start,
	// the error is returned immediately and subsequent tasks are not started.
	RunTasks(tasks ...*Task) error

	// StopTask stops a specific task by canceling its context.
	// The task is removed from the runner after it completes.
	StopTask(task *Task) error

	// StopTaskByName stops a task identified by its name.
	// Returns TaskUndefinedError if no task with the given name exists.
	StopTaskByName(name string) error

	// HasTaskByName checks if a task with the given name is currently running.
	// Returns true if the task exists, false otherwise.
	HasTaskByName(name string) bool

	// HasTask checks if the specific task instance is currently running.
	// This is useful for checking against a known task pointer.
	HasTask(task *Task) bool

	// Use registers one or more middlewares that will wrap all tasks
	// executed by this runner. Middlewares are applied in the order they are added,
	// with the last added middleware being the outermost wrapper.
	Use(middlewares ...Middleware)
}

Runner defines the interface for managing concurrent tasks. It provides methods to start, stop, and manage the lifecycle of multiple tasks.

func NewRunner

func NewRunner(closer closer.Closer, logger logger.Logger) Runner

NewRunner creates a new Runner instance. The provided closer is used to derive contexts for tasks, enabling graceful shutdown when the application stops. The logger is used for task lifecycle events and error reporting

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a named executable unit managed by the Runner. Each Task has its own lifecycle and can be started and stopped independently.

func NewTask

func NewTask(name string, process Process) *Task

NewTask creates a new Task with the given name and process. The name is used for logging and stopping the task by name.

func (*Task) Name

func (task *Task) Name() string

Name returns the task's identifier.

func (*Task) String

func (task *Task) String() string

String returns the task's name, implementing the fmt.Stringer interface.

Directories

Path Synopsis
middleware

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL