Documentation
¶
Overview ¶
Package subscription provides real-time data subscription and notification capabilities. It supports dynamic subscription management, configurable notification channels, persistent subscription storage, and efficient event distribution. The package enables real-time monitoring of metrics with customizable alert thresholds.
Index ¶
- type DataChangeDetector
- type EnhancedSubscriptionCallback
- type Manager
- func (sm *Manager) GetData(ctx context.Context, providerID string, params map[string]interface{}) (interface{}, error)
- func (sm *Manager) GetStats() map[string]interface{}
- func (sm *Manager) GetSubscription(subscriptionID plugin.SubscriptionID) (*Subscription, error)
- func (sm *Manager) ListSubscriptions() map[string]*Subscription
- func (sm *Manager) Start(ctx context.Context) error
- func (sm *Manager) Stop() error
- func (sm *Manager) Subscribe(providerID string, params map[string]interface{}, callback plugin.DataCallback) (plugin.SubscriptionID, error)
- func (sm *Manager) Unsubscribe(subscriptionID plugin.SubscriptionID) error
- type Notification
- type NotificationCallback
- type NotificationLevel
- type NotificationManager
- func (nm *NotificationManager) AcknowledgeNotification(notificationID string) error
- func (nm *NotificationManager) GetNotifications(limit int) []Notification
- func (nm *NotificationManager) GetStats() map[string]interface{}
- func (nm *NotificationManager) RegisterCallback(subscriptionType string, callback NotificationCallback)
- func (nm *NotificationManager) SendNotification(notification *Notification)
- func (nm *NotificationManager) UnregisterCallback(subscriptionType string)
- type Persistence
- func (sp *Persistence) BackupSubscriptions() (string, error)
- func (sp *Persistence) CleanupOldBackups(maxAge time.Duration) error
- func (sp *Persistence) DeleteSubscription(subscriptionID string) error
- func (sp *Persistence) GetBackupList() ([]string, error)
- func (sp *Persistence) GetPersistenceStats() map[string]interface{}
- func (sp *Persistence) LoadSubscriptions() error
- func (sp *Persistence) RestoreFromBackup(backupFile string) error
- func (sp *Persistence) SaveSubscription(subscriptionID string) error
- func (sp *Persistence) SaveSubscriptions() error
- func (sp *Persistence) Stop()
- type PersistenceConfig
- type PersistentSubscription
- type Recovery
- type Subscription
- type SubscriptionManager
- type SubscriptionPersistence
- type SubscriptionRecovery
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DataChangeDetector ¶
type DataChangeDetector struct {
// contains filtered or unexported fields
}
DataChangeDetector detects significant changes in subscription data
func NewDataChangeDetector ¶
func NewDataChangeDetector() *DataChangeDetector
NewDataChangeDetector creates a new data change detector
func (*DataChangeDetector) DetectChanges ¶
func (dcd *DataChangeDetector) DetectChanges(subscriptionID, providerID string, currentData interface{}) []Notification
DetectChanges analyzes data for significant changes
func (*DataChangeDetector) SetThreshold ¶
func (dcd *DataChangeDetector) SetThreshold(metricName string, threshold float64)
SetThreshold sets a change threshold for a specific metric
type EnhancedSubscriptionCallback ¶
type EnhancedSubscriptionCallback struct {
// contains filtered or unexported fields
}
EnhancedSubscriptionCallback wraps a callback with change detection and notification management.
func NewEnhancedSubscriptionCallback ¶
func NewEnhancedSubscriptionCallback( callback plugin.DataCallback, subscriptionID, providerID string, notificationMgr *NotificationManager, ) *EnhancedSubscriptionCallback
NewEnhancedSubscriptionCallback creates an enhanced callback with change detection
func (*EnhancedSubscriptionCallback) Call ¶
func (esc *EnhancedSubscriptionCallback) Call(data interface{}, err error)
Call invokes the callback with change detection and notifications
func (*EnhancedSubscriptionCallback) SetSubscriptionID ¶
func (esc *EnhancedSubscriptionCallback) SetSubscriptionID(id string)
SetSubscriptionID sets the subscription ID after the subscription is created
type Manager ¶ added in v0.3.0
type Manager struct {
// contains filtered or unexported fields
}
Manager manages data subscriptions for the observability plugin
func (*Manager) GetData ¶ added in v0.3.0
func (sm *Manager) GetData(ctx context.Context, providerID string, params map[string]interface{}) (interface{}, error)
GetData retrieves data for a subscription (public method)
func (*Manager) GetSubscription ¶ added in v0.3.0
func (sm *Manager) GetSubscription(subscriptionID plugin.SubscriptionID) (*Subscription, error)
GetSubscription returns subscription details
func (*Manager) ListSubscriptions ¶ added in v0.3.0
func (sm *Manager) ListSubscriptions() map[string]*Subscription
ListSubscriptions returns all active subscriptions
func (*Manager) Subscribe ¶ added in v0.3.0
func (sm *Manager) Subscribe(providerID string, params map[string]interface{}, callback plugin.DataCallback) (plugin.SubscriptionID, error)
Subscribe creates a new data subscription
func (*Manager) Unsubscribe ¶ added in v0.3.0
func (sm *Manager) Unsubscribe(subscriptionID plugin.SubscriptionID) error
Unsubscribe removes a data subscription
type Notification ¶
type Notification struct {
ID string `json:"id"`
SubscriptionID string `json:"subscription_id"`
ProviderID string `json:"provider_id"`
Level NotificationLevel `json:"level"`
Title string `json:"title"`
Message string `json:"message"`
Data interface{} `json:"data"`
Metadata map[string]interface{} `json:"metadata"`
Timestamp time.Time `json:"timestamp"`
Acknowledged bool `json:"acknowledged"`
}
Notification represents a data change notification
type NotificationCallback ¶
type NotificationCallback func(notification Notification)
NotificationCallback is a function that handles notifications
type NotificationLevel ¶
type NotificationLevel string
NotificationLevel represents the severity level of a notification
const ( // LevelInfo is the info notification level. LevelInfo NotificationLevel = "info" // LevelWarning is the warning notification level. LevelWarning NotificationLevel = "warning" // LevelError is the error notification level. LevelError NotificationLevel = "error" // LevelCritical is the critical notification level. LevelCritical NotificationLevel = "critical" )
type NotificationManager ¶
type NotificationManager struct {
// contains filtered or unexported fields
}
NotificationManager manages notifications and callbacks
func NewNotificationManager ¶
func NewNotificationManager(maxHistory int) *NotificationManager
NewNotificationManager creates a new notification manager
func (*NotificationManager) AcknowledgeNotification ¶
func (nm *NotificationManager) AcknowledgeNotification(notificationID string) error
AcknowledgeNotification marks a notification as acknowledged
func (*NotificationManager) GetNotifications ¶
func (nm *NotificationManager) GetNotifications(limit int) []Notification
GetNotifications returns recent notifications
func (*NotificationManager) GetStats ¶
func (nm *NotificationManager) GetStats() map[string]interface{}
GetStats returns notification statistics
func (*NotificationManager) RegisterCallback ¶
func (nm *NotificationManager) RegisterCallback(subscriptionType string, callback NotificationCallback)
RegisterCallback registers a callback for specific subscription types
func (*NotificationManager) SendNotification ¶
func (nm *NotificationManager) SendNotification(notification *Notification)
SendNotification sends a notification to registered callbacks
func (*NotificationManager) UnregisterCallback ¶
func (nm *NotificationManager) UnregisterCallback(subscriptionType string)
UnregisterCallback removes a callback (this is a simplified implementation)
type Persistence ¶ added in v0.3.0
type Persistence struct {
// contains filtered or unexported fields
}
Persistence handles subscription persistence and recovery
func NewSubscriptionPersistence ¶
func NewSubscriptionPersistence(config PersistenceConfig, subscriptionMgr *Manager) (*Persistence, error)
NewSubscriptionPersistence creates a new subscription persistence manager
func (*Persistence) BackupSubscriptions ¶ added in v0.3.0
func (sp *Persistence) BackupSubscriptions() (string, error)
BackupSubscriptions creates a backup of all subscriptions
func (*Persistence) CleanupOldBackups ¶ added in v0.3.0
func (sp *Persistence) CleanupOldBackups(maxAge time.Duration) error
CleanupOldBackups removes backup files older than the specified duration
func (*Persistence) DeleteSubscription ¶ added in v0.3.0
func (sp *Persistence) DeleteSubscription(subscriptionID string) error
DeleteSubscription removes a persisted subscription
func (*Persistence) GetBackupList ¶ added in v0.3.0
func (sp *Persistence) GetBackupList() ([]string, error)
GetBackupList returns a list of available backup files
func (*Persistence) GetPersistenceStats ¶ added in v0.3.0
func (sp *Persistence) GetPersistenceStats() map[string]interface{}
GetPersistenceStats returns statistics about persistence operations
func (*Persistence) LoadSubscriptions ¶ added in v0.3.0
func (sp *Persistence) LoadSubscriptions() error
LoadSubscriptions loads subscriptions from disk
func (*Persistence) RestoreFromBackup ¶ added in v0.3.0
func (sp *Persistence) RestoreFromBackup(backupFile string) error
RestoreFromBackup restores subscriptions from a backup file
func (*Persistence) SaveSubscription ¶ added in v0.3.0
func (sp *Persistence) SaveSubscription(subscriptionID string) error
SaveSubscription saves a single subscription
func (*Persistence) SaveSubscriptions ¶ added in v0.3.0
func (sp *Persistence) SaveSubscriptions() error
SaveSubscriptions saves all current subscriptions to disk
func (*Persistence) Stop ¶ added in v0.3.0
func (sp *Persistence) Stop()
Stop stops the auto-save loop
type PersistenceConfig ¶
type PersistenceConfig struct {
DataDir string `json:"data_dir"`
AutoSave bool `json:"auto_save"`
SaveInterval time.Duration `json:"save_interval"`
}
PersistenceConfig configuration for subscription persistence
type PersistentSubscription ¶
type PersistentSubscription struct {
ID string `json:"id"`
ProviderID string `json:"provider_id"`
Params map[string]interface{} `json:"params"`
CreatedAt time.Time `json:"created_at"`
UpdateInterval time.Duration `json:"update_interval"`
Active bool `json:"active"`
AutoRestore bool `json:"auto_restore"`
Metadata map[string]interface{} `json:"metadata"`
}
PersistentSubscription represents a subscription that can be persisted
type Recovery ¶ added in v0.3.0
type Recovery struct {
// contains filtered or unexported fields
}
Recovery handles recovery of failed subscriptions
func NewSubscriptionRecovery ¶
func NewSubscriptionRecovery(subscriptionMgr *Manager, persistence *Persistence) *Recovery
NewSubscriptionRecovery creates a new subscription recovery manager
type Subscription ¶
type Subscription struct {
ID string `json:"id"`
ProviderID string `json:"provider_id"`
Params map[string]interface{} `json:"params"`
Callback plugin.DataCallback `json:"-"`
CreatedAt time.Time `json:"created_at"`
LastUpdate time.Time `json:"last_update"`
UpdateCount int64 `json:"update_count"`
UpdateInterval time.Duration `json:"update_interval"`
Active bool `json:"active"`
ErrorCount int `json:"error_count"`
LastError string `json:"last_error,omitempty"`
}
Subscription represents an active data subscription
type SubscriptionManager ¶
type SubscriptionManager = Manager
func NewSubscriptionManager ¶
func NewSubscriptionManager(client *prometheus.CachedClient) *SubscriptionManager
NewSubscriptionManager creates a new subscription manager
type SubscriptionPersistence ¶
type SubscriptionPersistence = Persistence
type SubscriptionRecovery ¶
type SubscriptionRecovery = Recovery