Documentation
¶
Index ¶
- Constants
- Variables
- func GetAllDBTables(path string) ([]string, error)
- func OpenRaw(dns string) (*sql.DB, *sqlite3.SQLiteConn, error)
- type ChangeLogEvent
- type ChangeLogState
- type ColumnInfo
- type EnhancedRows
- type EnhancedStatement
- type SqliteDriverConnector
- type SqliteStreamDB
- func (conn *SqliteStreamDB) BackupTo(bkFilePath string) error
- func (conn *SqliteStreamDB) CleanupChangeLogs() (int64, error)
- func (conn *SqliteStreamDB) Execute(query string) error
- func (conn *SqliteStreamDB) GetPath() string
- func (conn *SqliteStreamDB) GetRawConnection() *sqlite3.SQLiteConn
- func (conn *SqliteStreamDB) GetTableInfo(table string) ([]*ColumnInfo, error)
- func (conn *SqliteStreamDB) InstallCDC() error
- func (conn *SqliteStreamDB) RemoveCDC(tables bool) error
- func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error
- func (conn *SqliteStreamDB) RestoreFrom(bkFilePath string) error
Constants ¶
View Source
const ScanLimit = uint(100)
ScanLimit is the number of change log rows processed at a time, to limit memory usage.
Variables ¶
View Source
var ErrLogNotReadyToPublish = errors.New("not ready to publish changes")
View Source
var ErrNoTableMapping = errors.New("no table mapping found")
View Source
var MarmotPrefix = "__marmot__"
Functions ¶
func GetAllDBTables ¶ added in v0.3.13
Types ¶
type ChangeLogEvent ¶
type ChangeLogEvent struct {
Id int64
Type string
TableName string
Row map[string]any
// contains filtered or unexported fields
}
func (*ChangeLogEvent) Hash ¶
func (e *ChangeLogEvent) Hash() (uint64, error)
func (*ChangeLogEvent) Marshal ¶
func (e *ChangeLogEvent) Marshal() ([]byte, error)
func (*ChangeLogEvent) Unmarshal ¶
func (e *ChangeLogEvent) Unmarshal(data []byte) error
type ChangeLogState ¶
type ChangeLogState = int16
const (
Pending ChangeLogState = 0
Published ChangeLogState = 1
Failed ChangeLogState = -1
)
type ColumnInfo ¶
type EnhancedRows ¶
func (*EnhancedRows) Finalize ¶
func (rs *EnhancedRows) Finalize()
type EnhancedStatement ¶
func (*EnhancedStatement) Finalize ¶
func (stmt *EnhancedStatement) Finalize()
type SqliteDriverConnector ¶
type SqliteDriverConnector struct {
// contains filtered or unexported fields
}
func (SqliteDriverConnector) Driver ¶
func (t SqliteDriverConnector) Driver() driver.Driver
type SqliteStreamDB ¶
type SqliteStreamDB struct {
*goqu.Database
OnChange func(event *ChangeLogEvent) error
// contains filtered or unexported fields
}
func OpenStreamDB ¶
func OpenStreamDB(path string, tables []string) (*SqliteStreamDB, error)
func (*SqliteStreamDB) BackupTo ¶
func (conn *SqliteStreamDB) BackupTo(bkFilePath string) error
func (*SqliteStreamDB) CleanupChangeLogs ¶
func (conn *SqliteStreamDB) CleanupChangeLogs() (int64, error)
func (*SqliteStreamDB) Execute ¶
func (conn *SqliteStreamDB) Execute(query string) error
func (*SqliteStreamDB) GetPath ¶
func (conn *SqliteStreamDB) GetPath() string
func (*SqliteStreamDB) GetRawConnection ¶
func (conn *SqliteStreamDB) GetRawConnection() *sqlite3.SQLiteConn
func (*SqliteStreamDB) GetTableInfo ¶
func (conn *SqliteStreamDB) GetTableInfo(table string) ([]*ColumnInfo, error)
func (*SqliteStreamDB) InstallCDC ¶ added in v0.3.17
func (conn *SqliteStreamDB) InstallCDC() error
func (*SqliteStreamDB) RemoveCDC ¶
func (conn *SqliteStreamDB) RemoveCDC(tables bool) error
func (*SqliteStreamDB) Replicate ¶
func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error
func (*SqliteStreamDB) RestoreFrom ¶
func (conn *SqliteStreamDB) RestoreFrom(bkFilePath string) error
Click to show internal directories.
Click to hide internal directories.