orchestra

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2024 License: MIT Imports: 11 Imported by: 3

README

Orchestra

Orchestra is a library to manage long running go processes.

At the heart of the library is an interface called Player

// Player is a long running background worker
type Player interface {
    Play(context.Context) error
}

All a type needs to do to satisfy the interface is to have a Play method that will gracefully shutdown when the context is done.

It can also return an error if it encounters a problem when playing.

Next, there's the Conductor type (which itself is a Player)

// Conductor is a group of workers. It is also a Player itself **evil laugh**
type Conductor struct {
    Timeout time.Duration
    Players map[string]Player
}

With the conductor, you add Players to it, and when you call the Play method on the conductor, it will start the Players under it and gracefully shut them all down when the main context is done.

The timeout is there incase there is a Player that refused to stop

Helper functions

PlayUntilSignal(p Player, sig ...os.Signal)

This will start a player with a context, and close the context once it receives any of the signals provided.

Example:

package main

import (
    "os"
    "syscall"

    "github.com/stephenafamo/orchestra"
)

func main() {
    player := ... // something that satisfies the player interface
    err := orchestra.PlayUntilSignal(player, os.Interrupt, syscall.SIGTERM)
    if err != nil {
        panic(err)
    }
}
PlayerFunc(func(context.Context) error)

PlayerFunc is a quick way to convert a standalone function into a type that satisfies the Player interface.

package main

import (
    "context"
    "os"
    "time"
    "syscall"

    "github.com/stephenafamo/orchestra"
)

func main() {
    player := orchestra.PlayerFunc(myFunction)
    err := orchestra.PlayUntilSignal(player, os.Interrupt, syscall.SIGTERM)
    if err != nil {
        panic(err)
    }
}

func myFunction(ctx context.Context) error {
    // A continuously running process
    // Exits when ctx is done
    <-ctx.Done()
    return nil
}
NewServerPlayer(*http.Server)

ServerPlayer is a type that embeds the *http.Server and extends it to satisfy the Player interface.

Since a very common long running process is the *http.Server, this makes it easy to create a player from one without having to re-write the boilerplate each time.

With the help of multiple helper functions, we can create a gracefully shutting down server that closes on SIGINT and SIGTERM by:

package main

import (
    "net/http"
    "os"
    "syscall"

	"github.com/cenkalti/backoff/v4"
    "github.com/stephenafamo/orchestra"
)

func main() {
    s := orchestra.NewServerPlayer(
        // Setting the *http.Server
        &http.Server{Addr: ":8080"},
        // Sets the timeout waiting for the server to stop.
        orchestra.WithShutdownTimeout(time.Second * 5),
        // With TLS makes the server use ListenAndServeTLS
        orchestra.WithTLS(),
        // With Backoff adds a backoff strategy to the server
        orchestra.WithBackoff(backoff.NewExponentialBackOff()),
    )
    err := orchestra.PlayUntilSignal(s, os.Interrupt, syscall.SIGTERM)
    if err != nil {
        panic(err)
    }
}

Using the Conductor

The Conductor type makes it easy to coordinate multiple long running processes. Because each one is blocking, it is often clumsy to start and stop all of them nicely.

Well, the Conductor is here to make the pain go away.

package main

import (
    "context"
    "net/http"
    "os"
    "syscall"
    "time"

    "github.com/stephenafamo/orchestra"
)

func main() {
    // A player from a function
    a := orchestra.PlayerFunc(myFunction)
    // A player from a server
    b := orchestra.NewServerPlayer(myServer)

    // A conductor to control them all
    conductor := &orchestra.Conductor{
        Timeout: 5 * time.Second,
        Players: map[string]orchestra.Player{
            // the names are used to identify the players
            // both in logs and the returned errors
            "function": a,
            "server":   b,
        },
    }

    // Use the conductor as a Player
    err := orchestra.PlayUntilSignal(conductor, os.Interrupt, syscall.SIGTERM)
    if err != nil {
        panic(err)
    }
}

func myFunction(ctx context.Context) error {
    // A continuously running process
    // Exits when ctx is done
    <-ctx.Done()
    return nil
}

Note: The Conductor makes sure that if by some mistake you add the conductor as a player to itself (or another conductor under it), it will not start the players multiple times.

If the conductor has to exit because of the timeout and not because all the Players exited successfully, it will return an error of type TimeoutErr.

You can ignore this type of error by checking for it like this:

// Use the conductor as a Player
err := orchestra.PlayUntilSignal(conductor, os.Interrupt, syscall.SIGTERM)
if err != nil && !errors.As(err, &orchestra.TimeoutErr{}) {
    panic(err)
}

Or you can specially handle it like this:

// Use the conductor as a Player
err := orchestra.PlayUntilSignal(conductor, os.Interrupt, syscall.SIGTERM)
if err != nil {
    timeoutErr := orchestra.TimeoutErr{}
    if errors.As(err, &timeoutErr) {
        fmt.Println(timeoutErr) // Handle the timeout error
    } else {
        panic(err) // handle other errors
    }
}
Restarting Players

A player can be configured to restart, possibly with exponential backoff by implementing the PlayerWithBackoff interface.

For example:

package main

import "github.com/cenkalti/backoff/v4"

type playerThatRestartsImmediately struct{}

func (playerThatRestartsImmediately) Backoff() backoff.BackOff {
	return &backoff.ZeroBackOff{}
}


type playerWithExponentialBakoff struct{}

func (playerWithExponentialBakoff) Backoff() backoff.BackOff {
	return backoff.NewExponentialBackOff()
}

Customization

The logger can be modified by assigning a logger to orchestra.Logger

type Logger interface {
	Info(msg string, attrs ...slog.Attr)
	Error(msg string, attrs ...slog.Attr)
	WithGroup(name string) Logger
}

If you have an existing *slog.Logger, you can create an orchestra.Logger by using the orchestra.LoggerFromSlog function.

orchestraLogger := orchestra.LoggerFromSlog(slog.LevelInfo, slog.LevelError, slog.Default())

Contributing

Looking forward to pull requests.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PlayUntilSignal

func PlayUntilSignal(ctx context.Context, p Player, sig ...os.Signal) error

PlayUntilSignal starts the player and stops when it receives os.Signals

Types

type Conductor

type Conductor struct {
	Timeout time.Duration
	Players map[string]Player
	Logger  Logger
	// contains filtered or unexported fields
}

Conductor is a group of players. It is also a Player itself **evil laugh**

func (*Conductor) Play

func (c *Conductor) Play(ctx context.Context) error

Play starts all the players and gracefully shuts them down

type InstrumentError

type InstrumentError struct {
	Name string
	Err  error
}

InstrumentError is an error that happens in an instrument started by a conductor It carries the name of the instrument

func (InstrumentError) Error

func (e InstrumentError) Error() string

func (InstrumentError) Unwrap

func (e InstrumentError) Unwrap() error

type Logger

type Logger interface {
	Info(msg string, attrs ...slog.Attr)
	Error(msg string, attrs ...slog.Attr)
	WithGroup(name string) Logger
}

Logger is accepted by some Players (Conductor, ServerPlayer)

DefaultLogger is used when a conductor's logger is nil

func LoggerFromSlog

func LoggerFromSlog(infoLevel, errorLevel slog.Level, l slogInterface) Logger

type Player

type Player interface {
	Play(context.Context) error
}

Player is a long running background worker

type PlayerFunc

type PlayerFunc func(ctx context.Context) error

PlayerFunc is a function type that satisfies the Player interface

func (PlayerFunc) Play

func (f PlayerFunc) Play(ctx context.Context) error

Play satisfies the Player interface

type PlayerWithBackoff added in v0.3.0

type PlayerWithBackoff interface {
	Player
	// A backoff strategy to use when the player fails but returns ErrRestart
	// NOTE: This is only called once before the player is started, so it should be
	// idempotent
	Backoff() backoff.BackOff
}

PlayerWithBackoff is a player that can be restarted with a backoff strategy

type ServerPlayer

type ServerPlayer struct {
	// contains filtered or unexported fields
}

ServerPlayer is a type that extends the *http.Server

func NewServerPlayer

func NewServerPlayer(srv *http.Server, opts ...ServerPlayerOption) *ServerPlayer

NewServerPlayer creates a new ServerPlayer

func (ServerPlayer) Backoff added in v0.4.0

func (s ServerPlayer) Backoff() backoff.BackOff

Backoff satisfies the PlayerWithBackoff interface

func (ServerPlayer) Play

func (s ServerPlayer) Play(ctxMain context.Context) error

Play starts the server until the context is done

type ServerPlayerOption

type ServerPlayerOption func(s *ServerPlayer)

ServerPlayerOption is a function interface to configure the ServerPlayer

func WithBackoff added in v0.4.0

func WithBackoff(b backoff.BackOff) ServerPlayerOption

WithBackoff sets the backoff strategy for the ServerPlayer

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) ServerPlayerOption

WithShutdownTimeout sets the shutdown timeout of ServerPlayer (10s by default)

func WithTLS

func WithTLS() ServerPlayerOption

WithTLS indicates that the ServerPlayer uses TLS so it will use ListenAndServeTLS instead of ListenAndServe

type TimeoutErr

type TimeoutErr struct {
	Left []string
}

TimeoutErr is an error that happens when a conductor terminates because of the timeout and does not wait for all players to exit sucessfully

func (TimeoutErr) Error

func (e TimeoutErr) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL