subscription

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2026 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Overview

Package subscription provides real-time data subscription and notification capabilities. It supports dynamic subscription management, configurable notification channels, persistent subscription storage, and efficient event distribution. The package enables real-time monitoring of metrics with customizable alert thresholds.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataChangeDetector

type DataChangeDetector struct {
	// contains filtered or unexported fields
}

DataChangeDetector detects significant changes in subscription data

func NewDataChangeDetector

func NewDataChangeDetector() *DataChangeDetector

NewDataChangeDetector creates a new data change detector

func (*DataChangeDetector) DetectChanges

func (dcd *DataChangeDetector) DetectChanges(subscriptionID, providerID string, currentData interface{}) []Notification

DetectChanges analyzes data for significant changes

func (*DataChangeDetector) SetThreshold

func (dcd *DataChangeDetector) SetThreshold(metricName string, threshold float64)

SetThreshold sets a change threshold for a specific metric

type EnhancedSubscriptionCallback

type EnhancedSubscriptionCallback struct {
	// contains filtered or unexported fields
}

EnhancedSubscriptionCallback wraps a callback with change detection and notification management.

func NewEnhancedSubscriptionCallback

func NewEnhancedSubscriptionCallback(
	callback plugin.DataCallback,
	subscriptionID, providerID string,
	notificationMgr *NotificationManager,
) *EnhancedSubscriptionCallback

NewEnhancedSubscriptionCallback creates an enhanced callback with change detection

func (*EnhancedSubscriptionCallback) Call

func (esc *EnhancedSubscriptionCallback) Call(data interface{}, err error)

Call invokes the callback with change detection and notifications

func (*EnhancedSubscriptionCallback) SetSubscriptionID

func (esc *EnhancedSubscriptionCallback) SetSubscriptionID(id string)

SetSubscriptionID sets the subscription ID after the subscription is created

type Manager added in v0.3.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages data subscriptions for the observability plugin

func (*Manager) GetData added in v0.3.0

func (sm *Manager) GetData(ctx context.Context, providerID string, params map[string]interface{}) (interface{}, error)

GetData retrieves data for a subscription (public method)

func (*Manager) GetStats added in v0.3.0

func (sm *Manager) GetStats() map[string]interface{}

GetStats returns subscription manager statistics

func (*Manager) GetSubscription added in v0.3.0

func (sm *Manager) GetSubscription(subscriptionID plugin.SubscriptionID) (*Subscription, error)

GetSubscription returns subscription details

func (*Manager) ListSubscriptions added in v0.3.0

func (sm *Manager) ListSubscriptions() map[string]*Subscription

ListSubscriptions returns all active subscriptions

func (*Manager) Start added in v0.3.0

func (sm *Manager) Start(ctx context.Context) error

Start starts the subscription manager

func (*Manager) Stop added in v0.3.0

func (sm *Manager) Stop() error

Stop stops the subscription manager

func (*Manager) Subscribe added in v0.3.0

func (sm *Manager) Subscribe(providerID string, params map[string]interface{}, callback plugin.DataCallback) (plugin.SubscriptionID, error)

Subscribe creates a new data subscription

func (*Manager) Unsubscribe added in v0.3.0

func (sm *Manager) Unsubscribe(subscriptionID plugin.SubscriptionID) error

Unsubscribe removes a data subscription

type Notification

type Notification struct {
	ID             string                 `json:"id"`
	SubscriptionID string                 `json:"subscription_id"`
	ProviderID     string                 `json:"provider_id"`
	Level          NotificationLevel      `json:"level"`
	Title          string                 `json:"title"`
	Message        string                 `json:"message"`
	Data           interface{}            `json:"data"`
	Metadata       map[string]interface{} `json:"metadata"`
	Timestamp      time.Time              `json:"timestamp"`
	Acknowledged   bool                   `json:"acknowledged"`
}

Notification represents a data change notification

type NotificationCallback

type NotificationCallback func(notification Notification)

NotificationCallback is a function that handles notifications

type NotificationLevel

type NotificationLevel string

NotificationLevel represents the severity level of a notification

const (
	// LevelInfo is the info notification level.
	LevelInfo NotificationLevel = "info"
	// LevelWarning is the warning notification level.
	LevelWarning NotificationLevel = "warning"
	// LevelError is the error notification level.
	LevelError NotificationLevel = "error"
	// LevelCritical is the critical notification level.
	LevelCritical NotificationLevel = "critical"
)

type NotificationManager

type NotificationManager struct {
	// contains filtered or unexported fields
}

NotificationManager manages notifications and callbacks

func NewNotificationManager

func NewNotificationManager(maxHistory int) *NotificationManager

NewNotificationManager creates a new notification manager

func (*NotificationManager) AcknowledgeNotification

func (nm *NotificationManager) AcknowledgeNotification(notificationID string) error

AcknowledgeNotification marks a notification as acknowledged

func (*NotificationManager) GetNotifications

func (nm *NotificationManager) GetNotifications(limit int) []Notification

GetNotifications returns recent notifications

func (*NotificationManager) GetStats

func (nm *NotificationManager) GetStats() map[string]interface{}

GetStats returns notification statistics

func (*NotificationManager) RegisterCallback

func (nm *NotificationManager) RegisterCallback(subscriptionType string, callback NotificationCallback)

RegisterCallback registers a callback for a specific subscription type.

func (*NotificationManager) SendNotification

func (nm *NotificationManager) SendNotification(notification *Notification)

SendNotification sends a notification to registered callbacks

func (*NotificationManager) UnregisterCallback

func (nm *NotificationManager) UnregisterCallback(subscriptionType string)

UnregisterCallback removes a callback for the given subscription type (this is a simplified implementation).

type Persistence added in v0.3.0

type Persistence struct {
	// contains filtered or unexported fields
}

Persistence handles subscription persistence and recovery

func NewSubscriptionPersistence

func NewSubscriptionPersistence(config PersistenceConfig, subscriptionMgr *Manager) (*Persistence, error)

NewSubscriptionPersistence creates a new subscription persistence manager

func (*Persistence) BackupSubscriptions added in v0.3.0

func (sp *Persistence) BackupSubscriptions() (string, error)

BackupSubscriptions creates a backup of all subscriptions

func (*Persistence) CleanupOldBackups added in v0.3.0

func (sp *Persistence) CleanupOldBackups(maxAge time.Duration) error

CleanupOldBackups removes backup files older than the specified duration

func (*Persistence) DeleteSubscription added in v0.3.0

func (sp *Persistence) DeleteSubscription(subscriptionID string) error

DeleteSubscription removes a persisted subscription

func (*Persistence) GetBackupList added in v0.3.0

func (sp *Persistence) GetBackupList() ([]string, error)

GetBackupList returns a list of available backup files

func (*Persistence) GetPersistenceStats added in v0.3.0

func (sp *Persistence) GetPersistenceStats() map[string]interface{}

GetPersistenceStats returns statistics about persistence operations

func (*Persistence) LoadSubscriptions added in v0.3.0

func (sp *Persistence) LoadSubscriptions() error

LoadSubscriptions loads subscriptions from disk

func (*Persistence) RestoreFromBackup added in v0.3.0

func (sp *Persistence) RestoreFromBackup(backupFile string) error

RestoreFromBackup restores subscriptions from a backup file

func (*Persistence) SaveSubscription added in v0.3.0

func (sp *Persistence) SaveSubscription(subscriptionID string) error

SaveSubscription saves a single subscription

func (*Persistence) SaveSubscriptions added in v0.3.0

func (sp *Persistence) SaveSubscriptions() error

SaveSubscriptions saves all current subscriptions to disk

func (*Persistence) Stop added in v0.3.0

func (sp *Persistence) Stop()

Stop stops the auto-save loop

type PersistenceConfig

type PersistenceConfig struct {
	DataDir      string        `json:"data_dir"`
	AutoSave     bool          `json:"auto_save"`
	SaveInterval time.Duration `json:"save_interval"`
}

PersistenceConfig holds the configuration for subscription persistence.

type PersistentSubscription

type PersistentSubscription struct {
	ID             string                 `json:"id"`
	ProviderID     string                 `json:"provider_id"`
	Params         map[string]interface{} `json:"params"`
	CreatedAt      time.Time              `json:"created_at"`
	UpdateInterval time.Duration          `json:"update_interval"`
	Active         bool                   `json:"active"`
	AutoRestore    bool                   `json:"auto_restore"`
	Metadata       map[string]interface{} `json:"metadata"`
}

PersistentSubscription represents a subscription that can be persisted

type Recovery added in v0.3.0

type Recovery struct {
	// contains filtered or unexported fields
}

Recovery handles recovery of failed subscriptions

func NewSubscriptionRecovery

func NewSubscriptionRecovery(subscriptionMgr *Manager, persistence *Persistence) *Recovery

NewSubscriptionRecovery creates a new subscription recovery manager

func (*Recovery) RecoverFailedSubscriptions added in v0.3.0

func (sr *Recovery) RecoverFailedSubscriptions(ctx context.Context) error

RecoverFailedSubscriptions attempts to recover failed subscriptions

type Subscription

type Subscription struct {
	ID             string                 `json:"id"`
	ProviderID     string                 `json:"provider_id"`
	Params         map[string]interface{} `json:"params"`
	Callback       plugin.DataCallback    `json:"-"`
	CreatedAt      time.Time              `json:"created_at"`
	LastUpdate     time.Time              `json:"last_update"`
	UpdateCount    int64                  `json:"update_count"`
	UpdateInterval time.Duration          `json:"update_interval"`
	Active         bool                   `json:"active"`
	ErrorCount     int                    `json:"error_count"`
	LastError      string                 `json:"last_error,omitempty"`
}

Subscription represents an active data subscription

type SubscriptionManager

type SubscriptionManager = Manager

func NewSubscriptionManager

func NewSubscriptionManager(client *prometheus.CachedClient) *SubscriptionManager

NewSubscriptionManager creates a new subscription manager

type SubscriptionPersistence

type SubscriptionPersistence = Persistence

type SubscriptionRecovery

type SubscriptionRecovery = Recovery

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL