eventbus

v0.1.0
Published: Jul 10, 2025 | License: MIT | Imports: 7 | Imported by: 0

README

EventBus Module

The EventBus Module provides a publish-subscribe messaging system for Modular applications. It enables decoupled communication between components through a flexible event-driven architecture.

Features

  • In-memory event publishing and subscription
  • Support for both synchronous and asynchronous event handling
  • Topic-based routing
  • Event history tracking
  • Configurable worker pool for asynchronous event processing
  • Extensible design with support for external message brokers

Installation

import (
    "github.com/CrisisTextLine/modular"
    "github.com/CrisisTextLine/modular/modules/eventbus"
)

// Register the eventbus module with your Modular application
app.RegisterModule(eventbus.NewModule())

Configuration

The eventbus module can be configured using the following options:

eventbus:
  engine: memory              # Event bus engine (memory; redis and kafka are planned)
  maxEventQueueSize: 1000     # Maximum events to queue per topic
  defaultEventBufferSize: 10  # Default buffer size for subscription channels
  workerCount: 5              # Worker goroutines for async event processing
  eventTTL: 3600              # TTL for events in seconds (1 hour)
  retentionDays: 7            # Days to retain event history
  externalBrokerURL: ""       # URL for external message broker (if used)
  externalBrokerUser: ""      # Username for external message broker (if used)
  externalBrokerPassword: ""  # Password for external message broker (if used)

Usage

Accessing the EventBus Service

// In your module's Init function
func (m *MyModule) Init(app modular.Application) error {
    var eventBusService *eventbus.EventBusModule
    err := app.GetService("eventbus.provider", &eventBusService)
    if err != nil {
        return fmt.Errorf("failed to get event bus service: %w", err)
    }
    
    // Now you can use the event bus service
    m.eventBus = eventBusService
    return nil
}

Using Interface-Based Service Matching

// Define the service dependency
func (m *MyModule) RequiresServices() []modular.ServiceDependency {
    return []modular.ServiceDependency{
        {
            Name:               "eventbus",
            Required:           true,
            MatchByInterface:   true,
            SatisfiesInterface: reflect.TypeOf((*eventbus.EventBus)(nil)).Elem(),
        },
    }
}

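// Access the service in your constructor
func (m *MyModule) Constructor() modular.ModuleConstructor {
    return func(app modular.Application, services map[string]any) (modular.Module, error) {
        // Use the comma-ok form so a missing or mismatched service returns
        // an error instead of panicking.
        eventBusService, ok := services["eventbus"].(eventbus.EventBus)
        if !ok {
            return nil, fmt.Errorf("service %q does not implement eventbus.EventBus", "eventbus")
        }
        return &MyModule{eventBus: eventBusService}, nil
    }
}

Publishing Events

// Publish a simple event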
err := eventBusService.Publish(ctx, "user.created", user)
if err != nil {
    // Handle error
}

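// Publish an event with metadata
metadata := map[string]interface{}{
    "source":  "user-service",
    "version": "1.0",
}

event := eventbus.Event{
    Topic:    "user.created",
    Payload:  user,
    Metadata: metadata,
}

// EventBusModule.Publish takes a topic and a payload, so only those two
// fields are forwarded here; event.Metadata is not passed through this call.
err = eventBusService.Publish(ctx, event.Topic, event.Payload)
if err != nil {
    // Handle error
}

Subscribing to Events

// Synchronous subscription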
subscription, err := eventBusService.Subscribe(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    user := event.Payload.(User)
    fmt.Printf("User created: %s\n", user.Name)
    return nil
})

if err != nil {
    // Handle error
}

// Asynchronous subscription (handler runs in a worker goroutine)
asyncSub, err := eventBusService.SubscribeAsync(ctx, "user.created", func(ctx context.Context, event eventbus.Event) error {
    // This function is executed asynchronously
    user := event.Payload.(User)
    time.Sleep(1 * time.Second) // Simulating work
    fmt.Printf("Processed user asynchronously: %s\n", user.Name)
    return nil
})
if err != nil {
    // Handle error
}

// Unsubscribe when done
defer eventBusService.Unsubscribe(ctx, subscription)
defer eventBusService.Unsubscribe(ctx, asyncSub)

Working with Topics

// List all active topics
topics := eventBusService.Topics()
fmt.Println("Active topics:", topics)

// Get subscriber count for a topic
count := eventBusService.SubscriberCount("user.created")
fmt.Printf("Subscribers for 'user.created': %d\n", count)

Event Handling Best Practices

  1. Keep Handlers Lightweight: Event handlers should be quick and efficient, especially for synchronous subscriptions

  2. Error Handling: Always handle errors in your event handlers, especially for async handlers

  3. Topic Organization: Use hierarchical topics like "domain.event.action" for better organization

  4. Type Safety: Consider defining type-safe wrappers around the event bus for specific event types

  5. Context Usage: Use the provided context to implement cancellation and timeouts
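
As an illustration of practice 4, the following is a minimal sketch of a type-safe wrapper. The Event and EventHandler types are local stand-ins that mirror the documented shapes, and Typed, deliver, and User are hypothetical names introduced only for this example; none of them are part of the eventbus package.

```go
package main

import (
	"context"
	"fmt"
)

// Event and EventHandler are local stand-ins that mirror the shapes
// documented for this package; they are not imported from it.
type Event struct {
	Topic   string
	Payload interface{}
}

type EventHandler func(ctx context.Context, event Event) error

// User is a hypothetical payload type used only for illustration.
type User struct{ Name string }

// Typed adapts a handler that expects a concrete payload type T into an
// EventHandler. A payload of any other type yields an error instead of
// the panic that an unchecked type assertion would cause.
func Typed[T any](fn func(ctx context.Context, payload T) error) EventHandler {
	return func(ctx context.Context, event Event) error {
		payload, ok := event.Payload.(T)
		if !ok {
			return fmt.Errorf("topic %q: unexpected payload type %T", event.Topic, event.Payload)
		}
		return fn(ctx, payload)
	}
}

// deliver invokes a typed User handler with the given payload and reports
// the name it saw along with any error.
func deliver(payload interface{}) (string, error) {
	var seen string
	h := Typed(func(ctx context.Context, u User) error {
		seen = u.Name
		return nil
	})
	err := h(context.Background(), Event{Topic: "user.created", Payload: payload})
	return seen, err
}

func main() {
	name, err := deliver(User{Name: "Ada"})
	fmt.Println(name, err)

	_, err = deliver("not a user")
	fmt.Println(err)
}
```

A wrapper like this keeps the type assertion in one place, so individual handlers can work with concrete types directly.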

Implementation Notes

  • The in-memory event bus uses channels to distribute events to subscribers
  • Asynchronous handlers are executed in a worker pool to limit concurrency
  • Event history is retained based on the configured retention period
  • The module is extensible to support external message brokers in the future
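
The worker-pool note above can be pictured with a short, self-contained sketch. It is not the module's implementation: runPool and its parameters are hypothetical, and the sketch only assumes that a fixed number of goroutines drain a buffered channel.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool processes jobs with a fixed number of worker goroutines and
// returns how many jobs were handled. workerCount plays the role of the
// workerCount setting; the queue is a buffered channel of size queueSize.
func runPool(workerCount, queueSize int, jobs []string, handle func(string)) int {
	queue := make(chan string, queueSize)
	var wg sync.WaitGroup
	var mu sync.Mutex
	handled := 0

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range queue { // exits once the queue is closed and drained
				handle(job)
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		queue <- j // blocks while the queue is full, applying backpressure
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	n := runPool(5, 10, []string{"a", "b", "c"}, func(s string) {})
	fmt.Println("handled:", n)
}
```

Bounding the number of workers caps how many handlers run at once, regardless of how quickly events are published.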

Testing

The eventbus module includes tests for module initialization, configuration, and lifecycle management.

Documentation

Overview

Package eventbus provides a flexible event-driven messaging system for the modular framework.

This module enables decoupled communication between application components through an event bus pattern. It supports both synchronous and asynchronous event processing, multiple event bus engines, and configurable event handling strategies.

Features

The eventbus module offers the following capabilities:

  • Topic-based event publishing and subscription
  • Synchronous and asynchronous event processing
  • Pluggable engines (memory today; Redis and Kafka planned)
  • Configurable worker pools for async processing
  • Event metadata and lifecycle tracking
  • Subscription management with unique identifiers
  • Event TTL and retention policies

Configuration

The module can be configured through the EventBusConfig structure:

config := &EventBusConfig{
    Engine:                 "memory",    // or "redis", "kafka"
    MaxEventQueueSize:      1000,        // events per topic queue
    DefaultEventBufferSize: 10,          // subscription channel buffer
    WorkerCount:            5,           // async processing workers
    EventTTL:               3600,        // event time-to-live in seconds
    RetentionDays:          7,           // event history retention
    ExternalBrokerURL:      "",          // for external brokers
    ExternalBrokerUser:     "",          // broker authentication
    ExternalBrokerPassword: "",          // broker password
}

Service Registration

The module registers itself as a service for dependency injection:

// Get the event bus service
var eventBus *EventBusModule
if err := app.GetService("eventbus.provider", &eventBus); err != nil {
    // Handle error
}

// Publish an event
err := eventBus.Publish(ctx, "user.created", userData)

// Subscribe to events
subscription, err := eventBus.Subscribe(ctx, "user.*", userEventHandler)

Usage Examples

Basic event publishing:

// Publish a simple event
err := eventBus.Publish(ctx, "order.placed", orderData)

// Publish with custom metadata
event := Event{
    Topic:   "payment.processed",
    Payload: paymentData,
    Metadata: map[string]interface{}{
        "source": "payment-service",
        "version": "1.2.0",
    },
}
// Publish takes a topic and a payload, so event.Metadata is not forwarded here.
err = eventBus.Publish(ctx, event.Topic, event.Payload)

Event subscription patterns:

// Synchronous subscription
subscription, err := eventBus.Subscribe(ctx, "user.updated", func(ctx context.Context, event Event) error {
    user := event.Payload.(UserData)
    return updateUserCache(user)
})

// Asynchronous subscription for heavy processing
asyncSub, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
    imageData := event.Payload.(ImageData)
    return processImageThumbnails(imageData)
})

// Wildcard subscriptions
allOrdersSub, err := eventBus.Subscribe(ctx, "order.*", orderEventHandler)
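
The wildcard examples above ("user.*", "order.*") do not spell out how patterns are matched, and the Publish documentation notes that wildcard support depends on the engine. Purely as an illustration, the sketch below assumes that topics are dot-separated and that a "*" segment matches exactly one segment. matchTopic is a hypothetical helper, not a function of this package, and the real engine may behave differently.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches pattern, where both are
// dot-separated and a "*" segment in the pattern matches exactly one
// segment of the topic. ASSUMPTION: this rule is illustrative only; the
// documentation does not specify the engine's wildcard semantics.
func matchTopic(pattern, topic string) bool {
	p := strings.Split(pattern, ".")
	t := strings.Split(topic, ".")
	if len(p) != len(t) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != t[i] {
			return false
		}
	}
	return true
}

func main() {
	for _, topic := range []string{"order.placed", "order.payment.failed", "user.created"} {
		fmt.Printf("%-22s matches order.*: %v\n", topic, matchTopic("order.*", topic))
	}
}
```

Under this assumed rule, "order.*" would not match "order.payment.failed"; check the engine's actual behavior before relying on multi-level matches.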

Subscription management:

// Check subscription details
fmt.Printf("Subscribed to: %s (ID: %s, Async: %v)\n",
    subscription.Topic(), subscription.ID(), subscription.IsAsync())

// Cancel specific subscriptions
err := eventBus.Unsubscribe(ctx, subscription)

// Or cancel through the subscription itself
err = subscription.Cancel()

Event Processing Patterns

The module supports different event processing patterns:

Synchronous Processing: Events are processed immediately in the same goroutine that published them. Best for lightweight operations and when ordering is important.

Asynchronous Processing: Events are queued and processed by worker goroutines. Best for heavy operations, external API calls, or when you don't want to block the publisher.

Engine Support

Available and planned engines:

  • memory: In-process event bus using Go channels
  • redis: Distributed event bus using Redis pub/sub (planned)
  • kafka: Enterprise event bus using Apache Kafka (planned)

Constants

const ModuleName = "eventbus"

ModuleName is the unique identifier for the eventbus module.

const ServiceName = "eventbus.provider"

ServiceName is the name of the service provided by this module. Other modules can use this name to request the event bus service through dependency injection.

Variables

This section is empty.

Functions

func NewModule

func NewModule() modular.Module

NewModule creates a new instance of the event bus module. This is the primary constructor for the eventbus module and should be used when registering the module with the application.

Example:

app.RegisterModule(eventbus.NewModule())

Types

type Event

type Event struct {
	// Topic is the channel or subject of the event.
	// Topics are used for routing events to the appropriate subscribers.
	// Topic names can use hierarchical patterns like "user.created" or "order.payment.failed".
	Topic string `json:"topic"`

	// Payload is the data associated with the event.
	// This can be any serializable data structure that represents
	// the event's information. The payload type should be consistent
	// for events within the same topic.
	Payload interface{} `json:"payload"`

	// Metadata contains additional information about the event.
	// This can include source information, correlation IDs, version numbers,
	// or any other contextual data that doesn't belong in the main payload.
	// Optional field that can be nil if no metadata is needed.
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// CreatedAt is when the event was created.
	// This timestamp is set automatically when the event is published
	// and can be used for event ordering, TTL calculations, and debugging.
	CreatedAt time.Time `json:"createdAt"`

	// ProcessingStarted is when the event processing started.
	// This field is set when an event handler begins processing the event.
	// Used for performance monitoring and timeout detection.
	ProcessingStarted *time.Time `json:"processingStarted,omitempty"`

	// ProcessingCompleted is when the event processing completed.
	// This field is set when an event handler finishes processing the event,
	// whether successfully or with an error. Used for performance monitoring
	// and event lifecycle tracking.
	ProcessingCompleted *time.Time `json:"processingCompleted,omitempty"`
}

Event represents a message in the event bus. Events are the core data structure used for communication between publishers and subscribers. They contain the message data along with metadata for tracking and processing.
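
To make the lifecycle fields concrete, here is a minimal sketch of how ProcessingStarted and ProcessingCompleted could be stamped around a handler call. The Event and EventHandler types are local stand-ins, and process and lifecycleOK are hypothetical helpers; the package may set these fields differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Event is a local stand-in with the lifecycle fields documented above.
type Event struct {
	Topic               string
	Payload             interface{}
	CreatedAt           time.Time
	ProcessingStarted   *time.Time
	ProcessingCompleted *time.Time
}

type EventHandler func(ctx context.Context, event Event) error

// process stamps the lifecycle fields around a single handler call.
// ProcessingCompleted is set whether or not the handler returns an error.
func process(ctx context.Context, ev *Event, h EventHandler) error {
	start := time.Now()
	ev.ProcessingStarted = &start
	err := h(ctx, *ev)
	done := time.Now()
	ev.ProcessingCompleted = &done
	return err
}

// lifecycleOK runs a no-op handler and checks that the timestamps are set
// and ordered: created, then started, then completed.
func lifecycleOK() bool {
	ev := Event{Topic: "user.created", CreatedAt: time.Now()}
	noop := func(context.Context, Event) error { return nil }
	if err := process(context.Background(), &ev, noop); err != nil {
		return false
	}
	return ev.ProcessingStarted != nil &&
		ev.ProcessingCompleted != nil &&
		!ev.ProcessingCompleted.Before(*ev.ProcessingStarted) &&
		!ev.ProcessingStarted.Before(ev.CreatedAt)
}

func main() {
	fmt.Println("lifecycle ok:", lifecycleOK())
}
```

The difference between the two timestamps gives the handler's duration, which is the kind of performance monitoring the field comments describe.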

type EventBus

type EventBus interface {
	// Start initializes the event bus.
	// This method is called during module startup and should prepare
	// the event bus for publishing and subscribing operations.
	// For memory buses, this might initialize internal data structures.
	// For network-based buses, this establishes connections.
	Start(ctx context.Context) error

	// Stop shuts down the event bus.
	// This method is called during module shutdown and should cleanup
	// all resources, close connections, and stop background processes.
	// It should ensure all in-flight events are processed before returning.
	Stop(ctx context.Context) error

	// Publish sends an event to the specified topic.
	// The event will be delivered to all active subscribers of the topic.
	// The method should handle event queuing, topic routing, and delivery
	// according to the engine's semantics.
	Publish(ctx context.Context, event Event) error

	// Subscribe registers a handler for a topic with synchronous processing.
	// Events matching the topic will be delivered immediately to the handler
	// in the same goroutine that published them. The publisher will wait
	// for the handler to complete before continuing.
	Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

	// SubscribeAsync registers a handler for a topic with asynchronous processing.
	// Events matching the topic will be queued for processing by worker goroutines.
	// The publisher can continue immediately without waiting for processing.
	// This is preferred for heavy operations or non-critical event handling.
	SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

	// Unsubscribe removes a subscription.
	// After unsubscribing, the subscription will no longer receive events.
	// This method should be idempotent and not return errors for
	// subscriptions that are already cancelled.
	Unsubscribe(ctx context.Context, subscription Subscription) error

	// Topics returns a list of all active topics.
	// This includes only topics that currently have at least one subscriber.
	// Useful for monitoring, debugging, and administrative interfaces.
	Topics() []string

	// SubscriberCount returns the number of subscribers for a topic.
	// This includes both synchronous and asynchronous subscriptions.
	// Returns 0 if the topic has no subscribers or doesn't exist.
	SubscriberCount(topic string) int
}

EventBus defines the interface for an event bus implementation. This interface abstracts the underlying messaging mechanism, allowing the eventbus module to support multiple backends (memory, Redis, Kafka) through a common API.

All operations are context-aware to support cancellation and timeouts. Implementations should be thread-safe and handle concurrent access properly.
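
The thread-safety requirement and the Topics and SubscriberCount semantics can be sketched with a small mutex-guarded registry. This is a hypothetical helper written for illustration, assuming subscriptions are tracked per topic by ID; it is not the data structure used by MemoryEventBus.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry tracks subscription IDs per topic behind a mutex. It is a
// hypothetical helper, not the package's internal data structure.
type registry struct {
	mu     sync.RWMutex
	nextID int
	subs   map[string]map[int]struct{} // topic -> set of subscription IDs
}

func newRegistry() *registry {
	return &registry{subs: make(map[string]map[int]struct{})}
}

// add registers a subscriber for topic and returns its ID.
func (r *registry) add(topic string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	if r.subs[topic] == nil {
		r.subs[topic] = make(map[int]struct{})
	}
	r.subs[topic][r.nextID] = struct{}{}
	return r.nextID
}

// remove is idempotent: removing an unknown ID is a no-op.
func (r *registry) remove(topic string, id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs[topic], id)
	if len(r.subs[topic]) == 0 {
		delete(r.subs, topic) // a topic with no subscribers is no longer active
	}
}

// topics lists topics that currently have at least one subscriber.
func (r *registry) topics() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.subs))
	for t := range r.subs {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}

// subscriberCount returns 0 for unknown topics.
func (r *registry) subscriberCount(topic string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs[topic])
}

func main() {
	r := newRegistry()
	id := r.add("user.created")
	r.add("user.created")
	fmt.Println(r.topics(), r.subscriberCount("user.created"))
	r.remove("user.created", id)
	r.remove("user.created", id) // second call is a no-op
	fmt.Println(r.subscriberCount("user.created"), r.subscriberCount("missing"))
}
```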

type EventBusConfig

type EventBusConfig struct {
	// Engine specifies the event bus engine to use.
	// Supported values: "memory", "redis", "kafka"
	// Default: "memory"
	Engine string `json:"engine" yaml:"engine" validate:"oneof=memory redis kafka" env:"ENGINE"`

	// MaxEventQueueSize is the maximum number of events to queue per topic.
	// When this limit is reached, new events may be dropped or publishers
	// may be blocked, depending on the engine implementation.
	// Must be at least 1.
	MaxEventQueueSize int `json:"maxEventQueueSize" yaml:"maxEventQueueSize" validate:"min=1" env:"MAX_EVENT_QUEUE_SIZE"`

	// DefaultEventBufferSize is the default buffer size for subscription channels.
	// This affects how many events can be buffered for each subscription before
	// blocking. Larger buffers can improve performance but use more memory.
	// Must be at least 1.
	DefaultEventBufferSize int `json:"defaultEventBufferSize" yaml:"defaultEventBufferSize" validate:"min=1" env:"DEFAULT_EVENT_BUFFER_SIZE"`

	// WorkerCount is the number of worker goroutines for async event processing.
	// These workers process events from asynchronous subscriptions. More workers
	// can increase throughput but also increase resource usage.
	// Must be at least 1.
	WorkerCount int `json:"workerCount" yaml:"workerCount" validate:"min=1" env:"WORKER_COUNT"`

	// EventTTL is the time to live for events in seconds.
	// Events older than this value may be automatically removed from queues
	// or marked as expired. Used for event cleanup and storage management.
	// Must be at least 1.
	EventTTL int `json:"eventTTL" yaml:"eventTTL" validate:"min=1" env:"EVENT_TTL"`

	// RetentionDays is how many days to retain event history.
	// This affects event storage and cleanup policies. Longer retention
	// allows for event replay and debugging but requires more storage.
	// Must be at least 1.
	RetentionDays int `json:"retentionDays" yaml:"retentionDays" validate:"min=1" env:"RETENTION_DAYS"`

	// ExternalBrokerURL is the connection URL for external message brokers.
	// Used when the engine is set to "redis" or "kafka". The format depends
	// on the specific broker type.
	// Examples:
	//   Redis: "redis://localhost:6379" or "redis://user:pass@host:port/db"
	//   Kafka: "kafka://localhost:9092" or "kafka://broker1:9092,broker2:9092"
	ExternalBrokerURL string `json:"externalBrokerURL" yaml:"externalBrokerURL" env:"EXTERNAL_BROKER_URL"`

	// ExternalBrokerUser is the username for external broker authentication.
	// Used when the external broker requires authentication.
	// Leave empty if the broker doesn't require authentication.
	ExternalBrokerUser string `json:"externalBrokerUser" yaml:"externalBrokerUser" env:"EXTERNAL_BROKER_USER"`

	// ExternalBrokerPassword is the password for external broker authentication.
	// Used when the external broker requires authentication.
	// Leave empty if the broker doesn't require authentication.
	// This should be kept secure and may be provided via environment variables.
	ExternalBrokerPassword string `json:"externalBrokerPassword" yaml:"externalBrokerPassword" env:"EXTERNAL_BROKER_PASSWORD"`
}

EventBusConfig defines the configuration for the event bus module. This structure contains all the settings needed to configure event processing, worker pools, event retention, and external broker connections.

Configuration can be provided through JSON, YAML, or environment variables. The struct tags define the mapping for each configuration source and validation rules.

Example YAML configuration:

engine: "memory"
maxEventQueueSize: 2000
defaultEventBufferSize: 20
workerCount: 10
eventTTL: 7200
retentionDays: 14
externalBrokerURL: "redis://localhost:6379"
externalBrokerUser: "eventbus_user"
externalBrokerPassword: "secure_password"

Example environment variables:

EVENTBUS_ENGINE=memory
EVENTBUS_MAX_EVENT_QUEUE_SIZE=1000
EVENTBUS_WORKER_COUNT=5

type EventBusModule

type EventBusModule struct {
	// contains filtered or unexported fields
}

EventBusModule provides event-driven messaging capabilities for the modular framework. It implements a publish-subscribe pattern with support for multiple event bus engines, asynchronous processing, and flexible subscription management.

The module implements the following interfaces:

  • modular.Module: Basic module lifecycle
  • modular.Configurable: Configuration management
  • modular.ServiceAware: Service dependency management
  • modular.Startable: Startup logic
  • modular.Stoppable: Shutdown logic
  • EventBus: Event publishing and subscription interface

Event processing is thread-safe and supports concurrent publishers and subscribers.

func (*EventBusModule) Constructor

func (m *EventBusModule) Constructor() modular.ModuleConstructor

Constructor provides a dependency injection constructor for the module. This method is used by the dependency injection system to create the module instance with any required services.

func (*EventBusModule) Dependencies

func (m *EventBusModule) Dependencies() []string

Dependencies returns the names of modules this module depends on. The eventbus module operates independently and has no dependencies.

func (*EventBusModule) Init

func (m *EventBusModule) Init(app modular.Application) error

Init initializes the eventbus module with the application context. This method is called after all modules have been registered and their configurations loaded. It sets up the event bus engine based on configuration.

The initialization process:

  1. Retrieves the module's configuration
  2. Sets up logging
  3. Initializes the appropriate event bus engine
  4. Prepares the event bus for startup

Supported engines:

  • "memory": In-process event bus using Go channels
  • fallback: defaults to memory engine for unknown engines

func (*EventBusModule) Name

func (m *EventBusModule) Name() string

Name returns the unique identifier for this module. This name is used for service registration, dependency resolution, and configuration section identification.

func (*EventBusModule) ProvidesServices

func (m *EventBusModule) ProvidesServices() []modular.ServiceProvider

ProvidesServices declares services provided by this module. The eventbus module provides an event bus service that can be injected into other modules for event-driven communication.

Provided services:

  • "eventbus.provider": The main event bus service interface

func (*EventBusModule) Publish

func (m *EventBusModule) Publish(ctx context.Context, topic string, payload interface{}) error

Publish publishes an event to the event bus. Creates an Event struct with the provided topic and payload, then sends it through the event bus for processing by subscribers.

The event will be delivered to all active subscribers of the topic. Topic patterns and wildcards may be supported depending on the engine.

Example:

err := eventBus.Publish(ctx, "user.created", userData)
err = eventBus.Publish(ctx, "order.payment.failed", paymentData)

func (*EventBusModule) RegisterConfig

func (m *EventBusModule) RegisterConfig(app modular.Application) error

RegisterConfig registers the module's configuration structure. This method is called during application initialization to register the default configuration values for the eventbus module.

Default configuration:

  • Engine: "memory"
  • MaxEventQueueSize: 1000 events per topic
  • DefaultEventBufferSize: 10 events per subscription channel
  • WorkerCount: 5 async processing workers
  • EventTTL: 3600 seconds (1 hour)
  • RetentionDays: 7 days for event history
  • ExternalBroker settings: empty (not used for memory engine)
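
The defaults above, together with the validation tags on EventBusConfig, can be restated as a small sketch. Config, defaults, and validate are hypothetical stand-ins written for this example; the module itself relies on struct tags and the framework's configuration loading rather than this code.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a local stand-in carrying the engine and numeric settings of
// EventBusConfig; it is not the package's type.
type Config struct {
	Engine                 string
	MaxEventQueueSize      int
	DefaultEventBufferSize int
	WorkerCount            int
	EventTTL               int // seconds
	RetentionDays          int
}

// defaults returns the default values listed above.
func defaults() Config {
	return Config{
		Engine:                 "memory",
		MaxEventQueueSize:      1000,
		DefaultEventBufferSize: 10,
		WorkerCount:            5,
		EventTTL:               3600,
		RetentionDays:          7,
	}
}

// validate mirrors the documented constraints: a known engine name and a
// minimum of 1 for every numeric setting.
func validate(c Config) error {
	switch c.Engine {
	case "memory", "redis", "kafka":
	default:
		return fmt.Errorf("engine %q is not one of memory, redis, kafka", c.Engine)
	}
	checks := []struct {
		name  string
		value int
	}{
		{"maxEventQueueSize", c.MaxEventQueueSize},
		{"defaultEventBufferSize", c.DefaultEventBufferSize},
		{"workerCount", c.WorkerCount},
		{"eventTTL", c.EventTTL},
		{"retentionDays", c.RetentionDays},
	}
	var errs []error
	for _, ch := range checks {
		if ch.value < 1 {
			errs = append(errs, fmt.Errorf("%s must be at least 1, got %d", ch.name, ch.value))
		}
	}
	return errors.Join(errs...) // nil when there are no violations
}

func main() {
	cfg := defaults()
	fmt.Println("defaults valid:", validate(cfg) == nil)

	cfg.WorkerCount = 0
	fmt.Println(validate(cfg))
}
```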

func (*EventBusModule) RequiresServices

func (m *EventBusModule) RequiresServices() []modular.ServiceDependency

RequiresServices declares services required by this module. The eventbus module operates independently and requires no external services.

func (*EventBusModule) Start

func (m *EventBusModule) Start(ctx context.Context) error

Start performs startup logic for the module. This method starts the event bus engine and begins processing events. It's called after all modules have been initialized and are ready to start.

The startup process:

  1. Checks if already started (idempotent)
  2. Starts the underlying event bus engine
  3. Initializes worker pools for async processing
  4. Prepares topic management and subscription tracking

This method is thread-safe and can be called multiple times safely.

func (*EventBusModule) Stop

func (m *EventBusModule) Stop(ctx context.Context) error

Stop performs shutdown logic for the module. This method gracefully shuts down the event bus, ensuring all in-flight events are processed and all subscriptions are properly cleaned up.

The shutdown process:

  1. Checks if already stopped (idempotent)
  2. Stops accepting new events
  3. Waits for in-flight events to complete
  4. Cancels all active subscriptions
  5. Shuts down worker pools
  6. Closes the underlying event bus engine

This method is thread-safe and can be called multiple times safely.
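
The shutdown steps above can be sketched as follows. This is a simplified illustration under stated assumptions, not the module's code: bus, dispatch, errStopped, and demo are hypothetical, and the sketch covers only refusing new work, waiting for in-flight work, honoring the context, and being safe to call twice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errStopped = errors.New("event bus is stopped")

// bus is a hypothetical type that tracks in-flight work so Stop can wait for it.
type bus struct {
	mu       sync.Mutex
	stopped  bool
	inFlight sync.WaitGroup
}

// dispatch runs work in a goroutine unless the bus has been stopped.
func (b *bus) dispatch(work func()) error {
	b.mu.Lock()
	if b.stopped {
		b.mu.Unlock()
		return errStopped
	}
	b.inFlight.Add(1) // registered under the lock so Stop cannot miss it
	b.mu.Unlock()

	go func() {
		defer b.inFlight.Done()
		work()
	}()
	return nil
}

// Stop refuses new work, then waits for in-flight work or for ctx to end.
// Calling it again after the bus has stopped is a no-op.
func (b *bus) Stop(ctx context.Context) error {
	b.mu.Lock()
	if b.stopped {
		b.mu.Unlock()
		return nil
	}
	b.stopped = true
	b.mu.Unlock()

	done := make(chan struct{})
	go func() {
		b.inFlight.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo dispatches one short task, stops the bus, and reports whether the
// task finished before Stop returned and whether later dispatches fail.
func demo() (finished, rejected bool) {
	b := &bus{}
	var ran bool
	_ = b.dispatch(func() {
		time.Sleep(10 * time.Millisecond)
		ran = true
	})
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := b.Stop(ctx); err != nil {
		return false, false
	}
	return ran, errors.Is(b.dispatch(func() {}), errStopped)
}

func main() {
	finished, rejected := demo()
	fmt.Println("in-flight work finished:", finished, "new work rejected:", rejected)
}
```

Passing a context with a deadline to Stop bounds how long shutdown may wait for slow handlers.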

func (*EventBusModule) Subscribe

func (m *EventBusModule) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe subscribes to a topic on the event bus with synchronous processing. The provided handler is called in the publisher's goroutine as soon as an event is published to the specified topic, and the publisher waits until the handler returns.

Use synchronous subscriptions for:

  • Lightweight event processing
  • When event ordering is important
  • Critical event handlers that must complete before continuing

Example:

subscription, err := eventBus.Subscribe(ctx, "user.login", func(ctx context.Context, event Event) error {
    user := event.Payload.(UserData)
    return updateLastLoginTime(user.ID)
})

func (*EventBusModule) SubscribeAsync

func (m *EventBusModule) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync subscribes to a topic with asynchronous event processing. Events for the provided handler are queued and processed by worker goroutines, allowing the event publisher to continue without waiting for processing.

Use asynchronous subscriptions for:

  • Heavy processing operations
  • External API calls
  • Non-critical event handlers
  • When you want to avoid blocking publishers

Example:

subscription, err := eventBus.SubscribeAsync(ctx, "image.uploaded", func(ctx context.Context, event Event) error {
    imageData := event.Payload.(ImageData)
    return generateThumbnails(imageData)
})

func (*EventBusModule) SubscriberCount

func (m *EventBusModule) SubscriberCount(topic string) int

SubscriberCount returns the number of active subscribers for a topic. This includes both synchronous and asynchronous subscriptions. Returns 0 if the topic has no subscribers.

Example:

count := eventBus.SubscriberCount("user.created")
if count == 0 {
    log.Println("No subscribers for user creation events")
}

func (*EventBusModule) Topics

func (m *EventBusModule) Topics() []string

Topics returns a list of all active topics that have subscribers. This can be useful for debugging, monitoring, or building administrative interfaces that show current event bus activity.

Example:

activeTopics := eventBus.Topics()
for _, topic := range activeTopics {
    count := eventBus.SubscriberCount(topic)
    fmt.Printf("Topic: %s, Subscribers: %d\n", topic, count)
}

func (*EventBusModule) Unsubscribe

func (m *EventBusModule) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe cancels a subscription and stops receiving events. The subscription will be removed from the event bus and no longer receive events for its topic.

This method is idempotent - calling it multiple times on the same subscription is safe and will not cause errors.

Example:

err := eventBus.Unsubscribe(ctx, subscription)

type EventHandler

type EventHandler func(ctx context.Context, event Event) error

EventHandler is a function that handles an event. Event handlers are called when an event matching their subscription topic is published. Handlers should be idempotent when possible and handle errors gracefully.

The context can be used for cancellation, timeouts, and passing request-scoped values. Handlers should respect context cancellation and return promptly when the context is cancelled.

Example handler:

func userCreatedHandler(ctx context.Context, event Event) error {
    user := event.Payload.(UserData)
    return sendWelcomeEmail(ctx, user.Email)
}
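
A handler that respects cancellation might look like the sketch below. The Event type is a local stand-in, and slowHandler and timedOut are hypothetical helpers that simulate work with a timer; real handlers would pass ctx on to whatever I/O they perform.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Event is a local stand-in mirroring the documented fields used here.
type Event struct {
	Topic   string
	Payload interface{}
}

// slowHandler returns a handler that simulates work lasting the given
// duration but gives up as soon as the context is cancelled.
func slowHandler(work time.Duration) func(ctx context.Context, event Event) error {
	return func(ctx context.Context, event Event) error {
		timer := time.NewTimer(work)
		defer timer.Stop()
		select {
		case <-timer.C:
			return nil // simulated work finished
		case <-ctx.Done():
			return fmt.Errorf("handling %s: %w", event.Topic, ctx.Err())
		}
	}
}

// timedOut runs the handler under a timeout (both values in milliseconds)
// and reports whether it stopped because the deadline passed.
func timedOut(workMs, timeoutMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	h := slowHandler(time.Duration(workMs) * time.Millisecond)
	err := h(ctx, Event{Topic: "image.uploaded"})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow handler timed out:", timedOut(500, 10))
	fmt.Println("fast handler timed out:", timedOut(1, 500))
}
```

Wrapping ctx.Err() with %w keeps the cause inspectable through errors.Is by whoever receives the handler's error.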

type MemoryEventBus

type MemoryEventBus struct {
	// contains filtered or unexported fields
}

MemoryEventBus implements EventBus using in-memory channels.

func NewMemoryEventBus

func NewMemoryEventBus(config *EventBusConfig) *MemoryEventBus

NewMemoryEventBus creates a new in-memory event bus.

func (*MemoryEventBus) Publish

func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error

Publish sends an event to the specified topic.

func (*MemoryEventBus) Start

func (m *MemoryEventBus) Start(ctx context.Context) error

Start initializes the event bus.

func (*MemoryEventBus) Stop

func (m *MemoryEventBus) Stop(ctx context.Context) error

Stop shuts down the event bus.

func (*MemoryEventBus) Subscribe

func (m *MemoryEventBus) Subscribe(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

Subscribe registers a handler for a topic.

func (*MemoryEventBus) SubscribeAsync

func (m *MemoryEventBus) SubscribeAsync(ctx context.Context, topic string, handler EventHandler) (Subscription, error)

SubscribeAsync registers a handler for a topic with asynchronous processing.

func (*MemoryEventBus) SubscriberCount

func (m *MemoryEventBus) SubscriberCount(topic string) int

SubscriberCount returns the number of subscribers for a topic.

func (*MemoryEventBus) Topics

func (m *MemoryEventBus) Topics() []string

Topics returns a list of all active topics.

func (*MemoryEventBus) Unsubscribe

func (m *MemoryEventBus) Unsubscribe(ctx context.Context, subscription Subscription) error

Unsubscribe removes a subscription.

type Subscription

type Subscription interface {
	// Topic returns the topic being subscribed to.
	// This may include wildcard patterns depending on the engine implementation.
	Topic() string

	// ID returns the unique identifier for this subscription.
	// Each subscription gets a unique ID that can be used for tracking,
	// logging, and debugging purposes.
	ID() string

	// IsAsync returns true if this is an asynchronous subscription.
	// Asynchronous subscriptions process events in background workers,
	// while synchronous subscriptions process events immediately.
	IsAsync() bool

	// Cancel cancels the subscription.
	// After calling Cancel, the subscription will no longer receive events.
	// This is equivalent to calling Unsubscribe on the event bus.
	// The method is idempotent and safe to call multiple times.
	Cancel() error
}

Subscription represents a subscription to a topic. Subscriptions are created when a handler is registered for a topic and provide methods for managing the subscription lifecycle.
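
One way to satisfy the idempotent Cancel contract is sync.Once, sketched below. The subscription type, its onCancel callback, and cancelTwice are hypothetical and exist only for this example; the package's own subscription type is unexported and may be built differently.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical implementation of the four documented methods.
type subscription struct {
	id       string
	topic    string
	async    bool
	once     sync.Once
	onCancel func() // cleanup hook, e.g. removing the handler from the bus
}

func (s *subscription) Topic() string { return s.topic }
func (s *subscription) ID() string    { return s.id }
func (s *subscription) IsAsync() bool { return s.async }

// Cancel runs the cleanup callback on the first call only; later calls
// return nil without doing anything.
func (s *subscription) Cancel() error {
	s.once.Do(func() {
		if s.onCancel != nil {
			s.onCancel()
		}
	})
	return nil
}

// cancelTwice cancels a subscription twice and returns how many times the
// cleanup callback ran.
func cancelTwice() int {
	calls := 0
	s := &subscription{id: "sub-1", topic: "user.created", onCancel: func() { calls++ }}
	_ = s.Cancel()
	_ = s.Cancel()
	return calls
}

func main() {
	fmt.Println("cleanup calls:", cancelTwice())
}
```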
