Documentation ¶
Overview ¶
Example ¶
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/pior/runnable"
)

// exampleLogger returns a text logger writing to stdout without timestamps,
// suitable for deterministic testable examples.
func exampleLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey {
				return slog.Attr{}
			}
			return a
		},
	}))
}

// JobQueue is a long-running service that processes background jobs.
type JobQueue struct{}

func (q *JobQueue) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func (q *JobQueue) Enqueue(job string) {
	fmt.Println("JobQueue: " + job)
}

// CleanupTask enqueues a cleanup job on each execution.
type CleanupTask struct {
	jobs *JobQueue
	runs int
	done chan struct{}
}

func (t *CleanupTask) Run(_ context.Context) error {
	t.runs++
	t.jobs.Enqueue(fmt.Sprintf("cleanup-%d", t.runs))
	if t.runs >= 3 {
		close(t.done)
	}
	return nil
}

func main() {
	runnable.SetLogger(exampleLogger())

	jobs := &JobQueue{}
	done := make(chan struct{})
	cleanup := &CleanupTask{jobs: jobs, done: done}

	m := runnable.Manager()
	m.RegisterService(jobs)
	m.Register(runnable.Schedule(cleanup, runnable.Every(500*time.Millisecond)))
	m.Register(runnable.Func(func(_ context.Context) error {
		<-done
		return nil
	}).Name("app"))

	runnable.Run(m)
}
Output:
level=INFO msg="manager/JobQueue: started"
level=INFO msg="manager/schedule/CleanupTask: started"
level=INFO msg="manager/app: started"
JobQueue: cleanup-1
JobQueue: cleanup-2
JobQueue: cleanup-3
level=INFO msg="manager/app: stopped"
level=INFO msg="manager: starting shutdown" reason="app died"
level=INFO msg="manager/schedule/CleanupTask: stopped"
level=INFO msg="manager/JobQueue: stopped"
level=INFO msg="manager: shutdown complete"
Index ¶
- func Func(fn RunnableFunc) *funcRunnable
- func HTTPServer(server *http.Server) *httpServer
- func Manager() *manager
- func Restart(runnable Runnable) *restart
- func Run(runner Runnable)
- func RunFunc(fn RunnableFunc)
- func RunGroup(runners ...Runnable)
- func Schedule(runnable Runnable, specs ...ScheduleSpec) *schedule
- func SetLogger(l *slog.Logger)
- type ManagerRegistry
- type PanicError
- type Runnable
- type RunnableError
- type RunnableFunc
- type ScheduleSpec
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Func ¶ added in v0.5.0
func Func(fn RunnableFunc) *funcRunnable
Func returns a Runnable from a function. The name is derived from the function using reflection.
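To illustrate what deriving a name through reflection can look like, here is a minimal standard-library sketch. The helper funcName and the way it shortens the name are assumptions made for this illustration; the package's actual naming logic may differ.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// funcName returns the short name of fn as reported by the Go runtime.
// Hypothetical helper for illustration, not part of the runnable package.
func funcName(fn func(context.Context) error) string {
	// The runtime reports a qualified name such as "main.worker".
	full := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
	if i := strings.LastIndex(full, "."); i >= 0 {
		return full[i+1:]
	}
	return full
}

func worker(_ context.Context) error { return nil }

func main() {
	fmt.Println(funcName(worker)) // worker
}
```

Anonymous functions receive compiler-generated names (for example main.main.func1), which is presumably why the overview example sets an explicit name with Name("app").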
func HTTPServer ¶ added in v0.3.0
func HTTPServer(server *http.Server) *httpServer
HTTPServer returns a runnable that runs a *http.Server.
On context cancellation, it calls http.Server.Shutdown to gracefully drain in-flight requests before returning. The shutdown timeout defaults to 30 seconds and can be configured with [httpServer.ShutdownTimeout].
Example ¶
ctx, cancel := initializeForExample()
defer cancel()

server := &http.Server{
	Addr:    "127.0.0.1:8080",
	Handler: http.NotFoundHandler(),
}

r := HTTPServer(server)
_ = r.Run(ctx)
Output:
level=INFO msg="httpserver: listening" addr=127.0.0.1:8080
level=INFO msg="httpserver: shutting down"
level=INFO msg="httpserver: stopped"
Example (Error) ¶
ctx, cancel := initializeForExample()
defer cancel()

server := &http.Server{
	Addr:    "INVALID",
	Handler: http.NotFoundHandler(),
}

r := HTTPServer(server)
_ = r.Run(ctx)
Output:
level=INFO msg="httpserver: listening" addr=INVALID
level=INFO msg="httpserver: stopped with error" error="listen tcp: address INVALID: missing port in address"
func Manager ¶ added in v0.4.0
func Manager() *manager
Manager returns a new manager that coordinates the lifecycle of multiple runnables.
Runnables are organized in two tiers: processes (foreground work) and services (infrastructure like databases or queues). Shutdown is triggered when the context is cancelled or any runnable completes. During shutdown, processes are cancelled first, then services, ensuring services remain available while processes drain.
Each runnable is wrapped with Recover to catch panics. Errors from runnables are collected, except context.Canceled which is ignored. A manager is itself a Runnable, so managers can be nested for independent shutdown ordering.
Registering the same runnable twice, or as both a process and a service, panics.
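The two-tier shutdown order described above can be sketched with the standard library alone. This is an illustration of the ordering, not the manager's implementation: the names runner, runTiers and shutdownOrder are hypothetical, and error collection and panic recovery are left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runner mirrors the Run(ctx) error shape used in the overview example.
type runner func(ctx context.Context) error

// runTiers is a hypothetical two-tier supervisor: once ctx is cancelled or
// any runner returns, processes are cancelled and awaited first, then services.
func runTiers(ctx context.Context, processes, services []runner) {
	procCtx, cancelProcs := context.WithCancel(context.Background())
	svcCtx, cancelSvcs := context.WithCancel(context.Background())
	defer cancelProcs()
	defer cancelSvcs()

	// Buffered so that no runner blocks when reporting that it returned.
	died := make(chan struct{}, len(processes)+len(services))
	start := func(c context.Context, wg *sync.WaitGroup, rs []runner) {
		for _, r := range rs {
			wg.Add(1)
			go func(r runner) {
				defer wg.Done()
				_ = r(c) // errors are ignored in this sketch
				died <- struct{}{}
			}(r)
		}
	}

	var procWG, svcWG sync.WaitGroup
	start(svcCtx, &svcWG, services)
	start(procCtx, &procWG, processes)

	select {
	case <-ctx.Done(): // external cancellation
	case <-died: // a runnable completed
	}

	cancelProcs()
	procWG.Wait() // services are still running while processes drain
	cancelSvcs()
	svcWG.Wait()
}

// shutdownOrder runs one process and one service, requests shutdown, and
// returns the order in which they stopped.
func shutdownOrder() []string {
	var mu sync.Mutex
	var order []string
	named := func(name string) runner {
		return func(ctx context.Context) error {
			<-ctx.Done()
			mu.Lock()
			order = append(order, name)
			mu.Unlock()
			return nil
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // request shutdown immediately
	runTiers(ctx, []runner{named("process")}, []runner{named("service")})
	return order
}

func main() {
	fmt.Println(shutdownOrder()) // [process service]
}
```

Giving each tier its own context is what keeps services alive until every process has returned.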
func Restart ¶
func Restart(runnable Runnable) *restart
Restart returns a runnable that keeps running the given runnable, restarting it after both successful exits and errors. Panics are recovered and treated as errors.
On successful exit, the runnable is restarted after [restart.Delay] (default: immediate). On error, the runnable is restarted after a backoff period determined by [restart.ErrorBackoff] (default: immediate for ≤3 errors, 10s for ≤10, then 1m).
The error count tracks consecutive errors and resets to zero after any successful run. Use [restart.ErrorResetAfter] to also reset after a run that lasted long enough before failing.
Restart loops indefinitely unless limited by [restart.Limit] or [restart.ErrorLimit]. When the restart limit is reached, Restart returns nil. When the error limit is reached, Restart returns the last error. Context cancellation stops the loop and returns context.Canceled.
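As a worked reading of the default backoff quoted above (immediate for ≤3 errors, 10s for ≤10, then 1m), the following sketch maps a consecutive error count to a delay. The function name defaultBackoff and the assumption that the count is passed as a plain int are illustrative only; how a custom backoff is handed to [restart.ErrorBackoff] is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// defaultBackoff mirrors the documented default delays. Treating the argument
// as the consecutive error count, with inclusive bounds, is an assumption of
// this sketch.
func defaultBackoff(consecutiveErrors int) time.Duration {
	switch {
	case consecutiveErrors <= 3:
		return 0 // restart immediately
	case consecutiveErrors <= 10:
		return 10 * time.Second
	default:
		return time.Minute
	}
}

func main() {
	for _, n := range []int{1, 3, 4, 10, 11} {
		fmt.Printf("errors=%d backoff=%s\n", n, defaultBackoff(n))
	}
}
```

Because the error count resets after a successful run, a runnable that fails only occasionally would stay in the immediate-restart band under this reading.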
Example ¶
ctx, cancel := initializeForExample()
defer cancel()

worker := newDyingRunnable()
r := Restart(worker).ErrorLimit(3)
_ = r.Run(ctx)
Output:
level=INFO msg="restart/dyingRunnable: starting" restart=0 errors=0
level=INFO msg="restart/dyingRunnable: starting" restart=1 errors=1
level=INFO msg="restart/dyingRunnable: starting" restart=2 errors=2
level=INFO msg="restart/dyingRunnable: not restarting" reason="error limit" limit=3
Example (Worker) ¶
ctx, cancel := initializeForExample()
defer cancel()

worker := newCounterRunnable()
r := Restart(worker).Limit(2).Delay(time.Millisecond)
_ = r.Run(ctx)
Output:
level=INFO msg="restart/counter: starting" restart=0 errors=0
level=INFO msg="restart/counter: starting" restart=1 errors=0
level=INFO msg="restart/counter: starting" restart=2 errors=0
level=INFO msg="restart/counter: not restarting" reason="restart limit" limit=2
func Run ¶ added in v0.4.0
func Run(runner Runnable)
Run runs a single runnable and listens for SIGTERM/SIGINT.
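One common way to tie a function's context to SIGINT and SIGTERM with the standard library is signal.NotifyContext. The sketch below shows that pattern under the assumption that a signal should cancel the context; runWithSignals and noop are hypothetical names, and what Run does beyond this (logging, exit codes) is not covered.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// runWithSignals runs fn with a context that is cancelled when the process
// receives SIGINT or SIGTERM. Hypothetical helper for illustration only.
func runWithSignals(fn func(ctx context.Context) error) error {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop() // restore default signal handling
	return fn(ctx)
}

// noop returns immediately; a real runnable would block until ctx is cancelled.
func noop(_ context.Context) error { return nil }

func main() {
	err := runWithSignals(noop)
	fmt.Println("exited with error:", err)
}
```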
func RunFunc ¶ added in v0.5.0
func RunFunc(fn RunnableFunc)
RunFunc runs a runnable function and listens for SIGTERM/SIGINT.
func RunGroup ¶ added in v0.1.0
func RunGroup(runners ...Runnable)
RunGroup runs all runnables in a Manager and listens for SIGTERM/SIGINT.
func Schedule ¶ added in v1.0.0
func Schedule(runnable Runnable, specs ...ScheduleSpec) *schedule
Schedule returns a runnable that runs the given runnable according to the provided schedule specs. When multiple specs are provided, the runnable runs at whichever fires next.
If an execution outlasts the interval, missed ticks are skipped (not queued). On error from the inner runnable, Schedule stops and returns the error. On context cancellation, it returns context.Canceled.
For custom scheduling logic, pass a ScheduleSpec function directly. For example, to use github.com/robfig/cron/v3:
sched, _ := cron.ParseStandard("15 */6 * * *") // every 6h at :15
Schedule(worker, func(_, now time.Time) time.Time {
	return sched.Next(now)
})
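The rule that several specs run "at whichever fires next" amounts to taking the earliest proposed time. Here is a minimal sketch of that selection; the local type spec copies the function shape used in the cron snippet, and earliest, inTenMinutes, inOneHour and demoNext are hypothetical names that are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// spec has the shape described for ScheduleSpec: it receives the last
// execution start and the current time and returns the next execution time.
type spec func(lastStart, now time.Time) time.Time

// earliest returns the soonest time proposed by any of the specs.
func earliest(lastStart, now time.Time, specs ...spec) time.Time {
	var next time.Time
	for _, s := range specs {
		if t := s(lastStart, now); next.IsZero() || t.Before(next) {
			next = t
		}
	}
	return next
}

// Two toy specs for the demonstration.
func inTenMinutes(_, now time.Time) time.Time { return now.Add(10 * time.Minute) }
func inOneHour(_, now time.Time) time.Time    { return now.Add(time.Hour) }

// demoNext evaluates both specs at a fixed instant (12:00 UTC).
func demoNext() string {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	return earliest(now, now, inOneHour, inTenMinutes).Format("15:04")
}

func main() {
	fmt.Println(demoNext()) // 12:10
}
```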
func SetLogger ¶ added in v0.9.0
func SetLogger(l *slog.Logger)
SetLogger replaces the default logger with a *slog.Logger. Passing nil resets to slog.Default.
Types ¶
type ManagerRegistry ¶ added in v1.0.0
type ManagerRegistry interface {
	// Register registers processes. Processes are the primary runnables of the
	// application. They are cancelled first during shutdown.
	Register(runners ...Runnable) ManagerRegistry
	// RegisterService registers services. Services are infrastructure runnables
	// (databases, queues, etc.) that processes depend on. They are cancelled after
	// all processes have stopped.
	RegisterService(services ...Runnable) ManagerRegistry
}
ManagerRegistry is the interface for registering runnables with a Manager.
type PanicError ¶
type PanicError struct {
	// contains filtered or unexported fields
}
func (*PanicError) Error ¶
func (e *PanicError) Error() string
func (*PanicError) Unwrap ¶
func (e *PanicError) Unwrap() error
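The fields and exact behaviour of PanicError are not documented on this page. As a general illustration of turning a panic into an error that still supports errors.Is, here is a standard-library sketch; panicError, safely, crash, errBoom and recoveredMatches are hypothetical names, and the Unwrap behaviour shown is an assumption rather than the package's.

```go
package main

import (
	"errors"
	"fmt"
)

// panicError is a hypothetical stand-in for an error that records a panic value.
type panicError struct{ value any }

func (e *panicError) Error() string { return fmt.Sprintf("panic: %v", e.value) }

// Unwrap exposes the panic value when it was itself an error (assumed behaviour).
func (e *panicError) Unwrap() error {
	err, _ := e.value.(error)
	return err
}

// safely runs fn and converts a panic into a returned error.
func safely(fn func() error) (err error) {
	defer func() {
		if v := recover(); v != nil {
			err = &panicError{value: v}
		}
	}()
	return fn()
}

var errBoom = errors.New("boom")

func crash() error { panic(errBoom) }

// recoveredMatches reports whether the recovered error still matches errBoom.
func recoveredMatches() bool {
	return errors.Is(safely(crash), errBoom)
}

func main() {
	err := safely(crash)
	fmt.Println(err)                // panic: boom
	fmt.Println(recoveredMatches()) // true
}
```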
type Runnable ¶
Runnable is the contract for anything that runs with a Go context, respects the cancellation contract, and expects the caller to handle errors.
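The interface definition is not reproduced on this page, so the sketch below writes it out as an assumption inferred from the Run methods in the overview example. It shows a type that does periodic work and returns promptly once its context is done; poller and demoRun are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Runnable is written out here as an assumption, inferred from the
// Run(ctx context.Context) error methods in the overview example.
type Runnable interface {
	Run(ctx context.Context) error
}

// poller does periodic work until its context is cancelled.
type poller struct{ ticks int }

func (p *poller) Run(ctx context.Context) error {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Stop promptly and let the caller decide what to do with the error.
			return ctx.Err()
		case <-ticker.C:
			p.ticks++
		}
	}
}

// demoRun runs a poller under a short timeout and returns the error text.
func demoRun() string {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	var r Runnable = &poller{}
	return r.Run(ctx).Error()
}

func main() {
	fmt.Println(demoRun()) // context deadline exceeded
}
```

Returning nil after cancellation, as JobQueue does in the overview example, fits the same shape; which of the two a caller prefers depends on how it treats context errors.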
func Closer ¶ added in v0.4.0
func Closer(c interface{ Close() }) Runnable
Closer returns a runnable that calls Close on context cancellation. There is no timeout on the call to Close.
func CloserCtx ¶ added in v0.12.0
CloserCtx returns a runnable that calls Close on context cancellation. The context passed to Close is not cancelled, so Close can perform graceful cleanup. There is no timeout on the call to Close.
func CloserCtxErr ¶ added in v0.12.0
CloserCtxErr returns a runnable that calls Close on context cancellation. The context passed to Close is not cancelled, so Close can perform graceful cleanup. There is no timeout on the call to Close.
func CloserErr ¶ added in v0.12.0
CloserErr returns a runnable that calls Close on context cancellation. There is no timeout on the call to Close.
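The behaviour shared by these helpers (block until the context is cancelled, then call Close without a timeout) can be sketched as follows. closerRunnable, fakeDB and demoClose are hypothetical names, and returning nil after Close is an assumption; the package's return value is not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// closerRunnable waits for cancellation, then calls Close.
// Hypothetical type for illustration, not the package's implementation.
type closerRunnable struct {
	c interface{ Close() }
}

func (r closerRunnable) Run(ctx context.Context) error {
	<-ctx.Done()
	r.c.Close() // no timeout: Run blocks for as long as Close does
	return nil  // assumed return value
}

// fakeDB records whether Close was called.
type fakeDB struct{ closed bool }

func (d *fakeDB) Close() { d.closed = true }

// demoClose runs the closer with an already cancelled context and reports
// whether Close was called.
func demoClose() bool {
	db := &fakeDB{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_ = closerRunnable{c: db}.Run(ctx)
	return db.closed
}

func main() {
	fmt.Println(demoClose()) // true
}
```

Since a closer only acts at shutdown, it reads naturally as a service in the manager's two-tier model, so that the resource is closed after the processes using it have stopped.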
func Noop ¶ added in v0.13.0
func Noop() Runnable
Noop returns a runnable that does nothing and returns when the context is cancelled.
type RunnableError ¶ added in v0.4.0
type RunnableError struct {
	// contains filtered or unexported fields
}
func (*RunnableError) Error ¶ added in v0.4.0
func (e *RunnableError) Error() string
func (*RunnableError) Unwrap ¶ added in v0.4.0
func (e *RunnableError) Unwrap() error
type RunnableFunc ¶ added in v0.5.0
RunnableFunc is a function that implements the Runnable contract.
type ScheduleSpec ¶ added in v1.0.0
ScheduleSpec computes the next execution time given the last execution start and the current time. Interval specs like Every use lastStart to account for execution time. Clock-aligned specs like DailyAt use now.
func DailyAt ¶ added in v1.0.0
func DailyAt(hour, minute int) ScheduleSpec
DailyAt returns a schedule spec that triggers at the given hour and minute each day.
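A clock-aligned spec of this kind only needs the current time. The following sketch shows one way to compute the next daily occurrence; dailyAt and demoDaily are hypothetical re-creations for illustration, and using the location of now as the time zone is an assumption, since the page does not say which zone DailyAt uses.

```go
package main

import (
	"fmt"
	"time"
)

// dailyAt returns a spec-shaped function that fires at hour:minute each day,
// evaluated in the location of the "now" argument (an assumption of this sketch).
func dailyAt(hour, minute int) func(lastStart, now time.Time) time.Time {
	return func(_, now time.Time) time.Time {
		next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, now.Location())
		if !next.After(now) {
			// Today's slot has already passed, so move to tomorrow.
			next = next.AddDate(0, 0, 1)
		}
		return next
	}
}

// demoDaily evaluates the spec at a fixed instant, 2024-03-10 09:00 UTC.
func demoDaily(hour, minute int) string {
	now := time.Date(2024, 3, 10, 9, 0, 0, 0, time.UTC)
	return dailyAt(hour, minute)(time.Time{}, now).Format("2006-01-02 15:04")
}

func main() {
	fmt.Println(demoDaily(9, 30)) // 2024-03-10 09:30
	fmt.Println(demoDaily(8, 30)) // 2024-03-11 08:30
}
```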
func Every ¶ added in v0.11.0
func Every(d time.Duration) ScheduleSpec
Every returns a schedule spec that triggers at regular intervals, accounting for execution time. If the runnable takes longer than the interval, the next execution starts immediately (missed ticks are skipped, not queued).
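The interval rule can be read as "last start plus the interval, but never in the past". The sketch below implements that reading; every and demoEvery are hypothetical re-creations, and the handling of the very first run (a zero lastStart yields an immediate run here) is an assumption that the page does not confirm.

```go
package main

import (
	"fmt"
	"time"
)

// every returns a spec-shaped function that schedules runs d apart, measured
// from the start of the previous run.
func every(d time.Duration) func(lastStart, now time.Time) time.Time {
	return func(lastStart, now time.Time) time.Time {
		next := lastStart.Add(d)
		if next.Before(now) {
			// The previous run overran the interval: run now and skip missed ticks.
			return now
		}
		return next
	}
}

// demoEvery uses a 30s interval with a last start of 12:00:00 UTC and
// evaluates the spec after the given elapsed time.
func demoEvery(elapsed time.Duration) string {
	lastStart := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	return every(30*time.Second)(lastStart, lastStart.Add(elapsed)).Format("15:04:05")
}

func main() {
	fmt.Println(demoEvery(10 * time.Second))  // 12:00:30 (run took 10s, wait 20s more)
	fmt.Println(demoEvery(105 * time.Second)) // 12:01:45 (run overran, start immediately)
}
```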
func Hourly ¶ added in v1.0.0
func Hourly() ScheduleSpec
Hourly returns a schedule spec that triggers at the top of every hour (:00).
func HourlyAt ¶ added in v1.0.0
func HourlyAt(minute int) ScheduleSpec
HourlyAt returns a schedule spec that triggers at the given minute past each hour.