Documentation
¶
Index ¶
- Variables
- type Config
- type Event
- type EventHub
- type EventType
- type EventhubImpl
- type GatewayState
- type SQLBackend
- func (b *SQLBackend) Cleanup(retentionPeriod time.Duration) error
- func (b *SQLBackend) CleanupRange(gatewayID string, before time.Time) error
- func (b *SQLBackend) Close() error
- func (b *SQLBackend) Initialize() error
- func (b *SQLBackend) Publish(gatewayID string, event Event) error
- func (b *SQLBackend) RegisterGateway(gatewayID string) error
- func (b *SQLBackend) Subscribe(gatewayID string) (<-chan Event, error)
- func (b *SQLBackend) Unsubscribe(gatewayID string, subscriber <-chan Event) error
- func (b *SQLBackend) UnsubscribeAll(gatewayID string) error
- type SQLBackendConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrGatewayNotFound = errors.New("gateway not found") ErrGatewayAlreadyExists = errors.New("gateway already exists") ErrSubscriberNotFound = errors.New("subscriber not found") )
Sentinel errors for gateway operations.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
PollInterval time.Duration
CleanupInterval time.Duration
RetentionPeriod time.Duration
}
Config holds configuration for the EventHub
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults
type Event ¶
type Event struct {
GatewayID string `json:"gateway_id"`
ProcessedTimestamp time.Time `json:"processed_timestamp"`
OriginatedTimestamp time.Time `json:"originated_timestamp"`
EventType EventType `json:"event_type"`
Action string `json:"action"`
EntityID string `json:"entity_id"`
EventID string `json:"event_id"`
// EventData carries optional event-specific details that are not already
// represented by top-level fields such as Action and EntityID.
EventData string `json:"event_data"`
}
Event represents a change event in the system
type EventHub ¶
type EventHub interface {
// Initialize sets up the event hub
Initialize() error
// RegisterGateway registers a new gateway for event tracking.
RegisterGateway(gatewayID string) error
// PublishEvent publishes an event for a gateway.
PublishEvent(gatewayID string, event Event) error
// Subscribe subscribes to events for a gateway.
Subscribe(gatewayID string) (<-chan Event, error)
// Unsubscribe removes a specific subscription for a gateway.
Unsubscribe(gatewayID string, subscriber <-chan Event) error
// UnsubscribeAll removes all subscriptions for a gateway.
UnsubscribeAll(gatewayID string) error
// CleanUpEvents removes old events
CleanUpEvents() error
// Close gracefully shuts down the event hub
Close() error
}
EventHub defines the interface for event publishing and subscribing
func NewWithBackend ¶
func NewWithBackend(backend EventhubImpl, logger *slog.Logger) EventHub
NewWithBackend creates a new EventHub with a custom backend
type EventType ¶
type EventType string
EventType represents the type of event
const ( // EventTypeAPI represents an API configuration change event EventTypeAPI EventType = "API" // EventTypeAPIKey represents an API key change event EventTypeAPIKey EventType = "API_KEY" // EventTypeCertificate represents a certificate change event EventTypeCertificate EventType = "CERTIFICATE" // EventTypeSubscription represents a subscription change event EventTypeSubscription EventType = "SUBSCRIPTION" // EventTypeSubscriptionPlan represents a subscription plan change event EventTypeSubscriptionPlan EventType = "SUBSCRIPTION_PLAN" // EventTypeApplication represents an application metadata change event EventTypeApplication EventType = "APPLICATION" // EventTypeLLMProvider represents an LLM provider change event EventTypeLLMProvider EventType = "LLM_PROVIDER" // EventTypeLLMProxy represents an LLM proxy change event EventTypeLLMProxy EventType = "LLM_PROXY" // EventTypeLLMTemplate represents an LLM template change event EventTypeLLMTemplate EventType = "LLM_TEMPLATE" // EventTypeMCPProxy represents an MCP proxy change event EventTypeMCPProxy EventType = "MCP_PROXY" // EmptyEventData is the canonical JSON payload for events that do not // require additional data beyond the top-level event fields. EmptyEventData = "{}" )
type EventhubImpl ¶
type EventhubImpl interface {
// Initialize sets up the backend
Initialize() error
// RegisterGateway registers a new gateway for event tracking.
RegisterGateway(gatewayID string) error
// Publish publishes an event for a gateway.
Publish(gatewayID string, event Event) error
// Subscribe subscribes to events for a gateway, returning a channel.
Subscribe(gatewayID string) (<-chan Event, error)
// Unsubscribe removes a specific subscription for a gateway.
Unsubscribe(gatewayID string, subscriber <-chan Event) error
// UnsubscribeAll removes all subscriptions for a gateway.
UnsubscribeAll(gatewayID string) error
// Cleanup removes events older than the retention period
Cleanup(retentionPeriod time.Duration) error
// CleanupRange removes events in a time range for a gateway.
CleanupRange(gatewayID string, before time.Time) error
// Close gracefully shuts down the backend
Close() error
}
EventhubImpl defines the backend interface for pluggable event hub implementations
type GatewayState ¶
type GatewayState struct {
GatewayID string `json:"gateway_id"`
VersionID string `json:"version_id"`
UpdatedAt time.Time `json:"updated_at"`
}
GatewayState tracks the version state of a gateway.
type SQLBackend ¶
type SQLBackend struct {
// contains filtered or unexported fields
}
SQLBackend implements EventhubImpl using SQL polling
func NewSQLBackend ¶
func NewSQLBackend(db *sql.DB, logger *slog.Logger, config SQLBackendConfig) *SQLBackend
NewSQLBackend creates a new SQL-backed event hub
func (*SQLBackend) Cleanup ¶
func (b *SQLBackend) Cleanup(retentionPeriod time.Duration) error
Cleanup removes events older than the retention period
func (*SQLBackend) CleanupRange ¶
func (b *SQLBackend) CleanupRange(gatewayID string, before time.Time) error
CleanupRange removes events for a gateway before a given time.
func (*SQLBackend) Close ¶
func (b *SQLBackend) Close() error
Close gracefully shuts down the backend
func (*SQLBackend) Initialize ¶
func (b *SQLBackend) Initialize() error
Initialize prepares statements and starts background goroutines
func (*SQLBackend) Publish ¶
func (b *SQLBackend) Publish(gatewayID string, event Event) error
Publish publishes an event atomically (insert event + update gateway version).
func (*SQLBackend) RegisterGateway ¶
func (b *SQLBackend) RegisterGateway(gatewayID string) error
RegisterGateway registers a new gateway for event tracking.
func (*SQLBackend) Subscribe ¶
func (b *SQLBackend) Subscribe(gatewayID string) (<-chan Event, error)
Subscribe subscribes to events for a gateway.
func (*SQLBackend) Unsubscribe ¶
func (b *SQLBackend) Unsubscribe(gatewayID string, subscriber <-chan Event) error
Unsubscribe removes a specific subscription for a gateway.
func (*SQLBackend) UnsubscribeAll ¶
func (b *SQLBackend) UnsubscribeAll(gatewayID string) error
UnsubscribeAll removes all subscriptions for a gateway.
type SQLBackendConfig ¶
type SQLBackendConfig struct {
PollInterval time.Duration
CleanupInterval time.Duration
RetentionPeriod time.Duration
GatewayStatePageSize int
}
SQLBackendConfig holds configuration for the SQL backend
func DefaultSQLBackendConfig ¶
func DefaultSQLBackendConfig() SQLBackendConfig
DefaultSQLBackendConfig returns a SQLBackendConfig with sensible defaults