eventhub

package
v0.0.0-...-f9b1259 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for gateway operations.
// Each value is created with errors.New, so callers can match a returned
// error against it with errors.Is.
var (
	ErrGatewayNotFound      = errors.New("gateway not found")
	ErrGatewayAlreadyExists = errors.New("gateway already exists")
	ErrSubscriberNotFound   = errors.New("subscriber not found")
)

Sentinel errors for gateway operations.

Functions

This section is empty.

Types

type Config

// Config holds configuration for the EventHub.
//
// NOTE(review): the field semantics are not visible in this listing. The names
// suggest a polling cadence, a cleanup cadence, and an event retention window
// (compare EventhubImpl.Cleanup, which takes a retention period) — confirm
// against the implementation.
type Config struct {
	PollInterval    time.Duration
	CleanupInterval time.Duration
	RetentionPeriod time.Duration
}

Config holds configuration for the EventHub.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

type Event

// Event represents a change event in the system.
//
// The struct tags give the JSON key used for each field. Every field is
// always emitted when marshalled, since no tag uses omitempty.
//
// NOTE(review): the difference between ProcessedTimestamp and
// OriginatedTimestamp is not visible here — confirm which component sets each.
type Event struct {
	GatewayID           string    `json:"gateway_id"`
	ProcessedTimestamp  time.Time `json:"processed_timestamp"`
	OriginatedTimestamp time.Time `json:"originated_timestamp"`
	EventType           EventType `json:"event_type"`
	Action              string    `json:"action"`
	EntityID            string    `json:"entity_id"`
	EventID             string    `json:"event_id"`
	// EventData carries optional event-specific details that are not already
	// represented by top-level fields such as Action and EntityID.
	// EmptyEventData ("{}") is the canonical value for events that carry no
	// additional data.
	EventData string `json:"event_data"`
}

Event represents a change event in the system.

type EventHub

// EventHub defines the interface for event publishing and subscribing.
// Every operation other than Initialize, CleanUpEvents, and Close is scoped
// to a single gateway identified by gatewayID.
type EventHub interface {
	// Initialize sets up the event hub.
	Initialize() error
	// RegisterGateway registers a new gateway for event tracking.
	RegisterGateway(gatewayID string) error
	// PublishEvent publishes an event for a gateway.
	PublishEvent(gatewayID string, event Event) error
	// Subscribe subscribes to events for a gateway.
	// The returned channel is receive-only.
	Subscribe(gatewayID string) (<-chan Event, error)
	// Unsubscribe removes a specific subscription for a gateway.
	// NOTE(review): subscriber is presumably the channel previously returned
	// by Subscribe — confirm against the implementation.
	Unsubscribe(gatewayID string, subscriber <-chan Event) error
	// UnsubscribeAll removes all subscriptions for a gateway.
	UnsubscribeAll(gatewayID string) error
	// CleanUpEvents removes old events.
	CleanUpEvents() error
	// Close gracefully shuts down the event hub.
	Close() error
}

EventHub defines the interface for event publishing and subscribing.

func New

func New(db *sql.DB, logger *slog.Logger, config Config) EventHub

New creates a new EventHub backed by SQLite.

func NewWithBackend

func NewWithBackend(backend EventhubImpl, logger *slog.Logger) EventHub

NewWithBackend creates a new EventHub with a custom backend.

type EventType

// EventType represents the type of event.
type EventType string

EventType represents the type of event.

// EventType values declared by this package, followed by the EmptyEventData
// payload constant.
const (
	// EventTypeAPI represents an API configuration change event
	EventTypeAPI EventType = "API"
	// EventTypeAPIKey represents an API key change event
	EventTypeAPIKey EventType = "API_KEY"
	// EventTypeCertificate represents a certificate change event
	EventTypeCertificate EventType = "CERTIFICATE"
	// EventTypeSubscription represents a subscription change event
	EventTypeSubscription EventType = "SUBSCRIPTION"
	// EventTypeSubscriptionPlan represents a subscription plan change event
	EventTypeSubscriptionPlan EventType = "SUBSCRIPTION_PLAN"
	// EventTypeApplication represents an application metadata change event
	EventTypeApplication EventType = "APPLICATION"
	// EventTypeLLMProvider represents an LLM provider change event
	EventTypeLLMProvider EventType = "LLM_PROVIDER"
	// EventTypeLLMProxy represents an LLM proxy change event
	EventTypeLLMProxy EventType = "LLM_PROXY"
	// EventTypeLLMTemplate represents an LLM template change event
	EventTypeLLMTemplate EventType = "LLM_TEMPLATE"
	// EventTypeMCPProxy represents an MCP proxy change event
	EventTypeMCPProxy EventType = "MCP_PROXY"

	// EmptyEventData is the canonical JSON payload for events that do not
	// require additional data beyond the top-level event fields.
	// It is declared without a type, so it is an untyped string constant and
	// not an EventType.
	EmptyEventData = "{}"
)

type EventhubImpl

// EventhubImpl defines the backend interface for pluggable event hub
// implementations. A value of this type can be passed to NewWithBackend.
type EventhubImpl interface {
	// Initialize sets up the backend.
	Initialize() error
	// RegisterGateway registers a new gateway for event tracking.
	RegisterGateway(gatewayID string) error
	// Publish publishes an event for a gateway.
	Publish(gatewayID string, event Event) error
	// Subscribe subscribes to events for a gateway, returning a
	// receive-only channel.
	Subscribe(gatewayID string) (<-chan Event, error)
	// Unsubscribe removes a specific subscription for a gateway.
	Unsubscribe(gatewayID string, subscriber <-chan Event) error
	// UnsubscribeAll removes all subscriptions for a gateway.
	UnsubscribeAll(gatewayID string) error
	// Cleanup removes events older than the retention period.
	Cleanup(retentionPeriod time.Duration) error
	// CleanupRange removes events for a gateway before the given time.
	// Only an upper bound is accepted; there is no lower-bound parameter.
	CleanupRange(gatewayID string, before time.Time) error
	// Close gracefully shuts down the backend.
	Close() error
}

EventhubImpl defines the backend interface for pluggable event hub implementations.

type GatewayState

// GatewayState tracks the version state of a gateway.
// The struct tags give the JSON key used for each field.
//
// NOTE(review): how VersionID is generated and when UpdatedAt is refreshed is
// not visible here — confirm against the backend implementation.
type GatewayState struct {
	GatewayID string    `json:"gateway_id"`
	VersionID string    `json:"version_id"`
	UpdatedAt time.Time `json:"updated_at"`
}

GatewayState tracks the version state of a gateway.

type SQLBackend

// SQLBackend implements EventhubImpl using SQL polling.
// Construct one with NewSQLBackend.
type SQLBackend struct {
	// contains filtered or unexported fields
}

SQLBackend implements EventhubImpl using SQL polling.

func NewSQLBackend

func NewSQLBackend(db *sql.DB, logger *slog.Logger, config SQLBackendConfig) *SQLBackend

NewSQLBackend creates a new SQL-backed event hub.

func (*SQLBackend) Cleanup

func (b *SQLBackend) Cleanup(retentionPeriod time.Duration) error

Cleanup removes events older than the retention period.

func (*SQLBackend) CleanupRange

func (b *SQLBackend) CleanupRange(gatewayID string, before time.Time) error

CleanupRange removes events for a gateway before a given time.

func (*SQLBackend) Close

func (b *SQLBackend) Close() error

Close gracefully shuts down the backend.

func (*SQLBackend) Initialize

func (b *SQLBackend) Initialize() error

Initialize prepares statements and starts background goroutines.

func (*SQLBackend) Publish

func (b *SQLBackend) Publish(gatewayID string, event Event) error

Publish publishes an event atomically (insert event + update gateway version).

func (*SQLBackend) RegisterGateway

func (b *SQLBackend) RegisterGateway(gatewayID string) error

RegisterGateway registers a new gateway for event tracking.

func (*SQLBackend) Subscribe

func (b *SQLBackend) Subscribe(gatewayID string) (<-chan Event, error)

Subscribe subscribes to events for a gateway.

func (*SQLBackend) Unsubscribe

func (b *SQLBackend) Unsubscribe(gatewayID string, subscriber <-chan Event) error

Unsubscribe removes a specific subscription for a gateway.

func (*SQLBackend) UnsubscribeAll

func (b *SQLBackend) UnsubscribeAll(gatewayID string) error

UnsubscribeAll removes all subscriptions for a gateway.

type SQLBackendConfig

// SQLBackendConfig holds configuration for the SQL backend.
// It has the same three duration fields as Config, plus GatewayStatePageSize.
//
// NOTE(review): field semantics are not visible in this listing.
// GatewayStatePageSize presumably bounds how many gateway state rows are read
// per query — confirm against the implementation.
type SQLBackendConfig struct {
	PollInterval         time.Duration
	CleanupInterval      time.Duration
	RetentionPeriod      time.Duration
	GatewayStatePageSize int
}

SQLBackendConfig holds configuration for the SQL backend.

func DefaultSQLBackendConfig

func DefaultSQLBackendConfig() SQLBackendConfig

DefaultSQLBackendConfig returns a SQLBackendConfig with sensible defaults.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL