Documentation
¶
Overview ¶
Package events provides an event bus for component coordination in the HAPTIC controller.
The event bus supports two communication patterns: 1. Async pub/sub: Fire-and-forget event publishing for observability and loose coupling 2. Sync request-response: Scatter-gather pattern for coordinated validation and queries
Index ¶
- Constants
- func Subscribe[T Event](ctx context.Context, bus *EventBus, bufferSize int) <-chan T
- func SubscribeMultiple(ctx context.Context, bus *EventBus, bufferSize int, types ...string) <-chan Event
- type Event
- type EventBus
- func (b *EventBus) Pause()
- func (b *EventBus) Publish(event Event) int
- func (b *EventBus) Request(ctx context.Context, request Request, opts RequestOptions) (*RequestResult, error)
- func (b *EventBus) Start()
- func (b *EventBus) Subscribe(bufferSize int) <-chan Event
- func (b *EventBus) SubscribeLeaderOnly(bufferSize int) <-chan Event
- func (b *EventBus) SubscribeTypes(bufferSize int, eventTypes ...string) <-chan Event
- func (b *EventBus) Unsubscribe(ch <-chan Event)
- func (b *EventBus) UnsubscribeTyped(ch <-chan Event)
- type Request
- type RequestOptions
- type RequestResult
- type Response
Constants ¶
const ( // MaxPreStartBufferSize is the maximum number of events that can be buffered // before EventBus.Start() is called. This prevents unbounded memory growth // during startup if many events are published before subscribers are ready. // Events exceeding this limit are dropped with a warning. MaxPreStartBufferSize = 1000 )
Variables ¶
This section is empty.
Functions ¶
func Subscribe ¶
Subscribe is a generic function that returns a typed channel for a specific event type.
This provides compile-time type safety by returning a channel of the specific event type T. Events are filtered to only include those that match type T.
The generic type T must be a pointer to a struct that implements the Event interface. This is required because events are typically published as pointer types.
Parameters:
- ctx: Context for the goroutine lifetime
- bus: The EventBus to subscribe to
- bufferSize: Size of the output channel buffer
Returns a channel of type T that receives only events of that type.
Example:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventChan := events.Subscribe[*ReconciliationTriggeredEvent](ctx, bus, 100)
for event := range eventChan {
// event is already *ReconciliationTriggeredEvent
fmt.Println(event.Reason)
}
Note: The context controls the lifetime of the internal forwarding goroutine. When the context is cancelled, the goroutine stops, the subscription is removed from the bus, and the channel will stop receiving events.
func SubscribeMultiple ¶
func SubscribeMultiple(ctx context.Context, bus *EventBus, bufferSize int, types ...string) <-chan Event
SubscribeMultiple is a generic function that returns a typed channel for multiple event types.
This is useful when a component needs to receive events of different types that share a common interface or base type.
Parameters:
- ctx: Context for the goroutine lifetime
- bus: The EventBus to subscribe to
- bufferSize: Size of the output channel buffer
- types: Event type strings to filter for
Returns a channel that receives events matching any of the specified types.
Example:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventChan := events.SubscribeMultiple(ctx, bus, 100,
"reconciliation.triggered",
"reconciliation.started",
"reconciliation.completed")
for event := range eventChan {
switch e := event.(type) {
case *ReconciliationTriggeredEvent:
// handle
case *ReconciliationStartedEvent:
// handle
}
}
Types ¶
type Event ¶
type Event interface {
// EventType returns a unique identifier for this event type.
// Convention: use dot-notation like "config.parsed" or "deployment.completed"
EventType() string
// Timestamp returns when this event occurred.
// Used for event correlation and temporal analysis.
Timestamp() time.Time
}
Event is the base interface for all events in the system. Events are used for asynchronous pub/sub communication between components.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus provides centralized pub/sub coordination for all controller components.
The EventBus supports two patterns: - Publish() for async fire-and-forget events (observability, notifications) - Request() for sync scatter-gather pattern (validation, queries)
EventBus is thread-safe and can be used concurrently from multiple goroutines.
Startup Coordination: Events published before Start() is called are buffered and replayed after Start(). This prevents race conditions during component initialization.
Typed Subscriptions: In addition to universal subscriptions (Subscribe), the EventBus supports typed subscriptions (SubscribeTypes) that filter events at the bus level for efficiency.
func NewEventBus ¶
NewEventBus creates a new EventBus.
The bus starts in buffering mode - events published before Start() is called will be buffered and replayed when Start() is invoked. This ensures no events are lost during component initialization.
The capacity parameter sets the initial buffer size for pre-start events. Recommended: 100 for most applications.
func (*EventBus) Pause ¶
func (b *EventBus) Pause()
Pause temporarily suspends event delivery, buffering events for later replay.
This reuses the existing preStartBuffer infrastructure used during startup. Events published while paused are buffered and will be replayed when Start() is called again.
Use cases:
- Leadership transition (pause while starting leader-only components)
- Hot reload scenarios
- Testing
This method is idempotent - calling it when already paused has no effect. Thread-safe and can be called concurrently with Publish() and Subscribe().
Example:
// During leadership transition
bus.Pause() // Buffer events
bus.Publish(BecameLeaderEvent{}) // Buffered
startLeaderOnlyComponents() // Components subscribe
bus.Start() // Replay buffered events
func (*EventBus) Publish ¶
Publish sends an event to all subscribers.
If Start() has not been called yet, the event is buffered and will be replayed when Start() is invoked. This prevents events from being lost during component initialization.
After Start() is called, this is a non-blocking operation. If a subscriber's channel is full, the event is dropped for that subscriber to prevent slow consumers from blocking the entire system.
Returns the number of subscribers that successfully received the event. Returns 0 if event was buffered (before Start()).
func (*EventBus) Request ¶
func (b *EventBus) Request(ctx context.Context, request Request, opts RequestOptions) (*RequestResult, error)
Request sends a request event and waits for responses using the scatter-gather pattern.
This is a synchronous operation that: 1. Publishes the request event to all subscribers (scatter phase) 2. Collects response events matching the request ID (gather phase) 3. Returns when all expected responders have replied or timeout occurs
The request must implement the Request interface to provide a unique RequestID for correlating responses.
Use this method when you need coordinated responses from multiple components, such as multi-phase validation or distributed queries.
Example:
req := NewConfigValidationRequest(config, version)
result, err := bus.Request(ctx, req, RequestOptions{
Timeout: 10 * time.Second,
ExpectedResponders: []string{"basic", "template", "jsonpath"},
})
func (*EventBus) Start ¶
func (b *EventBus) Start()
Start releases all buffered events and switches the bus to normal operation mode.
This method should be called after all components have subscribed to the bus during application startup. It ensures that no events are lost during the initialization phase.
Behavior:
- Marks the bus as started
- Replays all buffered events to subscribers in order
- Clears the buffer
- All subsequent Publish() calls go directly to subscribers
This method is idempotent - calling it multiple times has no additional effect. Thread-safe and can be called concurrently with Publish() and Subscribe().
Example:
bus := NewEventBus(100) // Components subscribe during setup commentator := NewEventCommentator(bus, logger, 1000) validator := NewValidator(bus) // ... more subscribers ... // Release buffered events bus.Start()
func (*EventBus) Subscribe ¶
Subscribe creates a new subscription to the event bus.
The returned channel will receive all events published to the bus. The bufferSize parameter controls the channel buffer size - larger buffers reduce the chance of dropped events for slow consumers.
Subscribers must continuously read from the channel to avoid dropped events. A bufferSize of 100 is recommended for most use cases.
The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.
IMPORTANT: For all-replica components, call this method BEFORE EventBus.Start() to ensure buffered events are received. Subscribing after Start() will trigger a warning as it may indicate a bug. For leader-only components that intentionally subscribe late (after leader election), use SubscribeLeaderOnly() instead.
func (*EventBus) SubscribeLeaderOnly ¶
SubscribeLeaderOnly creates a subscription for leader-only components.
This method is identical to Subscribe() but does not log a warning when called after EventBus.Start(). Use this for components that only run on the leader replica and are intentionally started after leader election.
Leader-only components rely on the state replay mechanism: all-replica components re-publish their cached state when BecameLeaderEvent is received, ensuring leader-only components don't miss critical state even though they subscribe late.
The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.
func (*EventBus) SubscribeTypes ¶
SubscribeTypes creates a subscription that only receives events of the specified types.
This is more efficient than universal Subscribe() when a component only cares about specific event types, as filtering happens at the EventBus level rather than in each subscriber's event loop.
Parameters:
- bufferSize: Size of the output channel buffer
- eventTypes: Event type strings to filter for (from Event.EventType())
Returns a channel that receives only events matching the specified types. The channel is read-only and will never be closed.
To stop receiving events and prevent memory leaks, call UnsubscribeTyped() with the returned channel.
Example:
eventChan := bus.SubscribeTypes(100,
"reconciliation.triggered",
"template.rendered",
"validation.completed")
defer bus.UnsubscribeTyped(eventChan) // Clean up when done
for event := range eventChan {
// Only receives the specified event types
}
func (*EventBus) Unsubscribe ¶
Unsubscribe removes a subscription from the event bus.
This method should be called when a subscriber no longer needs to receive events, to prevent memory leaks. After calling Unsubscribe, the channel will no longer receive events.
Note: The channel is not closed by this method. The subscriber is responsible for draining any remaining events from the channel if needed.
This method is safe to call multiple times for the same channel.
func (*EventBus) UnsubscribeTyped ¶
UnsubscribeTyped removes a typed subscription from the event bus.
This method should be called when a subscriber no longer needs to receive events from a typed subscription (created via SubscribeTypes), to prevent memory leaks.
Note: The channel is not closed by this method. The subscriber is responsible for draining any remaining events from the channel if needed.
This method is safe to call multiple times for the same channel.
type Request ¶
type Request interface {
Event
// RequestID returns a unique identifier for this request.
// Responses must include this ID to be correlated correctly.
RequestID() string
}
Request is the interface for request events in the scatter-gather pattern.
Requests are broadcast to all subscribers (scatter phase), and responses are correlated by RequestID (gather phase).
type RequestOptions ¶
type RequestOptions struct {
// Timeout is the maximum time to wait for responses.
// If zero, defaults to 10 seconds.
Timeout time.Duration
// ExpectedResponders lists the names of components expected to respond.
// If empty, the request will wait indefinitely for responses.
ExpectedResponders []string
// MinResponses is the minimum number of responses required.
// If zero, all ExpectedResponders must respond.
// Set to a lower value to implement graceful degradation.
MinResponses int
}
RequestOptions configures the behavior of a scatter-gather request.
type RequestResult ¶
type RequestResult struct {
// Responses contains all responses received before timeout or completion.
Responses []Response
// Errors contains error messages for responders that didn't respond
// or timed out. Empty if all expected responders replied.
Errors []string
}
RequestResult contains the aggregated results from a scatter-gather request.
type Response ¶
type Response interface {
Event
// RequestID returns the ID of the request this response belongs to.
RequestID() string
// Responder returns the name of the component that sent this response.
Responder() string
}
Response is the interface for response events in the scatter-gather pattern.
Each response includes the original RequestID and the name of the responder for tracking and debugging purposes.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package ringbuffer provides a thread-safe generic ring buffer implementation.
|
Package ringbuffer provides a thread-safe generic ring buffer implementation. |