Documentation
¶
Overview ¶
Package performance provides profiling and optimization tools for Nebula, including:
- bulk loading optimizations
- connector-specific optimizations
- memory optimization utilities
- optimization strategies
Index ¶
- func CompareMemoryProfiles(before, after *MemoryProfile) string
- type APIOptConfig
- type AdaptiveStrategy
- type BatchManager
- type BatchingStrategy
- type Benchmark
- type BufferPool
- type BulkLoadConfig
- type BulkLoadMetrics
- type BulkLoadOptimizer
- type BulkLoader
- type CDCOptConfig
- type CacheOptimizer
- type CachingStrategy
- type CloudWarehouseOptConfig
- type ClusteringStage
- type Column
- type ColumnStore
- type ColumnarStrategy
- type CompressionStrategy
- type ConnectorOptConfig
- type ConnectorOptimizer
- func (co *ConnectorOptimizer) ApplyOptimizations(ctx context.Context, records []*models.Record) ([]*models.Record, error)
- func (co *ConnectorOptimizer) GetOptimizationReport() string
- func (co *ConnectorOptimizer) OptimizeDestination(dest core.Destination) core.Destination
- func (co *ConnectorOptimizer) OptimizeSource(source core.Source) core.Source
- type FormatConversionStage
- type LatencyTracker
- type LoadFile
- type LoadStage
- type MemoryConfig
- type MemoryMetrics
- type MemoryOptimizationStage
- type MemoryOptimizer
- func (mo *MemoryOptimizer) Compact()
- func (mo *MemoryOptimizer) GetMetrics() *MemoryMetrics
- func (mo *MemoryOptimizer) OptimizeRecord(record *models.Record) *models.Record
- func (mo *MemoryOptimizer) OptimizeRecords(records []*models.Record) []*models.Record
- func (mo *MemoryOptimizer) ReleaseRecord(record *models.Record)
- type MemoryProfile
- type Metrics
- type ObjectPool
- type OptimizationMetrics
- type OptimizationStrategy
- type OptimizedAPISource
- func (oas *OptimizedAPISource) Close(ctx context.Context) error
- func (oas *OptimizedAPISource) Discover(ctx context.Context) (*core.Schema, error)
- func (oas *OptimizedAPISource) GetPosition() core.Position
- func (oas *OptimizedAPISource) GetState() core.State
- func (oas *OptimizedAPISource) Health(ctx context.Context) error
- func (oas *OptimizedAPISource) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (oas *OptimizedAPISource) Metrics() map[string]interface{}
- func (oas *OptimizedAPISource) Read(ctx context.Context) (*core.RecordStream, error)
- func (oas *OptimizedAPISource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
- func (oas *OptimizedAPISource) SetPosition(position core.Position) error
- func (oas *OptimizedAPISource) SetState(state core.State) error
- func (oas *OptimizedAPISource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
- func (oas *OptimizedAPISource) SupportsBatch() bool
- func (oas *OptimizedAPISource) SupportsIncremental() bool
- func (oas *OptimizedAPISource) SupportsRealtime() bool
- type OptimizedCDCSource
- func (ocs *OptimizedCDCSource) Close(ctx context.Context) error
- func (ocs *OptimizedCDCSource) Discover(ctx context.Context) (*core.Schema, error)
- func (ocs *OptimizedCDCSource) GetPosition() core.Position
- func (ocs *OptimizedCDCSource) GetState() core.State
- func (ocs *OptimizedCDCSource) Health(ctx context.Context) error
- func (ocs *OptimizedCDCSource) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (ocs *OptimizedCDCSource) Metrics() map[string]interface{}
- func (ocs *OptimizedCDCSource) Read(ctx context.Context) (*core.RecordStream, error)
- func (ocs *OptimizedCDCSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
- func (ocs *OptimizedCDCSource) SetPosition(position core.Position) error
- func (ocs *OptimizedCDCSource) SetState(state core.State) error
- func (ocs *OptimizedCDCSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
- func (ocs *OptimizedCDCSource) SupportsBatch() bool
- func (ocs *OptimizedCDCSource) SupportsIncremental() bool
- func (ocs *OptimizedCDCSource) SupportsRealtime() bool
- type OptimizedCloudWarehouse
- func (ocw *OptimizedCloudWarehouse) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
- func (ocw *OptimizedCloudWarehouse) BeginTransaction(ctx context.Context) (core.Transaction, error)
- func (ocw *OptimizedCloudWarehouse) BulkLoad(ctx context.Context, reader interface{}, format string) error
- func (ocw *OptimizedCloudWarehouse) Close(ctx context.Context) error
- func (ocw *OptimizedCloudWarehouse) CreateSchema(ctx context.Context, schema *core.Schema) error
- func (ocw *OptimizedCloudWarehouse) DropSchema(ctx context.Context, schema *core.Schema) error
- func (ocw *OptimizedCloudWarehouse) Health(ctx context.Context) error
- func (ocw *OptimizedCloudWarehouse) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (ocw *OptimizedCloudWarehouse) Metrics() map[string]interface{}
- func (ocw *OptimizedCloudWarehouse) SupportsBatch() bool
- func (ocw *OptimizedCloudWarehouse) SupportsBulkLoad() bool
- func (ocw *OptimizedCloudWarehouse) SupportsStreaming() bool
- func (ocw *OptimizedCloudWarehouse) SupportsTransactions() bool
- func (ocw *OptimizedCloudWarehouse) SupportsUpsert() bool
- func (ocw *OptimizedCloudWarehouse) Upsert(ctx context.Context, records []*models.Record, keys []string) error
- func (ocw *OptimizedCloudWarehouse) Write(ctx context.Context, stream *core.RecordStream) error
- func (ocw *OptimizedCloudWarehouse) WriteBatch(ctx context.Context, stream *core.BatchStream) error
- type OptimizedGenericDestination
- func (ogd *OptimizedGenericDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
- func (ogd *OptimizedGenericDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)
- func (ogd *OptimizedGenericDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error
- func (ogd *OptimizedGenericDestination) Close(ctx context.Context) error
- func (ogd *OptimizedGenericDestination) CreateSchema(ctx context.Context, schema *core.Schema) error
- func (ogd *OptimizedGenericDestination) DropSchema(ctx context.Context, schema *core.Schema) error
- func (ogd *OptimizedGenericDestination) Health(ctx context.Context) error
- func (ogd *OptimizedGenericDestination) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (ogd *OptimizedGenericDestination) Metrics() map[string]interface{}
- func (ogd *OptimizedGenericDestination) SupportsBatch() bool
- func (ogd *OptimizedGenericDestination) SupportsBulkLoad() bool
- func (ogd *OptimizedGenericDestination) SupportsStreaming() bool
- func (ogd *OptimizedGenericDestination) SupportsTransactions() bool
- func (ogd *OptimizedGenericDestination) SupportsUpsert() bool
- func (ogd *OptimizedGenericDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error
- func (ogd *OptimizedGenericDestination) Write(ctx context.Context, stream *core.RecordStream) error
- func (ogd *OptimizedGenericDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error
- type OptimizedGenericSource
- func (ogs *OptimizedGenericSource) Close(ctx context.Context) error
- func (ogs *OptimizedGenericSource) Discover(ctx context.Context) (*core.Schema, error)
- func (ogs *OptimizedGenericSource) GetPosition() core.Position
- func (ogs *OptimizedGenericSource) GetState() core.State
- func (ogs *OptimizedGenericSource) Health(ctx context.Context) error
- func (ogs *OptimizedGenericSource) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (ogs *OptimizedGenericSource) Metrics() map[string]interface{}
- func (ogs *OptimizedGenericSource) Read(ctx context.Context) (*core.RecordStream, error)
- func (ogs *OptimizedGenericSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
- func (ogs *OptimizedGenericSource) SetPosition(position core.Position) error
- func (ogs *OptimizedGenericSource) SetState(state core.State) error
- func (ogs *OptimizedGenericSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
- func (ogs *OptimizedGenericSource) SupportsBatch() bool
- func (ogs *OptimizedGenericSource) SupportsIncremental() bool
- func (ogs *OptimizedGenericSource) SupportsRealtime() bool
- type OptimizedSnowflakeDestination
- func (osd *OptimizedSnowflakeDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
- func (osd *OptimizedSnowflakeDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)
- func (osd *OptimizedSnowflakeDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error
- func (osd *OptimizedSnowflakeDestination) Close(ctx context.Context) error
- func (osd *OptimizedSnowflakeDestination) CreateSchema(ctx context.Context, schema *core.Schema) error
- func (osd *OptimizedSnowflakeDestination) DropSchema(ctx context.Context, schema *core.Schema) error
- func (osd *OptimizedSnowflakeDestination) Health(ctx context.Context) error
- func (osd *OptimizedSnowflakeDestination) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (osd *OptimizedSnowflakeDestination) Metrics() map[string]interface{}
- func (osd *OptimizedSnowflakeDestination) SupportsBatch() bool
- func (osd *OptimizedSnowflakeDestination) SupportsBulkLoad() bool
- func (osd *OptimizedSnowflakeDestination) SupportsStreaming() bool
- func (osd *OptimizedSnowflakeDestination) SupportsTransactions() bool
- func (osd *OptimizedSnowflakeDestination) SupportsUpsert() bool
- func (osd *OptimizedSnowflakeDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error
- func (osd *OptimizedSnowflakeDestination) Write(ctx context.Context, stream *core.RecordStream) error
- func (osd *OptimizedSnowflakeDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error
- type Optimizer
- func (o *Optimizer) ExecuteParallel(ctx context.Context, items []interface{}, fn func(interface{}) error) error
- func (o *Optimizer) GetBuffer() []byte
- func (o *Optimizer) GetMetrics() *OptimizationMetrics
- func (o *Optimizer) OptimizeBatch(currentSize int, throughput float64) int
- func (o *Optimizer) OptimizeCPU()
- func (o *Optimizer) OptimizeMemory()
- func (o *Optimizer) PutBuffer(buf []byte)
- type OptimizerConfig
- type ParallelismStrategy
- type PartitioningStage
- type PipelineMetrics
- type PipelineOptimizer
- type PipelineStage
- type ProfileResult
- type Profiler
- func (p *Profiler) GenerateReport() *ProfileResult
- func (p *Profiler) GetMetrics() *Metrics
- func (p *Profiler) IncrementBytes(bytes int64)
- func (p *Profiler) IncrementErrors(count int64)
- func (p *Profiler) IncrementRecords(count int64)
- func (p *Profiler) RecordLatency(d time.Duration)
- func (p *Profiler) SetCustomMetric(key string, value interface{})
- func (p *Profiler) Start()
- func (p *Profiler) Stop() *Metrics
- type ProfilerConfig
- type ResourceMonitor
- type ResourceUsage
- type SchemaOptimizationStrategy
- type SnowflakeOptConfig
- type SortingStage
- type StringIntern
- type UnsafeStringOptimizer
- type WorkerPool
- type ZeroCopyMetrics
- type ZeroCopyOptimizer
- type ZeroCopyStrategy
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CompareMemoryProfiles ¶
func CompareMemoryProfiles(before, after *MemoryProfile) string
CompareMemoryProfiles compares two memory profiles
Types ¶
type APIOptConfig ¶
type APIOptConfig struct {
RequestBatching bool
ResponseCaching bool
CompressionEnabled bool
ParallelRequests int
CacheTTL time.Duration
}
APIOptConfig configures API optimizations
type AdaptiveStrategy ¶
type AdaptiveStrategy struct {
// contains filtered or unexported fields
}
AdaptiveStrategy adapts based on runtime metrics
func NewAdaptiveStrategy ¶
func NewAdaptiveStrategy(strategies []OptimizationStrategy) *AdaptiveStrategy
NewAdaptiveStrategy creates an adaptive strategy
func (*AdaptiveStrategy) Apply ¶
func (as *AdaptiveStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies the best strategy based on metrics
func (*AdaptiveStrategy) Name ¶
func (as *AdaptiveStrategy) Name() string
Name returns strategy name
func (*AdaptiveStrategy) ShouldApply ¶
func (as *AdaptiveStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply always returns true for the adaptive strategy
func (*AdaptiveStrategy) UpdateMetrics ¶
func (as *AdaptiveStrategy) UpdateMetrics(throughput float64)
UpdateMetrics updates strategy metrics
type BatchManager ¶
type BatchManager struct {
// contains filtered or unexported fields
}
BatchManager manages dynamic batch sizing
func NewBatchManager ¶
func NewBatchManager(optimalSize int, adaptive bool) *BatchManager
NewBatchManager creates a batch manager
func (*BatchManager) OptimizeSize ¶
func (bm *BatchManager) OptimizeSize(currentSize int, throughput float64) int
OptimizeSize optimizes batch size based on throughput
type BatchingStrategy ¶
type BatchingStrategy struct {
// contains filtered or unexported fields
}
BatchingStrategy optimizes batch sizes
func (*BatchingStrategy) Apply ¶
func (bs *BatchingStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies batching optimization
func (*BatchingStrategy) Name ¶
func (bs *BatchingStrategy) Name() string
Name returns strategy name
func (*BatchingStrategy) ShouldApply ¶
func (bs *BatchingStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply
type Benchmark ¶
type Benchmark struct {
// contains filtered or unexported fields
}
Benchmark provides benchmarking utilities
type BufferPool ¶
type BufferPool struct {
// contains filtered or unexported fields
}
BufferPool provides memory buffer pooling
func NewBufferPool ¶
func NewBufferPool(poolSize, bufferSize int) *BufferPool
NewBufferPool creates a buffer pool
type BulkLoadConfig ¶
type BulkLoadConfig struct {
TargetWarehouse string
MaxBatchSize int
MaxFileSize int64
ParallelLoaders int
StagingLocation string
// Format options
FileFormat string // csv, json, parquet, avro
Compression string // none, gzip, snappy, zstd
EnableColumnar bool
// Optimization options
EnableSorting bool
SortKeys []string
EnablePartitioning bool
PartitionKeys []string
EnableClustering bool
ClusterKeys []string
// Performance options
MemoryLimit int64
NetworkBandwidth int64 // bytes/sec
RetryAttempts int
RetryDelay time.Duration
}
BulkLoadConfig configures bulk loading
func DefaultBulkLoadConfig ¶
func DefaultBulkLoadConfig(warehouse string) *BulkLoadConfig
DefaultBulkLoadConfig returns default configuration
type BulkLoadMetrics ¶
type BulkLoadMetrics struct {
RecordsLoaded int64
BytesLoaded int64
FilesCreated int64
LoadDuration time.Duration
CompressionRatio float64
Throughput float64
Errors int64
Retries int64
}
BulkLoadMetrics tracks bulk load metrics
type BulkLoadOptimizer ¶
type BulkLoadOptimizer struct {
// contains filtered or unexported fields
}
BulkLoadOptimizer provides warehouse-specific optimizations
func NewBulkLoadOptimizer ¶
func NewBulkLoadOptimizer(warehouse string) *BulkLoadOptimizer
NewBulkLoadOptimizer creates warehouse-specific optimizer
func (*BulkLoadOptimizer) OptimizeForBigQuery ¶
func (blo *BulkLoadOptimizer) OptimizeForBigQuery()
OptimizeForBigQuery optimizes for BigQuery
func (*BulkLoadOptimizer) OptimizeForRedshift ¶
func (blo *BulkLoadOptimizer) OptimizeForRedshift()
OptimizeForRedshift optimizes for Redshift
func (*BulkLoadOptimizer) OptimizeForSnowflake ¶
func (blo *BulkLoadOptimizer) OptimizeForSnowflake()
OptimizeForSnowflake optimizes for Snowflake
type BulkLoader ¶
type BulkLoader struct {
// contains filtered or unexported fields
}
BulkLoader provides optimized bulk loading capabilities
func NewBulkLoader ¶
func NewBulkLoader(config *BulkLoadConfig) *BulkLoader
NewBulkLoader creates a new bulk loader
func (*BulkLoader) GetMetrics ¶
func (bl *BulkLoader) GetMetrics() *BulkLoadMetrics
GetMetrics returns bulk load metrics
type CDCOptConfig ¶
type CDCOptConfig struct {
SnapshotBatchSize int
StreamingBatchSize int
ParallelSnapshots int
CompressionEnabled bool
MemoryOptimization bool
}
CDCOptConfig configures CDC optimizations
type CacheOptimizer ¶
type CacheOptimizer struct {
// contains filtered or unexported fields
}
CacheOptimizer provides caching optimizations
func NewCacheOptimizer ¶
func NewCacheOptimizer(capacity int) *CacheOptimizer
NewCacheOptimizer creates a cache optimizer
func (*CacheOptimizer) Get ¶
func (co *CacheOptimizer) Get(key string) (interface{}, bool)
Get retrieves a value from the cache
func (*CacheOptimizer) GetHitRate ¶
func (co *CacheOptimizer) GetHitRate() float64
GetHitRate returns cache hit rate
func (*CacheOptimizer) Set ¶
func (co *CacheOptimizer) Set(key string, value interface{})
Set stores a value in the cache
type CachingStrategy ¶
type CachingStrategy struct {
// contains filtered or unexported fields
}
CachingStrategy applies caching
func (*CachingStrategy) Apply ¶
func (cs *CachingStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies caching optimization
func (*CachingStrategy) ShouldApply ¶
func (cs *CachingStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply
type CloudWarehouseOptConfig ¶
type CloudWarehouseOptConfig struct {
BatchSize int
ParallelLoaders int
CompressionEnabled bool
ColumnarFormat bool
StreamingInserts bool
}
CloudWarehouseOptConfig configures cloud warehouse optimizations
type ClusteringStage ¶
type ClusteringStage struct {
// contains filtered or unexported fields
}
ClusteringStage clusters records
func (*ClusteringStage) Name ¶
func (cs *ClusteringStage) Name() string
type Column ¶
type Column struct {
// contains filtered or unexported fields
}
Column represents a data column
type ColumnStore ¶
type ColumnStore struct {
// contains filtered or unexported fields
}
ColumnStore provides columnar storage optimization
func (*ColumnStore) OptimizeRecords ¶
func (cs *ColumnStore) OptimizeRecords(records []*models.Record) []*models.Record
OptimizeRecords converts records to columnar format
type ColumnarStrategy ¶
type ColumnarStrategy struct {
// contains filtered or unexported fields
}
ColumnarStrategy converts to columnar format
func (*ColumnarStrategy) Apply ¶
func (cs *ColumnarStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies columnar optimization
func (*ColumnarStrategy) Name ¶
func (cs *ColumnarStrategy) Name() string
Name returns strategy name
func (*ColumnarStrategy) ShouldApply ¶
func (cs *ColumnarStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply
type CompressionStrategy ¶
type CompressionStrategy struct {
// contains filtered or unexported fields
}
CompressionStrategy applies compression
func (*CompressionStrategy) Apply ¶
func (cs *CompressionStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies compression optimization
func (*CompressionStrategy) Name ¶
func (cs *CompressionStrategy) Name() string
Name returns strategy name
func (*CompressionStrategy) ShouldApply ¶
func (cs *CompressionStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply
type ConnectorOptConfig ¶
type ConnectorOptConfig struct {
ConnectorType string
TargetThroughput int // records/sec
MaxMemoryMB int
MaxCPUPercent float64
EnableProfiling bool
// Optimization strategies
EnableBatching bool
EnableCompression bool
EnableColumnar bool
EnableParallelism bool
EnableCaching bool
EnableZeroCopy bool
}
ConnectorOptConfig configures connector optimization
func DefaultConnectorOptConfig ¶
func DefaultConnectorOptConfig(connectorType string) *ConnectorOptConfig
DefaultConnectorOptConfig returns default configuration
type ConnectorOptimizer ¶
type ConnectorOptimizer struct {
// contains filtered or unexported fields
}
ConnectorOptimizer provides connector-specific optimizations
func NewConnectorOptimizer ¶
func NewConnectorOptimizer(config *ConnectorOptConfig) *ConnectorOptimizer
NewConnectorOptimizer creates a connector optimizer
func (*ConnectorOptimizer) ApplyOptimizations ¶
func (co *ConnectorOptimizer) ApplyOptimizations(ctx context.Context, records []*models.Record) ([]*models.Record, error)
ApplyOptimizations applies all applicable optimizations
func (*ConnectorOptimizer) GetOptimizationReport ¶
func (co *ConnectorOptimizer) GetOptimizationReport() string
GetOptimizationReport generates optimization report
func (*ConnectorOptimizer) OptimizeDestination ¶
func (co *ConnectorOptimizer) OptimizeDestination(dest core.Destination) core.Destination
OptimizeDestination optimizes a destination connector
func (*ConnectorOptimizer) OptimizeSource ¶
func (co *ConnectorOptimizer) OptimizeSource(source core.Source) core.Source
OptimizeSource optimizes a source connector
type FormatConversionStage ¶
type FormatConversionStage struct {
// contains filtered or unexported fields
}
FormatConversionStage converts to target format
func (*FormatConversionStage) Name ¶
func (fcs *FormatConversionStage) Name() string
type LatencyTracker ¶
type LatencyTracker struct {
// contains filtered or unexported fields
}
LatencyTracker tracks latency percentiles
func NewLatencyTracker ¶
func NewLatencyTracker() *LatencyTracker
NewLatencyTracker creates a latency tracker
func (*LatencyTracker) GetPercentiles ¶
func (lt *LatencyTracker) GetPercentiles() (p50, p95, p99 time.Duration)
GetPercentiles returns latency percentiles
func (*LatencyTracker) Record ¶
func (lt *LatencyTracker) Record(d time.Duration)
Record records a latency sample
type LoadStage ¶
type LoadStage interface {
Name() string
Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)
}
LoadStage represents a loading stage
type MemoryConfig ¶
type MemoryConfig struct {
EnableObjectPooling bool
ObjectPoolSize int
EnableStringInterning bool
StringInternSize int
EnableColumnStore bool
EnableCompaction bool
CompactionThreshold float64
MaxMemoryMB int
}
MemoryConfig configures memory optimization
func DefaultMemoryConfig ¶
func DefaultMemoryConfig() *MemoryConfig
DefaultMemoryConfig returns default configuration
type MemoryMetrics ¶
type MemoryMetrics struct {
AllocatedMB uint64
UsedMB uint64
ObjectsPooled int64
StringsInterned int64
CompactionCount int64
BytesSaved int64
}
MemoryMetrics tracks memory metrics
type MemoryOptimizationStage ¶
type MemoryOptimizationStage struct {
// contains filtered or unexported fields
}
MemoryOptimizationStage optimizes memory usage
func (*MemoryOptimizationStage) Name ¶
func (mos *MemoryOptimizationStage) Name() string
type MemoryOptimizer ¶
type MemoryOptimizer struct {
// contains filtered or unexported fields
}
MemoryOptimizer provides memory optimization capabilities
func NewMemoryOptimizer ¶
func NewMemoryOptimizer(config *MemoryConfig) *MemoryOptimizer
NewMemoryOptimizer creates a memory optimizer
func (*MemoryOptimizer) Compact ¶
func (mo *MemoryOptimizer) Compact()
Compact performs memory compaction
func (*MemoryOptimizer) GetMetrics ¶
func (mo *MemoryOptimizer) GetMetrics() *MemoryMetrics
GetMetrics returns memory metrics
func (*MemoryOptimizer) OptimizeRecord ¶
func (mo *MemoryOptimizer) OptimizeRecord(record *models.Record) *models.Record
OptimizeRecord optimizes a single record
func (*MemoryOptimizer) OptimizeRecords ¶
func (mo *MemoryOptimizer) OptimizeRecords(records []*models.Record) []*models.Record
OptimizeRecords optimizes memory usage for records
func (*MemoryOptimizer) ReleaseRecord ¶
func (mo *MemoryOptimizer) ReleaseRecord(record *models.Record)
ReleaseRecord returns the record to the pool
type MemoryProfile ¶
type MemoryProfile struct {
Timestamp int64
AllocMB uint64
TotalAllocMB uint64
SysMB uint64
NumGC uint32
PauseNs uint64
Objects int64
}
MemoryProfile represents memory profile data
func CaptureMemoryProfile ¶
func CaptureMemoryProfile() *MemoryProfile
CaptureMemoryProfile captures current memory profile
type Metrics ¶
type Metrics struct {
// Throughput metrics
RecordsProcessed int64
BytesProcessed int64
RecordsPerSecond float64
BytesPerSecond float64
// Latency metrics
MinLatency time.Duration
MaxLatency time.Duration
AvgLatency time.Duration
P50Latency time.Duration
P95Latency time.Duration
P99Latency time.Duration
// Resource metrics
CPUUsagePercent float64
MemoryUsageMB uint64
GoroutineCount int
GCCount uint32
GCPauseTotalNs uint64
// Error metrics
ErrorCount int64
RetryCount int64
// Custom metrics
CustomMetrics map[string]interface{}
}
Metrics tracks performance metrics
type ObjectPool ¶
type ObjectPool struct {
// contains filtered or unexported fields
}
ObjectPool provides object pooling
func NewObjectPool ¶
func NewObjectPool(capacity int) *ObjectPool
NewObjectPool creates an object pool
func (*ObjectPool) GetMap ¶
func (op *ObjectPool) GetMap() map[string]interface{}
GetMap gets a map from the pool
func (*ObjectPool) GetRecord ¶
func (op *ObjectPool) GetRecord() *models.Record
GetRecord gets a record from the pool
func (*ObjectPool) PutMap ¶
func (op *ObjectPool) PutMap(m map[string]interface{})
PutMap returns the map to the pool
func (*ObjectPool) PutRecord ¶
func (op *ObjectPool) PutRecord(record *models.Record)
PutRecord returns the record to the pool
type OptimizationMetrics ¶
type OptimizationMetrics struct {
MemoryPoolHits int64
MemoryPoolMisses int64
BatchesOptimized int64
PipelineStalls int64
ZeroCopyOperations int64
CacheHits int64
CacheMisses int64
WorkerUtilization float64
}
OptimizationMetrics tracks optimization metrics
type OptimizationStrategy ¶
type OptimizationStrategy interface {
Name() string
Apply(ctx context.Context, data interface{}) (interface{}, error)
ShouldApply(metrics *Metrics) bool
}
OptimizationStrategy defines an optimization strategy
type OptimizedAPISource ¶
type OptimizedAPISource struct {
// contains filtered or unexported fields
}
OptimizedAPISource wraps API source with optimizations
func (*OptimizedAPISource) Close ¶
func (oas *OptimizedAPISource) Close(ctx context.Context) error
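Close implements source close
func (*OptimizedAPISource) Discover ¶
func (oas *OptimizedAPISource) Discover(ctx context.Context) (*core.Schema, error)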
func (*OptimizedAPISource) GetPosition ¶
func (oas *OptimizedAPISource) GetPosition() core.Position
GetPosition delegates to underlying source
func (*OptimizedAPISource) GetState ¶
func (oas *OptimizedAPISource) GetState() core.State
GetState delegates to underlying source
func (*OptimizedAPISource) Health ¶
func (oas *OptimizedAPISource) Health(ctx context.Context) error
Health delegates to underlying source
func (*OptimizedAPISource) Initialize ¶
func (oas *OptimizedAPISource) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize delegates to underlying source
func (*OptimizedAPISource) Metrics ¶
func (oas *OptimizedAPISource) Metrics() map[string]interface{}
Metrics returns combined metrics
func (*OptimizedAPISource) Read ¶
func (oas *OptimizedAPISource) Read(ctx context.Context) (*core.RecordStream, error)
Read implements optimized read with caching
func (*OptimizedAPISource) ReadBatch ¶
func (oas *OptimizedAPISource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
ReadBatch delegates to underlying source
func (*OptimizedAPISource) SetPosition ¶
func (oas *OptimizedAPISource) SetPosition(position core.Position) error
SetPosition delegates to underlying source
func (*OptimizedAPISource) SetState ¶
func (oas *OptimizedAPISource) SetState(state core.State) error
SetState delegates to underlying source
func (*OptimizedAPISource) Subscribe ¶
func (oas *OptimizedAPISource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
Subscribe delegates to underlying source
func (*OptimizedAPISource) SupportsBatch ¶
func (oas *OptimizedAPISource) SupportsBatch() bool
SupportsBatch delegates to underlying source
func (*OptimizedAPISource) SupportsIncremental ¶
func (oas *OptimizedAPISource) SupportsIncremental() bool
SupportsIncremental delegates to underlying source
func (*OptimizedAPISource) SupportsRealtime ¶
func (oas *OptimizedAPISource) SupportsRealtime() bool
SupportsRealtime delegates to underlying source
type OptimizedCDCSource ¶
type OptimizedCDCSource struct {
// contains filtered or unexported fields
}
OptimizedCDCSource wraps CDC source with optimizations
func (*OptimizedCDCSource) Close ¶
func (ocs *OptimizedCDCSource) Close(ctx context.Context) error
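Close implements source close
func (*OptimizedCDCSource) Discover ¶
func (ocs *OptimizedCDCSource) Discover(ctx context.Context) (*core.Schema, error)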
func (*OptimizedCDCSource) GetPosition ¶
func (ocs *OptimizedCDCSource) GetPosition() core.Position
GetPosition delegates to underlying source
func (*OptimizedCDCSource) GetState ¶
func (ocs *OptimizedCDCSource) GetState() core.State
GetState delegates to underlying source
func (*OptimizedCDCSource) Health ¶
func (ocs *OptimizedCDCSource) Health(ctx context.Context) error
Health delegates to underlying source
func (*OptimizedCDCSource) Initialize ¶
func (ocs *OptimizedCDCSource) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize delegates to underlying source
func (*OptimizedCDCSource) Metrics ¶
func (ocs *OptimizedCDCSource) Metrics() map[string]interface{}
Metrics returns combined metrics
func (*OptimizedCDCSource) Read ¶
func (ocs *OptimizedCDCSource) Read(ctx context.Context) (*core.RecordStream, error)
Read implements optimized read
func (*OptimizedCDCSource) ReadBatch ¶
func (ocs *OptimizedCDCSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
ReadBatch delegates to underlying source
func (*OptimizedCDCSource) SetPosition ¶
func (ocs *OptimizedCDCSource) SetPosition(position core.Position) error
SetPosition delegates to underlying source
func (*OptimizedCDCSource) SetState ¶
func (ocs *OptimizedCDCSource) SetState(state core.State) error
SetState delegates to underlying source
func (*OptimizedCDCSource) Subscribe ¶
func (ocs *OptimizedCDCSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
Subscribe delegates to underlying source
func (*OptimizedCDCSource) SupportsBatch ¶
func (ocs *OptimizedCDCSource) SupportsBatch() bool
SupportsBatch delegates to underlying source
func (*OptimizedCDCSource) SupportsIncremental ¶
func (ocs *OptimizedCDCSource) SupportsIncremental() bool
SupportsIncremental delegates to underlying source
func (*OptimizedCDCSource) SupportsRealtime ¶
func (ocs *OptimizedCDCSource) SupportsRealtime() bool
SupportsRealtime delegates to underlying source
type OptimizedCloudWarehouse ¶
type OptimizedCloudWarehouse struct {
// contains filtered or unexported fields
}
OptimizedCloudWarehouse wraps cloud warehouse with optimizations
func (*OptimizedCloudWarehouse) AlterSchema ¶
func (ocw *OptimizedCloudWarehouse) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
AlterSchema delegates to underlying destination
func (*OptimizedCloudWarehouse) BeginTransaction ¶
func (ocw *OptimizedCloudWarehouse) BeginTransaction(ctx context.Context) (core.Transaction, error)
BeginTransaction delegates to underlying destination
func (*OptimizedCloudWarehouse) BulkLoad ¶
func (ocw *OptimizedCloudWarehouse) BulkLoad(ctx context.Context, reader interface{}, format string) error
BulkLoad delegates to underlying destination
func (*OptimizedCloudWarehouse) Close ¶
func (ocw *OptimizedCloudWarehouse) Close(ctx context.Context) error
Close implements destination close
func (*OptimizedCloudWarehouse) CreateSchema ¶
func (ocw *OptimizedCloudWarehouse) CreateSchema(ctx context.Context, schema *core.Schema) error
CreateSchema delegates to underlying destination
func (*OptimizedCloudWarehouse) DropSchema ¶
func (ocw *OptimizedCloudWarehouse) DropSchema(ctx context.Context, schema *core.Schema) error
DropSchema delegates to underlying destination
func (*OptimizedCloudWarehouse) Health ¶
func (ocw *OptimizedCloudWarehouse) Health(ctx context.Context) error
Health delegates to underlying destination
func (*OptimizedCloudWarehouse) Initialize ¶
func (ocw *OptimizedCloudWarehouse) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize delegates to underlying destination
func (*OptimizedCloudWarehouse) Metrics ¶
func (ocw *OptimizedCloudWarehouse) Metrics() map[string]interface{}
Metrics returns combined metrics
func (*OptimizedCloudWarehouse) SupportsBatch ¶
func (ocw *OptimizedCloudWarehouse) SupportsBatch() bool
SupportsBatch delegates to underlying destination
func (*OptimizedCloudWarehouse) SupportsBulkLoad ¶
func (ocw *OptimizedCloudWarehouse) SupportsBulkLoad() bool
SupportsBulkLoad delegates to underlying destination
func (*OptimizedCloudWarehouse) SupportsStreaming ¶
func (ocw *OptimizedCloudWarehouse) SupportsStreaming() bool
SupportsStreaming delegates to underlying destination
func (*OptimizedCloudWarehouse) SupportsTransactions ¶
func (ocw *OptimizedCloudWarehouse) SupportsTransactions() bool
SupportsTransactions delegates to underlying destination
func (*OptimizedCloudWarehouse) SupportsUpsert ¶
func (ocw *OptimizedCloudWarehouse) SupportsUpsert() bool
SupportsUpsert delegates to underlying destination
func (*OptimizedCloudWarehouse) Upsert ¶
func (ocw *OptimizedCloudWarehouse) Upsert(ctx context.Context, records []*models.Record, keys []string) error
Upsert delegates to underlying destination
func (*OptimizedCloudWarehouse) Write ¶
func (ocw *OptimizedCloudWarehouse) Write(ctx context.Context, stream *core.RecordStream) error
Write implements optimized write
func (*OptimizedCloudWarehouse) WriteBatch ¶
func (ocw *OptimizedCloudWarehouse) WriteBatch(ctx context.Context, stream *core.BatchStream) error
WriteBatch delegates to underlying destination
type OptimizedGenericDestination ¶
type OptimizedGenericDestination struct {
// contains filtered or unexported fields
}
OptimizedGenericDestination wraps generic destination with basic optimizations
func (*OptimizedGenericDestination) AlterSchema ¶
func (ogd *OptimizedGenericDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
AlterSchema delegates to underlying destination
func (*OptimizedGenericDestination) BeginTransaction ¶
func (ogd *OptimizedGenericDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)
BeginTransaction delegates to underlying destination
func (*OptimizedGenericDestination) BulkLoad ¶
func (ogd *OptimizedGenericDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error
BulkLoad delegates to underlying destination
func (*OptimizedGenericDestination) Close ¶
func (ogd *OptimizedGenericDestination) Close(ctx context.Context) error
Close implements destination close
func (*OptimizedGenericDestination) CreateSchema ¶
func (ogd *OptimizedGenericDestination) CreateSchema(ctx context.Context, schema *core.Schema) error
CreateSchema delegates to underlying destination
func (*OptimizedGenericDestination) DropSchema ¶
func (ogd *OptimizedGenericDestination) DropSchema(ctx context.Context, schema *core.Schema) error
DropSchema delegates to underlying destination
func (*OptimizedGenericDestination) Health ¶
func (ogd *OptimizedGenericDestination) Health(ctx context.Context) error
Health delegates to underlying destination
func (*OptimizedGenericDestination) Initialize ¶
func (ogd *OptimizedGenericDestination) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize delegates to underlying destination
func (*OptimizedGenericDestination) Metrics ¶
func (ogd *OptimizedGenericDestination) Metrics() map[string]interface{}
Metrics returns combined metrics
func (*OptimizedGenericDestination) SupportsBatch ¶
func (ogd *OptimizedGenericDestination) SupportsBatch() bool
SupportsBatch delegates to underlying destination
func (*OptimizedGenericDestination) SupportsBulkLoad ¶
func (ogd *OptimizedGenericDestination) SupportsBulkLoad() bool
SupportsBulkLoad delegates to underlying destination
func (*OptimizedGenericDestination) SupportsStreaming ¶
func (ogd *OptimizedGenericDestination) SupportsStreaming() bool
SupportsStreaming delegates to underlying destination
func (*OptimizedGenericDestination) SupportsTransactions ¶
func (ogd *OptimizedGenericDestination) SupportsTransactions() bool
SupportsTransactions delegates to underlying destination
func (*OptimizedGenericDestination) SupportsUpsert ¶
func (ogd *OptimizedGenericDestination) SupportsUpsert() bool
SupportsUpsert delegates to underlying destination
func (*OptimizedGenericDestination) Upsert ¶
func (ogd *OptimizedGenericDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error
Upsert delegates to underlying destination
func (*OptimizedGenericDestination) Write ¶
func (ogd *OptimizedGenericDestination) Write(ctx context.Context, stream *core.RecordStream) error
Write implements optimized write
func (*OptimizedGenericDestination) WriteBatch ¶
func (ogd *OptimizedGenericDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error
WriteBatch delegates to underlying destination
type OptimizedGenericSource ¶
type OptimizedGenericSource struct {
// contains filtered or unexported fields
}
OptimizedGenericSource wraps generic source with basic optimizations
func (*OptimizedGenericSource) Close ¶
func (ogs *OptimizedGenericSource) Close(ctx context.Context) error
Close implements source close
func (*OptimizedGenericSource) GetPosition ¶
func (ogs *OptimizedGenericSource) GetPosition() core.Position
GetPosition delegates to underlying source
func (*OptimizedGenericSource) GetState ¶
func (ogs *OptimizedGenericSource) GetState() core.State
GetState delegates to underlying source
func (*OptimizedGenericSource) Health ¶
func (ogs *OptimizedGenericSource) Health(ctx context.Context) error
Health delegates to underlying source
func (*OptimizedGenericSource) Initialize ¶
func (ogs *OptimizedGenericSource) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize delegates to underlying source
func (*OptimizedGenericSource) Metrics ¶
func (ogs *OptimizedGenericSource) Metrics() map[string]interface{}
Metrics returns combined metrics
func (*OptimizedGenericSource) Read ¶
func (ogs *OptimizedGenericSource) Read(ctx context.Context) (*core.RecordStream, error)
Read implements optimized read
func (*OptimizedGenericSource) ReadBatch ¶
func (ogs *OptimizedGenericSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
ReadBatch delegates to underlying source
func (*OptimizedGenericSource) SetPosition ¶
func (ogs *OptimizedGenericSource) SetPosition(position core.Position) error
SetPosition delegates to underlying source
func (*OptimizedGenericSource) SetState ¶
func (ogs *OptimizedGenericSource) SetState(state core.State) error
SetState delegates to underlying source
func (*OptimizedGenericSource) Subscribe ¶
func (ogs *OptimizedGenericSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
Subscribe delegates to underlying source
func (*OptimizedGenericSource) SupportsBatch ¶
func (ogs *OptimizedGenericSource) SupportsBatch() bool
SupportsBatch delegates to underlying source
func (*OptimizedGenericSource) SupportsIncremental ¶
func (ogs *OptimizedGenericSource) SupportsIncremental() bool
SupportsIncremental delegates to underlying source
func (*OptimizedGenericSource) SupportsRealtime ¶
func (ogs *OptimizedGenericSource) SupportsRealtime() bool
SupportsRealtime delegates to underlying source
type OptimizedSnowflakeDestination ¶
type OptimizedSnowflakeDestination struct {
// contains filtered or unexported fields
}
OptimizedSnowflakeDestination wraps Snowflake with optimizations
func (*OptimizedSnowflakeDestination) AlterSchema ¶
func (osd *OptimizedSnowflakeDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
AlterSchema delegates to underlying destination
func (*OptimizedSnowflakeDestination) BeginTransaction ¶
func (osd *OptimizedSnowflakeDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)
BeginTransaction delegates to underlying destination
func (*OptimizedSnowflakeDestination) BulkLoad ¶
func (osd *OptimizedSnowflakeDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error
BulkLoad delegates to underlying destination
func (*OptimizedSnowflakeDestination) Close ¶
func (osd *OptimizedSnowflakeDestination) Close(ctx context.Context) error
Close implements destination close
func (*OptimizedSnowflakeDestination) CreateSchema ¶
func (osd *OptimizedSnowflakeDestination) CreateSchema(ctx context.Context, schema *core.Schema) error
CreateSchema delegates to underlying destination
func (*OptimizedSnowflakeDestination) DropSchema ¶
func (osd *OptimizedSnowflakeDestination) DropSchema(ctx context.Context, schema *core.Schema) error
DropSchema delegates to underlying destination
func (*OptimizedSnowflakeDestination) Health ¶
func (osd *OptimizedSnowflakeDestination) Health(ctx context.Context) error
Health delegates to underlying destination
func (*OptimizedSnowflakeDestination) Initialize ¶
func (osd *OptimizedSnowflakeDestination) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize delegates to underlying destination
func (*OptimizedSnowflakeDestination) Metrics ¶
func (osd *OptimizedSnowflakeDestination) Metrics() map[string]interface{}
Metrics returns combined metrics
func (*OptimizedSnowflakeDestination) SupportsBatch ¶
func (osd *OptimizedSnowflakeDestination) SupportsBatch() bool
SupportsBatch delegates to underlying destination
func (*OptimizedSnowflakeDestination) SupportsBulkLoad ¶
func (osd *OptimizedSnowflakeDestination) SupportsBulkLoad() bool
SupportsBulkLoad delegates to underlying destination
func (*OptimizedSnowflakeDestination) SupportsStreaming ¶
func (osd *OptimizedSnowflakeDestination) SupportsStreaming() bool
SupportsStreaming delegates to underlying destination
func (*OptimizedSnowflakeDestination) SupportsTransactions ¶
func (osd *OptimizedSnowflakeDestination) SupportsTransactions() bool
SupportsTransactions delegates to underlying destination
func (*OptimizedSnowflakeDestination) SupportsUpsert ¶
func (osd *OptimizedSnowflakeDestination) SupportsUpsert() bool
SupportsUpsert delegates to underlying destination
func (*OptimizedSnowflakeDestination) Upsert ¶
func (osd *OptimizedSnowflakeDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error
Upsert delegates to underlying destination
func (*OptimizedSnowflakeDestination) Write ¶
func (osd *OptimizedSnowflakeDestination) Write(ctx context.Context, stream *core.RecordStream) error
Write implements optimized write
func (*OptimizedSnowflakeDestination) WriteBatch ¶
func (osd *OptimizedSnowflakeDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error
WriteBatch delegates to underlying destination
type Optimizer ¶
type Optimizer struct {
// contains filtered or unexported fields
}
Optimizer provides performance optimization capabilities
func NewOptimizer ¶
func NewOptimizer(config *OptimizerConfig) *Optimizer
NewOptimizer creates a new optimizer
func (*Optimizer) ExecuteParallel ¶
func (o *Optimizer) ExecuteParallel(ctx context.Context, items []interface{}, fn func(interface{}) error) error
ExecuteParallel executes fn for the given items in parallel with optimization
func (*Optimizer) GetMetrics ¶
func (o *Optimizer) GetMetrics() *OptimizationMetrics
GetMetrics returns optimization metrics
func (*Optimizer) OptimizeBatch ¶
OptimizeBatch optimizes batch size based on performance
func (*Optimizer) OptimizeMemory ¶
func (o *Optimizer) OptimizeMemory()
OptimizeMemory optimizes memory usage
type OptimizerConfig ¶
type OptimizerConfig struct {
// Memory optimization
EnableMemoryPool bool
MemoryPoolSize int
BufferSize int
// CPU optimization
WorkerCount int
MaxGoroutines int
EnableAffinity bool
// Batch optimization
OptimalBatchSize int
AdaptiveBatching bool
BatchTimeout time.Duration
// Pipeline optimization
EnablePipelining bool
PipelineDepth int
// Zero-copy optimization
EnableZeroCopy bool
// Cache optimization
EnableCaching bool
CacheSize int
}
OptimizerConfig configures the optimizer
func DefaultOptimizerConfig ¶
func DefaultOptimizerConfig() *OptimizerConfig
DefaultOptimizerConfig returns optimized configuration
type ParallelismStrategy ¶
type ParallelismStrategy struct {
// contains filtered or unexported fields
}
ParallelismStrategy applies parallel processing
func (*ParallelismStrategy) Apply ¶
func (ps *ParallelismStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies parallelism optimization
func (*ParallelismStrategy) Name ¶
func (ps *ParallelismStrategy) Name() string
Name returns strategy name
func (*ParallelismStrategy) ShouldApply ¶
func (ps *ParallelismStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply
type PartitioningStage ¶
type PartitioningStage struct {
// contains filtered or unexported fields
}
PartitioningStage partitions records
func (*PartitioningStage) Name ¶
func (ps *PartitioningStage) Name() string
Name returns the stage name
type PipelineMetrics ¶
type PipelineMetrics struct {
StageLatencies map[string]time.Duration
Throughput float64
Stalls int64
}
PipelineMetrics tracks pipeline metrics
type PipelineOptimizer ¶
type PipelineOptimizer struct {
// contains filtered or unexported fields
}
PipelineOptimizer optimizes data pipelines
func NewPipelineOptimizer ¶
func NewPipelineOptimizer(depth int) *PipelineOptimizer
NewPipelineOptimizer creates a pipeline optimizer
func (*PipelineOptimizer) AddStage ¶
func (po *PipelineOptimizer) AddStage(stage PipelineStage)
AddStage adds a pipeline stage
type PipelineStage ¶
type PipelineStage interface {
Process(ctx context.Context, input interface{}) (interface{}, error)
Name() string
}
PipelineStage represents a pipeline stage
type ProfileResult ¶
type ProfileResult struct {
Name string
Duration time.Duration
Metrics *Metrics
Resources *ResourceUsage
Report string
}
ProfileResult contains profiling results
type Profiler ¶
type Profiler struct {
// contains filtered or unexported fields
}
Profiler provides performance profiling capabilities
func NewProfiler ¶
func NewProfiler(config *ProfilerConfig) *Profiler
NewProfiler creates a new profiler
func (*Profiler) GenerateReport ¶
func (p *Profiler) GenerateReport() *ProfileResult
GenerateReport generates a performance report
func (*Profiler) GetMetrics ¶
GetMetrics returns current metrics
func (*Profiler) IncrementBytes ¶
IncrementBytes increments byte counter
func (*Profiler) IncrementErrors ¶
IncrementErrors increments error counter
func (*Profiler) IncrementRecords ¶
IncrementRecords increments record counter
func (*Profiler) RecordLatency ¶
RecordLatency records operation latency
func (*Profiler) SetCustomMetric ¶
SetCustomMetric sets a custom metric
type ProfilerConfig ¶
type ProfilerConfig struct {
Name string
EnableCPUProfile bool
EnableMemProfile bool
EnableTrace bool
SamplingInterval time.Duration
ResourceMonitor bool
}
ProfilerConfig configures the profiler
func DefaultProfilerConfig ¶
func DefaultProfilerConfig(name string) *ProfilerConfig
DefaultProfilerConfig returns default configuration
type ResourceMonitor ¶
type ResourceMonitor struct {
// contains filtered or unexported fields
}
ResourceMonitor monitors system resources
func NewResourceMonitor ¶
func NewResourceMonitor() *ResourceMonitor
NewResourceMonitor creates a resource monitor
func (*ResourceMonitor) GetResourceUsage ¶
func (rm *ResourceMonitor) GetResourceUsage() (*ResourceUsage, error)
GetResourceUsage returns current resource usage
type ResourceUsage ¶
type ResourceUsage struct {
CPUPercent float64
MemoryRSS uint64
MemoryVMS uint64
SystemMemoryPercent float64
SystemMemoryAvailable uint64
GoroutineCount int
ThreadCount int32
OpenFDs int32
}
ResourceUsage contains resource usage information
type SchemaOptimizationStrategy ¶
type SchemaOptimizationStrategy struct {
// contains filtered or unexported fields
}
SchemaOptimizationStrategy optimizes schema operations
func NewSchemaOptimizationStrategy ¶
func NewSchemaOptimizationStrategy() *SchemaOptimizationStrategy
NewSchemaOptimizationStrategy creates schema optimization strategy
func (*SchemaOptimizationStrategy) Apply ¶
func (sos *SchemaOptimizationStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies schema optimization
func (*SchemaOptimizationStrategy) Name ¶
func (sos *SchemaOptimizationStrategy) Name() string
Name returns strategy name
func (*SchemaOptimizationStrategy) ShouldApply ¶
func (sos *SchemaOptimizationStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply
type SnowflakeOptConfig ¶
type SnowflakeOptConfig struct {
UseStagingArea bool
FileFormat string
CompressionType string
ParallelUploads int
MicroBatchSize int
ColumnarEnabled bool
}
SnowflakeOptConfig configures Snowflake optimizations
type SortingStage ¶
type SortingStage struct {
// contains filtered or unexported fields
}
SortingStage sorts records by keys
func (*SortingStage) Name ¶
func (ss *SortingStage) Name() string
Name returns the stage name
type StringIntern ¶
type StringIntern struct {
// contains filtered or unexported fields
}
StringIntern provides string interning
func NewStringIntern ¶
func NewStringIntern(capacity int) *StringIntern
NewStringIntern creates a string intern table
func (*StringIntern) GetStats ¶
func (si *StringIntern) GetStats() (size, saved int64)
GetStats returns interning statistics
func (*StringIntern) Intern ¶
func (si *StringIntern) Intern(s string) string
Intern interns a string
type UnsafeStringOptimizer ¶
type UnsafeStringOptimizer struct {
// contains filtered or unexported fields
}
UnsafeStringOptimizer provides unsafe string optimizations
func NewUnsafeStringOptimizer ¶
func NewUnsafeStringOptimizer(enabled bool) *UnsafeStringOptimizer
NewUnsafeStringOptimizer creates unsafe string optimizer
func (*UnsafeStringOptimizer) BytesToString ¶
func (uso *UnsafeStringOptimizer) BytesToString(b []byte) string
BytesToString converts a byte slice to a string without allocation
func (*UnsafeStringOptimizer) StringToBytes ¶
func (uso *UnsafeStringOptimizer) StringToBytes(s string) []byte
StringToBytes converts a string to a byte slice without allocation
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages worker goroutines
func NewWorkerPool ¶
func NewWorkerPool(workers, maxGoroutines int) *WorkerPool
NewWorkerPool creates a worker pool
func (*WorkerPool) Execute ¶
func (wp *WorkerPool) Execute(ctx context.Context, items []interface{}, fn func(interface{}) error) error
Execute executes tasks in parallel
func (*WorkerPool) GetUtilization ¶
func (wp *WorkerPool) GetUtilization() float64
GetUtilization returns worker utilization
type ZeroCopyMetrics ¶
ZeroCopyMetrics tracks zero-copy metrics
type ZeroCopyOptimizer ¶
type ZeroCopyOptimizer struct {
// contains filtered or unexported fields
}
ZeroCopyOptimizer provides zero-copy optimizations
func NewZeroCopyOptimizer ¶
func NewZeroCopyOptimizer(enableUnsafe bool) *ZeroCopyOptimizer
NewZeroCopyOptimizer creates a zero-copy optimizer
func (*ZeroCopyOptimizer) OptimizeRecordTransfer ¶
func (zco *ZeroCopyOptimizer) OptimizeRecordTransfer(records []*models.Record) []*models.Record
OptimizeRecordTransfer optimizes record transfer
type ZeroCopyStrategy ¶
type ZeroCopyStrategy struct {
// contains filtered or unexported fields
}
ZeroCopyStrategy applies zero-copy optimizations
func (*ZeroCopyStrategy) Apply ¶
func (zcs *ZeroCopyStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)
Apply applies zero-copy optimization
func (*ZeroCopyStrategy) Name ¶
func (zcs *ZeroCopyStrategy) Name() string
Name returns strategy name
func (*ZeroCopyStrategy) ShouldApply ¶
func (zcs *ZeroCopyStrategy) ShouldApply(metrics *Metrics) bool
ShouldApply determines if strategy should apply