runner

v0.1.0-beta

Warning: This package is not in the latest version of its module.

Published: Apr 30, 2026 License: MIT Imports: 7 Imported by: 2

README

go-runner


A small Go library that runs a set of jobs every minute. Each minute the runner asks every registered job whether it should run (IsRun(now)), and concurrently executes the ones that say yes — up to a configurable concurrency limit.

  • Push the current set of jobs to a channel; the runner queues batches internally and evaluates them on each minute tick.
  • Jobs implement JobInterface (IsRun, GetID, Run).
  • Concurrent execution within a tick, with a global concurrency cap.
  • Backpressure-aware batch queue, optional dedupe, panic recovery, graceful shutdown, runtime stats.

Install

go get github.com/Sotaneum/go-runner

Quick Start

package main

import (
    "errors"
    "log"
    "time"

    runner "github.com/Sotaneum/go-runner"
)

type Job struct{ id string }

func (j *Job) GetID() string          { return j.id }
func (j *Job) IsRun(t time.Time) bool { return true } // every minute
func (j *Job) Run() (any, error)      { return "done", nil }

func main() {
    runnerCh := make(chan []runner.JobInterface)
    r := runner.NewRunnerWithLimit(runnerCh, 10)
    defer r.StopAndWait()

    runnerCh <- []runner.JobInterface{&Job{id: "a"}, &Job{id: "b"}}

    for res := range r.ResultCh {
        for _, jr := range res.Items {
            switch {
            case errors.Is(jr.Err, runner.ErrSkippedDuplicate):
                log.Printf("%s skipped: duplicate in flight", jr.ID)
            case jr.Err != nil:
                log.Printf("%s failed: %v", jr.ID, jr.Err) // Run() error, *runner.PanicError, or runner.ErrShuttingDown
            default:
                log.Printf("%s -> %v (%v)", jr.ID, jr.Value, jr.EndedAt.Sub(jr.StartedAt))
            }
        }
    }
}

How It Works

caller ──[runnerCh]──▶ ingest ──▶ batch FIFO ──▶ createQueue ──▶ start ──▶ runQueue
                                       ▲              │           │
                              minute tick (timeChecker)            └─▶ goroutine per job (capped by limit) ──▶ ResultCh
  • runnerCh accepts a batch (full job list) at a time. Each push is queued into the internal FIFO; the caller does not need to wait for the previous batch to finish.
  • Every minute, createQueue pops one batch, evaluates IsRun(now) on each job, and forwards the surviving jobs to start.
  • start hands the queue off to a background goroutine and immediately listens for the next queue, so a slow batch never blocks the next tick.
  • Each job runs in its own goroutine, gated by a global concurrency semaphore (limit).
  • Results for a queue are delivered as a single Result on ResultCh once all jobs in that queue finish.

Concurrency

  • Jobs in a single queue execute concurrently.
  • The concurrency limit is global across all in-flight queues (not per-queue).
  • When the limit is reached, additional jobs block until a slot frees up.
  • Default limit: 50. Set explicitly via NewRunnerWithLimit.
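
A global cap of this kind is commonly built from a buffered channel used as a counting semaphore. The sketch below illustrates that general technique and is not taken from the library; runCapped is a hypothetical helper that also records the highest concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runCapped runs n tasks, each in its own goroutine, with at most limit
// executing at once. It returns the highest concurrency it observed.
func runCapped(n, limit int, task func(i int)) int32 {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- struct{}{}        // blocks while limit tasks are running
			defer func() { <-sem }() // free the slot when done

			cur := atomic.AddInt32(&running, 1)
			for { // record a new peak if cur exceeds the stored one
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			task(i)
			atomic.AddInt32(&running, -1)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	peak := runCapped(20, 3, func(int) { time.Sleep(5 * time.Millisecond) })
	fmt.Println("peak concurrency:", peak) // never more than 3
}
```

Because the semaphore is shared, tasks from different queues would compete for the same slots, which matches the "global, not per-queue" behavior described above.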

Because queues can overlap, the same GetID() appearing in two queues will, by default, run twice concurrently. Either make Run() safe for that, ensure uniqueness across queues, or enable WithDedupe().

Options

NewRunnerWithLimit accepts variadic Option values; existing callers without options remain source-compatible.

r := runner.NewRunnerWithLimit(runnerCh, 10,
    runner.WithDedupe(),
    runner.WithBatchBuffer(64),
)

WithDedupe()

Prevents the same GetID() from running concurrently. If a job with the same ID is already in flight (within the same queue or across overlapping queues), the duplicate is recorded as JobResult{ID, Err: ErrSkippedDuplicate} and Run() is not invoked. The skip is also reflected in Stats().DedupeSkipsTotal.
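
The idea behind in-flight deduplication can be sketched with a mutex-guarded set of IDs. This is an illustration under stated assumptions, not the library's implementation: inFlight, tryAcquire, release, runOnce, and errDuplicate are hypothetical names, with errDuplicate standing in for ErrSkippedDuplicate.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDuplicate plays the role of ErrSkippedDuplicate in this sketch.
var errDuplicate = errors.New("skipped duplicate in-flight job")

// inFlight is a hypothetical set of IDs that are currently running.
type inFlight struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newInFlight() *inFlight { return &inFlight{ids: make(map[string]struct{})} }

// tryAcquire claims id and reports whether the caller may run the job.
func (f *inFlight) tryAcquire(id string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, busy := f.ids[id]; busy {
		return false
	}
	f.ids[id] = struct{}{}
	return true
}

// release marks id as finished so a later run is allowed again.
func (f *inFlight) release(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.ids, id)
}

// runOnce invokes run unless id is already in flight.
func (f *inFlight) runOnce(id string, run func() error) error {
	if !f.tryAcquire(id) {
		return errDuplicate // run is never invoked for a duplicate
	}
	defer f.release(id)
	return run()
}

func main() {
	f := newInFlight()
	err := f.runOnce("a", func() error {
		// A second attempt with the same ID while "a" is running is skipped.
		return f.runOnce("a", func() error { return nil })
	})
	fmt.Println(errors.Is(err, errDuplicate))                // true
	fmt.Println(f.runOnce("a", func() error { return nil })) // <nil>
}
```

Once the first run finishes and releases its ID, the same ID may run again, so dedupe only prevents concurrent execution, not repeated execution.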

WithBatchBuffer(n)

Sets the size of the internal FIFO queue that holds batches received from runnerCh. Default 64, minimum 1.

  • Each minute tick consumes one batch.
  • If the caller pushes batches faster than they are consumed, the queue fills and subsequent runnerCh sends block, providing natural backpressure. The caller does not need to manually pace itself.
  • If IsRun(now) returns false at the time the batch is finally dequeued (e.g. it lagged behind), that job is dropped from the queue.
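
The backpressure described above is the ordinary behavior of a bounded queue. Here is a minimal sketch, assuming the FIFO behaves like a buffered Go channel (the source does not state how it is implemented); tryPush is a hypothetical helper that makes the "full" state observable without blocking.

```go
package main

import "fmt"

// tryPush attempts a non-blocking send and reports whether the batch fit.
// A plain `queue <- batch` would instead block until a slot frees up,
// which is the backpressure described above.
func tryPush(queue chan []string, batch []string) bool {
	select {
	case queue <- batch:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan []string, 2) // stand-in for a batch FIFO of size 2

	fmt.Println(tryPush(queue, []string{"a"})) // true
	fmt.Println(tryPush(queue, []string{"b"})) // true
	fmt.Println(tryPush(queue, []string{"c"})) // false: the FIFO is full

	<-queue // one "tick" consumes one batch

	fmt.Println(tryPush(queue, []string{"c"})) // true again
}
```

With one batch consumed per minute, a buffer of n can hold up to n minutes of queued batches before senders start to block.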

Submitting Batches

Two equivalent ways:

// 1) caller-owned channel
runnerCh := make(chan []runner.JobInterface)
r := runner.NewRunnerWithLimit(runnerCh, 10)
runnerCh <- batch

// 2) library-managed channel
r := runner.New(10)
ok := r.Push(batch) // false if Stop has been called

For synchronous, immediate execution of a single batch (no tick wait) and per-batch correlation:

reply := r.PushAndAwait(batch)
res := <-reply // closed after delivery

Result and Errors

type Result struct {
    StartedAt time.Time
    EndedAt   time.Time
    Items     []JobResult
}

type JobResult struct {
    ID        string
    Value     any       // Run() return value, valid only when Err == nil
    Err       error     // error returned by Run(), *PanicError, ErrSkippedDuplicate, ErrShuttingDown, or nil
    StartedAt time.Time
    EndedAt   time.Time
}

type PanicError struct {
    ID        string
    Recovered any
    Stack     []byte
}

Result provides convenience helpers:

res.ByID()          // map[string]JobResult — O(1) lookup by ID
res.Errors()        // map[string]error — only entries with Err != nil
res.PanicErrors()   // map[string]*PanicError — panics only

Distinguish error sources with errors.As / errors.Is:

var pe *runner.PanicError
switch {
case errors.As(jr.Err, &pe):
    log.Printf("%s panicked: %v\n%s", pe.ID, pe.Recovered, pe.Stack)
case errors.Is(jr.Err, runner.ErrSkippedDuplicate):
    // skipped by WithDedupe
case jr.Err != nil:
    // error returned by Run(), or runner.ErrShuttingDown
}

Lifecycle

r := runner.NewRunner(runnerCh)
defer r.StopAndWait() // graceful: stop accepting new work and wait for in-flight queues

Method         Behavior
Stop()         Stops the lifecycle goroutines (start, createQueue, ingest, timeChecker). In-flight Run() calls continue. Idempotent.
Wait()         Blocks until all in-flight runQueue goroutines complete.
StopAndWait()  Stop() followed by Wait().

In-flight Run() invocations are not cancelled — JobInterface does not currently expose a context.Context.

Channel ownership
  • The caller owns runnerCh. Closing it stops the ingest goroutine but does not stop the runner — call Stop() for that.
  • The library owns ResultCh. It is buffered (size 8) and is closed by Wait() (or StopAndWait) once all in-flight queues have finished. After close, range r.ResultCh exits naturally.
  • If the caller falls behind in draining ResultCh and its buffer fills, the oldest buffered result is dropped and Stats().DroppedResultsTotal is incremented.
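
Drop-oldest delivery on a buffered channel is often written as a pair of non-blocking selects. The sketch below shows that general pattern with a single sender; whether the library does it exactly this way is not stated in the source, and sendDropOldest is a hypothetical name.

```go
package main

import "fmt"

// sendDropOldest delivers v on a buffered channel. If the buffer is full it
// discards the oldest queued value to make room and reports that a drop
// happened. The sketch assumes a single sender and a buffered channel.
func sendDropOldest(ch chan int, v int) (dropped bool) {
	for {
		select {
		case ch <- v:
			return dropped
		default: // buffer is full, fall through to make room
		}
		select {
		case <-ch: // discard the oldest value
			dropped = true
		default: // a receiver drained the buffer in the meantime; retry the send
		}
	}
}

func main() {
	ch := make(chan int, 2)
	drops := 0
	for v := 1; v <= 4; v++ {
		if sendDropOldest(ch, v) {
			drops++
		}
	}
	close(ch)
	for v := range ch {
		fmt.Println(v) // 3, then 4
	}
	fmt.Println("drops:", drops) // drops: 2
}
```

The trade-off is that the sender never blocks on a slow consumer, at the cost of losing the oldest undelivered results, so a consumer that needs every result should drain the channel promptly or watch the drop counter.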

Stats

s := r.Stats()
// s.InFlightJobs        — currently executing jobs (semaphore depth)
// s.BatchQueueDepth     — batches waiting to be consumed by createQueue
// s.BatchQueueCapacity  — configured via WithBatchBuffer
// s.DedupeSkipsTotal    — cumulative duplicate skips
// s.DroppedResultsTotal — cumulative ResultCh overflow drops

API

Function / Method                             Description
NewRunner(runnerCh)                           Create a runner with the default concurrency limit (50).
NewRunnerWithLimit(runnerCh, limit, opts...)  Create with custom limit and options. Panics if runnerCh is nil. limit <= 0 normalizes to default.
New(limit, opts...)                           Create a runner with an internally-managed channel. Use Push / PushAndAwait to submit batches.
(*Runner).Push(batch)                         Send a batch. Returns false if the runner has been stopped.
(*Runner).PushAndAwait(batch)                 Run a batch immediately (no tick wait), evaluating IsRun(now) once. Returns a per-batch result channel.
(*Runner).Stop()                              Stop lifecycle goroutines, wait for them to exit. Idempotent.
(*Runner).Wait()                              Block until in-flight queues complete, then close ResultCh.
(*Runner).StopAndWait()                       Stop + Wait. After this returns no goroutine from this runner remains.
(*Runner).Stats()                             Snapshot of operational counters.
(*Runner).InFlightIDs()                       Currently executing job IDs (only meaningful with WithDedupe).
(*Runner).ResultCh                            Buffered receive channel of Result.
WithDedupe()                                  Skip duplicate in-flight IDs.
WithBatchBuffer(n)                            Set internal batch FIFO size.
WithOnPanic(fn)                               Callback invoked on Run() panics (synchronous).
WithOnSkip(fn)                                Callback invoked when a job is skipped by dedupe (synchronous).
ErrSkippedDuplicate                           Sentinel error for dedupe skips.
PanicError                                    Wraps a recovered Run() panic with Recovered and Stack.

JobInterface

type JobInterface interface {
    IsRun(t time.Time) bool // called once per tick at evaluation time
    GetID() string          // stable identifier; result map key (and dedupe key if enabled)
    Run() (any, error)      // executed when IsRun returned true.
                            // value goes to JobResult.Value, error to JobResult.Err.
                            // a panic also lands in Err as *PanicError.
}

Examples

Runnable programs in examples/: basic, dedupe, and with-stats.

Testing

go test -race ./...               # unit + leak (via go.uber.org/goleak)
go test -bench=. -benchmem ./...  # benchmarks
staticcheck ./...                 # style/lint

License

MIT — Copyright (c) 2021 Sotaneum

Documentation

Overview

Package runner schedules and concurrently executes a set of jobs every minute.

Each minute the runner asks every registered job whether it should run via JobInterface.IsRun(now), and concurrently executes the ones that say yes — up to a configurable global concurrency limit.

The caller pushes the current set of jobs to a channel; batches are queued internally (FIFO with optional backpressure) and evaluated on each tick. A job's panic does not bring down the runner — it is recovered and reported as *PanicError on JobResult.Err.

See the README and the Example_* functions for usage patterns.

Variables

var ErrShuttingDown = errors.New("runner: shutting down before job started")

ErrShuttingDown is reported in JobResult.Err for jobs that were not started because Stop was called while they were waiting for a concurrency slot.

var ErrSkippedDuplicate = errors.New("runner: skipped duplicate in-flight job")

ErrSkippedDuplicate is reported in JobResult.Err when WithDedupe is enabled and a job's GetID() is already in flight.

Types

type JobInterface

type JobInterface interface {
	IsRun(t time.Time) bool
	GetID() string
	Run() (any, error)
}

JobInterface is the contract a runnable job must satisfy.

Run returns the value and an optional error. The value lands in JobResult.Value, the error in JobResult.Err. A panic from Run is recovered and reported as *PanicError on Err, overriding any return value.
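
The "panic overrides the return value" behavior can be reproduced with named results and a deferred recover. This is a sketch of the general Go technique, not the library's source: panicError is a local stand-in shaped like PanicError, and safeRun is a hypothetical wrapper.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// panicError is a local stand-in shaped like PanicError.
type panicError struct {
	ID        string
	Recovered any
	Stack     []byte
}

func (e *panicError) Error() string {
	return fmt.Sprintf("job %s panicked: %v", e.ID, e.Recovered)
}

// safeRun calls run and converts a panic into an error. Named results let the
// deferred function overwrite whatever run would have returned.
func safeRun(id string, run func() (any, error)) (value any, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			value = nil
			err = &panicError{ID: id, Recovered: rec, Stack: debug.Stack()}
		}
	}()
	return run()
}

func main() {
	v, err := safeRun("ok", func() (any, error) { return 42, nil })
	fmt.Println(v, err) // 42 <nil>

	_, err = safeRun("boom", func() (any, error) { panic("something went wrong") })
	fmt.Println(err) // job boom panicked: something went wrong
}
```

Capturing the stack inside the deferred function records the panicking goroutine's trace at the point of recovery, which is what makes the Stack field useful for debugging.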

type JobResult

type JobResult struct {
	ID        string
	Value     any
	Err       error
	StartedAt time.Time
	EndedAt   time.Time
}

JobResult is the outcome of a single job execution. When Err is non-nil, Value is unspecified.

type Option

type Option func(*Runner)

Option configures a Runner at construction time.

func WithBatchBuffer

func WithBatchBuffer(n int) Option

WithBatchBuffer sets the size of the internal batch FIFO. n < 1 is normalized to 1. Default is 64.

When the buffer is full, sends to runnerCh (or Push) block, providing natural backpressure.

func WithDedupe

func WithDedupe() Option

WithDedupe prevents the same GetID() from running concurrently. A duplicate is reported as JobResult{Err: ErrSkippedDuplicate} and the dedupe skip counter (Stats().DedupeSkipsTotal) is incremented.

func WithOnPanic

func WithOnPanic(fn func(id string, recovered any, stack []byte)) Option

WithOnPanic registers a callback invoked when Run() panics. The callback runs synchronously in the job goroutine, so it should be cheap.

func WithOnSkip

func WithOnSkip(fn func(id string)) Option

WithOnSkip registers a callback invoked when a job is skipped by WithDedupe. The callback runs synchronously, so it should be cheap.

type PanicError

type PanicError struct {
	ID        string
	Recovered any
	Stack     []byte
}

PanicError wraps a recovered panic from Run(). It carries the original recovered value and the captured stack trace.

Example

ExamplePanicError shows how to distinguish panics from other errors.

package main

import (
	"errors"
	"fmt"

	runner "github.com/Sotaneum/go-runner"
)

func main() {
	var err error = &runner.PanicError{
		ID:        "demo",
		Recovered: "something went wrong",
	}
	var pe *runner.PanicError
	if errors.As(err, &pe) {
		fmt.Printf("job %s panicked: %v\n", pe.ID, pe.Recovered)
	}
}

Output:

job demo panicked: something went wrong

func (*PanicError) Error

func (e *PanicError) Error() string

func (*PanicError) Unwrap

func (e *PanicError) Unwrap() error

Unwrap exposes the underlying error if Run() panicked with one (e.g. panic(io.EOF)), enabling errors.Is / errors.As against the recovered value.

type Result

type Result struct {
	StartedAt time.Time
	EndedAt   time.Time
	Items     []JobResult
}

Result is the aggregated outcome of one queue (one tick).

func (Result) ByID

func (r Result) ByID() map[string]JobResult

ByID indexes Items by JobResult.ID for O(1) lookup.

func (Result) Errors

func (r Result) Errors() map[string]error

Errors returns all non-nil errors keyed by job ID.

func (Result) PanicErrors

func (r Result) PanicErrors() map[string]*PanicError

PanicErrors returns only the *PanicError values keyed by job ID.

type Runner

type Runner struct {
	ResultCh chan Result
	// contains filtered or unexported fields
}

Runner schedules and concurrently executes JobInterface instances every minute. Construct via NewRunner / NewRunnerWithLimit / New.

func New

func New(limit int, opts ...Option) *Runner

New creates a Runner with an internally-managed batch channel. Use Push or PushAndAwait to submit batches.

func NewRunner

func NewRunner(runnerCh chan []JobInterface) *Runner

NewRunner creates a Runner with the default concurrency limit (50). runnerCh must not be nil.

func NewRunnerWithLimit

func NewRunnerWithLimit(runnerCh chan []JobInterface, limit int, opts ...Option) *Runner

NewRunnerWithLimit creates a Runner with a custom concurrency limit and optional configuration. runnerCh must not be nil; limit <= 0 is normalized to the default. Panics if runnerCh is nil.

Example

ExampleNewRunnerWithLimit demonstrates the option-based constructor.

package main

import (
	runner "github.com/Sotaneum/go-runner"
)

func main() {
	runnerCh := make(chan []runner.JobInterface)
	r := runner.NewRunnerWithLimit(runnerCh, 10,
		runner.WithDedupe(),
		runner.WithBatchBuffer(32),
	)
	defer r.StopAndWait()
	_ = r
}

func (*Runner) InFlightIDs

func (r *Runner) InFlightIDs() []string

InFlightIDs returns the IDs of jobs currently executing. Only meaningful when WithDedupe is enabled; otherwise returns nil.

func (*Runner) Push

func (r *Runner) Push(batch []JobInterface) bool

Push sends a batch to the runner without requiring the caller to manage the runnerCh channel directly. Returns true if the batch was accepted, false if Stop has been called (in which case the batch is discarded). Blocks if the internal batch FIFO is full (backpressure) until either a slot frees up or Stop is called.

func (*Runner) PushAndAwait

func (r *Runner) PushAndAwait(batch []JobInterface) <-chan Result

PushAndAwait runs a batch immediately (without waiting for the next minute tick), evaluating IsRun(now) once for filtering, and returns a 1-buffered channel that receives the Result. The same Result is also delivered on ResultCh. The reply channel is closed after delivery.

Use this when the caller wants synchronous execution of a specific batch and needs to correlate its outcome — distinct from Push, which queues the batch for the next tick.

If Stop has been called, the returned channel is closed without a result.
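
Because the reply channel may be closed without a value, the two-value receive form is the safe way to read it. The sketch below imitates the documented contract with local types; await and result are hypothetical stand-ins, not library identifiers.

```go
package main

import "fmt"

// result is a local stand-in for Result.
type result struct{ Items []string }

// await mimics the documented reply-channel contract: a 1-buffered channel
// that carries at most one result and is then closed. If stopped is true the
// channel is closed without a result.
func await(stopped bool, work func() result) <-chan result {
	reply := make(chan result, 1)
	if stopped {
		close(reply)
		return reply
	}
	go func() {
		defer close(reply)
		reply <- work()
	}()
	return reply
}

func main() {
	res, ok := <-await(false, func() result { return result{Items: []string{"a", "b"}} })
	fmt.Println(ok, res.Items) // true [a b]

	res, ok = <-await(true, nil)
	fmt.Println(ok, len(res.Items)) // false 0
}
```

Applied to the real API, this corresponds to res, ok := <-r.PushAndAwait(batch), where ok == false indicates the channel was closed without a result.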

func (*Runner) Stats

func (r *Runner) Stats() Stats

Stats returns a snapshot of operational counters.

func (*Runner) Stop

func (r *Runner) Stop()

Stop terminates the runner's lifecycle goroutines (start, createQueue, ingest, timeChecker) and waits for them to exit. Already-started runQueue and Run() goroutines continue to completion. Idempotent.

func (*Runner) StopAndWait

func (r *Runner) StopAndWait()

StopAndWait is the canonical graceful-shutdown call: Stop followed by Wait. After it returns, no goroutine spawned by this runner remains.

func (*Runner) Wait

func (r *Runner) Wait()

Wait blocks until in-flight runQueues complete and then closes ResultCh, allowing `range r.ResultCh` to terminate naturally.
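
The "wait, then close" ordering is what lets a range loop over the results end on its own. The pattern can be sketched in isolation with a sync.WaitGroup; runAll is a hypothetical helper and the squaring work is only a placeholder for real jobs.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll starts one goroutine per input, sends each square on results, and
// closes results once every goroutine has finished.
func runAll(inputs []int) <-chan int {
	results := make(chan int, len(inputs))
	var wg sync.WaitGroup
	for _, n := range inputs {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			results <- n * n
		}(n)
	}
	go func() {
		wg.Wait()      // all in-flight work has finished
		close(results) // lets `range results` terminate
	}()
	return results
}

func main() {
	sum := 0
	for v := range runAll([]int{1, 2, 3}) {
		sum += v
	}
	fmt.Println(sum) // 14
}
```

Closing only after the wait guarantees that no worker sends on a closed channel, which would otherwise panic.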

type Stats

type Stats struct {
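	InFlightJobs        int    // number of jobs currently running (global semaphore usage)
	BatchQueueDepth     int    // number of batches waiting in the internal batch FIFO
	BatchQueueCapacity  int    // capacity of the internal batch FIFO
	DedupeSkipsTotal    uint64 // cumulative number of jobs skipped by dedupe
	DroppedResultsTotal uint64 // cumulative number of results dropped because the ResultCh buffer was full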
}

Stats is a point-in-time snapshot of runner state.

Directories

Path                 Synopsis
examples/basic       (command) Basic usage: register two jobs, push a batch, print one tick's worth of results.
examples/dedupe      (command) Dedupe: same ID across overlapping queues runs at most once concurrently.
examples/with-stats  (command) Stats monitoring pattern: runs batches immediately with PushAndAwait while printing live operational metrics.
