events

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrRateLimitExceeded = fmt.Errorf("event rate limit exceeded")

ErrRateLimitExceeded is returned when rate limit is exceeded

Functions

func Dispatch

func Dispatch(event interface{}) error

Dispatch fires an event using the global dispatcher

func DispatchAfter

func DispatchAfter(event interface{}, delay time.Duration) error

DispatchAfter fires an event after a delay using the global dispatcher

func DispatchAsync

func DispatchAsync(event interface{}) error

DispatchAsync fires an event asynchronously using the global dispatcher

func DispatchNow

func DispatchNow(event interface{}) error

DispatchNow fires an event synchronously using the global dispatcher

func EventJobFactory

func EventJobFactory(data []byte) (queue.Job, error)

EventJobFactory creates EventListenerJob instances for queue deserialization

func FireModelEvent

func FireModelEvent(event string, model interface{}) error

FireModelEvent fires a model event globally

func Flush

func Flush(event string)

Flush removes all listeners for an event using the global dispatcher

func Forget

func Forget(event string)

Forget removes specific listeners using the global dispatcher

func HasListeners

func HasListeners(event interface{}) bool

HasListeners checks if an event has listeners using the global dispatcher

func Initialize

func Initialize(dispatcher Dispatcher)

Initialize sets up the global event dispatcher

func InitializeQueueIntegration

func InitializeQueueIntegration()

InitializeQueueIntegration initializes queue integration by registering the event job type

func Listen

func Listen(events interface{}, listener Listener)

Listen registers a listener for one or more events using the global dispatcher

func MatchesPattern

func MatchesPattern(eventName, pattern string) bool

MatchesPattern checks if an event name matches a wildcard pattern

func ObserveGlobal

func ObserveGlobal(modelType string, observer ModelObserver)

ObserveGlobal registers an observer globally

func Reset

func Reset()

Reset resets the global dispatcher (useful for testing)

func Subscribe

func Subscribe(subscriber Subscriber)

Subscribe registers an event subscriber using the global dispatcher

func Until

func Until(event interface{}) (interface{}, error)

Until dispatches events until the first non-nil return using the global dispatcher

func ValidateSubscriber

func ValidateSubscriber(subscriber interface{}) []error

ValidateSubscriber validates that a subscriber has valid method signatures

Types

type AsyncDispatcher

type AsyncDispatcher struct {
}

AsyncDispatcher handles asynchronous event dispatching

func NewAsyncDispatcher

func NewAsyncDispatcher() *AsyncDispatcher

NewAsyncDispatcher creates a new async dispatcher

func (*AsyncDispatcher) Push

func (a *AsyncDispatcher) Push(event interface{}, listener Listener, delay time.Duration) error

Push processes an event asynchronously

type AsyncEventBus

type AsyncEventBus struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

AsyncEventBus combines sync and async dispatching

func NewAsyncEventBus

func NewAsyncEventBus() *AsyncEventBus

NewAsyncEventBus creates a new async event bus

func (*AsyncEventBus) ProcessQueuedEvent

func (b *AsyncEventBus) ProcessQueuedEvent(jobData string) error

ProcessQueuedEvent processes a queued event (called by queue workers)

func (*AsyncEventBus) RegisterQueuedListener

func (b *AsyncEventBus) RegisterQueuedListener(event string, listener QueuedListener, factory func() Listener)

RegisterQueuedListener registers a queued listener with its factory

type AutoObserver

type AutoObserver struct {
	BaseObserver
	// contains filtered or unexported fields
}

AutoObserver automatically calls model observers based on method names

func NewAutoObserver

func NewAutoObserver(instance interface{}) *AutoObserver

NewAutoObserver creates an observer that auto-maps methods to model events

func (*AutoObserver) Created

func (o *AutoObserver) Created(model interface{}) error

Created calls the Created method if it exists

func (*AutoObserver) Creating

func (o *AutoObserver) Creating(model interface{}) error

Creating calls the Creating method if it exists

func (*AutoObserver) Deleted

func (o *AutoObserver) Deleted(model interface{}) error

Deleted calls the Deleted method if it exists

func (*AutoObserver) Deleting

func (o *AutoObserver) Deleting(model interface{}) error

Deleting calls the Deleting method if it exists

func (*AutoObserver) Restored

func (o *AutoObserver) Restored(model interface{}) error

Restored calls the Restored method if it exists

func (*AutoObserver) Restoring

func (o *AutoObserver) Restoring(model interface{}) error

Restoring calls the Restoring method if it exists

func (*AutoObserver) Saved

func (o *AutoObserver) Saved(model interface{}) error

Saved calls the Saved method if it exists

func (*AutoObserver) Saving

func (o *AutoObserver) Saving(model interface{}) error

Saving calls the Saving method if it exists

func (*AutoObserver) Updated

func (o *AutoObserver) Updated(model interface{}) error

Updated calls the Updated method if it exists

func (*AutoObserver) Updating

func (o *AutoObserver) Updating(model interface{}) error

Updating calls the Updating method if it exists

type AutoSubscriber

type AutoSubscriber struct {
	// contains filtered or unexported fields
}

AutoSubscriber provides automatic event registration based on method names

func NewAutoSubscriber

func NewAutoSubscriber(instance interface{}, prefix string) *AutoSubscriber

NewAutoSubscriber creates a subscriber that auto-registers based on method names. Methods should follow the pattern: HandleEventName(event interface{}) error

func (*AutoSubscriber) Subscribe

func (s *AutoSubscriber) Subscribe(dispatcher Dispatcher)

Subscribe auto-registers methods as event listeners

type BaseEvent

type BaseEvent struct {
	EventName string
}

BaseEvent provides a base implementation of Event

func (*BaseEvent) Name

func (e *BaseEvent) Name() string

Name returns the event name

type BaseListener

type BaseListener struct{}

BaseListener provides a base implementation of Listener

func (*BaseListener) Handle

func (l *BaseListener) Handle(event interface{}) error

Handle processes the event (override in implementations)

func (*BaseListener) ShouldQueue

func (l *BaseListener) ShouldQueue() bool

ShouldQueue returns whether the listener should be queued

type BaseObserver

type BaseObserver struct{}

BaseObserver provides a default implementation of ModelObserver. Embed this in your observer to only override the methods you need.

func (*BaseObserver) Created

func (o *BaseObserver) Created(model interface{}) error

func (*BaseObserver) Creating

func (o *BaseObserver) Creating(model interface{}) error

func (*BaseObserver) Deleted

func (o *BaseObserver) Deleted(model interface{}) error

func (*BaseObserver) Deleting

func (o *BaseObserver) Deleting(model interface{}) error

func (*BaseObserver) Restored

func (o *BaseObserver) Restored(model interface{}) error

func (*BaseObserver) Restoring

func (o *BaseObserver) Restoring(model interface{}) error

func (*BaseObserver) Saved

func (o *BaseObserver) Saved(model interface{}) error

func (*BaseObserver) Saving

func (o *BaseObserver) Saving(model interface{}) error

func (*BaseObserver) Updated

func (o *BaseObserver) Updated(model interface{}) error

func (*BaseObserver) Updating

func (o *BaseObserver) Updating(model interface{}) error

type BaseStoppableEvent

type BaseStoppableEvent struct {
	BaseEvent
	// contains filtered or unexported fields
}

BaseStoppableEvent provides a base implementation of StoppableEvent

func (*BaseStoppableEvent) ShouldStopPropagation

func (e *BaseStoppableEvent) ShouldStopPropagation() bool

ShouldStopPropagation returns whether propagation should stop

func (*BaseStoppableEvent) StopPropagation

func (e *BaseStoppableEvent) StopPropagation()

StopPropagation stops event propagation

type BatchingDispatcher

type BatchingDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

BatchingDispatcher batches events and dispatches them in groups

func NewBatchingDispatcher

func NewBatchingDispatcher(batchSize int, flushInterval time.Duration) *BatchingDispatcher

NewBatchingDispatcher creates a new batching dispatcher

func (*BatchingDispatcher) Dispatch

func (d *BatchingDispatcher) Dispatch(event interface{}) error

Dispatch adds an event to the batch

func (*BatchingDispatcher) Flush

func (d *BatchingDispatcher) Flush() error

Flush dispatches all batched events

func (*BatchingDispatcher) GetBatchSize

func (d *BatchingDispatcher) GetBatchSize() int

GetBatchSize returns the current batch size

func (*BatchingDispatcher) Stop

func (d *BatchingDispatcher) Stop()

Stop stops the batching dispatcher

type Broadcastable

type Broadcastable interface {
	Event

	// ShouldBroadcast determines if the event should be broadcast
	ShouldBroadcast() bool

	// BroadcastOn returns the channels to broadcast on
	BroadcastOn() []string

	// BroadcastAs returns the event name for broadcasting
	BroadcastAs() string

	// BroadcastWith returns the data to broadcast
	BroadcastWith() map[string]interface{}

	// BroadcastWhen returns conditions for broadcasting
	BroadcastWhen() bool
}

Broadcastable represents an event that should be broadcast

type CoalescingDispatcher

type CoalescingDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

CoalescingDispatcher coalesces rapid identical events into a single dispatch

func NewCoalescingDispatcher

func NewCoalescingDispatcher(coalesce time.Duration) *CoalescingDispatcher

NewCoalescingDispatcher creates a new coalescing dispatcher

func (*CoalescingDispatcher) Dispatch

func (d *CoalescingDispatcher) Dispatch(event interface{}) error

Dispatch coalesces events before dispatching

func (*CoalescingDispatcher) GetCoalescedCount

func (d *CoalescingDispatcher) GetCoalescedCount(eventName string) int

GetCoalescedCount returns how many times an event has been coalesced

func (*CoalescingDispatcher) Stop

func (d *CoalescingDispatcher) Stop()

Stop stops the coalescing dispatcher

type ConditionalObserver

type ConditionalObserver struct {
	BaseObserver
	// contains filtered or unexported fields
}

ConditionalObserver only fires events when conditions are met

func NewConditionalObserver

func NewConditionalObserver(observer ModelObserver, condition func(string, interface{}) bool) *ConditionalObserver

NewConditionalObserver creates an observer that only fires when condition is true

func (*ConditionalObserver) Created

func (o *ConditionalObserver) Created(model interface{}) error

Created conditionally calls the wrapped observer

func (*ConditionalObserver) Creating

func (o *ConditionalObserver) Creating(model interface{}) error

Creating conditionally calls the wrapped observer

func (*ConditionalObserver) Deleted

func (o *ConditionalObserver) Deleted(model interface{}) error

Deleted conditionally calls the wrapped observer

func (*ConditionalObserver) Deleting

func (o *ConditionalObserver) Deleting(model interface{}) error

Deleting conditionally calls the wrapped observer

func (*ConditionalObserver) Restored

func (o *ConditionalObserver) Restored(model interface{}) error

Restored conditionally calls the wrapped observer

func (*ConditionalObserver) Restoring

func (o *ConditionalObserver) Restoring(model interface{}) error

Restoring conditionally calls the wrapped observer

func (*ConditionalObserver) Saved

func (o *ConditionalObserver) Saved(model interface{}) error

Saved conditionally calls the wrapped observer

func (*ConditionalObserver) Saving

func (o *ConditionalObserver) Saving(model interface{}) error

Saving conditionally calls the wrapped observer

func (*ConditionalObserver) Updated

func (o *ConditionalObserver) Updated(model interface{}) error

Updated conditionally calls the wrapped observer

func (*ConditionalObserver) Updating

func (o *ConditionalObserver) Updating(model interface{}) error

Updating conditionally calls the wrapped observer

type DebouncingDispatcher

type DebouncingDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

DebouncingDispatcher debounces events to prevent rapid firing

func NewDebouncingDispatcher

func NewDebouncingDispatcher(debounce time.Duration) *DebouncingDispatcher

NewDebouncingDispatcher creates a new debouncing dispatcher

func (*DebouncingDispatcher) Dispatch

func (d *DebouncingDispatcher) Dispatch(event interface{}) error

Dispatch debounces event dispatching

func (*DebouncingDispatcher) DispatchNow

func (d *DebouncingDispatcher) DispatchNow(event interface{}) error

DispatchNow immediately dispatches an event, bypassing debounce

func (*DebouncingDispatcher) GetPendingCount

func (d *DebouncingDispatcher) GetPendingCount() int

GetPendingCount returns the number of pending debounced events

func (*DebouncingDispatcher) Stop

func (d *DebouncingDispatcher) Stop()

Stop stops all debounce timers

type DefaultDispatcher

type DefaultDispatcher struct {
	// contains filtered or unexported fields
}

DefaultDispatcher is the default event dispatcher implementation

func NewDispatcher

func NewDispatcher() *DefaultDispatcher

NewDispatcher creates a new event dispatcher

func (*DefaultDispatcher) Dispatch

func (d *DefaultDispatcher) Dispatch(event interface{}) error

Dispatch fires an event to all registered listeners

func (*DefaultDispatcher) DispatchAfter

func (d *DefaultDispatcher) DispatchAfter(event interface{}, delay time.Duration) error

DispatchAfter fires an event after a delay

func (*DefaultDispatcher) DispatchAsync

func (d *DefaultDispatcher) DispatchAsync(event interface{}) error

DispatchAsync fires an event asynchronously

func (*DefaultDispatcher) DispatchNow

func (d *DefaultDispatcher) DispatchNow(event interface{}) error

DispatchNow fires an event synchronously

func (*DefaultDispatcher) Flush

func (d *DefaultDispatcher) Flush(event string)

Flush removes all listeners for an event

func (*DefaultDispatcher) Forget

func (d *DefaultDispatcher) Forget(event string)

Forget removes all listeners

func (*DefaultDispatcher) GetListeners

func (d *DefaultDispatcher) GetListeners(event interface{}) []Listener

GetListeners returns all listeners for an event

func (*DefaultDispatcher) HasListeners

func (d *DefaultDispatcher) HasListeners(event interface{}) bool

HasListeners checks if an event has listeners

func (*DefaultDispatcher) Listen

func (d *DefaultDispatcher) Listen(events interface{}, listener Listener)

Listen registers a listener for one or more events

func (*DefaultDispatcher) SetQueueDispatcher

func (d *DefaultDispatcher) SetQueueDispatcher(qd QueueDispatcher)

SetQueueDispatcher sets the queue dispatcher for async events

func (*DefaultDispatcher) Subscribe

func (d *DefaultDispatcher) Subscribe(subscriber Subscriber)

Subscribe registers an event subscriber

func (*DefaultDispatcher) Until

func (d *DefaultDispatcher) Until(event interface{}) (interface{}, error)

Until dispatches events until the first non-nil return

type Dispatcher

type Dispatcher interface {
	// Listen registers a listener for one or more events
	Listen(events interface{}, listener Listener)

	// Subscribe registers an event subscriber
	Subscribe(subscriber Subscriber)

	// Dispatch fires an event to all registered listeners
	Dispatch(event interface{}) error

	// DispatchNow fires an event synchronously
	DispatchNow(event interface{}) error

	// DispatchAsync fires an event asynchronously
	DispatchAsync(event interface{}) error

	// DispatchAfter fires an event after a delay
	DispatchAfter(event interface{}, delay time.Duration) error

	// Until dispatches events until the first non-nil return
	Until(event interface{}) (interface{}, error)

	// Flush removes all listeners for an event
	Flush(event string)

	// Forget removes specific listeners
	Forget(event string)

	// HasListeners checks if an event has listeners
	HasListeners(event interface{}) bool

	// GetListeners returns all listeners for an event
	GetListeners(event interface{}) []Listener
}

Dispatcher manages event dispatching to listeners

func GetDispatcher

func GetDispatcher() Dispatcher

GetDispatcher returns the global event dispatcher

type Event

type Event interface {
	// Name returns the event name for identification
	Name() string
}

Event represents an event that can be dispatched

type EventJob

type EventJob struct {
	Event        interface{}       `json:"event"`
	ListenerType string            `json:"listener_type"`
	Timestamp    time.Time         `json:"timestamp"`
	Attempts     int               `json:"attempts"`
	Metadata     map[string]string `json:"metadata,omitempty"`
}

EventJob represents a queued event job

type EventListenerJob

type EventListenerJob struct {
	Event        interface{} `json:"event"`
	EventType    string      `json:"event_type"`
	ListenerType string      `json:"listener_type"`
	Attempts     int         `json:"attempts"`
	MaxRetries   int         `json:"max_retries"`
	// contains filtered or unexported fields
}

EventListenerJob implements the queue.Job interface for event listeners

func (*EventListenerJob) Failed

func (j *EventListenerJob) Failed(err error)

Failed handles job failure

func (*EventListenerJob) Handle

func (j *EventListenerJob) Handle() error

Handle processes the event listener job

type EventMap

type EventMap map[string]string // method name -> event name

EventMap allows explicit event mapping for subscribers

type EventMiddleware

type EventMiddleware interface {
	// Handle processes the event and calls the next middleware
	Handle(event interface{}, next func(interface{}) error) error
}

EventMiddleware processes events before they reach listeners

type EventProvider

type EventProvider interface {
	// Register registers events with the dispatcher
	Register(dispatcher Dispatcher)
}

EventProvider provides event registration

type EventRegistry

type EventRegistry struct {
	// contains filtered or unexported fields
}

EventRegistry provides event and listener registration

func NewEventRegistry

func NewEventRegistry() *EventRegistry

NewEventRegistry creates a new event registry

func (*EventRegistry) AddProvider

func (r *EventRegistry) AddProvider(provider EventProvider)

AddProvider adds an event provider

func (*EventRegistry) BootProviders

func (r *EventRegistry) BootProviders(dispatcher Dispatcher)

BootProviders boots all registered providers

func (*EventRegistry) Clear

func (r *EventRegistry) Clear()

Clear clears all registrations

func (*EventRegistry) Count

func (r *EventRegistry) Count() int

Count returns the total number of listener registrations

func (*EventRegistry) DiscoverFromType

func (r *EventRegistry) DiscoverFromType(subscriber interface{}) map[string]string

DiscoverFromType discovers event mappings from a subscriber type

func (*EventRegistry) GetAllEvents

func (r *EventRegistry) GetAllEvents() []string

GetAllEvents returns all registered event names

func (*EventRegistry) GetListeners

func (r *EventRegistry) GetListeners(eventName string) []string

GetListeners returns all listeners for an event

func (*EventRegistry) Register

func (r *EventRegistry) Register(eventName string, listenerName string)

Register registers a listener for an event

type EventWorker

type EventWorker struct {
	// contains filtered or unexported fields
}

EventWorker processes queued events

func NewEventWorker

func NewEventWorker(dispatcher Dispatcher) *EventWorker

NewEventWorker creates a new event worker

func (*EventWorker) Process

func (w *EventWorker) Process(jobData string) error

Process processes a queued event job

func (*EventWorker) RegisterListener

func (w *EventWorker) RegisterListener(listenerType string, factory func() Listener)

RegisterListener registers a listener factory for async processing

type FakeDispatcher

type FakeDispatcher struct {
	// contains filtered or unexported fields
}

FakeDispatcher is a fake event dispatcher for testing

func Fake

func Fake() *FakeDispatcher

Fake sets up fake event dispatching for testing

func NewFakeDispatcher

func NewFakeDispatcher() *FakeDispatcher

NewFakeDispatcher creates a new fake dispatcher

func (*FakeDispatcher) AssertDispatched

func (f *FakeDispatcher) AssertDispatched(eventType interface{}, callback func(interface{}) bool) error

AssertDispatched asserts that an event was dispatched

func (*FakeDispatcher) AssertDispatchedTimes

func (f *FakeDispatcher) AssertDispatchedTimes(eventType interface{}, times int) error

AssertDispatchedTimes asserts an event was dispatched n times

func (*FakeDispatcher) AssertNotDispatched

func (f *FakeDispatcher) AssertNotDispatched(eventType interface{}) error

AssertNotDispatched asserts that an event was not dispatched

func (*FakeDispatcher) AssertNothingDispatched

func (f *FakeDispatcher) AssertNothingDispatched() error

AssertNothingDispatched asserts that no events were dispatched

func (*FakeDispatcher) ClearEvents

func (f *FakeDispatcher) ClearEvents()

ClearEvents clears all recorded events

func (*FakeDispatcher) Dispatch

func (f *FakeDispatcher) Dispatch(event interface{}) error

Dispatch records the event without executing listeners

func (*FakeDispatcher) DispatchAfter

func (f *FakeDispatcher) DispatchAfter(event interface{}, delay time.Duration) error

DispatchAfter records the event with delay

func (*FakeDispatcher) DispatchAsync

func (f *FakeDispatcher) DispatchAsync(event interface{}) error

DispatchAsync records the event asynchronously

func (*FakeDispatcher) DispatchNow

func (f *FakeDispatcher) DispatchNow(event interface{}) error

DispatchNow records the event synchronously

func (*FakeDispatcher) Flush

func (f *FakeDispatcher) Flush(event string)

Flush removes all listeners for an event

func (*FakeDispatcher) Forget

func (f *FakeDispatcher) Forget(event string)

Forget removes specific listeners

func (*FakeDispatcher) GetDispatchedEvents

func (f *FakeDispatcher) GetDispatchedEvents() []interface{}

GetDispatchedEvents returns all dispatched events

func (*FakeDispatcher) GetListeners

func (f *FakeDispatcher) GetListeners(event interface{}) []Listener

GetListeners returns all listeners for an event

func (*FakeDispatcher) HasListeners

func (f *FakeDispatcher) HasListeners(event interface{}) bool

HasListeners checks if an event has listeners

func (*FakeDispatcher) Listen

func (f *FakeDispatcher) Listen(events interface{}, listener Listener)

Listen registers a listener (but won't execute in fake mode)

func (*FakeDispatcher) StartFaking

func (f *FakeDispatcher) StartFaking()

StartFaking starts faking events again

func (*FakeDispatcher) StopFaking

func (f *FakeDispatcher) StopFaking()

StopFaking stops faking and executes listeners normally

func (*FakeDispatcher) Subscribe

func (f *FakeDispatcher) Subscribe(subscriber Subscriber)

Subscribe registers an event subscriber

func (*FakeDispatcher) Until

func (f *FakeDispatcher) Until(event interface{}) (interface{}, error)

Until dispatches events until the first non-nil return

type FilterMiddleware

type FilterMiddleware struct {
	// contains filtered or unexported fields
}

FilterMiddleware filters events based on condition

func NewFilterMiddleware

func NewFilterMiddleware(condition func(interface{}) bool) *FilterMiddleware

NewFilterMiddleware creates a new filter middleware

func (*FilterMiddleware) Handle

func (m *FilterMiddleware) Handle(event interface{}, next func(interface{}) error) error

Handle filters the event

type Listener

type Listener interface {
	// Handle processes the event
	Handle(event interface{}) error

	// ShouldQueue determines if this listener should be queued
	ShouldQueue() bool
}

Listener handles events when they are dispatched

func GetListeners

func GetListeners(event interface{}) []Listener

GetListeners returns all listeners for an event using the global dispatcher

type LoggingMiddleware

type LoggingMiddleware struct {
	// contains filtered or unexported fields
}

LoggingMiddleware logs all events

func NewLoggingMiddleware

func NewLoggingMiddleware() *LoggingMiddleware

NewLoggingMiddleware creates a new logging middleware

func (*LoggingMiddleware) ClearLog

func (m *LoggingMiddleware) ClearLog()

ClearLog clears the event log

func (*LoggingMiddleware) GetLog

func (m *LoggingMiddleware) GetLog() []string

GetLog returns the event log

func (*LoggingMiddleware) Handle

func (m *LoggingMiddleware) Handle(event interface{}, next func(interface{}) error) error

Handle logs the event

type MappedSubscriber

type MappedSubscriber struct {
	// contains filtered or unexported fields
}

MappedSubscriber allows explicit mapping of methods to events

func NewMappedSubscriber

func NewMappedSubscriber(instance interface{}, mappings EventMap) *MappedSubscriber

NewMappedSubscriber creates a subscriber with explicit event mappings

func (*MappedSubscriber) Subscribe

func (s *MappedSubscriber) Subscribe(dispatcher Dispatcher)

Subscribe registers methods according to the mapping

type MiddlewareDispatcher

type MiddlewareDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

MiddlewareDispatcher wraps a dispatcher with middleware support

func NewMiddlewareDispatcher

func NewMiddlewareDispatcher() *MiddlewareDispatcher

NewMiddlewareDispatcher creates a new middleware-enabled dispatcher

func (*MiddlewareDispatcher) ClearMiddleware

func (d *MiddlewareDispatcher) ClearMiddleware()

ClearMiddleware removes all middleware

func (*MiddlewareDispatcher) Dispatch

func (d *MiddlewareDispatcher) Dispatch(event interface{}) error

Dispatch dispatches an event through middleware chain

func (*MiddlewareDispatcher) GetMiddleware

func (d *MiddlewareDispatcher) GetMiddleware() []EventMiddleware

GetMiddleware returns all registered middleware

func (*MiddlewareDispatcher) Use

func (d *MiddlewareDispatcher) Use(middleware EventMiddleware)

Use adds middleware to the dispatcher

func (*MiddlewareDispatcher) UseFunc

func (d *MiddlewareDispatcher) UseFunc(fn func(event interface{}, next func(interface{}) error) error)

UseFunc adds a middleware function

type MiddlewareFunc

type MiddlewareFunc func(event interface{}, next func(interface{}) error) error

MiddlewareFunc adapts a function to the EventMiddleware interface

func (MiddlewareFunc) Handle

func (f MiddlewareFunc) Handle(event interface{}, next func(interface{}) error) error

Handle implements EventMiddleware

type ModelEvent

type ModelEvent struct {
	BaseEvent
	Model     interface{}
	Action    string // creating, created, updating, etc.
	ModelType string
}

ModelEvent represents a model lifecycle event

type ModelObserver

type ModelObserver interface {
	// Creating is called before a model is created
	Creating(model interface{}) error
	// Created is called after a model is created
	Created(model interface{}) error
	// Updating is called before a model is updated
	Updating(model interface{}) error
	// Updated is called after a model is updated
	Updated(model interface{}) error
	// Saving is called before a model is saved (create or update)
	Saving(model interface{}) error
	// Saved is called after a model is saved (create or update)
	Saved(model interface{}) error
	// Deleting is called before a model is deleted
	Deleting(model interface{}) error
	// Deleted is called after a model is deleted
	Deleted(model interface{}) error
	// Restoring is called before a soft-deleted model is restored
	Restoring(model interface{}) error
	// Restored is called after a soft-deleted model is restored
	Restored(model interface{}) error
}

ModelObserver is the interface for observing model lifecycle events

type Observable

type Observable interface {
	// Observe registers an observer for the model
	Observe(observer Observer)

	// GetObservers returns all registered observers
	GetObservers() []Observer
}

Observable represents a model that can be observed

type ObservableDispatcher

type ObservableDispatcher struct {
	*SubscriberDispatcher
	// contains filtered or unexported fields
}

ObservableDispatcher integrates model observers with the event dispatcher

func NewObservableDispatcher

func NewObservableDispatcher() *ObservableDispatcher

NewObservableDispatcher creates a new dispatcher with model observer support

func (*ObservableDispatcher) FireModelEvent

func (d *ObservableDispatcher) FireModelEvent(event string, model interface{}) error

FireModelEvent fires a model lifecycle event

func (*ObservableDispatcher) Observe

func (d *ObservableDispatcher) Observe(modelType string, observer ModelObserver)

Observe registers a model observer

func (*ObservableDispatcher) ObserveModel

func (d *ObservableDispatcher) ObserveModel(model interface{}, observer ModelObserver)

ObserveModel registers an observer for a model instance

type ObservableModel

type ObservableModel interface {
	// GetModelName returns the model type name
	GetModelName() string
	// FireEvent fires an event for this model
	FireEvent(event string) error
}

ObservableModel is the interface for models that can be observed

type Observer

type Observer interface {
	// Creating is called before a model is created
	Creating(model interface{}) error

	// Created is called after a model is created
	Created(model interface{})

	// Updating is called before a model is updated
	Updating(model interface{}) error

	// Updated is called after a model is updated
	Updated(model interface{})

	// Saving is called before a model is saved
	Saving(model interface{}) error

	// Saved is called after a model is saved
	Saved(model interface{})

	// Deleting is called before a model is deleted
	Deleting(model interface{}) error

	// Deleted is called after a model is deleted
	Deleted(model interface{})

	// Restoring is called before a soft-deleted model is restored
	Restoring(model interface{}) error

	// Restored is called after a soft-deleted model is restored
	Restored(model interface{})
}

Observer handles model lifecycle events

type ObserverRegistry

type ObserverRegistry struct {
	// contains filtered or unexported fields
}

ObserverRegistry manages model observers

func NewObserverRegistry

func NewObserverRegistry() *ObserverRegistry

NewObserverRegistry creates a new observer registry

func (*ObserverRegistry) ClearAll

func (r *ObserverRegistry) ClearAll()

ClearAll removes all observers

func (*ObserverRegistry) ClearObservers

func (r *ObserverRegistry) ClearObservers(modelType string)

ClearObservers removes all observers for a model type

func (*ObserverRegistry) Fire

func (r *ObserverRegistry) Fire(event string, model interface{}) error

Fire fires a model event to all registered observers

func (*ObserverRegistry) GetObservers

func (r *ObserverRegistry) GetObservers(modelType string) []ModelObserver

GetObservers returns all observers for a model type

func (*ObserverRegistry) Observe

func (r *ObserverRegistry) Observe(modelType string, observer ModelObserver)

Observe registers an observer for a model type

func (*ObserverRegistry) ObserveModel

func (r *ObserverRegistry) ObserveModel(model interface{}, observer ModelObserver)

ObserveModel registers an observer for a model instance (extracts type)

type PendingEvents

type PendingEvents struct {
	// contains filtered or unexported fields
}

PendingEvents tracks events that should be dispatched after database commit

func NewPendingEvents

func NewPendingEvents() *PendingEvents

NewPendingEvents creates a new pending events tracker

func (*PendingEvents) Add

func (p *PendingEvents) Add(event interface{})

Add adds an event to pending

func (*PendingEvents) Clear

func (p *PendingEvents) Clear()

Clear clears all pending events without returning them

func (*PendingEvents) Flush

func (p *PendingEvents) Flush() []interface{}

Flush returns and clears all pending events

type PriorityDispatcher

type PriorityDispatcher struct {
	*QueueIntegratedDispatcher
}

PriorityDispatcher handles listeners with priority ordering

func NewPriorityDispatcher

func NewPriorityDispatcher() *PriorityDispatcher

NewPriorityDispatcher creates a new priority-aware dispatcher

type PriorityListener

type PriorityListener interface {
	Listener
	// Priority returns the listener priority (higher numbers = higher priority)
	Priority() int
}

PriorityListener extends Listener with priority support

type QueueDispatcher

type QueueDispatcher interface {
	Push(event interface{}, listener Listener, delay time.Duration) error
}

QueueDispatcher handles queued event dispatching

type QueueIntegratedDispatcher

type QueueIntegratedDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

QueueIntegratedDispatcher extends DefaultDispatcher with deep queue integration

func NewQueueIntegratedDispatcher

func NewQueueIntegratedDispatcher() *QueueIntegratedDispatcher

NewQueueIntegratedDispatcher creates a new queue-integrated dispatcher

func (*QueueIntegratedDispatcher) Dispatch

func (d *QueueIntegratedDispatcher) Dispatch(event interface{}) error

Dispatch fires an event to all registered listeners with enhanced queue support

func (*QueueIntegratedDispatcher) ProcessEventListenerJob

func (d *QueueIntegratedDispatcher) ProcessEventListenerJob(data []byte) error

ProcessEventListenerJob processes an event listener job from the queue

func (*QueueIntegratedDispatcher) RegisterListenerFactory

func (d *QueueIntegratedDispatcher) RegisterListenerFactory(listenerType string, factory func() Listener)

RegisterListenerFactory registers a factory function for creating listener instances

type QueuedBaseListener

type QueuedBaseListener struct {
	BaseListener
	Connection string
	Queue      string
	Delay      time.Duration
	MaxTries   int
}

QueuedBaseListener provides a base for queued listeners

func (*QueuedBaseListener) OnConnection

func (l *QueuedBaseListener) OnConnection() string

OnConnection returns the queue connection

func (*QueuedBaseListener) OnQueue

func (l *QueuedBaseListener) OnQueue() string

OnQueue returns the queue name

func (*QueuedBaseListener) ShouldQueue

func (l *QueuedBaseListener) ShouldQueue() bool

ShouldQueue returns true for queued listeners

func (*QueuedBaseListener) Tries

func (l *QueuedBaseListener) Tries() int

Tries returns the number of retry attempts

func (*QueuedBaseListener) WithDelay

func (l *QueuedBaseListener) WithDelay() time.Duration

WithDelay returns the processing delay

type QueuedListener

type QueuedListener interface {
	Listener

	// OnConnection specifies the queue connection
	OnConnection() string

	// OnQueue specifies the queue name
	OnQueue() string

	// WithDelay specifies the delay before processing
	WithDelay() time.Duration

	// Tries specifies the number of retry attempts
	Tries() int
}

QueuedListener extends Listener with queue configuration

type QueuedMethodListener

type QueuedMethodListener struct {
	// contains filtered or unexported fields
}

QueuedMethodListener wraps a method as a queued listener

func (*QueuedMethodListener) Handle

func (l *QueuedMethodListener) Handle(event interface{}) error

func (*QueuedMethodListener) OnQueue

func (l *QueuedMethodListener) OnQueue() string

OnQueue returns the queue name

func (*QueuedMethodListener) ShouldQueue

func (l *QueuedMethodListener) ShouldQueue() bool

ShouldQueue returns true for queued listeners

func (*QueuedMethodListener) Tries

func (l *QueuedMethodListener) Tries() int

Tries returns the number of retry attempts

func (*QueuedMethodListener) WithDelay

func (l *QueuedMethodListener) WithDelay() int

WithDelay returns the delay in seconds as an int (note: QueuedListener.WithDelay returns a time.Duration instead)

type RateLimitedDispatcher

type RateLimitedDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

RateLimitedDispatcher provides rate limiting for event dispatching

func NewRateLimitedDispatcher

func NewRateLimitedDispatcher(maxEvents int, window time.Duration) *RateLimitedDispatcher

NewRateLimitedDispatcher creates a new rate-limited dispatcher

func (*RateLimitedDispatcher) Dispatch

func (d *RateLimitedDispatcher) Dispatch(event interface{}) error

Dispatch dispatches events with rate limiting

func (*RateLimitedDispatcher) GetRemainingEvents

func (d *RateLimitedDispatcher) GetRemainingEvents(eventName string) int

GetRemainingEvents returns the number of events that can still be dispatched

type RetryMiddleware

type RetryMiddleware struct {
	// contains filtered or unexported fields
}

RetryMiddleware retries failed event dispatches

func NewRetryMiddleware

func NewRetryMiddleware(maxRetries int, delay time.Duration) *RetryMiddleware

NewRetryMiddleware creates a new retry middleware

func (*RetryMiddleware) GetAttempts

func (m *RetryMiddleware) GetAttempts(eventName string) int

GetAttempts returns the number of attempts for an event

func (*RetryMiddleware) Handle

func (m *RetryMiddleware) Handle(event interface{}, next func(interface{}) error) error

Handle retries failed dispatches

type ShouldHandle

type ShouldHandle interface {
	// ShouldHandle determines if the listener should handle the event
	ShouldHandle(event interface{}) bool
}

ShouldHandle allows conditional event handling

type StoppableEvent

type StoppableEvent interface {
	Event
	// ShouldStopPropagation returns true if event propagation should stop
	ShouldStopPropagation() bool
	// StopPropagation marks the event to stop propagation
	StopPropagation()
}

StoppableEvent allows events to signal that propagation should stop

type StoppablePropagationDispatcher

type StoppablePropagationDispatcher struct {
	*PriorityDispatcher
}

StoppablePropagationDispatcher handles events that can stop propagation

func NewStoppablePropagationDispatcher

func NewStoppablePropagationDispatcher() *StoppablePropagationDispatcher

NewStoppablePropagationDispatcher creates a new stoppable propagation dispatcher

func (*StoppablePropagationDispatcher) Dispatch

func (d *StoppablePropagationDispatcher) Dispatch(event interface{}) error

Dispatch fires an event with support for stopping propagation

type StoppablePropagationListener

type StoppablePropagationListener interface {
	Listener
	// HandleWithPropagation processes the event and can stop propagation
	HandleWithPropagation(event interface{}) (stopPropagation bool, err error)
}

StoppablePropagationListener can signal to stop event propagation

type Subscriber

type Subscriber interface {
	// Subscribe registers the subscriber's listeners
	Subscribe(dispatcher Dispatcher)
}

Subscriber registers multiple event listeners

type SubscriberDispatcher

type SubscriberDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

SubscriberDispatcher extends the base dispatcher with subscriber support

func NewSubscriberDispatcher

func NewSubscriberDispatcher() *SubscriberDispatcher

NewSubscriberDispatcher creates a new dispatcher with subscriber support

func (*SubscriberDispatcher) GetSubscribers

func (d *SubscriberDispatcher) GetSubscribers() []Subscriber

GetSubscribers returns all registered subscribers

func (*SubscriberDispatcher) Subscribe

func (d *SubscriberDispatcher) Subscribe(subscriber Subscriber)

Subscribe registers a subscriber with the dispatcher

type SubscriberError

type SubscriberError struct {
	Subscriber string
	Method     string
	Err        error
}

SubscriberError represents an error during subscription

func (*SubscriberError) Error

func (e *SubscriberError) Error() string

type SubscriberGroup

type SubscriberGroup struct {
	// contains filtered or unexported fields
}

SubscriberGroup allows grouping multiple subscribers

func NewSubscriberGroup

func NewSubscriberGroup(subscribers ...Subscriber) *SubscriberGroup

NewSubscriberGroup creates a new subscriber group

func (*SubscriberGroup) Add

func (g *SubscriberGroup) Add(subscriber Subscriber)

Add adds a subscriber to the group

func (*SubscriberGroup) Subscribe

func (g *SubscriberGroup) Subscribe(dispatcher Dispatcher)

Subscribe registers all subscribers in the group

type ThrottlingDispatcher

type ThrottlingDispatcher struct {
	*DefaultDispatcher
	// contains filtered or unexported fields
}

ThrottlingDispatcher throttles event dispatching to a maximum rate

func NewThrottlingDispatcher

func NewThrottlingDispatcher(interval time.Duration) *ThrottlingDispatcher

NewThrottlingDispatcher creates a new throttling dispatcher

func (*ThrottlingDispatcher) CanDispatch

func (d *ThrottlingDispatcher) CanDispatch(event interface{}) bool

CanDispatch checks if an event can be dispatched now

func (*ThrottlingDispatcher) Dispatch

func (d *ThrottlingDispatcher) Dispatch(event interface{}) error

Dispatch throttles event dispatching

func (*ThrottlingDispatcher) Reset

func (d *ThrottlingDispatcher) Reset(eventName string)

Reset resets the throttle state for an event

type TimingMiddleware

type TimingMiddleware struct {
	// contains filtered or unexported fields
}

TimingMiddleware measures event dispatch time

func NewTimingMiddleware

func NewTimingMiddleware() *TimingMiddleware

NewTimingMiddleware creates a new timing middleware

func (*TimingMiddleware) GetAllTimings

func (m *TimingMiddleware) GetAllTimings() map[string]time.Duration

GetAllTimings returns all timings

func (*TimingMiddleware) GetTiming

func (m *TimingMiddleware) GetTiming(eventName string) time.Duration

GetTiming returns the timing for an event

func (*TimingMiddleware) Handle

func (m *TimingMiddleware) Handle(event interface{}, next func(interface{}) error) error

Handle measures dispatch time

type TransactionalDispatcher

type TransactionalDispatcher struct {
	Dispatcher
	// contains filtered or unexported fields
}

TransactionalDispatcher wraps a dispatcher with transaction support

func NewTransactionalDispatcher

func NewTransactionalDispatcher(dispatcher Dispatcher) *TransactionalDispatcher

NewTransactionalDispatcher creates a new transactional dispatcher

func (*TransactionalDispatcher) BeginTransaction

func (t *TransactionalDispatcher) BeginTransaction()

BeginTransaction marks the start of a transaction

func (*TransactionalDispatcher) Commit

func (t *TransactionalDispatcher) Commit() error

Commit commits the transaction and dispatches pending events

func (*TransactionalDispatcher) DispatchAfterCommit

func (t *TransactionalDispatcher) DispatchAfterCommit(event interface{})

DispatchAfterCommit dispatches an event after the current transaction commits

func (*TransactionalDispatcher) Rollback

func (t *TransactionalDispatcher) Rollback()

Rollback rolls back the transaction and clears pending events

type TransformMiddleware

type TransformMiddleware struct {
	// contains filtered or unexported fields
}

TransformMiddleware transforms events before dispatching

func NewTransformMiddleware

func NewTransformMiddleware(transformer func(interface{}) interface{}) *TransformMiddleware

NewTransformMiddleware creates a new transform middleware

func (*TransformMiddleware) Handle

func (m *TransformMiddleware) Handle(event interface{}, next func(interface{}) error) error

Handle transforms the event

type ValidationMiddleware

type ValidationMiddleware struct {
	// contains filtered or unexported fields
}

ValidationMiddleware validates events before dispatching

func NewValidationMiddleware

func NewValidationMiddleware(validator func(interface{}) error) *ValidationMiddleware

NewValidationMiddleware creates a new validation middleware

func (*ValidationMiddleware) Handle

func (m *ValidationMiddleware) Handle(event interface{}, next func(interface{}) error) error

Handle validates the event

type WildcardCache

type WildcardCache struct {
	// contains filtered or unexported fields
}

WildcardCache caches wildcard pattern matching results for performance

func NewWildcardCache

func NewWildcardCache() *WildcardCache

NewWildcardCache creates a new wildcard cache

func (*WildcardCache) Clear

func (c *WildcardCache) Clear()

Clear clears the cache

func (*WildcardCache) Matches

func (c *WildcardCache) Matches(eventName, pattern string) bool

Matches checks if an event matches a pattern, using the cache

type WildcardListener

type WildcardListener struct {
	Pattern  string
	Listener Listener
}

WildcardListener wraps a listener for wildcard pattern matching

func (*WildcardListener) Matches

func (w *WildcardListener) Matches(eventName string) bool

Matches checks if an event name matches the wildcard pattern

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL