Documentation
¶
Overview ¶
Package binlog 将自己伪装成slave获取mysql主从复杂流来 获取mysql数据库的数据变更,提供轻量级,快速的dump协议交互 以及binlog的row模式下的格式解析。使用方式较为简单,首先你 要实现一个MysqlTableMapper
type mysqlColumnAttribute struct {
field string
typ string
}
func (m *mysqlColumnAttribute) Field() string {
return m.field
}
func (m *mysqlColumnAttribute) IsUnSignedInt() bool {
return strings.Contains(m.typ, mysqlUnsigned)
}
type mysqlTableInfo struct {
name MysqlTableName
columns []MysqlColumn
}
func (m *mysqlTableInfo) Name() MysqlTableName {
return m.name
}
func (m *mysqlTableInfo) Columns() []MysqlColumn {
return m.columns
}
type exampleMysqlTableMapper struct {
db *sql.DB
}
func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) {
info := &mysqlTableInfo{
name: name,
columns: make([]MysqlColumn, 0, 10),
}
query := "desc " + name.String()
rows, err := e.db.Query(query)
if err != nil {
return info, fmt.Errorf("query failed query: %s, error: %v", query, err)
}
defer rows.Close()
var null,key,extra string
var columnDefault []byte
for i := 0; rows.Next(); i++ {
column := &mysqlColumnAttribute{}
err = rows.Scan(&column.field, &column.typ, &null, &key, &columnDefault, &extra)
if err != nil {
return info, err
}
info.columns = append(info.columns, column)
}
return info, nil
}
再申请一个NewRowStreamer,数据库连接信息为user:password@tcp(ip:port)/db user是mysql的用户名,password是mysql的密码,ip是mysql的ip地址, port是mysql的端口,db是mysql的数据库名,serverID要与主库不同, SetStartBinlogPosition的参数可以通过SHOW MASTER STATUS获取
dsn := "example:example@tcp(localhost:3306)/mysql"
r, err := NewRowStreamer(dsn, 1234, e)
if err != nil {
fmt.Printf("NewRowStreamer fail. err: %v", err)
return
}
r.SetStartBinlogPosition(pos)
然后开启Stream,可以在SendTransactionFun用于处理事务信息函数,如打印事务信息
ctx := context.Background()
err = r.Stream(ctx, func(t *Transaction) error {
fmt.Printf("%v", *t)
return nil
})
最后可以通过ctx的cancal结束本binlog流的同步
Index ¶
- Constants
- Variables
- func SetLogger(logger Logger)
- type ColumnData
- type ColumnType
- func (c ColumnType) IsBit() bool
- func (c ColumnType) IsBlob() bool
- func (c ColumnType) IsDate() bool
- func (c ColumnType) IsDateTime() bool
- func (c ColumnType) IsDecimal() bool
- func (c ColumnType) IsFloat() bool
- func (c ColumnType) IsGeometry() bool
- func (c ColumnType) IsInteger() bool
- func (c ColumnType) IsString() bool
- func (c ColumnType) IsTime() bool
- func (c ColumnType) IsTimestamp() bool
- func (c ColumnType) String() string
- type FormatType
- type LogLevel
- type Logger
- type MysqlColumn
- type MysqlTable
- type MysqlTableMapper
- type MysqlTableName
- type Position
- type RowData
- type RowStreamer
- type SendTransactionFunc
- type StatementType
- type StreamEvent
- type Transaction
Examples ¶
Constants ¶
const ( ColumnTypeDecimal = replication.TypeDecimal //精确实数 ColumnTypeTiny = replication.TypeTiny //int8 ColumnTypeShort = replication.TypeShort //int16 ColumnTypeLong = replication.TypeLong //int32 ColumnTypeFloat = replication.TypeFloat //float32 ColumnTypeDouble = replication.TypeDouble //float64 ColumnTypeNull = replication.TypeNull //null ColumnTypeTimestamp = replication.TypeTimestamp //时间戳 ColumnTypeLongLong = replication.TypeLongLong //int64 ColumnTypeInt24 = replication.TypeInt24 //int24 ColumnTypeDate = replication.TypeDate //日期 ColumnTypeTime = replication.TypeTime //时间 ColumnTypeDateTime = replication.TypeDateTime //日期时间 ColumnTypeYear = replication.TypeYear //year ColumnTypeNewDate = replication.TypeNewDate //日期 ColumnTypeVarchar = replication.TypeVarchar //可变字符串 ColumnTypeBit = replication.TypeBit //bit ColumnTypeTimestamp2 = replication.TypeTimestamp2 //时间戳 ColumnTypeDateTime2 = replication.TypeDateTime2 //日期时间 ColumnTypeTime2 = replication.TypeTime2 //时间 ColumnTypeJSON = replication.TypeJSON //json ColumnTypeNewDecimal = replication.TypeNewDecimal //精确实数 ColumnTypeEnum = replication.TypeEnum //枚举 ColumnTypeSet = replication.TypeSet //字符串 ColumnTypeTinyBlob = replication.TypeTinyBlob //小型二进制 ColumnTypeMediumBlob = replication.TypeMediumBlob //中型二进制 ColumnTypeLongBlob = replication.TypeLongBlob //长型二进制 ColumnTypeBlob = replication.TypeBlob //长型二进制 ColumnTypeVarString = replication.TypeVarString //可变字符串 ColumnTypeString = replication.TypeString //字符串 ColumnTypeGeometry = replication.TypeGeometry //几何 )
列数据类型
Variables ¶
var ( FormatTypeRow = FormatType("ROW") //列 FormatTypeMixed = FormatType("MIXED") //混合 FormatTypeStatement = FormatType("STATEMENT") //语句 )
binlog格式类型
var (
ErrStreamEOF = errors.New("stream reached EOF") //信息流到达EOF
)
信息流到达EOF错误信息用于标识binlog流结束
Functions ¶
Types ¶
type ColumnData ¶
type ColumnData struct {
Filed string // 字段信息
Type ColumnType // binlog中的列类型
IsEmpty bool // data is empty,即该列没有变化
Data []byte // the data
}
ColumnData 单个列的信息
func NewColumnData ¶
func NewColumnData(filed string, typ ColumnType, isEmpty bool) *ColumnData
NewColumnData 创建ColumnData
func (*ColumnData) MarshalJSON ¶
func (c *ColumnData) MarshalJSON() ([]byte, error)
MarshalJSON 实现ColumnData的json序列化
type Logger ¶
type Logger interface {
Errorf(string, ...interface{}) //错误日志打印
Infof(string, ...interface{}) //进程日志打印
Debugf(string, ...interface{}) //调试日志打印
Print(args ...interface{}) //打印dump包的错误日志
}
Logger 用于打印binlog包的调试日志
type MysqlColumn ¶
MysqlColumn 用于实现mysql表列的接口
type MysqlTable ¶
type MysqlTable interface {
Name() MysqlTableName //表名
Columns() []MysqlColumn //所有列
}
MysqlTable 用于实现mysql表的接口
type MysqlTableMapper ¶
type MysqlTableMapper interface {
MysqlTable(name MysqlTableName) (MysqlTable, error)
}
MysqlTableMapper 用于获取表信息的接口
type MysqlTableName ¶
type MysqlTableName struct {
DbName string `json:"db"` //数据库名
TableName string `json:"table"` //表名
}
MysqlTableName mysql的表名
func NewMysqlTableName ¶
func NewMysqlTableName(database, table string) MysqlTableName
NewMysqlTableName 创建MysqlTableName
type Position ¶
type Position struct {
Filename string `json:"filename"` //binlog文件名
Offset int64 `json:"offset"` //在binlog文件中的位移
}
Position 指定binlog的位置,以文件名和位移
type RowStreamer ¶
type RowStreamer struct {
// contains filtered or unexported fields
}
RowStreamer 从github.com/youtube/vitess/go/vt/binlog/binlog_streamer.go的基础上移植过来 专门用来RowStreamer解析row模式的binlog event,将其变为对应的事务
func NewRowStreamer ¶
func NewRowStreamer(dsn string, serverID uint32, tableMapper MysqlTableMapper) (*RowStreamer, error)
NewRowStreamer dsn是mysql数据库的信息,serverID是标识该数据库的信息
func (*RowStreamer) SetStartBinlogPosition ¶
func (s *RowStreamer) SetStartBinlogPosition(startPos Position)
SetStartBinlogPosition 设置开始的binlog位置
func (*RowStreamer) Stream ¶
func (s *RowStreamer) Stream(ctx context.Context, sendTransaction SendTransactionFunc) error
Stream 注册一个处理事务信息函数到Stream中
Example ¶
package main
import (
"context"
"database/sql"
"fmt"
"log"
"os"
"os/signal"
"strings"
//_ "github.com/go-sql-driver/mysql" you need it in you own project
)
const (
mysqlUnsigned = "unsigned" //无符号
)
// 列属性
type mysqlColumnAttribute struct {
field string //列名
typ string //列类型
null string //是否为空
key string //PRI代表主键,UNI代表唯一索引
columnDefault []byte //默认值
extra string //其他备注信息
}
func (m *mysqlColumnAttribute) Field() string {
return m.field
}
func (m *mysqlColumnAttribute) IsUnSignedInt() bool {
return strings.Contains(m.typ, mysqlUnsigned)
}
type mysqlTableInfo struct {
name MysqlTableName
columns []MysqlColumn
}
func (m *mysqlTableInfo) Name() MysqlTableName {
return m.name
}
func (m *mysqlTableInfo) Columns() []MysqlColumn {
return m.columns
}
type exampleMysqlTableMapper struct {
db *sql.DB
}
func (e *exampleMysqlTableMapper) GetBinlogFormat() (format FormatType, err error) {
query := "SHOW VARIABLES LIKE 'binlog_format'"
var name, str string
err = e.db.QueryRow(query).Scan(&name, &str)
if err != nil {
err = fmt.Errorf("QueryRow fail. query: %s, error: %v", query, err)
return
}
format = FormatType(str)
return
}
func (e *exampleMysqlTableMapper) GetBinlogPosition() (pos Position, err error) {
query := "SHOW MASTER STATUS"
var metaDoDb, metaIgnoreDb, executedGTidSet string
err = e.db.QueryRow(query).Scan(&pos.Filename, &pos.Offset, &metaDoDb, &metaIgnoreDb, &executedGTidSet)
if err != nil {
err = fmt.Errorf("query fail. query: %s, error: %v", query, err)
return
}
return
}
func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) {
info := &mysqlTableInfo{
name: name,
columns: make([]MysqlColumn, 0, 10),
}
query := "desc " + name.String()
rows, err := e.db.Query(query)
if err != nil {
return info, fmt.Errorf("query failed query: %s, error: %v", query, err)
}
defer rows.Close()
for i := 0; rows.Next(); i++ {
column := &mysqlColumnAttribute{}
err = rows.Scan(&column.field, &column.typ, &column.null, &column.key, &column.columnDefault, &column.extra)
if err != nil {
return info, err
}
info.columns = append(info.columns, column)
}
return info, nil
}
func showTransaction(t *Transaction) {
b, err := t.MarshalJSON()
if err != nil {
lw.logger().Errorf("MarshalJSON fail. err: %v", err)
return
}
lw.logger().Print("%v", string(b))
}
func main() {
SetLogger(NewDefaultLogger(os.Stdout, DebugLevel))
dsn := "example:example@tcp(localhost:3306)/mysql?charset=utf8mb4"
db, err := sql.Open("mysql", dsn)
if err != nil {
lw.logger().Errorf("open fail. err: %v", err)
return
}
defer db.Close()
db.SetMaxIdleConns(2)
db.SetMaxOpenConns(4)
e := &exampleMysqlTableMapper{db: db}
format, err := e.GetBinlogFormat()
if err != nil {
lw.logger().Errorf("getBinlogFormat fail. err: %v", err)
return
}
if !format.IsRow() {
lw.logger().Errorf("binlog format is not row. format: %v", format)
return
}
pos, err := e.GetBinlogPosition()
if err != nil {
lw.logger().Errorf("GetBinlogPosition fail. err: %v", err)
return
}
r, err := NewRowStreamer(dsn, 1234, e)
if err != nil {
lw.logger().Errorf("NewRowStreamer fail. err: %v", err)
return
}
r.SetStartBinlogPosition(pos)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
processWait := make(chan os.Signal, 1)
signal.Notify(processWait, os.Kill, os.Interrupt)
go func() {
select {
case <-processWait:
cancel()
}
}()
err = r.Stream(ctx, func(t *Transaction) error {
showTransaction(t)
return nil
})
if err != nil {
log.Fatalf("Stream fail. err: %v", err)
return
}
}
type SendTransactionFunc ¶
type SendTransactionFunc func(*Transaction) error
SendTransactionFunc 处理事务信息函数,你可以将一个chan注册到这个函数中如
func getTransaction(tran *Transaction) error{
Transactions <- tran
return nil
}
如果这个函数返回错误,那么RowStreamer.Stream会停止dump以及解析binlog且返回错误
type StatementType ¶
type StatementType int
StatementType means the sql statement type
const ( StatementUnknown StatementType = iota //不知道的语句 StatementBegin //开始语句 StatementCommit //提交语句 StatementRollback //回滚语句 StatementInsert //插入语句 StatementUpdate //更新语句 StatementDelete //删除语句 StatementCreate //创建表语句 StatementAlter //改变表属性语句 StatementDrop //删除表语句 StatementTruncate //截取表语句 StatementRename //重命名表语句 StatementSet //设置属性语句 )
sql语句类型
func GetStatementCategory ¶
func GetStatementCategory(sql string) StatementType
GetStatementCategory we can get statement type from a SQL
type StreamEvent ¶
type StreamEvent struct {
Type StatementType //语句类型
Table MysqlTableName //表名
SQL string //sql
Timestamp int64 //执行时间
RowValues []*RowData //which data come to used for StatementInsert and StatementUpdate
RowIdentifies []*RowData //which data come from used for StatementUpdate and StatementDelete
}
StreamEvent means a SQL or a rows in binlog
func NewStreamEvent ¶
func NewStreamEvent(tranType StatementType, timestamp int64, table MysqlTableName) *StreamEvent
NewStreamEvent 创建StreamEvent
func (*StreamEvent) MarshalJSON ¶
func (s *StreamEvent) MarshalJSON() ([]byte, error)
MarshalJSON 实现StreamEvent的json序列化
type Transaction ¶
type Transaction struct {
NowPosition Position //在binlog中的当前位置
NextPosition Position //在binlog中的下一个位置
Timestamp int64 //执行时间
Events []*StreamEvent //一组有事务的binlog evnet
}
Transaction 代表一组有事务的binlog evnet
func NewTransaction ¶
func NewTransaction(now, next Position, timestamp int64, events []*StreamEvent) *Transaction
NewTransaction 创建Transaction
func (*Transaction) MarshalJSON ¶
func (t *Transaction) MarshalJSON() ([]byte, error)
MarshalJSON 实现Transaction的json序列化
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package dump 用于dump协议交互的, 从github.com/go-sql-driver/mysql的基础上修改而来,主要功能如下: 1.通过MysqlConn可以执行简单的sql命令,如set命令, 2.通过MysqlConn来和mysql库进行binlog dump github.com/go-sql-driver/mysql已经支持了所有的协议包的读写,但是 由于以下原因需要修改:1.该包不支持dump协议的交互。
|
Package dump 用于dump协议交互的, 从github.com/go-sql-driver/mysql的基础上修改而来,主要功能如下: 1.通过MysqlConn可以执行简单的sql命令,如set命令, 2.通过MysqlConn来和mysql库进行binlog dump github.com/go-sql-driver/mysql已经支持了所有的协议包的读写,但是 由于以下原因需要修改:1.该包不支持dump协议的交互。 |
|
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。
|
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。 |
|
tests
|
|
|
binlogStream
command
|