Documentation
¶
Overview ¶
Package gobinlog 将自己伪装成slave获取mysql主从复杂流来 获取mysql数据库的数据变更,提供轻量级,快速的dump协议交互 以及binlog的row模式下的格式解析。
gobinlog使用方式非常简单,你要实现一个MysqlTableMapper
type mysqlColumnAttribute struct {
field string
typ string
}
func (m *mysqlColumnAttribute) Field() string {
return m.field
}
func (m *mysqlColumnAttribute) IsUnSignedInt() bool {
return strings.Contains(m.typ, mysqlUnsigned)
}
type mysqlTableInfo struct {
name MysqlTableName
columns []MysqlColumn
}
func (m *mysqlTableInfo) Name() MysqlTableName {
return m.name
}
func (m *mysqlTableInfo) Columns() []MysqlColumn {
return m.columns
}
type exampleMysqlTableMapper struct {
db *sql.DB
}
func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) {
info := &mysqlTableInfo{
name: name,
columns: make([]MysqlColumn, 0, 10),
}
query := "desc " + name.String()
rows, err := e.db.Query(query)
if err != nil {
return info, fmt.Errorf("query failed query: %s, error: %v", query, err)
}
defer rows.Close()
var null,key,extra string
var columnDefault []byte
for i := 0; rows.Next(); i++ {
column := &mysqlColumnAttribute{}
err = rows.Scan(&column.field, &column.typ, &null, &key, &columnDefault, &extra)
if err != nil {
return info, err
}
info.columns = append(info.columns, column)
}
return info, nil
}
你需要申请一个NewRowStreamer,数据库连接信息如下
user:password@tcp(ip:port)/db
其中user是mysql的用户名,password是mysql的密码,ip是mysql的ip地址, port是mysql的端口,db是mysql的数据库名,serverID要与主库不同,
s, err := gobinlog.NewStreamer(dsn, 1234, e)
if err != nil {
_log.Errorf("NewStreamer fail. err: %v", err)
return
}
SetBinlogPosition的参数可以通过SHOW MASTER STATUS获取,通过这个函数 可以设置同步起始位置
s.SetBinlogPosition(pos)
通过开启Stream,可以在SendTransactionFun用于处理事务信息函数,如打印事务信息
err = s.Stream(ctx, func(t *Transaction) error {
fmt.Printf("%v", *t)
return nil
})
如果有需要,你可以通过Error获取Stream过程中的错误
err = s.Error()
当然你如果需要详细的调试信息,你可以通过SetLogger函数设置对应的调试接口
gobinlog.SetLogger(NewDefaultLogger(os.Stdout, DebugLevel))
Index ¶
- func SetLogger(logger log.Logger)
- type ColumnData
- type ColumnType
- func (c ColumnType) IsBit() bool
- func (c ColumnType) IsBlob() bool
- func (c ColumnType) IsDate() bool
- func (c ColumnType) IsDateTime() bool
- func (c ColumnType) IsDecimal() bool
- func (c ColumnType) IsFloat() bool
- func (c ColumnType) IsGeometry() bool
- func (c ColumnType) IsInteger() bool
- func (c ColumnType) IsString() bool
- func (c ColumnType) IsTime() bool
- func (c ColumnType) IsTimestamp() bool
- func (c ColumnType) String() string
- type Error
- type FormatType
- type MysqlColumn
- type MysqlTable
- type MysqlTableMapper
- type MysqlTableName
- type Position
- type RowData
- type SendTransactionFunc
- type StatementType
- type StreamEvent
- type Streamer
- type Transaction
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ColumnData ¶
type ColumnData struct {
Filed string // 字段信息
Type ColumnType // binlog中的列类型
IsEmpty bool // data is empty,即该列没有变化
Data []byte // the data
}
ColumnData 单个列的信息
func (*ColumnData) MarshalJSON ¶
func (c *ColumnData) MarshalJSON() ([]byte, error)
MarshalJSON 实现ColumnData的json序列化
type MysqlColumn ¶
MysqlColumn 用于实现mysql表列的接口
type MysqlTable ¶
type MysqlTable interface {
Name() MysqlTableName //表名
Columns() []MysqlColumn //所有列
}
MysqlTable 用于实现mysql表的接口
type MysqlTableMapper ¶
type MysqlTableMapper interface {
MysqlTable(name MysqlTableName) (MysqlTable, error)
}
MysqlTableMapper 用于获取表信息的接口
type MysqlTableName ¶
type MysqlTableName struct {
DbName string `json:"db"` //数据库名
TableName string `json:"table"` //表名
}
MysqlTableName mysql的表名
func NewMysqlTableName ¶
func NewMysqlTableName(database, table string) MysqlTableName
NewMysqlTableName 创建MysqlTableName
type Position ¶
type Position struct {
Filename string `json:"filename"` //binlog文件名
Offset int64 `json:"offset"` //在binlog文件中的位移
}
Position 指定binlog的位置,以文件名和位移
type SendTransactionFunc ¶
type SendTransactionFunc func(*Transaction) error
SendTransactionFunc 处理事务信息函数,你可以将一个chan注册到这个函数中如
func getTransaction(tran *Transaction) error{
Transactions <- tran
return nil
}
如果这个函数返回错误,那么RowStreamer.Stream会停止dump以及解析binlog且返回错误
type StatementType ¶
type StatementType int
StatementType means the sql statement type
const ( StatementUnknown StatementType = iota //不知道的语句 StatementBegin //开始语句 StatementCommit //提交语句 StatementRollback //回滚语句 StatementInsert //插入语句 StatementUpdate //更新语句 StatementDelete //删除语句 StatementCreate //创建表语句 StatementAlter //改变表属性语句 StatementDrop //删除表语句 StatementTruncate //截取表语句 StatementRename //重命名表语句 StatementSet //设置属性语句 )
sql语句类型
func GetStatementCategory ¶
func GetStatementCategory(sql string) StatementType
GetStatementCategory we can get statement type from a SQL
type StreamEvent ¶
type StreamEvent struct {
Type StatementType //语句类型
Table MysqlTableName //表名
Query replication.Query //sql
Timestamp int64 //执行时间
RowValues []*RowData //which data come to used for StatementInsert and StatementUpdate
RowIdentifies []*RowData //which data come from used for StatementUpdate and StatementDelete
}
StreamEvent means a SQL or a rows in binlog
func (*StreamEvent) MarshalJSON ¶
func (s *StreamEvent) MarshalJSON() ([]byte, error)
MarshalJSON 实现StreamEvent的json序列化
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer 从github.com/youtube/vitess/go/vt/binlog/binlog_streamer.go的基础上移植过来 专门用来RowStreamer解析row模式的binlog event,将其变为对应的事务
func NewStreamer ¶
func NewStreamer(dsn string, serverID uint32, tableMapper MysqlTableMapper) (*Streamer, error)
NewStreamer dsn是mysql数据库的信息,serverID是标识该数据库的信息
func (*Streamer) SetBinlogPosition ¶
SetBinlogPosition 设置开始的binlog位置
type Transaction ¶
type Transaction struct {
NowPosition Position //在binlog中的当前位置
NextPosition Position //在binlog中的下一个位置
Timestamp int64 //执行时间
Events []*StreamEvent //一组有事务的binlog evnet
}
Transaction 代表一组有事务的binlog evnet
func (*Transaction) MarshalJSON ¶
func (t *Transaction) MarshalJSON() ([]byte, error)
MarshalJSON 实现Transaction的json序列化
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
binlogDump
command
|
|
|
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。
|
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。 |