Documentation ¶

Index ¶
- type BaseConnector
 - func (bc *BaseConnector) Close(ctx context.Context) error
 - func (bc *BaseConnector) ExecuteWithCircuitBreaker(fn func() error) error
 - func (bc *BaseConnector) ExecuteWithRetry(ctx context.Context, fn func() error) error
 - func (bc *BaseConnector) GetBatchBuilder() core.BatchBuilder
 - func (bc *BaseConnector) GetCircuitBreaker() *clients.CircuitBreaker
 - func (bc *BaseConnector) GetConfig() *config.BaseConfig
 - func (bc *BaseConnector) GetConnectionPool() core.ConnectionPool
 - func (bc *BaseConnector) GetContext() context.Context
 - func (bc *BaseConnector) GetErrorHandler() *ErrorHandler
 - func (bc *BaseConnector) GetLogger() *zap.Logger
 - func (bc *BaseConnector) GetMetricsCollector() *metrics.Collector
 - func (bc *BaseConnector) GetPosition() core.Position
 - func (bc *BaseConnector) GetQualityChecker() core.DataQualityChecker
 - func (bc *BaseConnector) GetRateLimiter() clients.RateLimiter
 - func (bc *BaseConnector) GetState() core.State
 - func (bc *BaseConnector) HandleError(ctx context.Context, err error, record *models.Record) error
 - func (bc *BaseConnector) Health(ctx context.Context) error
 - func (bc *BaseConnector) Initialize(ctx context.Context, config *config.BaseConfig) error
 - func (bc *BaseConnector) IsHealthy() bool
 - func (bc *BaseConnector) Metrics() map[string]interface{}
 - func (bc *BaseConnector) Name() string
 - func (bc *BaseConnector) OptimizeBatchSize() int
 - func (bc *BaseConnector) OptimizeConcurrency() int
 - func (bc *BaseConnector) RateLimit(ctx context.Context) error
 - func (bc *BaseConnector) RecordMetric(name string, value interface{}, metricType core.MetricType)
 - func (bc *BaseConnector) ReportProgress(processed, total int64)
 - func (bc *BaseConnector) SetBatchBuilder(builder core.BatchBuilder)
 - func (bc *BaseConnector) SetConnectionPool(pool core.ConnectionPool)
 - func (bc *BaseConnector) SetPosition(position core.Position) error
 - func (bc *BaseConnector) SetQualityChecker(checker core.DataQualityChecker)
 - func (bc *BaseConnector) SetState(state core.State) error
 - func (bc *BaseConnector) ShouldRetry(err error) bool
 - func (bc *BaseConnector) Type() core.ConnectorType
 - func (bc *BaseConnector) UpdateHealth(healthy bool, details map[string]interface{})
 - func (bc *BaseConnector) Validate() error
 - func (bc *BaseConnector) Version() string
 
- type ErrorHandler
 - func (eh *ErrorHandler) ExecuteWithRetry(ctx context.Context, fn RetryFunc) error
 - func (eh *ErrorHandler) GetErrorStats() map[string]interface{}
 - func (eh *ErrorHandler) GetRetryDelay(attempt int) time.Duration
 - func (eh *ErrorHandler) HandleError(ctx context.Context, err error, record *models.Record) error
 - func (eh *ErrorHandler) RecordError(err error, details map[string]interface{})
 - func (eh *ErrorHandler) ResetStats()
 - func (eh *ErrorHandler) ShouldRetry(err error) bool
 
- type HealthChecker
 - func (hc *HealthChecker) CheckCount() int64
 - func (hc *HealthChecker) FailureCount() int64
 - func (hc *HealthChecker) GetStatus() *core.HealthStatus
 - func (hc *HealthChecker) IsHealthy() bool
 - func (hc *HealthChecker) SetCheckFunc(fn func(ctx context.Context) error)
 - func (hc *HealthChecker) Start(ctx context.Context)
 - func (hc *HealthChecker) Stop()
 - func (hc *HealthChecker) UpdateStatus(healthy bool, details map[string]interface{})
 
- type PerformanceOptimizer
 - func (po *PerformanceOptimizer) GenerateReport(metrics map[string]interface{}) *PerformanceReport
 - func (po *PerformanceOptimizer) GetPerformanceTrend() string
 - func (po *PerformanceOptimizer) OptimizeBatchSize(current int, metrics map[string]interface{}) int
 - func (po *PerformanceOptimizer) OptimizeBufferSize(current int, metrics map[string]interface{}) int
 - func (po *PerformanceOptimizer) OptimizeConcurrency(current int, metrics map[string]interface{}) int
 - func (po *PerformanceOptimizer) SuggestOptimizations(metrics map[string]interface{}) []string
 
- type PerformanceReport
- type ProgressReporter
 - func (pr *ProgressReporter) GetAverageLatency() time.Duration
 - func (pr *ProgressReporter) GetAverageThroughput() float64
 - func (pr *ProgressReporter) GetETA() time.Duration
 - func (pr *ProgressReporter) GetElapsedTime() time.Duration
 - func (pr *ProgressReporter) GetProgress() (processed, total int64)
 - func (pr *ProgressReporter) GetSnapshot() *ProgressSnapshot
 - func (pr *ProgressReporter) IncrementProcessed(count int64)
 - func (pr *ProgressReporter) ReportLatency(latency time.Duration)
 - func (pr *ProgressReporter) ReportProgress(processed, total int64)
 - func (pr *ProgressReporter) ReportThroughput(recordsPerSecond float64)
 - func (pr *ProgressReporter) SetReportInterval(interval time.Duration)
 - func (pr *ProgressReporter) SetTotal(total int64)
 - func (pr *ProgressReporter) Start()
 - func (pr *ProgressReporter) Stop()
 
- type ProgressSnapshot
- type RetryFunc
- type RetryPolicy
 - func (rp *RetryPolicy) Clone() *RetryPolicy
 - func (rp *RetryPolicy) Execute(ctx context.Context, fn func() error) error
 - func (rp *RetryPolicy) ExecuteWithCondition(ctx context.Context, fn func() error, shouldRetry func(error) bool) error
 - func (rp *RetryPolicy) GetDelay(attempt int) time.Duration
 - func (rp *RetryPolicy) WithDelay(initial, max time.Duration) *RetryPolicy
 - func (rp *RetryPolicy) WithMaxAttempts(attempts int) *RetryPolicy
 - func (rp *RetryPolicy) WithMultiplier(multiplier float64) *RetryPolicy
 - func (rp *RetryPolicy) WithRandomization(factor float64) *RetryPolicy
 
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseConnector ¶
type BaseConnector struct {
	// contains filtered or unexported fields
}
    BaseConnector provides common functionality for all connectors
func NewBaseConnector ¶
func NewBaseConnector(name string, connectorType core.ConnectorType, version string) *BaseConnector
NewBaseConnector creates a new base connector with production features
func (*BaseConnector) Close ¶
func (bc *BaseConnector) Close(ctx context.Context) error
Close shuts down the connector
func (*BaseConnector) ExecuteWithCircuitBreaker ¶
func (bc *BaseConnector) ExecuteWithCircuitBreaker(fn func() error) error
ExecuteWithCircuitBreaker executes a function with circuit breaker protection
func (*BaseConnector) ExecuteWithRetry ¶
func (bc *BaseConnector) ExecuteWithRetry(ctx context.Context, fn func() error) error
ExecuteWithRetry executes a function with retry logic
func (*BaseConnector) GetBatchBuilder ¶
func (bc *BaseConnector) GetBatchBuilder() core.BatchBuilder
GetBatchBuilder returns the batch builder
func (*BaseConnector) GetCircuitBreaker ¶
func (bc *BaseConnector) GetCircuitBreaker() *clients.CircuitBreaker
GetCircuitBreaker returns the circuit breaker
func (*BaseConnector) GetConfig ¶
func (bc *BaseConnector) GetConfig() *config.BaseConfig
GetConfig returns the connector configuration
func (*BaseConnector) GetConnectionPool ¶
func (bc *BaseConnector) GetConnectionPool() core.ConnectionPool
GetConnectionPool returns the connection pool
func (*BaseConnector) GetContext ¶
func (bc *BaseConnector) GetContext() context.Context
GetContext returns the connector context
func (*BaseConnector) GetErrorHandler ¶
func (bc *BaseConnector) GetErrorHandler() *ErrorHandler
GetErrorHandler returns the error handler
func (*BaseConnector) GetLogger ¶
func (bc *BaseConnector) GetLogger() *zap.Logger
GetLogger returns the connector logger
func (*BaseConnector) GetMetricsCollector ¶
func (bc *BaseConnector) GetMetricsCollector() *metrics.Collector
GetMetricsCollector returns the metrics collector
func (*BaseConnector) GetPosition ¶
func (bc *BaseConnector) GetPosition() core.Position
GetPosition returns the current position
func (*BaseConnector) GetQualityChecker ¶
func (bc *BaseConnector) GetQualityChecker() core.DataQualityChecker
GetQualityChecker returns the data quality checker
func (*BaseConnector) GetRateLimiter ¶
func (bc *BaseConnector) GetRateLimiter() clients.RateLimiter
GetRateLimiter returns the rate limiter
func (*BaseConnector) GetState ¶
func (bc *BaseConnector) GetState() core.State
GetState returns the current state
func (*BaseConnector) HandleError ¶
func (bc *BaseConnector) HandleError(ctx context.Context, err error, record *models.Record) error
HandleError handles an error with the configured error handler
func (*BaseConnector) Health ¶
func (bc *BaseConnector) Health(ctx context.Context) error
Health performs a health check
func (*BaseConnector) Initialize ¶
func (bc *BaseConnector) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize sets up the base connector
func (*BaseConnector) IsHealthy ¶
func (bc *BaseConnector) IsHealthy() bool
IsHealthy returns true if the connector is healthy
func (*BaseConnector) Metrics ¶
func (bc *BaseConnector) Metrics() map[string]interface{}
Metrics returns current metrics
func (*BaseConnector) OptimizeBatchSize ¶
func (bc *BaseConnector) OptimizeBatchSize() int
OptimizeBatchSize returns an optimized batch size based on current metrics
func (*BaseConnector) OptimizeConcurrency ¶
func (bc *BaseConnector) OptimizeConcurrency() int
OptimizeConcurrency returns an optimized concurrency level
func (*BaseConnector) RateLimit ¶
func (bc *BaseConnector) RateLimit(ctx context.Context) error
RateLimit checks and enforces rate limiting
func (*BaseConnector) RecordMetric ¶
func (bc *BaseConnector) RecordMetric(name string, value interface{}, metricType core.MetricType)
RecordMetric records a metric
func (*BaseConnector) ReportProgress ¶
func (bc *BaseConnector) ReportProgress(processed, total int64)
ReportProgress reports operation progress
func (*BaseConnector) SetBatchBuilder ¶
func (bc *BaseConnector) SetBatchBuilder(builder core.BatchBuilder)
SetBatchBuilder sets the batch builder
func (*BaseConnector) SetConnectionPool ¶
func (bc *BaseConnector) SetConnectionPool(pool core.ConnectionPool)
SetConnectionPool sets the connection pool
func (*BaseConnector) SetPosition ¶
func (bc *BaseConnector) SetPosition(position core.Position) error
SetPosition updates the current position
func (*BaseConnector) SetQualityChecker ¶
func (bc *BaseConnector) SetQualityChecker(checker core.DataQualityChecker)
SetQualityChecker sets the data quality checker
func (*BaseConnector) SetState ¶
func (bc *BaseConnector) SetState(state core.State) error
SetState updates the connector state
func (*BaseConnector) ShouldRetry ¶
func (bc *BaseConnector) ShouldRetry(err error) bool
ShouldRetry checks if an error should be retried
func (*BaseConnector) Type ¶
func (bc *BaseConnector) Type() core.ConnectorType
Type returns the connector type
func (*BaseConnector) UpdateHealth ¶
func (bc *BaseConnector) UpdateHealth(healthy bool, details map[string]interface{})
UpdateHealth updates the health status
func (*BaseConnector) Validate ¶
func (bc *BaseConnector) Validate() error
Validate validates the connector configuration
func (*BaseConnector) Version ¶
func (bc *BaseConnector) Version() string
Version returns the connector version
type ErrorHandler ¶
type ErrorHandler struct {
	// contains filtered or unexported fields
}
    ErrorHandler handles errors with categorization and retry logic
func NewErrorHandler ¶
NewErrorHandler creates a new error handler
func (*ErrorHandler) ExecuteWithRetry ¶
func (eh *ErrorHandler) ExecuteWithRetry(ctx context.Context, fn RetryFunc) error
ExecuteWithRetry executes a function with retry logic
func (*ErrorHandler) GetErrorStats ¶
func (eh *ErrorHandler) GetErrorStats() map[string]interface{}
GetErrorStats returns error statistics
func (*ErrorHandler) GetRetryDelay ¶
func (eh *ErrorHandler) GetRetryDelay(attempt int) time.Duration
GetRetryDelay calculates the retry delay for a given attempt
func (*ErrorHandler) HandleError ¶
func (eh *ErrorHandler) HandleError(ctx context.Context, err error, record *models.Record) error
HandleError processes an error with appropriate handling
func (*ErrorHandler) RecordError ¶
func (eh *ErrorHandler) RecordError(err error, details map[string]interface{})
RecordError records error details for analysis
func (*ErrorHandler) ResetStats ¶
func (eh *ErrorHandler) ResetStats()
ResetStats resets error statistics
func (*ErrorHandler) ShouldRetry ¶
func (eh *ErrorHandler) ShouldRetry(err error) bool
ShouldRetry determines if an error should be retried
type HealthChecker ¶
type HealthChecker struct {
	// contains filtered or unexported fields
}
    HealthChecker performs periodic health checks
func NewHealthChecker ¶
func NewHealthChecker(name string, interval time.Duration) *HealthChecker
NewHealthChecker creates a new health checker
func (*HealthChecker) CheckCount ¶
func (hc *HealthChecker) CheckCount() int64
CheckCount returns the total number of health checks performed
func (*HealthChecker) FailureCount ¶
func (hc *HealthChecker) FailureCount() int64
FailureCount returns the total number of failed health checks
func (*HealthChecker) GetStatus ¶
func (hc *HealthChecker) GetStatus() *core.HealthStatus
GetStatus returns the current health status
func (*HealthChecker) IsHealthy ¶
func (hc *HealthChecker) IsHealthy() bool
IsHealthy returns true if the service is healthy
func (*HealthChecker) SetCheckFunc ¶
func (hc *HealthChecker) SetCheckFunc(fn func(ctx context.Context) error)
SetCheckFunc sets the health check function
func (*HealthChecker) Start ¶
func (hc *HealthChecker) Start(ctx context.Context)
Start begins periodic health checks
func (*HealthChecker) UpdateStatus ¶
func (hc *HealthChecker) UpdateStatus(healthy bool, details map[string]interface{})
UpdateStatus manually updates the health status
type PerformanceOptimizer ¶
type PerformanceOptimizer struct {
	// contains filtered or unexported fields
}
    PerformanceOptimizer provides performance optimization recommendations
func NewPerformanceOptimizer ¶
func NewPerformanceOptimizer(collector *metrics.Collector) *PerformanceOptimizer
NewPerformanceOptimizer creates a new performance optimizer
func (*PerformanceOptimizer) GenerateReport ¶
func (po *PerformanceOptimizer) GenerateReport(metrics map[string]interface{}) *PerformanceReport
GenerateReport creates a performance report
func (*PerformanceOptimizer) GetPerformanceTrend ¶
func (po *PerformanceOptimizer) GetPerformanceTrend() string
GetPerformanceTrend returns the performance trend
func (*PerformanceOptimizer) OptimizeBatchSize ¶
func (po *PerformanceOptimizer) OptimizeBatchSize(current int, metrics map[string]interface{}) int
OptimizeBatchSize recommends an optimal batch size
func (*PerformanceOptimizer) OptimizeBufferSize ¶
func (po *PerformanceOptimizer) OptimizeBufferSize(current int, metrics map[string]interface{}) int
OptimizeBufferSize recommends an optimal buffer size
func (*PerformanceOptimizer) OptimizeConcurrency ¶
func (po *PerformanceOptimizer) OptimizeConcurrency(current int, metrics map[string]interface{}) int
OptimizeConcurrency recommends an optimal concurrency level
func (*PerformanceOptimizer) SuggestOptimizations ¶
func (po *PerformanceOptimizer) SuggestOptimizations(metrics map[string]interface{}) []string
SuggestOptimizations provides optimization suggestions
type PerformanceReport ¶
type PerformanceReport struct {
	Timestamp         time.Time
	AverageThroughput float64
	AverageLatency    time.Duration
	P99Latency        time.Duration
	ErrorRate         float64
	TotalProcessed    int64
	TotalErrors       int64
	Trend             string
	Recommendations   []string
}
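    PerformanceReport holds a performance report: a timestamped summary of throughput, latency, and error figures together with the trend and recommendations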
type ProgressReporter ¶
type ProgressReporter struct {
	// contains filtered or unexported fields
}
    ProgressReporter tracks and reports progress of operations
func NewProgressReporter ¶
func NewProgressReporter(logger *zap.Logger, collector *metrics.Collector) *ProgressReporter
NewProgressReporter creates a new progress reporter
func (*ProgressReporter) GetAverageLatency ¶
func (pr *ProgressReporter) GetAverageLatency() time.Duration
GetAverageLatency returns average latency
func (*ProgressReporter) GetAverageThroughput ¶
func (pr *ProgressReporter) GetAverageThroughput() float64
GetAverageThroughput returns average throughput
func (*ProgressReporter) GetETA ¶
func (pr *ProgressReporter) GetETA() time.Duration
GetETA estimates time remaining
func (*ProgressReporter) GetElapsedTime ¶
func (pr *ProgressReporter) GetElapsedTime() time.Duration
GetElapsedTime returns time since start
func (*ProgressReporter) GetProgress ¶
func (pr *ProgressReporter) GetProgress() (processed, total int64)
GetProgress returns current progress
func (*ProgressReporter) GetSnapshot ¶
func (pr *ProgressReporter) GetSnapshot() *ProgressSnapshot
GetSnapshot returns a progress snapshot
func (*ProgressReporter) IncrementProcessed ¶
func (pr *ProgressReporter) IncrementProcessed(count int64)
IncrementProcessed increments the processed count
func (*ProgressReporter) ReportLatency ¶
func (pr *ProgressReporter) ReportLatency(latency time.Duration)
ReportLatency records processing latency
func (*ProgressReporter) ReportProgress ¶
func (pr *ProgressReporter) ReportProgress(processed, total int64)
ReportProgress updates the progress
func (*ProgressReporter) ReportThroughput ¶
func (pr *ProgressReporter) ReportThroughput(recordsPerSecond float64)
ReportThroughput records throughput
func (*ProgressReporter) SetReportInterval ¶
func (pr *ProgressReporter) SetReportInterval(interval time.Duration)
SetReportInterval sets the progress reporting interval
func (*ProgressReporter) SetTotal ¶
func (pr *ProgressReporter) SetTotal(total int64)
SetTotal sets the total number of records to process
func (*ProgressReporter) Start ¶
func (pr *ProgressReporter) Start()
Start begins periodic progress reporting
type ProgressSnapshot ¶
type ProgressSnapshot struct {
	Timestamp        time.Time
	ProcessedRecords int64
	TotalRecords     int64
	Percentage       float64
	Throughput       float64
	AverageLatency   time.Duration
	ElapsedTime      time.Duration
	ETA              time.Duration
}
    ProgressSnapshot represents a point-in-time progress snapshot
type RetryPolicy ¶
type RetryPolicy struct {
	MaxAttempts     int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	Multiplier      float64
	RandomizeFactor float64
}
    RetryPolicy defines retry behavior
func AggressiveRetryPolicy ¶
func AggressiveRetryPolicy() *RetryPolicy
AggressiveRetryPolicy returns a policy for critical operations
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy
func NewRetryPolicy ¶
func NewRetryPolicy(maxAttempts int, initialDelay time.Duration) *RetryPolicy
NewRetryPolicy creates a new retry policy with exponential backoff
func NoRetryPolicy ¶
func NoRetryPolicy() *RetryPolicy
NoRetryPolicy returns a policy that doesn't retry
func (*RetryPolicy) Clone ¶
func (rp *RetryPolicy) Clone() *RetryPolicy
Clone creates a copy of the retry policy
func (*RetryPolicy) Execute ¶
func (rp *RetryPolicy) Execute(ctx context.Context, fn func() error) error
Execute runs a function with the retry policy
func (*RetryPolicy) ExecuteWithCondition ¶
func (rp *RetryPolicy) ExecuteWithCondition(ctx context.Context, fn func() error, shouldRetry func(error) bool) error
ExecuteWithCondition runs a function under the retry policy, retrying only if the condition is met
func (*RetryPolicy) GetDelay ¶
func (rp *RetryPolicy) GetDelay(attempt int) time.Duration
GetDelay returns the delay for a specific attempt (for testing/preview)
func (*RetryPolicy) WithDelay ¶
func (rp *RetryPolicy) WithDelay(initial, max time.Duration) *RetryPolicy
WithDelay returns a new policy with updated delays
func (*RetryPolicy) WithMaxAttempts ¶
func (rp *RetryPolicy) WithMaxAttempts(attempts int) *RetryPolicy
WithMaxAttempts returns a new policy with updated max attempts
func (*RetryPolicy) WithMultiplier ¶
func (rp *RetryPolicy) WithMultiplier(multiplier float64) *RetryPolicy
WithMultiplier returns a new policy with updated multiplier
func (*RetryPolicy) WithRandomization ¶
func (rp *RetryPolicy) WithRandomization(factor float64) *RetryPolicy
WithRandomization returns a new policy with updated randomization