Documentation
¶
Index ¶
- type BatchBuilder
- type BatchStream
- type ChangeEvent
- type ChangeStream
- type ChangeType
- type CircuitBreaker
- type ConnectionPool
- type Connector
- type ConnectorFactory
- type ConnectorMetadata
- type ConnectorRegistry
- type ConnectorType
- type DataQualityChecker
- type Destination
- type ErrorHandler
- type Field
- type FieldType
- type FilterFunc
- type HealthStatus
- type Metric
- type MetricType
- type Optimizer
- type Pipeline
- type PoolStats
- type Position
- type PositionManager
- type ProgressReporter
- type RateLimiter
- type RecordStream
- type Schema
- type SchemaRegistry
- type Source
- type State
- type StreamController
- type Transaction
- type TransformFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchBuilder ¶
type BatchBuilder interface {
Add(record *pool.Record) error
Build() ([]*pool.Record, error)
Reset()
Size() int
IsFull() bool
}
BatchBuilder helps build record batches efficiently
type BatchStream ¶
BatchStream represents a stream of record batches
type ChangeEvent ¶
type ChangeEvent struct {
Type ChangeType
Table string
Timestamp time.Time
Position Position
Before map[string]interface{}
After map[string]interface{}
}
ChangeEvent represents a change data capture event
type ChangeStream ¶
type ChangeStream struct {
Changes <-chan *ChangeEvent
Errors <-chan error
}
ChangeStream represents a stream of change events (for CDC)
type ChangeType ¶
type ChangeType string
ChangeType represents the type of change
const ( ChangeTypeInsert ChangeType = "insert" ChangeTypeUpdate ChangeType = "update" ChangeTypeDelete ChangeType = "delete" )
type CircuitBreaker ¶
CircuitBreaker provides circuit breaker pattern
type ConnectionPool ¶
type ConnectionPool interface {
Get(ctx context.Context) (interface{}, error)
Put(conn interface{}) error
Close() error
Stats() PoolStats
}
ConnectionPool manages connection pooling
type Connector ¶
type Connector interface {
// Metadata
Name() string
Type() ConnectorType
Version() string
// Lifecycle
Initialize(ctx context.Context, config *config.BaseConfig) error
Close(ctx context.Context) error
// Health and monitoring
Health(ctx context.Context) error
Metrics() map[string]interface{}
}
Connector is the base interface for all connectors
type ConnectorFactory ¶
type ConnectorFactory interface {
CreateSource(connectorType string, config *config.BaseConfig) (Source, error)
CreateDestination(connectorType string, config *config.BaseConfig) (Destination, error)
ListSources() []string
ListDestinations() []string
}
ConnectorFactory creates connector instances
type ConnectorMetadata ¶
type ConnectorMetadata struct {
Name string `json:"name"`
Type ConnectorType `json:"type"`
Version string `json:"version"`
Description string `json:"description"`
Author string `json:"author"`
Documentation string `json:"documentation"`
Capabilities []string `json:"capabilities"`
ConfigSchema map[string]interface{} `json:"config_schema"`
}
ConnectorMetadata provides metadata about a connector
type ConnectorRegistry ¶
type ConnectorRegistry interface {
Register(name string, factory func() Connector) error
Get(name string) (Connector, error)
List() []ConnectorMetadata
Exists(name string) bool
}
ConnectorRegistry manages available connectors
type ConnectorType ¶
type ConnectorType string
ConnectorType represents the type of connector
const ( ConnectorTypeSource ConnectorType = "source" ConnectorTypeDestination ConnectorType = "destination" )
type DataQualityChecker ¶
type DataQualityChecker interface {
CheckRecord(record *pool.Record) error
CheckBatch(records []*pool.Record) []error
GetQualityMetrics() map[string]interface{}
SetRules(rules map[string]interface{})
}
DataQualityChecker performs data quality checks
type Destination ¶
type Destination interface {
// Core functionality
Initialize(ctx context.Context, config *config.BaseConfig) error
CreateSchema(ctx context.Context, schema *Schema) error
Write(ctx context.Context, stream *RecordStream) error
WriteBatch(ctx context.Context, stream *BatchStream) error
Close(ctx context.Context) error
// Capabilities
SupportsBulkLoad() bool
SupportsTransactions() bool
SupportsUpsert() bool
SupportsBatch() bool
SupportsStreaming() bool
// Advanced operations
BulkLoad(ctx context.Context, reader interface{}, format string) error
BeginTransaction(ctx context.Context) (Transaction, error)
Upsert(ctx context.Context, records []*pool.Record, keys []string) error
// Schema operations
AlterSchema(ctx context.Context, oldSchema, newSchema *Schema) error
DropSchema(ctx context.Context, schema *Schema) error
// Health and metrics
Health(ctx context.Context) error
Metrics() map[string]interface{}
}
Destination is the interface that all destination connectors must implement
type ErrorHandler ¶
type ErrorHandler interface {
HandleError(ctx context.Context, err error, record *pool.Record) error
ShouldRetry(err error) bool
GetRetryDelay(attempt int) time.Duration
RecordError(err error, details map[string]interface{})
}
ErrorHandler defines how connectors handle errors
type Field ¶
type Field struct {
Name string
Type FieldType
Description string
Nullable bool
Primary bool
Unique bool
Default interface{}
}
Field represents a field in the schema
type FieldType ¶
type FieldType string
FieldType represents the data type of a field
const ( FieldTypeString FieldType = "string" FieldTypeInt FieldType = "int" FieldTypeFloat FieldType = "float" FieldTypeBool FieldType = "bool" FieldTypeTimestamp FieldType = "timestamp" FieldTypeDate FieldType = "date" FieldTypeTime FieldType = "time" FieldTypeJSON FieldType = "json" FieldTypeBinary FieldType = "binary" )
type FilterFunc ¶
FilterFunc is a function that filters records
type HealthStatus ¶
type HealthStatus struct {
Status string `json:"status"` // "healthy", "unhealthy", "degraded"
Timestamp time.Time `json:"timestamp"`
Details map[string]interface{} `json:"details"`
Error error `json:"error,omitempty"`
}
HealthStatus represents the health status of a connector
type Metric ¶
type Metric struct {
Name string `json:"name"`
Type MetricType `json:"type"`
Value interface{} `json:"value"`
Labels map[string]string `json:"labels"`
Timestamp time.Time `json:"timestamp"`
Description string `json:"description"`
}
Metric represents a single metric
type MetricType ¶
type MetricType string
MetricType represents the type of metric
const ( MetricTypeCounter MetricType = "counter" MetricTypeGauge MetricType = "gauge" MetricTypeHistogram MetricType = "histogram" MetricTypeSummary MetricType = "summary" )
type Optimizer ¶
type Optimizer interface {
OptimizeBatchSize(current int, metrics map[string]interface{}) int
OptimizeConcurrency(current int, metrics map[string]interface{}) int
OptimizeBufferSize(current int, metrics map[string]interface{}) int
SuggestOptimizations(metrics map[string]interface{}) []string
}
Optimizer provides performance optimization hints
type Pipeline ¶
type Pipeline interface {
AddTransform(name string, transform TransformFunc)
AddFilter(name string, filter FilterFunc)
Process(ctx context.Context, input *RecordStream) (*RecordStream, error)
ProcessBatch(ctx context.Context, input *BatchStream) (*BatchStream, error)
}
Pipeline represents a data processing pipeline
type Position ¶
type Position interface {
// String returns a string representation of the position
String() string
// Compare returns -1 if this < other, 0 if equal, 1 if this > other
Compare(other Position) int
}
Position represents a position in the data stream
type PositionManager ¶
type PositionManager interface {
SavePosition(ctx context.Context, position Position) error
LoadPosition(ctx context.Context) (Position, error)
ResetPosition(ctx context.Context) error
}
PositionManager manages position/checkpoint state
type ProgressReporter ¶
type ProgressReporter interface {
ReportProgress(processed int64, total int64)
ReportThroughput(recordsPerSecond float64)
ReportLatency(latency time.Duration)
GetProgress() (processed int64, total int64)
}
ProgressReporter reports progress of operations
type RateLimiter ¶
type RateLimiter interface {
Allow() bool
Wait(ctx context.Context) error
Limit() int
SetLimit(limit int)
}
RateLimiter provides rate limiting capabilities
type RecordStream ¶
RecordStream represents a stream of records
type Schema ¶
type Schema struct {
Name string
Description string
Fields []Field
Version int
CreatedAt time.Time
UpdatedAt time.Time
}
Schema represents the data schema
type SchemaRegistry ¶
type SchemaRegistry interface {
RegisterSchema(ctx context.Context, schema *Schema) error
GetSchema(ctx context.Context, name string, version int) (*Schema, error)
ListSchemas(ctx context.Context) ([]*Schema, error)
GetLatestSchema(ctx context.Context, name string) (*Schema, error)
}
SchemaRegistry manages schema versions
type Source ¶
type Source interface {
// Core functionality
Initialize(ctx context.Context, config *config.BaseConfig) error
Discover(ctx context.Context) (*Schema, error)
Read(ctx context.Context) (*RecordStream, error)
ReadBatch(ctx context.Context, batchSize int) (*BatchStream, error)
Close(ctx context.Context) error
// State management
GetPosition() Position
SetPosition(position Position) error
GetState() State
SetState(state State) error
// Capabilities
SupportsIncremental() bool
SupportsRealtime() bool
SupportsBatch() bool
// Real-time/CDC support
Subscribe(ctx context.Context, tables []string) (*ChangeStream, error)
// Health and metrics
Health(ctx context.Context) error
Metrics() map[string]interface{}
}
Source is the interface that all source connectors must implement
type StreamController ¶
StreamController provides control over data streams