performance

package v0.3.0
Published: Jun 14, 2025 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package performance provides profiling and optimization tools for Nebula, including bulk loading optimizations, connector-specific optimizations, memory optimization utilities, and general optimization strategies.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CompareMemoryProfiles

func CompareMemoryProfiles(before, after *MemoryProfile) string

CompareMemoryProfiles compares two memory profiles
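Example

A minimal sketch of capturing memory profiles around a workload and comparing them; the import path for the performance package is assumed, not taken from this page.

	package main

	import (
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		before := performance.CaptureMemoryProfile()

		// Stand-in for the workload being measured.
		buffers := make([][]byte, 0, 1024)
		for i := 0; i < 1024; i++ {
			buffers = append(buffers, make([]byte, 4096))
		}
		_ = buffers

		after := performance.CaptureMemoryProfile()
		fmt.Println(performance.CompareMemoryProfiles(before, after))
	}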

Types

type APIOptConfig

type APIOptConfig struct {
	RequestBatching    bool
	ResponseCaching    bool
	CompressionEnabled bool
	ParallelRequests   int
	CacheTTL           time.Duration
}

APIOptConfig configures API optimizations

type AdaptiveStrategy

type AdaptiveStrategy struct {
	// contains filtered or unexported fields
}

AdaptiveStrategy adapts based on runtime metrics

func NewAdaptiveStrategy

func NewAdaptiveStrategy(strategies []OptimizationStrategy) *AdaptiveStrategy

NewAdaptiveStrategy creates an adaptive strategy

func (*AdaptiveStrategy) Apply

func (as *AdaptiveStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies best strategy based on metrics

func (*AdaptiveStrategy) Name

func (as *AdaptiveStrategy) Name() string

Name returns strategy name

func (*AdaptiveStrategy) ShouldApply

func (as *AdaptiveStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply always returns true for the adaptive strategy

func (*AdaptiveStrategy) UpdateMetrics

func (as *AdaptiveStrategy) UpdateMetrics(throughput float64)

UpdateMetrics updates strategy metrics
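Example

A sketch of wiring a custom strategy into an AdaptiveStrategy; noopStrategy is hypothetical and exists only to satisfy the OptimizationStrategy interface, and the import path is assumed.

	package main

	import (
		"context"
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	// noopStrategy is a hypothetical strategy used only for illustration.
	type noopStrategy struct{}

	func (noopStrategy) Name() string { return "noop" }

	func (noopStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error) {
		return data, nil
	}

	func (noopStrategy) ShouldApply(metrics *performance.Metrics) bool { return true }

	func main() {
		as := performance.NewAdaptiveStrategy([]performance.OptimizationStrategy{noopStrategy{}})

		out, err := as.Apply(context.Background(), []int{1, 2, 3})
		if err != nil {
			panic(err)
		}
		fmt.Println(as.Name(), out)

		// Feed observed throughput back so the adaptive strategy can adjust.
		as.UpdateMetrics(125000)
	}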

type BatchManager

type BatchManager struct {
	// contains filtered or unexported fields
}

BatchManager manages dynamic batch sizing

func NewBatchManager

func NewBatchManager(optimalSize int, adaptive bool) *BatchManager

NewBatchManager creates a batch manager

func (*BatchManager) OptimizeSize

func (bm *BatchManager) OptimizeSize(currentSize int, throughput float64) int

OptimizeSize optimizes batch size based on throughput
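Example

A sketch of adaptive batch sizing driven by observed throughput; the import path is assumed.

	package main

	import (
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		// Start from an optimal size of 10,000 records with adaptation enabled.
		bm := performance.NewBatchManager(10000, true)

		size := 5000
		for _, throughput := range []float64{80000, 120000, 95000} {
			size = bm.OptimizeSize(size, throughput)
			fmt.Printf("throughput %.0f rec/s -> next batch size %d\n", throughput, size)
		}
	}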

type BatchingStrategy

type BatchingStrategy struct {
	// contains filtered or unexported fields
}

BatchingStrategy optimizes batch sizes

func (*BatchingStrategy) Apply

func (bs *BatchingStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies batching optimization

func (*BatchingStrategy) Name

func (bs *BatchingStrategy) Name() string

Name returns strategy name

func (*BatchingStrategy) ShouldApply

func (bs *BatchingStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

type Benchmark

type Benchmark struct {
	// contains filtered or unexported fields
}

Benchmark provides benchmarking utilities

func NewBenchmark

func NewBenchmark(name string) *Benchmark

NewBenchmark creates a new benchmark

func (*Benchmark) Run

func (b *Benchmark) Run(fn func() error, duration time.Duration) (*ProfileResult, error)

Run runs a benchmark function

func (*Benchmark) Stop

func (b *Benchmark) Stop()

Stop stops the benchmark
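Example

A sketch of running a timed benchmark, assuming Run drives the supplied function repeatedly for the given duration; the import path is assumed.

	package main

	import (
		"fmt"
		"time"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		b := performance.NewBenchmark("record-encode")
		defer b.Stop()

		result, err := b.Run(func() error {
			_ = make([]byte, 1024) // stand-in for the work under test
			return nil
		}, time.Second)
		if err != nil {
			panic(err)
		}
		fmt.Println(result.Name, result.Duration)
		fmt.Println(result.Report)
	}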

type BufferPool

type BufferPool struct {
	// contains filtered or unexported fields
}

BufferPool provides memory buffer pooling

func NewBufferPool

func NewBufferPool(poolSize, bufferSize int) *BufferPool

NewBufferPool creates a buffer pool

func (*BufferPool) Get

func (bp *BufferPool) Get() []byte

Get gets a buffer from pool

func (*BufferPool) Put

func (bp *BufferPool) Put(buf []byte)

Put returns buffer to pool
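Example

A sketch of reusing buffers through the pool; the import path is assumed.

	package main

	import (
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		// 64 pooled buffers of 32 KiB each.
		pool := performance.NewBufferPool(64, 32*1024)

		buf := pool.Get()
		defer pool.Put(buf) // return the buffer so it can be reused

		n := copy(buf, []byte("payload"))
		fmt.Println("copied", n, "bytes into a", len(buf), "byte buffer")
	}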

type BulkLoadConfig

type BulkLoadConfig struct {
	TargetWarehouse string
	MaxBatchSize    int
	MaxFileSize     int64
	ParallelLoaders int
	StagingLocation string

	// Format options
	FileFormat     string // csv, json, parquet, avro
	Compression    string // none, gzip, snappy, zstd
	EnableColumnar bool

	// Optimization options
	EnableSorting      bool
	SortKeys           []string
	EnablePartitioning bool
	PartitionKeys      []string
	EnableClustering   bool
	ClusterKeys        []string

	// Performance options
	MemoryLimit      int64
	NetworkBandwidth int64 // bytes/sec
	RetryAttempts    int
	RetryDelay       time.Duration
}

BulkLoadConfig configures bulk loading

func DefaultBulkLoadConfig

func DefaultBulkLoadConfig(warehouse string) *BulkLoadConfig

DefaultBulkLoadConfig returns default configuration

type BulkLoadMetrics

type BulkLoadMetrics struct {
	RecordsLoaded    int64
	BytesLoaded      int64
	FilesCreated     int64
	LoadDuration     time.Duration
	CompressionRatio float64
	Throughput       float64
	Errors           int64
	Retries          int64
}

BulkLoadMetrics tracks bulk load metrics

type BulkLoadOptimizer

type BulkLoadOptimizer struct {
	// contains filtered or unexported fields
}

BulkLoadOptimizer provides warehouse-specific optimizations

func NewBulkLoadOptimizer

func NewBulkLoadOptimizer(warehouse string) *BulkLoadOptimizer

NewBulkLoadOptimizer creates a warehouse-specific optimizer

func (*BulkLoadOptimizer) Load

func (blo *BulkLoadOptimizer) Load(ctx context.Context, records []*models.Record) error

Load performs optimized bulk load

func (*BulkLoadOptimizer) OptimizeForBigQuery

func (blo *BulkLoadOptimizer) OptimizeForBigQuery()

OptimizeForBigQuery optimizes for BigQuery

func (*BulkLoadOptimizer) OptimizeForRedshift

func (blo *BulkLoadOptimizer) OptimizeForRedshift()

OptimizeForRedshift optimizes for Redshift

func (*BulkLoadOptimizer) OptimizeForSnowflake

func (blo *BulkLoadOptimizer) OptimizeForSnowflake()

OptimizeForSnowflake optimizes for Snowflake

type BulkLoader

type BulkLoader struct {
	// contains filtered or unexported fields
}

BulkLoader provides optimized bulk loading capabilities

func NewBulkLoader

func NewBulkLoader(config *BulkLoadConfig) *BulkLoader

NewBulkLoader creates a new bulk loader

func (*BulkLoader) GetMetrics

func (bl *BulkLoader) GetMetrics() *BulkLoadMetrics

GetMetrics returns bulk load metrics

func (*BulkLoader) Load

func (bl *BulkLoader) Load(ctx context.Context, records []*models.Record) error

Load performs optimized bulk loading
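Example

A sketch of configuring and running a bulk load; the import paths and the "snowflake" warehouse name are assumptions, and the records are presumed to come from a source connector.

	package main

	import (
		"context"
		"fmt"

		"github.com/example/nebula/pkg/models"      // assumed import path
		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		cfg := performance.DefaultBulkLoadConfig("snowflake") // warehouse name is an assumption
		cfg.FileFormat = "parquet"
		cfg.Compression = "zstd"

		loader := performance.NewBulkLoader(cfg)

		var records []*models.Record // obtained from a source connector
		if err := loader.Load(context.Background(), records); err != nil {
			panic(err)
		}

		m := loader.GetMetrics()
		fmt.Println(m.RecordsLoaded, m.Throughput, m.CompressionRatio)
	}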

type CDCOptConfig

type CDCOptConfig struct {
	SnapshotBatchSize  int
	StreamingBatchSize int
	ParallelSnapshots  int
	CompressionEnabled bool
	MemoryOptimization bool
}

CDCOptConfig configures CDC optimizations

type CacheOptimizer

type CacheOptimizer struct {
	// contains filtered or unexported fields
}

CacheOptimizer provides caching optimizations

func NewCacheOptimizer

func NewCacheOptimizer(capacity int) *CacheOptimizer

NewCacheOptimizer creates a cache optimizer

func (*CacheOptimizer) Get

func (co *CacheOptimizer) Get(key string) (interface{}, bool)

Get gets value from cache

func (*CacheOptimizer) GetHitRate

func (co *CacheOptimizer) GetHitRate() float64

GetHitRate returns cache hit rate

func (*CacheOptimizer) Set

func (co *CacheOptimizer) Set(key string, value interface{})

Set sets value in cache
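Example

A sketch of cache usage and hit-rate tracking; the import path is assumed.

	package main

	import (
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		cache := performance.NewCacheOptimizer(1000) // capacity of 1000 entries

		cache.Set("users:42", map[string]interface{}{"id": 42})

		if v, ok := cache.Get("users:42"); ok {
			fmt.Println("hit:", v)
		}
		cache.Get("users:missing") // miss

		fmt.Printf("hit rate: %.2f\n", cache.GetHitRate())
	}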

type CachingStrategy

type CachingStrategy struct {
	// contains filtered or unexported fields
}

CachingStrategy applies caching

func (*CachingStrategy) Apply

func (cs *CachingStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies caching optimization

func (*CachingStrategy) Name

func (cs *CachingStrategy) Name() string

Name returns strategy name

func (*CachingStrategy) ShouldApply

func (cs *CachingStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

type CloudWarehouseOptConfig

type CloudWarehouseOptConfig struct {
	BatchSize          int
	ParallelLoaders    int
	CompressionEnabled bool
	ColumnarFormat     bool
	StreamingInserts   bool
}

CloudWarehouseOptConfig configures cloud warehouse optimizations

type ClusteringStage

type ClusteringStage struct {
	// contains filtered or unexported fields
}

ClusteringStage clusters records

func (*ClusteringStage) Name

func (cs *ClusteringStage) Name() string

func (*ClusteringStage) Process

func (cs *ClusteringStage) Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)

type Column

type Column struct {
	// contains filtered or unexported fields
}

Column represents a data column

type ColumnStore

type ColumnStore struct {
	// contains filtered or unexported fields
}

ColumnStore provides columnar storage optimization

func NewColumnStore

func NewColumnStore() *ColumnStore

NewColumnStore creates a column store

func (*ColumnStore) OptimizeRecords

func (cs *ColumnStore) OptimizeRecords(records []*models.Record) []*models.Record

OptimizeRecords converts records to columnar format

type ColumnarStrategy

type ColumnarStrategy struct {
	// contains filtered or unexported fields
}

ColumnarStrategy converts to columnar format

func (*ColumnarStrategy) Apply

func (cs *ColumnarStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies columnar optimization

func (*ColumnarStrategy) Name

func (cs *ColumnarStrategy) Name() string

Name returns strategy name

func (*ColumnarStrategy) ShouldApply

func (cs *ColumnarStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

type CompressionStrategy

type CompressionStrategy struct {
	// contains filtered or unexported fields
}

CompressionStrategy applies compression

func (*CompressionStrategy) Apply

func (cs *CompressionStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies compression optimization

func (*CompressionStrategy) Name

func (cs *CompressionStrategy) Name() string

Name returns strategy name

func (*CompressionStrategy) ShouldApply

func (cs *CompressionStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

type ConnectorOptConfig

type ConnectorOptConfig struct {
	ConnectorType    string
	TargetThroughput int // records/sec
	MaxMemoryMB      int
	MaxCPUPercent    float64
	EnableProfiling  bool

	// Optimization strategies
	EnableBatching    bool
	EnableCompression bool
	EnableColumnar    bool
	EnableParallelism bool
	EnableCaching     bool
	EnableZeroCopy    bool
}

ConnectorOptConfig configures connector optimization

func DefaultConnectorOptConfig

func DefaultConnectorOptConfig(connectorType string) *ConnectorOptConfig

DefaultConnectorOptConfig returns default configuration

type ConnectorOptimizer

type ConnectorOptimizer struct {
	// contains filtered or unexported fields
}

ConnectorOptimizer provides connector-specific optimizations

func NewConnectorOptimizer

func NewConnectorOptimizer(config *ConnectorOptConfig) *ConnectorOptimizer

NewConnectorOptimizer creates a connector optimizer

func (*ConnectorOptimizer) ApplyOptimizations

func (co *ConnectorOptimizer) ApplyOptimizations(ctx context.Context, records []*models.Record) ([]*models.Record, error)

ApplyOptimizations applies all applicable optimizations

func (*ConnectorOptimizer) GetOptimizationReport

func (co *ConnectorOptimizer) GetOptimizationReport() string

GetOptimizationReport generates optimization report

func (*ConnectorOptimizer) OptimizeDestination

func (co *ConnectorOptimizer) OptimizeDestination(dest core.Destination) core.Destination

OptimizeDestination optimizes a destination connector

func (*ConnectorOptimizer) OptimizeSource

func (co *ConnectorOptimizer) OptimizeSource(source core.Source) core.Source

OptimizeSource optimizes a source connector

type FormatConversionStage

type FormatConversionStage struct {
	// contains filtered or unexported fields
}

FormatConversionStage converts to target format

func (*FormatConversionStage) Name

func (fcs *FormatConversionStage) Name() string

func (*FormatConversionStage) Process

func (fcs *FormatConversionStage) Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)

type LatencyTracker

type LatencyTracker struct {
	// contains filtered or unexported fields
}

LatencyTracker tracks latency percentiles

func NewLatencyTracker

func NewLatencyTracker() *LatencyTracker

NewLatencyTracker creates a latency tracker

func (*LatencyTracker) GetPercentiles

func (lt *LatencyTracker) GetPercentiles() (p50, p95, p99 time.Duration)

GetPercentiles returns latency percentiles

func (*LatencyTracker) Record

func (lt *LatencyTracker) Record(d time.Duration)

Record records a latency sample
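Example

A sketch of recording latency samples and reading back percentiles; the import path is assumed.

	package main

	import (
		"fmt"
		"time"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		lt := performance.NewLatencyTracker()

		for _, d := range []time.Duration{2 * time.Millisecond, 5 * time.Millisecond, 40 * time.Millisecond} {
			lt.Record(d)
		}

		p50, p95, p99 := lt.GetPercentiles()
		fmt.Println("p50:", p50, "p95:", p95, "p99:", p99)
	}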

type LoadFile

type LoadFile struct {
	Name   string
	Format string
	Data   []byte
	Size   int64
}

LoadFile represents a file to load

type LoadStage

type LoadStage interface {
	Name() string
	Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)
}

LoadStage represents a loading stage

type MemoryConfig

type MemoryConfig struct {
	EnableObjectPooling   bool
	ObjectPoolSize        int
	EnableStringInterning bool
	StringInternSize      int
	EnableColumnStore     bool
	EnableCompaction      bool
	CompactionThreshold   float64
	MaxMemoryMB           int
}

MemoryConfig configures memory optimization

func DefaultMemoryConfig

func DefaultMemoryConfig() *MemoryConfig

DefaultMemoryConfig returns default configuration

type MemoryMetrics

type MemoryMetrics struct {
	AllocatedMB     uint64
	UsedMB          uint64
	ObjectsPooled   int64
	StringsInterned int64
	CompactionCount int64
	BytesSaved      int64
}

MemoryMetrics tracks memory metrics

type MemoryOptimizationStage

type MemoryOptimizationStage struct {
	// contains filtered or unexported fields
}

MemoryOptimizationStage optimizes memory usage

func (*MemoryOptimizationStage) Name

func (mos *MemoryOptimizationStage) Name() string

func (*MemoryOptimizationStage) Process

func (mos *MemoryOptimizationStage) Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)

type MemoryOptimizer

type MemoryOptimizer struct {
	// contains filtered or unexported fields
}

MemoryOptimizer provides memory optimization capabilities

func NewMemoryOptimizer

func NewMemoryOptimizer(config *MemoryConfig) *MemoryOptimizer

NewMemoryOptimizer creates a memory optimizer

func (*MemoryOptimizer) Compact

func (mo *MemoryOptimizer) Compact()

Compact performs memory compaction

func (*MemoryOptimizer) GetMetrics

func (mo *MemoryOptimizer) GetMetrics() *MemoryMetrics

GetMetrics returns memory metrics

func (*MemoryOptimizer) OptimizeRecord

func (mo *MemoryOptimizer) OptimizeRecord(record *models.Record) *models.Record

OptimizeRecord optimizes a single record

func (*MemoryOptimizer) OptimizeRecords

func (mo *MemoryOptimizer) OptimizeRecords(records []*models.Record) []*models.Record

OptimizeRecords optimizes memory usage for records

func (*MemoryOptimizer) ReleaseRecord

func (mo *MemoryOptimizer) ReleaseRecord(record *models.Record)

ReleaseRecord returns record to pool
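Example

A sketch of the optimize/release lifecycle for records; the import paths are assumed and the records are presumed to come from a source connector.

	package main

	import (
		"fmt"

		"github.com/example/nebula/pkg/models"      // assumed import path
		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		mo := performance.NewMemoryOptimizer(performance.DefaultMemoryConfig())

		var records []*models.Record // obtained from a source connector
		optimized := mo.OptimizeRecords(records)

		// ... process the optimized records ...

		for _, r := range optimized {
			mo.ReleaseRecord(r) // return records to the pool when finished
		}

		m := mo.GetMetrics()
		fmt.Println(m.ObjectsPooled, m.StringsInterned, m.BytesSaved)
	}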

type MemoryProfile

type MemoryProfile struct {
	Timestamp    int64
	AllocMB      uint64
	TotalAllocMB uint64
	SysMB        uint64
	NumGC        uint32
	PauseNs      uint64
	Objects      int64
}

MemoryProfile represents memory profile data

func CaptureMemoryProfile

func CaptureMemoryProfile() *MemoryProfile

CaptureMemoryProfile captures current memory profile

type Metrics

type Metrics struct {
	// Throughput metrics
	RecordsProcessed int64
	BytesProcessed   int64
	RecordsPerSecond float64
	BytesPerSecond   float64

	// Latency metrics
	MinLatency time.Duration
	MaxLatency time.Duration
	AvgLatency time.Duration
	P50Latency time.Duration
	P95Latency time.Duration
	P99Latency time.Duration

	// Resource metrics
	CPUUsagePercent float64
	MemoryUsageMB   uint64
	GoroutineCount  int
	GCCount         uint32
	GCPauseTotalNs  uint64

	// Error metrics
	ErrorCount int64
	RetryCount int64

	// Custom metrics
	CustomMetrics map[string]interface{}
}

Metrics tracks performance metrics

type ObjectPool

type ObjectPool struct {
	// contains filtered or unexported fields
}

ObjectPool provides object pooling

func NewObjectPool

func NewObjectPool(capacity int) *ObjectPool

NewObjectPool creates an object pool

func (*ObjectPool) GetMap

func (op *ObjectPool) GetMap() map[string]interface{}

GetMap gets a map from pool

func (*ObjectPool) GetRecord

func (op *ObjectPool) GetRecord() *models.Record

GetRecord gets a record from pool

func (*ObjectPool) PutMap

func (op *ObjectPool) PutMap(m map[string]interface{})

PutMap returns map to pool

func (*ObjectPool) PutRecord

func (op *ObjectPool) PutRecord(record *models.Record)

PutRecord returns record to pool
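Example

A sketch of pooling records and maps to cut allocations; the import path is assumed.

	package main

	import (
		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		pool := performance.NewObjectPool(1024)

		// Reuse record allocations across iterations.
		rec := pool.GetRecord()
		// ... populate and use rec ...
		pool.PutRecord(rec)

		// Maps are pooled the same way.
		m := pool.GetMap()
		m["key"] = "value"
		pool.PutMap(m)
	}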

type OptimizationMetrics

type OptimizationMetrics struct {
	MemoryPoolHits     int64
	MemoryPoolMisses   int64
	BatchesOptimized   int64
	PipelineStalls     int64
	ZeroCopyOperations int64
	CacheHits          int64
	CacheMisses        int64
	WorkerUtilization  float64
}

OptimizationMetrics tracks optimization metrics

type OptimizationStrategy

type OptimizationStrategy interface {
	Name() string
	Apply(ctx context.Context, data interface{}) (interface{}, error)
	ShouldApply(metrics *Metrics) bool
}

OptimizationStrategy defines an optimization strategy

type OptimizedAPISource

type OptimizedAPISource struct {
	// contains filtered or unexported fields
}

OptimizedAPISource wraps API source with optimizations

func (*OptimizedAPISource) Close

func (oas *OptimizedAPISource) Close(ctx context.Context) error

Close implements source close

func (*OptimizedAPISource) Discover

func (oas *OptimizedAPISource) Discover(ctx context.Context) (*core.Schema, error)

Discover delegates to underlying source

func (*OptimizedAPISource) GetPosition

func (oas *OptimizedAPISource) GetPosition() core.Position

GetPosition delegates to underlying source

func (*OptimizedAPISource) GetState

func (oas *OptimizedAPISource) GetState() core.State

GetState delegates to underlying source

func (*OptimizedAPISource) Health

func (oas *OptimizedAPISource) Health(ctx context.Context) error

Health delegates to underlying source

func (*OptimizedAPISource) Initialize

func (oas *OptimizedAPISource) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize delegates to underlying source

func (*OptimizedAPISource) Metrics

func (oas *OptimizedAPISource) Metrics() map[string]interface{}

Metrics returns combined metrics

func (*OptimizedAPISource) Read

Read implements optimized read with caching

func (*OptimizedAPISource) ReadBatch

func (oas *OptimizedAPISource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)

ReadBatch delegates to underlying source

func (*OptimizedAPISource) SetPosition

func (oas *OptimizedAPISource) SetPosition(position core.Position) error

SetPosition delegates to underlying source

func (*OptimizedAPISource) SetState

func (oas *OptimizedAPISource) SetState(state core.State) error

SetState delegates to underlying source

func (*OptimizedAPISource) Subscribe

func (oas *OptimizedAPISource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)

Subscribe delegates to underlying source

func (*OptimizedAPISource) SupportsBatch

func (oas *OptimizedAPISource) SupportsBatch() bool

SupportsBatch delegates to underlying source

func (*OptimizedAPISource) SupportsIncremental

func (oas *OptimizedAPISource) SupportsIncremental() bool

SupportsIncremental delegates to underlying source

func (*OptimizedAPISource) SupportsRealtime

func (oas *OptimizedAPISource) SupportsRealtime() bool

SupportsRealtime delegates to underlying source

type OptimizedCDCSource

type OptimizedCDCSource struct {
	// contains filtered or unexported fields
}

OptimizedCDCSource wraps CDC source with optimizations

func (*OptimizedCDCSource) Close

func (ocs *OptimizedCDCSource) Close(ctx context.Context) error

Close implements source close

func (*OptimizedCDCSource) Discover

func (ocs *OptimizedCDCSource) Discover(ctx context.Context) (*core.Schema, error)

Discover delegates to underlying source

func (*OptimizedCDCSource) GetPosition

func (ocs *OptimizedCDCSource) GetPosition() core.Position

GetPosition delegates to underlying source

func (*OptimizedCDCSource) GetState

func (ocs *OptimizedCDCSource) GetState() core.State

GetState delegates to underlying source

func (*OptimizedCDCSource) Health

func (ocs *OptimizedCDCSource) Health(ctx context.Context) error

Health delegates to underlying source

func (*OptimizedCDCSource) Initialize

func (ocs *OptimizedCDCSource) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize delegates to underlying source

func (*OptimizedCDCSource) Metrics

func (ocs *OptimizedCDCSource) Metrics() map[string]interface{}

Metrics returns combined metrics

func (*OptimizedCDCSource) Read

Read implements optimized read

func (*OptimizedCDCSource) ReadBatch

func (ocs *OptimizedCDCSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)

ReadBatch delegates to underlying source

func (*OptimizedCDCSource) SetPosition

func (ocs *OptimizedCDCSource) SetPosition(position core.Position) error

SetPosition delegates to underlying source

func (*OptimizedCDCSource) SetState

func (ocs *OptimizedCDCSource) SetState(state core.State) error

SetState delegates to underlying source

func (*OptimizedCDCSource) Subscribe

func (ocs *OptimizedCDCSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)

Subscribe delegates to underlying source

func (*OptimizedCDCSource) SupportsBatch

func (ocs *OptimizedCDCSource) SupportsBatch() bool

SupportsBatch delegates to underlying source

func (*OptimizedCDCSource) SupportsIncremental

func (ocs *OptimizedCDCSource) SupportsIncremental() bool

SupportsIncremental delegates to underlying source

func (*OptimizedCDCSource) SupportsRealtime

func (ocs *OptimizedCDCSource) SupportsRealtime() bool

SupportsRealtime delegates to underlying source

type OptimizedCloudWarehouse

type OptimizedCloudWarehouse struct {
	// contains filtered or unexported fields
}

OptimizedCloudWarehouse wraps cloud warehouse with optimizations

func (*OptimizedCloudWarehouse) AlterSchema

func (ocw *OptimizedCloudWarehouse) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error

AlterSchema delegates to underlying destination

func (*OptimizedCloudWarehouse) BeginTransaction

func (ocw *OptimizedCloudWarehouse) BeginTransaction(ctx context.Context) (core.Transaction, error)

BeginTransaction delegates to underlying destination

func (*OptimizedCloudWarehouse) BulkLoad

func (ocw *OptimizedCloudWarehouse) BulkLoad(ctx context.Context, reader interface{}, format string) error

BulkLoad delegates to underlying destination

func (*OptimizedCloudWarehouse) Close

func (ocw *OptimizedCloudWarehouse) Close(ctx context.Context) error

Close implements destination close

func (*OptimizedCloudWarehouse) CreateSchema

func (ocw *OptimizedCloudWarehouse) CreateSchema(ctx context.Context, schema *core.Schema) error

CreateSchema delegates to underlying destination

func (*OptimizedCloudWarehouse) DropSchema

func (ocw *OptimizedCloudWarehouse) DropSchema(ctx context.Context, schema *core.Schema) error

DropSchema delegates to underlying destination

func (*OptimizedCloudWarehouse) Health

func (ocw *OptimizedCloudWarehouse) Health(ctx context.Context) error

Health delegates to underlying destination

func (*OptimizedCloudWarehouse) Initialize

func (ocw *OptimizedCloudWarehouse) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize delegates to underlying destination

func (*OptimizedCloudWarehouse) Metrics

func (ocw *OptimizedCloudWarehouse) Metrics() map[string]interface{}

Metrics returns combined metrics

func (*OptimizedCloudWarehouse) SupportsBatch

func (ocw *OptimizedCloudWarehouse) SupportsBatch() bool

SupportsBatch delegates to underlying destination

func (*OptimizedCloudWarehouse) SupportsBulkLoad

func (ocw *OptimizedCloudWarehouse) SupportsBulkLoad() bool

SupportsBulkLoad delegates to underlying destination

func (*OptimizedCloudWarehouse) SupportsStreaming

func (ocw *OptimizedCloudWarehouse) SupportsStreaming() bool

SupportsStreaming delegates to underlying destination

func (*OptimizedCloudWarehouse) SupportsTransactions

func (ocw *OptimizedCloudWarehouse) SupportsTransactions() bool

SupportsTransactions delegates to underlying destination

func (*OptimizedCloudWarehouse) SupportsUpsert

func (ocw *OptimizedCloudWarehouse) SupportsUpsert() bool

SupportsUpsert delegates to underlying destination

func (*OptimizedCloudWarehouse) Upsert

func (ocw *OptimizedCloudWarehouse) Upsert(ctx context.Context, records []*models.Record, keys []string) error

Upsert delegates to underlying destination

func (*OptimizedCloudWarehouse) Write

func (ocw *OptimizedCloudWarehouse) Write(ctx context.Context, stream *core.RecordStream) error

Write implements optimized write

func (*OptimizedCloudWarehouse) WriteBatch

func (ocw *OptimizedCloudWarehouse) WriteBatch(ctx context.Context, stream *core.BatchStream) error

WriteBatch delegates to underlying destination

type OptimizedGenericDestination

type OptimizedGenericDestination struct {
	// contains filtered or unexported fields
}

OptimizedGenericDestination wraps generic destination with basic optimizations

func (*OptimizedGenericDestination) AlterSchema

func (ogd *OptimizedGenericDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error

AlterSchema delegates to underlying destination

func (*OptimizedGenericDestination) BeginTransaction

func (ogd *OptimizedGenericDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)

BeginTransaction delegates to underlying destination

func (*OptimizedGenericDestination) BulkLoad

func (ogd *OptimizedGenericDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error

BulkLoad delegates to underlying destination

func (*OptimizedGenericDestination) Close

func (ogd *OptimizedGenericDestination) Close(ctx context.Context) error

Close implements destination close

func (*OptimizedGenericDestination) CreateSchema

func (ogd *OptimizedGenericDestination) CreateSchema(ctx context.Context, schema *core.Schema) error

CreateSchema delegates to underlying destination

func (*OptimizedGenericDestination) DropSchema

func (ogd *OptimizedGenericDestination) DropSchema(ctx context.Context, schema *core.Schema) error

DropSchema delegates to underlying destination

func (*OptimizedGenericDestination) Health

func (ogd *OptimizedGenericDestination) Health(ctx context.Context) error

Health delegates to underlying destination

func (*OptimizedGenericDestination) Initialize

func (ogd *OptimizedGenericDestination) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize delegates to underlying destination

func (*OptimizedGenericDestination) Metrics

func (ogd *OptimizedGenericDestination) Metrics() map[string]interface{}

Metrics returns combined metrics

func (*OptimizedGenericDestination) SupportsBatch

func (ogd *OptimizedGenericDestination) SupportsBatch() bool

SupportsBatch delegates to underlying destination

func (*OptimizedGenericDestination) SupportsBulkLoad

func (ogd *OptimizedGenericDestination) SupportsBulkLoad() bool

SupportsBulkLoad delegates to underlying destination

func (*OptimizedGenericDestination) SupportsStreaming

func (ogd *OptimizedGenericDestination) SupportsStreaming() bool

SupportsStreaming delegates to underlying destination

func (*OptimizedGenericDestination) SupportsTransactions

func (ogd *OptimizedGenericDestination) SupportsTransactions() bool

SupportsTransactions delegates to underlying destination

func (*OptimizedGenericDestination) SupportsUpsert

func (ogd *OptimizedGenericDestination) SupportsUpsert() bool

SupportsUpsert delegates to underlying destination

func (*OptimizedGenericDestination) Upsert

func (ogd *OptimizedGenericDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error

Upsert delegates to underlying destination

func (*OptimizedGenericDestination) Write

func (ogd *OptimizedGenericDestination) Write(ctx context.Context, stream *core.RecordStream) error

Write implements optimized write

func (*OptimizedGenericDestination) WriteBatch

func (ogd *OptimizedGenericDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error

WriteBatch delegates to underlying destination

type OptimizedGenericSource

type OptimizedGenericSource struct {
	// contains filtered or unexported fields
}

OptimizedGenericSource wraps generic source with basic optimizations

func (*OptimizedGenericSource) Close

func (ogs *OptimizedGenericSource) Close(ctx context.Context) error

Close implements source close

func (*OptimizedGenericSource) Discover

func (ogs *OptimizedGenericSource) Discover(ctx context.Context) (*core.Schema, error)

Discover delegates to underlying source

func (*OptimizedGenericSource) GetPosition

func (ogs *OptimizedGenericSource) GetPosition() core.Position

GetPosition delegates to underlying source

func (*OptimizedGenericSource) GetState

func (ogs *OptimizedGenericSource) GetState() core.State

GetState delegates to underlying source

func (*OptimizedGenericSource) Health

func (ogs *OptimizedGenericSource) Health(ctx context.Context) error

Health delegates to underlying source

func (*OptimizedGenericSource) Initialize

func (ogs *OptimizedGenericSource) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize delegates to underlying source

func (*OptimizedGenericSource) Metrics

func (ogs *OptimizedGenericSource) Metrics() map[string]interface{}

Metrics returns combined metrics

func (*OptimizedGenericSource) Read

Read implements optimized read

func (*OptimizedGenericSource) ReadBatch

func (ogs *OptimizedGenericSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)

ReadBatch delegates to underlying source

func (*OptimizedGenericSource) SetPosition

func (ogs *OptimizedGenericSource) SetPosition(position core.Position) error

SetPosition delegates to underlying source

func (*OptimizedGenericSource) SetState

func (ogs *OptimizedGenericSource) SetState(state core.State) error

SetState delegates to underlying source

func (*OptimizedGenericSource) Subscribe

func (ogs *OptimizedGenericSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)

Subscribe delegates to underlying source

func (*OptimizedGenericSource) SupportsBatch

func (ogs *OptimizedGenericSource) SupportsBatch() bool

SupportsBatch delegates to underlying source

func (*OptimizedGenericSource) SupportsIncremental

func (ogs *OptimizedGenericSource) SupportsIncremental() bool

SupportsIncremental delegates to underlying source

func (*OptimizedGenericSource) SupportsRealtime

func (ogs *OptimizedGenericSource) SupportsRealtime() bool

SupportsRealtime delegates to underlying source

type OptimizedSnowflakeDestination

type OptimizedSnowflakeDestination struct {
	// contains filtered or unexported fields
}

OptimizedSnowflakeDestination wraps Snowflake with optimizations

func (*OptimizedSnowflakeDestination) AlterSchema

func (osd *OptimizedSnowflakeDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error

AlterSchema delegates to underlying destination

func (*OptimizedSnowflakeDestination) BeginTransaction

func (osd *OptimizedSnowflakeDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)

BeginTransaction delegates to underlying destination

func (*OptimizedSnowflakeDestination) BulkLoad

func (osd *OptimizedSnowflakeDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error

BulkLoad delegates to underlying destination

func (*OptimizedSnowflakeDestination) Close

func (osd *OptimizedSnowflakeDestination) Close(ctx context.Context) error

Close implements destination close

func (*OptimizedSnowflakeDestination) CreateSchema

func (osd *OptimizedSnowflakeDestination) CreateSchema(ctx context.Context, schema *core.Schema) error

CreateSchema delegates to underlying destination

func (*OptimizedSnowflakeDestination) DropSchema

func (osd *OptimizedSnowflakeDestination) DropSchema(ctx context.Context, schema *core.Schema) error

DropSchema delegates to underlying destination

func (*OptimizedSnowflakeDestination) Health

func (osd *OptimizedSnowflakeDestination) Health(ctx context.Context) error

Health delegates to underlying destination

func (*OptimizedSnowflakeDestination) Initialize

func (osd *OptimizedSnowflakeDestination) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize delegates to underlying destination

func (*OptimizedSnowflakeDestination) Metrics

func (osd *OptimizedSnowflakeDestination) Metrics() map[string]interface{}

Metrics returns combined metrics

func (*OptimizedSnowflakeDestination) SupportsBatch

func (osd *OptimizedSnowflakeDestination) SupportsBatch() bool

SupportsBatch delegates to underlying destination

func (*OptimizedSnowflakeDestination) SupportsBulkLoad

func (osd *OptimizedSnowflakeDestination) SupportsBulkLoad() bool

SupportsBulkLoad delegates to underlying destination

func (*OptimizedSnowflakeDestination) SupportsStreaming

func (osd *OptimizedSnowflakeDestination) SupportsStreaming() bool

SupportsStreaming delegates to underlying destination

func (*OptimizedSnowflakeDestination) SupportsTransactions

func (osd *OptimizedSnowflakeDestination) SupportsTransactions() bool

SupportsTransactions delegates to underlying destination

func (*OptimizedSnowflakeDestination) SupportsUpsert

func (osd *OptimizedSnowflakeDestination) SupportsUpsert() bool

SupportsUpsert delegates to underlying destination

func (*OptimizedSnowflakeDestination) Upsert

func (osd *OptimizedSnowflakeDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error

Upsert delegates to underlying destination

func (*OptimizedSnowflakeDestination) Write

func (osd *OptimizedSnowflakeDestination) Write(ctx context.Context, stream *core.RecordStream) error

Write implements optimized write

func (*OptimizedSnowflakeDestination) WriteBatch

func (osd *OptimizedSnowflakeDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error

WriteBatch delegates to underlying destination

type Optimizer

type Optimizer struct {
	// contains filtered or unexported fields
}

Optimizer provides performance optimization capabilities

func NewOptimizer

func NewOptimizer(config *OptimizerConfig) *Optimizer

NewOptimizer creates a new optimizer

func (*Optimizer) ExecuteParallel

func (o *Optimizer) ExecuteParallel(ctx context.Context, items []interface{}, fn func(interface{}) error) error

ExecuteParallel executes function in parallel with optimization

func (*Optimizer) GetBuffer

func (o *Optimizer) GetBuffer() []byte

GetBuffer gets an optimized buffer

func (*Optimizer) GetMetrics

func (o *Optimizer) GetMetrics() *OptimizationMetrics

GetMetrics returns optimization metrics

func (*Optimizer) OptimizeBatch

func (o *Optimizer) OptimizeBatch(currentSize int, throughput float64) int

OptimizeBatch optimizes batch size based on performance

func (*Optimizer) OptimizeCPU

func (o *Optimizer) OptimizeCPU()

OptimizeCPU optimizes CPU usage

func (*Optimizer) OptimizeMemory

func (o *Optimizer) OptimizeMemory()

OptimizeMemory optimizes memory usage

func (*Optimizer) PutBuffer

func (o *Optimizer) PutBuffer(buf []byte)

PutBuffer returns a buffer to the pool
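Example

A sketch of parallel execution with pooled buffers and adaptive batch sizing; the import path is assumed.

	package main

	import (
		"context"
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		o := performance.NewOptimizer(performance.DefaultOptimizerConfig())

		// Fan work out across the optimizer's worker pool.
		items := []interface{}{1, 2, 3, 4}
		err := o.ExecuteParallel(context.Background(), items, func(item interface{}) error {
			buf := o.GetBuffer()
			defer o.PutBuffer(buf)
			// ... process item using buf ...
			_ = item
			return nil
		})
		if err != nil {
			panic(err)
		}

		// Adjust the batch size from observed throughput.
		next := o.OptimizeBatch(10000, 95000)
		fmt.Println("next batch size:", next)
		fmt.Printf("worker utilization: %.2f\n", o.GetMetrics().WorkerUtilization)
	}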

type OptimizerConfig

type OptimizerConfig struct {
	// Memory optimization
	EnableMemoryPool bool
	MemoryPoolSize   int
	BufferSize       int

	// CPU optimization
	WorkerCount    int
	MaxGoroutines  int
	EnableAffinity bool

	// Batch optimization
	OptimalBatchSize int
	AdaptiveBatching bool
	BatchTimeout     time.Duration

	// Pipeline optimization
	EnablePipelining bool
	PipelineDepth    int

	// Zero-copy optimization
	EnableZeroCopy bool

	// Cache optimization
	EnableCaching bool
	CacheSize     int
}

OptimizerConfig configures the optimizer

func DefaultOptimizerConfig

func DefaultOptimizerConfig() *OptimizerConfig

DefaultOptimizerConfig returns optimized configuration

type ParallelismStrategy

type ParallelismStrategy struct {
	// contains filtered or unexported fields
}

ParallelismStrategy applies parallel processing

func (*ParallelismStrategy) Apply

func (ps *ParallelismStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies parallelism optimization

func (*ParallelismStrategy) Name

func (ps *ParallelismStrategy) Name() string

Name returns strategy name

func (*ParallelismStrategy) ShouldApply

func (ps *ParallelismStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

type PartitioningStage

type PartitioningStage struct {
	// contains filtered or unexported fields
}

PartitioningStage partitions records

func (*PartitioningStage) Name

func (ps *PartitioningStage) Name() string

func (*PartitioningStage) Process

func (ps *PartitioningStage) Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)

type PipelineMetrics

type PipelineMetrics struct {
	StageLatencies map[string]time.Duration
	Throughput     float64
	Stalls         int64
}

PipelineMetrics tracks pipeline metrics

type PipelineOptimizer

type PipelineOptimizer struct {
	// contains filtered or unexported fields
}

PipelineOptimizer optimizes data pipelines

func NewPipelineOptimizer

func NewPipelineOptimizer(depth int) *PipelineOptimizer

NewPipelineOptimizer creates a pipeline optimizer

func (*PipelineOptimizer) AddStage

func (po *PipelineOptimizer) AddStage(stage PipelineStage)

AddStage adds a pipeline stage

func (*PipelineOptimizer) Execute

func (po *PipelineOptimizer) Execute(ctx context.Context, inputs []interface{}) ([]interface{}, error)

Execute executes the pipeline
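Example

A sketch of building a pipeline from a custom stage; upperStage is hypothetical and exists only to satisfy the PipelineStage interface, the pipeline is assumed to hand each input to the stage unchanged, and the import path is assumed.

	package main

	import (
		"context"
		"fmt"
		"strings"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	// upperStage is a hypothetical stage used only for illustration.
	type upperStage struct{}

	func (upperStage) Name() string { return "uppercase" }

	func (upperStage) Process(ctx context.Context, input interface{}) (interface{}, error) {
		s, ok := input.(string)
		if !ok {
			return input, nil
		}
		return strings.ToUpper(s), nil
	}

	func main() {
		po := performance.NewPipelineOptimizer(4) // pipeline depth of 4
		po.AddStage(upperStage{})

		out, err := po.Execute(context.Background(), []interface{}{"a", "b", "c"})
		if err != nil {
			panic(err)
		}
		fmt.Println(out)
	}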

type PipelineStage

type PipelineStage interface {
	Process(ctx context.Context, input interface{}) (interface{}, error)
	Name() string
}

PipelineStage represents a pipeline stage

type ProfileResult

type ProfileResult struct {
	Name      string
	Duration  time.Duration
	Metrics   *Metrics
	Resources *ResourceUsage
	Report    string
}

ProfileResult contains profiling results

type Profiler

type Profiler struct {
	// contains filtered or unexported fields
}

Profiler provides performance profiling capabilities

func NewProfiler

func NewProfiler(config *ProfilerConfig) *Profiler

NewProfiler creates a new profiler

func (*Profiler) GenerateReport

func (p *Profiler) GenerateReport() *ProfileResult

GenerateReport generates a performance report

func (*Profiler) GetMetrics

func (p *Profiler) GetMetrics() *Metrics

GetMetrics returns current metrics

func (*Profiler) IncrementBytes

func (p *Profiler) IncrementBytes(bytes int64)

IncrementBytes increments byte counter

func (*Profiler) IncrementErrors

func (p *Profiler) IncrementErrors(count int64)

IncrementErrors increments error counter

func (*Profiler) IncrementRecords

func (p *Profiler) IncrementRecords(count int64)

IncrementRecords increments record counter

func (*Profiler) RecordLatency

func (p *Profiler) RecordLatency(d time.Duration)

RecordLatency records operation latency

func (*Profiler) SetCustomMetric

func (p *Profiler) SetCustomMetric(key string, value interface{})

SetCustomMetric sets a custom metric

func (*Profiler) Start

func (p *Profiler) Start()

Start begins profiling

func (*Profiler) Stop

func (p *Profiler) Stop() *Metrics

Stop stops profiling and returns metrics

type ProfilerConfig

type ProfilerConfig struct {
	Name             string
	EnableCPUProfile bool
	EnableMemProfile bool
	EnableTrace      bool
	SamplingInterval time.Duration
	ResourceMonitor  bool
}

ProfilerConfig configures the profiler

func DefaultProfilerConfig

func DefaultProfilerConfig(name string) *ProfilerConfig

DefaultProfilerConfig returns default configuration
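Example

A sketch of instrumenting a sync job with the profiler; the import path is assumed.

	package main

	import (
		"fmt"
		"time"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		p := performance.NewProfiler(performance.DefaultProfilerConfig("sync-job"))
		p.Start()

		// Instrument the hot path as records flow through.
		start := time.Now()
		// ... process a batch of 1,000 records ...
		p.IncrementRecords(1000)
		p.IncrementBytes(256 * 1024)
		p.RecordLatency(time.Since(start))
		p.SetCustomMetric("source", "postgres")

		metrics := p.Stop()
		fmt.Printf("%.0f records/sec, p99=%v\n", metrics.RecordsPerSecond, metrics.P99Latency)
	}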

type ResourceMonitor

type ResourceMonitor struct {
	// contains filtered or unexported fields
}

ResourceMonitor monitors system resources

func NewResourceMonitor

func NewResourceMonitor() *ResourceMonitor

NewResourceMonitor creates a resource monitor

func (*ResourceMonitor) GetResourceUsage

func (rm *ResourceMonitor) GetResourceUsage() (*ResourceUsage, error)

GetResourceUsage returns current resource usage

type ResourceUsage

type ResourceUsage struct {
	CPUPercent            float64
	MemoryRSS             uint64
	MemoryVMS             uint64
	SystemMemoryPercent   float64
	SystemMemoryAvailable uint64
	GoroutineCount        int
	ThreadCount           int32
	OpenFDs               int32
}

ResourceUsage contains resource usage information

type SchemaOptimizationStrategy

type SchemaOptimizationStrategy struct {
	// contains filtered or unexported fields
}

SchemaOptimizationStrategy optimizes schema operations

func NewSchemaOptimizationStrategy

func NewSchemaOptimizationStrategy() *SchemaOptimizationStrategy

NewSchemaOptimizationStrategy creates a schema optimization strategy

func (*SchemaOptimizationStrategy) Apply

func (sos *SchemaOptimizationStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies schema optimization

func (*SchemaOptimizationStrategy) Name

func (sos *SchemaOptimizationStrategy) Name() string

Name returns strategy name

func (*SchemaOptimizationStrategy) ShouldApply

func (sos *SchemaOptimizationStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

type SnowflakeOptConfig

type SnowflakeOptConfig struct {
	UseStagingArea  bool
	FileFormat      string
	CompressionType string
	ParallelUploads int
	MicroBatchSize  int
	ColumnarEnabled bool
}

SnowflakeOptConfig configures Snowflake optimizations

type SortingStage

type SortingStage struct {
	// contains filtered or unexported fields
}

SortingStage sorts records by keys

func (*SortingStage) Name

func (ss *SortingStage) Name() string

func (*SortingStage) Process

func (ss *SortingStage) Process(ctx context.Context, records []*models.Record) ([]*models.Record, error)

type StringIntern

type StringIntern struct {
	// contains filtered or unexported fields
}

StringIntern provides string interning

func NewStringIntern

func NewStringIntern(capacity int) *StringIntern

NewStringIntern creates a string intern table

func (*StringIntern) GetStats

func (si *StringIntern) GetStats() (size, saved int64)

GetStats returns interning statistics

func (*StringIntern) Intern

func (si *StringIntern) Intern(s string) string

Intern interns a string
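Example

A sketch of interning repeated strings such as column names; the import path is assumed.

	package main

	import (
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		si := performance.NewStringIntern(10000)

		// Repeated values share one backing string after interning.
		a := si.Intern("customer_id")
		b := si.Intern("customer_id")
		fmt.Println(a == b)

		size, saved := si.GetStats()
		fmt.Println("entries:", size, "bytes saved:", saved)
	}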

type UnsafeStringOptimizer

type UnsafeStringOptimizer struct {
	// contains filtered or unexported fields
}

UnsafeStringOptimizer provides unsafe string optimizations

func NewUnsafeStringOptimizer

func NewUnsafeStringOptimizer(enabled bool) *UnsafeStringOptimizer

NewUnsafeStringOptimizer creates an unsafe string optimizer

func (*UnsafeStringOptimizer) BytesToString

func (uso *UnsafeStringOptimizer) BytesToString(b []byte) string

BytesToString converts bytes to string without allocation

func (*UnsafeStringOptimizer) StringToBytes

func (uso *UnsafeStringOptimizer) StringToBytes(s string) []byte

StringToBytes converts string to bytes without allocation

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages worker goroutines

func NewWorkerPool

func NewWorkerPool(workers, maxGoroutines int) *WorkerPool

NewWorkerPool creates a worker pool

func (*WorkerPool) Execute

func (wp *WorkerPool) Execute(ctx context.Context, items []interface{}, fn func(interface{}) error) error

Execute executes tasks in parallel

func (*WorkerPool) GetUtilization

func (wp *WorkerPool) GetUtilization() float64

GetUtilization returns worker utilization
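Example

A sketch of running tasks on the worker pool; the import path is assumed.

	package main

	import (
		"context"
		"fmt"

		"github.com/example/nebula/pkg/performance" // assumed import path
	)

	func main() {
		wp := performance.NewWorkerPool(8, 64) // 8 workers, at most 64 goroutines

		items := []interface{}{"a", "b", "c"}
		err := wp.Execute(context.Background(), items, func(item interface{}) error {
			// ... handle one item ...
			_ = item
			return nil
		})
		if err != nil {
			panic(err)
		}

		fmt.Printf("worker utilization: %.2f\n", wp.GetUtilization())
	}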

type ZeroCopyMetrics

type ZeroCopyMetrics struct {
	Operations int64
	BytesSaved int64
}

ZeroCopyMetrics tracks zero-copy metrics

type ZeroCopyOptimizer

type ZeroCopyOptimizer struct {
	// contains filtered or unexported fields
}

ZeroCopyOptimizer provides zero-copy optimizations

func NewZeroCopyOptimizer

func NewZeroCopyOptimizer(enableUnsafe bool) *ZeroCopyOptimizer

NewZeroCopyOptimizer creates a zero-copy optimizer

func (*ZeroCopyOptimizer) OptimizeRecordTransfer

func (zco *ZeroCopyOptimizer) OptimizeRecordTransfer(records []*models.Record) []*models.Record

OptimizeRecordTransfer optimizes record transfer

type ZeroCopyStrategy

type ZeroCopyStrategy struct {
	// contains filtered or unexported fields
}

ZeroCopyStrategy applies zero-copy optimizations

func (*ZeroCopyStrategy) Apply

func (zcs *ZeroCopyStrategy) Apply(ctx context.Context, data interface{}) (interface{}, error)

Apply applies zero-copy optimization

func (*ZeroCopyStrategy) Name

func (zcs *ZeroCopyStrategy) Name() string

Name returns strategy name

func (*ZeroCopyStrategy) ShouldApply

func (zcs *ZeroCopyStrategy) ShouldApply(metrics *Metrics) bool

ShouldApply determines if strategy should apply

Directories

Path	Synopsis
examples	Package examples demonstrates complete optimization workflow
