composite

package
v0.0.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

README

Composite Runner

The Composite Runner manages multiple runnables as a single logical unit. This "runnable" implements several go-supervisor core interfaces: Runnable, Reloadable, and Stateable.

Features

  • Group and manage multiple runnables as a single service (all the same type)
  • Provide individual configuration for each runnable or shared configuration (with hot-reload)
  • Support dynamic membership changes during reloads (sub-runnables can be added or removed)
  • Propagate errors from child runnables to the supervisor
  • Monitor state of individual child runnables
  • Manage configuration updates to the sub-runnables with a callback function

Quick Start Example

// Create runnable entries with their configs
entries := []composite.RunnableEntry[*myapp.SomeRunnable]{
    {Runnable: runnable1, Config: map[string]any{"timeout": 10 * time.Second}},
    {Runnable: runnable2, Config: map[string]any{"maxConnections": 100}},
}

// Define a config callback, used for dynamic membership changes and reloads
configCallback := func() (*composite.Config[*myapp.SomeRunnable], error) {
    return composite.NewConfig("MyRunnableGroup", entries)
}

// Create a composite runner
runner, err := composite.NewRunner(
    composite.WithConfigCallback(configCallback),
)
if err != nil {
    log.Fatalf("Failed to create runner: %v", err)
}

// Load the composite runner into a supervisor
super, err := supervisor.New(supervisor.WithRunnables(runner))
if err != nil {
    log.Fatalf("Failed to create supervisor: %v", err)
}

if err := super.Run(); err != nil {
    log.Fatalf("Supervisor failed: %v", err)
}
Shared Configuration Example

When all runnables share the same configuration:

runnables := []*myapp.SomeRunnable{runnable1, runnable2, runnable3}
configCallback := func() (*composite.Config[*myapp.SomeRunnable], error) {
    return composite.NewConfigFromRunnables(
        "MyRunnableGroup",
        runnables,
        map[string]any{"timeout": 30 * time.Second},
    )
}
runner, err := composite.NewRunner(
    composite.WithConfigCallback(configCallback),
)

Dynamic Configuration

The config callback function provides the configuration for the Composite Runner:

  • Returns the current configuration when requested
  • Called during initialization and reloads
  • Used to determine if runnable membership has changed
  • Should return quickly as it may be called frequently
// Example config callback that loads from file
configCallback := func() (*composite.Config[*myapp.SomeRunnable], error) {
    // Read config from file or other source
    config, err := loadConfigFromFile("config.json")
    if err != nil {
        return nil, err
    }
    
    // Create entries based on loaded config
    var entries []composite.RunnableEntry[*myapp.SomeRunnable]
    for name, cfg := range config.Services {
        runnable := getOrCreateRunnable(name)
        entries = append(entries, composite.RunnableEntry[*myapp.SomeRunnable]{
            Runnable: runnable,
            Config:   cfg,
        })
    }
    
    return composite.NewConfig("MyServices", entries)
}

ReloadableWithConfig Interface

Implement the ReloadableWithConfig interface to receive type-specific configuration updates:

type ConfigurableRunnable struct {
    timeout time.Duration
    // other fields
}

// Run implements the Runnable interface
func (r *ConfigurableRunnable) Run(ctx context.Context) error {
    // implementation
}

// Stop implements the Runnable interface
func (r *ConfigurableRunnable) Stop() {
    // implementation
}

// ReloadWithConfig receives configuration updates during reloads
func (r *ConfigurableRunnable) ReloadWithConfig(config any) {
    if cfg, ok := config.(map[string]any); ok {
        if timeout, ok := cfg["timeout"].(time.Duration); ok {
            r.timeout = timeout
        }
        // Handle other config parameters
    }
}

The Composite Runner will prioritize calling ReloadWithConfig over the standard Reload() method when a runnable implements both.

Monitoring Child States

Monitor the states of individual child runnables:

// Get a map of all child runnable states
states := compositeRunner.GetChildStates()

// Log the current state of each runnable
for name, state := range states {
    logger.Infof("Service %s is in state %s", name, state)
}

// Check if a specific service is ready
if states["database"] == "running" {
    // Database service is ready
}

Managing Lifecycle

The Composite Runner coordinates the lifecycle of all contained runnables:

  • Starts runnables in the order they are defined (async)
  • Stops runnables in reverse order
  • Propagates errors from any child runnable
  • Handles clean shutdown when context is canceled
  • Manages state transitions (New → Booting → Running → Stopping → Stopped)

Best Practices

  • Unique Identifiers: Ensure each runnable's String() method returns a consistent, unique identifier
  • Stateful Configuration: Store your latest configuration for reuse if the config source becomes temporarily unavailable
  • Error Handling: Check errors returned from Run() to detect failures in any child runnable
  • Context Management: Pass a cancellable context to Run() for controlled shutdown
  • Membership Changes: Be aware that changes in membership will cause all runnables to restart
  • Type Safety: Use the same concrete type for all runnables in a composite to leverage Go's type system

See the examples directory for complete working examples.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCompositeRunnable is returned when there's a general error in the composite runnable
	ErrCompositeRunnable = errors.New("composite runnable error")

	// ErrRunnableFailed is returned when a child runnable fails
	ErrRunnableFailed = errors.New("child runnable failed")

	// ErrConfigMissing is returned when the config is missing
	ErrConfigMissing = errors.New("config is missing")

	// ErrOldConfig is returned when the config hasn't changed during a reload
	ErrOldConfig = errors.New("configuration unchanged")
)

Functions

This section is empty.

Types

type Config

type Config[T runnable] struct {
	// Name is a human-readable identifier for this composite runner
	Name string

	// Entries is the list of runnables with their associated configurations
	Entries []RunnableEntry[T]
}

Config represents the configuration for a CompositeRunner

func NewConfig

func NewConfig[T runnable](
	name string,
	entries []RunnableEntry[T],
) (*Config[T], error)

NewConfig creates a new Config instance for a CompositeRunner

func NewConfigFromRunnables

func NewConfigFromRunnables[T runnable](
	name string,
	runnables []T,
	sharedConfig any,
) (*Config[T], error)

NewConfigFromRunnables creates a Config from a list of runnables, all using the same config

func (*Config[T]) Equal

func (c *Config[T]) Equal(other *Config[T]) bool

Equal compares two configs for equality

func (*Config[T]) String

func (c *Config[T]) String() string

String returns a string representation of the Config

type ConfigCallback

type ConfigCallback[T runnable] func() (*Config[T], error)

ConfigCallback is the function type signature for the callback used to load initial config, and new config during Reload()

type Option

type Option[T runnable] func(*Runner[T])

Option represents a functional option for configuring CompositeRunner

func WithLogHandler

func WithLogHandler[T runnable](handler slog.Handler) Option[T]

WithLogHandler sets a custom slog handler for the CompositeRunner instance.

type ReloadableWithConfig

type ReloadableWithConfig interface {
	ReloadWithConfig(config any)
}

ReloadableWithConfig is an interface for sub-runnables that can reload with specific config

type RunnableEntry

type RunnableEntry[T runnable] struct {
	// Runnable is the component to be managed
	Runnable T

	// Config holds the configuration data for this specific runnable
	Config any
}

RunnableEntry associates a runnable with its configuration

type Runner

type Runner[T runnable] struct {
	// contains filtered or unexported fields
}

Runner implements a component that manages multiple runnables of the same type as a single unit. It satisfies the Runnable, Reloadable, and Stateable interfaces.

func NewRunner

func NewRunner[T runnable](
	configCallback ConfigCallback[T],
	opts ...Option[T],
) (*Runner[T], error)

NewRunner creates a new CompositeRunner instance with the provided configuration callback and options. Parameters:

  • configCallback: Required. A function that returns the initial configuration and is called during any reload operations.
  • opts: Optional. A variadic list of Option functions to customize the Runner behavior.

The configCallback cannot be nil and will be invoked by Run() to load the initial configuration.

func (*Runner[T]) GetChildStates

func (r *Runner[T]) GetChildStates() map[string]string

GetChildStates returns a map of child runnable names to their states.

func (*Runner[T]) GetState

func (r *Runner[T]) GetState() string

GetState returns the current state of the CompositeRunner.

func (*Runner[T]) GetStateChan

func (r *Runner[T]) GetStateChan(ctx context.Context) <-chan string

GetStateChan returns a channel that will receive state updates.

func (*Runner[T]) GetStateChanWithTimeout added in v0.0.12

func (r *Runner[T]) GetStateChanWithTimeout(ctx context.Context) <-chan string

GetStateChanWithTimeout returns a channel that emits state changes. It's a pass-through to the underlying finite state machine.

func (*Runner[T]) IsRunning

func (r *Runner[T]) IsRunning() bool

IsRunning returns true if the runner is in the Running state.

func (*Runner[T]) Reload

func (r *Runner[T]) Reload()

Reload updates the configuration and handles runnables appropriately. If membership changes (different set of runnables), all existing runnables are stopped and the new set of runnables is started.

func (*Runner[T]) Run

func (r *Runner[T]) Run(ctx context.Context) error

Run starts all child runnables in order (first to last) and monitors for completion or errors. This method blocks until all child runnables are stopped or an error occurs.

func (*Runner[T]) Stop

func (r *Runner[T]) Stop()

Stop will cancel the context, causing all child runnables to stop.

func (*Runner[T]) String

func (r *Runner[T]) String() string

String returns a string representation of the CompositeRunner instance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL