events

package
v0.1.0-alpha.9
Published: Dec 30, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

README

pkg/events

Pure event bus infrastructure for event-driven architecture.

Overview

This package provides generic pub/sub and request-response coordination mechanisms. It contains no domain knowledge - all application-specific event types are defined in pkg/controller/events.

Architecture

pkg/events/              # Generic infrastructure (reusable)
├── bus.go              # EventBus with startup coordination
├── request.go          # Scatter-gather pattern
└── bus_test.go         # Infrastructure tests

pkg/controller/events/   # Domain-specific event catalog
└── types.go            # 50+ controller event types

Core Components

EventBus

Thread-safe pub/sub coordinator with startup synchronization.

Features:

  • Non-blocking publish with backpressure handling (an event is dropped for any subscriber whose channel buffer is full)
  • Startup coordination via buffering (prevents race conditions during initialization)
  • Thread-safe concurrent access
  • Simple subscribe/publish API

Usage:

import "haptic/pkg/events"

// Create bus with buffer capacity for pre-start events
bus := events.NewEventBus(100)

// Subscribe (returns read-only channel)
eventChan := bus.Subscribe(200) // 200 event buffer

// Start after all subscribers are ready (releases buffered events)
bus.Start()

// Publish events
bus.Publish(myEvent)

// Receive events
for event := range eventChan {
    switch e := event.(type) {
    case SomeEventType:
        // Handle event
    }
}

Event Interface

All events must implement the Event interface:

type Event interface {
    EventType() string      // Unique event identifier (e.g., "config.validated")
    Timestamp() time.Time   // When the event occurred
}

Startup Coordination

The EventBus includes buffering to prevent race conditions during initialization:

  1. Before Start(): Events are buffered
  2. After Start(): Buffered events are replayed, then events flow normally

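This ensures that events published before all subscribers connect are not lost, as long as the pre-start buffer stays within MaxPreStartBufferSize (1000 events); events beyond that limit are dropped with a warning.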

Example:

bus := events.NewEventBus(100)

// Components subscribe during setup
component1 := NewComponent1(bus)
component2 := NewComponent2(bus)
component3 := NewComponent3(bus)

// Start the bus after all subscribers are ready
// This replays any buffered events and switches to normal operation
bus.Start()

// Now all components will receive events
bus.Publish(SystemReadyEvent{})

Request-Response (Scatter-Gather)

Synchronous coordination across multiple responders using the scatter-gather pattern.

Use Cases:

  • Configuration validation (multiple validators must approve)
  • Distributed queries (gather responses from multiple sources)
  • Coordinated operations (need confirmation from multiple parties)

Interfaces:

type Request interface {
    Event
    RequestID() string  // Unique ID for correlating responses
}

type Response interface {
    Event
    RequestID() string  // Links back to request
    Responder() string  // Who sent this response
}

Usage:

import (
    "context"
    "time"
    "haptic/pkg/events"
)

// Create request
req := MyValidationRequest{
    id: "req-123",
    data: configToValidate,
}

// Send request and wait for responses
result, err := bus.Request(ctx, req, events.RequestOptions{
    Timeout:            10 * time.Second,
    ExpectedResponders: []string{"validator-1", "validator-2", "validator-3"},
    MinResponses:       2,  // Optional: allow partial responses
})

if err != nil {
    // Timeout or context cancellation
    log.Error("request failed", "error", err)
}

// Process responses
for _, resp := range result.Responses {
    // Handle each response
}

// Check for missing responders
if len(result.Errors) > 0 {
    log.Warn("some responders did not reply", "errors", result.Errors)
}

Responder Implementation:

func (v *Validator) Run(ctx context.Context, bus *events.EventBus) {
    eventChan := bus.Subscribe(100)
    defer bus.Unsubscribe(eventChan) // Remove the subscription on exit to avoid leaks

    for {
        select {
        case event := <-eventChan:
            if req, ok := event.(MyValidationRequest); ok {
                // Process request
                valid, errors := v.validate(req.data)

                // Send response
                resp := MyValidationResponse{
                    reqID:     req.RequestID(),
                    responder: "validator-1",
                    valid:     valid,
                    errors:    errors,
                }
                bus.Publish(resp)
            }
        case <-ctx.Done():
            return
        }
    }
}

Design Principles

1. Generic Infrastructure

This package is domain-agnostic - it provides the plumbing, not the events:

  • EventBus, Request, Response are generic mechanisms
  • Event types are defined in pkg/controller/events
  • Could be extracted as a standalone library

2. Non-Blocking

The EventBus never blocks publishers:

  • Full subscriber channels → event dropped for that subscriber
  • Prevents slow consumers from blocking the system
  • Subscribers must drain their channels promptly

3. Thread-Safe

All operations are safe for concurrent access:

  • Multiple goroutines can publish simultaneously
  • Multiple goroutines can subscribe simultaneously
  • Thread-safe startup coordination

4. Simple API

Minimal surface area:

  • Publish(event) - send event to all subscribers
  • Subscribe(bufferSize) - create new event channel
  • Start() - release buffered events
  • Request(ctx, req, opts) - scatter-gather pattern

Testing

Tests use simple mock events and verify infrastructure behavior:

go test ./pkg/events/... -v

Test coverage includes:

  • Basic pub/sub
  • Startup coordination (buffering/replay)
  • Slow subscriber behavior
  • Concurrent publishing
  • Request-response coordination
  • Timeout handling
  • Context cancellation

Performance Characteristics

  • Publish: O(N) where N = number of subscribers (non-blocking select)
  • Subscribe: O(1) append to slice
  • Memory: Bounded by subscriber buffer sizes and pre-start buffer
  • Startup Buffer: O(M) where M = events published before Start()

When to Use

Use EventBus for:

  • Async pub/sub notifications
  • Observability events
  • Decoupling components
  • Event-driven workflows

Use Request() for:

  • Multi-phase validation
  • Distributed queries
  • Coordinated operations requiring responses

Don't Use for:

  • Direct function calls (if you don't need decoupling)
  • High-frequency data streams (consider channels instead)
  • Large payloads (events should be lightweight)

Integration

See docs/design.md for:

  • Event-driven architecture overview
  • Controller event catalog
  • Startup coordination flow
  • Request-response validation example

Related packages:

  • pkg/controller/events - Domain-specific event type definitions
  • pkg/controller/commentator - Event observability component
  • pkg/controller/reconciler - Event-driven reconciliation logic

Documentation

Overview

Package events provides an event bus for component coordination in the HAPTIC controller.

The event bus supports two communication patterns:

  1. Async pub/sub: Fire-and-forget event publishing for observability and loose coupling
  2. Sync request-response: Scatter-gather pattern for coordinated validation and queries

Constants

const (
	// MaxPreStartBufferSize is the maximum number of events that can be buffered
	// before EventBus.Start() is called. This prevents unbounded memory growth
	// during startup if many events are published before subscribers are ready.
	// Events exceeding this limit are dropped with a warning.
	MaxPreStartBufferSize = 1000
)
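
The limit described above can be pictured with a small, self-contained sketch. It does not use the real EventBus: bufferEvent and the lowercase constant are hypothetical names, events are plain strings, and the choice to drop the newest event once the limit is reached is an assumption based on the comment above.

```go
package main

import "fmt"

// maxPreStartBufferSize mirrors MaxPreStartBufferSize for this sketch only.
const maxPreStartBufferSize = 1000

// bufferEvent (hypothetical helper) appends ev unless the buffer is already
// at the limit. It reports whether the event was kept.
func bufferEvent(buf []string, ev string) ([]string, bool) {
	if len(buf) >= maxPreStartBufferSize {
		return buf, false // assumption: the newest event is the one dropped
	}
	return append(buf, ev), true
}

// fill buffers n events and returns the buffer length and the drop count.
func fill(n int) (kept, dropped int) {
	var buf []string
	for i := 0; i < n; i++ {
		var ok bool
		buf, ok = bufferEvent(buf, fmt.Sprintf("event-%d", i))
		if !ok {
			dropped++
		}
	}
	return len(buf), dropped
}

func main() {
	kept, dropped := fill(1005)
	fmt.Println(kept, dropped) // prints: 1000 5
}
```

The real bus additionally logs a warning for dropped events, which this sketch leaves out.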

Variables

This section is empty.

Functions

func Subscribe

func Subscribe[T Event](ctx context.Context, bus *EventBus, bufferSize int) <-chan T

Subscribe is a generic function that returns a typed channel for a specific event type.

This provides compile-time type safety by returning a channel of the specific event type T. Events are filtered to only include those that match type T.

The generic type T must be a pointer to a struct that implements the Event interface. This is required because events are typically published as pointer types.

Parameters:

  • ctx: Context for the goroutine lifetime
  • bus: The EventBus to subscribe to
  • bufferSize: Size of the output channel buffer

Returns a channel of type T that receives only events of that type.

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventChan := events.Subscribe[*ReconciliationTriggeredEvent](ctx, bus, 100)
for event := range eventChan {
    // event is already *ReconciliationTriggeredEvent
    fmt.Println(event.Reason)
}

Note: The context controls the lifetime of the internal forwarding goroutine. When the context is cancelled, the goroutine stops, the subscription is removed from the bus, and the channel will stop receiving events.
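
A forwarding goroutine of the kind described in the note might look roughly like the following sketch. It is not the package's implementation: forwardTyped, triggeredReasons, and the two event types are hypothetical, the Event interface is reduced to one method, and closing the output channel on exit is a choice made here for the demo rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a reduced stand-in for the package's Event interface.
type Event interface{ EventType() string }

// TriggeredEvent and StartedEvent are hypothetical event types.
type TriggeredEvent struct{ Reason string }

func (*TriggeredEvent) EventType() string { return "reconciliation.triggered" }

type StartedEvent struct{}

func (*StartedEvent) EventType() string { return "reconciliation.started" }

// forwardTyped forwards only events whose dynamic type is T and stops when
// ctx is cancelled or the input channel is closed.
func forwardTyped[T Event](ctx context.Context, in <-chan Event, bufferSize int) <-chan T {
	out := make(chan T, bufferSize)
	go func() {
		defer close(out) // assumption of this sketch, see lead-in
		for {
			select {
			case <-ctx.Done():
				return
			case ev, ok := <-in:
				if !ok {
					return
				}
				typed, match := ev.(T)
				if !match {
					continue // some other event type: skip it
				}
				select {
				case out <- typed:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// triggeredReasons feeds events through forwardTyped and collects the
// Reason of every *TriggeredEvent that comes out.
func triggeredReasons(events ...Event) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	in := make(chan Event, len(events))
	for _, ev := range events {
		in <- ev
	}
	close(in)
	var reasons []string
	for ev := range forwardTyped[*TriggeredEvent](ctx, in, 1) {
		reasons = append(reasons, ev.Reason)
	}
	return reasons
}

func main() {
	fmt.Println(triggeredReasons(&StartedEvent{}, &TriggeredEvent{Reason: "config changed"}))
}
```

The type assertion ev.(T) is what gives the caller a channel of the concrete type, so no type switch is needed on the receiving side.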

func SubscribeMultiple

func SubscribeMultiple(ctx context.Context, bus *EventBus, bufferSize int, types ...string) <-chan Event

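SubscribeMultiple returns a single channel of Event values that receives events of several types, selected by event type string. Unlike Subscribe, it is not generic: the caller recovers the concrete type with a type switch.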

This is useful when a component needs to receive events of different types that share a common interface or base type.

Parameters:

  • ctx: Context for the goroutine lifetime
  • bus: The EventBus to subscribe to
  • bufferSize: Size of the output channel buffer
  • types: Event type strings to filter for

Returns a channel that receives events matching any of the specified types.

Example:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventChan := events.SubscribeMultiple(ctx, bus, 100,
    "reconciliation.triggered",
    "reconciliation.started",
    "reconciliation.completed")
for event := range eventChan {
    switch e := event.(type) {
    case *ReconciliationTriggeredEvent:
        // handle
    case *ReconciliationStartedEvent:
        // handle
    }
}

Types

type Event

type Event interface {
	// EventType returns a unique identifier for this event type.
	// Convention: use dot-notation like "config.parsed" or "deployment.completed"
	EventType() string

	// Timestamp returns when this event occurred.
	// Used for event correlation and temporal analysis.
	Timestamp() time.Time
}

Event is the base interface for all events in the system. Events are used for asynchronous pub/sub communication between components.
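
As an illustration of what satisfying this interface involves, here is a minimal sketch. The Event interface is copied locally so the example compiles on its own, and ConfigValidatedEvent with its constructor is a hypothetical type, not one from the controller's event catalog.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a local copy of the interface above, so the sketch is self-contained.
type Event interface {
	EventType() string
	Timestamp() time.Time
}

// ConfigValidatedEvent is a hypothetical event used only for illustration.
type ConfigValidatedEvent struct {
	Version   string
	timestamp time.Time
}

// NewConfigValidatedEvent records the creation time so Timestamp() is stable.
func NewConfigValidatedEvent(version string) *ConfigValidatedEvent {
	return &ConfigValidatedEvent{Version: version, timestamp: time.Now()}
}

// EventType follows the dot-notation convention.
func (e *ConfigValidatedEvent) EventType() string { return "config.validated" }

// Timestamp returns the time captured by the constructor.
func (e *ConfigValidatedEvent) Timestamp() time.Time { return e.timestamp }

// Compile-time check that the pointer type implements Event.
var _ Event = (*ConfigValidatedEvent)(nil)

func main() {
	var ev Event = NewConfigValidatedEvent("v1")
	fmt.Println(ev.EventType()) // prints: config.validated
}
```

Capturing the timestamp in a constructor, rather than calling time.Now() inside Timestamp(), keeps the value identical for every subscriber that inspects the event.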

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus provides centralized pub/sub coordination for all controller components.

The EventBus supports two patterns:

  • Publish() for async fire-and-forget events (observability, notifications)
  • Request() for sync scatter-gather pattern (validation, queries)

EventBus is thread-safe and can be used concurrently from multiple goroutines.

Startup Coordination: Events published before Start() is called are buffered and replayed after Start(). This prevents race conditions during component initialization.

Typed Subscriptions: In addition to universal subscriptions (Subscribe), the EventBus supports typed subscriptions (SubscribeTypes) that filter events at the bus level for efficiency.

func NewEventBus

func NewEventBus(capacity int) *EventBus

NewEventBus creates a new EventBus.

The bus starts in buffering mode - events published before Start() is called will be buffered and replayed when Start() is invoked. This ensures no events are lost during component initialization.

The capacity parameter sets the initial buffer size for pre-start events. Recommended: 100 for most applications.

func (*EventBus) Pause

func (b *EventBus) Pause()

Pause temporarily suspends event delivery, buffering events for later replay.

This reuses the existing preStartBuffer infrastructure used during startup. Events published while paused are buffered and will be replayed when Start() is called again.

Use cases:

  • Leadership transition (pause while starting leader-only components)
  • Hot reload scenarios
  • Testing

This method is idempotent - calling it when already paused has no effect. Thread-safe and can be called concurrently with Publish() and Subscribe().

Example:

// During leadership transition
bus.Pause()                                    // Buffer events
bus.Publish(BecameLeaderEvent{})               // Buffered
startLeaderOnlyComponents()                    // Components subscribe
bus.Start()                                    // Replay buffered events

func (*EventBus) Publish

func (b *EventBus) Publish(event Event) int

Publish sends an event to all subscribers.

If Start() has not been called yet, the event is buffered and will be replayed when Start() is invoked. This prevents events from being lost during component initialization.

After Start() is called, this is a non-blocking operation. If a subscriber's channel is full, the event is dropped for that subscriber to prevent slow consumers from blocking the entire system.

Returns the number of subscribers that successfully received the event. Returns 0 if event was buffered (before Start()).
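
The drop-on-full behavior and the returned count can be sketched with Go's select/default idiom. This is an illustration under simplifying assumptions, not the package's code: publish is a hypothetical free function, events are strings, and there is no locking.

```go
package main

import "fmt"

// publish attempts a non-blocking send to every subscriber channel and
// returns how many of them accepted the event.
func publish(subscribers []chan string, ev string) int {
	delivered := 0
	for _, ch := range subscribers {
		select {
		case ch <- ev:
			delivered++
		default:
			// Channel buffer is full: drop the event for this subscriber
			// instead of blocking the publisher.
		}
	}
	return delivered
}

func main() {
	fast := make(chan string, 2)
	slow := make(chan string, 1)
	subs := []chan string{fast, slow}

	fmt.Println(publish(subs, "a")) // prints: 2
	fmt.Println(publish(subs, "b")) // prints: 1 (slow is already full)
}
```

A caller that cares about delivery can compare the returned count with the number of subscribers it expects, keeping in mind that 0 is also returned while the bus is still buffering.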

func (*EventBus) Request

func (b *EventBus) Request(ctx context.Context, request Request, opts RequestOptions) (*RequestResult, error)

Request sends a request event and waits for responses using the scatter-gather pattern.

This is a synchronous operation that:

  1. Publishes the request event to all subscribers (scatter phase)
  2. Collects response events matching the request ID (gather phase)
  3. Returns when all expected responders have replied or timeout occurs

The request must implement the Request interface to provide a unique RequestID for correlating responses.

Use this method when you need coordinated responses from multiple components, such as multi-phase validation or distributed queries.

Example:

req := NewConfigValidationRequest(config, version)
result, err := bus.Request(ctx, req, RequestOptions{
    Timeout: 10 * time.Second,
    ExpectedResponders: []string{"basic", "template", "jsonpath"},
})
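
The gather phase can be sketched as a loop that waits on a response channel and a timer. Everything in this example is hypothetical (the response struct, gather, demoTimeout); it only illustrates correlating by request ID and tracking which expected responders are still missing, and it ignores MinResponses and context cancellation.

```go
package main

import (
	"fmt"
	"time"
)

// response is a hypothetical stand-in for a type implementing Response.
type response struct {
	requestID string
	responder string
}

// demoTimeout keeps the example fast; it is not a package default.
const demoTimeout = 50 * time.Millisecond

// gather collects responses for requestID until every expected responder has
// replied or the timeout fires. It returns the collected responses and the
// names of responders that did not reply.
func gather(in <-chan response, requestID string, expected []string, timeout time.Duration) ([]response, []string) {
	pending := make(map[string]bool, len(expected))
	for _, name := range expected {
		pending[name] = true
	}
	var got []response
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for len(pending) > 0 {
		select {
		case r := <-in:
			if r.requestID != requestID || !pending[r.responder] {
				continue // other request, unexpected responder, or duplicate
			}
			delete(pending, r.responder)
			got = append(got, r)
		case <-timer.C:
			var missing []string
			for _, name := range expected { // preserve the caller's order
				if pending[name] {
					missing = append(missing, name)
				}
			}
			return got, missing
		}
	}
	return got, nil
}

func main() {
	in := make(chan response, 3)
	in <- response{"req-1", "basic"}
	in <- response{"req-2", "template"} // belongs to another request
	in <- response{"req-1", "jsonpath"}

	got, missing := gather(in, "req-1", []string{"basic", "template", "jsonpath"}, demoTimeout)
	fmt.Println(len(got), missing)
}
```

Tracking pending responders in a map makes duplicate replies harmless and lets the timeout branch report exactly who is missing.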

func (*EventBus) Start

func (b *EventBus) Start()

Start releases all buffered events and switches the bus to normal operation mode.

This method should be called after all components have subscribed to the bus during application startup. It ensures that no events are lost during the initialization phase.

Behavior:

  1. Marks the bus as started
  2. Replays all buffered events to subscribers in order
  3. Clears the buffer
  4. All subsequent Publish() calls go directly to subscribers

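This method is idempotent - calling it while the bus is already started has no additional effect. After Pause(), calling Start() again resumes delivery and replays the events buffered while paused. Thread-safe and can be called concurrently with Publish() and Subscribe().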

Example:

bus := NewEventBus(100)

// Components subscribe during setup
commentator := NewEventCommentator(bus, logger, 1000)
validator := NewValidator(bus)
// ... more subscribers ...

// Release buffered events
bus.Start()
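
The four behavior steps listed above can be traced in a deliberately small model. miniBus is a hypothetical stand-in that carries strings and has no locking, so it is not safe for concurrent use and is not how the real EventBus is written; it only shows the buffer-then-replay ordering.

```go
package main

import "fmt"

// miniBus is a hypothetical, single-goroutine model of buffer-then-replay.
type miniBus struct {
	started     bool
	buffer      []string
	subscribers []chan string
}

// Subscribe registers a buffered channel and returns its receive side.
func (b *miniBus) Subscribe(bufferSize int) <-chan string {
	ch := make(chan string, bufferSize)
	b.subscribers = append(b.subscribers, ch)
	return ch
}

// deliver sends to every subscriber without blocking.
func (b *miniBus) deliver(ev string) {
	for _, ch := range b.subscribers {
		select {
		case ch <- ev:
		default: // full subscriber: drop
		}
	}
}

// Publish buffers before Start and delivers directly afterwards.
func (b *miniBus) Publish(ev string) {
	if !b.started {
		b.buffer = append(b.buffer, ev)
		return
	}
	b.deliver(ev)
}

// Start marks the bus started, replays the buffer in order, then clears it.
func (b *miniBus) Start() {
	if b.started {
		return // already started: nothing to do
	}
	b.started = true
	for _, ev := range b.buffer {
		b.deliver(ev)
	}
	b.buffer = nil
}

func main() {
	bus := &miniBus{}
	bus.Publish("early")   // no subscriber yet: buffered
	ch := bus.Subscribe(4) // subscriber arrives after the first publish
	bus.Start()            // replays "early"
	bus.Publish("late")    // delivered directly
	fmt.Println(<-ch, <-ch) // prints: early late
}
```

The subscriber receives "early" even though it subscribed after that event was published, which is the race the buffering is meant to close.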

func (*EventBus) Subscribe

func (b *EventBus) Subscribe(bufferSize int) <-chan Event

Subscribe creates a new subscription to the event bus.

The returned channel will receive all events published to the bus. The bufferSize parameter controls the channel buffer size - larger buffers reduce the chance of dropped events for slow consumers.

Subscribers must continuously read from the channel to avoid dropped events. A bufferSize of 100 is recommended for most use cases.

The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.

IMPORTANT: For all-replica components, call this method BEFORE EventBus.Start() to ensure buffered events are received. Subscribing after Start() will trigger a warning as it may indicate a bug. For leader-only components that intentionally subscribe late (after leader election), use SubscribeLeaderOnly() instead.

func (*EventBus) SubscribeLeaderOnly

func (b *EventBus) SubscribeLeaderOnly(bufferSize int) <-chan Event

SubscribeLeaderOnly creates a subscription for leader-only components.

This method is identical to Subscribe() but does not log a warning when called after EventBus.Start(). Use this for components that only run on the leader replica and are intentionally started after leader election.

Leader-only components rely on the state replay mechanism: all-replica components re-publish their cached state when BecameLeaderEvent is received, ensuring leader-only components don't miss critical state even though they subscribe late.

The returned channel is read-only and will never be closed. To stop receiving events, the subscriber should call Unsubscribe() to remove the subscription and prevent memory leaks.
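
The state replay mechanism mentioned above can be illustrated with a toy component. All names are hypothetical: cachingComponent is not a type from this module, and the "leader.became" string is an invented placeholder for whatever EventType() the real BecameLeaderEvent returns.

```go
package main

import "fmt"

// becameLeader is a hypothetical event type string used only in this sketch.
const becameLeader = "leader.became"

// cachingComponent models an all-replica component that remembers the last
// state it published so it can publish it again on request.
type cachingComponent struct {
	lastState string
	publish   func(state string)
}

// setState publishes a new state and caches it.
func (c *cachingComponent) setState(state string) {
	c.lastState = state
	c.publish(state)
}

// handle re-publishes the cached state when leadership is acquired, so that
// late subscribers see it. Other event types are ignored.
func (c *cachingComponent) handle(eventType string) {
	if eventType == becameLeader && c.lastState != "" {
		c.publish(c.lastState)
	}
}

// run drives the component through a short scenario and returns everything
// it published, in order.
func run() []string {
	var published []string
	c := &cachingComponent{publish: func(s string) { published = append(published, s) }}
	c.setState("config-v7")
	c.handle("config.parsed") // unrelated: no re-publish
	c.handle(becameLeader)    // re-publish cached state
	return published
}

func main() {
	fmt.Println(run()) // prints: [config-v7 config-v7]
}
```

A leader-only component that subscribes between the first and second publication would miss the first but still receive the replayed copy.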

func (*EventBus) SubscribeTypes

func (b *EventBus) SubscribeTypes(bufferSize int, eventTypes ...string) <-chan Event

SubscribeTypes creates a subscription that only receives events of the specified types.

This is more efficient than universal Subscribe() when a component only cares about specific event types, as filtering happens at the EventBus level rather than in each subscriber's event loop.

Parameters:

  • bufferSize: Size of the output channel buffer
  • eventTypes: Event type strings to filter for (from Event.EventType())

Returns a channel that receives only events matching the specified types. The channel is read-only and will never be closed.

To stop receiving events and prevent memory leaks, call UnsubscribeTyped() with the returned channel.

Example:

eventChan := bus.SubscribeTypes(100,
    "reconciliation.triggered",
    "template.rendered",
    "validation.completed")
defer bus.UnsubscribeTyped(eventChan) // Clean up when done
for event := range eventChan {
    // Only receives the specified event types
}
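
Filtering at the bus level, as opposed to in each subscriber's loop, could be modeled as a set lookup before the send. The sketch below is an assumption about the general approach, not the package's code: typedSub, newTypedSub, and offer are hypothetical, and the channel carries event type strings instead of Event values for brevity.

```go
package main

import "fmt"

// typedSub is a hypothetical subscription record holding the wanted types.
type typedSub struct {
	ch    chan string
	types map[string]struct{}
}

// newTypedSub builds the lookup set once, at subscription time.
func newTypedSub(bufferSize int, eventTypes ...string) *typedSub {
	set := make(map[string]struct{}, len(eventTypes))
	for _, t := range eventTypes {
		set[t] = struct{}{}
	}
	return &typedSub{ch: make(chan string, bufferSize), types: set}
}

// offer delivers eventType only if the subscription asked for it. The send is
// non-blocking; it reports whether the event was delivered.
func (s *typedSub) offer(eventType string) bool {
	if _, wanted := s.types[eventType]; !wanted {
		return false
	}
	select {
	case s.ch <- eventType:
		return true
	default:
		return false // buffer full: dropped
	}
}

func main() {
	sub := newTypedSub(4, "reconciliation.triggered", "template.rendered")
	for _, t := range []string{"config.parsed", "template.rendered", "reconciliation.triggered"} {
		sub.offer(t)
	}
	close(sub.ch) // closed here only so the demo loop terminates
	for t := range sub.ch {
		fmt.Println(t)
	}
}
```

With this shape, an unwanted event costs one map lookup per subscription and never occupies a slot in the subscriber's buffer.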

func (*EventBus) Unsubscribe

func (b *EventBus) Unsubscribe(ch <-chan Event)

Unsubscribe removes a subscription from the event bus.

This method should be called when a subscriber no longer needs to receive events, to prevent memory leaks. After calling Unsubscribe, the channel will no longer receive events.

Note: The channel is not closed by this method. The subscriber is responsible for draining any remaining events from the channel if needed.

This method is safe to call multiple times for the same channel.
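
One way to get the "safe to call multiple times" property is to treat an unknown channel as a no-op. The following sketch shows that idea on a plain slice; removeSubscriber is a hypothetical helper, and the real bus would also need to hold its lock while doing this.

```go
package main

import "fmt"

// removeSubscriber returns subs without target. If target is not present
// (for example because it was already removed), subs is returned unchanged.
func removeSubscriber(subs []chan string, target <-chan string) []chan string {
	for i, ch := range subs {
		if (<-chan string)(ch) == target {
			// The three-index slice caps capacity so append copies into a
			// new backing array instead of overwriting the caller's slice.
			return append(subs[:i:i], subs[i+1:]...)
		}
	}
	return subs
}

func main() {
	a, b := make(chan string, 1), make(chan string, 1)
	subs := []chan string{a, b}

	subs = removeSubscriber(subs, a)
	subs = removeSubscriber(subs, a) // second call finds nothing to remove

	fmt.Println(len(subs), subs[0] == b) // prints: 1 true
}
```

Note that the channel itself is left open, matching the documented behavior that Unsubscribe does not close it.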

func (*EventBus) UnsubscribeTyped

func (b *EventBus) UnsubscribeTyped(ch <-chan Event)

UnsubscribeTyped removes a typed subscription from the event bus.

This method should be called when a subscriber no longer needs to receive events from a typed subscription (created via SubscribeTypes), to prevent memory leaks.

Note: The channel is not closed by this method. The subscriber is responsible for draining any remaining events from the channel if needed.

This method is safe to call multiple times for the same channel.

type Request

type Request interface {
	Event
	// RequestID returns a unique identifier for this request.
	// Responses must include this ID to be correlated correctly.
	RequestID() string
}

Request is the interface for request events in the scatter-gather pattern.

Requests are broadcast to all subscribers (scatter phase), and responses are correlated by RequestID (gather phase).

type RequestOptions

type RequestOptions struct {
	// Timeout is the maximum time to wait for responses.
	// If zero, defaults to 10 seconds.
	Timeout time.Duration

	// ExpectedResponders lists the names of components expected to respond.
	// If empty, the request will wait indefinitely for responses.
	ExpectedResponders []string

	// MinResponses is the minimum number of responses required.
	// If zero, all ExpectedResponders must respond.
	// Set to a lower value to implement graceful degradation.
	MinResponses int
}
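
The defaulting rules in the field comments can be written down as two small helpers. This is a sketch under assumptions: the struct is a local copy, effectiveTimeout and requiredResponses are hypothetical names, and clamping a MinResponses larger than the number of expected responders is a guess that the field comments do not address.

```go
package main

import (
	"fmt"
	"time"
)

// RequestOptions is a local copy of the struct above for a standalone sketch.
type RequestOptions struct {
	Timeout            time.Duration
	ExpectedResponders []string
	MinResponses       int
}

// defaultTimeout reflects the "defaults to 10 seconds" field comment.
const defaultTimeout = 10 * time.Second

// effectiveTimeout applies the zero-value default for Timeout.
func effectiveTimeout(opts RequestOptions) time.Duration {
	if opts.Timeout == 0 {
		return defaultTimeout
	}
	return opts.Timeout
}

// requiredResponses returns how many responses complete the request:
// MinResponses if set, otherwise all expected responders.
func requiredResponses(opts RequestOptions) int {
	all := len(opts.ExpectedResponders)
	if opts.MinResponses <= 0 || opts.MinResponses > all {
		return all // clamping the upper bound is an assumption of this sketch
	}
	return opts.MinResponses
}

func main() {
	opts := RequestOptions{
		ExpectedResponders: []string{"validator-1", "validator-2", "validator-3"},
		MinResponses:       2,
	}
	fmt.Println(effectiveTimeout(opts), requiredResponses(opts)) // prints: 10s 2
}
```

Setting MinResponses below the number of expected responders is what enables the graceful degradation mentioned in the field comment.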

RequestOptions configures the behavior of a scatter-gather request.

type RequestResult

type RequestResult struct {
	// Responses contains all responses received before timeout or completion.
	Responses []Response

	// Errors contains error messages for responders that didn't respond
	// or timed out. Empty if all expected responders replied.
	Errors []string
}

RequestResult contains the aggregated results from a scatter-gather request.

type Response

type Response interface {
	Event
	// RequestID returns the ID of the request this response belongs to.
	RequestID() string
	// Responder returns the name of the component that sent this response.
	Responder() string
}

Response is the interface for response events in the scatter-gather pattern.

Each response includes the original RequestID and the name of the responder for tracking and debugging purposes.

Directories

Path Synopsis
Package ringbuffer provides a thread-safe generic ring buffer implementation.
