subscription

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package subscription provides real-time data subscription and notification capabilities. It supports dynamic subscription management, configurable notification channels, persistent subscription storage, and efficient event distribution. The package enables real-time monitoring of metrics with customizable alert thresholds.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataChangeDetector

type DataChangeDetector struct {
	// contains filtered or unexported fields
}

DataChangeDetector detects significant changes in subscription data

func NewDataChangeDetector

func NewDataChangeDetector() *DataChangeDetector

NewDataChangeDetector creates a new data change detector

func (*DataChangeDetector) DetectChanges

func (dcd *DataChangeDetector) DetectChanges(subscriptionID, providerID string, currentData interface{}) []Notification

DetectChanges analyzes data for significant changes

func (*DataChangeDetector) SetThreshold

func (dcd *DataChangeDetector) SetThreshold(metricName string, threshold float64)

SetThreshold sets a change threshold for a specific metric

type EnhancedSubscriptionCallback

type EnhancedSubscriptionCallback struct {
	// contains filtered or unexported fields
}

Enhanced subscription callback wrapper

func NewEnhancedSubscriptionCallback

func NewEnhancedSubscriptionCallback(
	callback plugin.DataCallback,
	subscriptionID, providerID string,
	notificationMgr *NotificationManager,
) *EnhancedSubscriptionCallback

NewEnhancedSubscriptionCallback creates an enhanced callback with change detection

func (*EnhancedSubscriptionCallback) Call

func (esc *EnhancedSubscriptionCallback) Call(data interface{}, err error)

Call invokes the callback with change detection and notifications

func (*EnhancedSubscriptionCallback) SetSubscriptionID

func (esc *EnhancedSubscriptionCallback) SetSubscriptionID(id string)

SetSubscriptionID sets the subscription ID after the subscription is created

type Notification

type Notification struct {
	ID             string                 `json:"id"`
	SubscriptionID string                 `json:"subscription_id"`
	ProviderID     string                 `json:"provider_id"`
	Level          NotificationLevel      `json:"level"`
	Title          string                 `json:"title"`
	Message        string                 `json:"message"`
	Data           interface{}            `json:"data"`
	Metadata       map[string]interface{} `json:"metadata"`
	Timestamp      time.Time              `json:"timestamp"`
	Acknowledged   bool                   `json:"acknowledged"`
}

Notification represents a data change notification

type NotificationCallback

type NotificationCallback func(notification Notification)

NotificationCallback is a function that handles notifications

type NotificationLevel

type NotificationLevel string

NotificationLevel represents the severity level of a notification

const (
	LevelInfo     NotificationLevel = "info"
	LevelWarning  NotificationLevel = "warning"
	LevelError    NotificationLevel = "error"
	LevelCritical NotificationLevel = "critical"
)

type NotificationManager

type NotificationManager struct {
	// contains filtered or unexported fields
}

NotificationManager manages notifications and callbacks

func NewNotificationManager

func NewNotificationManager(maxHistory int) *NotificationManager

NewNotificationManager creates a new notification manager

func (*NotificationManager) AcknowledgeNotification

func (nm *NotificationManager) AcknowledgeNotification(notificationID string) error

AcknowledgeNotification marks a notification as acknowledged

func (*NotificationManager) GetNotifications

func (nm *NotificationManager) GetNotifications(limit int) []Notification

GetNotifications returns recent notifications

func (*NotificationManager) GetStats

func (nm *NotificationManager) GetStats() map[string]interface{}

GetStats returns notification statistics

func (*NotificationManager) RegisterCallback

func (nm *NotificationManager) RegisterCallback(subscriptionType string, callback NotificationCallback)

RegisterCallback registers a callback for specific subscription types

func (*NotificationManager) SendNotification

func (nm *NotificationManager) SendNotification(notification Notification)

SendNotification sends a notification to registered callbacks

func (*NotificationManager) UnregisterCallback

func (nm *NotificationManager) UnregisterCallback(subscriptionType string)

UnregisterCallback removes a callback (this is a simplified implementation)

type PersistenceConfig

type PersistenceConfig struct {
	DataDir      string        `json:"data_dir"`
	AutoSave     bool          `json:"auto_save"`
	SaveInterval time.Duration `json:"save_interval"`
}

PersistenceConfig configuration for subscription persistence

type PersistentSubscription

type PersistentSubscription struct {
	ID             string                 `json:"id"`
	ProviderID     string                 `json:"provider_id"`
	Params         map[string]interface{} `json:"params"`
	CreatedAt      time.Time              `json:"created_at"`
	UpdateInterval time.Duration          `json:"update_interval"`
	Active         bool                   `json:"active"`
	AutoRestore    bool                   `json:"auto_restore"`
	Metadata       map[string]interface{} `json:"metadata"`
}

PersistentSubscription represents a subscription that can be persisted

type Subscription

type Subscription struct {
	ID             string                 `json:"id"`
	ProviderID     string                 `json:"provider_id"`
	Params         map[string]interface{} `json:"params"`
	Callback       plugin.DataCallback    `json:"-"`
	CreatedAt      time.Time              `json:"created_at"`
	LastUpdate     time.Time              `json:"last_update"`
	UpdateCount    int64                  `json:"update_count"`
	UpdateInterval time.Duration          `json:"update_interval"`
	Active         bool                   `json:"active"`
	ErrorCount     int                    `json:"error_count"`
	LastError      string                 `json:"last_error,omitempty"`
}

Subscription represents an active data subscription

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager manages data subscriptions for the observability plugin

func NewSubscriptionManager

func NewSubscriptionManager(client *prometheus.CachedClient) *SubscriptionManager

NewSubscriptionManager creates a new subscription manager

func (*SubscriptionManager) GetData

func (sm *SubscriptionManager) GetData(ctx context.Context, providerID string, params map[string]interface{}) (interface{}, error)

GetData retrieves data for a subscription (public method)

func (*SubscriptionManager) GetStats

func (sm *SubscriptionManager) GetStats() map[string]interface{}

GetStats returns subscription manager statistics

func (*SubscriptionManager) GetSubscription

func (sm *SubscriptionManager) GetSubscription(subscriptionID plugin.SubscriptionID) (*Subscription, error)

GetSubscription returns subscription details

func (*SubscriptionManager) ListSubscriptions

func (sm *SubscriptionManager) ListSubscriptions() map[string]*Subscription

ListSubscriptions returns all active subscriptions

func (*SubscriptionManager) Start

func (sm *SubscriptionManager) Start(ctx context.Context) error

Start starts the subscription manager

func (*SubscriptionManager) Stop

func (sm *SubscriptionManager) Stop() error

Stop stops the subscription manager

func (*SubscriptionManager) Subscribe

func (sm *SubscriptionManager) Subscribe(providerID string, params map[string]interface{}, callback plugin.DataCallback) (plugin.SubscriptionID, error)

Subscribe creates a new data subscription

func (*SubscriptionManager) Unsubscribe

func (sm *SubscriptionManager) Unsubscribe(subscriptionID plugin.SubscriptionID) error

Unsubscribe removes a data subscription

type SubscriptionPersistence

type SubscriptionPersistence struct {
	// contains filtered or unexported fields
}

SubscriptionPersistence handles subscription persistence and recovery

func NewSubscriptionPersistence

func NewSubscriptionPersistence(config PersistenceConfig, subscriptionMgr *SubscriptionManager) (*SubscriptionPersistence, error)

NewSubscriptionPersistence creates a new subscription persistence manager

func (*SubscriptionPersistence) BackupSubscriptions

func (sp *SubscriptionPersistence) BackupSubscriptions() (string, error)

BackupSubscriptions creates a backup of all subscriptions

func (*SubscriptionPersistence) CleanupOldBackups

func (sp *SubscriptionPersistence) CleanupOldBackups(maxAge time.Duration) error

CleanupOldBackups removes backup files older than the specified duration

func (*SubscriptionPersistence) DeleteSubscription

func (sp *SubscriptionPersistence) DeleteSubscription(subscriptionID string) error

DeleteSubscription removes a persisted subscription

func (*SubscriptionPersistence) GetBackupList

func (sp *SubscriptionPersistence) GetBackupList() ([]string, error)

GetBackupList returns a list of available backup files

func (*SubscriptionPersistence) GetPersistenceStats

func (sp *SubscriptionPersistence) GetPersistenceStats() map[string]interface{}

GetPersistenceStats returns statistics about persistence operations

func (*SubscriptionPersistence) LoadSubscriptions

func (sp *SubscriptionPersistence) LoadSubscriptions() error

LoadSubscriptions loads subscriptions from disk

func (*SubscriptionPersistence) RestoreFromBackup

func (sp *SubscriptionPersistence) RestoreFromBackup(backupFile string) error

RestoreFromBackup restores subscriptions from a backup file

func (*SubscriptionPersistence) SaveSubscription

func (sp *SubscriptionPersistence) SaveSubscription(subscriptionID string) error

SaveSubscription saves a single subscription

func (*SubscriptionPersistence) SaveSubscriptions

func (sp *SubscriptionPersistence) SaveSubscriptions() error

SaveSubscriptions saves all current subscriptions to disk

func (*SubscriptionPersistence) Stop

func (sp *SubscriptionPersistence) Stop()

Stop stops the auto-save loop

type SubscriptionRecovery

type SubscriptionRecovery struct {
	// contains filtered or unexported fields
}

SubscriptionRecovery handles recovery of failed subscriptions

func NewSubscriptionRecovery

func NewSubscriptionRecovery(subscriptionMgr *SubscriptionManager, persistence *SubscriptionPersistence) *SubscriptionRecovery

NewSubscriptionRecovery creates a new subscription recovery manager

func (*SubscriptionRecovery) RecoverFailedSubscriptions

func (sr *SubscriptionRecovery) RecoverFailedSubscriptions(ctx context.Context) error

RecoverFailedSubscriptions attempts to recover failed subscriptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL