Documentation ¶
Overview ¶
Package events provides an event bus for component coordination in the HAPTIC controller.
The event bus supports two communication patterns:
1. Async pub/sub: fire-and-forget event publishing for observability and loose coupling
2. Sync request-response: scatter-gather pattern for coordinated validation and queries
Index ¶
- Constants
- func Subscribe[T Event](ctx context.Context, bus *EventBus, bufferSize int) <-chan T
- func SubscribeMultiple(ctx context.Context, bus *EventBus, bufferSize int, types ...string) <-chan Event
- type CoalescibleEvent
- type DropCallback
- type DropInfo
- type Event
- type EventBus
- func (b *EventBus) DroppedEvents() uint64
- func (b *EventBus) DroppedEventsCritical() uint64
- func (b *EventBus) DroppedEventsObservability() uint64
- func (b *EventBus) Pause()
- func (b *EventBus) Publish(event Event) int
- func (b *EventBus) Request(ctx context.Context, request Request, opts RequestOptions) (*RequestResult, error)
- func (b *EventBus) SetDropCallback(cb DropCallback)
- func (b *EventBus) Start()
- func (b *EventBus) Subscribe(name string, bufferSize int) <-chan Event
- func (b *EventBus) SubscribeLeaderOnly(name string, bufferSize int) <-chan Event
- func (b *EventBus) SubscribeLossy(name string, bufferSize int) <-chan Event
- func (b *EventBus) SubscribeTypes(name string, bufferSize int, eventTypes ...string) <-chan Event
- func (b *EventBus) SubscribeTypesLeaderOnly(name string, bufferSize int, eventTypes ...string) <-chan Event
- func (b *EventBus) SubscribeTypesLossy(name string, bufferSize int, eventTypes ...string) <-chan Event
- func (b *EventBus) Unsubscribe(ch <-chan Event)
- func (b *EventBus) UnsubscribeTyped(ch <-chan Event)
- type Request
- type RequestOptions
- type RequestResult
- type Response
Constants ¶
const (
// MaxPreStartBufferSize is the maximum number of events that can be buffered
// before EventBus.Start() is called. This prevents unbounded memory growth
// during startup if many events are published before subscribers are ready.
// Events exceeding this limit are dropped with a warning.
MaxPreStartBufferSize = 1000
)
Variables ¶
This section is empty.
Functions ¶
func Subscribe ¶
Subscribe is a generic function that returns a typed channel for a specific event type.
This provides compile-time type safety by returning a channel of the specific event type T. Events are filtered to only include those that match type T.
The generic type T must be a pointer to a struct that implements the Event interface. This is required because events are typically published as pointer types.
Parameters:
- ctx: Context for the goroutine lifetime
- bus: The EventBus to subscribe to
- bufferSize: Size of the output channel buffer
Returns a channel of type T that receives only events of that type.
Example:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventChan := events.Subscribe[*ReconciliationTriggeredEvent](ctx, bus, 100)
for event := range eventChan {
// event is already *ReconciliationTriggeredEvent
fmt.Println(event.Reason)
}
Note: The context controls the lifetime of the internal forwarding goroutine. When the context is cancelled, the goroutine stops, the subscription is removed from the bus, and the channel will stop receiving events.
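The forwarding behaviour described in the note can be sketched as follows. This is a minimal illustration, not the package's implementation: forwardTyped, configParsed, deployDone and countParsed are hypothetical names, the Event interface is redeclared locally so the example compiles on its own, and closing the output channel on exit is a choice made by this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Event mirrors the Event interface documented in this package.
type Event interface {
	EventType() string
	Timestamp() time.Time
}

// configParsed and deployDone are hypothetical event types for illustration.
type configParsed struct{ at time.Time }

func (e *configParsed) EventType() string    { return "config.parsed" }
func (e *configParsed) Timestamp() time.Time { return e.at }

type deployDone struct{ at time.Time }

func (e *deployDone) EventType() string    { return "deployment.completed" }
func (e *deployDone) Timestamp() time.Time { return e.at }

// forwardTyped copies events of concrete type T from in to a typed channel
// until ctx is cancelled or in is closed. Events of other types are skipped.
func forwardTyped[T Event](ctx context.Context, in <-chan Event, bufferSize int) <-chan T {
	out := make(chan T, bufferSize)
	go func() {
		defer close(out) // assumption of this sketch: close on exit
		for {
			select {
			case <-ctx.Done():
				return
			case ev, ok := <-in:
				if !ok {
					return
				}
				typed, match := ev.(T)
				if !match {
					continue
				}
				select {
				case out <- typed:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// countParsed feeds a mixed stream through forwardTyped and counts matches.
func countParsed() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan Event, 3)
	in <- &configParsed{at: time.Now()}
	in <- &deployDone{at: time.Now()}
	in <- &configParsed{at: time.Now()}
	close(in)

	n := 0
	for range forwardTyped[*configParsed](ctx, in, 3) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countParsed())
}
```

The type assertion ev.(T) is what gives the consumer a channel of the concrete pointer type, so no type switch is needed in the receiving loop.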
func SubscribeMultiple ¶
func SubscribeMultiple(ctx context.Context, bus *EventBus, bufferSize int, types ...string) <-chan Event
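SubscribeMultiple returns a channel of Event that receives events of multiple types. Unlike Subscribe, it is not generic: callers use a type switch or type assertion to recover the concrete event type.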
This is useful when a component needs to receive events of different types that share a common interface or base type.
Parameters:
- ctx: Context for the goroutine lifetime
- bus: The EventBus to subscribe to
- bufferSize: Size of the output channel buffer
- types: Event type strings to filter for
Returns a channel that receives events matching any of the specified types.
Example:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventChan := events.SubscribeMultiple(ctx, bus, 100,
"reconciliation.triggered",
"reconciliation.started",
"reconciliation.completed")
for event := range eventChan {
switch e := event.(type) {
case *ReconciliationTriggeredEvent:
// handle
case *ReconciliationStartedEvent:
// handle
}
}
Types ¶
type CoalescibleEvent ¶
type CoalescibleEvent interface {
Event
// Coalescible returns true if this event can be safely skipped when a newer
// event of the same type is available. The emitter sets this based on context:
// - true: "state update" events where only the latest matters
// - false: "command" events that must be processed individually
Coalescible() bool
}
CoalescibleEvent is an optional interface for events that support coalescing. Events implementing this interface can be safely skipped when a newer event of the same type is available in the queue.
This interface follows the Interface Segregation Principle - only events that need coalescing implement it, keeping the base Event interface minimal.
The Coalescible() method is set by the event emitter (not derived from event fields), following the Single Responsibility Principle - the emitter knows the context and decides whether this specific event instance can be coalesced.
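To make the coalescing contract concrete, here is a hedged sketch of how a consumer might skip superseded events. The coalesce helper, the stateUpdate type and keptVersions are hypothetical and not part of this package; the two interfaces are redeclared locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"time"
)

// Event and CoalescibleEvent mirror the interfaces documented above.
type Event interface {
	EventType() string
	Timestamp() time.Time
}

type CoalescibleEvent interface {
	Event
	Coalescible() bool
}

// stateUpdate is a hypothetical event; the emitter sets coalescible per instance.
type stateUpdate struct {
	version     int
	coalescible bool
	at          time.Time
}

func (e *stateUpdate) EventType() string    { return "state.updated" }
func (e *stateUpdate) Timestamp() time.Time { return e.at }
func (e *stateUpdate) Coalescible() bool    { return e.coalescible }

// coalesce drops every coalescible event that is followed by a newer event of
// the same type. Non-coalescible events and plain Events are always kept.
func coalesce(queue []Event) []Event {
	seenLater := make(map[string]bool) // types already seen while scanning from the newest end
	kept := make([]Event, 0, len(queue))
	for i := len(queue) - 1; i >= 0; i-- {
		ev := queue[i]
		c, ok := ev.(CoalescibleEvent)
		if ok && c.Coalescible() && seenLater[ev.EventType()] {
			continue // a newer event of this type exists
		}
		seenLater[ev.EventType()] = true
		kept = append(kept, ev)
	}
	// Restore the original publication order.
	for i, j := 0, len(kept)-1; i < j; i, j = i+1, j-1 {
		kept[i], kept[j] = kept[j], kept[i]
	}
	return kept
}

// keptVersions returns the versions that survive coalescing of a sample queue.
func keptVersions() []int {
	now := time.Now()
	queue := []Event{
		&stateUpdate{version: 1, coalescible: true, at: now},
		&stateUpdate{version: 2, coalescible: false, at: now},
		&stateUpdate{version: 3, coalescible: true, at: now},
		&stateUpdate{version: 4, coalescible: true, at: now},
	}
	var versions []int
	for _, ev := range coalesce(queue) {
		versions = append(versions, ev.(*stateUpdate).version)
	}
	return versions
}

func main() {
	fmt.Println(keptVersions()) // versions 2 and 4 survive
}
```

Version 2 survives because its emitter marked it non-coalescible, and version 4 survives because it is the newest event of its type.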
type DropCallback ¶
type DropCallback func(info DropInfo)
DropCallback is called when an event is dropped due to a full subscriber buffer. This callback pattern keeps the EventBus domain-agnostic while allowing the controller layer to handle drops with appropriate logging and metrics.
type DropInfo ¶
type DropInfo struct {
EventType string // The type of event that was dropped
SubscriberName string // Name of the subscriber whose buffer was full
BufferSize int // Total buffer size
EventTypes string // For typed subscriptions, the event types being filtered (comma-separated)
}
DropInfo contains information about a dropped event for debugging.
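One possible way to consume DropInfo is to aggregate drops per subscriber before exporting them as metrics. The sketch below is an assumption-laden illustration: dropTally and its methods are hypothetical, and DropInfo and DropCallback are redeclared locally so the example compiles without importing this package.

```go
package main

import (
	"fmt"
	"sync"
)

// DropInfo and DropCallback mirror the types documented above.
type DropInfo struct {
	EventType      string
	SubscriberName string
	BufferSize     int
	EventTypes     string
}

type DropCallback func(info DropInfo)

// dropTally is a hypothetical helper that counts drops per subscriber.
// It is safe for concurrent use because the bus may invoke the callback
// from several publishing goroutines.
type dropTally struct {
	mu     sync.Mutex
	counts map[string]int
}

func newDropTally() *dropTally {
	return &dropTally{counts: make(map[string]int)}
}

// Callback returns a function with the DropCallback signature.
func (t *dropTally) Callback() DropCallback {
	return func(info DropInfo) {
		t.mu.Lock()
		t.counts[info.SubscriberName]++
		t.mu.Unlock()
	}
}

// Count reports how many drops were recorded for one subscriber.
func (t *dropTally) Count(subscriber string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.counts[subscriber]
}

func main() {
	tally := newDropTally()
	cb := tally.Callback() // in real code this would be passed to SetDropCallback
	cb(DropInfo{EventType: "config.parsed", SubscriberName: "reconciler", BufferSize: 100})
	cb(DropInfo{EventType: "template.rendered", SubscriberName: "reconciler", BufferSize: 100})
	fmt.Println(tally.Count("reconciler"))
}
```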
type Event ¶
type Event interface {
// EventType returns a unique identifier for this event type.
// Convention: use dot-notation like "config.parsed" or "deployment.completed"
EventType() string
// Timestamp returns when this event occurred.
// Used for event correlation and temporal analysis.
Timestamp() time.Time
}
Event is the base interface for all events in the system. Events are used for asynchronous pub/sub communication between components.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus provides centralized pub/sub coordination for all controller components.
The EventBus supports two patterns:
- Publish() for async fire-and-forget events (observability, notifications)
- Request() for sync scatter-gather pattern (validation, queries)
EventBus is thread-safe and can be used concurrently from multiple goroutines.
Startup Coordination: Events published before Start() is called are buffered and replayed after Start(). This prevents race conditions during component initialization.
Typed Subscriptions: In addition to universal subscriptions (Subscribe), the EventBus supports typed subscriptions (SubscribeTypes) that filter events at the bus level for efficiency.
Lossy Subscriptions: For observability components where occasional drops are acceptable, use SubscribeLossy() or SubscribeTypesLossy(). These subscriptions silently drop events without triggering the onDrop callback, and drops are counted separately in DroppedEventsObservability().
Event Drop Monitoring: When subscriber buffers are full, events are dropped to prevent blocking. Use SetDropCallback() to receive notifications when critical drops occur. DroppedEventsCritical() returns drops from business-critical subscribers. DroppedEventsObservability() returns expected drops from lossy subscribers.
func NewEventBus ¶
NewEventBus creates a new EventBus.
The bus starts in buffering mode - events published before Start() is called will be buffered and replayed when Start() is invoked. This ensures no events are lost during component initialization.
The capacity parameter sets the initial buffer size for pre-start events. Recommended: 100 for most applications.
func (*EventBus) DroppedEvents ¶
DroppedEvents returns the total number of events that have been dropped due to full subscriber buffers since the EventBus was created.
This is the sum of DroppedEventsCritical() + DroppedEventsObservability() and is kept for backwards compatibility.
For more actionable monitoring, use the separate counters:
- DroppedEventsCritical() - Alert if > 0 (indicates a problem)
- DroppedEventsObservability() - Expected during high load (no action needed)
func (*EventBus) DroppedEventsCritical ¶
DroppedEventsCritical returns the number of events dropped from business-critical (non-lossy) subscribers.
Non-zero values indicate a problem that needs attention - critical subscribers are not keeping up with event volume.
func (*EventBus) DroppedEventsObservability ¶
DroppedEventsObservability returns the number of events dropped from lossy subscribers (observability components like commentator, debug/events).
Non-zero values are expected and acceptable during high load. These drops don't affect controller operation - they just mean some log entries or debug info was skipped.
func (*EventBus) Pause ¶
func (b *EventBus) Pause()
Pause temporarily suspends event delivery, buffering events for later replay.
This reuses the existing preStartBuffer infrastructure used during startup. Events published while paused are buffered and will be replayed when Start() is called again.
Use cases:
- Leadership transition (pause while starting leader-only components)
- Hot reload scenarios
- Testing
This method is idempotent - calling it when already paused has no effect. Thread-safe and can be called concurrently with Publish() and Subscribe().
Example:
// During leadership transition
bus.Pause() // Buffer events
bus.Publish(BecameLeaderEvent{}) // Buffered
startLeaderOnlyComponents() // Components subscribe
bus.Start() // Replay buffered events
func (*EventBus) Publish ¶
Publish sends an event to all subscribers.
If Start() has not been called yet, the event is buffered and will be replayed when Start() is invoked. This prevents events from being lost during component initialization.
After Start() is called, this is a non-blocking operation. If a subscriber's channel is full, the event is dropped for that subscriber to prevent slow consumers from blocking the entire system.
Returns the number of subscribers that successfully received the event. Returns 0 if event was buffered (before Start()).
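The non-blocking delivery described above is commonly written in Go as a select with a default case. The following is a minimal sketch of that technique, not the package's code: publishNonBlocking and publishDemo are hypothetical, and plain strings stand in for events.

```go
package main

import "fmt"

// publishNonBlocking tries to send ev to every subscriber channel without
// blocking. A full channel causes the event to be dropped for that subscriber.
func publishNonBlocking(subs []chan string, ev string) (delivered, dropped int) {
	for _, ch := range subs {
		select {
		case ch <- ev:
			delivered++
		default:
			dropped++ // buffer full: skip this subscriber instead of waiting
		}
	}
	return delivered, dropped
}

// publishDemo publishes one event to a fast and a saturated subscriber.
func publishDemo() (int, int) {
	fast := make(chan string, 1)
	slow := make(chan string, 1)
	slow <- "earlier event" // fill the slow subscriber's buffer
	return publishNonBlocking([]chan string{fast, slow}, "config.parsed")
}

func main() {
	delivered, dropped := publishDemo()
	fmt.Println(delivered, dropped)
}
```

The trade-off is the one the documentation states: a slow consumer loses events rather than stalling every publisher.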
func (*EventBus) Request ¶
func (b *EventBus) Request(ctx context.Context, request Request, opts RequestOptions) (*RequestResult, error)
Request sends a request event and waits for responses using the scatter-gather pattern.
This is a synchronous operation that:
1. Publishes the request event to all subscribers (scatter phase)
2. Collects response events matching the request ID (gather phase)
3. Returns when all expected responders have replied or timeout occurs
The request must implement the Request interface to provide a unique RequestID for correlating responses.
Use this method when you need coordinated responses from multiple components, such as multi-phase validation or distributed queries.
Example:
req := NewConfigValidationRequest(config, version)
result, err := bus.Request(ctx, req, RequestOptions{
Timeout: 10 * time.Second,
ExpectedResponders: []string{"basic", "template", "jsonpath"},
})
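The gather phase can be illustrated with a small standalone function. This is a sketch under stated assumptions rather than the bus's actual logic: gather, gatherDemo and the response struct are hypothetical simplifications, and responses arrive on a plain channel instead of through the bus.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// response is a hypothetical, simplified stand-in for the Response interface.
type response struct {
	requestID string
	responder string
}

// gather collects responses for requestID until every expected responder has
// replied or the timeout expires. It returns the responses received and the
// names of the responders that never answered.
func gather(ctx context.Context, in <-chan response, requestID string,
	expected []string, timeout time.Duration) (got []response, missing []string) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	pending := make(map[string]bool, len(expected))
	for _, name := range expected {
		pending[name] = true
	}
	for len(pending) > 0 {
		select {
		case r := <-in:
			if r.requestID != requestID || !pending[r.responder] {
				continue // unrelated request, unknown responder, or duplicate
			}
			delete(pending, r.responder)
			got = append(got, r)
		case <-ctx.Done():
			for _, name := range expected {
				if pending[name] {
					missing = append(missing, name)
				}
			}
			return got, missing
		}
	}
	return got, nil
}

// gatherDemo simulates two matching responses and one for another request.
func gatherDemo() (int, []string) {
	in := make(chan response, 3)
	in <- response{requestID: "req-1", responder: "basic"}
	in <- response{requestID: "req-2", responder: "jsonpath"} // different request
	in <- response{requestID: "req-1", responder: "template"}

	got, missing := gather(context.Background(), in, "req-1",
		[]string{"basic", "template", "jsonpath"}, 50*time.Millisecond)
	return len(got), missing
}

func main() {
	n, missing := gatherDemo()
	fmt.Println(n, missing)
}
```

Correlating on the request ID is what lets several requests share one bus without their responses being mixed up.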
func (*EventBus) SetDropCallback ¶
func (b *EventBus) SetDropCallback(cb DropCallback)
SetDropCallback sets a callback to be invoked when events are dropped from CRITICAL (non-lossy) subscriber buffers. The callback receives a DropInfo describing the dropped event and the subscriber whose buffer was full.
This callback is NOT called for lossy subscribers (created via SubscribeLossy() or SubscribeTypesLossy()). Lossy drops are expected and silently counted in DroppedEventsObservability().
This callback pattern keeps the EventBus domain-agnostic while allowing the controller layer to handle drops with appropriate logging and metrics.
Set to nil to disable drop notifications.
Example:
bus.SetDropCallback(func(info DropInfo) {
slog.Warn("Event dropped from critical subscriber", "event_type", info.EventType, "subscriber", info.SubscriberName)
metrics.EventsDroppedCritical.Inc()
})
func (*EventBus) Start ¶
func (b *EventBus) Start()
Start releases all buffered events and switches the bus to normal operation mode.
This method should be called after all components have subscribed to the bus during application startup. It ensures that no events are lost during the initialization phase.
Behavior:
- Marks the bus as started
- Replays all buffered events to subscribers in order
- Clears the buffer
- All subsequent Publish() calls go directly to subscribers
This method is idempotent - calling it multiple times has no additional effect. Thread-safe and can be called concurrently with Publish() and Subscribe().
Example:
bus := NewEventBus(100)
// Components subscribe during setup
commentator := NewEventCommentator(bus, logger, 1000)
validator := NewValidator(bus)
// ... more subscribers ...
// Release buffered events
bus.Start()
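The buffering and replay semantics of Start() and Pause() can be modelled with a toy bus. The code below is a simplified sketch, not the real EventBus: miniBus is hypothetical, events are plain strings, and maxBuffered merely plays the role of MaxPreStartBufferSize.

```go
package main

import (
	"fmt"
	"sync"
)

// maxBuffered plays the role of MaxPreStartBufferSize in this sketch.
const maxBuffered = 1000

// miniBus is a hypothetical, string-only stand-in for EventBus.
type miniBus struct {
	mu      sync.Mutex
	started bool
	buffer  []string
	subs    []chan string
}

// Subscribe registers a buffered channel that receives every event.
func (b *miniBus) Subscribe(bufferSize int) <-chan string {
	ch := make(chan string, bufferSize)
	b.mu.Lock()
	b.subs = append(b.subs, ch)
	b.mu.Unlock()
	return ch
}

// deliver performs non-blocking sends; callers must hold b.mu.
func (b *miniBus) deliver(ev string) int {
	n := 0
	for _, ch := range b.subs {
		select {
		case ch <- ev:
			n++
		default: // subscriber buffer full: drop
		}
	}
	return n
}

// Publish buffers the event while the bus is not started and returns 0;
// otherwise it delivers immediately and returns the number of receivers.
func (b *miniBus) Publish(ev string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.started {
		if len(b.buffer) < maxBuffered {
			b.buffer = append(b.buffer, ev)
		}
		return 0
	}
	return b.deliver(ev)
}

// Start replays buffered events in order and switches to direct delivery.
func (b *miniBus) Start() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.started {
		return // idempotent
	}
	b.started = true
	for _, ev := range b.buffer {
		b.deliver(ev)
	}
	b.buffer = nil
}

// Pause switches back to buffering mode until Start is called again.
func (b *miniBus) Pause() {
	b.mu.Lock()
	b.started = false
	b.mu.Unlock()
}

func main() {
	bus := &miniBus{}
	ch := bus.Subscribe(4)
	bus.Publish("config.parsed") // buffered: the bus has not started
	bus.Start()                  // replays the buffered event
	fmt.Println(<-ch)
}
```

Holding the mutex during delivery is acceptable here only because every send is non-blocking.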
func (*EventBus) Subscribe ¶
Subscribe creates a new subscription to the event bus.
The returned channel will receive all events published to the bus. The bufferSize parameter controls the channel buffer size - larger buffers reduce the chance of dropped events for slow consumers.
Subscribers must continuously read from the channel to avoid dropped events. A bufferSize of 100 is recommended for most use cases.
The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.
IMPORTANT: For all-replica components, call this method BEFORE EventBus.Start() to ensure buffered events are received. Subscribing after Start() will trigger a warning as it may indicate a bug. For leader-only components that intentionally subscribe late (after leader election), use SubscribeLeaderOnly() instead.
Parameters:
- name: Subscriber name for debugging (e.g., "commentator", "reconciler")
- bufferSize: Size of the channel buffer
func (*EventBus) SubscribeLeaderOnly ¶
SubscribeLeaderOnly creates a subscription for leader-only components.
This method is identical to Subscribe() but does not log a warning when called after EventBus.Start(). Use this for components that only run on the leader replica and are intentionally started after leader election.
Leader-only components rely on the state replay mechanism: all-replica components re-publish their cached state when BecameLeaderEvent is received, ensuring leader-only components don't miss critical state even though they subscribe late.
The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.
Parameters:
- name: Subscriber name for debugging (e.g., "deployer", "scheduler")
- bufferSize: Size of the channel buffer
func (*EventBus) SubscribeLossy ¶
SubscribeLossy creates a subscription that silently drops events when buffer is full.
Use this for observability components (like commentator, debug/events) where occasional event drops are acceptable and expected during high load. Drops from lossy subscribers:
- Are counted in DroppedEventsObservability() (for metrics)
- Do NOT trigger the onDrop callback (no WARN logs)
This prevents log spam from expected drops in observability components while still allowing monitoring via metrics.
The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.
Parameters:
- name: Subscriber name for debugging (e.g., "commentator")
- bufferSize: Size of the channel buffer
func (*EventBus) SubscribeTypes ¶
SubscribeTypes creates a subscription that only receives events of the specified types.
This is more efficient than universal Subscribe() when a component only cares about specific event types, as filtering happens at the EventBus level rather than in each subscriber's event loop.
Parameters:
- name: Subscriber name for debugging (e.g., "reconciler", "renderer")
- bufferSize: Size of the output channel buffer
- eventTypes: Event type strings to filter for (from Event.EventType())
Returns a channel that receives only events matching the specified types. The channel is read-only and will never be closed.
To stop receiving events and prevent memory leaks, call UnsubscribeTyped() with the returned channel.
Example:
eventChan := bus.SubscribeTypes("executor", 100,
"reconciliation.triggered",
"template.rendered",
"validation.completed")
defer bus.UnsubscribeTyped(eventChan) // Clean up when done
for event := range eventChan {
// Only receives the specified event types
}
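Filtering at the bus level usually means checking a per-subscription set of event types before attempting a send. The sketch below illustrates that idea under simplifying assumptions: typedSub, newTypedSub and dispatch are hypothetical names, and the event is represented only by its type string.

```go
package main

import "fmt"

// typedSub is a hypothetical record the bus could keep per typed subscription.
type typedSub struct {
	name  string
	types map[string]struct{} // set of event types this subscriber wants
	ch    chan string
}

// newTypedSub builds a subscription with a lookup set for its event types.
func newTypedSub(name string, bufferSize int, eventTypes ...string) *typedSub {
	set := make(map[string]struct{}, len(eventTypes))
	for _, t := range eventTypes {
		set[t] = struct{}{}
	}
	return &typedSub{name: name, types: set, ch: make(chan string, bufferSize)}
}

// dispatch sends eventType only to subscriptions that asked for it and
// returns how many received it. Sends are non-blocking.
func dispatch(subs []*typedSub, eventType string) int {
	delivered := 0
	for _, s := range subs {
		if _, ok := s.types[eventType]; !ok {
			continue // filtered out before touching the channel
		}
		select {
		case s.ch <- eventType:
			delivered++
		default: // buffer full: drop for this subscriber
		}
	}
	return delivered
}

func main() {
	subs := []*typedSub{
		newTypedSub("executor", 10, "reconciliation.triggered", "template.rendered"),
		newTypedSub("renderer", 10, "template.rendered"),
	}
	fmt.Println(dispatch(subs, "template.rendered"))        // both subscribers match
	fmt.Println(dispatch(subs, "reconciliation.triggered")) // only executor matches
	fmt.Println(dispatch(subs, "config.parsed"))            // nobody subscribed
}
```

Because uninteresting events never enter a subscriber's channel, they cannot fill its buffer or wake its event loop.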
func (*EventBus) SubscribeTypesLeaderOnly ¶
func (b *EventBus) SubscribeTypesLeaderOnly(name string, bufferSize int, eventTypes ...string) <-chan Event
SubscribeTypesLeaderOnly creates a typed subscription for leader-only components.
This method is identical to SubscribeTypes() but is semantically named to indicate it's intended for leader-only components that subscribe after leader election.
Leader-only components rely on the state replay mechanism: all-replica components re-publish their cached state when BecameLeaderEvent is received, ensuring leader-only components don't miss critical state even though they subscribe late.
Parameters:
- name: Subscriber name for debugging (e.g., "deployer", "scheduler")
- bufferSize: Size of the output channel buffer
- eventTypes: Event type strings to filter for (from Event.EventType())
Returns a channel that receives only events matching the specified types. The channel is read-only and will never be closed.
To stop receiving events and prevent memory leaks, call UnsubscribeTyped() with the returned channel.
Example:
// In a leader-only component's constructor (after BecameLeaderEvent)
eventChan := bus.SubscribeTypesLeaderOnly("scheduler", 50,
events.EventTypeTemplateRendered,
events.EventTypeValidationCompleted,
events.EventTypeLostLeadership)
defer bus.UnsubscribeTyped(eventChan)
func (*EventBus) SubscribeTypesLossy ¶
func (b *EventBus) SubscribeTypesLossy(name string, bufferSize int, eventTypes ...string) <-chan Event
SubscribeTypesLossy creates a typed subscription that silently drops events when full.
Use this for observability components that filter by event type but where occasional drops are acceptable. Drops from lossy subscribers:
- Are counted in DroppedEventsObservability() (for metrics)
- Do NOT trigger the onDrop callback (no WARN logs)
Parameters:
- name: Subscriber name for debugging (e.g., "metrics", "debug-events")
- bufferSize: Size of the output channel buffer
- eventTypes: Event type strings to filter for (from Event.EventType())
Returns a channel that receives only events matching the specified types. The channel is read-only and will never be closed.
To stop receiving events and prevent memory leaks, call UnsubscribeTyped() with the returned channel.
func (*EventBus) Unsubscribe ¶
Unsubscribe removes a subscription from the event bus.
This method should be called when a subscriber no longer needs to receive events, to prevent memory leaks. After calling Unsubscribe, the channel will no longer receive events.
Note: The channel is not closed by this method. The subscriber is responsible for draining any remaining events from the channel if needed.
This method is safe to call multiple times for the same channel.
func (*EventBus) UnsubscribeTyped ¶
UnsubscribeTyped removes a typed subscription from the event bus.
This method should be called when a subscriber no longer needs to receive events from a typed subscription (created via SubscribeTypes), to prevent memory leaks.
Note: The channel is not closed by this method. The subscriber is responsible for draining any remaining events from the channel if needed.
This method is safe to call multiple times for the same channel.
type Request ¶
type Request interface {
Event
// RequestID returns a unique identifier for this request.
// Responses must include this ID to be correlated correctly.
RequestID() string
}
Request is the interface for request events in the scatter-gather pattern.
Requests are broadcast to all subscribers (scatter phase), and responses are correlated by RequestID (gather phase).
type RequestOptions ¶
type RequestOptions struct {
// Timeout is the maximum time to wait for responses.
// If zero, defaults to 10 seconds.
Timeout time.Duration
// ExpectedResponders lists the names of components expected to respond.
// If empty, the request will wait indefinitely for responses.
ExpectedResponders []string
// MinResponses is the minimum number of responses required.
// If zero, all ExpectedResponders must respond.
// Set to a lower value to implement graceful degradation.
MinResponses int
}
RequestOptions configures the behavior of a scatter-gather request.
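The defaulting rules stated in the field comments can be written out as two small helpers. This is only a sketch of those documented rules: effectiveTimeout and requiredResponses are hypothetical names, RequestOptions is redeclared locally, and the case of an empty ExpectedResponders list is deliberately not modelled.

```go
package main

import (
	"fmt"
	"time"
)

// RequestOptions mirrors the struct documented above.
type RequestOptions struct {
	Timeout            time.Duration
	ExpectedResponders []string
	MinResponses       int
}

// effectiveTimeout applies the documented default of 10 seconds.
func effectiveTimeout(opts RequestOptions) time.Duration {
	if opts.Timeout == 0 {
		return 10 * time.Second
	}
	return opts.Timeout
}

// requiredResponses applies the documented rule that a zero MinResponses
// means every expected responder must reply.
func requiredResponses(opts RequestOptions) int {
	if opts.MinResponses == 0 {
		return len(opts.ExpectedResponders)
	}
	return opts.MinResponses
}

func main() {
	opts := RequestOptions{
		ExpectedResponders: []string{"basic", "template", "jsonpath"},
		MinResponses:       2, // tolerate one missing responder
	}
	fmt.Println(effectiveTimeout(opts), requiredResponses(opts))
}
```

Setting MinResponses below the number of expected responders is how a caller opts into the graceful degradation mentioned in the field comment.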
type RequestResult ¶
type RequestResult struct {
// Responses contains all responses received before timeout or completion.
Responses []Response
// Errors contains error messages for responders that didn't respond
// or timed out. Empty if all expected responders replied.
Errors []string
}
RequestResult contains the aggregated results from a scatter-gather request.
type Response ¶
type Response interface {
Event
// RequestID returns the ID of the request this response belongs to.
RequestID() string
// Responder returns the name of the component that sent this response.
Responder() string
}
Response is the interface for response events in the scatter-gather pattern.
Each response includes the original RequestID and the name of the responder for tracking and debugging purposes.
Directories ¶
| Path | Synopsis |
|---|---|
| | Package ringbuffer provides a thread-safe generic ring buffer implementation. |