Documentation
¶
Overview ¶
nolint
Index ¶
- Constants
- Variables
- func EventType(schema, entity, operation string) string
- func Example()
- func ExampleConfiguration()
- func ExampleErrorHandling()
- func ExampleSubscriptionPatterns()
- func ExampleWithHooks()
- func Initialize(cfg config.EventBrokerConfig) error
- func IsInitialized() bool
- func Publish(ctx context.Context, event *Event) error
- func PublishAsync(ctx context.Context, event *Event) error
- func PublishSync(ctx context.Context, event *Event) error
- func RegisterCRUDHooks(broker Broker, hookRegistry *restheadspec.HookRegistry, config *CRUDHookConfig) error
- func RegisterShutdown(broker Broker)
- func SetDefaultBroker(broker Broker)
- func Unsubscribe(id SubscriptionID) error
- func WithBufferSize(size int) func(*Options)
- func WithInstanceID(id string) func(*Options)
- func WithMode(m ProcessingMode) func(*Options)
- func WithProvider(p Provider) func(*Options)
- func WithRetryPolicy(policy *RetryPolicy) func(*Options)
- func WithWorkerCount(count int) func(*Options)
- type Broker
- type BrokerError
- type BrokerStats
- type CRUDHookConfig
- type Event
- type EventBroker
- func (b *EventBroker) InstanceID() string
- func (b *EventBroker) Publish(ctx context.Context, event *Event) error
- func (b *EventBroker) PublishAsync(ctx context.Context, event *Event) error
- func (b *EventBroker) PublishSync(ctx context.Context, event *Event) error
- func (b *EventBroker) Start(ctx context.Context) error
- func (b *EventBroker) Stats(ctx context.Context) (*BrokerStats, error)
- func (b *EventBroker) Stop(ctx context.Context) error
- func (b *EventBroker) Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)
- func (b *EventBroker) Unsubscribe(id SubscriptionID) error
- type EventFilter
- type EventHandler
- type EventHandlerFunc
- type EventSource
- type EventStatus
- type MemoryProvider
- func (mp *MemoryProvider) Close() error
- func (mp *MemoryProvider) Delete(ctx context.Context, id string) error
- func (mp *MemoryProvider) Get(ctx context.Context, id string) (*Event, error)
- func (mp *MemoryProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error)
- func (mp *MemoryProvider) Publish(ctx context.Context, event *Event) error
- func (mp *MemoryProvider) Stats(ctx context.Context) (*ProviderStats, error)
- func (mp *MemoryProvider) Store(ctx context.Context, event *Event) error
- func (mp *MemoryProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error)
- func (mp *MemoryProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error
- type MemoryProviderOptions
- type MemoryProviderStats
- type Options
- type ProcessingMode
- type Provider
- type ProviderStats
- type RetryPolicy
- type SubscriptionID
Constants ¶
const ExampleYAMLConfiguration = `` /* 773-byte string literal not displayed */
ExampleYAMLConfiguration shows example YAML configuration
Variables ¶
var ( ErrWorkerPoolStopped = &BrokerError{Code: "worker_pool_stopped", Message: "worker pool is stopped"} ErrQueueFull = &BrokerError{Code: "queue_full", Message: "event queue is full"} )
Error definitions
Functions ¶
func EventType ¶
EventType generates a type string from schema, entity, and operation Pattern: schema.entity.operation (e.g., "public.users.create")
func ExampleConfiguration ¶
func ExampleConfiguration()
ExampleConfiguration demonstrates initializing from configuration
func ExampleErrorHandling ¶
func ExampleErrorHandling()
ExampleErrorHandling demonstrates error handling in event handlers
func ExampleSubscriptionPatterns ¶
func ExampleSubscriptionPatterns()
ExampleSubscriptionPatterns demonstrates different subscription patterns
func ExampleWithHooks ¶
func ExampleWithHooks()
ExampleWithHooks demonstrates integration with the hook system
func Initialize ¶
func Initialize(cfg config.EventBrokerConfig) error
Initialize initializes the global event broker from configuration
func IsInitialized ¶
func IsInitialized() bool
IsInitialized returns true if the default broker is initialized
func PublishAsync ¶
PublishAsync publishes an event asynchronously using the default broker
func PublishSync ¶
PublishSync publishes an event synchronously using the default broker
func RegisterCRUDHooks ¶
func RegisterCRUDHooks(broker Broker, hookRegistry *restheadspec.HookRegistry, config *CRUDHookConfig) error
RegisterCRUDHooks registers event hooks for CRUD operations This integrates with the restheadspec.HookRegistry to automatically capture database events
func RegisterShutdown ¶
func RegisterShutdown(broker Broker)
RegisterShutdown registers the broker's shutdown with the server shutdown callbacks
func SetDefaultBroker ¶
func SetDefaultBroker(broker Broker)
SetDefaultBroker sets the default global broker
func Unsubscribe ¶
func Unsubscribe(id SubscriptionID) error
Unsubscribe unsubscribes from events using the default broker
func WithBufferSize ¶
func WithInstanceID ¶
func WithMode ¶
func WithMode(m ProcessingMode) func(*Options)
func WithRetryPolicy ¶
func WithRetryPolicy(policy *RetryPolicy) func(*Options)
func WithWorkerCount ¶
Types ¶
type Broker ¶
type Broker interface {
// Publish publishes an event (mode-dependent: sync or async)
Publish(ctx context.Context, event *Event) error
// PublishSync publishes an event synchronously (blocks until all handlers complete)
PublishSync(ctx context.Context, event *Event) error
// PublishAsync publishes an event asynchronously (returns immediately)
PublishAsync(ctx context.Context, event *Event) error
// Subscribe registers a handler for events matching the pattern
Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)
// Unsubscribe removes a subscription
Unsubscribe(id SubscriptionID) error
// Start starts the broker (begins processing events)
Start(ctx context.Context) error
// Stop stops the broker gracefully (flushes pending events)
Stop(ctx context.Context) error
// Stats returns broker statistics
Stats(ctx context.Context) (*BrokerStats, error)
// InstanceID returns the instance ID of this broker
InstanceID() string
}
Broker is the main interface for event publishing and subscription
func GetDefaultBroker ¶
func GetDefaultBroker() Broker
GetDefaultBroker returns the default global broker
type BrokerError ¶
BrokerError represents an error from the event broker
func (*BrokerError) Error ¶
func (e *BrokerError) Error() string
type BrokerStats ¶
type BrokerStats struct {
InstanceID string `json:"instance_id"`
Mode ProcessingMode `json:"mode"`
IsRunning bool `json:"is_running"`
TotalPublished int64 `json:"total_published"`
TotalProcessed int64 `json:"total_processed"`
TotalFailed int64 `json:"total_failed"`
ActiveSubscribers int `json:"active_subscribers"`
QueueSize int `json:"queue_size,omitempty"` // For async mode
ActiveWorkers int `json:"active_workers,omitempty"` // For async mode
ProviderStats *ProviderStats `json:"provider_stats,omitempty"`
AdditionalStats map[string]interface{} `json:"additional_stats,omitempty"`
}
BrokerStats contains broker statistics
type CRUDHookConfig ¶
type CRUDHookConfig struct {
EnableCreate bool
EnableRead bool
EnableUpdate bool
EnableDelete bool
}
CRUDHookConfig configures which CRUD operations should trigger events
func DefaultCRUDHookConfig ¶
func DefaultCRUDHookConfig() *CRUDHookConfig
DefaultCRUDHookConfig returns default configuration (all enabled)
type Event ¶
type Event struct {
// Identification
ID string `json:"id" db:"id"`
// Source & Classification
Source EventSource `json:"source" db:"source"`
Type string `json:"type" db:"type"` // Pattern: schema.entity.operation
// Status Tracking
Status EventStatus `json:"status" db:"status"`
RetryCount int `json:"retry_count" db:"retry_count"`
Error string `json:"error,omitempty" db:"error"`
// Payload
Payload json.RawMessage `json:"payload" db:"payload"`
// Context Information
UserID int `json:"user_id" db:"user_id"`
SessionID string `json:"session_id" db:"session_id"`
InstanceID string `json:"instance_id" db:"instance_id"`
// Database Context
Schema string `json:"schema" db:"schema"`
Entity string `json:"entity" db:"entity"`
Operation string `json:"operation" db:"operation"` // create, update, delete, read
// Timestamps
CreatedAt time.Time `json:"created_at" db:"created_at"`
ProcessedAt *time.Time `json:"processed_at,omitempty" db:"processed_at"`
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
// Extensibility
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
}
Event represents a single event in the system with complete metadata
func NewEvent ¶
func NewEvent(source EventSource, eventType string) *Event
NewEvent creates a new event with defaults
func (*Event) GetPayload ¶
GetPayload unmarshals the payload into the provided value
func (*Event) IncrementRetry ¶
func (e *Event) IncrementRetry()
IncrementRetry increments the retry counter
func (*Event) MarkCompleted ¶
func (e *Event) MarkCompleted()
MarkCompleted marks the event as successfully completed
func (*Event) MarkFailed ¶
MarkFailed marks the event as failed with an error message
func (*Event) MarkProcessing ¶
func (e *Event) MarkProcessing()
MarkProcessing marks the event as being processed
func (*Event) SetPayload ¶
SetPayload sets the event payload from any value by marshaling to JSON
type EventBroker ¶
type EventBroker struct {
// contains filtered or unexported fields
}
EventBroker implements the Broker interface
func NewBroker ¶
func NewBroker(opts Options) (*EventBroker, error)
NewBroker creates a new event broker with the given options
func (*EventBroker) InstanceID ¶
func (b *EventBroker) InstanceID() string
InstanceID returns the instance ID
func (*EventBroker) Publish ¶
func (b *EventBroker) Publish(ctx context.Context, event *Event) error
Publish publishes an event based on the broker's mode
func (*EventBroker) PublishAsync ¶
func (b *EventBroker) PublishAsync(ctx context.Context, event *Event) error
PublishAsync publishes an event asynchronously
func (*EventBroker) PublishSync ¶
func (b *EventBroker) PublishSync(ctx context.Context, event *Event) error
PublishSync publishes an event synchronously
func (*EventBroker) Start ¶
func (b *EventBroker) Start(ctx context.Context) error
Start starts the broker
func (*EventBroker) Stats ¶
func (b *EventBroker) Stats(ctx context.Context) (*BrokerStats, error)
Stats returns broker statistics
func (*EventBroker) Stop ¶
func (b *EventBroker) Stop(ctx context.Context) error
Stop stops the broker gracefully
func (*EventBroker) Subscribe ¶
func (b *EventBroker) Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)
Subscribe adds a subscription for events matching the pattern
func (*EventBroker) Unsubscribe ¶
func (b *EventBroker) Unsubscribe(id SubscriptionID) error
Unsubscribe removes a subscription
type EventFilter ¶
type EventFilter struct {
Source *EventSource
Status *EventStatus
UserID *int
Schema string
Entity string
Operation string
InstanceID string
StartTime *time.Time
EndTime *time.Time
Limit int
Offset int
}
EventFilter defines filter criteria for listing events
type EventHandler ¶
EventHandler processes an event
type EventHandlerFunc ¶
EventHandlerFunc is a function adapter for EventHandler This allows using regular functions as event handlers
type EventSource ¶
type EventSource string
EventSource represents where an event originated from
const ( EventSourceDatabase EventSource = "database" EventSourceWebSocket EventSource = "websocket" EventSourceFrontend EventSource = "frontend" EventSourceSystem EventSource = "system" EventSourceInternal EventSource = "internal" )
type EventStatus ¶
type EventStatus string
EventStatus represents the current state of an event
const ( EventStatusPending EventStatus = "pending" EventStatusProcessing EventStatus = "processing" EventStatusCompleted EventStatus = "completed" EventStatusFailed EventStatus = "failed" )
type MemoryProvider ¶
type MemoryProvider struct {
// contains filtered or unexported fields
}
MemoryProvider implements Provider interface using in-memory storage Features: - Thread-safe event storage with RW mutex - LRU eviction when max events reached - In-process pub/sub (not cross-instance) - Automatic cleanup of old completed events
func NewMemoryProvider ¶
func NewMemoryProvider(opts MemoryProviderOptions) *MemoryProvider
NewMemoryProvider creates a new in-memory event provider
func (*MemoryProvider) Close ¶
func (mp *MemoryProvider) Close() error
Close closes the provider and releases resources
func (*MemoryProvider) Delete ¶
func (mp *MemoryProvider) Delete(ctx context.Context, id string) error
Delete deletes an event by ID
func (*MemoryProvider) List ¶
func (mp *MemoryProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error)
List lists events with optional filters
func (*MemoryProvider) Publish ¶
func (mp *MemoryProvider) Publish(ctx context.Context, event *Event) error
Publish publishes an event to all subscribers
func (*MemoryProvider) Stats ¶
func (mp *MemoryProvider) Stats(ctx context.Context) (*ProviderStats, error)
Stats returns provider statistics
func (*MemoryProvider) Store ¶
func (mp *MemoryProvider) Store(ctx context.Context, event *Event) error
Store stores an event
func (*MemoryProvider) Stream ¶
Stream returns a channel of events for real-time consumption Note: This is in-process only, not cross-instance
func (*MemoryProvider) UpdateStatus ¶
func (mp *MemoryProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error
UpdateStatus updates the status of an event
type MemoryProviderOptions ¶
type MemoryProviderOptions struct {
InstanceID string
MaxEvents int
CleanupInterval time.Duration
MaxAge time.Duration
}
MemoryProviderOptions configures the memory provider
type MemoryProviderStats ¶
type MemoryProviderStats struct {
TotalEvents atomic.Int64
PendingEvents atomic.Int64
ProcessingEvents atomic.Int64
CompletedEvents atomic.Int64
FailedEvents atomic.Int64
EventsPublished atomic.Int64
EventsConsumed atomic.Int64
ActiveSubscribers atomic.Int32
Evictions atomic.Int64
}
MemoryProviderStats contains statistics for the memory provider
type Options ¶
type Options struct {
Provider Provider
Mode ProcessingMode
WorkerCount int // For async mode
BufferSize int // For async mode
RetryPolicy *RetryPolicy
InstanceID string
}
Options for creating a new broker
type ProcessingMode ¶
type ProcessingMode string
ProcessingMode determines how events are processed
const ( ProcessingModeSync ProcessingMode = "sync" ProcessingModeAsync ProcessingMode = "async" )
type Provider ¶
type Provider interface {
// Store stores an event
Store(ctx context.Context, event *Event) error
// Get retrieves an event by ID
Get(ctx context.Context, id string) (*Event, error)
// List lists events with optional filters
List(ctx context.Context, filter *EventFilter) ([]*Event, error)
// UpdateStatus updates the status of an event
UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error
// Delete deletes an event by ID
Delete(ctx context.Context, id string) error
// Stream returns a channel of events for real-time consumption
// Used for cross-instance pub/sub
// The channel is closed when the context is canceled or an error occurs
Stream(ctx context.Context, pattern string) (<-chan *Event, error)
// Publish publishes an event to all subscribers (for distributed providers)
// For in-memory provider, this is the same as Store
// For Redis/NATS/Database, this triggers cross-instance delivery
Publish(ctx context.Context, event *Event) error
// Close closes the provider and releases resources
Close() error
// Stats returns provider statistics
Stats(ctx context.Context) (*ProviderStats, error)
}
Provider defines the storage backend interface for events Implementations: MemoryProvider, RedisProvider, NATSProvider, DatabaseProvider
func NewProviderFromConfig ¶
func NewProviderFromConfig(cfg config.EventBrokerConfig) (Provider, error)
NewProviderFromConfig creates a provider based on configuration
type ProviderStats ¶
type ProviderStats struct {
ProviderType string `json:"provider_type"`
TotalEvents int64 `json:"total_events"`
PendingEvents int64 `json:"pending_events"`
ProcessingEvents int64 `json:"processing_events"`
CompletedEvents int64 `json:"completed_events"`
FailedEvents int64 `json:"failed_events"`
EventsPublished int64 `json:"events_published"`
EventsConsumed int64 `json:"events_consumed"`
ActiveSubscribers int `json:"active_subscribers"`
ProviderSpecific map[string]interface{} `json:"provider_specific,omitempty"`
}
ProviderStats contains statistics about the provider
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
}
RetryPolicy defines how failed events should be retried
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy
type SubscriptionID ¶
type SubscriptionID string
SubscriptionID uniquely identifies a subscription
func Subscribe ¶
func Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)
Subscribe subscribes to events using the default broker