worker

package
v0.1.3
Published: Feb 27, 2026 | License: GPL-3.0 | Imports: 20 | Imported by: 0

Documentation

Overview

Package worker implements the worker functionality for CBT.

Constants

const (
	// ScanTypeIncremental is the incremental scan type
	ScanTypeIncremental = "incremental"
	// ScanTypeFull is the full scan type
	ScanTypeFull = "full"
)
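
The two scan type constants are plain strings, so callers that accept a scan type from configuration or a task payload may want to check it before use. A minimal sketch, assuming a hypothetical helper `isKnownScanType` that is not part of the package; the constants are redeclared locally so the example compiles on its own:

```go
package main

import "fmt"

// Local copies of the documented constants so the sketch is self-contained.
const (
	ScanTypeIncremental = "incremental"
	ScanTypeFull        = "full"
)

// isKnownScanType is a hypothetical helper (not part of package worker).
// It reports whether s matches one of the two documented scan types.
func isKnownScanType(s string) bool {
	switch s {
	case ScanTypeIncremental, ScanTypeFull:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []string{ScanTypeFull, ScanTypeIncremental, "partial"} {
		fmt.Printf("%q known: %v\n", s, isKnownScanType(s))
	}
}
```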

Variables

var (
	// ErrInvalidConcurrency is returned when concurrency is not positive
	ErrInvalidConcurrency = errors.New("concurrency must be positive")
	// ErrWorkerShutdownTimeout is returned when worker shutdown times out
	ErrWorkerShutdownTimeout = errors.New("worker shutdown timed out")
)
var (
	ErrInvalidTaskContext        = errors.New("invalid task context type")
	ErrInvalidTransformationType = errors.New("invalid transformation type")
	ErrTableDoesNotExist         = errors.New("table does not exist")
)

Static sentinel errors defined by the package.

Functions

This section is empty.

Types

type Config

type Config struct {
	Concurrency     int      `yaml:"concurrency" default:"10"`
	Tags            []string `yaml:"tags,omitempty"` // Optional tag-based model filtering
	ShutdownTimeout int      `yaml:"shutdownTimeout" default:"30"`
}

Config contains worker-specific settings

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ModelExecutor

type ModelExecutor struct {
	// contains filtered or unexported fields
}

ModelExecutor implements the execution of model transformations

func NewModelExecutor

func NewModelExecutor(log logrus.FieldLogger, chClient clickhouse.ClientInterface, modelsService models.Service, adminManager admin.Service) *ModelExecutor

NewModelExecutor creates a new model executor

func (*ModelExecutor) Execute

func (e *ModelExecutor) Execute(ctx context.Context, taskCtxInterface interface{}) error

Execute runs the model transformation

func (*ModelExecutor) UpdateBounds added in v0.0.2

func (e *ModelExecutor) UpdateBounds(ctx context.Context, modelID, scanType string) error

UpdateBounds updates the external model bounds cache with distributed locking to prevent race conditions between concurrent full and incremental scans.

The function operates in three phases:

1. Get the model and initial cache (unlocked, used for skip checks).
2. Execute the ClickHouse query (unlocked, can take 5+ minutes).
3. Acquire the lock, re-read the fresh cache, apply bounds, and release the lock.
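
The point of this ordering is that the slow query never runs while the lock is held, and the cache is re-read under the lock so that a concurrent scan's result is not overwritten with stale data. The sketch below illustrates the pattern in-process. Everything in it is an assumption made for illustration: `bounds`, `cache`, and `updater` are hypothetical types, a `sync.Mutex` stands in for the distributed lock, a function value stands in for the ClickHouse query, and both the skip rule and the "widen min/max" merge rule are invented.

```go
package main

import (
	"fmt"
	"sync"
)

// bounds is a hypothetical min/max position pair for a model.
type bounds struct{ Min, Max uint64 }

// cache is an in-memory stand-in for the external bounds cache.
type cache struct {
	mu   sync.Mutex // guards data only; separate from the update lock
	data map[string]bounds
}

func (c *cache) get(id string) (bounds, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	b, ok := c.data[id]
	return b, ok
}

func (c *cache) set(id string, b bounds) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[id] = b
}

type updater struct {
	cache *cache
	lock  sync.Mutex                   // stands in for the distributed lock
	query func(scanType string) bounds // stands in for the slow ClickHouse query
}

// updateBounds follows the three documented phases and reports whether it wrote.
func (u *updater) updateBounds(modelID, scanType string) bool {
	// Phase 1 (unlocked): read the initial cache for skip checks.
	// Assumed rule: skip incremental scans until the cache has been seeded.
	if _, ok := u.cache.get(modelID); !ok && scanType == "incremental" {
		return false
	}

	// Phase 2 (unlocked): run the slow query without holding the lock.
	found := u.query(scanType)

	// Phase 3 (locked): re-read the fresh cache, merge, and write back.
	u.lock.Lock()
	defer u.lock.Unlock()
	if cur, ok := u.cache.get(modelID); ok {
		if cur.Min < found.Min {
			found.Min = cur.Min
		}
		if cur.Max > found.Max {
			found.Max = cur.Max
		}
	}
	u.cache.set(modelID, found)
	return true
}

// runDemo runs a full and an incremental scan concurrently on a seeded cache.
func runDemo() bounds {
	u := &updater{
		cache: &cache{data: map[string]bounds{"model_a": {Min: 10, Max: 90}}},
		query: func(scanType string) bounds {
			if scanType == "full" {
				return bounds{Min: 1, Max: 100}
			}
			return bounds{Min: 90, Max: 120}
		},
	}
	var wg sync.WaitGroup
	for _, st := range []string{"full", "incremental"} {
		wg.Add(1)
		go func(scanType string) {
			defer wg.Done()
			u.updateBounds("model_a", scanType)
		}(st)
	}
	wg.Wait()
	b, _ := u.cache.get("model_a")
	return b
}

func main() {
	fmt.Println(runDemo())
}
```

Because phase 3 merges against the value read under the lock rather than the value read in phase 1, the final bounds in this sketch are the same whichever scan finishes first.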

func (*ModelExecutor) Validate

func (e *ModelExecutor) Validate(ctx context.Context, taskCtxInterface interface{}) error

Validate checks if the model can be executed

type Service

type Service interface {
	// Start initializes and starts the worker service
	Start(ctx context.Context) error

	// Stop gracefully shuts down the worker service
	Stop() error
}

Service defines the public interface for the worker service

func NewService

func NewService(log logrus.FieldLogger, cfg *Config, chClient clickhouse.ClientInterface, adminService admin.Service, modelsService models.Service, redisOpt *redis.Options, validator validation.Validator) (Service, error)

NewService creates a new worker application
