sinker

package
v0.0.0-...-77b25ca
Warning

This package is not in the latest version of its module.

Published: Apr 30, 2026 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages Sinkers and routes sink operations to the appropriate sinkers.

func NewManager

func NewManager() *Manager

NewManager creates a new Sinker manager.

func (*Manager) Close

func (m *Manager) Close() error

Close closes all sinkers.

func (*Manager) Configure

func (m *Manager) Configure(dbConfigs map[string]config.SinkConfig)

Configure configures the manager with the per-database sink configurations, keyed by database name.

func (*Manager) GetSinkConfig

func (m *Manager) GetSinkConfig(dbName string) (config.SinkConfig, bool)

GetSinkConfig returns the configuration for the named sink. The boolean result reports whether a configuration was found.

func (*Manager) GetSinker

func (m *Manager) GetSinker(dbName string) (Sinker, error)

GetSinker returns a Sinker for the given database name, creating one if needed.
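The "creating one if needed" behavior is a get-or-create lookup. The following is a minimal sketch of that pattern, not the package's implementation: `sinkConfig`, `sinker`, `memSinker`, and `registry` are hypothetical stand-ins for `config.SinkConfig`, `Sinker`, and `Manager`, and returning an error for an unconfigured name is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// sinkConfig is a hypothetical stand-in for config.SinkConfig.
type sinkConfig struct {
	Type string
}

// sinker is a hypothetical, trimmed-down stand-in for the Sinker interface.
type sinker interface {
	DatabaseName() string
	DatabaseType() string
}

type memSinker struct{ name, typ string }

func (s *memSinker) DatabaseName() string { return s.name }
func (s *memSinker) DatabaseType() string { return s.typ }

// registry illustrates get-or-create; it is not the package's Manager.
type registry struct {
	mu      sync.Mutex
	configs map[string]sinkConfig
	sinkers map[string]sinker
	created int // counts how many sinkers were actually constructed
}

func newRegistry(configs map[string]sinkConfig) *registry {
	return &registry{configs: configs, sinkers: make(map[string]sinker)}
}

// getSinker returns the cached sinker for dbName, constructing it on first use.
// Assumption: an unconfigured name is reported as an error.
func (r *registry) getSinker(dbName string) (sinker, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.sinkers[dbName]; ok {
		return s, nil
	}
	cfg, ok := r.configs[dbName]
	if !ok {
		return nil, fmt.Errorf("no sink configured for database %q", dbName)
	}
	s := &memSinker{name: dbName, typ: cfg.Type}
	r.sinkers[dbName] = s
	r.created++
	return s, nil
}

func main() {
	r := newRegistry(map[string]sinkConfig{"orders": {Type: "sqlite"}})
	a, _ := r.getSinker("orders")
	b, _ := r.getSinker("orders")
	fmt.Println("same instance:", a == b, "constructed:", r.created)
	if _, err := r.getSinker("missing"); err != nil {
		fmt.Println("error:", err)
	}
}
```

Holding the mutex across the lookup and the construction keeps two concurrent callers from building two sinkers for the same database.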

func (*Manager) GetTimezone

func (m *Manager) GetTimezone() *time.Location

GetTimezone returns the configured MSSQL timezone.

func (*Manager) InitAll

func (m *Manager) InitAll(ctx context.Context) error

InitAll initializes all configured sinks and runs their migrations. This should be called during startup to ensure all sink databases exist and are migrated before any CDC processing begins.

func (*Manager) ListDatabases

func (m *Manager) ListDatabases() []string

ListDatabases returns all configured database names.

func (*Manager) Query

func (m *Manager) Query(dbName, query string) ([]string, []map[string]any, error)

Query executes a read-only SELECT query on the named sink and returns the column names and the result rows. It uses the Sinker's existing Reader connection instead of creating a temporary one. Time formatting is handled by types.Codec in the Sinker's Query method.

func (*Manager) QueryTables

func (m *Manager) QueryTables(dbName string) ([]string, error)

QueryTables returns the list of tables in a sink database. It uses the Sinker's existing Reader connection instead of creating a temporary one.

func (*Manager) SetTimezone

func (m *Manager) SetTimezone(tz *time.Location)

SetTimezone sets the MSSQL timezone for datetime conversion in API responses.

func (*Manager) Truncate

func (m *Manager) Truncate(ctx context.Context, dbName string, tables []string) error

Truncate deletes all data from the specified tables in a sink database.

func (*Manager) Write

func (m *Manager) Write(ctx context.Context, sinks []core.Sink, batchCtx *core.BatchContext, monitorDB *monitor.DB) error

Write routes sink operations to the appropriate sinkers based on each operation's Database field. batchCtx provides the batch_id used for sink_logs correlation. monitorDB receives a sink_logs entry for each sink × table × operation combination.
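Routing on the Database field amounts to grouping a mixed batch by target database before handing each group to its sinker. A minimal sketch of that grouping step follows. The `sink` struct is a hypothetical stand-in for `core.Sink`: only the Database field is named in this documentation, so `Table` and `Op` are illustrative assumptions, as is the helper `groupByDatabase`.

```go
package main

import "fmt"

// sink is a hypothetical stand-in for core.Sink. Only Database is named in
// the documentation; Table and Op are assumed for illustration.
type sink struct {
	Database string
	Table    string
	Op       string
}

// groupByDatabase splits a batch into per-database groups. It also returns
// the databases in first-seen order so dispatch is deterministic, and it
// preserves the relative order of operations within each group.
func groupByDatabase(ops []sink) (order []string, groups map[string][]sink) {
	groups = make(map[string][]sink)
	for _, op := range ops {
		if _, seen := groups[op.Database]; !seen {
			order = append(order, op.Database)
		}
		groups[op.Database] = append(groups[op.Database], op)
	}
	return order, groups
}

func main() {
	ops := []sink{
		{Database: "orders", Table: "order_items", Op: "insert"},
		{Database: "users", Table: "profiles", Op: "update"},
		{Database: "orders", Table: "orders", Op: "delete"},
	}
	order, groups := groupByDatabase(ops)
	for _, db := range order {
		// A real manager would look up the sinker for db and call its Write.
		fmt.Printf("%s: %d operation(s)\n", db, len(groups[db]))
	}
}
```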

type Sinker

type Sinker interface {
	// DatabaseName returns the database name this sinker handles
	DatabaseName() string

	// DatabaseType returns the type of database (sqlite, duckdb, mssql, etc.)
	DatabaseType() string

	// Write writes a batch of sink operations to the sink with context for timeout/cancellation
	Write(ctx context.Context, ops []core.Sink) error

	// ExecContext executes a raw SQL query (used for table maintenance like TRUNCATE/DELETE)
	ExecContext(ctx context.Context, query string) error

	// Migrate runs the migration for this sinker's database.
	// It re-discovers and re-applies migrations from the configured migrations path.
	Migrate(ctx context.Context) error

	// Truncate deletes all data from the specified tables.
	// It disables foreign key checks during the operation.
	Truncate(ctx context.Context, tables []string) error

	// Close closes the sinker and releases resources
	Close() error

	// QueryTables returns the list of user tables in the sink database
	QueryTables() ([]string, error)

	// Query executes a read-only query and returns columns and results.
	// Uses the existing Reader connection instead of creating a temporary one.
	// Returns columns in SQLite table order (not alphabetically sorted).
	Query(query string, limit int) ([]string, []map[string]any, error)
}

Sinker defines the interface for sink writers that write transformed data to sinks. Each implementation handles its own SQL syntax and migration strategy.
