eventbroker

package
v0.0.119 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2025 License: MIT Imports: 15 Imported by: 0

README

Event Broker System

A comprehensive event handler/broker system for ResolveSpec that provides real-time event publishing, subscription, and cross-instance communication.

Features

  • Multiple Sources: Events from database, websockets, frontend, system, and internal sources
  • Event Status Tracking: Pending, processing, completed, failed states with timestamps
  • Rich Metadata: User IDs, session IDs, instance IDs, JSON payloads, and custom metadata
  • Sync & Async Modes: Choose between synchronous or asynchronous event processing
  • Pattern Matching: Subscribe to events using glob-style patterns
  • Multiple Providers: In-memory, Redis Streams, NATS JetStream, PostgreSQL with NOTIFY
  • Hook Integration: Automatic CRUD event capture via restheadspec hooks
  • Retry Logic: Configurable retry policy with exponential backoff
  • Metrics: Prometheus-compatible metrics for monitoring
  • Graceful Shutdown: Proper cleanup and event flushing on shutdown

Quick Start

1. Configuration

Add to your config.yaml:

event_broker:
  enabled: true
  provider: memory  # memory, redis, nats, database
  mode: async       # sync, async
  worker_count: 10
  buffer_size: 1000
  instance_id: "${HOSTNAME}"
2. Initialize
import (
	"github.com/bitechdev/ResolveSpec/pkg/config"
	"github.com/bitechdev/ResolveSpec/pkg/eventbroker"
)

func main() {
	// Load configuration
	cfgMgr := config.NewManager()
	cfg, _ := cfgMgr.GetConfig()

	// Initialize event broker
	if err := eventbroker.Initialize(cfg.EventBroker); err != nil {
		log.Fatal(err)
	}
}
3. Subscribe to Events
// Subscribe to specific events
eventbroker.Subscribe("public.users.create", eventbroker.EventHandlerFunc(
	func(ctx context.Context, event *eventbroker.Event) error {
		log.Printf("New user created: %s", event.Payload)
		// Send welcome email, update cache, etc.
		return nil
	},
))

// Subscribe with patterns
eventbroker.Subscribe("*.*.delete", eventbroker.EventHandlerFunc(
	func(ctx context.Context, event *eventbroker.Event) error {
		log.Printf("Deleted: %s.%s", event.Schema, event.Entity)
		return nil
	},
))
4. Publish Events
// Create and publish an event
event := eventbroker.NewEvent(eventbroker.EventSourceDatabase, "public.users.update")
event.InstanceID = eventbroker.GetDefaultBroker().InstanceID()
event.UserID = 123
event.SessionID = "session-456"
event.Schema = "public"
event.Entity = "users"
event.Operation = "update"

event.SetPayload(map[string]interface{}{
	"id": 123,
	"name": "John Doe",
})

// Async (non-blocking)
eventbroker.PublishAsync(ctx, event)

// Sync (blocking)
eventbroker.PublishSync(ctx, event)

Automatic CRUD Event Capture

Automatically capture database CRUD operations:

import (
	"github.com/bitechdev/ResolveSpec/pkg/eventbroker"
	"github.com/bitechdev/ResolveSpec/pkg/restheadspec"
)

func setupHooks(handler *restheadspec.Handler) {
	broker := eventbroker.GetDefaultBroker()

	// Configure which operations to capture
	config := eventbroker.DefaultCRUDHookConfig()
	config.EnableRead = false // Disable read events for performance

	// Register hooks
	eventbroker.RegisterCRUDHooks(broker, handler.Hooks(), config)

	// Now all create/update/delete operations automatically publish events!
}

Event Structure

Every event contains:

type Event struct {
	ID          string                 // UUID
	Source      EventSource            // database, websocket, system, frontend, internal
	Type        string                 // Pattern: schema.entity.operation
	Status      EventStatus            // pending, processing, completed, failed
	Payload     json.RawMessage        // JSON payload
	UserID      int                    // User who triggered the event
	SessionID   string                 // Session identifier
	InstanceID  string                 // Server instance identifier
	Schema      string                 // Database schema
	Entity      string                 // Database entity/table
	Operation   string                 // create, update, delete, read
	CreatedAt   time.Time              // When event was created
	ProcessedAt *time.Time             // When processing started
	CompletedAt *time.Time             // When processing completed
	Error       string                 // Error message if failed
	Metadata    map[string]interface{} // Additional context
	RetryCount  int                    // Number of retry attempts
}

Pattern Matching

Subscribe to events using glob-style patterns:

Pattern Matches Example
* All events Any event
public.users.* All user operations public.users.create, public.users.update
*.*.create All create operations public.users.create, auth.sessions.create
public.*.* All events in public schema public.users.create, public.posts.delete
public.users.create Exact match Only public.users.create

Providers

Memory Provider (Default)

Best for: Development, single-instance deployments

  • Pros: Fast, no dependencies, simple
  • Cons: Events lost on restart, single-instance only
event_broker:
  provider: memory
Redis Provider (Future)

Best for: Production, multi-instance deployments

  • Pros: Persistent, cross-instance pub/sub, reliable
  • Cons: Requires Redis
event_broker:
  provider: redis
  redis:
    stream_name: "resolvespec:events"
    consumer_group: "resolvespec-workers"
    host: "localhost"
    port: 6379
NATS Provider (Future)

Best for: High-performance, low-latency requirements

  • Pros: Very fast, built-in clustering, durable
  • Cons: Requires NATS server
event_broker:
  provider: nats
  nats:
    url: "nats://localhost:4222"
    stream_name: "RESOLVESPEC_EVENTS"
Database Provider (Future)

Best for: Audit trails, event replay, SQL queries

  • Pros: No additional infrastructure, full SQL query support, PostgreSQL NOTIFY for real-time
  • Cons: Slower than Redis/NATS
event_broker:
  provider: database
  database:
    table_name: "events"
    channel: "resolvespec_events"

Processing Modes

Events are queued and processed by worker pool:

  • Non-blocking event publishing
  • Configurable worker count
  • Better throughput
  • Events may be processed out of order
event_broker:
  mode: async
  worker_count: 10
  buffer_size: 1000
Sync Mode

Events are processed immediately:

  • Blocking event publishing
  • Guaranteed ordering
  • Immediate error feedback
  • Lower throughput
event_broker:
  mode: sync

Retry Policy

Configure automatic retries for failed handlers:

event_broker:
  retry_policy:
    max_retries: 3
    initial_delay: 1s
    max_delay: 30s
    backoff_factor: 2.0  # Exponential backoff

Metrics

The event broker exposes Prometheus metrics:

  • eventbroker_events_published_total{source, type} - Total events published
  • eventbroker_events_processed_total{source, type, status} - Total events processed
  • eventbroker_event_processing_duration_seconds{source, type} - Event processing duration
  • eventbroker_queue_size - Current queue size (async mode)

Best Practices

  1. Use Async Mode: For better performance, use async mode in production
  2. Disable Read Events: Read events can be high volume; disable if not needed
  3. Pattern Matching: Use specific patterns to avoid processing unnecessary events
  4. Error Handling: Always handle errors in event handlers; they won't fail the original operation
  5. Idempotency: Make handlers idempotent as events may be retried
  6. Payload Size: Keep payloads reasonable; avoid large objects
  7. Monitoring: Monitor metrics to detect issues early

Examples

See example_usage.go for comprehensive examples including:

  • Basic event publishing and subscription
  • Hook integration
  • Error handling
  • Configuration
  • Pattern matching

Architecture

┌─────────────────┐
│   Application   │
└────────┬────────┘
         │
         ├─ Publish Events
         │
┌────────▼────────┐      ┌──────────────┐
│  Event Broker   │◄────►│  Subscribers │
└────────┬────────┘      └──────────────┘
         │
         ├─ Store Events
         │
┌────────▼────────┐
│    Provider     │
│  (Memory/Redis  │
│   /NATS/DB)     │
└─────────────────┘

Future Enhancements

  • Database Provider with PostgreSQL NOTIFY
  • Redis Streams Provider
  • NATS JetStream Provider
  • Event replay functionality
  • Dead letter queue
  • Event filtering at provider level
  • Batch publishing
  • Event compression
  • Schema versioning

Documentation

Overview

nolint

Index

Constants

View Source
const ExampleYAMLConfiguration = `` /* 773-byte string literal not displayed */

ExampleYAMLConfiguration shows example YAML configuration

Variables

View Source
var (
	ErrWorkerPoolStopped = &BrokerError{Code: "worker_pool_stopped", Message: "worker pool is stopped"}
	ErrQueueFull         = &BrokerError{Code: "queue_full", Message: "event queue is full"}
)

Error definitions

Functions

func EventType

func EventType(schema, entity, operation string) string

EventType generates a type string from schema, entity, and operation Pattern: schema.entity.operation (e.g., "public.users.create")

func Example

func Example()

Example demonstrates basic usage of the event broker

func ExampleConfiguration

func ExampleConfiguration()

ExampleConfiguration demonstrates initializing from configuration

func ExampleErrorHandling

func ExampleErrorHandling()

ExampleErrorHandling demonstrates error handling in event handlers

func ExampleSubscriptionPatterns

func ExampleSubscriptionPatterns()

ExampleSubscriptionPatterns demonstrates different subscription patterns

func ExampleWithHooks

func ExampleWithHooks()

ExampleWithHooks demonstrates integration with the hook system

func Initialize

func Initialize(cfg config.EventBrokerConfig) error

Initialize initializes the global event broker from configuration

func IsInitialized

func IsInitialized() bool

IsInitialized returns true if the default broker is initialized

func Publish

func Publish(ctx context.Context, event *Event) error

Publish publishes an event using the default broker

func PublishAsync

func PublishAsync(ctx context.Context, event *Event) error

PublishAsync publishes an event asynchronously using the default broker

func PublishSync

func PublishSync(ctx context.Context, event *Event) error

PublishSync publishes an event synchronously using the default broker

func RegisterCRUDHooks

func RegisterCRUDHooks(broker Broker, hookRegistry *restheadspec.HookRegistry, config *CRUDHookConfig) error

RegisterCRUDHooks registers event hooks for CRUD operations This integrates with the restheadspec.HookRegistry to automatically capture database events

func RegisterShutdown

func RegisterShutdown(broker Broker)

RegisterShutdown registers the broker's shutdown with the server shutdown callbacks

func SetDefaultBroker

func SetDefaultBroker(broker Broker)

SetDefaultBroker sets the default global broker

func Unsubscribe

func Unsubscribe(id SubscriptionID) error

Unsubscribe unsubscribes from events using the default broker

func WithBufferSize

func WithBufferSize(size int) func(*Options)

func WithInstanceID

func WithInstanceID(id string) func(*Options)

func WithMode

func WithMode(m ProcessingMode) func(*Options)

func WithProvider

func WithProvider(p Provider) func(*Options)

Functional option pattern helpers

func WithRetryPolicy

func WithRetryPolicy(policy *RetryPolicy) func(*Options)

func WithWorkerCount

func WithWorkerCount(count int) func(*Options)

Types

type Broker

type Broker interface {
	// Publish publishes an event (mode-dependent: sync or async)
	Publish(ctx context.Context, event *Event) error

	// PublishSync publishes an event synchronously (blocks until all handlers complete)
	PublishSync(ctx context.Context, event *Event) error

	// PublishAsync publishes an event asynchronously (returns immediately)
	PublishAsync(ctx context.Context, event *Event) error

	// Subscribe registers a handler for events matching the pattern
	Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)

	// Unsubscribe removes a subscription
	Unsubscribe(id SubscriptionID) error

	// Start starts the broker (begins processing events)
	Start(ctx context.Context) error

	// Stop stops the broker gracefully (flushes pending events)
	Stop(ctx context.Context) error

	// Stats returns broker statistics
	Stats(ctx context.Context) (*BrokerStats, error)

	// InstanceID returns the instance ID of this broker
	InstanceID() string
}

Broker is the main interface for event publishing and subscription

func GetDefaultBroker

func GetDefaultBroker() Broker

GetDefaultBroker returns the default global broker

type BrokerError

type BrokerError struct {
	Code    string
	Message string
}

BrokerError represents an error from the event broker

func (*BrokerError) Error

func (e *BrokerError) Error() string

type BrokerStats

type BrokerStats struct {
	InstanceID        string                 `json:"instance_id"`
	Mode              ProcessingMode         `json:"mode"`
	IsRunning         bool                   `json:"is_running"`
	TotalPublished    int64                  `json:"total_published"`
	TotalProcessed    int64                  `json:"total_processed"`
	TotalFailed       int64                  `json:"total_failed"`
	ActiveSubscribers int                    `json:"active_subscribers"`
	QueueSize         int                    `json:"queue_size,omitempty"`     // For async mode
	ActiveWorkers     int                    `json:"active_workers,omitempty"` // For async mode
	ProviderStats     *ProviderStats         `json:"provider_stats,omitempty"`
	AdditionalStats   map[string]interface{} `json:"additional_stats,omitempty"`
}

BrokerStats contains broker statistics

func Stats

func Stats(ctx context.Context) (*BrokerStats, error)

Stats returns statistics from the default broker

type CRUDHookConfig

type CRUDHookConfig struct {
	EnableCreate bool
	EnableRead   bool
	EnableUpdate bool
	EnableDelete bool
}

CRUDHookConfig configures which CRUD operations should trigger events

func DefaultCRUDHookConfig

func DefaultCRUDHookConfig() *CRUDHookConfig

DefaultCRUDHookConfig returns default configuration (all enabled)

type Event

type Event struct {
	// Identification
	ID string `json:"id" db:"id"`

	// Source & Classification
	Source EventSource `json:"source" db:"source"`
	Type   string      `json:"type" db:"type"` // Pattern: schema.entity.operation

	// Status Tracking
	Status     EventStatus `json:"status" db:"status"`
	RetryCount int         `json:"retry_count" db:"retry_count"`
	Error      string      `json:"error,omitempty" db:"error"`

	// Payload
	Payload json.RawMessage `json:"payload" db:"payload"`

	// Context Information
	UserID     int    `json:"user_id" db:"user_id"`
	SessionID  string `json:"session_id" db:"session_id"`
	InstanceID string `json:"instance_id" db:"instance_id"`

	// Database Context
	Schema    string `json:"schema" db:"schema"`
	Entity    string `json:"entity" db:"entity"`
	Operation string `json:"operation" db:"operation"` // create, update, delete, read

	// Timestamps
	CreatedAt   time.Time  `json:"created_at" db:"created_at"`
	ProcessedAt *time.Time `json:"processed_at,omitempty" db:"processed_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`

	// Extensibility
	Metadata map[string]interface{} `json:"metadata" db:"metadata"`
}

Event represents a single event in the system with complete metadata

func NewEvent

func NewEvent(source EventSource, eventType string) *Event

NewEvent creates a new event with defaults

func (*Event) Clone

func (e *Event) Clone() *Event

Clone creates a deep copy of the event

func (*Event) GetPayload

func (e *Event) GetPayload(v interface{}) error

GetPayload unmarshals the payload into the provided value

func (*Event) IncrementRetry

func (e *Event) IncrementRetry()

IncrementRetry increments the retry counter

func (*Event) MarkCompleted

func (e *Event) MarkCompleted()

MarkCompleted marks the event as successfully completed

func (*Event) MarkFailed

func (e *Event) MarkFailed(err error)

MarkFailed marks the event as failed with an error message

func (*Event) MarkProcessing

func (e *Event) MarkProcessing()

MarkProcessing marks the event as being processed

func (*Event) SetPayload

func (e *Event) SetPayload(v interface{}) error

SetPayload sets the event payload from any value by marshaling to JSON

func (*Event) Validate

func (e *Event) Validate() error

Validate performs basic validation on the event

type EventBroker

type EventBroker struct {
	// contains filtered or unexported fields
}

EventBroker implements the Broker interface

func NewBroker

func NewBroker(opts Options) (*EventBroker, error)

NewBroker creates a new event broker with the given options

func (*EventBroker) InstanceID

func (b *EventBroker) InstanceID() string

InstanceID returns the instance ID

func (*EventBroker) Publish

func (b *EventBroker) Publish(ctx context.Context, event *Event) error

Publish publishes an event based on the broker's mode

func (*EventBroker) PublishAsync

func (b *EventBroker) PublishAsync(ctx context.Context, event *Event) error

PublishAsync publishes an event asynchronously

func (*EventBroker) PublishSync

func (b *EventBroker) PublishSync(ctx context.Context, event *Event) error

PublishSync publishes an event synchronously

func (*EventBroker) Start

func (b *EventBroker) Start(ctx context.Context) error

Start starts the broker

func (*EventBroker) Stats

func (b *EventBroker) Stats(ctx context.Context) (*BrokerStats, error)

Stats returns broker statistics

func (*EventBroker) Stop

func (b *EventBroker) Stop(ctx context.Context) error

Stop stops the broker gracefully

func (*EventBroker) Subscribe

func (b *EventBroker) Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)

Subscribe adds a subscription for events matching the pattern

func (*EventBroker) Unsubscribe

func (b *EventBroker) Unsubscribe(id SubscriptionID) error

Unsubscribe removes a subscription

type EventFilter

type EventFilter struct {
	Source     *EventSource
	Status     *EventStatus
	UserID     *int
	Schema     string
	Entity     string
	Operation  string
	InstanceID string
	StartTime  *time.Time
	EndTime    *time.Time
	Limit      int
	Offset     int
}

EventFilter defines filter criteria for listing events

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, event *Event) error
}

EventHandler processes an event

type EventHandlerFunc

type EventHandlerFunc func(ctx context.Context, event *Event) error

EventHandlerFunc is a function adapter for EventHandler This allows using regular functions as event handlers

func (EventHandlerFunc) Handle

func (f EventHandlerFunc) Handle(ctx context.Context, event *Event) error

Handle implements EventHandler

type EventSource

type EventSource string

EventSource represents where an event originated from

const (
	EventSourceDatabase  EventSource = "database"
	EventSourceWebSocket EventSource = "websocket"
	EventSourceFrontend  EventSource = "frontend"
	EventSourceSystem    EventSource = "system"
	EventSourceInternal  EventSource = "internal"
)

type EventStatus

type EventStatus string

EventStatus represents the current state of an event

const (
	EventStatusPending    EventStatus = "pending"
	EventStatusProcessing EventStatus = "processing"
	EventStatusCompleted  EventStatus = "completed"
	EventStatusFailed     EventStatus = "failed"
)

type MemoryProvider

type MemoryProvider struct {
	// contains filtered or unexported fields
}

MemoryProvider implements Provider interface using in-memory storage Features: - Thread-safe event storage with RW mutex - LRU eviction when max events reached - In-process pub/sub (not cross-instance) - Automatic cleanup of old completed events

func NewMemoryProvider

func NewMemoryProvider(opts MemoryProviderOptions) *MemoryProvider

NewMemoryProvider creates a new in-memory event provider

func (*MemoryProvider) Close

func (mp *MemoryProvider) Close() error

Close closes the provider and releases resources

func (*MemoryProvider) Delete

func (mp *MemoryProvider) Delete(ctx context.Context, id string) error

Delete deletes an event by ID

func (*MemoryProvider) Get

func (mp *MemoryProvider) Get(ctx context.Context, id string) (*Event, error)

Get retrieves an event by ID

func (*MemoryProvider) List

func (mp *MemoryProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error)

List lists events with optional filters

func (*MemoryProvider) Publish

func (mp *MemoryProvider) Publish(ctx context.Context, event *Event) error

Publish publishes an event to all subscribers

func (*MemoryProvider) Stats

func (mp *MemoryProvider) Stats(ctx context.Context) (*ProviderStats, error)

Stats returns provider statistics

func (*MemoryProvider) Store

func (mp *MemoryProvider) Store(ctx context.Context, event *Event) error

Store stores an event

func (*MemoryProvider) Stream

func (mp *MemoryProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error)

Stream returns a channel of events for real-time consumption Note: This is in-process only, not cross-instance

func (*MemoryProvider) UpdateStatus

func (mp *MemoryProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error

UpdateStatus updates the status of an event

type MemoryProviderOptions

type MemoryProviderOptions struct {
	InstanceID      string
	MaxEvents       int
	CleanupInterval time.Duration
	MaxAge          time.Duration
}

MemoryProviderOptions configures the memory provider

type MemoryProviderStats

type MemoryProviderStats struct {
	TotalEvents       atomic.Int64
	PendingEvents     atomic.Int64
	ProcessingEvents  atomic.Int64
	CompletedEvents   atomic.Int64
	FailedEvents      atomic.Int64
	EventsPublished   atomic.Int64
	EventsConsumed    atomic.Int64
	ActiveSubscribers atomic.Int32
	Evictions         atomic.Int64
}

MemoryProviderStats contains statistics for the memory provider

type Options

type Options struct {
	Provider    Provider
	Mode        ProcessingMode
	WorkerCount int // For async mode
	BufferSize  int // For async mode
	RetryPolicy *RetryPolicy
	InstanceID  string
}

Options for creating a new broker

type ProcessingMode

type ProcessingMode string

ProcessingMode determines how events are processed

const (
	ProcessingModeSync  ProcessingMode = "sync"
	ProcessingModeAsync ProcessingMode = "async"
)

type Provider

type Provider interface {
	// Store stores an event
	Store(ctx context.Context, event *Event) error

	// Get retrieves an event by ID
	Get(ctx context.Context, id string) (*Event, error)

	// List lists events with optional filters
	List(ctx context.Context, filter *EventFilter) ([]*Event, error)

	// UpdateStatus updates the status of an event
	UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error

	// Delete deletes an event by ID
	Delete(ctx context.Context, id string) error

	// Stream returns a channel of events for real-time consumption
	// Used for cross-instance pub/sub
	// The channel is closed when the context is canceled or an error occurs
	Stream(ctx context.Context, pattern string) (<-chan *Event, error)

	// Publish publishes an event to all subscribers (for distributed providers)
	// For in-memory provider, this is the same as Store
	// For Redis/NATS/Database, this triggers cross-instance delivery
	Publish(ctx context.Context, event *Event) error

	// Close closes the provider and releases resources
	Close() error

	// Stats returns provider statistics
	Stats(ctx context.Context) (*ProviderStats, error)
}

Provider defines the storage backend interface for events Implementations: MemoryProvider, RedisProvider, NATSProvider, DatabaseProvider

func NewProviderFromConfig

func NewProviderFromConfig(cfg config.EventBrokerConfig) (Provider, error)

NewProviderFromConfig creates a provider based on configuration

type ProviderStats

type ProviderStats struct {
	ProviderType      string                 `json:"provider_type"`
	TotalEvents       int64                  `json:"total_events"`
	PendingEvents     int64                  `json:"pending_events"`
	ProcessingEvents  int64                  `json:"processing_events"`
	CompletedEvents   int64                  `json:"completed_events"`
	FailedEvents      int64                  `json:"failed_events"`
	EventsPublished   int64                  `json:"events_published"`
	EventsConsumed    int64                  `json:"events_consumed"`
	ActiveSubscribers int                    `json:"active_subscribers"`
	ProviderSpecific  map[string]interface{} `json:"provider_specific,omitempty"`
}

ProviderStats contains statistics about the provider

type RetryPolicy

type RetryPolicy struct {
	MaxRetries    int
	InitialDelay  time.Duration
	MaxDelay      time.Duration
	BackoffFactor float64
}

RetryPolicy defines how failed events should be retried

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy

type SubscriptionID

type SubscriptionID string

SubscriptionID uniquely identifies a subscription

func Subscribe

func Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)

Subscribe subscribes to events using the default broker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL