runnable

package module
v0.13.0
Published: Mar 25, 2024 License: MIT Imports: 12 Imported by: 16

README

Runnable

Tooling to manage the execution of a process based on a Runnable interface:

type Runnable interface {
	Run(context.Context) error
}

And a simpler RunnableFunc function type:

type RunnableFunc func(context.Context) error

Example of an implementation of the command "yes":

func main() {
	runnable.RunFunc(run)
}

func run(ctx context.Context) error {
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		fmt.Println("y")
	}
}
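A struct can satisfy Runnable just as well as a function. The sketch below is self-contained: it redeclares the Runnable interface locally instead of importing the package, and the Collector type, its fields and the demoCollector helper are hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Runnable mirrors the interface shown above so the sketch compiles on its own.
type Runnable interface {
	Run(context.Context) error
}

// Collector is a hypothetical component that stores incoming messages
// until its context is cancelled.
type Collector struct {
	Messages chan string
	Seen     []string
}

// Run blocks until ctx is cancelled, then returns the context error.
func (c *Collector) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-c.Messages:
			c.Seen = append(c.Seen, msg)
		}
	}
}

// demoCollector feeds two messages to a Collector, cancels it, and returns
// what it collected together with the error returned by Run.
func demoCollector() ([]string, error) {
	c := &Collector{Messages: make(chan string)}
	ctx, cancel := context.WithCancel(context.Background())

	var r Runnable = c
	done := make(chan error, 1)
	go func() { done <- r.Run(ctx) }()

	c.Messages <- "a"
	c.Messages <- "b"
	cancel()

	err := <-done // Run has returned, so reading c.Seen is safe
	return c.Seen, err
}

func main() {
	seen, err := demoCollector()
	fmt.Println(seen, err)
}
```

Returning ctx.Err() on cancellation follows the "yes" example above; a component could equally return nil if it treats cancellation as a normal exit.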

Tools:

Process start and shutdown

To trigger a clean shutdown, a process must react to the termination signals (SIGINT, SIGTERM).

The Run() function is intended to be the process entrypoint:

  • it immediately executes the runnable
  • it cancels the context.Context when a termination signal is received
  • it calls log.Fatal with the error if the runnable returned one
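The behaviour listed above can be approximated with the standard library alone. This is a sketch under assumptions, not the package's implementation: runUntilSignal and demoRunUntilSignal are hypothetical helpers, and the error is returned to the caller instead of being passed to log.Fatal so the code stays easy to test.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// runUntilSignal is a hypothetical helper: it runs fn with a context that is
// cancelled when the process receives SIGINT or SIGTERM.
func runUntilSignal(fn func(context.Context) error) error {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop() // restore the default signal behaviour

	return fn(ctx)
}

// demoRunUntilSignal runs a function that returns right away, so no signal
// is needed for the demo to finish.
func demoRunUntilSignal() error {
	return runUntilSignal(func(ctx context.Context) error {
		return ctx.Err() // nil unless a signal has already arrived
	})
}

func main() {
	if err := demoRunUntilSignal(); err != nil {
		fmt.Println("error:", err)
		os.Exit(1)
	}
	fmt.Println("done")
}
```

A long-running function passed to runUntilSignal would typically block on ctx.Done() and start its cleanup once the channel is closed.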

Example:

func main() {
	runnable.Run(
		app.Build(),
	)
}

The RunFunc() function is also provided for convenience; it accepts a RunnableFunc directly.

Restart

The Restart runnable ensures that a component keeps running, even if it stops or crashes.

Example:

func main() {
	runnable.Run(
		runnable.Restart(
			task.New(),
		),
	)
}

HTTP Server

The HTTPServer runnable starts and gracefully shuts down a *http.Server.

Example:

func main() {
	server := &http.Server{
		Addr:    "127.0.0.1:8000",
		Handler: http.RedirectHandler("https://go.dev", 307),
	}

	runnable.Run(
		runnable.HTTPServer(server),
	)
}

Dependency Manager

The Manager runnable starts and stops all runnables while respecting the dependencies between them. Components with dependencies are stopped before the components they depend on.

Example with three components:

g := runnable.NewManager()
g.Add(jobQueue)
g.Add(httpServer, jobQueue) // jobQueue is a dependency
g.Add(monitor)

runnable.Run(g.Build())

Logs of a demo app

$ go run ./cmd/example
[RUNNABLE] 2020/10/22 22:42:26 INFO manager: main.JobQueue started
[RUNNABLE] 2020/10/22 22:42:26 INFO manager: runnable.httpServer started
[RUNNABLE] 2020/10/22 22:42:26 INFO manager: main.Monitor started
...
^C[RUNNABLE] 2020/10/22 22:42:34 INFO signal: received signal interrupt
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: starting shutdown (context cancelled)
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: runnable.httpServer cancelled
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: main.Monitor cancelled
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: main.Monitor stopped
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: runnable.httpServer stopped
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: main.JobQueue cancelled
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: main.JobQueue stopped
[RUNNABLE] 2020/10/22 22:42:34 INFO manager: shutdown complete

License

The MIT License (MIT)

Documentation

Overview

Example

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/pior/runnable"
)

func NewJobs() *Jobs {
	return &Jobs{queue: make(chan string)}
}

type Jobs struct {
	queue chan string
}

func (s *Jobs) Perform(id string) {
	s.queue <- id
}

// Run executes enqueued jobs, drains the queue and quits.
func (s *Jobs) Run(ctx context.Context) error {
	for {
		select {
		case id := <-s.queue:
			fmt.Printf("Starting job %s\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Completed job %s\n", id)

		default:
			if err := ctx.Err(); err != nil {
				close(s.queue)
				return err
			}
		}
	}
}

type CleanupTask struct{}

func (*CleanupTask) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

func main() {
	runnable.SetLogger(log.New(os.Stdout, "", 0))

	g := runnable.NewManager()

	jobs := NewJobs()
	g.Add(jobs)

	server := &http.Server{
		Addr: "127.0.0.1:8080",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			id := r.URL.Query().Get("id")
			jobs.Perform(id)
		}),
	}
	g.Add(runnable.HTTPServer(server), jobs)

	task := runnable.Func(func(ctx context.Context) error {
		_, _ = http.Post("http://127.0.0.1:8080/?id=1", "text/plain", nil)
		_, _ = http.Post("http://127.0.0.1:8080/?id=2", "text/plain", nil)
		_, _ = http.Post("http://127.0.0.1:8080/?id=3", "text/plain", nil)

		return nil // quit right away, will trigger a shutdown
	})
	g.Add(task)

	cleanup := runnable.Every(&CleanupTask{}, time.Hour)
	g.Add(cleanup, jobs)

	runnable.Run(g.Build())

	// INFO manager: runnable_test.Jobs started
	// INFO manager: runnable.httpServer started
	// INFO manager: func(runnable.RunnableFunc) started
	// INFO manager: periodic(runnable_test.CleanupTask) started
	// DBUG http_server: listening
	// Starting job 1
	// Completed job 1
	// Starting job 2
	// Completed job 2
	// Starting job 3
	// INFO manager: starting shutdown (func(runnable.RunnableFunc) died)
	// INFO manager: runnable.httpServer cancelled
	// INFO manager: func(runnable.RunnableFunc) cancelled
	// INFO manager: periodic(runnable_test.CleanupTask) cancelled
	// INFO manager: func(runnable.RunnableFunc) stopped
	// INFO manager: periodic(runnable_test.CleanupTask) stopped
	// DBUG http_server: shutdown (context cancelled)
	// INFO manager: runnable.httpServer stopped
	// INFO manager: runnable_test.Jobs cancelled
	// Completed job 3
	// INFO manager: runnable_test.Jobs stopped
	// INFO manager: shutdown complete
}

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextValues added in v0.10.0

func ContextValues(parent context.Context) context.Context

ContextValues returns a new context.Context with the values from the parent, without propagating the cancellation. Useful when you want to protect an operation that should not be cancelled. Often used with context.WithTimeout() or context.WithDeadline().
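The idea can be illustrated with a small wrapper type. This is a sketch of the concept, not the package's implementation: the detached type, the requestIDKey key and the demoDetached helper are hypothetical. Go 1.21 also added context.WithoutCancel to the standard library, which has comparable semantics.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// detached is a hypothetical context that keeps the values of its parent
// but never reports a deadline or a cancellation.
type detached struct{ parent context.Context }

func (detached) Deadline() (time.Time, bool) { return time.Time{}, false }
func (detached) Done() <-chan struct{}       { return nil }
func (detached) Err() error                  { return nil }
func (d detached) Value(key any) any         { return d.parent.Value(key) }

// requestIDKey is a hypothetical context key used by the demo.
type requestIDKey struct{}

// demoDetached cancels a parent context, then derives a context from it that
// still carries the parent's values and has its own one-second timeout.
func demoDetached() (string, error) {
	base := context.WithValue(context.Background(), requestIDKey{}, "req-42")
	parent, cancel := context.WithCancel(base)
	cancel() // the parent is cancelled before the cleanup starts

	ctx, stop := context.WithTimeout(detached{parent}, time.Second)
	defer stop()

	id, _ := ctx.Value(requestIDKey{}).(string)
	return id, ctx.Err()
}

func main() {
	id, err := demoDetached()
	fmt.Println(id, err)
}
```

The derived context is only bounded by its own timeout, which is the usual reason to pair such a context with context.WithTimeout() during shutdown work.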

func Log added in v0.12.0

func Log(self any, format string, args ...any)

Log logs a formatted message, prefixed by the runnable chain.

func Run added in v0.4.0

func Run(runner Runnable)

Run runs a single runnable, and listens for SIGTERM/SIGINT.

func RunFunc added in v0.5.0

func RunFunc(fn RunnableFunc)

RunFunc runs a runnable function, and listens for SIGTERM/SIGINT.

func RunGroup added in v0.1.0

func RunGroup(runners ...Runnable)

RunGroup runs all runnables in a Group, and listens for SIGTERM/SIGINT.

func SetLogger added in v0.9.0

func SetLogger(l Logger)

SetLogger replaces the default logger with a runnable.Logger.
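Any type with a matching Printf method can serve as a logger, including *log.Logger from the standard library, as the overview example does. The sketch below redeclares the Logger interface locally so it compiles on its own; prefixLogger and demoLogger are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the Logger interface of this package.
type Logger interface {
	Printf(format string, args ...any)
}

// prefixLogger is a hypothetical Logger that adds a fixed prefix and
// forwards every message to another Logger. The prefix must not contain
// formatting verbs because it is prepended to the format string.
type prefixLogger struct {
	prefix string
	next   Logger
}

func (p prefixLogger) Printf(format string, args ...any) {
	p.next.Printf(p.prefix+format, args...)
}

// demoLogger writes one message through prefixLogger into a buffer and
// returns what was written.
func demoLogger() string {
	var buf bytes.Buffer

	var l Logger = prefixLogger{prefix: "[app] ", next: log.New(&buf, "", 0)}
	l.Printf("manager: %s started", "jobs")

	return buf.String()
}

func main() {
	fmt.Print(demoLogger())
}
```

With the real package, a value such as prefixLogger could presumably be handed to runnable.SetLogger, since the only requirement stated in the documentation is the Printf method.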

Types

type AppManager added in v0.11.0

type AppManager interface {
	Add(runnable Runnable, dependencies ...Runnable)
	Build() Runnable
}

func NewManager added in v0.11.0

func NewManager(opts ...ManagerOption) AppManager

NewManager returns a manager that executes runnables in goroutines. Runnables can declare a dependency on another runnable. Dependencies are started first and stopped last.
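The ordering rule can be pictured as a topological sort over the declared dependencies. The sketch below only computes an order from component names; it is an illustration of the rule, not the manager's algorithm (the logs above suggest the real manager starts and stops components concurrently). The startOrder and stopOrder helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// startOrder returns the component names so that every dependency appears
// before the components that depend on it. deps maps a component to the
// components it depends on. Cycles are not detected in this sketch.
func startOrder(deps map[string][]string) []string {
	names := make([]string, 0, len(deps))
	for name := range deps {
		names = append(names, name)
	}
	sort.Strings(names) // make the result deterministic

	visited := make(map[string]bool)
	var order []string

	var visit func(string)
	visit = func(name string) {
		if visited[name] {
			return
		}
		visited[name] = true
		for _, dep := range deps[name] {
			visit(dep) // dependencies go first
		}
		order = append(order, name)
	}
	for _, name := range names {
		visit(name)
	}
	return order
}

// stopOrder is the reverse of startOrder: dependents stop before the
// components they depend on.
func stopOrder(deps map[string][]string) []string {
	order := startOrder(deps)
	for i, j := 0, len(order)-1; i < j; i, j = i+1, j-1 {
		order[i], order[j] = order[j], order[i]
	}
	return order
}

func main() {
	deps := map[string][]string{
		"jobQueue":   nil,
		"httpServer": {"jobQueue"},
		"monitor":    nil,
	}
	fmt.Println("start:", startOrder(deps))
	fmt.Println("stop: ", stopOrder(deps))
}
```

For the three components of the README example, this yields jobQueue, httpServer, monitor as the start order and the reverse as the stop order, so the HTTP server stops before the job queue it depends on.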

type Logger

type Logger interface {
	Printf(format string, args ...any)
}

type ManagerOption added in v0.11.0

type ManagerOption func(*manager)

ManagerOption configures the behavior of a Manager.

func ManagerShutdownTimeout added in v0.11.0

func ManagerShutdownTimeout(dur time.Duration) ManagerOption

type PanicError

type PanicError struct {
	// contains filtered or unexported fields
}

func (*PanicError) Error

func (e *PanicError) Error() string

func (*PanicError) Unwrap

func (e *PanicError) Unwrap() error

type RecoverRunner

type RecoverRunner struct {
	// contains filtered or unexported fields
}

func (*RecoverRunner) Run

func (r *RecoverRunner) Run(ctx context.Context) (err error)

func (*RecoverRunner) RunnableName added in v0.12.0

func (w *RecoverRunner) RunnableName() string

func (*RecoverRunner) RunnableUnwrap added in v0.12.0

func (w *RecoverRunner) RunnableUnwrap() any

type RestartOption added in v0.6.0

type RestartOption func(*restartConfig)

func RestartCrashDelayFn added in v0.6.0

func RestartCrashDelayFn(fn func(int) time.Duration) RestartOption

RestartCrashDelayFn sets the function that determines the backoff delay after a crash.
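A delay function only needs the func(int) time.Duration signature. The sketch below shows one possible choice, an exponential backoff with a cap. The exponentialBackoff name and the one-second base and thirty-second cap are assumptions, and since the documentation does not say whether the first crash is passed as 0 or 1, values below 1 are treated as 1.

```go
package main

import (
	"fmt"
	"time"
)

// exponentialBackoff is a hypothetical delay function: it doubles the delay
// for each additional crash, starting at one second and capped at thirty.
func exponentialBackoff(crashes int) time.Duration {
	const (
		base  = time.Second
		limit = 30 * time.Second
	)
	if crashes < 1 {
		crashes = 1
	}

	delay := base
	for i := 1; i < crashes; i++ {
		delay *= 2
		if delay >= limit {
			return limit
		}
	}
	return delay
}

func main() {
	for crashes := 1; crashes <= 6; crashes++ {
		fmt.Println(crashes, exponentialBackoff(crashes))
	}
}
```

Given the signatures documented here, such a function could be wired in as runnable.Restart(task, runnable.RestartCrashDelayFn(exponentialBackoff)).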

func RestartCrashLimit added in v0.6.0

func RestartCrashLimit(times int) RestartOption

RestartCrashLimit sets a limit on the number of restarts after a crash.

func RestartDelay added in v0.6.0

func RestartDelay(delay time.Duration) RestartOption

RestartDelay sets the time waited before restarting the runnable after a successful execution.

func RestartLimit added in v0.6.0

func RestartLimit(times int) RestartOption

RestartLimit sets a limit on the number of restarts after a successful execution.

type Runnable

type Runnable interface {
	Run(context.Context) error
}

Runnable is the contract for anything that runs with a Go context, respects the cancellation contract, and expects the caller to handle errors.

func Closer added in v0.4.0

func Closer(c interface{ Close() }) Runnable

Closer returns a runnable intended to call a Close method on shutdown.

func CloserCtx added in v0.12.0

func CloserCtx(c interface{ Close(context.Context) }) Runnable

CloserCtx returns a runnable intended to call a Close method on shutdown.

func CloserCtxErr added in v0.12.0

func CloserCtxErr(c interface{ Close(context.Context) error }) Runnable

CloserCtxErr returns a runnable intended to call a Close method on shutdown.

func CloserErr added in v0.12.0

func CloserErr(c interface{ Close() error }) Runnable

CloserErr returns a runnable intended to call a Close method on shutdown.
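One plausible reading of "call a Close method on shutdown" is a runnable that waits for its context to be cancelled and then closes the resource. The sketch below follows that reading for the Close() error variant; it is an assumption about the behaviour, and closerRunnable, fakeConn and demoCloser are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// closerRunnable is a hypothetical runnable that waits for cancellation and
// then closes the wrapped resource.
type closerRunnable struct {
	c interface{ Close() error }
}

func (r closerRunnable) Run(ctx context.Context) error {
	<-ctx.Done()
	return r.c.Close()
}

// fakeConn is a stand-in resource that records whether Close was called.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error {
	f.closed = true
	return nil
}

// demoCloser reports whether the resource was closed before and after the
// context was cancelled.
func demoCloser() (closedBefore, closedAfter bool) {
	conn := &fakeConn{}
	ctx, cancel := context.WithCancel(context.Background())

	done := make(chan error, 1)
	go func() { done <- closerRunnable{c: conn}.Run(ctx) }()

	closedBefore = conn.closed // the runnable is still waiting on ctx
	cancel()
	<-done // Run has returned, so Close has been called

	return closedBefore, conn.closed
}

func main() {
	before, after := demoCloser()
	fmt.Println("closed before cancel:", before)
	fmt.Println("closed after cancel: ", after)
}
```

Registered with a manager as a dependency of other components, such a runnable would be stopped last, so the resource stays open while its users shut down.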

func Every added in v0.11.0

func Every(runnable Runnable, period time.Duration) Runnable

Every returns a runnable that periodically runs the runnable passed as an argument.
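A periodic runner can be sketched with a time.Ticker. The details here are assumptions, not documented behaviour of Every: this version waits one full period before the first run, stops at the first error, and returns nil on cancellation. The every and demoEvery helpers are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// every is a hypothetical helper that calls fn once per period until ctx is
// cancelled or fn returns an error.
func every(ctx context.Context, period time.Duration, fn func(context.Context) error) error {
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil // cancellation is treated as a normal exit
		case <-ticker.C:
			if err := fn(ctx); err != nil {
				return err
			}
		}
	}
}

// demoEvery counts how often a task runs within a short time window.
func demoEvery() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	runs := 0
	_ = every(ctx, 10*time.Millisecond, func(context.Context) error {
		runs++
		return nil
	})
	return runs
}

func main() {
	fmt.Println("runs:", demoEvery())
}
```

The exact number of runs depends on scheduling, so the demo only shows that the task ran repeatedly within the window.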

func Func added in v0.5.0

func Func(fn RunnableFunc) Runnable

func HTTPServer added in v0.3.0

func HTTPServer(server *http.Server) Runnable

HTTPServer returns a runnable that runs a *http.Server.

Example

ctx, cancel := initializeForExample()
defer cancel()

server := &http.Server{
	Addr:    "127.0.0.1:8080",
	Handler: http.NotFoundHandler(),
}

r := HTTPServer(server)

_ = r.Run(ctx)

Output:

httpserver: listening on 127.0.0.1:8080
httpserver: shutdown

Example (Error)

ctx, cancel := initializeForExample()
defer cancel()

server := &http.Server{
	Addr:    "INVALID",
	Handler: http.NotFoundHandler(),
}

r := HTTPServer(server)

_ = r.Run(ctx)

Output:

httpserver: listening on INVALID
httpserver: shutdown (err: listen tcp: address INVALID: missing port in address)
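For readers curious how a context-driven graceful shutdown of a *http.Server can look, here is a minimal sketch using only net/http. It is not the package's implementation: serveUntilDone, demoServe and the grace parameter are hypothetical, and the demo listens on an ephemeral local port.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// serveUntilDone is a hypothetical helper: it runs srv until ctx is
// cancelled, then gives in-flight requests up to grace to complete.
func serveUntilDone(ctx context.Context, srv *http.Server, grace time.Duration) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.ListenAndServe() }()

	select {
	case err := <-errCh:
		return err // the server failed to start or stopped on its own
	case <-ctx.Done():
	}

	// Use a fresh context: ctx is already cancelled at this point.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()

	if err := srv.Shutdown(shutdownCtx); err != nil {
		return err
	}
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// demoServe starts a server and cancels its context shortly afterwards.
func demoServe() error {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	srv := &http.Server{Addr: "127.0.0.1:0", Handler: http.NotFoundHandler()}
	return serveUntilDone(ctx, srv, time.Second)
}

func main() {
	fmt.Println("result:", demoServe())
}
```

ListenAndServe returns http.ErrServerClosed after a successful Shutdown, which is why the sketch treats that error as a clean exit.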

func Noop added in v0.13.0

func Noop() Runnable

Noop returns a runnable that does nothing, and returns when the context is cancelled.

func Recover

func Recover(runnable Runnable) Runnable

Recover returns a runnable that recovers when a runnable panics and returns an error to represent this panic.
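Turning a panic into an error relies on a deferred recover() that assigns to a named return value. The sketch below shows that mechanism in isolation; recoverRun and demoRecover are hypothetical, and the error is a plain formatted error, not the package's PanicError.

```go
package main

import (
	"context"
	"fmt"
)

// recoverRun is a hypothetical helper that calls fn and converts a panic
// into a returned error.
func recoverRun(ctx context.Context, fn func(context.Context) error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return fn(ctx)
}

// demoRecover runs a function that panics and returns the resulting
// error message.
func demoRecover() string {
	err := recoverRun(context.Background(), func(context.Context) error {
		panic("boom")
	})
	if err == nil {
		return ""
	}
	return err.Error()
}

func main() {
	fmt.Println(demoRecover())
}
```

The named return value is what lets the deferred function replace the result after the panic has unwound the call to fn.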

func Restart

func Restart(runnable Runnable, opts ...RestartOption) Runnable

Restart returns a runnable that runs a runnable and restarts it when it fails, with some conditions.

Example

ctx, cancel := initializeForExample()
defer cancel()

runnable := newDyingRunnable()

r := Restart(runnable, RestartCrashLimit(3))
_ = r.Run(ctx)

Output:

restart/dyingRunnable: starting (restart=0 crash=0)
restart/dyingRunnable: starting (restart=1 crash=1)
restart/dyingRunnable: starting (restart=2 crash=2)
restart/dyingRunnable: not restarting (hit the crash limit: 3)
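The output above is consistent with a simple loop that counts crashes and gives up at the limit. The sketch below reproduces only that part, as an assumption about the general shape, not the package's code: it has no delays and, unlike the documented Restart, it does not restart after a clean exit. restartOnCrash and demoRestart are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// restartOnCrash is a hypothetical helper that runs fn again each time it
// returns an error, until crashLimit crashes have been counted or ctx is
// cancelled.
func restartOnCrash(ctx context.Context, crashLimit int, fn func(context.Context) error) error {
	crashes := 0
	for {
		err := fn(ctx)
		if ctx.Err() != nil {
			return ctx.Err() // shutting down: do not restart
		}
		if err == nil {
			return nil // simplification: a clean exit ends the loop
		}

		crashes++
		if crashes >= crashLimit {
			return fmt.Errorf("hit the crash limit (%d): %w", crashLimit, err)
		}
	}
}

// demoRestart runs a function that always fails and reports how many times
// it was started.
func demoRestart() (starts int, err error) {
	err = restartOnCrash(context.Background(), 3, func(context.Context) error {
		starts++
		return errors.New("dying")
	})
	return starts, err
}

func main() {
	starts, err := demoRestart()
	fmt.Println(starts, err)
}
```

With a limit of 3, the failing function is started three times before the loop gives up, matching the three "starting" lines in the example output.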

func Signal

func Signal(runnable Runnable, signals ...os.Signal) Runnable

Signal returns a runnable that runs the runnable and cancels it when the process receives a POSIX signal.

type RunnableError added in v0.4.0

type RunnableError struct {
	// contains filtered or unexported fields
}

func (*RunnableError) Error added in v0.4.0

func (e *RunnableError) Error() string

func (*RunnableError) Unwrap added in v0.4.0

func (e *RunnableError) Unwrap() error

type RunnableFunc added in v0.5.0

type RunnableFunc func(context.Context) error

Directories

Path Synopsis
examples
crashing command
example command
http command
test command
yes command
loggers module
