Documentation
¶
Index ¶
- Variables
- func DataSetToMap(ds *DataSet) []map[string]interface{}
- func NormalizeSQLForTest(sql string) string
- func OperationToSinkType(op Operation) string
- type CDCParameters
- type ConfigError
- type DataSet
- type DriverExecutor
- type DriverType
- type Engine
- type ExecutionError
- type Executor
- type ExtractOption
- type FileMetadata
- type Loader
- type MSSQLExecutor
- type OnConflictStrategy
- type Operation
- type ParamExtractor
- func (e *ParamExtractor) Extract(sqlTmpl string, params map[string]interface{}, opt ExtractOption) (string, []interface{}, error)
- func (e *ParamExtractor) ExtractNamed(sqlTmpl string, params map[string]interface{}) (string, []interface{}, error)
- func (e *ParamExtractor) ExtractParamNames(sqlTmpl string) []string
- func (e *ParamExtractor) ReplacePlaceholders(sqlTmpl string) string
- func (e *ParamExtractor) Validate(sqlTmpl string, params map[string]interface{}) error
- type Parameter
- type Plugin
- func (p *Plugin) AttachDB(db *sql.DB)
- func (p *Plugin) Handle(tx *core.Transaction) ([]core.Sink, error)
- func (p *Plugin) HandleWithPull(tx *core.Transaction, batchCtx *core.BatchContext, monitorDB *monitor.DB) ([]core.Sink, error)
- func (p *Plugin) Name() string
- func (p *Plugin) StartWatch()
- func (p *Plugin) Stop() error
- func (p *Plugin) StopWatch()
- func (p *Plugin) Type() string
- type PluginMetadata
- type Pool
- type PoolManager
- type RoutingError
- type SQLiteExecutor
- type SQLiteSink
- func (s *SQLiteSink) Close() error
- func (s *SQLiteSink) GetPool() *Pool
- func (s *SQLiteSink) LastModified() (time.Time, error)
- func (s *SQLiteSink) Query(query string, args ...any) (*sql.Rows, error)
- func (s *SQLiteSink) QueryRow(query string, args ...any) *sql.Row
- func (s *SQLiteSink) RunMigrations() error
- func (s *SQLiteSink) Write(ctx context.Context, ops []core.Sink) error
- type Sink
- type SinkConfig
- type SinkOp
- type Skill
- type Skills
- type WriteError
- type Writer
- func (w *Writer) DB() *sql.DB
- func (w *Writer) EnsureTable(table string, columns []string) error
- func (w *Writer) WriteBatch(ops []*SinkOp) error
- func (w *Writer) WriteDelete(table string, pk string, ds *DataSet) error
- func (w *Writer) WriteDeleteInTx(tx *sql.Tx, table string, pk string, ds *DataSet) error
- func (w *Writer) WriteMultiple(tableData map[string]*DataSet, pkColumn string) error
- func (w *Writer) WriteMultipleInTx(tx *sql.Tx, tableData map[string]*DataSet, pkColumn string) error
- func (w *Writer) WriteUpsert(table string, pk string, ds *DataSet) error
- func (w *Writer) WriteUpsertInTx(tx *sql.Tx, table string, pk string, ds *DataSet) error
Constants ¶
This section is empty.
Variables ¶
var ( ErrPluginNotFound = fmt.Errorf("plugin not found") ErrInvalidConfig = fmt.Errorf("invalid plugin configuration") ErrMissingField = fmt.Errorf("missing required field") ErrInvalidSQL = fmt.Errorf("invalid SQL template") ErrMissingParameter = fmt.Errorf("missing parameter in SQL template") ErrSQLFileNotFound = fmt.Errorf("SQL file not found") ErrExecutionFailed = fmt.Errorf("SQL execution failed") ErrRoutingFailed = fmt.Errorf("result routing failed") ErrWriteFailed = fmt.Errorf("sink write failed") ErrInvalidParameter = fmt.Errorf("invalid parameter") )
Plugin errors
Functions ¶
func DataSetToMap ¶
DataSetToMap converts DataSet to slice of maps for easier processing
func NormalizeSQLForTest ¶
Test helper for testing SQL normalization
func OperationToSinkType ¶
OperationToSinkType converts Operation to sink type string
Types ¶
type CDCParameters ¶
type CDCParameters struct {
CDCLSN string
CDCTxID string
CDCTable string
CDCOperation int
Fields map[string]interface{}
}
CDCParameters represents CDC data mapped to SQL parameters
type ConfigError ¶
ConfigError represents a configuration validation error
func NewConfigError ¶
func NewConfigError(field, message string) *ConfigError
NewConfigError creates a new config error
func (*ConfigError) Error ¶
func (e *ConfigError) Error() string
type DataSet ¶
type DataSet struct {
Columns []string
Rows [][]interface{}
}
DataSet represents query results
type DriverExecutor ¶
type DriverExecutor interface {
// Execute runs a parameterized SQL query
Execute(sqlTmpl string, params map[string]interface{}) (*DataSet, error)
// ExecuteBatch runs multiple SQL statements in a transaction
ExecuteBatch(statements []string, params []map[string]interface{}) error
// DB returns the underlying database connection
DB() *sql.DB
}
DriverExecutor defines the interface for database-specific SQL execution
type DriverType ¶
type DriverType string
DriverType represents the type of database driver
const ( DriverMSSQL DriverType = "mssql" DriverSQLite DriverType = "sqlite" )
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the SQL Plugin execution engine It is stateless and shared across all skills. The caller is responsible for setting the current skill via HandleWithSkill or by setting e.skill directly before calling Handle.
func (*Engine) Handle ¶
Handle processes a core.Transaction through the SQL Plugin It extracts CDC changes, executes sinks against MSSQL Returns all job operations as []core.Sink for the caller to write Multiple sinks targeting the same table+pk are merged into a single sink
func (*Engine) HandleWithPull ¶
func (e *Engine) HandleWithPull(tx *core.Transaction, skill *Skill, batchCtx *core.BatchContext, monitorDB *monitor.DB) ([]core.Sink, error)
HandleWithPull executes the engine with a specific skill and writes skill log. Skill log is written with EXECUTED status and rows_processed from returned sinks. If no sinks are produced (if condition not met), SKIP is logged with rows_processed=0. If execution fails, ERROR is logged.
func (*Engine) HandleWithSkill ¶
HandleWithSkill executes the engine with a specific skill, then restores the original skill. This allows the engine to be stateless and shared across all skills.
type ExecutionError ¶
ExecutionError represents a SQL execution error
func NewExecutionError ¶
func NewExecutionError(sql string, params map[string]interface{}, err error) *ExecutionError
NewExecutionError creates a new execution error
func (*ExecutionError) Error ¶
func (e *ExecutionError) Error() string
func (*ExecutionError) Unwrap ¶
func (e *ExecutionError) Unwrap() error
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor executes SQL templates with parameter substitution
func NewExecutor ¶
NewExecutor creates a new SQL template executor
func NewExecutorWithDriver ¶
func NewExecutorWithDriver(db *sql.DB, dbType DriverType) *Executor
NewExecutorWithDriver creates a new executor with a specific driver dbType: "mssql" or "sqlite"
type ExtractOption ¶
type ExtractOption int
ExtractOption defines how to convert @name parameters
const ( // OptionPlaceholders converts @name to ? (for SQLite) OptionPlaceholders ExtractOption = iota // OptionKeepNames keeps @name (for MSSQL) OptionKeepNames )
type FileMetadata ¶
type FileMetadata struct {
Path string `json:"path"`
IsSQL bool `json:"is_sql"`
CurVersion string `json:"cur_version"` // SHA256 hash of current loaded content
CurModTime time.Time `json:"cur_mod_time"`
CurSize int64 `json:"cur_size"`
CurLoadedAt time.Time `json:"cur_loaded_at"`
NewVersion string `json:"new_version"` // SHA256 hash of new content (if changed)
NewModTime time.Time `json:"new_mod_time"`
NewSize int64 `json:"new_size"`
NeedsReload bool `json:"needs_reload"`
IsDeleted bool `json:"is_deleted"`
}
FileMetadata tracks the state of a single file (.yml or .sql)
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
Loader handles loading and parsing SQL plugin configurations. Deprecated: The New() function should be used instead for new code.
type MSSQLExecutor ¶
type MSSQLExecutor struct {
// contains filtered or unexported fields
}
MSSQLExecutor implements DriverExecutor for MSSQL
func NewMSSQLExecutor ¶
func NewMSSQLExecutor(db *sql.DB) *MSSQLExecutor
NewMSSQLExecutor creates a new MSSQL executor
func (*MSSQLExecutor) DB ¶
func (e *MSSQLExecutor) DB() *sql.DB
DB returns the underlying database connection
func (*MSSQLExecutor) Execute ¶
func (e *MSSQLExecutor) Execute(sqlTmpl string, params map[string]interface{}) (*DataSet, error)
Execute runs a parameterized SQL query on MSSQL
func (*MSSQLExecutor) ExecuteBatch ¶
func (e *MSSQLExecutor) ExecuteBatch(statements []string, params []map[string]interface{}) error
ExecuteBatch executes multiple statements in a transaction
type OnConflictStrategy ¶
type OnConflictStrategy int
OnConflictStrategy defines how to handle conflicts on insert/update
const ( // ConflictOverwrite replaces existing record (INSERT OR REPLACE, direct UPDATE) ConflictOverwrite OnConflictStrategy = iota // ConflictSkip ignores the operation if record exists (INSERT OR IGNORE, conditional UPDATE) ConflictSkip // ConflictError returns an error if record exists (INSERT with check, conditional UPDATE) ConflictError )
func ParseOnConflictStrategy ¶
func ParseOnConflictStrategy(s string) OnConflictStrategy
ParseOnConflictStrategy parses string to OnConflictStrategy
func (OnConflictStrategy) String ¶
func (s OnConflictStrategy) String() string
String returns the strategy name
type ParamExtractor ¶
type ParamExtractor struct {
// contains filtered or unexported fields
}
ParamExtractor extracts @name parameters from SQL templates and converts them to driver-specific parameter formats
func NewParamExtractor ¶
func NewParamExtractor() *ParamExtractor
NewParamExtractor creates a new parameter extractor
func (*ParamExtractor) Extract ¶
func (e *ParamExtractor) Extract(sqlTmpl string, params map[string]interface{}, opt ExtractOption) (string, []interface{}, error)
Extract converts SQL template to driver-specific format sqlTmpl: SELECT * FROM orders WHERE order_id = @orders_order_id params: map of parameter name → value Returns: converted SQL, args/named args, error
func (*ParamExtractor) ExtractNamed ¶
func (e *ParamExtractor) ExtractNamed(sqlTmpl string, params map[string]interface{}) (string, []interface{}, error)
ExtractNamed extracts parameters for MSSQL named args
func (*ParamExtractor) ExtractParamNames ¶
func (e *ParamExtractor) ExtractParamNames(sqlTmpl string) []string
ExtractParamNames extracts all parameter names from SQL template in order
func (*ParamExtractor) ReplacePlaceholders ¶
func (e *ParamExtractor) ReplacePlaceholders(sqlTmpl string) string
ReplacePlaceholders replaces @name with ? in SQL template
type Parameter ¶
type Parameter struct {
Name string // @orders_order_id
Value interface{}
}
Parameter represents a named parameter in SQL template
type Plugin ¶
type Plugin struct {
Skills *Skills // thread-safe collection of skills
// contains filtered or unexported fields
}
Plugin implements the plugin.Plugin interface for SQL plugins. It manages multiple skills (yaml configs + SQL templates) with a shared execution engine. It supports hot-reloading when skill files change. Sink writing is handled by the platform layer based on the Database field in returned sinks.
func New ¶
New creates a new SQL plugin that manages multiple skills. Pass db=nil if you need to call AttachDB later. Sink writing is handled by the platform layer - this plugin only handles SQL execution.
func (*Plugin) Handle ¶
Handle processes a CDC transaction through all SQL skills. It iterates over all skills, checks if they match the transaction table, and executes the engine for each matching skill.
func (*Plugin) HandleWithPull ¶
func (p *Plugin) HandleWithPull(tx *core.Transaction, batchCtx *core.BatchContext, monitorDB *monitor.DB) ([]core.Sink, error)
HandleWithPull processes a CDC transaction with pull context for observability. Skill logs are written for each skill:
- SKIP: skill did not match the transaction table
- EXECUTED: skill matched and produced sinks
- ERROR: skill matched but execution failed
Sinks are returned for the caller to write.
func (*Plugin) StartWatch ¶
func (p *Plugin) StartWatch()
StartWatch starts the internal file watcher for skill files.
type PluginMetadata ¶
type PluginMetadata struct {
Name string `json:"name"`
Type string `json:"type"` // "sql"
Status string `json:"status"` // loaded | stale | error
NeedsReload bool `json:"needs_reload"`
LoadCount int `json:"load_count"`
LastError string `json:"last_error,omitempty"`
Files map[string]FileMetadata `json:"files"`
}
PluginMetadata contains metadata and file tracking information for a SQL plugin
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool wraps a cnlangzi/sqlite.DB for a business sink database. cnlangzi/sqlite provides read/write separation, buffered batch writes, and automatic WAL checkpoint management.
func NewPool ¶
NewPool creates a new SQLite connection pool for a sink. Path normalization: ./data/sinks/business.db -> ./data/sinks/business The actual DB file will be at {path}/{name}.db
func (*Pool) MigrationsPath ¶
MigrationsPath returns the migrations directory path
type PoolManager ¶
type PoolManager struct {
// contains filtered or unexported fields
}
PoolManager manages multiple sink pools
func NewPoolManager ¶
func NewPoolManager(skillsDir string) *PoolManager
NewPoolManager creates a new pool manager
func (*PoolManager) GetPool ¶
func (m *PoolManager) GetPool(ctx context.Context, skillName string, sqlitePath string) (*Pool, error)
GetPool returns an existing pool or creates a new one for the given skill
func (*PoolManager) ListPools ¶
func (m *PoolManager) ListPools() []string
ListPools returns all managed pool names
type RoutingError ¶
RoutingError represents a result routing error
func NewRoutingError ¶
func NewRoutingError(sinkName string, err error) *RoutingError
NewRoutingError creates a new routing error
func (*RoutingError) Error ¶
func (e *RoutingError) Error() string
func (*RoutingError) Unwrap ¶
func (e *RoutingError) Unwrap() error
type SQLiteExecutor ¶
type SQLiteExecutor struct {
// contains filtered or unexported fields
}
SQLiteExecutor implements DriverExecutor for SQLite
func NewSQLiteExecutor ¶
func NewSQLiteExecutor(db *sql.DB) *SQLiteExecutor
NewSQLiteExecutor creates a new SQLite executor
func (*SQLiteExecutor) DB ¶
func (e *SQLiteExecutor) DB() *sql.DB
DB returns the underlying database connection
func (*SQLiteExecutor) Execute ¶
func (e *SQLiteExecutor) Execute(sqlTmpl string, params map[string]interface{}) (*DataSet, error)
Execute runs a parameterized SQL query on SQLite
func (*SQLiteExecutor) ExecuteBatch ¶
func (e *SQLiteExecutor) ExecuteBatch(statements []string, params []map[string]interface{}) error
ExecuteBatch executes multiple statements in a transaction
type SQLiteSink ¶
type SQLiteSink struct {
// contains filtered or unexported fields
}
SQLiteSink writes business data to a dedicated SQLite file per skill. It uses connection pooling for read/write separation and supports migrations.
func NewSQLiteSink ¶
func NewSQLiteSink(skill *Skill, pool *Pool) *SQLiteSink
NewSQLiteSink creates a new SQLite sink for a skill
func (*SQLiteSink) GetPool ¶
func (s *SQLiteSink) GetPool() *Pool
GetPool returns the underlying pool
func (*SQLiteSink) LastModified ¶
func (s *SQLiteSink) LastModified() (time.Time, error)
LastModified returns the last modification time of the sink database
func (*SQLiteSink) QueryRow ¶
func (s *SQLiteSink) QueryRow(query string, args ...any) *sql.Row
QueryRow executes a read query that returns a single row
func (*SQLiteSink) RunMigrations ¶
func (s *SQLiteSink) RunMigrations() error
RunMigrations runs any pending migrations using sqle/migrate
type Sink ¶
type Sink struct {
SinkConfig `yaml:",inline"` // Embedded SinkConfig for backward compatibility
When []string `yaml:"when"` // Required: [insert, update] or [delete]
}
Sink represents a single sink configuration with operation filter
type SinkConfig ¶
type SinkConfig struct {
Name string `yaml:"name"` // Sink name
On string `yaml:"on"` // Table filter (required for multi-table)
If string `yaml:"if"` // SQL-style condition expression (govaluate)
SQL string `yaml:"sql"` // Inline SQL template
SQLFile string `yaml:"sql_file"` // External SQL file path
Output string `yaml:"output"` // Target table name
PrimaryKey string `yaml:"primary_key"` // Primary key column
OnConflict string `yaml:"on_conflict"` // Conflict strategy: overwrite | skip | error (default: skip)
OnLower string `yaml:"-"` // Precomputed lowercase of On (for performance)
// contains filtered or unexported fields
}
SinkConfig represents a single job configuration
func FilterSinks ¶
func FilterSinks(sinks []SinkConfig, tableName string) []SinkConfig
FilterSinks filters sinks by table name (for multi-table CDC)
func (*SinkConfig) EvalIf ¶
func (s *SinkConfig) EvalIf(cdcParams CDCParameters) bool
EvalIf evaluates the 'if' expression for a sink against CDC data. It returns true if:
- The sink has no 'if' expression (always true)
- The expression evaluates to true
It returns false if the expression evaluates to false or an error occurs.
Table and field names in expressions are treated case-insensitively:
"orders.status = 'vip'" and "ORDERS.STATUS = 'vip'" both map to the same fields.
Literal values in expressions remain case-sensitive.
func (*SinkConfig) GetOnConflict ¶
func (s *SinkConfig) GetOnConflict() OnConflictStrategy
GetOnConflict returns the OnConflictStrategy for this job
type SinkOp ¶
type SinkOp struct {
Config SinkConfig
DataSet *DataSet
OpType Operation
}
SinkOp represents a single sink operation (write to SQLite)
type Skill ¶
type Skill struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
On []string `yaml:"on"` // Tables to monitor
Database string `yaml:"database"` // Target database name (maps to platform-configured storage)
Sinks []Sink `yaml:"sinks"`
Outputs map[string][]string `yaml:"-"` // key = output table name, value = field names
File string `yaml:"-"` // Auto-assigned: relative path from config.plugins.sql.path
Id string `yaml:"-"` // Auto-assigned: SHA256(File)[:12]
Raw string `yaml:"-"` // Auto-assigned: raw YAML content
Error string `yaml:"-"` // last load/parsing error message (if any)
LastLoadedAt time.Time `yaml:"-"` // last successful load time
// contains filtered or unexported fields
}
Skill represents a SQL plugin configuration
func (*Skill) FilterByOperation ¶
func (s *Skill) FilterByOperation(opType Operation) []SinkConfig
FilterByOperation returns sinks that handle the given operation type. The 'when' field must contain the operation string ("insert", "update", or "delete"). Results are cached per skill for performance.
func (*Skill) ValidateSinks ¶
ValidateSinks validates the sinks configuration and returns an error if invalid. It ensures:
- 'when' field is required for all sinks
- Only [insert, update] or [delete] are valid 'when' values
- No mixing of insert/update with delete in the same sink
- 'if' expressions (if present) are valid govaluate expressions
type Skills ¶
type Skills struct {
// contains filtered or unexported fields
}
Skills aggregates skill operations with internal locking.
type WriteError ¶
WriteError represents a sink write error
func NewWriteError ¶
func NewWriteError(table string, err error) *WriteError
NewWriteError creates a new write error
func (*WriteError) Error ¶
func (e *WriteError) Error() string
func (*WriteError) Unwrap ¶
func (e *WriteError) Unwrap() error
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer writes DataSet results to SQLite sinks
func (*Writer) EnsureTable ¶
EnsureTable ensures a table exists with the given schema
func (*Writer) WriteBatch ¶
WriteBatch writes multiple sink operations in a single transaction
func (*Writer) WriteDelete ¶
WriteDelete writes delete operations to a table
func (*Writer) WriteDeleteInTx ¶
WriteDeleteInTx writes delete operations within an existing transaction
func (*Writer) WriteMultiple ¶
WriteMultiple writes multiple DataSets to their respective tables
func (*Writer) WriteMultipleInTx ¶
func (w *Writer) WriteMultipleInTx(tx *sql.Tx, tableData map[string]*DataSet, pkColumn string) error
WriteMultipleInTx writes multiple DataSets within a single transaction
func (*Writer) WriteUpsert ¶
WriteUpsert writes a DataSet to a table using INSERT OR REPLACE (legacy method)