runnable

v0.1.0 Latest
Published: May 13, 2026 License: MIT Imports: 5 Imported by: 2

README

runnable

Overview

runnable is a Go package that provides a Runnable interface for functions or objects that can be started and stopped. It offers a simple way to run a function or object in a goroutine and stop it when needed. It can also run a function with retries and track status information: the number of restarts, the start and end times, and the last returned error.

Examples

Runnable Function
fmt.Println("Simple function...")
err := runnable.New(func(ctx context.Context) error {
    fmt.Println("Starting...")
    defer fmt.Println("Stopping...")

    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            return nil
        default:
        }
        time.Sleep(1 * time.Second)
        fmt.Println("Running...")
    }
    return nil
}).Run(context.Background())
if err != nil {
    fmt.Println(err)
}
Runnable Function with Stop
fmt.Println("Simple function with stop...")
r := runnable.New(func(ctx context.Context) error {
    fmt.Println("Starting...")
    defer fmt.Println("Stopping...")

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-time.After(time.Second):
        }
        fmt.Println("Running...")
    }
})

go func() {
    time.Sleep(5 * time.Second)

    fmt.Println("Calling Stop...")
    err := r.Stop(context.Background())
    if err != nil {
        fmt.Println(err)
    }
}()

err = r.Run(context.Background())
if err != nil {
    fmt.Println(err)
}
Runnable Function with Timeout
fmt.Println("Simple function with timeout...")
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = runnable.New(func(ctx context.Context) error {
    fmt.Println("Starting...")
    defer fmt.Println("Stopping...")

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-time.After(time.Second):
        }
        fmt.Println("Running...")
    }
}).Run(ctxWithTimeout)
if err != nil {
    fmt.Println(err)
}
Adapters

Cross-cutting behaviors that aren't part of the core lifecycle live in the runnable/adapters subpackage as chi-style middleware: each runnable.Adapter has the shape func(next RunFunc) RunFunc. Apply them with runnable.WithAdapters (left-to-right = outermost-to-innermost):

r := runnable.New(reconcile, runnable.WithAdapters(
    adapters.Draining(10*time.Second),
    adapters.Recovering(),
    adapters.Retry(3, time.Minute),
    adapters.Ticker(30*time.Second),
))
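The ordering rule can be illustrated without the package itself. The sketch below redeclares RunFunc and Adapter locally and uses a hypothetical chain helper and tag adapter (neither is part of runnable) to show what "first listed = outermost" means in practice. It is an assumed reading of how WithAdapters composes, not the package's own code.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// RunFunc and Adapter mirror the shapes documented for the package.
// They are redeclared here so the sketch has no external dependencies.
type RunFunc func(ctx context.Context) error
type Adapter func(next RunFunc) RunFunc

// chain wraps run so that the first adapter listed ends up outermost.
// Hypothetical helper: an assumption about the composition order only.
func chain(run RunFunc, adapters ...Adapter) RunFunc {
	for i := len(adapters) - 1; i >= 0; i-- {
		run = adapters[i](run)
	}
	return run
}

// tag is a hypothetical adapter that records when it is entered and left.
func tag(name string, log *[]string) Adapter {
	return func(next RunFunc) RunFunc {
		return func(ctx context.Context) error {
			*log = append(*log, name+" in")
			err := next(ctx)
			*log = append(*log, name+" out")
			return err
		}
	}
}

// trace runs a three-adapter chain and returns the order of calls.
func trace() string {
	var log []string
	work := func(ctx context.Context) error {
		log = append(log, "work")
		return nil
	}
	run := chain(work, tag("A", &log), tag("B", &log), tag("C", &log))
	_ = run(context.Background())
	return strings.Join(log, ", ")
}

func main() {
	fmt.Println(trace()) // A in, B in, C in, work, C out, B out, A out
}
```

Read against the example above, this ordering would put Draining around everything else and Ticker closest to the work.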

Draining: graceful shutdown with a grace window. When the outer ctx is cancelled, the wrapped work has up to the configured timeout to notice adapters.Stopping(ctx) and return. If it has not returned by then, its ctx is force-cancelled and adapters.ErrDrainTimedOut is returned.

Ticker: calls the wrapped work once per interval until ctx is cancelled or the work returns an error. Composes with Draining: an in-flight tick is allowed to finish before the loop exits.

Recovering: turns panics in the wrapped work into errors and emits a runnable.PanicRecoveredEvent to the Publisher on ctx. Place inside Draining when both are in use.

Retry: re-invokes the wrapped work up to maxRetries times on non-context errors. If resetAfter is non-zero and at least that long has passed since the previous attempt, the retry budget resets. Emits a runnable.RetryEvent after each failed attempt.

Inside long-running work under Draining, always select on both ctx.Done() and adapters.Stopping(ctx): Stopping signals that the drain has started, while ctx.Done() fires only when the drain timer expires.
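A minimal sketch of that select shape follows. It does not import the adapters package: a plain channel parameter named stopping stands in for whatever adapters.Stopping(ctx) returns (assumed here to be a receive-only channel), so the two exit paths can be exercised in isolation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker loops until it is asked to stop. The stopping channel is a
// stand-in for adapters.Stopping(ctx), which is assumed to be a channel.
func worker(ctx context.Context, stopping <-chan struct{}) string {
	for {
		select {
		case <-stopping:
			// Drain started: finish up and return within the grace window.
			return "stopping: finished cleanly"
		case <-ctx.Done():
			// Grace window expired (or no drain in use): stop immediately.
			return "ctx done: force-cancelled"
		case <-time.After(10 * time.Millisecond):
			// One unit of work would go here.
		}
	}
}

// drainRequested simulates the drain signal arriving before ctx is cancelled.
func drainRequested() string {
	stopping := make(chan struct{})
	close(stopping)
	return worker(context.Background(), stopping)
}

// windowExpired simulates a ctx that is cancelled without a drain signal.
func windowExpired() string {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return worker(ctx, make(chan struct{}))
}

func main() {
	fmt.Println(drainRequested())
	fmt.Println(windowExpired())
}
```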

A full SIGTERM-safe service shape lives in examples/ticker-with-drain.

Observability via Publisher

Adapters emit typed events to a runnable.Publisher installed on the runnable's ctx. Use runnable.WithPublisher to register one (or many — multiple WithPublisher calls fan out):

type log struct{}

func (log) Publish(event any) {
    switch ev := event.(type) {
    case runnable.RetryEvent:
        fmt.Printf("retry attempt %d: %v\n", ev.Attempt, ev.Err)
    case runnable.DrainStartedEvent:
        fmt.Printf("drain started, %s window\n", ev.Timeout)
    case runnable.PanicRecoveredEvent:
        fmt.Fprintf(os.Stderr, "panic: %v\n%s", ev.Recovered, ev.Stack)
    }
}

r := runnable.New(work,
    runnable.WithPublisher(log{}),
    runnable.WithAdapters(adapters.Retry(3, time.Minute), adapters.Recovering()),
)

StatusStore is a Publisher too — WithStatus(id, store) wires it automatically and counts RetryEvents into Status.Restarts.

Publisher.Publish runs on the caller's goroutine, so subscribers must not block. Buffer internally if you need async dispatch.

Migrating from v0.0.x to v0.1.0

v0.1.0 moves retry and panic recovery out of the core package, and introduces drain-on-shutdown and periodic execution as new adapters. The Option-based WithRetry and WithRecoverer are removed; their replacements live at runnable/adapters as chi-style middleware applied via runnable.WithAdapters.

Before (v0.0.x):

r := runnable.New(doWork,
    runnable.WithRecoverer(reporter, nil),
    runnable.WithRetry(3, time.Minute),
)

After (v0.1.0):

r := runnable.New(doWork, runnable.WithAdapters(
    adapters.Recovering(),
    adapters.Retry(3, time.Minute),
))

Symbol mapping:

  • runnable.WithRetry / runnable.ResetNever → adapters.Retry / adapters.ResetNever.
  • runnable.WithRecoverer → adapters.Recovering() plus a runnable.WithPublisher subscriber listening for runnable.PanicRecoveredEvent (the two-interface RecoveryReporter / StackPrinter callback split is gone).

Status.Restarts is event-driven. The Restarts field on Status is unchanged from a caller's perspective, but it now counts runnable.RetryEvents published by adapters.Retry (or any other Publisher source) rather than being incremented by an onStart side-channel from WithRetry. No call-site change required when using WithStatus + adapters.Retry.

New in v0.1.0: adapters.Draining for graceful shutdown, adapters.Ticker for periodic execution, adapters.Stopping(ctx) to observe drain start, adapters.ErrDrainTimedOut. See the Adapters section above.

Runnable Object
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/0xsequence/runnable"
)

type Monitor struct {
	runnable.Runnable
}

func NewMonitor() *Monitor {
	m := &Monitor{}
	m.Runnable = runnable.New(m.run)
	return m
}

func (m *Monitor) run(ctx context.Context) error {
	fmt.Println("Starting...")
	defer fmt.Println("Stopping...")
	
	// Start monitoring
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		time.Sleep(1 * time.Second)
		fmt.Println("Monitoring...")
	}
}

func main() {
	fmt.Println("Runnable object(Monitor)...")
	m := NewMonitor()

	go func() {
		time.Sleep(5 * time.Second)

		fmt.Println("Calling Stop...")
		err := m.Stop(context.Background())
		if err != nil {
			fmt.Println(err)
		}
	}()

	err := m.Run(context.Background())
	if err != nil {
		fmt.Println(err)
	}
}

Documentation

Constants

This section is empty.

Variables

var (
	ErrAlreadyRunning = fmt.Errorf("already running")
	ErrNotRunning     = fmt.Errorf("not running")
)

Functions

func Publish added in v0.1.0

func Publish(ctx context.Context, event any)

Publish forwards event to the Publisher in ctx, or no-ops if none.

Types

type Adapter added in v0.1.0

type Adapter func(next RunFunc) RunFunc

Adapter wraps a RunFunc with cross-cutting behavior, mirroring the chi middleware shape. Concrete adapters live in runnable/adapters.

type DrainStartedEvent added in v0.1.0

type DrainStartedEvent struct {
	Timeout time.Duration
}

DrainStartedEvent is published by adapters.Draining when the outer ctx is cancelled and the drain window begins.

type DrainTimedOutEvent added in v0.1.0

type DrainTimedOutEvent struct{}

DrainTimedOutEvent is published by adapters.Draining when the drain window expires and work is force-cancelled.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAdapters added in v0.1.0

func WithAdapters(adapters ...Adapter) Option

WithAdapters wraps the runnable's runFunc left-to-right (first listed = outermost). Apply order across Options matters.

func WithPublisher added in v0.1.0

func WithPublisher(p Publisher) Option

WithPublisher installs p as the runnable's event Publisher. Stacks additively across calls (and with WithStatus).

func WithStatus added in v0.0.2

func WithStatus(id string, store *StatusStore) Option

type PanicRecoveredEvent added in v0.1.0

type PanicRecoveredEvent struct {
	Recovered any
	Stack     []byte
}

PanicRecoveredEvent is published by adapters.Recovering when it catches a panic from the wrapped work.
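The panic-to-error conversion follows a common Go pattern: a deferred recover that sets a named return value. The sketch below is not the adapters.Recovering implementation. The recovering function, the panicEvent type, and the error message format are all hypothetical, and a callback stands in for the Publisher.

```go
package main

import (
	"context"
	"fmt"
	"runtime/debug"
)

type RunFunc func(ctx context.Context) error

// panicEvent mirrors the documented PanicRecoveredEvent fields for this sketch.
type panicEvent struct {
	Recovered any
	Stack     []byte
}

// recovering is a hypothetical adapter: it converts a panic in next into an
// error and reports the recovered value and stack through onPanic.
func recovering(onPanic func(panicEvent)) func(RunFunc) RunFunc {
	return func(next RunFunc) RunFunc {
		return func(ctx context.Context) (err error) {
			defer func() {
				if r := recover(); r != nil {
					onPanic(panicEvent{Recovered: r, Stack: debug.Stack()})
					err = fmt.Errorf("panic recovered: %v", r)
				}
			}()
			return next(ctx)
		}
	}
}

// demo runs panicking work and returns the resulting error text and whether
// a stack trace was captured.
func demo() (string, bool) {
	var got panicEvent
	work := func(ctx context.Context) error { panic("boom") }
	err := recovering(func(ev panicEvent) { got = ev })(work)(context.Background())
	return err.Error(), len(got.Stack) > 0
}

func main() {
	msg, hasStack := demo()
	fmt.Println(msg, hasStack)
}
```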

type Publisher added in v0.1.0

type Publisher interface {
	Publish(event any)
}

Publisher receives events from adapters. Implementations must not block — buffer internally if async dispatch is needed.

func PublisherFrom added in v0.1.0

func PublisherFrom(ctx context.Context) Publisher

PublisherFrom returns the Publisher installed in ctx, or nil. Adapters should prefer Publish, which no-ops when none is set.

type Publishers added in v0.1.0

type Publishers []Publisher

Publishers fans out each event to every member in order; nil members are skipped.

func (Publishers) Publish added in v0.1.0

func (ps Publishers) Publish(event any)
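The fan-out behavior is small enough to sketch in full. The types below are redeclared locally rather than imported, so this is an assumed equivalent of the documented behavior (in-order delivery, nil members skipped) and not the package source. Note that the nil check only catches nil interface values, not typed nil pointers.

```go
package main

import (
	"fmt"
	"strings"
)

// Publisher and Publishers mirror the documented shapes.
type Publisher interface {
	Publish(event any)
}

type Publishers []Publisher

// Publish delivers event to each member in order, skipping nil members.
func (ps Publishers) Publish(event any) {
	for _, p := range ps {
		if p != nil {
			p.Publish(event)
		}
	}
}

// recorder is a hypothetical subscriber that appends to a shared log.
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) Publish(event any) {
	*r.log = append(*r.log, fmt.Sprintf("%s:%v", r.name, event))
}

// fanOut publishes one event to two recorders separated by a nil member.
func fanOut() string {
	var log []string
	ps := Publishers{recorder{"first", &log}, nil, recorder{"second", &log}}
	ps.Publish("x")
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(fanOut()) // first:x second:x
}
```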

type RetryEvent added in v0.1.0

type RetryEvent struct {
	Attempt int
	Err     error
}

RetryEvent is published by adapters.Retry after a failed attempt (before sleeping or retrying). Attempt is 1-indexed.

type RunFunc added in v0.1.0

type RunFunc func(ctx context.Context) error

RunFunc is the lifecycle function wrapped by runnable.New.

type Runnable

type Runnable interface {
	Run(ctx context.Context) error
	Stop(ctx context.Context) error
	IsRunning() bool
}

func New

func New(runFunc func(ctx context.Context) error, options ...Option) Runnable

New creates a new Runnable with the given runFunc.

Example:

type Monitor struct {
	runnable.Runnable
}

func (m *Monitor) run(ctx context.Context) error {
	// do something
	return nil
}

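// NewMonitor returns a pointer so that the bound method m.run and the
// returned Monitor refer to the same value.
func NewMonitor() *Monitor {
	m := &Monitor{}
	m.Runnable = runnable.New(m.run)
	return m
}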

func NewGroup

func NewGroup(runners ...Runnable) Runnable

NewGroup creates a new Runnable that runs multiple runnables concurrently.

Example:

group := NewGroup(
	New(func(ctx context.Context) error {
		// do something
		return nil
	}),
	New(func(ctx context.Context) error {
		// do something
		return nil
	}),
)

err := group.Run(context.Background())
if err != nil {
	// handle error
}

type Status added in v0.0.2

type Status struct {
	Running   bool       `json:"running"`
	Restarts  int        `json:"restarts"`
	StartTime time.Time  `json:"start_time"`
	EndTime   *time.Time `json:"end_time,omitempty"`
	LastError error      `json:"last_error"`
}

type StatusMap added in v0.0.2

type StatusMap map[string]Status

type StatusStore added in v0.0.2

type StatusStore struct {
	// contains filtered or unexported fields
}

func NewStatusStore added in v0.0.2

func NewStatusStore() *StatusStore

func (*StatusStore) Get added in v0.0.2

func (s *StatusStore) Get() StatusMap

Directories

Path                        Synopsis
adapters                    Package adapters provides chi-style middleware around the runnable RunFunc signature.
ticker-with-drain command   Example: a periodic reconciler that drains gracefully on SIGTERM.
