base

package
Version: v0.3.0
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2025 | License: MIT | Imports: 17 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseConnector

type BaseConnector struct {
	// contains filtered or unexported fields
}

BaseConnector provides common functionality for all connectors

func NewBaseConnector

func NewBaseConnector(name string, connectorType core.ConnectorType, version string) *BaseConnector

NewBaseConnector creates a new base connector with production features

func (*BaseConnector) Close

func (bc *BaseConnector) Close(ctx context.Context) error

Close shuts down the connector

func (*BaseConnector) ExecuteWithCircuitBreaker

func (bc *BaseConnector) ExecuteWithCircuitBreaker(fn func() error) error

ExecuteWithCircuitBreaker executes a function with circuit breaker protection

func (*BaseConnector) ExecuteWithRetry

func (bc *BaseConnector) ExecuteWithRetry(ctx context.Context, fn func() error) error

ExecuteWithRetry executes a function with retry logic

func (*BaseConnector) GetBatchBuilder

func (bc *BaseConnector) GetBatchBuilder() core.BatchBuilder

GetBatchBuilder returns the batch builder

func (*BaseConnector) GetCircuitBreaker

func (bc *BaseConnector) GetCircuitBreaker() *clients.CircuitBreaker

GetCircuitBreaker returns the circuit breaker

func (*BaseConnector) GetConfig

func (bc *BaseConnector) GetConfig() *config.BaseConfig

GetConfig returns the connector configuration

func (*BaseConnector) GetConnectionPool

func (bc *BaseConnector) GetConnectionPool() core.ConnectionPool

GetConnectionPool returns the connection pool

func (*BaseConnector) GetContext

func (bc *BaseConnector) GetContext() context.Context

GetContext returns the connector context

func (*BaseConnector) GetErrorHandler

func (bc *BaseConnector) GetErrorHandler() *ErrorHandler

GetErrorHandler returns the error handler

func (*BaseConnector) GetLogger

func (bc *BaseConnector) GetLogger() *zap.Logger

GetLogger returns the connector logger

func (*BaseConnector) GetMetricsCollector

func (bc *BaseConnector) GetMetricsCollector() *metrics.Collector

GetMetricsCollector returns the metrics collector

func (*BaseConnector) GetPosition

func (bc *BaseConnector) GetPosition() core.Position

GetPosition returns the current position

func (*BaseConnector) GetQualityChecker

func (bc *BaseConnector) GetQualityChecker() core.DataQualityChecker

GetQualityChecker returns the data quality checker

func (*BaseConnector) GetRateLimiter

func (bc *BaseConnector) GetRateLimiter() clients.RateLimiter

GetRateLimiter returns the rate limiter

func (*BaseConnector) GetState

func (bc *BaseConnector) GetState() core.State

GetState returns the current state

func (*BaseConnector) HandleError

func (bc *BaseConnector) HandleError(ctx context.Context, err error, record *models.Record) error

HandleError handles an error with the configured error handler

func (*BaseConnector) Health

func (bc *BaseConnector) Health(ctx context.Context) error

Health performs a health check

func (*BaseConnector) Initialize

func (bc *BaseConnector) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize sets up the base connector

func (*BaseConnector) IsHealthy

func (bc *BaseConnector) IsHealthy() bool

IsHealthy returns true if the connector is healthy

func (*BaseConnector) Metrics

func (bc *BaseConnector) Metrics() map[string]interface{}

Metrics returns current metrics

func (*BaseConnector) Name

func (bc *BaseConnector) Name() string

Name returns the connector name

func (*BaseConnector) OptimizeBatchSize

func (bc *BaseConnector) OptimizeBatchSize() int

OptimizeBatchSize returns an optimized batch size based on current metrics

func (*BaseConnector) OptimizeConcurrency

func (bc *BaseConnector) OptimizeConcurrency() int

OptimizeConcurrency returns an optimized concurrency level

func (*BaseConnector) RateLimit

func (bc *BaseConnector) RateLimit(ctx context.Context) error

RateLimit checks and enforces rate limiting

func (*BaseConnector) RecordMetric

func (bc *BaseConnector) RecordMetric(name string, value interface{}, metricType core.MetricType)

RecordMetric records a metric

func (*BaseConnector) ReportProgress

func (bc *BaseConnector) ReportProgress(processed, total int64)

ReportProgress reports operation progress

func (*BaseConnector) SetBatchBuilder

func (bc *BaseConnector) SetBatchBuilder(builder core.BatchBuilder)

SetBatchBuilder sets the batch builder

func (*BaseConnector) SetConnectionPool

func (bc *BaseConnector) SetConnectionPool(pool core.ConnectionPool)

SetConnectionPool sets the connection pool

func (*BaseConnector) SetPosition

func (bc *BaseConnector) SetPosition(position core.Position) error

SetPosition updates the current position

func (*BaseConnector) SetQualityChecker

func (bc *BaseConnector) SetQualityChecker(checker core.DataQualityChecker)

SetQualityChecker sets the data quality checker

func (*BaseConnector) SetState

func (bc *BaseConnector) SetState(state core.State) error

SetState updates the connector state

func (*BaseConnector) ShouldRetry

func (bc *BaseConnector) ShouldRetry(err error) bool

ShouldRetry checks if an error should be retried

func (*BaseConnector) Type

func (bc *BaseConnector) Type() core.ConnectorType

Type returns the connector type

func (*BaseConnector) UpdateHealth

func (bc *BaseConnector) UpdateHealth(healthy bool, details map[string]interface{})

UpdateHealth updates the health status

func (*BaseConnector) Validate

func (bc *BaseConnector) Validate() error

Validate validates the connector configuration

func (*BaseConnector) Version

func (bc *BaseConnector) Version() string

Version returns the connector version

type ErrorHandler

type ErrorHandler struct {
	// contains filtered or unexported fields
}

ErrorHandler handles errors with categorization and retry logic

func NewErrorHandler

func NewErrorHandler(logger *zap.Logger, maxRetries int, baseDelay time.Duration) *ErrorHandler

NewErrorHandler creates a new error handler

func (*ErrorHandler) ExecuteWithRetry

func (eh *ErrorHandler) ExecuteWithRetry(ctx context.Context, fn RetryFunc) error

ExecuteWithRetry executes a function with retry logic

func (*ErrorHandler) GetErrorStats

func (eh *ErrorHandler) GetErrorStats() map[string]interface{}

GetErrorStats returns error statistics

func (*ErrorHandler) GetRetryDelay

func (eh *ErrorHandler) GetRetryDelay(attempt int) time.Duration

GetRetryDelay calculates the retry delay for a given attempt

func (*ErrorHandler) HandleError

func (eh *ErrorHandler) HandleError(ctx context.Context, err error, record *models.Record) error

HandleError processes an error with appropriate handling

func (*ErrorHandler) RecordError

func (eh *ErrorHandler) RecordError(err error, details map[string]interface{})

RecordError records error details for analysis

func (*ErrorHandler) ResetStats

func (eh *ErrorHandler) ResetStats()

ResetStats resets error statistics

func (*ErrorHandler) ShouldRetry

func (eh *ErrorHandler) ShouldRetry(err error) bool

ShouldRetry determines if an error should be retried

type HealthChecker

type HealthChecker struct {
	// contains filtered or unexported fields
}

HealthChecker performs periodic health checks

func NewHealthChecker

func NewHealthChecker(name string, interval time.Duration) *HealthChecker

NewHealthChecker creates a new health checker

func (*HealthChecker) CheckCount

func (hc *HealthChecker) CheckCount() int64

CheckCount returns the total number of health checks performed

func (*HealthChecker) FailureCount

func (hc *HealthChecker) FailureCount() int64

FailureCount returns the total number of failed health checks

func (*HealthChecker) GetStatus

func (hc *HealthChecker) GetStatus() *core.HealthStatus

GetStatus returns the current health status

func (*HealthChecker) IsHealthy

func (hc *HealthChecker) IsHealthy() bool

IsHealthy returns true if the service is healthy

func (*HealthChecker) SetCheckFunc

func (hc *HealthChecker) SetCheckFunc(fn func(ctx context.Context) error)

SetCheckFunc sets the health check function

func (*HealthChecker) Start

func (hc *HealthChecker) Start(ctx context.Context)

Start begins periodic health checks

func (*HealthChecker) Stop

func (hc *HealthChecker) Stop()

Stop stops the health checker

func (*HealthChecker) UpdateStatus

func (hc *HealthChecker) UpdateStatus(healthy bool, details map[string]interface{})

UpdateStatus manually updates the health status

type PerformanceOptimizer

type PerformanceOptimizer struct {
	// contains filtered or unexported fields
}

PerformanceOptimizer provides performance optimization recommendations

func NewPerformanceOptimizer

func NewPerformanceOptimizer(collector *metrics.Collector) *PerformanceOptimizer

NewPerformanceOptimizer creates a new performance optimizer

func (*PerformanceOptimizer) GenerateReport

func (po *PerformanceOptimizer) GenerateReport(metrics map[string]interface{}) *PerformanceReport

GenerateReport creates a performance report

func (*PerformanceOptimizer) GetPerformanceTrend

func (po *PerformanceOptimizer) GetPerformanceTrend() string

GetPerformanceTrend returns the performance trend

func (*PerformanceOptimizer) OptimizeBatchSize

func (po *PerformanceOptimizer) OptimizeBatchSize(current int, metrics map[string]interface{}) int

OptimizeBatchSize recommends an optimal batch size

func (*PerformanceOptimizer) OptimizeBufferSize

func (po *PerformanceOptimizer) OptimizeBufferSize(current int, metrics map[string]interface{}) int

OptimizeBufferSize recommends an optimal buffer size

func (*PerformanceOptimizer) OptimizeConcurrency

func (po *PerformanceOptimizer) OptimizeConcurrency(current int, metrics map[string]interface{}) int

OptimizeConcurrency recommends an optimal concurrency level

func (*PerformanceOptimizer) SuggestOptimizations

func (po *PerformanceOptimizer) SuggestOptimizations(metrics map[string]interface{}) []string

SuggestOptimizations provides optimization suggestions

type PerformanceReport

type PerformanceReport struct {
	Timestamp         time.Time
	AverageThroughput float64
	AverageLatency    time.Duration
	P99Latency        time.Duration
	ErrorRate         float64
	TotalProcessed    int64
	TotalErrors       int64
	Trend             string
	Recommendations   []string
}

PerformanceReport holds the contents of a performance report, as returned by PerformanceOptimizer.GenerateReport

type ProgressReporter

type ProgressReporter struct {
	// contains filtered or unexported fields
}

ProgressReporter tracks and reports progress of operations

func NewProgressReporter

func NewProgressReporter(logger *zap.Logger, collector *metrics.Collector) *ProgressReporter

NewProgressReporter creates a new progress reporter

func (*ProgressReporter) GetAverageLatency

func (pr *ProgressReporter) GetAverageLatency() time.Duration

GetAverageLatency returns average latency

func (*ProgressReporter) GetAverageThroughput

func (pr *ProgressReporter) GetAverageThroughput() float64

GetAverageThroughput returns average throughput

func (*ProgressReporter) GetETA

func (pr *ProgressReporter) GetETA() time.Duration

GetETA estimates time remaining

func (*ProgressReporter) GetElapsedTime

func (pr *ProgressReporter) GetElapsedTime() time.Duration

GetElapsedTime returns time since start

func (*ProgressReporter) GetProgress

func (pr *ProgressReporter) GetProgress() (processed, total int64)

GetProgress returns current progress

func (*ProgressReporter) GetSnapshot

func (pr *ProgressReporter) GetSnapshot() *ProgressSnapshot

GetSnapshot returns a progress snapshot

func (*ProgressReporter) IncrementProcessed

func (pr *ProgressReporter) IncrementProcessed(count int64)

IncrementProcessed increments the processed count

func (*ProgressReporter) ReportLatency

func (pr *ProgressReporter) ReportLatency(latency time.Duration)

ReportLatency records processing latency

func (*ProgressReporter) ReportProgress

func (pr *ProgressReporter) ReportProgress(processed, total int64)

ReportProgress updates the progress

func (*ProgressReporter) ReportThroughput

func (pr *ProgressReporter) ReportThroughput(recordsPerSecond float64)

ReportThroughput records throughput

func (*ProgressReporter) SetReportInterval

func (pr *ProgressReporter) SetReportInterval(interval time.Duration)

SetReportInterval sets the progress reporting interval

func (*ProgressReporter) SetTotal

func (pr *ProgressReporter) SetTotal(total int64)

SetTotal sets the total number of records to process

func (*ProgressReporter) Start

func (pr *ProgressReporter) Start()

Start begins periodic progress reporting

func (*ProgressReporter) Stop

func (pr *ProgressReporter) Stop()

Stop stops progress reporting

type ProgressSnapshot

type ProgressSnapshot struct {
	Timestamp        time.Time
	ProcessedRecords int64
	TotalRecords     int64
	Percentage       float64
	Throughput       float64
	AverageLatency   time.Duration
	ElapsedTime      time.Duration
	ETA              time.Duration
}

ProgressSnapshot represents a point-in-time progress snapshot

type RetryFunc

type RetryFunc func() error

RetryFunc is a function that can be retried

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts     int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	Multiplier      float64
	RandomizeFactor float64
}

RetryPolicy defines retry behavior

func AggressiveRetryPolicy

func AggressiveRetryPolicy() *RetryPolicy

AggressiveRetryPolicy returns a policy for critical operations

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy

func NewRetryPolicy

func NewRetryPolicy(maxAttempts int, initialDelay time.Duration) *RetryPolicy

NewRetryPolicy creates a new retry policy with exponential backoff

func NoRetryPolicy

func NoRetryPolicy() *RetryPolicy

NoRetryPolicy returns a policy that doesn't retry

func (*RetryPolicy) Clone

func (rp *RetryPolicy) Clone() *RetryPolicy

Clone creates a copy of the retry policy

func (*RetryPolicy) Execute

func (rp *RetryPolicy) Execute(ctx context.Context, fn func() error) error

Execute runs a function with the retry policy

func (*RetryPolicy) ExecuteWithCondition

func (rp *RetryPolicy) ExecuteWithCondition(ctx context.Context, fn func() error, shouldRetry func(error) bool) error

ExecuteWithCondition runs a function with the retry policy, retrying a failed attempt only if shouldRetry returns true for its error

func (*RetryPolicy) GetDelay

func (rp *RetryPolicy) GetDelay(attempt int) time.Duration

GetDelay returns the delay for a specific attempt (for testing/preview)

func (*RetryPolicy) WithDelay

func (rp *RetryPolicy) WithDelay(initial, max time.Duration) *RetryPolicy

WithDelay returns a new policy with updated delays

func (*RetryPolicy) WithMaxAttempts

func (rp *RetryPolicy) WithMaxAttempts(attempts int) *RetryPolicy

WithMaxAttempts returns a new policy with updated max attempts

func (*RetryPolicy) WithMultiplier

func (rp *RetryPolicy) WithMultiplier(multiplier float64) *RetryPolicy

WithMultiplier returns a new policy with updated multiplier

func (*RetryPolicy) WithRandomization

func (rp *RetryPolicy) WithRandomization(factor float64) *RetryPolicy

WithRandomization returns a new policy with updated randomization

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL