Documentation
¶
Overview ¶
Package admin provides administration and caching services for CBT.
Index ¶
- Variables
- type BoundsCache
- type BoundsLock
- type CacheManager
- func (c *CacheManager) AcquireLock(ctx context.Context, modelID string) (BoundsLock, error)
- func (c *CacheManager) DeleteAllConfigOverrides(ctx context.Context) error
- func (c *CacheManager) DeleteBounds(ctx context.Context, modelID string) error
- func (c *CacheManager) DeleteConfigOverride(ctx context.Context, modelID string) error
- func (c *CacheManager) GetAllConfigOverrides(ctx context.Context) ([]ConfigOverride, error)
- func (c *CacheManager) GetBounds(ctx context.Context, modelID string) (*BoundsCache, error)
- func (c *CacheManager) GetConfigOverride(ctx context.Context, modelID string) (*ConfigOverride, error)
- func (c *CacheManager) GetConfigOverrideVersion(ctx context.Context) (int64, error)
- func (c *CacheManager) SetBounds(ctx context.Context, cache *BoundsCache) error
- func (c *CacheManager) SetConfigOverride(ctx context.Context, override *ConfigOverride) error
- type ConfigOverride
- type GapInfo
- type ProcessedRange
- type Service
- type TableConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrCacheManagerUnavailable = errors.New("cache manager not available") )
var ErrLockTimeout = errors.New("timeout acquiring bounds lock")
ErrLockTimeout is returned when lock acquisition times out.
Functions ¶
This section is empty.
Types ¶
type BoundsCache ¶ added in v0.0.2
// BoundsCache is the cached record of an external model's bounds.
// Every field carries a JSON tag naming its serialized key.
type BoundsCache struct {
// ModelID names the model these bounds belong to.
ModelID string `json:"model_id"`
// Min and Max are the cached bounds.
// NOTE(review): units and inclusivity are not visible here — confirm.
Min uint64 `json:"min"`
Max uint64 `json:"max"`
// Track scan times
LastIncrementalScan time.Time `json:"last_incremental_scan"`
LastFullScan time.Time `json:"last_full_scan"`
// For optimization hints
// NOTE(review): presumably the Min/Max values held before the latest update — confirm.
PreviousMin uint64 `json:"previous_min"`
PreviousMax uint64 `json:"previous_max"`
// Initial scan tracking
// InitialScanStarted is a pointer tagged omitempty, so it is left out of the
// JSON while it is nil.
InitialScanComplete bool `json:"initial_scan_complete"`
InitialScanStarted *time.Time `json:"initial_scan_started,omitempty"`
// Metadata
// NOTE(review): presumably the time this entry was last written — confirm who sets it.
UpdatedAt time.Time `json:"updated_at"`
}
BoundsCache represents cached external model bounds
type BoundsLock ¶ added in v0.0.54
BoundsLock represents a distributed lock for bounds updates
type CacheManager ¶
// CacheManager manages Redis-based caching for external models.
// Its fields are unexported, so none are shown here; obtain one via NewCacheManager.
type CacheManager struct {
// contains filtered or unexported fields
}
CacheManager manages Redis-based caching for external models
func NewCacheManager ¶
func NewCacheManager(redisClient *redis.Client) *CacheManager
NewCacheManager creates a new cache manager instance
func (*CacheManager) AcquireLock ¶ added in v0.0.54
func (c *CacheManager) AcquireLock(ctx context.Context, modelID string) (BoundsLock, error)
AcquireLock acquires a distributed lock for bounds updates on a specific model. The lock uses redsync (Redlock algorithm) with exponential backoff for retries. Returns a BoundsLock that must be released when done.
func (*CacheManager) DeleteAllConfigOverrides ¶ added in v0.1.0
func (c *CacheManager) DeleteAllConfigOverrides(ctx context.Context) error
DeleteAllConfigOverrides removes all config overrides and increments the version counter.
func (*CacheManager) DeleteBounds ¶ added in v0.1.0
func (c *CacheManager) DeleteBounds(ctx context.Context, modelID string) error
DeleteBounds removes cached external model bounds from Redis
func (*CacheManager) DeleteConfigOverride ¶ added in v0.1.0
func (c *CacheManager) DeleteConfigOverride(ctx context.Context, modelID string) error
DeleteConfigOverride removes a config override and increments the version counter.
func (*CacheManager) GetAllConfigOverrides ¶ added in v0.1.0
func (c *CacheManager) GetAllConfigOverrides(ctx context.Context) ([]ConfigOverride, error)
GetAllConfigOverrides retrieves all config overrides using the Redis SCAN command.
func (*CacheManager) GetBounds ¶ added in v0.0.2
func (c *CacheManager) GetBounds(ctx context.Context, modelID string) (*BoundsCache, error)
GetBounds retrieves cached external model bounds from Redis
func (*CacheManager) GetConfigOverride ¶ added in v0.1.0
func (c *CacheManager) GetConfigOverride(ctx context.Context, modelID string) (*ConfigOverride, error)
GetConfigOverride retrieves a config override for a specific model.
func (*CacheManager) GetConfigOverrideVersion ¶ added in v0.1.0
func (c *CacheManager) GetConfigOverrideVersion(ctx context.Context) (int64, error)
GetConfigOverrideVersion returns the current version counter for change detection.
func (*CacheManager) SetBounds ¶ added in v0.0.2
func (c *CacheManager) SetBounds(ctx context.Context, cache *BoundsCache) error
SetBounds stores external model bounds in the Redis cache without a TTL, so entries do not expire.
func (*CacheManager) SetConfigOverride ¶ added in v0.1.0
func (c *CacheManager) SetConfigOverride(ctx context.Context, override *ConfigOverride) error
SetConfigOverride stores a config override and increments the version counter.
type ConfigOverride ¶ added in v0.1.0
// ConfigOverride is a per-model live configuration override.
// Every field carries a JSON tag naming its serialized key.
type ConfigOverride struct {
// ModelID names the model the override applies to.
ModelID string `json:"model_id"`
ModelType string `json:"model_type"` // "transformation" or "external"
// Enabled is a *bool tagged omitempty: nil is left out of the JSON, which keeps
// "not set" distinguishable from an explicit false.
Enabled *bool `json:"enabled,omitempty"`
// Override is held as raw JSON (json.RawMessage); this struct does not decode it.
// NOTE(review): its schema presumably depends on ModelType — confirm.
Override json.RawMessage `json:"override"`
// NOTE(review): presumably the time of the last write — confirm who sets it.
UpdatedAt time.Time `json:"updated_at"`
}
ConfigOverride represents a live configuration override stored in Redis.
type ProcessedRange ¶ added in v0.0.25
ProcessedRange represents a processed range from the admin table
type Service ¶
// Service is the public interface of the admin service. It groups position
// tracking, scheduled-run tracking, coverage and gap queries, consolidation,
// period deletion, the external bounds cache, bounds locking, admin table
// names, and config override operations.
type Service interface {
// Position tracking (for incremental transformations)
GetNextUnprocessedPosition(ctx context.Context, modelID string) (uint64, error) // Returns next position for forward fill
GetLastProcessedPosition(ctx context.Context, modelID string) (uint64, error) // Returns position of last record
GetFirstPosition(ctx context.Context, modelID string) (uint64, error)
// NOTE(review): presumably records that [position, position+interval) is done — confirm.
RecordCompletion(ctx context.Context, modelID string, position, interval uint64) error
// Scheduled transformation tracking
RecordScheduledCompletion(ctx context.Context, modelID string, startDateTime time.Time) error
// NOTE(review): the *time.Time result is presumably nil when no run is recorded — confirm.
GetLastScheduledExecution(ctx context.Context, modelID string) (*time.Time, error)
// Batch form taking several model IDs; the map is presumably keyed by model ID — confirm.
GetAllLastScheduledExecutions(ctx context.Context, modelIDs []string) (map[string]*time.Time, error)
// Coverage and gap management
// NOTE(review): whether startPos/endPos are inclusive is not visible here — confirm.
GetCoverage(ctx context.Context, modelID string, startPos, endPos uint64) (bool, error)
GetProcessedRanges(ctx context.Context, modelID string) ([]ProcessedRange, error)
// Batch form taking several model IDs; the map is presumably keyed by model ID — confirm.
GetAllProcessedRanges(ctx context.Context, modelIDs []string) (map[string][]ProcessedRange, error)
FindGaps(ctx context.Context, modelID string, minPos, maxPos, interval uint64) ([]GapInfo, error)
// Consolidation
// NOTE(review): meaning of the uint64 result is not shown (possibly a row count) — confirm.
ConsolidateHistoricalData(ctx context.Context, modelID string) (uint64, error)
// Period deletion
// NOTE(review): meaning of the uint64 result is not shown (possibly a row count) — confirm.
DeletePeriod(ctx context.Context, modelID string, startPos, endPos uint64) (uint64, error)
// External bounds cache
// These three share parameter and result types with CacheManager's GetBounds,
// SetBounds and DeleteBounds; presumably they delegate to them — confirm.
GetExternalBounds(ctx context.Context, modelID string) (*BoundsCache, error)
SetExternalBounds(ctx context.Context, cache *BoundsCache) error
DeleteExternalBounds(ctx context.Context, modelID string) error
// Distributed locking for bounds updates
// Same parameters and results as CacheManager.AcquireLock.
AcquireBoundsLock(ctx context.Context, modelID string) (BoundsLock, error)
// Admin table info
// Accessors for the database and table names of the incremental and scheduled admin tables.
GetIncrementalAdminDatabase() string
GetIncrementalAdminTable() string
GetScheduledAdminDatabase() string
GetScheduledAdminTable() string
// Config override operations
// Names and signatures match the CacheManager methods of the same names.
GetConfigOverride(ctx context.Context, modelID string) (*ConfigOverride, error)
GetAllConfigOverrides(ctx context.Context) ([]ConfigOverride, error)
SetConfigOverride(ctx context.Context, override *ConfigOverride) error
DeleteConfigOverride(ctx context.Context, modelID string) error
DeleteAllConfigOverrides(ctx context.Context) error
GetConfigOverrideVersion(ctx context.Context) (int64, error)
// GetCacheManager returns the underlying cache manager
GetCacheManager() *CacheManager
}
Service defines the public interface for the admin service
func NewService ¶
func NewService(log logrus.FieldLogger, client clickhouse.ClientInterface, cluster, localSuffix string, config TableConfig, redisClient *redis.Client) Service
NewService creates a new admin Service (the admin table manager) backed by type-specific admin tables, one for incremental and one for scheduled transformations.