Documentation ¶
Index ¶
- func CreateBigQueryWithEvolution(name string, config *config.BaseConfig, evolutionConfig *EvolutionConfig) (core.Destination, error)
- func CreateSnowflakeWithEvolution(name string, config *config.BaseConfig, evolutionConfig *EvolutionConfig) (core.Destination, error)
- func WrapDestinationWithEvolution(dest core.Destination, config *EvolutionConfig) (core.Destination, error)
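- type EvolutionConfig
- func CreateEvolutionConfig(properties map[string]interface{}) *EvolutionConfig
- func DefaultEvolutionConfig() *EvolutionConfig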
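- type SchemaEvolutionDestination
- func NewSchemaEvolutionDestination(dest core.Destination, config *EvolutionConfig) (*SchemaEvolutionDestination, error)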
- func (sed *SchemaEvolutionDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
- func (sed *SchemaEvolutionDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)
- func (sed *SchemaEvolutionDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error
- func (sed *SchemaEvolutionDestination) Close(ctx context.Context) error
- func (sed *SchemaEvolutionDestination) CreateSchema(ctx context.Context, schema *core.Schema) error
- func (sed *SchemaEvolutionDestination) DropSchema(ctx context.Context, schema *core.Schema) error
- func (sed *SchemaEvolutionDestination) Health(ctx context.Context) error
- func (sed *SchemaEvolutionDestination) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (sed *SchemaEvolutionDestination) Metrics() map[string]interface{}
- func (sed *SchemaEvolutionDestination) SupportsBatch() bool
- func (sed *SchemaEvolutionDestination) SupportsBulkLoad() bool
- func (sed *SchemaEvolutionDestination) SupportsStreaming() bool
- func (sed *SchemaEvolutionDestination) SupportsTransactions() bool
- func (sed *SchemaEvolutionDestination) SupportsUpsert() bool
- func (sed *SchemaEvolutionDestination) Upsert(ctx context.Context, records []*models.Record, keys []string) error
- func (sed *SchemaEvolutionDestination) Write(ctx context.Context, stream *core.RecordStream) error
- func (sed *SchemaEvolutionDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateBigQueryWithEvolution ¶
func CreateBigQueryWithEvolution(name string, config *config.BaseConfig, evolutionConfig *EvolutionConfig) (core.Destination, error)
CreateBigQueryWithEvolution creates a BigQuery destination with schema evolution.
func CreateSnowflakeWithEvolution ¶
func CreateSnowflakeWithEvolution(name string, config *config.BaseConfig, evolutionConfig *EvolutionConfig) (core.Destination, error)
CreateSnowflakeWithEvolution creates a Snowflake destination with schema evolution.
func WrapDestinationWithEvolution ¶
func WrapDestinationWithEvolution(dest core.Destination, config *EvolutionConfig) (core.Destination, error)
WrapDestinationWithEvolution wraps any destination with schema evolution capabilities.
Types ¶
type EvolutionConfig ¶
// EvolutionConfig configures schema evolution behavior. Every field is
// exported and carries a snake_case JSON tag, so the struct can be
// marshalled to and from JSON with encoding/json.
type EvolutionConfig struct {
// Strategy names the evolution strategy. The values listed here are
// "default", "strict", and "flexible".
// NOTE(review): how each strategy behaves is not visible in this file.
Strategy string `json:"strategy"`
// CompatibilityMode is the compatibility mode, expressed as a
// schema.CompatibilityMode value.
// NOTE(review): the set of valid modes is defined in the schema package — confirm there.
CompatibilityMode schema.CompatibilityMode `json:"compatibility_mode"`
// Auto-evolution settings.
//
// EnableAutoEvolution switches automatic schema evolution on or off.
EnableAutoEvolution bool `json:"enable_auto_evolution"`
// SchemaCheckInterval is a duration associated with schema checks.
// NOTE(review): presumably the period between checks — confirm against the
// implementation. time.Duration has no custom JSON encoding, so
// encoding/json reads and writes it as an integer number of nanoseconds.
SchemaCheckInterval time.Duration `json:"schema_check_interval"`
// BatchSizeForInference is a batch size used for schema inference.
// NOTE(review): presumably a record count — confirm the unit and whether 0 is allowed.
BatchSizeForInference int `json:"batch_size_for_inference"`
// Schema handling.
//
// FailOnIncompatible presumably makes an incompatible schema change an
// error rather than something tolerated — TODO confirm.
FailOnIncompatible bool `json:"fail_on_incompatible"`
// PreserveOldFields presumably keeps fields that are absent from the
// newer schema instead of dropping them — TODO confirm.
PreserveOldFields bool `json:"preserve_old_fields"`
// Performance.
//
// CacheSchemaLocally enables a local schema cache.
// NOTE(review): cache scope and invalidation are not visible here.
CacheSchemaLocally bool `json:"cache_schema_locally"`
// MaxSchemaAgeMinutes is a maximum schema age, in minutes per the field name.
// NOTE(review): presumably bounds the lifetime of the cached schema — confirm.
MaxSchemaAgeMinutes int `json:"max_schema_age_minutes"`
}
EvolutionConfig configures schema evolution behavior
func CreateEvolutionConfig ¶
func CreateEvolutionConfig(properties map[string]interface{}) *EvolutionConfig
CreateEvolutionConfig creates an evolution configuration from properties.
func DefaultEvolutionConfig ¶
func DefaultEvolutionConfig() *EvolutionConfig
DefaultEvolutionConfig returns the default configuration.
type SchemaEvolutionDestination ¶
type SchemaEvolutionDestination struct {
// contains filtered or unexported fields
}
SchemaEvolutionDestination wraps a destination with automatic schema evolution capabilities
func NewSchemaEvolutionDestination ¶
func NewSchemaEvolutionDestination(dest core.Destination, config *EvolutionConfig) (*SchemaEvolutionDestination, error)
NewSchemaEvolutionDestination creates a new destination with schema evolution.
func (*SchemaEvolutionDestination) AlterSchema ¶
func (sed *SchemaEvolutionDestination) AlterSchema(ctx context.Context, oldSchema, newSchema *core.Schema) error
AlterSchema handles schema alterations with evolution
func (*SchemaEvolutionDestination) BeginTransaction ¶
func (sed *SchemaEvolutionDestination) BeginTransaction(ctx context.Context) (core.Transaction, error)
func (*SchemaEvolutionDestination) BulkLoad ¶
func (sed *SchemaEvolutionDestination) BulkLoad(ctx context.Context, reader interface{}, format string) error
func (*SchemaEvolutionDestination) Close ¶
func (sed *SchemaEvolutionDestination) Close(ctx context.Context) error
Close closes the destination
func (*SchemaEvolutionDestination) CreateSchema ¶
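func (sed *SchemaEvolutionDestination) CreateSchema(ctx context.Context, schema *core.Schema) error
CreateSchema creates the initial schema with evolution tracking.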
func (*SchemaEvolutionDestination) DropSchema ¶
func (sed *SchemaEvolutionDestination) DropSchema(ctx context.Context, schema *core.Schema) error
func (*SchemaEvolutionDestination) Health ¶
func (sed *SchemaEvolutionDestination) Health(ctx context.Context) error
func (*SchemaEvolutionDestination) Initialize ¶
func (sed *SchemaEvolutionDestination) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize initializes both the destination and schema evolution.
func (*SchemaEvolutionDestination) Metrics ¶
func (sed *SchemaEvolutionDestination) Metrics() map[string]interface{}
func (*SchemaEvolutionDestination) SupportsBatch ¶
func (sed *SchemaEvolutionDestination) SupportsBatch() bool
func (*SchemaEvolutionDestination) SupportsBulkLoad ¶
func (sed *SchemaEvolutionDestination) SupportsBulkLoad() bool
func (*SchemaEvolutionDestination) SupportsStreaming ¶
func (sed *SchemaEvolutionDestination) SupportsStreaming() bool
func (*SchemaEvolutionDestination) SupportsTransactions ¶
func (sed *SchemaEvolutionDestination) SupportsTransactions() bool
func (*SchemaEvolutionDestination) SupportsUpsert ¶
func (sed *SchemaEvolutionDestination) SupportsUpsert() bool
func (*SchemaEvolutionDestination) Write ¶
func (sed *SchemaEvolutionDestination) Write(ctx context.Context, stream *core.RecordStream) error
Write writes records with automatic schema evolution
func (*SchemaEvolutionDestination) WriteBatch ¶
func (sed *SchemaEvolutionDestination) WriteBatch(ctx context.Context, stream *core.BatchStream) error
WriteBatch writes batches with automatic schema evolution