sql

package
v0.0.0-...-77b25ca Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPluginNotFound   = fmt.Errorf("plugin not found")
	ErrInvalidConfig    = fmt.Errorf("invalid plugin configuration")
	ErrMissingField     = fmt.Errorf("missing required field")
	ErrInvalidSQL       = fmt.Errorf("invalid SQL template")
	ErrMissingParameter = fmt.Errorf("missing parameter in SQL template")
	ErrSQLFileNotFound  = fmt.Errorf("SQL file not found")
	ErrExecutionFailed  = fmt.Errorf("SQL execution failed")
	ErrRoutingFailed    = fmt.Errorf("result routing failed")
	ErrWriteFailed      = fmt.Errorf("sink write failed")
	ErrInvalidParameter = fmt.Errorf("invalid parameter")
)

Plugin errors

Functions

func DataSetToMap

func DataSetToMap(ds *DataSet) []map[string]interface{}

DataSetToMap converts DataSet to slice of maps for easier processing

func NormalizeSQLForTest

func NormalizeSQLForTest(sql string) string

NormalizeSQLForTest is a test helper that normalizes a SQL string for comparison in tests.

func OperationToSinkType

func OperationToSinkType(op Operation) string

OperationToSinkType converts Operation to sink type string

Types

type CDCParameters

type CDCParameters struct {
	CDCLSN       string
	CDCTxID      string
	CDCTable     string
	CDCOperation int
	Fields       map[string]interface{}
}

CDCParameters represents CDC data mapped to SQL parameters

type ConfigError

type ConfigError struct {
	Field   string
	Message string
}

ConfigError represents a configuration validation error

func NewConfigError

func NewConfigError(field, message string) *ConfigError

NewConfigError creates a new config error

func (*ConfigError) Error

func (e *ConfigError) Error() string

type DataSet

type DataSet struct {
	Columns []string
	Rows    [][]interface{}
}

DataSet represents query results

func (*DataSet) String

func (ds *DataSet) String() string

String implements fmt.Stringer for DataSet

type DriverExecutor

type DriverExecutor interface {
	// Execute runs a parameterized SQL query
	Execute(sqlTmpl string, params map[string]interface{}) (*DataSet, error)

	// ExecuteBatch runs multiple SQL statements in a transaction
	ExecuteBatch(statements []string, params []map[string]interface{}) error

	// DB returns the underlying database connection
	DB() *sql.DB
}

DriverExecutor defines the interface for database-specific SQL execution

type DriverType

type DriverType string

DriverType represents the type of database driver

const (
	DriverMSSQL  DriverType = "mssql"
	DriverSQLite DriverType = "sqlite"
)

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the SQL Plugin execution engine. It is stateless and shared across all skills. The caller is responsible for setting the current skill via HandleWithSkill or by setting e.skill directly before calling Handle.

func NewEngine

func NewEngine(skill *Skill, mssqlDB *sql.DB) *Engine

NewEngine creates a new SQL Plugin engine

func (*Engine) Handle

func (e *Engine) Handle(tx *core.Transaction) ([]core.Sink, error)

Handle processes a core.Transaction through the SQL Plugin. It extracts CDC changes and executes sinks against MSSQL, then returns all job operations as []core.Sink for the caller to write. Multiple sinks targeting the same table+pk are merged into a single sink.

func (*Engine) HandleWithPull

func (e *Engine) HandleWithPull(tx *core.Transaction, skill *Skill, batchCtx *core.BatchContext, monitorDB *monitor.DB) ([]core.Sink, error)

HandleWithPull executes the engine with a specific skill and writes skill log. Skill log is written with EXECUTED status and rows_processed from returned sinks. If no sinks are produced (if condition not met), SKIP is logged with rows_processed=0. If execution fails, ERROR is logged.

func (*Engine) HandleWithSkill

func (e *Engine) HandleWithSkill(tx *core.Transaction, skill *Skill) ([]core.Sink, error)

HandleWithSkill executes the engine with a specific skill, then restores the original skill. This allows the engine to be stateless and shared across all skills.

type ExecutionError

type ExecutionError struct {
	SQL    string
	Params map[string]interface{}
	Err    error
}

ExecutionError represents a SQL execution error

func NewExecutionError

func NewExecutionError(sql string, params map[string]interface{}, err error) *ExecutionError

NewExecutionError creates a new execution error

func (*ExecutionError) Error

func (e *ExecutionError) Error() string

func (*ExecutionError) Unwrap

func (e *ExecutionError) Unwrap() error

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes SQL templates with parameter substitution

func NewExecutor

func NewExecutor(db *sql.DB) *Executor

NewExecutor creates a new SQL template executor

func NewExecutorWithDriver

func NewExecutorWithDriver(db *sql.DB, dbType DriverType) *Executor

NewExecutorWithDriver creates a new executor with a specific driver. dbType must be "mssql" or "sqlite".

func (*Executor) ExecuteDriver

func (e *Executor) ExecuteDriver(sqlTmpl string, params map[string]interface{}) (*DataSet, error)

ExecuteDriver executes a SQL template using driver-based parameterized queries

type ExtractOption

type ExtractOption int

ExtractOption defines how to convert @name parameters

const (
	// OptionPlaceholders converts @name to ? (for SQLite)
	OptionPlaceholders ExtractOption = iota
	// OptionKeepNames keeps @name (for MSSQL)
	OptionKeepNames
)

type FileMetadata

type FileMetadata struct {
	Path        string    `json:"path"`
	IsSQL       bool      `json:"is_sql"`
	CurVersion  string    `json:"cur_version"` // SHA256 hash of current loaded content
	CurModTime  time.Time `json:"cur_mod_time"`
	CurSize     int64     `json:"cur_size"`
	CurLoadedAt time.Time `json:"cur_loaded_at"`
	NewVersion  string    `json:"new_version"` // SHA256 hash of new content (if changed)
	NewModTime  time.Time `json:"new_mod_time"`
	NewSize     int64     `json:"new_size"`
	NeedsReload bool      `json:"needs_reload"`
	IsDeleted   bool      `json:"is_deleted"`
}

FileMetadata tracks the state of a single file (.yml or .sql)

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

Loader handles loading and parsing SQL plugin configurations. Deprecated: The New() function should be used instead for new code.

func NewLoader

func NewLoader(pluginsDir string) *Loader

NewLoader creates a new SQL plugin loader

func (*Loader) Exists

func (l *Loader) Exists(file string) bool

Exists checks if a plugin file exists at the given relative path

func (*Loader) Load

func (l *Loader) Load(file string) (*Skill, error)

Load loads and parses a SQL plugin from the given relative file path. The file parameter is a relative path from the plugins directory (e.g., "orders.yml" or "f9/orders.yml").

func (*Loader) LoadAll

func (l *Loader) LoadAll() (map[string]*Skill, error)

LoadAll loads all SQL plugins from the plugins directory and its subdirectories. It scans for *.yml files recursively and returns a map keyed by file path.

type MSSQLExecutor

type MSSQLExecutor struct {
	// contains filtered or unexported fields
}

MSSQLExecutor implements DriverExecutor for MSSQL

func NewMSSQLExecutor

func NewMSSQLExecutor(db *sql.DB) *MSSQLExecutor

NewMSSQLExecutor creates a new MSSQL executor

func (*MSSQLExecutor) DB

func (e *MSSQLExecutor) DB() *sql.DB

DB returns the underlying database connection

func (*MSSQLExecutor) Execute

func (e *MSSQLExecutor) Execute(sqlTmpl string, params map[string]interface{}) (*DataSet, error)

Execute runs a parameterized SQL query on MSSQL

func (*MSSQLExecutor) ExecuteBatch

func (e *MSSQLExecutor) ExecuteBatch(statements []string, params []map[string]interface{}) error

ExecuteBatch executes multiple statements in a transaction

type OnConflictStrategy

type OnConflictStrategy int

OnConflictStrategy defines how to handle conflicts on insert/update

const (
	// ConflictOverwrite replaces existing record (INSERT OR REPLACE, direct UPDATE)
	ConflictOverwrite OnConflictStrategy = iota
	// ConflictSkip ignores the operation if record exists (INSERT OR IGNORE, conditional UPDATE)
	ConflictSkip
	// ConflictError returns an error if record exists (INSERT with check, conditional UPDATE)
	ConflictError
)

func ParseOnConflictStrategy

func ParseOnConflictStrategy(s string) OnConflictStrategy

ParseOnConflictStrategy parses string to OnConflictStrategy

func (OnConflictStrategy) String

func (s OnConflictStrategy) String() string

String returns the strategy name

type Operation

type Operation int

Operation represents the CDC operation type

const (
	Insert Operation = 1 + iota
	Update
	Delete
)

func (Operation) String

func (o Operation) String() string

String returns the operation name

type ParamExtractor

type ParamExtractor struct {
	// contains filtered or unexported fields
}

ParamExtractor extracts @name parameters from SQL templates and converts them to driver-specific parameter formats

func NewParamExtractor

func NewParamExtractor() *ParamExtractor

NewParamExtractor creates a new parameter extractor

func (*ParamExtractor) Extract

func (e *ParamExtractor) Extract(sqlTmpl string, params map[string]interface{}, opt ExtractOption) (string, []interface{}, error)

Extract converts a SQL template to a driver-specific format. Example sqlTmpl: SELECT * FROM orders WHERE order_id = @orders_order_id. params is a map of parameter name → value. Returns the converted SQL, the args/named args, and an error.

func (*ParamExtractor) ExtractNamed

func (e *ParamExtractor) ExtractNamed(sqlTmpl string, params map[string]interface{}) (string, []interface{}, error)

ExtractNamed extracts parameters for MSSQL named args

func (*ParamExtractor) ExtractParamNames

func (e *ParamExtractor) ExtractParamNames(sqlTmpl string) []string

ExtractParamNames extracts all parameter names from SQL template in order

func (*ParamExtractor) ReplacePlaceholders

func (e *ParamExtractor) ReplacePlaceholders(sqlTmpl string) string

ReplacePlaceholders replaces @name with ? in SQL template

func (*ParamExtractor) Validate

func (e *ParamExtractor) Validate(sqlTmpl string, params map[string]interface{}) error

Validate checks if all @name parameters in SQL have corresponding values

type Parameter

type Parameter struct {
	Name  string // @orders_order_id
	Value interface{}
}

Parameter represents a named parameter in SQL template

type Plugin

type Plugin struct {
	Skills *Skills // thread-safe collection of skills
	// contains filtered or unexported fields
}

Plugin implements the plugin.Plugin interface for SQL plugins. It manages multiple skills (yaml configs + SQL templates) with a shared execution engine. It supports hot-reloading when skill files change. Sink writing is handled by the platform layer based on the Database field in returned sinks.

func New

func New(path string, db *sql.DB) *Plugin

New creates a new SQL plugin that manages multiple skills. Pass db=nil if you need to call AttachDB later. Sink writing is handled by the platform layer - this plugin only handles SQL execution.

func (*Plugin) AttachDB

func (p *Plugin) AttachDB(db *sql.DB)

AttachDB sets the database connection for this plugin's engine.

func (*Plugin) Handle

func (p *Plugin) Handle(tx *core.Transaction) ([]core.Sink, error)

Handle processes a CDC transaction through all SQL skills. It iterates over all skills, checks if they match the transaction table, and executes the engine for each matching skill.

func (*Plugin) HandleWithPull

func (p *Plugin) HandleWithPull(tx *core.Transaction, batchCtx *core.BatchContext, monitorDB *monitor.DB) ([]core.Sink, error)

HandleWithPull processes a CDC transaction with pull context for observability. Skill logs are written for each skill:

  • SKIP: skill did not match the transaction table
  • EXECUTED: skill matched and produced sinks
  • ERROR: skill matched but execution failed

Sinks are returned for the caller to write.

func (*Plugin) Name

func (p *Plugin) Name() string

Name implements plugin.Plugin

func (*Plugin) StartWatch

func (p *Plugin) StartWatch()

StartWatch starts the internal file watcher for skill files.

func (*Plugin) Stop

func (p *Plugin) Stop() error

Stop implements plugin.Plugin

func (*Plugin) StopWatch

func (p *Plugin) StopWatch()

StopWatch stops the internal file watcher.

func (*Plugin) Type

func (p *Plugin) Type() string

Type implements plugin.Plugin

type PluginMetadata

type PluginMetadata struct {
	Name        string                  `json:"name"`
	Type        string                  `json:"type"`   // "sql"
	Status      string                  `json:"status"` // loaded | stale | error
	NeedsReload bool                    `json:"needs_reload"`
	LoadCount   int                     `json:"load_count"`
	LastError   string                  `json:"last_error,omitempty"`
	Files       map[string]FileMetadata `json:"files"`
}

PluginMetadata contains metadata and file tracking information for a SQL plugin

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool wraps a cnlangzi/sqlite.DB for a business sink database. cnlangzi/sqlite provides read/write separation, buffered batch writes, and automatic WAL checkpoint management.

func NewPool

func NewPool(ctx context.Context, name, sqlitePath string) (*Pool, error)

NewPool creates a new SQLite connection pool for a sink. Path normalization: ./data/sinks/business.db -> ./data/sinks/business. The actual DB file will be at {path}/{name}.db.

func (*Pool) Close

func (p *Pool) Close() error

Close closes the database connection

func (*Pool) DB

func (p *Pool) DB() *sqlite.DB

DB returns the underlying sqlite.DB for direct access if needed

func (*Pool) MigrationsPath

func (p *Pool) MigrationsPath() string

MigrationsPath returns the migrations directory path

func (*Pool) Name

func (p *Pool) Name() string

Name returns the sink name

func (*Pool) Path

func (p *Pool) Path() string

Path returns the base path for this sink

func (*Pool) Read

func (p *Pool) Read() *sqlite.DB

Read returns the sqlite.DB for read operations (direct reads)

func (*Pool) Write

func (p *Pool) Write() *sqlite.DB

Write returns the sqlite.DB for write operations (buffered batch writes)

func (*Pool) WriteTx

func (p *Pool) WriteTx(ctx context.Context) (*sqlite.Tx, error)

WriteTx starts a buffered transaction for atomic writes. The Tx buffers all statements until Commit is called.

type PoolManager

type PoolManager struct {
	// contains filtered or unexported fields
}

PoolManager manages multiple sink pools

func NewPoolManager

func NewPoolManager(skillsDir string) *PoolManager

NewPoolManager creates a new pool manager

func (*PoolManager) Close

func (m *PoolManager) Close() error

Close closes all pools

func (*PoolManager) GetPool

func (m *PoolManager) GetPool(ctx context.Context, skillName string, sqlitePath string) (*Pool, error)

GetPool returns an existing pool or creates a new one for the given skill

func (*PoolManager) ListPools

func (m *PoolManager) ListPools() []string

ListPools returns all managed pool names

type RoutingError

type RoutingError struct {
	SinkName string
	Err      error
}

RoutingError represents a result routing error

func NewRoutingError

func NewRoutingError(sinkName string, err error) *RoutingError

NewRoutingError creates a new routing error

func (*RoutingError) Error

func (e *RoutingError) Error() string

func (*RoutingError) Unwrap

func (e *RoutingError) Unwrap() error

type SQLiteExecutor

type SQLiteExecutor struct {
	// contains filtered or unexported fields
}

SQLiteExecutor implements DriverExecutor for SQLite

func NewSQLiteExecutor

func NewSQLiteExecutor(db *sql.DB) *SQLiteExecutor

NewSQLiteExecutor creates a new SQLite executor

func (*SQLiteExecutor) DB

func (e *SQLiteExecutor) DB() *sql.DB

DB returns the underlying database connection

func (*SQLiteExecutor) Execute

func (e *SQLiteExecutor) Execute(sqlTmpl string, params map[string]interface{}) (*DataSet, error)

Execute runs a parameterized SQL query on SQLite

func (*SQLiteExecutor) ExecuteBatch

func (e *SQLiteExecutor) ExecuteBatch(statements []string, params []map[string]interface{}) error

ExecuteBatch executes multiple statements in a transaction

type SQLiteSink

type SQLiteSink struct {
	// contains filtered or unexported fields
}

SQLiteSink writes business data to a dedicated SQLite file per skill. It uses connection pooling for read/write separation and supports migrations.

func NewSQLiteSink

func NewSQLiteSink(skill *Skill, pool *Pool) *SQLiteSink

NewSQLiteSink creates a new SQLite sink for a skill

func (*SQLiteSink) Close

func (s *SQLiteSink) Close() error

Close closes the sink

func (*SQLiteSink) GetPool

func (s *SQLiteSink) GetPool() *Pool

GetPool returns the underlying pool

func (*SQLiteSink) LastModified

func (s *SQLiteSink) LastModified() (time.Time, error)

LastModified returns the last modification time of the sink database

func (*SQLiteSink) Query

func (s *SQLiteSink) Query(query string, args ...any) (*sql.Rows, error)

Query executes a read query against the sink database

func (*SQLiteSink) QueryRow

func (s *SQLiteSink) QueryRow(query string, args ...any) *sql.Row

QueryRow executes a read query that returns a single row

func (*SQLiteSink) RunMigrations

func (s *SQLiteSink) RunMigrations() error

RunMigrations runs any pending migrations using sqle/migrate

func (*SQLiteSink) Write

func (s *SQLiteSink) Write(ctx context.Context, ops []core.Sink) error

Write writes transformed DataSets to the sink database

type Sink

type Sink struct {
	SinkConfig `yaml:",inline"` // Embedded SinkConfig for backward compatibility
	When       []string         `yaml:"when"` // Required: [insert, update] or [delete]
}

Sink represents a single sink configuration with operation filter

type SinkConfig

type SinkConfig struct {
	Name       string `yaml:"name"`        // Sink name
	On         string `yaml:"on"`          // Table filter (required for multi-table)
	If         string `yaml:"if"`          // SQL-style condition expression (govaluate)
	SQL        string `yaml:"sql"`         // Inline SQL template
	SQLFile    string `yaml:"sql_file"`    // External SQL file path
	Output     string `yaml:"output"`      // Target table name
	PrimaryKey string `yaml:"primary_key"` // Primary key column
	OnConflict string `yaml:"on_conflict"` // Conflict strategy: overwrite | skip | error (default: skip)

	OnLower string `yaml:"-"` // Precomputed lowercase of On (for performance)
	// contains filtered or unexported fields
}

SinkConfig represents a single job configuration

func FilterSinks

func FilterSinks(sinks []SinkConfig, tableName string) []SinkConfig

FilterSinks filters sinks by table name (for multi-table CDC)

func (*SinkConfig) EvalIf

func (s *SinkConfig) EvalIf(cdcParams CDCParameters) bool

EvalIf evaluates the 'if' expression for a sink against CDC data. It returns true if:

  • The sink has no 'if' expression (always true)
  • The expression evaluates to true

It returns false if the expression evaluates to false or an error occurs.

Table and field names in expressions are treated case-insensitively:

"orders.status = 'vip'" and "ORDERS.STATUS = 'vip'" both map to the same fields.

Literal values in expressions remain case-sensitive.

func (*SinkConfig) GetOnConflict

func (s *SinkConfig) GetOnConflict() OnConflictStrategy

GetOnConflict returns the OnConflictStrategy for this job

type SinkOp

type SinkOp struct {
	Config  SinkConfig
	DataSet *DataSet
	OpType  Operation
}

SinkOp represents a single sink operation (write to SQLite)

type Skill

type Skill struct {
	Name        string              `yaml:"name"`
	Description string              `yaml:"description"`
	On          []string            `yaml:"on"`       // Tables to monitor
	Database    string              `yaml:"database"` // Target database name (maps to platform-configured storage)
	Sinks       []Sink              `yaml:"sinks"`
	Outputs     map[string][]string `yaml:"-"` // key = output table name, value = field names

	File         string    `yaml:"-"` // Auto-assigned: relative path from config.plugins.sql.path
	Id           string    `yaml:"-"` // Auto-assigned: SHA256(File)[:12]
	Raw          string    `yaml:"-"` // Auto-assigned: raw YAML content
	Error        string    `yaml:"-"` // last load/parsing error message (if any)
	LastLoadedAt time.Time `yaml:"-"` // last successful load time
	// contains filtered or unexported fields
}

Skill represents a SQL plugin configuration

func (*Skill) FilterByOperation

func (s *Skill) FilterByOperation(opType Operation) []SinkConfig

FilterByOperation returns sinks that handle the given operation type. The 'when' field must contain the operation string ("insert", "update", or "delete"). Results are cached per skill for performance.

func (*Skill) ValidateSinks

func (s *Skill) ValidateSinks() error

ValidateSinks validates the sinks configuration and returns an error if invalid. It ensures:

  • 'when' field is required for all sinks
  • Only [insert, update] or [delete] are valid 'when' values
  • No mixing of insert/update with delete in the same sink
  • 'if' expressions (if present) are valid govaluate expressions

type Skills

type Skills struct {
	// contains filtered or unexported fields
}

Skills aggregates skill operations with internal locking.

func NewSkills

func NewSkills() *Skills

NewSkills creates a new empty Skills collection.

func (*Skills) Delete

func (s *Skills) Delete(id string)

Delete removes a skill by its Id.

func (*Skills) Get

func (s *Skills) Get(id string) (*Skill, bool)

Get returns a skill by its Id. Returns (nil, false) if not found.

func (*Skills) Len

func (s *Skills) Len() int

Len returns the number of skills in the collection.

func (*Skills) List

func (s *Skills) List() []*Skill

List returns all skills. The returned slice is a copy for thread-safe iteration.

func (*Skills) Set

func (s *Skills) Set(skill *Skill)

Set adds or updates a skill in the collection.

type WriteError

type WriteError struct {
	Table string
	Err   error
}

WriteError represents a sink write error

func NewWriteError

func NewWriteError(table string, err error) *WriteError

NewWriteError creates a new write error

func (*WriteError) Error

func (e *WriteError) Error() string

func (*WriteError) Unwrap

func (e *WriteError) Unwrap() error

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer writes DataSet results to SQLite sinks

func NewWriter

func NewWriter(db *sql.DB) *Writer

NewWriter creates a new sink writer

func (*Writer) DB

func (w *Writer) DB() *sql.DB

DB returns the underlying database connection

func (*Writer) EnsureTable

func (w *Writer) EnsureTable(table string, columns []string) error

EnsureTable ensures a table exists with the given schema

func (*Writer) WriteBatch

func (w *Writer) WriteBatch(ops []*SinkOp) error

WriteBatch writes multiple sink operations in a single transaction

func (*Writer) WriteDelete

func (w *Writer) WriteDelete(table string, pk string, ds *DataSet) error

WriteDelete writes delete operations to a table

func (*Writer) WriteDeleteInTx

func (w *Writer) WriteDeleteInTx(tx *sql.Tx, table string, pk string, ds *DataSet) error

WriteDeleteInTx writes delete operations within an existing transaction

func (*Writer) WriteMultiple

func (w *Writer) WriteMultiple(tableData map[string]*DataSet, pkColumn string) error

WriteMultiple writes multiple DataSets to their respective tables

func (*Writer) WriteMultipleInTx

func (w *Writer) WriteMultipleInTx(tx *sql.Tx, tableData map[string]*DataSet, pkColumn string) error

WriteMultipleInTx writes multiple DataSets within a single transaction

func (*Writer) WriteUpsert

func (w *Writer) WriteUpsert(table string, pk string, ds *DataSet) error

WriteUpsert writes a DataSet to a table using INSERT OR REPLACE (legacy method)

func (*Writer) WriteUpsertInTx

func (w *Writer) WriteUpsertInTx(tx *sql.Tx, table string, pk string, ds *DataSet) error

WriteUpsertInTx writes a DataSet to a table using INSERT OR REPLACE within an existing transaction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL