GoBroke

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2026 License: GPL-2.0 Imports: 14 Imported by: 0

README

GoBroke

GoBroke is a lightweight internal message routing system designed for modular logic processing in Go applications. It provides a clean architecture for handling messages between different components (clients and logic modules) within your project, with optional Redis integration for high availability across multiple instances.

It must be noted that this project may not be for you... I would highly recommend you look into using Kafka or Redis as an alternative. This is primarily for my own interest and another project I am working on.

Overview

GoBroke acts as a message router that:

  • Routes messages between clients and logic modules
  • Supports different types of logic processing (Dispatched, Worker, Passive)
  • Allows custom endpoint implementations (HTTP, UDP, TCP, gRPC, etc.)
  • Provides clean separation between message handling and business logic
  • Enables high availability through Redis integration

Architecture

Core Components
  1. Broke: The main router that manages message flow between clients and logic modules
  2. Endpoint: Interface for implementing custom network protocols
  3. Client: Represents connected clients and manages their state
  4. Logic: Interface for implementing business logic modules
  5. Message: Structure for passing data between components
  6. LogicBase: Base implementation providing common logic functionality
  7. Redis Integration: Optional component for enabling high availability across multiple instances
Message Flow
[Client] <-> [Endpoint] <-> [Broke] <-> [Logic Modules]
                             |
                        [Redis Layer]
                             |
                     [Other GoBroke Instances]

Messages can flow:

  • From clients to logic modules
  • From logic modules to specific clients
  • From logic modules to all clients (broadcast)
  • Between GoBroke instances via Redis (when enabled)

Logic Implementation

All logic modules in GoBroke extend the LogicBase struct, which provides common functionality:

type LogicBase struct {
    name      string
    logicType types.LogicType
    Ctx       context.Context
    *Broke
}

To create a new logic module, embed LogicBase and initialize it using NewLogicBase:

type customLogic struct {
    GoBroke.LogicBase
    // Additional fields specific to your logic
}

func CreateCustomLogic(broke *GoBroke.Broke) types.Logic {
    logic := customLogic{
        LogicBase: GoBroke.NewLogicBase("customlogic", types.DISPATCHED, broke),
        // Initialize additional fields
    }
    return &logic
}

Logic Types

GoBroke supports three types of logic modules:

1. DISPATCHED Logic
  • Processes messages immediately in a new goroutine
  • Best for quick, non-blocking operations
  • Suitable for broadcasting or simple transformations
  • Example: Message broadcaster
type broadcasterDispatched struct {
    GoBroke.LogicBase
}

func CreateDispatched(broke *GoBroke.Broke) types.Logic {
    worker := broadcasterDispatched{
        LogicBase: GoBroke.NewLogicBase("broadcaster", types.DISPATCHED, broke),
    }
    return &worker
}

func (w *broadcasterDispatched) RunLogic(msg types.Message) error {
    clients := w.GetAllClients()
    sMsg := types.Message{
        ToClient:   clients,
        FromLogic:  w,
        MessageRaw: msg.MessageRaw,
    }
    w.SendMessage(sMsg)
    return nil
}
2. WORKER Logic
  • Processes messages in a dedicated worker goroutine
  • Maintains its own message queue
  • Best for sequential processing or rate-limited operations
  • Example: Sequential message processor
type broadcasterWorker struct {
    GoBroke.LogicBase
    receive chan types.Message
}

func CreateWorker(broke *GoBroke.Broke, ctx context.Context) types.Logic {
    worker := broadcasterWorker{
        LogicBase: GoBroke.NewLogicBase("broadcaster", types.WORKER, broke),
        receive:   make(chan types.Message),
    }
    worker.startWorker()
    return &worker
}

func (w *broadcasterWorker) startWorker() {
    for {
        select {
        case <-w.Ctx.Done():
            return
        case msg := <-w.receive:
            w.work(msg)
        }
    }
}

func (w *broadcasterWorker) RunLogic(message types.Message) error {
    w.receive <- message
    return nil
}
3. PASSIVE Logic
  • Runs independently of message flow
  • Never receives messages directly
  • Best for background tasks or monitoring
  • Example: Inactivity monitor
type inactivityMonitor struct {
    GoBroke.LogicBase
    inactivityMinutes int
}

func CreateWorker(broke *GoBroke.Broke, inactivityMinutes int) types.Logic {
    worker := inactivityMonitor{
        LogicBase:         GoBroke.NewLogicBase("inactivitymonitor", types.PASSIVE, broke),
        inactivityMinutes: inactivityMinutes,
    }
    worker.startWorker()
    return &worker
}

func (w *inactivityMonitor) startWorker() {
    for {
        select {
        case <-w.Ctx.Done():
            return
        default:
            time.Sleep(10 * time.Second)
            clients := w.GetAllClients()
            for _, client := range clients {
                delta := time.Now().Sub(client.GetLastMessage())
                if delta.Minutes() > float64(w.inactivityMinutes) {
                    _ = w.RemoveClient(client)
                }
            }
        }
    }
}

func (w *inactivityMonitor) RunLogic(message types.Message) error {
    return fmt.Errorf("this logic does not support invocation")
}

Getting Started

  1. Create a new GoBroke instance:
ctx := context.Background()

// Optional: Configure Redis for high availability
redisClient := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
})

// Create GoBroke instance with optional Redis integration
gb, err := GoBroke.New(
    yourendpoint,
    GoBroke.WithContext(ctx),
    // Optional: Enable Redis integration
    GoBroke.WithRedis(GoBroke.RedisConfig{
        Client:      redisClient,
        ChannelName: "gobroke:messages",
        InstanceID:  "instance-1",
    }),
)
if err != nil {
    panic(err)
}
  1. Implement your logic modules:
// Create and add logic modules
broadcasterLogic := broadcaster.CreateDispatched(gb)
_ = gb.AddLogic(broadcasterLogic)

inactivityMonitor := inactivitymonitor.CreateWorker(gb, 15)
_ = gb.AddLogic(inactivityMonitor)
  1. Implement an endpoint:
// Implement the endpoint.Endpoint interface
type Endpoint interface {
    Sender(chan types.Message) error
    Receiver(chan types.Message) error
    Disconnect(*clients.Client) error
}
  1. Start the router:
gb.Start()

Custom Endpoints

To implement a custom endpoint:

  1. Create a struct that implements the endpoint.Endpoint interface
  2. Implement the required methods:
    • Sender: Handle outgoing messages
    • Receiver: Handle incoming messages
    • Disconnect: Handle client disconnection

Example WebSocket endpoint structure:

type WSEndpoint struct {
    upgrader websocket.Upgrader
    clients  map[string]*websocket.Conn
}

func (e *WSEndpoint) Sender(ch chan types.Message) error {
    // Implement message sending logic
}

func (e *WSEndpoint) Receiver(ch chan types.Message) error {
    // Implement message receiving logic
}

func (e *WSEndpoint) Disconnect(client *clients.Client) error {
    // Implement client disconnection logic
}

Message Structure

Messages in GoBroke contain:

  • Target clients (ToClient)
  • Target logic modules (ToLogic)
  • Source client (FromClient)
  • Source logic module (FromLogic)
  • Raw message data (MessageRaw)
  • Metadata for additional context (Metadata)
  • Unique identifier (UUID)
  • Message state (State)
  • Tags for middleware processing (Tags)
Message State and Control

Messages can be in one of two states:

  • ACCEPTED (default): Message continues through the processing pipeline
  • REJECTED: Message is dropped from the processing pipeline

Control methods:

// Accept the message for further processing
message.Accept()

// Reject the message to prevent further processing
message.Reject()
Message Tags

Tags provide a way to attach and retrieve arbitrary data during message processing:

// Add a tag to the message
message.AddTag("priority", "high")

// Retrieve a tag value
value, err := message.GetTag("priority", nil)

Middleware

GoBroke supports middleware functions for both receiving and sending messages. Middleware can modify messages, add tags, or control message flow through accept/reject states.

Adding Middleware
// Middleware function type
type middlewareFunc func(types.Message) types.Message

// Add receive middleware (executed when messages are received)
gb.AttachReceiveMiddleware(func(msg types.Message) types.Message {
    // Process incoming message
    return msg
})

// Add send middleware (executed before messages are sent)
gb.AttachSendMiddleware(func(msg types.Message) types.Message {
    // Process outgoing message
    return msg
})

Example middleware for message filtering:

gb.AttachReceiveMiddleware(func(msg types.Message) types.Message {
    // Reject messages larger than 1MB
    if len(msg.MessageRaw) > 1024*1024 {
        msg.Reject()
    }
    return msg
})

Redis Integration

GoBroke supports Redis integration for enabling high availability across multiple instances. When enabled, this feature allows:

  1. Client discovery across instances
  2. Message routing between instances
  3. High availability for client connections
  4. Last message time synchronization
Configuration

Redis integration is configured through the WithRedis option:

redisClient := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
})

broker, err := GoBroke.New(
    endpoint.NewStubEndpoint(),
    GoBroke.WithContext(ctx),
    GoBroke.WithRedis(GoBroke.RedisConfig{
        Client:      redisClient,
        ChannelName: "gobroke:messages",
        InstanceID:  "instance-1",
    }),
)
How It Works
  1. Client Registration:

    • Clients are registered both locally and in Redis
    • Registration includes instance ID and last message time
    • Registrations expire after 24 hours to prevent stale entries
  2. Message Routing:

    • Messages are automatically routed to the correct instance
    • Uses Redis pub/sub for inter-instance communication
    • Includes loop prevention mechanisms
  3. Client Discovery:

    • GetAllClients returns clients from all instances
    • Creates virtual client references for remote clients
    • Includes last message times from all instances
  4. High Availability:

    • Clients can connect to any instance
    • Messages are automatically routed to the correct instance
    • Last message times are synchronized across instances
Best Practices
  1. Use meaningful instance IDs for debugging
  2. Implement monitoring for Redis connectivity
  3. Consider Redis Cluster or Sentinel for production
  4. Handle Redis errors gracefully
  5. Monitor client inactivity across instances
Limitations
  1. Client metadata is not synchronized between instances
  2. Logic handlers run only on the instance that receives the message
  3. Redis becomes a single point of failure unless using Cluster/Sentinel

Best Practices

  1. Logic Type Selection:

    • Use DISPATCHED for simple, non-blocking operations
    • Use WORKER for sequential or rate-limited processing
    • Use PASSIVE for background tasks and monitoring
  2. Context Usage:

    • Use the context provided by LogicBase for cancellation
    • Add timeouts where appropriate
    • Handle context cancellation in worker loops
  3. Message Processing:

    • Keep message processing logic concise
    • Use appropriate goroutines for concurrent processing
    • Consider message ordering requirements when choosing logic types
  4. LogicBase Usage:

    • Extend LogicBase for all logic implementations
    • Use the provided context for cancellation handling
    • Access common functionality through LogicBase methods
  5. High Availability:

    • Use Redis integration for production deployments
    • Configure appropriate Redis timeouts
    • Monitor Redis connectivity
    • Implement proper error handling

Releasing (module tags)

This module is consumed as github.com/A13xB0/GoBroke. After merging changes to main:

git tag -a v0.X.Y -m "v0.X.Y: summary"
git push origin main
git push origin v0.X.Y

Downstream repos should bump require github.com/A13xB0/GoBroke v0.X.Y and remove any replace ... => ../GoBroke once the tag is visible on GitHub.

License

This project is licensed under the terms specified in the LICENSE file.

Note

This is primarily a personal project focused on clean architecture and modular design in Go. While it's functional and can be used in other projects, it's primarily meant as a learning tool and reference implementation.

Documentation

Overview

Package GoBroke provides a flexible message broker implementation for handling client-to-client and client-to-logic communication patterns. It supports different types of message routing, client management, and custom logic handlers.

Package GoBroke provides configuration options for the GoBroke message broker system.

Package GoBroke provides the base implementation for logic handlers in the GoBroke message broker system.

Package GoBroke provides Redis integration for high availability message routing.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithChannelSize

func WithChannelSize(size int) brokeOptsFunc

WithChannelSize returns a brokeOptsFunc that sets the message channel buffer size. This affects how many messages can be queued before blocking occurs.

func WithContext

func WithContext(ctx context.Context) brokeOptsFunc

WithContext returns a brokeOptsFunc that sets a custom context for the broker. The context can be used to control the broker's lifecycle and pass values.

func WithOnLogicPanic added in v0.2.0

func WithOnLogicPanic(fn func(logicName types.LogicName, msg types.Message, recovered any, stack string)) brokeOptsFunc

WithOnLogicPanic sets a callback invoked when a logic handler panics. The broker always recovers so the process and message loop keep running; use this for structured logging.

func WithRedis added in v0.1.0

func WithRedis(config RedisConfig) brokeOptsFunc

WithRedis returns a brokeOptsFunc that enables Redis integration for high availability. This allows messages to be routed between multiple GoBroke instances.

Types

type Broke

type Broke struct {
	// contains filtered or unexported fields
}

Broke represents a message broker instance that manages client connections, message routing, and custom logic handlers.

func New

func New(endpoint endpoint.Endpoint, opts ...brokeOptsFunc) (*Broke, error)

New creates a new GoBroke instance with the specified endpoint and optional configuration. It returns an error if the endpoint is nil or if there are issues setting up message queues.

func (*Broke) AddLogic

func (broke *Broke) AddLogic(logic types.Logic) error

AddLogic adds a new logic handler to the GoBroke instance. It returns an error if a logic handler with the same name already exists.

func (*Broke) AttachReceiveMiddleware added in v0.0.2

func (broke *Broke) AttachReceiveMiddleware(mFunc middlewareFunc)

AttachReceiveMiddleware adds a middleware function to the receive message pipeline. Middleware functions are executed in the order they are attached and can modify or filter messages before they are processed by the broker.

The middleware function receives a Message and returns a modified Message or nil if the message should be dropped from the pipeline.

func (*Broke) AttachSendMiddleware added in v0.0.2

func (broke *Broke) AttachSendMiddleware(mFunc middlewareFunc)

AttachSendMiddleware adds a middleware function to the send message pipeline. Middleware functions are executed in the order they are attached and can modify or filter messages before they are sent to clients.

The middleware function receives a Message and returns a modified Message or nil if the message should be dropped from the pipeline.

func (*Broke) GetAllClients

func (broke *Broke) GetAllClients(localOnly ...bool) []*clients.Client

GetAllClients returns a slice containing all currently connected clients. If Redis is enabled, it also includes clients connected to other instances.

func (*Broke) GetClient

func (broke *Broke) GetClient(uuid string, localOnly ...bool) (*clients.Client, error)

GetClient retrieves a client by their UUID. It returns the client instance and nil if found, or nil and an error if not found. If Redis is enabled and the client is not found locally, it checks if the client exists on another instance.

func (*Broke) GetEndpoint added in v0.1.0

func (broke *Broke) GetEndpoint() endpoint.Endpoint

GetEndpoint returns the endpoint used by this broker. This can be useful for endpoint-specific operations.

func (*Broke) RegisterClient

func (broke *Broke) RegisterClient(client *clients.Client) error

RegisterClient registers a new client in the GoBroke instance. This method should be called from the endpoint implementation. It returns an error if the client is already registered.

func (*Broke) RemoveClient

func (broke *Broke) RemoveClient(client *clients.Client) error

RemoveClient removes a client from the GoBroke instance and disconnects them from the endpoint. It can remove clients from both the local instance and from Redis. If the client is not found locally but exists in Redis, it will be removed from Redis.

func (*Broke) RemoveLogic

func (broke *Broke) RemoveLogic(name types.LogicName) error

RemoveLogic removes a logic handler from the GoBroke instance by its name. It returns nil even if the logic handler doesn't exist.

func (*Broke) SendMessage

func (broke *Broke) SendMessage(message types.Message)

SendMessage queues a message for processing by GoBroke. This method can be used to send messages to both logic handlers and clients. If the message is from a client, their last message timestamp is updated. If Redis is enabled and the message is for clients not on this instance, it will be published to Redis for routing to other instances.

func (*Broke) SendMessageQuickly

func (broke *Broke) SendMessageQuickly(message types.Message)

SendMessageQuickly sends a message directly to the endpoint for processing. This method should only be used for client-to-client communication as it bypasses logic handlers.

func (*Broke) Start

func (broke *Broke) Start()

Start begins processing messages in the GoBroke instance. It runs until the context is cancelled, at which point it closes all message queues and stops processing.

type LogicBase

type LogicBase struct {
	Ctx    context.Context // Context for cancellation and value propagation
	*Broke                 // Embedded broker instance for accessing broker functionality
	// contains filtered or unexported fields
}

LogicBase provides a base implementation of the types.Logic interface. It implements common functionality that can be embedded in specific logic handlers.

func NewLogicBase

func NewLogicBase(name types.LogicName, logicType types.LogicType, broke *Broke) LogicBase

NewLogicBase creates a new LogicBase instance with the specified configuration. Parameters:

  • name: Unique identifier for the logic handler
  • logicType: Determines how messages are processed (WORKER, DISPATCHED, or PASSIVE)
  • broke: Reference to the broker instance

Returns a LogicBase configured with the provided parameters and a derived context.

func (LogicBase) Name

func (w LogicBase) Name() types.LogicName

Name returns the unique identifier of this logic handler. This method satisfies part of the types.Logic interface.

func (LogicBase) Type

func (w LogicBase) Type() types.LogicType

Type returns the LogicType of this handler (WORKER, DISPATCHED, or PASSIVE). This method satisfies part of the types.Logic interface.

type RedisConfig added in v0.1.0

type RedisConfig struct {
	Enabled     bool
	Client      *redis.Client // Optional existing Redis client
	ChannelName string
	InstanceID  string // Unique identifier for this GoBroke instance
}

RedisConfig holds configuration for Redis integration.

Directories

Path Synopsis
Package clients provides client management functionality for the GoBroke system.
Package clients provides client management functionality for the GoBroke system.
Package endpoint defines the interface for communication endpoints in the GoBroke system.
Package endpoint defines the interface for communication endpoints in the GoBroke system.
Package brokeerrors provides error definitions for the GoBroke message broker system.
Package brokeerrors provides error definitions for the GoBroke message broker system.
examples
cmd command
logic/broadcaster
Package broadcaster provides example implementations of logic handlers for broadcasting messages.
Package broadcaster provides example implementations of logic handlers for broadcasting messages.
logic/inactivitymonitor
Package inactivitymonitor provides a passive logic handler that monitors client activity and automatically removes clients that have been inactive for a specified duration.
Package inactivitymonitor provides a passive logic handler that monitors client activity and automatically removes clients that have been inactive for a specified duration.
Package message provides message creation and management functionality for the GoBroke system.
Package message provides message creation and management functionality for the GoBroke system.
Package types provides core type definitions for the GoBroke message broker system.
Package types provides core type definitions for the GoBroke message broker system.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL