core

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2025 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchBuilder

// BatchBuilder helps build record batches efficiently.
//
// Add accepts a single *pool.Record and Build returns the accumulated
// []*pool.Record; both can fail with an error. Size and IsFull expose an int
// count and a bool flag respectively.
// NOTE(review): the capacity that makes IsFull return true, and whether Build
// implicitly resets the builder, are not specified here — confirm against
// the implementations.
type BatchBuilder interface {
	Add(record *pool.Record) error
	Build() ([]*pool.Record, error)
	Reset()
	Size() int
	IsFull() bool
}

BatchBuilder helps build record batches efficiently

type BatchStream

// BatchStream represents a stream of record batches.
//
// Batches is a receive-only channel of record slices and Errors is a
// receive-only channel of errors, so consumers can only read from either.
// NOTE(review): who closes the channels and whether an error terminates the
// stream is not defined by this type — confirm with the producer.
type BatchStream struct {
	Batches <-chan []*pool.Record
	Errors  <-chan error
}

BatchStream represents a stream of record batches

type ChangeEvent

// ChangeEvent represents a change data capture event.
//
// Type is one of the ChangeType values, Table names the affected table,
// Timestamp carries a time.Time and Position a stream Position.
// Before and After are column maps (map[string]interface{}).
// NOTE(review): presumably Before is empty for inserts and After is empty for
// deletes — confirm with the emitting source.
type ChangeEvent struct {
	Type      ChangeType
	Table     string
	Timestamp time.Time
	Position  Position
	Before    map[string]interface{}
	After     map[string]interface{}
}

ChangeEvent represents a change data capture event

type ChangeStream

// ChangeStream represents a stream of change events (for CDC).
//
// Changes is a receive-only channel of *ChangeEvent and Errors is a
// receive-only channel of errors.
// NOTE(review): channel close and error-termination semantics are not
// specified by this type — confirm with the producer.
type ChangeStream struct {
	Changes <-chan *ChangeEvent
	Errors  <-chan error
}

ChangeStream represents a stream of change events (for CDC)

type ChangeType

// ChangeType represents the type of change; it is a string-backed type.
type ChangeType string

ChangeType represents the type of change

// ChangeType values; the string form of each is the lowercase operation name
// ("insert", "update", "delete").
const (
	ChangeTypeInsert ChangeType = "insert"
	ChangeTypeUpdate ChangeType = "update"
	ChangeTypeDelete ChangeType = "delete"
)

type CircuitBreaker

// CircuitBreaker provides the circuit breaker pattern.
//
// Allow returns an error rather than a bool; MarkSuccess and MarkFailed take
// no arguments; State returns a string.
// NOTE(review): presumably Allow returns a non-nil error when the breaker is
// open, and State yields names such as "open"/"closed" — the exact values and
// thresholds are not defined here; confirm with the implementations.
type CircuitBreaker interface {
	Allow() error
	MarkSuccess()
	MarkFailed()
	State() string
	Reset()
}

CircuitBreaker provides the circuit breaker pattern

type ConnectionPool

// ConnectionPool manages connection pooling.
//
// Connections are untyped (interface{}): Get returns one and Put accepts one,
// so callers must type-assert to the concrete connection type.
// Stats returns a PoolStats value.
// NOTE(review): whether Get blocks until a connection is free or ctx is done
// is not specified here — confirm with the implementations.
type ConnectionPool interface {
	Get(ctx context.Context) (interface{}, error)
	Put(conn interface{}) error
	Close() error
	Stats() PoolStats
}

ConnectionPool manages connection pooling

type Connector

// Connector is the base interface for all connectors.
//
// It groups identification (Name, Type, Version), lifecycle (Initialize,
// Close) and monitoring (Health, Metrics) methods. Initialize takes a
// *config.BaseConfig; Health reports status as an error value and Metrics
// returns an untyped map.
// NOTE(review): the Metrics map keys are not defined by this interface —
// confirm with the implementations.
type Connector interface {
	// Metadata
	Name() string
	Type() ConnectorType
	Version() string

	// Lifecycle
	Initialize(ctx context.Context, config *config.BaseConfig) error
	Close(ctx context.Context) error

	// Health and monitoring
	Health(ctx context.Context) error
	Metrics() map[string]interface{}
}

Connector is the base interface for all connectors

type ConnectorFactory

// ConnectorFactory creates connector instances.
//
// CreateSource and CreateDestination take a connector type name as a plain
// string (not ConnectorType) plus a *config.BaseConfig, and return a Source
// or Destination. ListSources and ListDestinations return string slices.
// NOTE(review): presumably the listed strings are the names accepted as
// connectorType — confirm with the implementations.
type ConnectorFactory interface {
	CreateSource(connectorType string, config *config.BaseConfig) (Source, error)
	CreateDestination(connectorType string, config *config.BaseConfig) (Destination, error)
	ListSources() []string
	ListDestinations() []string
}

ConnectorFactory creates connector instances

type ConnectorMetadata

// ConnectorMetadata provides metadata about a connector.
//
// Every field carries a snake_case JSON tag and none uses omitempty, so zero
// values are always emitted. ConfigSchema is an untyped map.
// NOTE(review): the expected structure of ConfigSchema (e.g. JSON Schema) is
// not defined here — confirm with producers.
type ConnectorMetadata struct {
	Name          string                 `json:"name"`
	Type          ConnectorType          `json:"type"`
	Version       string                 `json:"version"`
	Description   string                 `json:"description"`
	Author        string                 `json:"author"`
	Documentation string                 `json:"documentation"`
	Capabilities  []string               `json:"capabilities"`
	ConfigSchema  map[string]interface{} `json:"config_schema"`
}

ConnectorMetadata provides metadata about a connector

type ConnectorRegistry

// ConnectorRegistry manages available connectors.
//
// Register associates a name with a zero-argument factory that returns a
// Connector; Get returns a Connector for a name; List returns
// ConnectorMetadata values; Exists reports a bool for a name.
// NOTE(review): whether Get invokes the factory on every call, and how
// duplicate registrations are handled, is not specified here — confirm.
type ConnectorRegistry interface {
	Register(name string, factory func() Connector) error
	Get(name string) (Connector, error)
	List() []ConnectorMetadata
	Exists(name string) bool
}

ConnectorRegistry manages available connectors

type ConnectorType

// ConnectorType represents the type of connector; it is a string-backed type.
type ConnectorType string

ConnectorType represents the type of connector

// ConnectorType values: "source" and "destination".
const (
	ConnectorTypeSource      ConnectorType = "source"
	ConnectorTypeDestination ConnectorType = "destination"
)

type DataQualityChecker

// DataQualityChecker performs data quality checks.
//
// CheckRecord returns a single error for one record; CheckBatch returns a
// slice of errors for a slice of records. Rules and quality metrics are both
// exchanged as untyped maps.
// NOTE(review): whether CheckBatch's result is index-aligned with its input,
// and the rule/metric key schema, are not specified here — confirm.
type DataQualityChecker interface {
	CheckRecord(record *pool.Record) error
	CheckBatch(records []*pool.Record) []error
	GetQualityMetrics() map[string]interface{}
	SetRules(rules map[string]interface{})
}

DataQualityChecker performs data quality checks

type Destination

// Destination is the interface that all destination connectors must implement.
//
// Write consumes a *RecordStream and WriteBatch a *BatchStream. The Supports*
// methods each return a bool capability flag. BulkLoad takes an untyped
// reader plus a format string, BeginTransaction returns a Transaction, and
// Upsert takes records together with key names.
// NOTE(review): presumably the advanced operations should only be called when
// the matching Supports* method returns true, and reader is expected to be an
// io.Reader — neither is enforced by these signatures; confirm.
type Destination interface {
	// Core functionality
	Initialize(ctx context.Context, config *config.BaseConfig) error
	CreateSchema(ctx context.Context, schema *Schema) error
	Write(ctx context.Context, stream *RecordStream) error
	WriteBatch(ctx context.Context, stream *BatchStream) error
	Close(ctx context.Context) error

	// Capabilities
	SupportsBulkLoad() bool
	SupportsTransactions() bool
	SupportsUpsert() bool
	SupportsBatch() bool
	SupportsStreaming() bool

	// Advanced operations
	BulkLoad(ctx context.Context, reader interface{}, format string) error
	BeginTransaction(ctx context.Context) (Transaction, error)
	Upsert(ctx context.Context, records []*pool.Record, keys []string) error

	// Schema operations
	AlterSchema(ctx context.Context, oldSchema, newSchema *Schema) error
	DropSchema(ctx context.Context, schema *Schema) error

	// Health and metrics
	Health(ctx context.Context) error
	Metrics() map[string]interface{}
}

Destination is the interface that all destination connectors must implement

type ErrorHandler

// ErrorHandler defines how connectors handle errors.
//
// HandleError receives the error together with the *pool.Record it relates to
// and itself returns an error. ShouldRetry returns a bool for an error,
// GetRetryDelay returns a time.Duration for an attempt number, and
// RecordError takes an error plus an untyped details map.
// NOTE(review): whether attempt is 0- or 1-based, and what a nil return from
// HandleError signifies, are not specified here — confirm.
type ErrorHandler interface {
	HandleError(ctx context.Context, err error, record *pool.Record) error
	ShouldRetry(err error) bool
	GetRetryDelay(attempt int) time.Duration
	RecordError(err error, details map[string]interface{})
}

ErrorHandler defines how connectors handle errors

type Field

// Field represents a field in the schema.
//
// Type is a FieldType. Nullable, Primary and Unique are bool flags.
// Default is untyped (interface{}).
// NOTE(review): presumably a nil Default means "no default" — confirm with
// consumers.
type Field struct {
	Name        string
	Type        FieldType
	Description string
	Nullable    bool
	Primary     bool
	Unique      bool
	Default     interface{}
}

Field represents a field in the schema

type FieldType

// FieldType represents the data type of a field; it is a string-backed type.
type FieldType string

FieldType represents the data type of a field

// FieldType values; each constant's string form is the lowercase type name
// shown on its line.
// NOTE(review): width/precision of "int" and "float", and the time zone
// handling of "timestamp", are not defined here — confirm with connectors.
const (
	FieldTypeString    FieldType = "string"
	FieldTypeInt       FieldType = "int"
	FieldTypeFloat     FieldType = "float"
	FieldTypeBool      FieldType = "bool"
	FieldTypeTimestamp FieldType = "timestamp"
	FieldTypeDate      FieldType = "date"
	FieldTypeTime      FieldType = "time"
	FieldTypeJSON      FieldType = "json"
	FieldTypeBinary    FieldType = "binary"
)

type FilterFunc

// FilterFunc is a function that filters records: it takes a *pool.Record and
// returns a bool.
// NOTE(review): presumably true means "keep the record" — confirm with the
// Pipeline implementation.
type FilterFunc func(record *pool.Record) bool

FilterFunc is a function that filters records

type HealthStatus

// HealthStatus represents the health status of a connector.
//
// Status is a free-form string; the inline comment lists "healthy",
// "unhealthy" and "degraded" as the expected values. Details is an untyped
// map. Error is tagged omitempty, so a nil error is left out of the JSON.
// NOTE(review): encoding/json encodes the dynamic value stored in an
// interface field; error implementations without exported fields or a custom
// marshaler are likely to serialize as {} rather than as their message —
// confirm this is intended before relying on the "error" key.
type HealthStatus struct {
	Status    string                 `json:"status"` // "healthy", "unhealthy", "degraded"
	Timestamp time.Time              `json:"timestamp"`
	Details   map[string]interface{} `json:"details"`
	Error     error                  `json:"error,omitempty"`
}

HealthStatus represents the health status of a connector

type Metric

// Metric represents a single metric.
//
// Type is a MetricType, Value is untyped (interface{}), and Labels is a
// string-to-string map. Every field has a JSON tag and none uses omitempty.
// NOTE(review): the concrete Go type expected in Value for each MetricType is
// not defined here — confirm with producers.
type Metric struct {
	Name        string            `json:"name"`
	Type        MetricType        `json:"type"`
	Value       interface{}       `json:"value"`
	Labels      map[string]string `json:"labels"`
	Timestamp   time.Time         `json:"timestamp"`
	Description string            `json:"description"`
}

Metric represents a single metric

type MetricType

// MetricType represents the type of metric; it is a string-backed type.
type MetricType string

MetricType represents the type of metric

// MetricType values: "counter", "gauge", "histogram" and "summary".
const (
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGauge     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
	MetricTypeSummary   MetricType = "summary"
)

type Optimizer

// Optimizer provides performance optimization hints.
//
// Each Optimize* method takes the current int setting plus an untyped metrics
// map and returns an int. SuggestOptimizations returns a slice of strings for
// a metrics map.
// NOTE(review): the metric keys the optimizer reads are not defined here —
// confirm with the implementations.
type Optimizer interface {
	OptimizeBatchSize(current int, metrics map[string]interface{}) int
	OptimizeConcurrency(current int, metrics map[string]interface{}) int
	OptimizeBufferSize(current int, metrics map[string]interface{}) int
	SuggestOptimizations(metrics map[string]interface{}) []string
}

Optimizer provides performance optimization hints

type Pipeline

// Pipeline represents a data processing pipeline.
//
// AddTransform and AddFilter register a named TransformFunc or FilterFunc and
// return nothing. Process maps a *RecordStream to a *RecordStream and
// ProcessBatch maps a *BatchStream to a *BatchStream.
// NOTE(review): the order in which transforms and filters are applied, and
// the handling of duplicate names, are not specified here — confirm.
type Pipeline interface {
	AddTransform(name string, transform TransformFunc)
	AddFilter(name string, filter FilterFunc)
	Process(ctx context.Context, input *RecordStream) (*RecordStream, error)
	ProcessBatch(ctx context.Context, input *BatchStream) (*BatchStream, error)
}

Pipeline represents a data processing pipeline

type PoolStats

// PoolStats represents connection pool statistics.
//
// Active, Idle, Total and MaxSize are int values; Waits and Timeouts are
// int64 values.
// NOTE(review): presumably Total == Active + Idle and Waits/Timeouts are
// cumulative counters — confirm with the pool implementation.
type PoolStats struct {
	Active   int
	Idle     int
	Total    int
	MaxSize  int
	Waits    int64
	Timeouts int64
}

PoolStats represents connection pool statistics

type Position

// Position represents a position in the data stream.
//
// NOTE(review): Compare's behavior when other is nil or of a different
// concrete type is not specified here — confirm with the implementations.
type Position interface {
	// String returns a string representation of the position
	String() string
	// Compare returns -1 if this < other, 0 if equal, 1 if this > other
	Compare(other Position) int
}

Position represents a position in the data stream

type PositionManager

// PositionManager manages position/checkpoint state.
//
// SavePosition stores a Position, LoadPosition returns one, and ResetPosition
// takes only a context; all three can fail with an error.
// NOTE(review): what LoadPosition returns when nothing has been saved (nil
// Position vs. error) is not specified here — confirm.
type PositionManager interface {
	SavePosition(ctx context.Context, position Position) error
	LoadPosition(ctx context.Context) (Position, error)
	ResetPosition(ctx context.Context) error
}

PositionManager manages position/checkpoint state

type ProgressReporter

// ProgressReporter reports progress of operations.
//
// ReportProgress takes processed and total counts as int64, ReportThroughput
// takes records per second as float64, and ReportLatency takes a
// time.Duration. GetProgress returns the (processed, total) pair.
// NOTE(review): how an unknown total is represented is not specified here —
// confirm with the implementations.
type ProgressReporter interface {
	ReportProgress(processed int64, total int64)
	ReportThroughput(recordsPerSecond float64)
	ReportLatency(latency time.Duration)
	GetProgress() (processed int64, total int64)
}

ProgressReporter reports progress of operations

type RateLimiter

// RateLimiter provides rate limiting capabilities.
//
// Allow returns a bool without taking a context; Wait takes a context and
// returns an error. Limit and SetLimit read and write the limit as an int.
// NOTE(review): the unit of the limit (e.g. operations per second) and
// whether Wait returns ctx.Err() on cancellation are not specified here —
// confirm with the implementations.
type RateLimiter interface {
	Allow() bool
	Wait(ctx context.Context) error
	Limit() int
	SetLimit(limit int)
}

RateLimiter provides rate limiting capabilities

type RecordStream

// RecordStream represents a stream of records.
//
// Records is a receive-only channel of *pool.Record and Errors is a
// receive-only channel of errors.
// NOTE(review): who closes the channels, and who owns/releases each received
// record, is not defined by this type — confirm with the producer.
type RecordStream struct {
	Records <-chan *pool.Record
	Errors  <-chan error
}

RecordStream represents a stream of records

type Schema

// Schema represents the data schema.
//
// Fields holds Field values (not pointers), Version is an int, and CreatedAt
// and UpdatedAt are time.Time values.
// NOTE(review): whether Version starts at 0 or 1 and who maintains the
// timestamps is not specified here — confirm with SchemaRegistry
// implementations.
type Schema struct {
	Name        string
	Description string
	Fields      []Field
	Version     int
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

Schema represents the data schema

type SchemaRegistry

// SchemaRegistry manages schema versions.
//
// GetSchema looks a schema up by name and int version, GetLatestSchema by
// name only, and ListSchemas returns a slice of *Schema. All methods take a
// context and can fail with an error.
// NOTE(review): whether RegisterSchema assigns or validates Schema.Version,
// and the error returned for an unknown schema, are not specified here —
// confirm.
type SchemaRegistry interface {
	RegisterSchema(ctx context.Context, schema *Schema) error
	GetSchema(ctx context.Context, name string, version int) (*Schema, error)
	ListSchemas(ctx context.Context) ([]*Schema, error)
	GetLatestSchema(ctx context.Context, name string) (*Schema, error)
}

SchemaRegistry manages schema versions

type Source

// Source is the interface that all source connectors must implement.
//
// Discover returns a *Schema, Read returns a *RecordStream and ReadBatch
// returns a *BatchStream for a requested batch size. Position and State are
// exposed through getter/setter pairs; the setters can fail with an error.
// The Supports* methods each return a bool capability flag. Subscribe takes a
// list of table names and returns a *ChangeStream.
// NOTE(review): presumably Subscribe should only be called when
// SupportsRealtime returns true, and ReadBatch when SupportsBatch does — this
// is not enforced by the signatures; confirm.
type Source interface {
	// Core functionality
	Initialize(ctx context.Context, config *config.BaseConfig) error
	Discover(ctx context.Context) (*Schema, error)
	Read(ctx context.Context) (*RecordStream, error)
	ReadBatch(ctx context.Context, batchSize int) (*BatchStream, error)
	Close(ctx context.Context) error

	// State management
	GetPosition() Position
	SetPosition(position Position) error
	GetState() State
	SetState(state State) error

	// Capabilities
	SupportsIncremental() bool
	SupportsRealtime() bool
	SupportsBatch() bool

	// Real-time/CDC support
	Subscribe(ctx context.Context, tables []string) (*ChangeStream, error)

	// Health and metrics
	Health(ctx context.Context) error
	Metrics() map[string]interface{}
}

Source is the interface that all source connectors must implement

type State

// State represents connector state as an untyped string-keyed map.
// NOTE(review): the key schema is connector-specific and not defined here —
// confirm per connector.
type State map[string]interface{}

State represents connector state

type StreamController

// StreamController provides control over data streams.
//
// Pause, Resume and Cancel each return an error; Status returns a string.
// NOTE(review): the set of Status strings and whether Resume is valid after
// Cancel are not specified here — confirm with the implementations.
type StreamController interface {
	Pause() error
	Resume() error
	Cancel() error
	Status() string
}

StreamController provides control over data streams

type Transaction

// Transaction represents a database transaction.
//
// Commit and Rollback both take a context and return an error.
// NOTE(review): behavior of Rollback after a successful Commit (no-op vs.
// error) is not specified here — confirm with the implementations.
type Transaction interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

Transaction represents a database transaction

type TransformFunc

// TransformFunc is a function that transforms records: it takes a
// *pool.Record and returns a *pool.Record and an error.
// NOTE(review): whether the function may mutate and return its input, and
// what a nil record with a nil error means, are not specified here — confirm
// with the Pipeline implementation.
type TransformFunc func(record *pool.Record) (*pool.Record, error)

TransformFunc is a function that transforms records

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL