Documentation
¶
Overview ¶
Package eventbus provides a flexible event-driven messaging system for the modular framework.
This module enables decoupled communication between application components through an event bus pattern. It supports both synchronous and asynchronous event processing, multiple event bus engines, and configurable event handling strategies.
Features ¶
The eventbus module offers the following capabilities:
- Topic-based event publishing and subscription
- Synchronous and asynchronous event processing
- Multiple engine support (memory, Redis, Kafka)
- Configurable worker pools for async processing
- Event metadata and lifecycle tracking
- Subscription management with unique identifiers
- Event TTL and retention policies
Configuration ¶
The module can be configured through the EventBusConfig structure:
config := &EventBusConfig{
Engine: "memory", // or "redis", "kafka"
MaxEventQueueSize: 1000, // events per topic queue
DefaultEventBufferSize: 10, // subscription channel buffer
WorkerCount: 5, // async processing workers
EventTTL: 3600, // event time-to-live in seconds
RetentionDays: 7, // event history retention
ExternalBrokerURL: "", // for external brokers
ExternalBrokerUser: "", // broker authentication
ExternalBrokerPassword: "", // broker password
}
Service Registration ¶
The module registers itself as a service for dependency injection:
// Get the event bus service
eventBus := app.GetService("eventbus.provider").(*EventBusModule)
// Publish an event
err := eventBus.Publish(ctx, "user.created", userData)
// Subscribe to events
subscription, err := eventBus.Subscribe(ctx, "user.*", userEventHandler)
Usage Examples ¶
Basic event publishing:
// Publish a simple event
err := eventBus.Publish(ctx, "order.placed", orderData)
// Publish with custom metadata
event := Event{
Topic: "payment.processed",
Payload: paymentData,
Metadata: map[string]interface{}{
"source": "payment-service",
"version": "1.2.0",
},
}
err := eventBus.Publish(ctx, event.Topic, event.Payload)
Event subscription patterns:
// Synchronous subscription
subscription, err := eventBus.Subscribe(ctx, "user.updated", func(ctx context.Context, event Event) error {
user := event.Payload.(UserData)
return updateUserCache(user)
})
// Asynchronous subscription for heavy processing
asyncSub, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
imageData := event.Payload.(ImageData)
return processImageThumbnails(imageData)
})
// Wildcard subscriptions
allOrdersSub, err := eventBus.Subscribe(ctx, "order.*", orderEventHandler)
Subscription management:
// Check subscription details
fmt.Printf("Subscribed to: %s (ID: %s, Async: %v)",
subscription.Topic(), subscription.ID(), subscription.IsAsync())
// Cancel specific subscriptions
err := eventBus.Unsubscribe(ctx, subscription)
// Or cancel through the subscription itself
err := subscription.Cancel()
Event Processing Patterns ¶
The module supports different event processing patterns:
**Synchronous Processing**: Events are processed immediately in the same goroutine that published them. Best for lightweight operations and when ordering is important.
**Asynchronous Processing**: Events are queued and processed by worker goroutines. Best for heavy operations, external API calls, or when you don't want to block the publisher.
Engine Support ¶
Currently supported engines:
- **memory**: In-process event bus using Go channels
- **redis**: Distributed event bus using Redis pub/sub (planned)
- **kafka**: Enterprise event bus using Apache Kafka (planned)
Index ¶
- Constants
- func NewModule() modular.Module
- type Event
- type EventBus
- type EventBusConfig
- type EventBusModule
- func (m *EventBusModule) Constructor() modular.ModuleConstructor
- func (m *EventBusModule) Dependencies() []string
- func (m *EventBusModule) Init(app modular.Application) error
- func (m *EventBusModule) Name() string
- func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider
- func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error
- func (m *EventBusModule) RegisterConfig(app modular.Application) error
- func (m *EventBusModule) RequiresServices() []modular.ServiceDependency
- func (m *EventBusModule) Start(ctx context.Context) error
- func (m *EventBusModule) Stop(ctx context.Context) error
- func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *EventBusModule) SubscriberCount(topic string) int
- func (m *EventBusModule) Topics() []string
- func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error
- type EventHandler
- type MemoryEventBus
- func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error
- func (m *MemoryEventBus) Start(ctx context.Context) error
- func (m *MemoryEventBus) Stop(ctx context.Context) error
- func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
- func (m *MemoryEventBus) SubscriberCount(topic string) int
- func (m *MemoryEventBus) Topics() []string
- func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
- type Subscription
Constants ¶
const ModuleName = "eventbus"
ModuleName is the unique identifier for the eventbus module.
const ServiceName = "eventbus.provider"
ServiceName is the name of the service provided by this module. Other modules can use this name to request the event bus service through dependency injection.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Event ¶
type Event struct {
// Topic is the channel or subject of the event.
// Topics are used for routing events to the appropriate subscribers.
// Topic names can use hierarchical patterns like "user.created" or "order.payment.failed".
Topic string `json:"topic"`
// Payload is the data associated with the event.
// This can be any serializable data structure that represents
// the event's information. The payload type should be consistent
// for events within the same topic.
Payload interface{} `json:"payload"`
// Metadata contains additional information about the event.
// This can include source information, correlation IDs, version numbers,
// or any other contextual data that doesn't belong in the main payload.
// Optional field that can be nil if no metadata is needed.
Metadata map[string]interface{} `json:"metadata,omitempty"`
// CreatedAt is when the event was created.
// This timestamp is set automatically when the event is published
// and can be used for event ordering, TTL calculations, and debugging.
CreatedAt time.Time `json:"createdAt"`
// ProcessingStarted is when the event processing started.
// This field is set when an event handler begins processing the event.
// Used for performance monitoring and timeout detection.
ProcessingStarted *time.Time `json:"processingStarted,omitempty"`
// ProcessingCompleted is when the event processing completed.
// This field is set when an event handler finishes processing the event,
// whether successfully or with an error. Used for performance monitoring
// and event lifecycle tracking.
ProcessingCompleted *time.Time `json:"processingCompleted,omitempty"`
}
Event represents a message in the event bus. Events are the core data structure used for communication between publishers and subscribers. They contain the message data along with metadata for tracking and processing.
type EventBus ¶
type EventBus interface {
// Start initializes the event bus.
// This method is called during module startup and should prepare
// the event bus for publishing and subscribing operations.
// For memory buses, this might initialize internal data structures.
// For network-based buses, this establishes connections.
Start(ctx context.Context) error
// Stop shuts down the event bus.
// This method is called during module shutdown and should cleanup
// all resources, close connections, and stop background processes.
// It should ensure all in-flight events are processed before returning.
Stop(ctx context.Context) error
// Publish sends an event to the specified topic.
// The event will be delivered to all active subscribers of the topic.
// The method should handle event queuing, topic routing, and delivery
// according to the engine's semantics.
Publish(ctx context.Context, event Event) error
// Subscribe registers a handler for a topic with synchronous processing.
// Events matching the topic will be delivered immediately to the handler
// in the same goroutine that published them. The publisher will wait
// for the handler to complete before continuing.
Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
// SubscribeAsync registers a handler for a topic with asynchronous processing.
// Events matching the topic will be queued for processing by worker goroutines.
// The publisher can continue immediately without waiting for processing.
// This is preferred for heavy operations or non-critical event handling.
SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
// Unsubscribe removes a subscription.
// After unsubscribing, the subscription will no longer receive events.
// This method should be idempotent and not return errors for
// subscriptions that are already cancelled.
Unsubscribe(ctx context.Context, subscription Subscription) error
// Topics returns a list of all active topics.
// This includes only topics that currently have at least one subscriber.
// Useful for monitoring, debugging, and administrative interfaces.
Topics() []string
// SubscriberCount returns the number of subscribers for a topic.
// This includes both synchronous and asynchronous subscriptions.
// Returns 0 if the topic has no subscribers or doesn't exist.
SubscriberCount(topic string) int
}
EventBus defines the interface for an event bus implementation. This interface abstracts the underlying messaging mechanism, allowing the eventbus module to support multiple backends (memory, Redis, Kafka) through a common API.
All operations are context-aware to support cancellation and timeouts. Implementations should be thread-safe and handle concurrent access properly.
type EventBusConfig ¶
type EventBusConfig struct {
// Engine specifies the event bus engine to use.
// Supported values: "memory", "redis", "kafka"
// Default: "memory"
Engine string `json:"engine" yaml:"engine" validate:"oneof=memory redis kafka" env:"ENGINE"`
// MaxEventQueueSize is the maximum number of events to queue per topic.
// When this limit is reached, new events may be dropped or publishers
// may be blocked, depending on the engine implementation.
// Must be at least 1.
MaxEventQueueSize int `json:"maxEventQueueSize" yaml:"maxEventQueueSize" validate:"min=1" env:"MAX_EVENT_QUEUE_SIZE"`
// DefaultEventBufferSize is the default buffer size for subscription channels.
// This affects how many events can be buffered for each subscription before
// blocking. Larger buffers can improve performance but use more memory.
// Must be at least 1.
DefaultEventBufferSize int `json:"defaultEventBufferSize" yaml:"defaultEventBufferSize" validate:"min=1" env:"DEFAULT_EVENT_BUFFER_SIZE"`
// WorkerCount is the number of worker goroutines for async event processing.
// These workers process events from asynchronous subscriptions. More workers
// can increase throughput but also increase resource usage.
// Must be at least 1.
WorkerCount int `json:"workerCount" yaml:"workerCount" validate:"min=1" env:"WORKER_COUNT"`
// EventTTL is the time to live for events in seconds.
// Events older than this value may be automatically removed from queues
// or marked as expired. Used for event cleanup and storage management.
// Must be at least 1.
EventTTL int `json:"eventTTL" yaml:"eventTTL" validate:"min=1" env:"EVENT_TTL"`
// RetentionDays is how many days to retain event history.
// This affects event storage and cleanup policies. Longer retention
// allows for event replay and debugging but requires more storage.
// Must be at least 1.
RetentionDays int `json:"retentionDays" yaml:"retentionDays" validate:"min=1" env:"RETENTION_DAYS"`
// ExternalBrokerURL is the connection URL for external message brokers.
// Used when the engine is set to "redis" or "kafka". The format depends
// on the specific broker type.
// Examples:
// Redis: "redis://localhost:6379" or "redis://user:pass@host:port/db"
// Kafka: "kafka://localhost:9092" or "kafka://broker1:9092,broker2:9092"
ExternalBrokerURL string `json:"externalBrokerURL" yaml:"externalBrokerURL" env:"EXTERNAL_BROKER_URL"`
// ExternalBrokerUser is the username for external broker authentication.
// Used when the external broker requires authentication.
// Leave empty if the broker doesn't require authentication.
ExternalBrokerUser string `json:"externalBrokerUser" yaml:"externalBrokerUser" env:"EXTERNAL_BROKER_USER"`
// ExternalBrokerPassword is the password for external broker authentication.
// Used when the external broker requires authentication.
// Leave empty if the broker doesn't require authentication.
// This should be kept secure and may be provided via environment variables.
ExternalBrokerPassword string `json:"externalBrokerPassword" yaml:"externalBrokerPassword" env:"EXTERNAL_BROKER_PASSWORD"`
}
EventBusConfig defines the configuration for the event bus module. This structure contains all the settings needed to configure event processing, worker pools, event retention, and external broker connections.
Configuration can be provided through JSON, YAML, or environment variables. The struct tags define the mapping for each configuration source and validation rules.
Example YAML configuration:
engine: "memory" maxEventQueueSize: 2000 defaultEventBufferSize: 20 workerCount: 10 eventTTL: 7200 retentionDays: 14 externalBrokerURL: "redis://localhost:6379" externalBrokerUser: "eventbus_user" externalBrokerPassword: "secure_password"
Example environment variables:
EVENTBUS_ENGINE=memory EVENTBUS_MAX_EVENT_QUEUE_SIZE=1000 EVENTBUS_WORKER_COUNT=5
type EventBusModule ¶
type EventBusModule struct {
// contains filtered or unexported fields
}
EventBusModule provides event-driven messaging capabilities for the modular framework. It implements a publish-subscribe pattern with support for multiple event bus engines, asynchronous processing, and flexible subscription management.
The module implements the following interfaces:
- modular.Module: Basic module lifecycle
- modular.Configurable: Configuration management
- modular.ServiceAware: Service dependency management
- modular.Startable: Startup logic
- modular.Stoppable: Shutdown logic
- EventBus: Event publishing and subscription interface
Event processing is thread-safe and supports concurrent publishers and subscribers.
func (*EventBusModule) Constructor ¶
func (m *EventBusModule) Constructor() modular.ModuleConstructor
Constructor provides a dependency injection constructor for the module. This method is used by the dependency injection system to create the module instance with any required services.
func (*EventBusModule) Dependencies ¶
func (m *EventBusModule) Dependencies() []string
Dependencies returns the names of modules this module depends on. The eventbus module operates independently and has no dependencies.
func (*EventBusModule) Init ¶
func (m *EventBusModule) Init(app modular.Application) error
Init initializes the eventbus module with the application context. This method is called after all modules have been registered and their configurations loaded. It sets up the event bus engine based on configuration.
The initialization process:
- Retrieves the module's configuration
- Sets up logging
- Initializes the appropriate event bus engine
- Prepares the event bus for startup
Supported engines:
- "memory": In-process event bus using Go channels
- fallback: defaults to memory engine for unknown engines
func (*EventBusModule) Name ¶
func (m *EventBusModule) Name() string
Name returns the unique identifier for this module. This name is used for service registration, dependency resolution, and configuration section identification.
func (*EventBusModule) ProvidesServices ¶
func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider
ProvidesServices declares services provided by this module. The eventbus module provides an event bus service that can be injected into other modules for event-driven communication.
Provided services:
- "eventbus.provider": The main event bus service interface
func (*EventBusModule) Publish ¶
func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error
Publish publishes an event to the event bus. Creates an Event struct with the provided topic and payload, then sends it through the event bus for processing by subscribers.
The event will be delivered to all active subscribers of the topic. Topic patterns and wildcards may be supported depending on the engine.
Example:
err := eventBus.Publish(ctx, "user.created", userData) err := eventBus.Publish(ctx, "order.payment.failed", paymentData)
func (*EventBusModule) RegisterConfig ¶
func (m *EventBusModule) RegisterConfig(app modular.Application) error
RegisterConfig registers the module's configuration structure. This method is called during application initialization to register the default configuration values for the eventbus module.
Default configuration:
- Engine: "memory"
- MaxEventQueueSize: 1000 events per topic
- DefaultEventBufferSize: 10 events per subscription channel
- WorkerCount: 5 async processing workers
- EventTTL: 3600 seconds (1 hour)
- RetentionDays: 7 days for event history
- ExternalBroker settings: empty (not used for memory engine)
func (*EventBusModule) RequiresServices ¶
func (m *EventBusModule) RequiresServices() []modular.ServiceDependency
RequiresServices declares services required by this module. The eventbus module operates independently and requires no external services.
func (*EventBusModule) Start ¶
func (m *EventBusModule) Start(ctx context.Context) error
Start performs startup logic for the module. This method starts the event bus engine and begins processing events. It's called after all modules have been initialized and are ready to start.
The startup process:
- Checks if already started (idempotent)
- Starts the underlying event bus engine
- Initializes worker pools for async processing
- Prepares topic management and subscription tracking
This method is thread-safe and can be called multiple times safely.
func (*EventBusModule) Stop ¶
func (m *EventBusModule) Stop(ctx context.Context) error
Stop performs shutdown logic for the module. This method gracefully shuts down the event bus, ensuring all in-flight events are processed and all subscriptions are properly cleaned up.
The shutdown process:
- Checks if already stopped (idempotent)
- Stops accepting new events
- Waits for in-flight events to complete
- Cancels all active subscriptions
- Shuts down worker pools
- Closes the underlying event bus engine
This method is thread-safe and can be called multiple times safely.
func (*EventBusModule) Subscribe ¶
func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe subscribes to a topic on the event bus with synchronous processing. The provided handler will be called immediately when an event is published to the specified topic. The handler blocks the event delivery until it completes.
Use synchronous subscriptions for:
- Lightweight event processing
- When event ordering is important
- Critical event handlers that must complete before continuing
Example:
subscription, err := eventBus.Subscribe(ctx, "user.login", func(ctx context.Context, event Event) error {
user := event.Payload.(UserData)
return updateLastLoginTime(user.ID)
})
func (*EventBusModule) SubscribeAsync ¶
func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync subscribes to a topic with asynchronous event processing. The provided handler will be queued for processing by worker goroutines, allowing the event publisher to continue without waiting for processing.
Use asynchronous subscriptions for:
- Heavy processing operations
- External API calls
- Non-critical event handlers
- When you want to avoid blocking publishers
Example:
subscription, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
imageData := event.Payload.(ImageData)
return generateThumbnails(imageData)
})
func (*EventBusModule) SubscriberCount ¶
func (m *EventBusModule) SubscriberCount(topic string) int
SubscriberCount returns the number of active subscribers for a topic. This includes both synchronous and asynchronous subscriptions. Returns 0 if the topic has no subscribers.
Example:
count := eventBus.SubscriberCount("user.created")
if count == 0 {
log.Warn("No subscribers for user creation events")
}
func (*EventBusModule) Topics ¶
func (m *EventBusModule) Topics() []string
Topics returns a list of all active topics that have subscribers. This can be useful for debugging, monitoring, or building administrative interfaces that show current event bus activity.
Example:
activeTopics := eventBus.Topics()
for _, topic := range activeTopics {
count := eventBus.SubscriberCount(topic)
fmt.Printf("Topic: %s, Subscribers: %d\n", topic, count)
}
func (*EventBusModule) Unsubscribe ¶
func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe cancels a subscription and stops receiving events. The subscription will be removed from the event bus and no longer receive events for its topic.
This method is idempotent - calling it multiple times on the same subscription is safe and will not cause errors.
Example:
err := eventBus.Unsubscribe(ctx, subscription)
type EventHandler ¶
EventHandler is a function that handles an event. Event handlers are called when an event matching their subscription topic is published. Handlers should be idempotent when possible and handle errors gracefully.
The context can be used for cancellation, timeouts, and passing request-scoped values. Handlers should respect context cancellation and return promptly when the context is cancelled.
Example handler:
func userCreatedHandler(ctx context.Context, event Event) error {
user := event.Payload.(UserData)
return sendWelcomeEmail(ctx, user.Email)
}
type MemoryEventBus ¶
type MemoryEventBus struct {
// contains filtered or unexported fields
}
MemoryEventBus implements EventBus using in-memory channels
func NewMemoryEventBus ¶
func NewMemoryEventBus(config *EventBusConfig) *MemoryEventBus
NewMemoryEventBus creates a new in-memory event bus
func (*MemoryEventBus) Publish ¶
func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error
Publish sends an event to the specified topic
func (*MemoryEventBus) Start ¶
func (m *MemoryEventBus) Start(ctx context.Context) error
Start initializes the event bus
func (*MemoryEventBus) Stop ¶
func (m *MemoryEventBus) Stop(ctx context.Context) error
Stop shuts down the event bus
func (*MemoryEventBus) Subscribe ¶
func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
Subscribe registers a handler for a topic
func (*MemoryEventBus) SubscribeAsync ¶
func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)
SubscribeAsync registers a handler for a topic with asynchronous processing
func (*MemoryEventBus) SubscriberCount ¶
func (m *MemoryEventBus) SubscriberCount(topic string) int
SubscriberCount returns the number of subscribers for a topic
func (*MemoryEventBus) Topics ¶
func (m *MemoryEventBus) Topics() []string
Topics returns a list of all active topics
func (*MemoryEventBus) Unsubscribe ¶
func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error
Unsubscribe removes a subscription
type Subscription ¶
type Subscription interface {
// Topic returns the topic being subscribed to.
// This may include wildcard patterns depending on the engine implementation.
Topic() string
// ID returns the unique identifier for this subscription.
// Each subscription gets a unique ID that can be used for tracking,
// logging, and debugging purposes.
ID() string
// IsAsync returns true if this is an asynchronous subscription.
// Asynchronous subscriptions process events in background workers,
// while synchronous subscriptions process events immediately.
IsAsync() bool
// Cancel cancels the subscription.
// After calling Cancel, the subscription will no longer receive events.
// This is equivalent to calling Unsubscribe on the event bus.
// The method is idempotent and safe to call multiple times.
Cancel() error
}
Subscription represents a subscription to a topic. Subscriptions are created when a handler is registered for a topic and provide methods for managing the subscription lifecycle.