Documentation
¶
Index ¶
- Constants
- Variables
- type Canal
- func (c *Canal) AddDumpDatabases(dbs ...string)
- func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)
- func (c *Canal) AddDumpTables(db string, tables ...string)
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) ClearTableCache(db []byte, table []byte)
- func (c *Canal) Close()
- func (c *Canal) Ctx() context.Context
- func (c *Canal) Dump() error
- func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) GetDelay() uint32
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (mysql.Position, error)
- func (c *Canal) GetTable(db string, table string) (*schema.Table, error)
- func (c *Canal) Run() error
- func (c *Canal) RunFrom(pos mysql.Position) error
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) SetTableCache(db []byte, table []byte, schema *schema.Table)
- func (c *Canal) StartFromGTID(set mysql.GTIDSet) error
- func (c *Canal) SyncedGTIDSet() mysql.GTIDSet
- func (c *Canal) SyncedPosition() mysql.Position
- func (c *Canal) SyncedTimestamp() uint32
- func (c *Canal) WaitDumpDone() <-chan struct{}
- func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error
- type Config
- type DummyEventHandler
- func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error
- func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error
- func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error
- func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error
- func (h *DummyEventHandler) OnRow(*RowsEvent) error
- func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error
- func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error
- func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error
- func (h *DummyEventHandler) String() string
- type DumpConfig
- type EventHandler
- type RowsEvent
Constants ¶
const ( UpdateAction = "update" InsertAction = "insert" DeleteAction = "delete" )
The action name for sync.
Variables ¶
var ErrExcludedTable = errors.New("excluded table meta")
var UnknownTableRetryPeriod = time.Second * time.Duration(10)
canal will retry fetching unknown table's meta after UnknownTableRetryPeriod
Functions ¶
This section is empty.
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... MySQL must open row format for binlog
func (*Canal) AddDumpDatabases ¶
func (*Canal) AddDumpIgnoreTables ¶
func (*Canal) AddDumpTables ¶
func (*Canal) CheckBinlogRowImage ¶
CheckBinlogRowImage checks MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
func (*Canal) ClearTableCache ¶
ClearTableCache clear table cache
func (*Canal) FlushBinlog ¶
func (*Canal) Run ¶
Run will first try to dump all data from MySQL master `mysqldump`, then sync from the binlog position in the dump data. It will run forever until meeting an error or Canal closed.
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler, you must register your own handler before starting Canal.
func (*Canal) SetTableCache ¶
SetTableCache sets table cache value for the given table
func (*Canal) SyncedGTIDSet ¶
func (*Canal) SyncedPosition ¶
func (*Canal) SyncedTimestamp ¶
func (*Canal) WaitDumpDone ¶
func (c *Canal) WaitDumpDone() <-chan struct{}
type Config ¶
type Config struct {
Addr string `toml:"addr"`
User string `toml:"user"`
Password string `toml:"password"`
Charset string `toml:"charset"`
ServerID uint32 `toml:"server_id"`
Flavor string `toml:"flavor"`
HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
ReadTimeout time.Duration `toml:"read_timeout"`
// IncludeTableRegex or ExcludeTableRegex should contain database name.
// IncludeTableRegex defines the tables that will be included, if empty, all tables will be included.
// ExcludeTableRegex defines the tables that will be excluded from the ones defined by IncludeTableRegex.
// Only a table which matches IncludeTableRegex and dismatches ExcludeTableRegex will be processed
// eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
// this will include all database's 'canal' table, except database 'mysql'.
// Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables
IncludeTableRegex []string `toml:"include_table_regex"`
ExcludeTableRegex []string `toml:"exclude_table_regex"`
// discard row event without table meta
DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`
Dump DumpConfig `toml:"dump"`
UseDecimal bool `toml:"use_decimal"`
ParseTime bool `toml:"parse_time"`
TimestampStringLocation *time.Location
// SemiSyncEnabled enables semi-sync or not.
SemiSyncEnabled bool `toml:"semi_sync_enabled"`
// maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
// this configuration will not work if DisableRetrySync is true
MaxReconnectAttempts int `toml:"max_reconnect_attempts"`
// whether disable re-sync for broken connection
DisableRetrySync bool `toml:"disable_retry_sync"`
// whether the function WaitUntilPos() can use FLUSH BINARY LOGS
// to ensure we advance past a position. This should not strictly be required,
// and requires additional privileges.
DisableFlushBinlogWhileWaiting bool `toml:"disable_flush_binlog_while_waiting"`
// Set TLS config
TLSConfig *tls.Config
// Set Logger
Logger loggers.Advanced
// Set Dialer
Dialer client.Dialer
// Set Localhost
Localhost string
}
func NewConfigWithFile ¶
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
NewDefaultConfig initiates some default config for Canal
type DummyEventHandler ¶
type DummyEventHandler struct {
}
func (*DummyEventHandler) OnDDL ¶
func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error
func (*DummyEventHandler) OnGTID ¶
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error
func (*DummyEventHandler) OnPosSynced ¶
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error
func (*DummyEventHandler) OnRotate ¶
func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error
func (*DummyEventHandler) OnRow ¶
func (h *DummyEventHandler) OnRow(*RowsEvent) error
func (*DummyEventHandler) OnRowsQueryEvent ¶
func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error
func (*DummyEventHandler) OnTableChanged ¶
func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error
func (*DummyEventHandler) OnXID ¶
func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error
func (*DummyEventHandler) String ¶
func (h *DummyEventHandler) String() string
type DumpConfig ¶
type DumpConfig struct {
// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
// If not set, ignore using mysqldump.
ExecutionPath string `toml:"mysqldump"`
// Will override Databases, tables is in database table_db
Tables []string `toml:"tables"`
TableDB string `toml:"table_db"`
Databases []string `toml:"dbs"`
// Ignore table format is db.table
IgnoreTables []string `toml:"ignore_tables"`
// Dump only selected records. Quotes are mandatory
Where string `toml:"where"`
// If true, discard error msg, else, output to stderr
DiscardErr bool `toml:"discard_err"`
// Set true to skip --master-data if we have no privilege to do
// 'FLUSH TABLES WITH READ LOCK'
SkipMasterData bool `toml:"skip_master_data"`
// Set to change the default max_allowed_packet size
MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
// Set to change the default protocol to connect with
Protocol string `toml:"protocol"`
// Set extra options
ExtraOptions []string `toml:"extra_options"`
}
type EventHandler ¶
type EventHandler interface {
OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error
// OnTableChanged is called when the table is created, altered, renamed or dropped.
// You need to clear the associated data like cache with the table.
// It will be called before OnDDL.
OnTableChanged(header *replication.EventHeader, schema string, table string) error
OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(header *replication.EventHeader, nextPos mysql.Position) error
OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
// OnRowsQueryEvent is called when binlog_rows_query_log_events=ON for each DML query.
// You'll get the original executed query, with comments if present.
// It will be called before OnRow.
OnRowsQueryEvent(e *replication.RowsQueryEvent) error
String() string
}
type RowsEvent ¶
type RowsEvent struct {
Table *schema.Table
Action string
// changed row list
// binlog has three update event version, v0, v1 and v2.
// for v1 and v2, the rows number must be even.
// Two rows for one event, format is [before update row, after update row]
// for update v0, only one row for a event, and we don't support this version.
Rows [][]interface{}
// Header can be used to inspect the event
Header *replication.EventHeader
}
RowsEvent is the event for row replication.