Documentation ¶
Overview ¶
Package worker implements the worker functionality for CBT.
Index ¶
Constants ¶
const (
	// ScanTypeIncremental is the incremental scan type
	ScanTypeIncremental = "incremental"
	// ScanTypeFull is the full scan type
	ScanTypeFull = "full"
)
Variables ¶
var (
	// ErrInvalidConcurrency is returned when concurrency is not positive
	ErrInvalidConcurrency = errors.New("concurrency must be positive")
	// ErrWorkerShutdownTimeout is returned when worker shutdown times out
	ErrWorkerShutdownTimeout = errors.New("worker shutdown timed out")
)
var (
	ErrInvalidTaskContext        = errors.New("invalid task context type")
	ErrInvalidTransformationType = errors.New("invalid transformation type")
	ErrTableDoesNotExist         = errors.New("table does not exist")
)
Static sentinel errors defined by the package.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Concurrency     int      `yaml:"concurrency" default:"10"`
	Tags            []string `yaml:"tags,omitempty"` // Optional tag-based model filtering
	ShutdownTimeout int      `yaml:"shutdownTimeout" default:"30"`
}
Config contains worker-specific settings.
type ModelExecutor ¶
type ModelExecutor struct {
// contains filtered or unexported fields
}
ModelExecutor implements the execution of model transformations
func NewModelExecutor ¶
func NewModelExecutor(log logrus.FieldLogger, chClient clickhouse.ClientInterface, modelsService models.Service, adminManager admin.Service) *ModelExecutor
NewModelExecutor creates a new model executor
func (*ModelExecutor) Execute ¶
func (e *ModelExecutor) Execute(ctx context.Context, taskCtxInterface any) error
Execute runs the model transformation
func (*ModelExecutor) UpdateBounds ¶ added in v0.0.2
func (e *ModelExecutor) UpdateBounds(ctx context.Context, modelID, scanType string) error
UpdateBounds updates the external model bounds cache with distributed locking to prevent race conditions between concurrent full and incremental scans.
The function operates in phases:
1. Get model and initial cache (unlocked - for skip checks)
2. Execute ClickHouse query (unlocked - can take 5+ minutes)
3. Acquire lock, re-read fresh cache, apply bounds, release lock
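The three phases can be sketched in-process as follows. This is an illustration under stated assumptions, not the package's implementation: a sync.Mutex (updateLock) stands in for the distributed lock, an in-memory map stands in for the bounds cache, the query is a plain callback instead of a ClickHouse call, and the merge rule (widen to the smallest Min and largest Max) is a guess made for the example. All type and function names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// bounds is a hypothetical min/max pair for an external model.
type bounds struct{ Min, Max uint64 }

// cache is an in-memory stand-in for the shared bounds cache.
type cache struct {
	mu         sync.RWMutex // guards the map itself
	data       map[string]bounds
	updateLock sync.Mutex // stands in for the distributed lock
}

func newCache() *cache { return &cache{data: map[string]bounds{}} }

func (c *cache) read(id string) (bounds, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	b, ok := c.data[id]
	return b, ok
}

func (c *cache) write(id string, b bounds) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[id] = b
}

// merge ASSUMES bounds are only ever widened.
func merge(a, b bounds) bounds {
	return bounds{Min: min(a.Min, b.Min), Max: max(a.Max, b.Max)}
}

// updateBounds follows the three documented phases.
func updateBounds(c *cache, modelID string, query func(prev bounds) bounds) bounds {
	// Phase 1: read the cache without the update lock (skip checks would go here).
	initial, _ := c.read(modelID)

	// Phase 2: run the slow query without holding the update lock.
	result := query(initial)

	// Phase 3: take the lock, re-read the cache, apply the bounds, release.
	c.updateLock.Lock()
	defer c.updateLock.Unlock()
	if current, ok := c.read(modelID); ok {
		result = merge(current, result)
	}
	c.write(modelID, result)
	return result
}

func main() {
	c := newCache()
	c.write("model", bounds{Min: 100, Max: 200})

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // simulated full scan
		defer wg.Done()
		updateBounds(c, "model", func(bounds) bounds { return bounds{Min: 10, Max: 250} })
	}()
	go func() { // simulated incremental scan
		defer wg.Done()
		updateBounds(c, "model", func(p bounds) bounds { return bounds{Min: p.Max, Max: 300} })
	}()
	wg.Wait()

	final, _ := c.read("model")
	fmt.Println(final)
}
```

Re-reading the cache inside the locked section is what keeps a slow scan from overwriting a result that another scan stored while its query was still running. The sketch uses the min and max builtins, which require Go 1.21 or later.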
type Service ¶
type Service interface {
// Start initializes and starts the worker service
Start(ctx context.Context) error
// Stop gracefully shuts down the worker service
Stop() error
}
Service defines the public interface for the worker service
func NewService ¶
func NewService(log logrus.FieldLogger, cfg *Config, chClient clickhouse.ClientInterface, adminService admin.Service, modelsService models.Service, redisOpt *redis.Options, validator validation.Validator) (Service, error)
NewService creates a new worker application