Binlog
二进制的日志文件,记录着数据库中已经执行的 Sql 语句。
docker-compose部署mysql
version: '3'
services:
mysql5.7:
container_name: mysql5.7
image: mysql:5.7
ports:
- "3307:3306"
command:
--server_id=1513
--default-authentication-plugin=mysql_native_password
--log-bin=mysql-bin
--binlog-ignore-db=mysql
restart: always
environment:
MYSQL_ROOT_PASSWORD: 123456
TZ: Asia/Shanghai
volumes:
- ../../data/mysql5.7:/var/lib/mysql
docker-compose up -d # 后台运行
docker exec -it mysql5.7 mysql -u root -p
123456 # 输入密码
确保 binlog 开启,并且 binlog 日志格式 为 row
show variables like 'log_bin'; // 查看 binlog
show variables like 'binlog_format'; // 日志格式
Tips: 外部工具无法连接请配置
mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
mysql>FLUSH PRIVILEGES;
// 其中123456,可以修改为你的密码
常用Binlog的sql指令
show master logs;
show master status;
flush logs; --刷新,创建一个新的binlog文件
reset master; --清空日志文件
// 查询 mysql 的配置
mysql --help | grep 'Default options' -A 1
// 默认存储 binlog 的文件位置
> /var/lib/mysql
Binlog字段
Event_type
- Query Event
记录删除表创建表修改表
- Xid Event
commit 提交成功的 id
- Table_map Event
记录下一个操作对映的数据库和表名
- Rotate Event
记录生成一张新的binlog日志表
- Write_rows Event # 插入
- Update_rows Event # 更新
- Delete_rows Event # 删除
Go-Canal
通过 Canal 订阅和消费 mysql 的 binlog。
go get github.com/go-mysql-org/go-mysql@v1.4.0
canal包中的接口和用法
type EventHandler interface {
OnRotate(roateEvent *replication.RotateEvent) error
OnTableChanged(schema string, table string) error
OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(nextPos mysql.Position) error
OnGTID(gtid mysql.GTIDSet) error
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
String() string
}
- event 类型为 Write_rows, Update_rows, Delete_rows
type BinlogSync struct {
canal.DummyEventHandler
}
//OnRow 获取 event_type 为 write_rows, update_rows, delete_rows 的数据
func (h *BinlogSync) OnRow(ev *canal.RowsEvent) error {
rowData := make(map[string]interface{})
rowList := make([]interface{}, len(ev.Rows))
fmt.Println("原始数据:", ev.Rows)
fmt.Printf("sql的操作行为:%s\t", ev.Action)
for idx, _ := range ev.Table.PKColumns {
fmt.Printf("主键为:%s\n", ev.Table.Columns[ev.Table.PKColumns[idx]].Name)
}
for idxRow, _ := range ev.Rows {
for columnIndex, currColumn := range ev.Table.Columns {
// 字段名和对应的值
row := fmt.Sprintf("%v:%v", currColumn.Name, ev.Rows[idxRow][columnIndex])
fmt.Println(row)
rowData[currColumn.Name] = ev.Rows[idxRow][columnIndex]
rowList[idxRow] = rowData
}
}
rowJson, err := json.Marshal(rowList)
if err != nil {
return fmt.Errorf("序列化错误:%s", err)
}
fmt.Printf("序列化为json格式:%s\n\n", string(rowJson))
return nil
}
- event 类型为 Rotate
// OnRotate 获取 binlog 下个日志文件名字和位置
func (h *BinlogSync) OnRotate(r *replication.RotateEvent) error {
fmt.Printf("下一个日志为 %s 位置为 %d \n", string(r.NextLogName), r.Position)
return nil
}
- event 类型为 Query
// OnTableChanged 在 OnDDL 之前执行
func (h *BinlogSync) OnTableChanged(schema string, table string) error {
result := fmt.Sprintf("修改了数据库%s中表%s的结构", schema, table)
fmt.Println(result)
return nil
}
// OnDDL query 事件中的一些信息,如执行的 sql 语句
func (h *BinlogSync) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
fmt.Println(string(queryEvent.Query))
return nil
}
- Event 类型为 Xid
// OnXID 打印事件 Xid 的结束为止
func (h *BinlogSync) OnXID(m mysql.Position) error {
fmt.Println("XID", m.Pos)
return nil
}
执行
func main() {
cfg := canal.NewDefaultConfig()
cfg.Addr = "127.0.0.1:3307"
cfg.User = "root"
cfg.Password = "123456"
// 数据库名
cfg.Dump.TableDB = "mrsnake"
cfg.ServerID = 1513
// 表名
cfg.Dump.Tables = []string{"bin_log_test"}
cfg.Dump.ExecutionPath = ""
c, err := canal.NewCanal(cfg)
if err != nil {
log.Fatal(err)
}
// Register a handler to handler Events
c.SetEventHandler(&BinlogSync{})
err = c.Run()
if err != nil {
log.Fatal(err)
}
GTID 的方式执行
# ... 其他不变
startupGTID = "09e12c4e-6a25-21ec-bea1-04242ac180002"
set, _ := mysql.ParseGTIDSet("mysql", startupGTID)
err = c.StartFromGTID(set)
if err != nil {
log.Fatal(err)
}
555