admin

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: GPL-3.0 Imports: 12 Imported by: 0

Documentation

Overview

Package admin provides administration and caching services for CBT.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrCacheManagerUnavailable is the sentinel error returned when the
	// cache manager is not available. Compare against it with errors.Is.
	ErrCacheManagerUnavailable = errors.New("cache manager not available")
)
View Source
// ErrLockTimeout is the sentinel error returned when lock acquisition
// times out. Compare against it with errors.Is.
var ErrLockTimeout = errors.New("timeout acquiring bounds lock")

ErrLockTimeout is returned when lock acquisition times out

Functions

This section is empty.

Types

type BoundsCache added in v0.0.2

// BoundsCache represents cached external model bounds.
// Every field carries a json tag, so the value is serialized as JSON.
type BoundsCache struct {
	// ModelID identifies the model these bounds belong to.
	ModelID string `json:"model_id"`
	// Min and Max are the cached lower and upper bounds.
	// NOTE(review): presumably position values in the model's own unit — confirm.
	Min     uint64 `json:"min"`
	Max     uint64 `json:"max"`

	// Track scan times
	LastIncrementalScan time.Time `json:"last_incremental_scan"`
	LastFullScan        time.Time `json:"last_full_scan"`

	// For optimization hints
	// NOTE(review): presumably the Min/Max values from before the latest update — confirm.
	PreviousMin uint64 `json:"previous_min"`
	PreviousMax uint64 `json:"previous_max"`

	// Initial scan tracking
	InitialScanComplete bool       `json:"initial_scan_complete"`
	// InitialScanStarted is a pointer with omitempty, so a nil value is
	// omitted from the JSON encoding entirely.
	InitialScanStarted  *time.Time `json:"initial_scan_started,omitempty"`

	// Metadata
	UpdatedAt time.Time `json:"updated_at"`
}

BoundsCache represents cached external model bounds

type BoundsLock added in v0.0.54

// BoundsLock represents a distributed lock for bounds updates.
type BoundsLock interface {
	// Unlock is the only operation on a held lock; it reports failure via
	// the returned error.
	// NOTE(review): presumably releases the lock — confirm against the implementation.
	Unlock(ctx context.Context) error
}

BoundsLock represents a distributed lock for bounds updates

type CacheManager

// CacheManager manages Redis-based caching for external models.
// Construct one with NewCacheManager; its fields are not exported.
type CacheManager struct {
	// contains filtered or unexported fields
}

CacheManager manages Redis-based caching for external models

func NewCacheManager

func NewCacheManager(redisClient *redis.Client) *CacheManager

NewCacheManager creates a new cache manager instance

func (*CacheManager) AcquireLock added in v0.0.54

func (c *CacheManager) AcquireLock(ctx context.Context, modelID string) (BoundsLock, error)

AcquireLock acquires a distributed lock for bounds updates on a specific model. The lock uses redsync (Redlock algorithm) with exponential backoff for retries. It returns a BoundsLock that the caller must release by calling Unlock when done.

func (*CacheManager) DeleteAllConfigOverrides added in v0.1.0

func (c *CacheManager) DeleteAllConfigOverrides(ctx context.Context) error

DeleteAllConfigOverrides removes all config overrides and increments the version counter.

func (*CacheManager) DeleteBounds added in v0.1.0

func (c *CacheManager) DeleteBounds(ctx context.Context, modelID string) error

DeleteBounds removes cached external model bounds from Redis

func (*CacheManager) DeleteConfigOverride added in v0.1.0

func (c *CacheManager) DeleteConfigOverride(ctx context.Context, modelID string) error

DeleteConfigOverride removes a config override and increments the version counter.

func (*CacheManager) GetAllConfigOverrides added in v0.1.0

func (c *CacheManager) GetAllConfigOverrides(ctx context.Context) ([]ConfigOverride, error)

GetAllConfigOverrides retrieves all config overrides using SCAN.

func (*CacheManager) GetBounds added in v0.0.2

func (c *CacheManager) GetBounds(ctx context.Context, modelID string) (*BoundsCache, error)

GetBounds retrieves cached external model bounds from Redis

func (*CacheManager) GetConfigOverride added in v0.1.0

func (c *CacheManager) GetConfigOverride(ctx context.Context, modelID string) (*ConfigOverride, error)

GetConfigOverride retrieves a config override for a specific model.

func (*CacheManager) GetConfigOverrideVersion added in v0.1.0

func (c *CacheManager) GetConfigOverrideVersion(ctx context.Context) (int64, error)

GetConfigOverrideVersion returns the current version counter for change detection.

func (*CacheManager) SetBounds added in v0.0.2

func (c *CacheManager) SetBounds(ctx context.Context, cache *BoundsCache) error

SetBounds stores external model bounds in the Redis cache with no TTL.

func (*CacheManager) SetConfigOverride added in v0.1.0

func (c *CacheManager) SetConfigOverride(ctx context.Context, override *ConfigOverride) error

SetConfigOverride stores a config override and increments the version counter.

type ConfigOverride added in v0.1.0

// ConfigOverride represents a live configuration override stored in Redis.
type ConfigOverride struct {
	// ModelID identifies the model the override applies to.
	ModelID   string          `json:"model_id"`
	ModelType string          `json:"model_type"` // "transformation" or "external"
	// Enabled is a pointer with omitempty: nil is omitted from the JSON
	// encoding, which keeps "unset" distinct from an explicit false.
	Enabled   *bool           `json:"enabled,omitempty"`
	// Override is kept as raw JSON, so its decoding is deferred to the consumer.
	// NOTE(review): the expected schema presumably depends on ModelType — confirm.
	Override  json.RawMessage `json:"override"`
	UpdatedAt time.Time       `json:"updated_at"`
}

ConfigOverride represents a live configuration override stored in Redis.

type GapInfo

// GapInfo represents a gap in the processed data.
type GapInfo struct {
	// StartPos and EndPos delimit the gap.
	// NOTE(review): whether EndPos is inclusive or exclusive is not visible
	// here — confirm against FindGaps.
	StartPos uint64
	EndPos   uint64
}

GapInfo represents a gap in the processed data

type ProcessedRange added in v0.0.25

// ProcessedRange represents a processed range from the admin table.
// NOTE(review): the `ch` struct tags presumably map fields to ClickHouse
// columns of the same name — confirm against the query code.
type ProcessedRange struct {
	// Position is the value of the "position" column.
	Position uint64 `ch:"position"`
	// Interval is the value of the "interval" column.
	// NOTE(review): presumably the range covers [Position, Position+Interval) — confirm.
	Interval uint64 `ch:"interval"`
}

ProcessedRange represents a processed range from the admin table

type Service

// Service defines the public interface for the admin service.
// It groups position tracking, scheduled-execution tracking, coverage and gap
// queries, consolidation, period deletion, the external bounds cache, bounds
// locking, admin table metadata, and config override operations.
type Service interface {
	// Position tracking (for incremental transformations)
	GetNextUnprocessedPosition(ctx context.Context, modelID string) (uint64, error) // Returns next position for forward fill
	GetLastProcessedPosition(ctx context.Context, modelID string) (uint64, error)   // Returns position of last record
	GetFirstPosition(ctx context.Context, modelID string) (uint64, error)
	RecordCompletion(ctx context.Context, modelID string, position, interval uint64) error

	// Scheduled transformation tracking
	// NOTE(review): the *time.Time results presumably use nil for "no
	// execution recorded" — confirm against the implementation.
	RecordScheduledCompletion(ctx context.Context, modelID string, startDateTime time.Time) error
	GetLastScheduledExecution(ctx context.Context, modelID string) (*time.Time, error)
	GetAllLastScheduledExecutions(ctx context.Context, modelIDs []string) (map[string]*time.Time, error)

	// Coverage and gap management
	// NOTE(review): GetCoverage's bool presumably reports whether the whole
	// [startPos, endPos] span has been processed — confirm.
	GetCoverage(ctx context.Context, modelID string, startPos, endPos uint64) (bool, error)
	GetProcessedRanges(ctx context.Context, modelID string) ([]ProcessedRange, error)
	GetAllProcessedRanges(ctx context.Context, modelIDs []string) (map[string][]ProcessedRange, error)
	FindGaps(ctx context.Context, modelID string, minPos, maxPos, interval uint64) ([]GapInfo, error)

	// Consolidation
	// NOTE(review): meaning of the uint64 result is not visible here — confirm.
	ConsolidateHistoricalData(ctx context.Context, modelID string) (uint64, error)

	// Period deletion
	// NOTE(review): the uint64 result is presumably a count of deleted rows — confirm.
	DeletePeriod(ctx context.Context, modelID string, startPos, endPos uint64) (uint64, error)

	// External bounds cache
	GetExternalBounds(ctx context.Context, modelID string) (*BoundsCache, error)
	SetExternalBounds(ctx context.Context, cache *BoundsCache) error
	DeleteExternalBounds(ctx context.Context, modelID string) error

	// Distributed locking for bounds updates
	AcquireBoundsLock(ctx context.Context, modelID string) (BoundsLock, error)

	// Admin table info
	GetIncrementalAdminDatabase() string
	GetIncrementalAdminTable() string
	GetScheduledAdminDatabase() string
	GetScheduledAdminTable() string

	// Config override operations
	GetConfigOverride(ctx context.Context, modelID string) (*ConfigOverride, error)
	GetAllConfigOverrides(ctx context.Context) ([]ConfigOverride, error)
	SetConfigOverride(ctx context.Context, override *ConfigOverride) error
	DeleteConfigOverride(ctx context.Context, modelID string) error
	DeleteAllConfigOverrides(ctx context.Context) error
	GetConfigOverrideVersion(ctx context.Context) (int64, error)

	// GetCacheManager returns the underlying cache manager
	GetCacheManager() *CacheManager
}

Service defines the public interface for the admin service

func NewService

func NewService(log logrus.FieldLogger, client clickhouse.ClientInterface, cluster, localSuffix string, config TableConfig, redisClient *redis.Client) Service

NewService creates a new admin table manager with type-specific admin tables

type TableConfig added in v0.0.25

// TableConfig represents configuration for admin tables.
// It names one database/table pair for incremental tracking and one for
// scheduled tracking.
type TableConfig struct {
	// IncrementalDatabase and IncrementalTable name the incremental admin table.
	IncrementalDatabase string
	IncrementalTable    string
	// ScheduledDatabase and ScheduledTable name the scheduled admin table.
	ScheduledDatabase   string
	ScheduledTable      string
}

TableConfig represents configuration for admin tables

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL