eventbus

Version: v0.0.4
Published: Jun 27, 2025 License: MIT Imports: 7 Imported by: 1

README

EventBus Module

The EventBus Module provides a publish-subscribe messaging system for Modular applications. It enables decoupled communication between components through a flexible event-driven architecture.

Features

  • In-memory event publishing and subscription
  • Support for both synchronous and asynchronous event handling
  • Topic-based routing
  • Event history tracking
  • Configurable worker pool for asynchronous event processing
  • Extensible design with support for external message brokers

Installation

import (
    "github.com/GoCodeAlone/modular"
    "github.com/GoCodeAlone/modular/modules/eventbus"
)

// Register the eventbus module with your Modular application
app.RegisterModule(eventbus.NewModule())

Configuration

The eventbus module can be configured using the following options:

eventbus:
  engine: memory              # Event bus engine (memory, redis, kafka)
  maxEventQueueSize: 1000     # Maximum events to queue per topic
  defaultEventBufferSize: 10  # Default buffer size for subscription channels
  workerCount: 5              # Worker goroutines for async event processing
  eventTTL: 3600              # TTL for events in seconds (1 hour)
  retentionDays: 7            # Days to retain event history
  externalBrokerURL: ""       # URL for external message broker (if used)
  externalBrokerUser: ""      # Username for external message broker (if used)
  externalBrokerPassword: ""  # Password for external message broker (if used)

Usage

Accessing the EventBus Service

// In your module's Init function
func (m *MyModule) Init(app modular.Application) error {
    var eventBusService *eventbus.EventBusModule
    err := app.GetService("eventbus.provider", &eventBusService)
    if err != nil {
        return fmt.Errorf("failed to get event bus service: %w", err)
    }
    
    // Now you can use the event bus service
    m.eventBus = eventBusService
    return nil
}

Using Interface-Based Service Matching

// Define the service dependency
func (m *MyModule) RequiresServices() []modular.ServiceDependency {
    return []modular.ServiceDependency{
        {
            Name:               "eventbus",
            Required:           true,
            MatchByInterface:   true,
            SatisfiesInterface: reflect.TypeOf((*eventbus.EventBus)(nil)).Elem(),
        },
    }
}

// Access the service in your constructor
func (m *MyModule) Constructor() modular.ModuleConstructor {
    return func(app modular.Application, services map[string]any) (modular.Module, error) {
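        // Use the comma-ok form so a mismatched service returns an error instead of panicking
        eventBusService, ok := services["eventbus"].(eventbus.EventBus)
        if !ok {
            return nil, fmt.Errorf("service %q does not implement eventbus.EventBus", "eventbus")
        }
        return &MyModule{eventBus: eventBusService}, nil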
    }
}

Publishing Events

// Publish a simple event
err := eventBusService.Publish(ctx, "user.created", user)
if err != nil {
    // Handle error
}

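// Publish an event with metadata.
// Note: EventBusModule.Publish takes a topic and a payload, while the
// EventBus interface (implemented by *MemoryEventBus) takes an Event value.
// The call below therefore assumes bus is an eventbus.EventBus.
metadata := map[string]interface{}{
    "source":  "user-service",
    "version": "1.0",
}

event := eventbus.Event{
    Topic:    "user.created",
    Payload:  user,
    Metadata: metadata,
}

err = bus.Publish(ctx, event)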
if err != nil {
    // Handle error
}

Subscribing to Events

// Synchronous subscription
subscription, err := eventBusService.Subscribe(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    // Check the payload type instead of panicking on a mismatch
    user, ok := event.Payload.(User)
    if !ok {
        return fmt.Errorf("unexpected payload type %T", event.Payload)
    }
    fmt.Printf("User created: %s\n", user.Name)
    return nil
})
if err != nil {
    // Handle error
}

// Asynchronous subscription (handler runs in a worker goroutine)
asyncSub, err := eventBusService.SubscribeAsync(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    // This function is executed asynchronously
    user, ok := event.Payload.(User)
    if !ok {
        return fmt.Errorf("unexpected payload type %T", event.Payload)
    }
    time.Sleep(1 * time.Second) // Simulating work
    fmt.Printf("Processed user asynchronously: %s\n", user.Name)
    return nil
})
if err != nil {
    // Handle error
}

// Unsubscribe when done
defer eventBusService.Unsubscribe(ctx, subscription)
defer eventBusService.Unsubscribe(ctx, asyncSub)

Working with Topics

// List all active topics
topics := eventBusService.Topics()
fmt.Println("Active topics:", topics)

// Get subscriber count for a topic
count := eventBusService.SubscriberCount("user.created")
fmt.Printf("Subscribers for 'user.created': %d\n", count)
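
To illustrate what Topics and SubscriberCount report, here is a minimal sketch of topic-based routing built on a map from topic name to handlers. The registry type, its handler signature, and the sorted topic order are assumptions made for this example; the module's actual bookkeeping is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// handler is a simplified stand-in for eventbus.EventHandler (hypothetical).
type handler func(payload interface{}) error

// registry maps each topic to the handlers subscribed to it.
type registry struct {
	mu   sync.RWMutex
	subs map[string][]handler
}

func newRegistry() *registry {
	return &registry{subs: make(map[string][]handler)}
}

// Subscribe adds a handler for the given topic.
func (r *registry) Subscribe(topic string, h handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs[topic] = append(r.subs[topic], h)
}

// Publish calls every handler for the topic in order and stops at the first error.
func (r *registry) Publish(topic string, payload interface{}) error {
	r.mu.RLock()
	handlers := append([]handler(nil), r.subs[topic]...) // copy so handlers run without the lock
	r.mu.RUnlock()
	for _, h := range handlers {
		if err := h(payload); err != nil {
			return err
		}
	}
	return nil
}

// Topics returns the topics that have at least one subscriber, sorted by name.
func (r *registry) Topics() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	topics := make([]string, 0, len(r.subs))
	for t := range r.subs {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	return topics
}

// SubscriberCount returns how many handlers are registered for a topic.
func (r *registry) SubscriberCount(topic string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[topic])
}

func main() {
	r := newRegistry()
	r.Subscribe("user.created", func(p interface{}) error {
		fmt.Println("received:", p)
		return nil
	})
	fmt.Println("topics:", r.Topics())
	fmt.Println("subscribers:", r.SubscriberCount("user.created"))
	if err := r.Publish("user.created", "alice"); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```

Copying the handler slice before invoking it keeps the lock from being held while handlers run, so a handler can subscribe to another topic without deadlocking.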

Event Handling Best Practices

  1. Keep Handlers Lightweight: Event handlers should be quick and efficient, especially for synchronous subscriptions

  2. Error Handling: Always handle errors in your event handlers, especially for async handlers

  3. Topic Organization: Use hierarchical topics like "domain.event.action" for better organization

  4. Type Safety: Consider defining type-safe wrappers around the event bus for specific event types

  5. Context Usage: Use the provided context to implement cancellation and timeouts

Implementation Notes

  • The in-memory event bus uses channels to distribute events to subscribers
  • Asynchronous handlers are executed in a worker pool to limit concurrency
  • Event history is retained based on the configured retention period
  • The module is extensible to support external message brokers in the future

Testing

The eventbus module includes tests for module initialization, configuration, and lifecycle management.

Documentation

Constants

const ModuleName = "eventbus"

ModuleName is the name of this module

const ServiceName = "eventbus.provider"

ServiceName is the name of the service provided by this module

Variables

This section is empty.

Functions

func NewModule

func NewModule() modular.Module

NewModule creates a new instance of the event bus module

Types

type Event

type Event struct {
	// Topic is the channel or subject of the event
	Topic string `json:"topic"`

	// Payload is the data associated with the event
	Payload interface{} `json:"payload"`

	// Metadata contains additional information about the event
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// CreatedAt is when the event was created
	CreatedAt time.Time `json:"createdAt"`

	// ProcessingStarted is when the event processing started
	ProcessingStarted *time.Time `json:"processingStarted,omitempty"`

	// ProcessingCompleted is when the event processing completed
	ProcessingCompleted *time.Time `json:"processingCompleted,omitempty"`
}

Event represents a message in the event bus

type EventBus

type EventBus interface {
	// Start initializes the event bus
	Start(ctx context.Context) error

	// Stop shuts down the event bus
	Stop(ctx context.Context) error

	// Publish sends an event to the specified topic
	Publish(ctx context.Context, event Event) error

	// Subscribe registers a handler for a topic with synchronous processing
	Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

	// SubscribeAsync registers a handler for a topic with asynchronous processing
	SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

	// Unsubscribe removes a subscription
	Unsubscribe(ctx context.Context, subscription Subscription) error

	// Topics returns a list of all active topics
	Topics() []string

	// SubscriberCount returns the number of subscribers for a topic
	SubscriberCount(topic string) int
}

EventBus defines the interface for an event bus implementation

type EventBusConfig

type EventBusConfig struct {
	// Engine specifies the event bus engine to use ("memory", "redis", "kafka", etc.)
	Engine string `json:"engine" yaml:"engine" validate:"oneof=memory redis kafka" env:"ENGINE"`

	// MaxEventQueueSize is the maximum number of events to queue per topic
	MaxEventQueueSize int `json:"maxEventQueueSize" yaml:"maxEventQueueSize" validate:"min=1" env:"MAX_EVENT_QUEUE_SIZE"`

	// DefaultEventBufferSize is the default buffer size for subscription channels
	DefaultEventBufferSize int `json:"defaultEventBufferSize" yaml:"defaultEventBufferSize" validate:"min=1" env:"DEFAULT_EVENT_BUFFER_SIZE"`

	// WorkerCount is the number of worker goroutines for async event processing
	WorkerCount int `json:"workerCount" yaml:"workerCount" validate:"min=1" env:"WORKER_COUNT"`

	// EventTTL is the time to live for events in seconds
	EventTTL int `json:"eventTTL" yaml:"eventTTL" validate:"min=1" env:"EVENT_TTL"`

	// RetentionDays is how many days to retain event history
	RetentionDays int `json:"retentionDays" yaml:"retentionDays" validate:"min=1" env:"RETENTION_DAYS"`

	// External broker configuration
	ExternalBrokerURL      string `json:"externalBrokerURL" yaml:"externalBrokerURL" env:"EXTERNAL_BROKER_URL"`
	ExternalBrokerUser     string `json:"externalBrokerUser" yaml:"externalBrokerUser" env:"EXTERNAL_BROKER_USER"`
	ExternalBrokerPassword string `json:"externalBrokerPassword" yaml:"externalBrokerPassword" env:"EXTERNAL_BROKER_PASSWORD"`
}

EventBusConfig defines the configuration for the event bus module

type EventBusModule

type EventBusModule struct {
	// contains filtered or unexported fields
}

EventBusModule represents the event bus module

func (*EventBusModule) Constructor

func (m *EventBusModule) Constructor() modular.ModuleConstructor

Constructor provides a dependency injection constructor for the module

func (*EventBusModule) Dependencies

func (m *EventBusModule) Dependencies() []string

Dependencies returns the names of modules this module depends on

func (*EventBusModule) Init

func (m *EventBusModule) Init(app modular.Application) error

Init initializes the module

func (*EventBusModule) Name

func (m *EventBusModule) Name() string

Name returns the name of the module

func (*EventBusModule) ProvidesServices

func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider

ProvidesServices declares services provided by this module

func (*EventBusModule) Publish

func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error

Publish publishes an event to the event bus

func (*EventBusModule) RegisterConfig

func (m *EventBusModule) RegisterConfig(app modular.Application) error

RegisterConfig registers the module's configuration structure

func (*EventBusModule) RequiresServices

func (m *EventBusModule) RequiresServices() []modular.ServiceDependency

RequiresServices declares services required by this module

func (*EventBusModule) Start

func (m *EventBusModule) Start(ctx context.Context) error

Start performs startup logic for the module

func (*EventBusModule) Stop

func (m *EventBusModule) Stop(ctx context.Context) error

Stop performs shutdown logic for the module

func (*EventBusModule) Subscribe

func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe subscribes to a topic on the event bus

func (*EventBusModule) SubscribeAsync

func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync subscribes to a topic with asynchronous event handling

func (*EventBusModule) SubscriberCount

func (m *EventBusModule) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*EventBusModule) Topics

func (m *EventBusModule) Topics() []string

Topics returns a list of all active topics

func (*EventBusModule) Unsubscribe

func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe cancels a subscription

type EventHandler

type EventHandler func(ctx context.Context, event Event) error

EventHandler is a function that handles an event

type MemoryEventBus

type MemoryEventBus struct {
	// contains filtered or unexported fields
}

MemoryEventBus implements EventBus using in-memory channels

func NewMemoryEventBus

func NewMemoryEventBus(config *EventBusConfig) *MemoryEventBus

NewMemoryEventBus creates a new in-memory event bus

func (*MemoryEventBus) Publish

func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic

func (*MemoryEventBus) Start

func (m *MemoryEventBus) Start(ctx context.Context) error

Start initializes the event bus

func (*MemoryEventBus) Stop

func (m *MemoryEventBus) Stop(ctx context.Context) error

Stop shuts down the event bus

func (*MemoryEventBus) Subscribe

func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic

func (*MemoryEventBus) SubscribeAsync

func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing

func (*MemoryEventBus) SubscriberCount

func (m *MemoryEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic

func (*MemoryEventBus) Topics

func (m *MemoryEventBus) Topics() []string

Topics returns a list of all active topics

func (*MemoryEventBus) Unsubscribe

func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription

type Subscription

type Subscription interface {
	// Topic returns the topic being subscribed to
	Topic() string

	// ID returns the unique identifier for this subscription
	ID() string

	// IsAsync returns true if this is an asynchronous subscription
	IsAsync() bool

	// Cancel cancels the subscription
	Cancel() error
}

Subscription represents a subscription to a topic
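
For readers writing their own EventBus implementation, here is a minimal sketch of a type that satisfies Subscription. The simpleSubscription type and its constructor are hypothetical, and the Subscription interface is redeclared locally so the example compiles on its own. Cancel is made idempotent with sync.Once, which is a design choice of this sketch rather than a documented requirement.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscription is a local copy of the eventbus.Subscription interface.
type Subscription interface {
	Topic() string
	ID() string
	IsAsync() bool
	Cancel() error
}

// simpleSubscription is a hypothetical implementation for illustration.
type simpleSubscription struct {
	id        string
	topic     string
	async     bool
	once      sync.Once
	onCancel  func() error
	cancelErr error
}

// Compile-time check that simpleSubscription satisfies the interface.
var _ Subscription = (*simpleSubscription)(nil)

func newSubscription(id, topic string, async bool, onCancel func() error) *simpleSubscription {
	return &simpleSubscription{id: id, topic: topic, async: async, onCancel: onCancel}
}

func (s *simpleSubscription) Topic() string { return s.topic }
func (s *simpleSubscription) ID() string    { return s.id }
func (s *simpleSubscription) IsAsync() bool { return s.async }

// Cancel runs the cancel callback at most once and returns its result
// on every call.
func (s *simpleSubscription) Cancel() error {
	s.once.Do(func() {
		if s.onCancel != nil {
			s.cancelErr = s.onCancel()
		}
	})
	return s.cancelErr
}

func main() {
	sub := newSubscription("sub-1", "user.created", false, func() error {
		fmt.Println("removing handler from the bus")
		return nil
	})
	fmt.Println(sub.ID(), sub.Topic(), sub.IsAsync())
	_ = sub.Cancel()
	_ = sub.Cancel() // second call does not run the callback again
}
```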
