Documentation ¶
Overview ¶
Package runner schedules and concurrently executes a set of jobs every minute.
Each minute the runner asks every registered job whether it should run via JobInterface.IsRun(now), and concurrently executes the ones that say yes — up to a configurable global concurrency limit.
The caller pushes the current set of jobs to a channel; batches are queued internally (FIFO with optional backpressure) and evaluated on each tick. A job's panic does not bring down the runner — it is recovered and reported as *PanicError on JobResult.Err.
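The panic-to-error behaviour described above can be pictured with a small standalone sketch. It does not use the package itself: `panicError`, its `Stack` field, and `safeRun` are hypothetical stand-ins, and the real runner may be structured differently.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// panicError is a hypothetical stand-in for the package's *PanicError.
// The Stack field name is an assumption made for this sketch.
type panicError struct {
	ID        string
	Recovered any
	Stack     []byte
}

func (e *panicError) Error() string {
	return fmt.Sprintf("job %s panicked: %v", e.ID, e.Recovered)
}

// safeRun calls fn and converts a panic into an error instead of crashing.
func safeRun(id string, fn func() (any, error)) (value any, err error) {
	defer func() {
		if rec := recover(); rec != nil {
			// The panic overrides whatever fn was about to return.
			value = nil
			err = &panicError{ID: id, Recovered: rec, Stack: debug.Stack()}
		}
	}()
	return fn()
}

func main() {
	_, err := safeRun("demo", func() (any, error) {
		panic("something went wrong")
	})
	fmt.Println(err) // job demo panicked: something went wrong
}
```

Because the recovery happens in a deferred function with named results, the caller always gets an ordinary error value back, which is what allows a scheduler loop to keep running.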
See the README and the Example_* functions for usage patterns.
Constants ¶
This section is empty.
Variables ¶
var ErrShuttingDown = errors.New("runner: shutting down before job started")
ErrShuttingDown is reported in JobResult.Err for jobs that were not started because Stop was called while they were waiting for a concurrency slot.
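One common way to get this behaviour is a channel-based semaphore whose acquire step also watches a shutdown signal. The sketch below is an illustration under that assumption, not the package's code; `errShuttingDown` and `acquire` are local, hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errShuttingDown is a local stand-in for the package's ErrShuttingDown.
var errShuttingDown = errors.New("shutting down before job started")

// acquire waits for a free slot in sem, or gives up once done is closed.
func acquire(sem chan struct{}, done <-chan struct{}) error {
	select {
	case sem <- struct{}{}:
		return nil
	case <-done:
		return errShuttingDown
	}
}

func main() {
	sem := make(chan struct{}, 1) // concurrency limit of 1
	done := make(chan struct{})

	fmt.Println(acquire(sem, done)) // <nil>: the slot was free

	close(done) // simulate Stop while the only slot is still held
	fmt.Println(acquire(sem, done)) // shutting down before job started
}
```

In the second call the semaphore is full, so the only ready case is the closed `done` channel and the waiting job is reported as not started.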
var ErrSkippedDuplicate = errors.New("runner: skipped duplicate in-flight job")
ErrSkippedDuplicate is reported in JobResult.Err when WithDedupe is enabled and a job's GetID() is already in flight.
Functions ¶
This section is empty.
Types ¶
type JobInterface ¶
JobInterface is the contract a runnable job must satisfy.
Run returns the value and an optional error. The value lands in JobResult.Value, the error in JobResult.Err. A panic from Run is recovered and reported as *PanicError on Err, overriding any return value.
type JobResult ¶
JobResult is the outcome of a single job execution. When Err is non-nil, Value is unspecified.
type Option ¶
type Option func(*Runner)
Option configures a Runner at construction time.
func WithBatchBuffer ¶
WithBatchBuffer sets the size of the internal batch FIFO. n < 1 is normalized to 1. Default is 64.
When the buffer is full, sends on runnerCh (and calls to Push) block, which applies natural backpressure to the producer.
func WithDedupe ¶
func WithDedupe() Option
WithDedupe prevents the same GetID() from running concurrently. A duplicate is reported as JobResult{Err: ErrSkippedDuplicate} and the dedupe skip counter (Stats().DedupeSkipsTotal) is incremented.
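Conceptually, dedupe of this kind needs a concurrency-safe set of in-flight IDs. The following sketch shows one way to build such a set; `inFlight`, `tryStart`, and `finish` are hypothetical names and are not part of the package's API.

```go
package main

import (
	"fmt"
	"sync"
)

// inFlight tracks IDs that are currently running.
type inFlight struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newInFlight() *inFlight {
	return &inFlight{ids: make(map[string]struct{})}
}

// tryStart reports whether id was claimed; false means a duplicate is running.
func (f *inFlight) tryStart(id string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, busy := f.ids[id]; busy {
		return false
	}
	f.ids[id] = struct{}{}
	return true
}

// finish releases id so a later tick can run it again.
func (f *inFlight) finish(id string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.ids, id)
}

func main() {
	f := newInFlight()
	fmt.Println(f.tryStart("report")) // true
	fmt.Println(f.tryStart("report")) // false: already in flight
	f.finish("report")
	fmt.Println(f.tryStart("report")) // true
}
```

The check and the insert happen under the same lock, so two goroutines racing on the same ID cannot both claim it.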
func WithOnPanic ¶
WithOnPanic registers a callback invoked when Run() panics. The callback runs synchronously in the job goroutine, so it should be cheap.
func WithOnSkip ¶
WithOnSkip registers a callback invoked when a job is skipped by WithDedupe. The callback runs synchronously, so it should be cheap.
type PanicError ¶
PanicError wraps a recovered panic from Run(). It carries the original recovered value and the captured stack trace.
Example ¶
ExamplePanicError shows how to distinguish panics from other errors.
package main

import (
	"errors"
	"fmt"

	runner "github.com/Sotaneum/go-runner"
)

func main() {
	var err error = &runner.PanicError{
		ID:        "demo",
		Recovered: "something went wrong",
	}
	var pe *runner.PanicError
	if errors.As(err, &pe) {
		fmt.Printf("job %s panicked: %v\n", pe.ID, pe.Recovered)
	}
}
Output: job demo panicked: something went wrong
func (*PanicError) Error ¶
func (e *PanicError) Error() string
func (*PanicError) Unwrap ¶
func (e *PanicError) Unwrap() error
Unwrap exposes the underlying error if Run() panicked with one (e.g. panic(io.EOF)), enabling errors.Is / errors.As against the recovered value.
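The effect of such an Unwrap method can be shown with a local type. This is a sketch of the general pattern, assuming the recovered value is stored as an `any`; `recoveredError` is a hypothetical stand-in for *PanicError.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recoveredError is a hypothetical stand-in that stores a recovered panic value.
type recoveredError struct {
	Recovered any
}

func (e *recoveredError) Error() string {
	return fmt.Sprintf("panic: %v", e.Recovered)
}

// Unwrap returns the recovered value when it is itself an error, nil otherwise.
func (e *recoveredError) Unwrap() error {
	err, _ := e.Recovered.(error)
	return err
}

func main() {
	var fromErr error = &recoveredError{Recovered: io.EOF}
	var fromStr error = &recoveredError{Recovered: "boom"}

	fmt.Println(errors.Is(fromErr, io.EOF)) // true
	fmt.Println(errors.Is(fromStr, io.EOF)) // false
}
```

When the panic value is not an error, Unwrap returns nil and the errors.Is chain simply ends at the wrapper.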
type Result ¶
Result is the aggregated outcome of one queue (one tick).
func (Result) PanicErrors ¶
func (r Result) PanicErrors() map[string]*PanicError
PanicErrors returns only the *PanicError values keyed by job ID.
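A filter like this can be written with errors.As over a map of per-job errors. The sketch below assumes results are available as a `map[string]error` keyed by job ID, which is an assumption about the shape of Result; `jobPanic` and `panicsOnly` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// jobPanic is a hypothetical stand-in for the package's *PanicError.
type jobPanic struct {
	ID        string
	Recovered any
}

func (e *jobPanic) Error() string {
	return fmt.Sprintf("job %s panicked: %v", e.ID, e.Recovered)
}

// panicsOnly keeps the entries whose error is (or wraps) a *jobPanic.
func panicsOnly(errs map[string]error) map[string]*jobPanic {
	out := make(map[string]*jobPanic)
	for id, err := range errs {
		var pe *jobPanic
		if errors.As(err, &pe) {
			out[id] = pe
		}
	}
	return out
}

func main() {
	errs := map[string]error{
		"a": &jobPanic{ID: "a", Recovered: "boom"},
		"b": errors.New("timeout"), // ordinary failure, not a panic
		"c": nil,                   // success
	}
	fmt.Println(len(panicsOnly(errs))) // 1
}
```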
type Runner ¶
type Runner struct {
	ResultCh chan Result
	// contains filtered or unexported fields
}
Runner schedules and concurrently executes JobInterface instances every minute. Construct via NewRunner / NewRunnerWithLimit / New.
func New ¶
New creates a Runner with an internally-managed batch channel. Use Push or PushAndAwait to submit batches.
func NewRunner ¶
func NewRunner(runnerCh chan []JobInterface) *Runner
NewRunner creates a Runner with the default concurrency limit (50). runnerCh must not be nil.
func NewRunnerWithLimit ¶
func NewRunnerWithLimit(runnerCh chan []JobInterface, limit int, opts ...Option) *Runner
NewRunnerWithLimit creates a Runner with a custom concurrency limit and optional configuration. It panics if runnerCh is nil; limit <= 0 is normalized to the default.
Example ¶
ExampleNewRunnerWithLimit demonstrates the option-based constructor.
package main

import (
	runner "github.com/Sotaneum/go-runner"
)

func main() {
	runnerCh := make(chan []runner.JobInterface)
	r := runner.NewRunnerWithLimit(runnerCh, 10,
		runner.WithDedupe(),
		runner.WithBatchBuffer(32),
	)
	defer r.StopAndWait()
	_ = r
}
Output:
func (*Runner) InFlightIDs ¶
InFlightIDs returns the IDs of jobs currently executing. Only meaningful when WithDedupe is enabled; otherwise returns nil.
func (*Runner) Push ¶
func (r *Runner) Push(batch []JobInterface) bool
Push sends a batch to the runner without requiring the caller to manage the runnerCh channel directly. Returns true if the batch was accepted, false if Stop has been called (in which case the batch is discarded). Blocks if the internal batch FIFO is full (backpressure) until either a slot frees up or Stop is called.
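The documented contract (accept, block when full, or return false after Stop) maps naturally onto a buffered channel plus a shutdown channel. This is a minimal sketch of that pattern with hypothetical names (`queue`, `push`); batches are modelled as slices of IDs rather than jobs.

```go
package main

import "fmt"

// queue is a hypothetical bounded FIFO of batches (each batch a slice of IDs).
type queue struct {
	batches chan []string
	done    chan struct{}
}

// push blocks while the buffer is full and returns false once done is closed.
func (q *queue) push(batch []string) bool {
	// Check for shutdown first so a stopped queue never accepts new work.
	select {
	case <-q.done:
		return false
	default:
	}
	select {
	case q.batches <- batch:
		return true
	case <-q.done:
		return false
	}
}

func main() {
	q := &queue{batches: make(chan []string, 1), done: make(chan struct{})}

	fmt.Println(q.push([]string{"a"})) // true: the buffer had room

	close(q.done)                      // simulate Stop
	fmt.Println(q.push([]string{"b"})) // false: the batch is discarded
}
```

A producer blocked in the second select is released either by a consumer draining the buffer or by shutdown, which matches the two exits described above.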
func (*Runner) PushAndAwait ¶
func (r *Runner) PushAndAwait(batch []JobInterface) <-chan Result
PushAndAwait runs a batch immediately (without waiting for the next minute tick), evaluating IsRun(now) once for filtering, and returns a 1-buffered channel that receives the Result. The same Result is also delivered on ResultCh. The reply channel is closed after delivery.
Use this when the caller wants synchronous execution of a specific batch and needs to correlate its outcome — distinct from Push, which queues the batch for the next tick.
If Stop has been called, the returned channel is closed without a result.
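Since the reply channel is closed either after one delivery or immediately on shutdown, callers can tell the two cases apart with the two-value receive form. The sketch below imitates only the channel contract; `result` and `reply` are hypothetical stand-ins and no real batch is executed.

```go
package main

import "fmt"

// result is a hypothetical stand-in for the package's Result.
type result struct{ Ran int }

// reply mimics the documented contract: a 1-buffered channel that is closed
// after one delivery, or closed empty when stopped is true.
func reply(stopped bool) <-chan result {
	ch := make(chan result, 1)
	if !stopped {
		ch <- result{Ran: 2}
	}
	close(ch)
	return ch
}

func main() {
	if res, ok := <-reply(false); ok {
		fmt.Println("batch finished, jobs run:", res.Ran)
	}
	if _, ok := <-reply(true); !ok {
		fmt.Println("runner was stopped; no result")
	}
}
```

Checking `ok` avoids mistaking the zero value received from a closed channel for a genuine empty result.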
func (*Runner) Stop ¶
func (r *Runner) Stop()
Stop terminates the runner's lifecycle goroutines (start, createQueue, ingest, timeChecker) and waits for them to exit. Already-started runQueue and Run() goroutines continue to completion. Idempotent.
func (*Runner) StopAndWait ¶
func (r *Runner) StopAndWait()
StopAndWait is the canonical graceful-shutdown call: Stop followed by Wait. After it returns, no goroutine spawned by this runner remains.
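An idempotent stop followed by a wait is commonly built from sync.Once and sync.WaitGroup. The sketch below is a simplified illustration with a single goroutine and hypothetical names (`worker`, `stop`, `stopAndWait`); the package's own Stop is documented to also wait for its lifecycle goroutines, which this sketch does not model.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical component that owns one background goroutine.
type worker struct {
	done chan struct{}
	once sync.Once
	wg   sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{done: make(chan struct{})}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-w.done // stand-in for a loop that exits on shutdown
	}()
	return w
}

// stop signals shutdown; sync.Once makes repeated calls safe.
func (w *worker) stop() {
	w.once.Do(func() { close(w.done) })
}

// stopAndWait signals shutdown and then blocks until every goroutine exits.
func (w *worker) stopAndWait() {
	w.stop()
	w.wg.Wait()
}

func main() {
	w := newWorker()
	w.stop()
	w.stop() // second call is a no-op
	w.stopAndWait()
	fmt.Println("all goroutines exited")
}
```

Without the sync.Once guard, the second stop would close an already-closed channel and panic.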
type Stats ¶
type Stats struct {
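	InFlightJobs        int    // number of jobs currently running (global semaphore usage)
	BatchQueueDepth     int    // number of batches waiting in the internal batch FIFO
	BatchQueueCapacity  int    // capacity of the internal batch FIFO
	DedupeSkipsTotal    uint64 // cumulative number of jobs skipped by dedupe
	DroppedResultsTotal uint64 // cumulative number of results dropped because the ResultCh buffer was full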
}
Stats is a point-in-time snapshot of runner state.
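To make the fields concrete, here is one plausible way such a snapshot could be assembled from channel lengths and an atomic counter. This is an assumption about the implementation, not a description of it; `state`, `snapshot`, and `stats` are hypothetical names, and the sizes 50 and 64 mirror the documented defaults.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// snapshot mirrors a subset of the documented Stats fields.
type snapshot struct {
	InFlightJobs       int
	BatchQueueDepth    int
	BatchQueueCapacity int
	DedupeSkipsTotal   uint64
}

// state is a hypothetical holder of the live values behind a snapshot.
type state struct {
	sem         chan struct{} // one token per running job
	batches     chan []string // bounded batch FIFO
	dedupeSkips atomic.Uint64
}

// stats reads each value once; the fields are not captured atomically together.
func (s *state) stats() snapshot {
	return snapshot{
		InFlightJobs:       len(s.sem),
		BatchQueueDepth:    len(s.batches),
		BatchQueueCapacity: cap(s.batches),
		DedupeSkipsTotal:   s.dedupeSkips.Load(),
	}
}

func main() {
	s := &state{sem: make(chan struct{}, 50), batches: make(chan []string, 64)}
	s.sem <- struct{}{}             // one job running
	s.batches <- []string{"a", "b"} // one batch queued
	s.dedupeSkips.Add(1)
	fmt.Printf("%+v\n", s.stats())
}
```

A snapshot built this way is only approximately consistent under load, which fits the "point-in-time" wording: it is suitable for monitoring, not for making scheduling decisions.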
Directories ¶
| Path | Synopsis |
|---|---|
| examples/basic (command) | Basic usage: register two jobs, push a batch, print one tick's worth of results. |
| examples/dedupe (command) | Dedupe: same ID across overlapping queues runs at most once concurrently. |
| examples/with-stats (command) | Stats monitoring pattern: run batches immediately with PushAndAwait while printing real-time operational metrics. |