Documentation ¶
Index ¶
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) Configure(dbConfigs map[string]config.SinkConfig)
- func (m *Manager) GetSinkConfig(dbName string) (config.SinkConfig, bool)
- func (m *Manager) GetSinker(dbName string) (Sinker, error)
- func (m *Manager) GetTimezone() *time.Location
- func (m *Manager) InitAll(ctx context.Context) error
- func (m *Manager) ListDatabases() []string
- func (m *Manager) Query(dbName, query string) ([]string, []map[string]any, error)
- func (m *Manager) QueryTables(dbName string) ([]string, error)
- func (m *Manager) SetTimezone(tz *time.Location)
- func (m *Manager) Truncate(ctx context.Context, dbName string, tables []string) error
- func (m *Manager) Write(ctx context.Context, sinks []core.Sink, batchCtx *core.BatchContext, ...) error
- type Sinker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Manager ¶
type Manager struct {
	// contains filtered or unexported fields
}
Manager manages Sinkers and routes sink operations to the appropriate Sinker.
func (*Manager) Configure ¶
func (m *Manager) Configure(dbConfigs map[string]config.SinkConfig)
Configure supplies the manager with its sink database configurations.
func (*Manager) GetSinkConfig ¶
func (m *Manager) GetSinkConfig(dbName string) (config.SinkConfig, bool)
GetSinkConfig returns the configuration for a named sink.
func (*Manager) GetSinker ¶
func (m *Manager) GetSinker(dbName string) (Sinker, error)
GetSinker returns a Sinker for the given database name, creating one if needed.
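The "creating one if needed" behaviour can be pictured as a mutex-guarded cache keyed by database name. The following is a minimal sketch under stated assumptions, not the package's implementation: `sinker`, `memSinker`, `registry`, and `newRegistry` are hypothetical stand-ins, and returning an error for an unconfigured name is assumed rather than documented.

```go
package main

import (
	"fmt"
	"sync"
)

// sinker is a hypothetical stand-in for the package's Sinker interface.
type sinker interface {
	DatabaseName() string
}

// memSinker is a hypothetical implementation used only for this sketch.
type memSinker struct{ name string }

func (s *memSinker) DatabaseName() string { return s.name }

// registry caches one sinker per configured database name.
type registry struct {
	mu         sync.Mutex
	configured map[string]bool   // names that have a configuration
	sinkers    map[string]sinker // sinkers created so far
	created    int               // number of constructions, for illustration
}

func newRegistry(names ...string) *registry {
	r := &registry{configured: map[string]bool{}, sinkers: map[string]sinker{}}
	for _, n := range names {
		r.configured[n] = true
	}
	return r
}

// get returns the cached sinker for name, creating it on first use.
// Rejecting unconfigured names is an assumption of this sketch.
func (r *registry) get(name string) (sinker, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.sinkers[name]; ok {
		return s, nil
	}
	if !r.configured[name] {
		return nil, fmt.Errorf("no sink configured for database %q", name)
	}
	s := &memSinker{name: name}
	r.sinkers[name] = s
	r.created++
	return s, nil
}

func main() {
	r := newRegistry("orders")
	a, _ := r.get("orders")
	b, _ := r.get("orders")
	fmt.Println(a == b, r.created) // true 1
	_, err := r.get("missing")
	fmt.Println(err != nil) // true
}
```

Holding the lock for the whole lookup keeps two concurrent callers from constructing two sinkers for the same database.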
func (*Manager) GetTimezone ¶
func (m *Manager) GetTimezone() *time.Location
GetTimezone returns the configured MSSQL timezone.
func (*Manager) InitAll ¶
func (m *Manager) InitAll(ctx context.Context) error
InitAll initializes all configured sinks and runs their migrations. Call it during startup so that every sink database exists and is migrated before any CDC processing begins.
func (*Manager) ListDatabases ¶
func (m *Manager) ListDatabases() []string
ListDatabases returns all configured database names.
func (*Manager) Query ¶
func (m *Manager) Query(dbName, query string) ([]string, []map[string]any, error)
Query executes a read-only SELECT query on a sink and returns the column names and result rows. It uses the Sinker's existing Reader connection instead of creating a temporary one. Time formatting is handled by types.Codec in the Sinker's Query method.
func (*Manager) QueryTables ¶
func (m *Manager) QueryTables(dbName string) ([]string, error)
QueryTables returns the list of tables in a sink database. It uses the Sinker's existing Reader connection instead of creating a temporary one.
func (*Manager) SetTimezone ¶
func (m *Manager) SetTimezone(tz *time.Location)
SetTimezone sets the MSSQL timezone used for datetime conversion in API responses.
func (*Manager) Write ¶
func (m *Manager) Write(ctx context.Context, sinks []core.Sink, batchCtx *core.BatchContext, monitorDB *monitor.DB) error
Write routes sink operations to the appropriate sinkers based on each operation's Database field. batchCtx provides the batch_id used to correlate sink_logs entries. monitorDB receives a sink_logs entry for each sink × table × operation combination.
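Routing on the Database field amounts to bucketing operations by target database before handing each bucket to its sinker. The sketch below illustrates only that grouping step. The `sink` type is a hypothetical stand-in for core.Sink: only its Database field is mentioned above, and `Table` is assumed for illustration.

```go
package main

import "fmt"

// sink is a hypothetical stand-in for core.Sink.
type sink struct {
	Database string // documented routing key
	Table    string // assumed field, for illustration only
}

// groupByDatabase buckets operations by target database. It preserves
// the order of operations within each bucket and reports databases in
// first-seen order.
func groupByDatabase(ops []sink) (order []string, groups map[string][]sink) {
	groups = make(map[string][]sink)
	for _, op := range ops {
		if _, seen := groups[op.Database]; !seen {
			order = append(order, op.Database)
		}
		groups[op.Database] = append(groups[op.Database], op)
	}
	return order, groups
}

func main() {
	ops := []sink{
		{Database: "sales", Table: "orders"},
		{Database: "audit", Table: "events"},
		{Database: "sales", Table: "items"},
	}
	order, groups := groupByDatabase(ops)
	for _, db := range order {
		fmt.Println(db, len(groups[db]))
	}
}
```

Each bucket could then be passed to the Write method of the sinker for that database.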
type Sinker ¶
type Sinker interface {
	// DatabaseName returns the database name this sinker handles
	DatabaseName() string
	// DatabaseType returns the type of database (sqlite, duckdb, mssql, etc.)
	DatabaseType() string
	// Write writes a batch of sink operations to the sink with context for timeout/cancellation
	Write(ctx context.Context, ops []core.Sink) error
	// ExecContext executes a raw SQL query (used for table maintenance like TRUNCATE/DELETE)
	ExecContext(ctx context.Context, query string) error
	// Migrate runs the migration for this sinker's database.
	// It re-discovers and re-applies migrations from the configured migrations path.
	Migrate(ctx context.Context) error
	// Truncate deletes all data from the specified tables.
	// It disables foreign key checks during the operation.
	Truncate(ctx context.Context, tables []string) error
	// Close closes the sinker and releases resources
	Close() error
	// QueryTables returns the list of user tables in the sink database
	QueryTables() ([]string, error)
	// Query executes a read-only query and returns columns and results.
	// Uses the existing Reader connection instead of creating a temporary one.
	// Returns columns in SQLite table order (not alphabetically sorted).
	Query(query string, limit int) ([]string, []map[string]any, error)
}
Sinker defines the interface for sink writers that write transformed data to sinks. Each implementation handles its own SQL syntax and migration strategy.
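Since each implementation handles its own SQL syntax, a Truncate for a SQLite-backed sinker might build a statement sequence like the one below. This is a sketch under assumptions, not the package's actual implementation: it uses SQLite's `PRAGMA foreign_keys` to toggle foreign key enforcement and `DELETE FROM` to clear each table, and `quoteIdent` and `truncateStatements` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent quotes an SQL identifier, doubling any embedded double quotes.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// truncateStatements returns the statements to run, in order, to empty
// the given tables with foreign key enforcement switched off (SQLite
// dialect assumed).
func truncateStatements(tables []string) []string {
	stmts := make([]string, 0, len(tables)+2)
	stmts = append(stmts, "PRAGMA foreign_keys = OFF")
	for _, t := range tables {
		stmts = append(stmts, "DELETE FROM "+quoteIdent(t))
	}
	stmts = append(stmts, "PRAGMA foreign_keys = ON")
	return stmts
}

func main() {
	for _, s := range truncateStatements([]string{"orders", "order_items"}) {
		fmt.Println(s)
	}
}
```

Other database types would need their own statements, which is the kind of per-implementation difference the interface leaves to each Sinker.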