command

package module
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2026 License: MIT Imports: 14 Imported by: 7

README

Go Command

A Go package for implementing command and query patterns with support for multiple execution strategies including CLI, cron scheduling, message dispatching, and batch/parallel processing.

Overview

go-command provides a framework for building applications inspired by the Command Query Responsibility Segregation (CQRS) pattern. It offers:

  • Type-safe message handling through Go generics
  • Multiple execution strategies (CLI, cron, dispatcher, batch, parallel)
  • Flexible error handling with retry support
  • Context-aware operations with cancellation and timeouts
  • Registry system for automatic command/query discovery
  • Integration with popular frameworks (Kong for CLI, cron for scheduling)

Installation

go get github.com/goliatone/go-command

Core Concepts

Messages

Messages are data carriers that implement the Message interface:

type Message interface {
    Type() string
}

Example:

type CreateUserCommand struct {
    Email string
    Name  string
}

func (c CreateUserCommand) Type() string {
    return "user.create"
}
Commands

Commands handle operations with side effects:

type Commander[T any] interface {
    Execute(ctx context.Context, msg T) error
}

// Function adapter
type CommandFunc[T any] func(ctx context.Context, msg T) error
Queries

Queries retrieve data without side effects:

type Querier[T any, R any] interface {
    Query(ctx context.Context, msg T) (R, error)
}

// Function adapter
type QueryFunc[T any, R any] func(ctx context.Context, msg T) (R, error)

Execution Strategies

1. Dispatcher Pattern

The dispatcher provides a centralized message routing system:

import "github.com/goliatone/go-command/dispatcher"

// Subscribe a command handler
dispatcher.SubscribeCommand(&CreateUserHandler{db: db})

// Or use a function
dispatcher.SubscribeCommandFunc(func(ctx context.Context, cmd CreateUserCommand) error {
    // Handle command
    return nil
})

// Dispatch a command
err := dispatcher.Dispatch(context.Background(), CreateUserCommand{
    Email: "user@example.com",
    Name:  "John Doe",
})

// Subscribe a query handler
dispatcher.SubscribeQuery(&GetUserHandler{db: db})

// Execute a query
user, err := dispatcher.Query[GetUserMessage, *User](context.Background(), GetUserMessage{
    ID: "user-123",
})
2. Registry System with CLI and Cron

The registry allows commands to be registered once and executed through multiple interfaces:

import (
    "github.com/goliatone/go-command/registry"
    "github.com/goliatone/go-command/cron"
)

// Command that supports multiple execution modes
type SyncDataCommand struct {
    service SyncService
    logger  Logger
}

// Core business logic
func (c *SyncDataCommand) Execute(ctx context.Context, evt *SyncDataEvent) error {
    return c.service.Sync(ctx, evt.Source, evt.Target, evt.BatchSize)
}

// Enable CLI execution
func (c *SyncDataCommand) CLIHandler() any {
    return &SyncDataCLICommand{cmd: c}
}

func (c *SyncDataCommand) CLIOptions() command.CLIConfig {
    return command.CLIConfig{
        Path:        []string{"sync"},
        Description: "Synchronize data between directories",
        Group:       "data",
    }
}

// Namespaced commands (e.g., ctx prompt create)
func (c *CreatePromptCommand) CLIOptions() command.CLIConfig {
    return command.CLIConfig{
        Path:        []string{"prompt", "create"},
        Description: "Create a prompt",
        Aliases:     []string{"add"},
        Groups: []command.CLIGroup{
            {Name: "prompt", Description: "Prompt management"},
        },
    }
}

// Enable cron scheduling
func (c *SyncDataCommand) CronHandler() func() error {
    return func() error {
        return c.Execute(context.Background(), &SyncDataEvent{
            Source:    os.Getenv("SYNC_SOURCE"),
            Target:    os.Getenv("SYNC_TARGET"),
            BatchSize: 500,
        })
    }
}

func (c *SyncDataCommand) CronOptions() command.HandlerConfig {
    return command.HandlerConfig{
        Expression: "0 2 * * *", // Daily at 2 AM
        MaxRetries: 3,
        Timeout:    time.Hour,
    }
}

// Setup
scheduler := cron.NewScheduler()
scheduler.Start(context.Background())

// Call before Start/Initialize.
registry.SetCronRegister(func(opts command.HandlerConfig, handler any) error {
    _, err := scheduler.AddHandler(opts, handler)
    return err
})

registry.RegisterCommand(syncCmd)
registry.Start(context.Background())

// Get CLI options for Kong integration
cliOptions, _ := registry.GetCLIOptions()
Registry Resolvers

Resolvers run during registry initialization for each registered command. CLI and cron are built-in resolvers (keys "cli" and "cron"), and you can add more with AddResolver:

cmdRegistry := command.NewRegistry()
queueRegistry := queuecmd.NewRegistry()

if err := cmdRegistry.AddResolver("queue", queuecmd.QueueResolver(queueRegistry)); err != nil {
    return err
}

If your command uses an interface message parameter, implement command.MessageFactory to provide a concrete value for metadata:

type Event interface{ Type() string }

type EventCommand struct{}

func (c *EventCommand) Execute(ctx context.Context, msg Event) error { return nil }
func (c *EventCommand) MessageValue() any { return &UserCreated{} }

MessageValue() must return a value assignable to the interface parameter type; otherwise metadata is treated as empty and resolvers that rely on MessageType will skip it.

For the global registry helpers, use registry.AddResolver and registry.HasResolver.

Migration notes:

  • If you switch to resolver-based queue registration, you must attach the queue resolver or queue registration will not happen.
  • When both resolver-based and direct registration are used, the queue layer should treat duplicate registrations as no-ops to avoid conflicts.

See REGISTRY_RESOLVERS.md for a deeper guide.

3. Batch Executor

Process commands in batches with concurrency control:

import "github.com/goliatone/go-command/flow"

// Create batch executor
executor := flow.NewBatchExecutor(
    &ItemProcessor{},
    flow.WithBatchSize[ProcessItemCommand](100),
    flow.WithConcurrency[ProcessItemCommand](5),
)

// Process messages in batches
messages := []ProcessItemCommand{
    {ItemID: "1"}, {ItemID: "2"}, // ... more items
}
err := executor.Execute(context.Background(), messages)

// Or use the functional approach
err = flow.ExecuteBatch(
    context.Background(),
    messages,
    processFunc,
    100, // batch size
    5,   // concurrency
)

The batch executor:

  • Splits messages into batches of specified size
  • Processes batches concurrently with configurable parallelism
  • Supports error handling with optional stop-on-error behavior
  • Provides detailed error metadata for debugging
4. Parallel Executor

Execute multiple handlers concurrently for the same message:

import "github.com/goliatone/go-command/flow"

// Create parallel executor with multiple handlers
handlers := []command.Commander[NotificationEvent]{
    &EmailNotifier{},
    &SMSNotifier{},
    &PushNotifier{},
}

executor := flow.NewParallelExecutor(handlers)

// Execute all handlers in parallel
err := executor.Execute(context.Background(), NotificationEvent{
    UserID:  "user-123",
    Message: "Your order has been shipped",
})

// Or use the functional approach
err = flow.ParallelExecute(
    context.Background(),
    event,
    []command.CommandFunc[NotificationEvent]{
        sendEmail,
        sendSMS,
        sendPush,
    },
)

The parallel executor:

  • Runs all handlers concurrently
  • Supports context cancellation
  • Can stop all handlers on first error (configurable)
  • Collects and returns all errors
5. Runner with Retry Logic

The runner provides execution control with retries and timeouts:

import "github.com/goliatone/go-command/runner"

handler := runner.NewHandler(
    runner.WithMaxRetries(3),
    runner.WithTimeout(30 * time.Second),
    runner.WithRetryDelay(time.Second),
    runner.WithStopOnError(true),
)

err := runner.RunCommand(ctx, handler, cmd, msg)

The runner supports custom retry logic through error interfaces:

  • IsRetryable() bool - Control whether an error should trigger a retry
  • RetryDelay(attempt int) time.Duration - Custom retry delay calculation
6. Cron Scheduler

Schedule commands to run periodically:

import "github.com/goliatone/go-command/cron"

scheduler := cron.NewScheduler(
    cron.WithLocation(time.UTC),
    cron.WithLogLevel(cron.LogLevelInfo),
)

// Add a command to run every 5 minutes
id, err := scheduler.AddHandler(
    command.HandlerConfig{
        Expression: "*/5 * * * *",
        MaxRetries: 3,
        Timeout:    time.Minute,
    },
    func() error {
        return processBatch(context.Background())
    },
)

scheduler.Start(context.Background())

Complete Example

Here's a comprehensive example showing multiple features:

package main

import (
    "context"
    "log"
    "time"

    "github.com/goliatone/go-command"
    "github.com/goliatone/go-command/dispatcher"
    "github.com/goliatone/go-command/flow"
    "github.com/goliatone/go-command/registry"
)

// Define messages
type ProcessOrderCommand struct {
    OrderID string
    UserID  string
}

func (c ProcessOrderCommand) Type() string { return "order.process" }

type NotifyUserCommand struct {
    UserID  string
    Message string
}

func (c NotifyUserCommand) Type() string { return "user.notify" }

// Command handlers
type OrderProcessor struct {
    orderService OrderService
    logger       Logger
}

func (p *OrderProcessor) Execute(ctx context.Context, cmd ProcessOrderCommand) error {
    log.Printf("Processing order %s for user %s", cmd.OrderID, cmd.UserID)

    // Process the order
    if err := p.orderService.Process(cmd.OrderID); err != nil {
        return err
    }

    // Dispatch notification
    return dispatcher.Dispatch(ctx, NotifyUserCommand{
        UserID:  cmd.UserID,
        Message: "Your order has been processed",
    })
}

// Batch processing example
func processDailyOrders(ctx context.Context) error {
    orders := []ProcessOrderCommand{
        {OrderID: "1", UserID: "user1"},
        {OrderID: "2", UserID: "user2"},
        // ... more orders
    }

    return flow.ExecuteBatch(
        ctx,
        orders,
        func(ctx context.Context, cmd ProcessOrderCommand) error {
            return dispatcher.Dispatch(ctx, cmd)
        },
        50,  // batch size
        10,  // concurrency
    )
}

// Parallel notification example
func notifyAllChannels(ctx context.Context, userID, message string) error {
    cmd := NotifyUserCommand{UserID: userID, Message: message}

    return flow.ParallelExecute(
        ctx,
        cmd,
        []command.CommandFunc[NotifyUserCommand]{
            sendEmail,
            sendSMS,
            sendPushNotification,
        },
    )
}

func main() {
    // Register handlers
    dispatcher.SubscribeCommand(&OrderProcessor{
        orderService: &orderService{},
        logger:       &logger{},
    })

    dispatcher.SubscribeCommandFunc(func(ctx context.Context, cmd NotifyUserCommand) error {
        return notifyAllChannels(ctx, cmd.UserID, cmd.Message)
    })

    // Process an order
    err := dispatcher.Dispatch(context.Background(), ProcessOrderCommand{
        OrderID: "12345",
        UserID:  "user-789",
    })

    if err != nil {
        log.Fatal("Failed to process order:", err)
    }
}

Error Handling

The package provides consistent error handling across all execution strategies:

// Configure error handler
handler := runner.NewHandler(
    runner.WithErrorHandler(func(err error) {
        log.Printf("Command execution failed: %v", err)
    }),
)

// Custom retryable errors
type RetryableError struct {
    err   error
    delay time.Duration
}

func (e RetryableError) Error() string { return e.err.Error() }
func (e RetryableError) IsRetryable() bool { return true }
func (e RetryableError) RetryDelay(attempt int) time.Duration {
    return e.delay * time.Duration(attempt)
}

Testing

The package provides utilities for testing:

import "github.com/goliatone/go-command/registry"

func TestCommand(t *testing.T) {
    registry.WithTestRegistry(func() {
        // Register test command
        registry.RegisterCommand(myCommand)

        // Start registry
        err := registry.Start(context.Background())
        require.NoError(t, err)

        // Test execution
        err = dispatcher.Dispatch(context.Background(), MyCommand{})
        assert.NoError(t, err)
    })
}

Advanced Features

Message Type Resolution

Messages can implement custom type resolution:

func (m MyMessage) Type() string {
    return "custom.message.type"
}

Or rely on automatic type detection based on struct name.

Context Propagation

All operations support context for:

  • Cancellation
  • Deadlines
  • Value propagation
  • Tracing integration
Thread Safety

All components are designed to be thread-safe and can be used concurrently.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrValidation = errors.New("validation error", errors.CategoryValidation).
	WithTextCode("VALIDATION_FAILED")

ErrValidation is a sentinel error used to mark validation failures. Wrappers can compare errors with errors.Is(err, ErrValidation) to propagate validation intent through additional layers.

Functions

func ContextWithResult added in v0.3.0

func ContextWithResult[T any](ctx context.Context, result *Result[T]) context.Context

Context helpers

func DefaultPanicLogger added in v0.2.0

func DefaultPanicLogger(funcName string, err any, stack []byte, fields ...map[string]any)

func GetGoroutineID added in v0.2.0

func GetGoroutineID() uint64

func GetMessageType added in v0.3.0

func GetMessageType(msg any) string

func IsNilMessage

func IsNilMessage(msg any) bool

func MakePanicHandler added in v0.2.0

func MakePanicHandler(logger PanicLogger) func(funcName string, fields ...map[string]any)

func NilCronRegister added in v0.3.0

func NilCronRegister(opts HandlerConfig, handler any) error

func ValidateMessage added in v0.3.0

func ValidateMessage(msg any) error

Types

type CLICommand added in v0.3.0

type CLICommand interface {
	CLIHandler() any
	CLIOptions() CLIConfig
}

type CLIConfig added in v0.3.0

type CLIConfig struct {
	Path        []string
	Description string
	// Group is kept for backward compatibility; prefer Groups for nested paths.
	Group   string
	Groups  []CLIGroup
	Aliases []string
	Hidden  bool
}

func (CLIConfig) BuildTags added in v0.3.0

func (opts CLIConfig) BuildTags() []string

type CLIGroup added in v0.7.0

type CLIGroup struct {
	Name        string
	Description string
}

type CommandExposure added in v0.13.0

type CommandExposure struct {
	// ExposeInAdmin signals this command can be listed in go-admin UI/REPL.
	ExposeInAdmin bool
	// Tags for grouping/filtering in UI (e.g. "debug", "ops").
	Tags []string
	// Permissions required to execute (admin will enforce).
	// If empty, consumers should derive defaults from Mutates.
	Permissions []string
	// Roles allowed to execute (optional).
	Roles []string
	// Mutates signals side effects; false implies read-only.
	Mutates bool
}

CommandExposure declares optional UI/REPL exposure metadata. The zero value is safe: ExposeInAdmin is false and Mutates is read-only.

func ExposureOf added in v0.13.0

func ExposureOf(cmd any) (CommandExposure, bool)

ExposureOf returns exposure metadata if cmd implements ExposableCommand.

type CommandFunc

type CommandFunc[T any] func(ctx context.Context, msg T) error

CommandFunc is an adapter that lets you use a function as a CommandHandler[T]

func (CommandFunc[T]) Execute

func (f CommandFunc[T]) Execute(ctx context.Context, msg T) error

Execute calls the underlying function

type CommandMeta added in v0.11.0

type CommandMeta struct {
	MessageType      string
	MessageValue     any
	MessageTypeValue reflect.Type
}

func MessageTypeForCommand added in v0.11.0

func MessageTypeForCommand(cmd any) CommandMeta

type Commander

type Commander[T any] interface {
	Execute(ctx context.Context, msg T) error
}

Commander is responsible for executing side effects

type CronCommand added in v0.3.0

type CronCommand interface {
	CronHandler() func() error
	CronOptions() HandlerConfig
}

type ExposableCommand added in v0.13.0

type ExposableCommand interface {
	Exposure() CommandExposure
}

ExposableCommand can be implemented by CLICommand (or any Command/Query). Opt-in is explicit: consumers should ignore commands that do not implement it or return ExposeInAdmin=false.

type HTTPCommand added in v0.3.0

type HTTPCommand interface {
	HTTPHandler()
}

type HandlerConfig added in v0.3.0

type HandlerConfig struct {
	Timeout    time.Duration `json:"timeout"`
	Deadline   time.Time     `json:"deadline"`
	MaxRetries int           `json:"max_retries"`
	MaxRuns    int           `json:"max_runs"`
	RunOnce    bool          `json:"run_once"`
	Expression string        `json:"expression"`
	NoTimeout  bool          `json:"no_timeout"`
}

type Message

type Message interface {
	Type() string
}

Message is the interface command and queries messages must implement

type MessageFactory added in v0.11.0

type MessageFactory interface {
	MessageValue() any
}

MessageFactory provides a concrete message value for interface-based commands.

type PanicLogger added in v0.2.0

type PanicLogger func(funcName string, err any, stack []byte, fields ...map[string]any)

type Querier

type Querier[T any, R any] interface {
	Query(ctx context.Context, msg T) (R, error)
}

Querier is responsible for returning data, with no side effects

type QueryFunc

type QueryFunc[T any, R any] func(ctx context.Context, msg T) (R, error)

QueryFunc is an adapter that lets you use a function as a QueryHandler[T, R]

func (QueryFunc[T, R]) Query

func (f QueryFunc[T, R]) Query(ctx context.Context, msg T) (R, error)

Query calls the underlying function

type Registry added in v0.3.0

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry added in v0.3.0

func NewRegistry() *Registry

func (*Registry) AddResolver added in v0.11.0

func (r *Registry) AddResolver(key string, res Resolver) error

func (*Registry) GetCLIOptions added in v0.3.0

func (r *Registry) GetCLIOptions() ([]kong.Option, error)

func (*Registry) HasResolver added in v0.11.0

func (r *Registry) HasResolver(key string) bool

func (*Registry) Initialize added in v0.3.0

func (r *Registry) Initialize() error

func (*Registry) RegisterCommand added in v0.3.0

func (r *Registry) RegisterCommand(cmd any) error

func (*Registry) SetCronRegister added in v0.3.0

func (r *Registry) SetCronRegister(fn func(opts HandlerConfig, handler any) error) *Registry

type Resolver added in v0.11.0

type Resolver func(cmd any, meta CommandMeta, r *Registry) error

type Result added in v0.3.0

type Result[T any] struct {
	// contains filtered or unexported fields
}

Result collector implementation from before

func NewResult added in v0.3.0

func NewResult[T any]() *Result[T]

func ResultFromContext added in v0.3.0

func ResultFromContext[T any](ctx context.Context) *Result[T]

func (*Result[T]) Error added in v0.3.0

func (r *Result[T]) Error() error

func (*Result[T]) GetMetadata added in v0.3.0

func (r *Result[T]) GetMetadata(key string) (any, bool)

func (*Result[T]) Load added in v0.3.0

func (r *Result[T]) Load() (T, bool)

func (*Result[T]) Store added in v0.3.0

func (r *Result[T]) Store(value T)

func (*Result[T]) StoreError added in v0.3.0

func (r *Result[T]) StoreError(err error)

func (*Result[T]) StoreWithMeta added in v0.3.0

func (r *Result[T]) StoreWithMeta(value T, meta map[string]any)

type ResultKey added in v0.3.0

type ResultKey[T any] struct{}

Directories

Path Synopsis
examples
flow/resilience command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL