All

go-canal 订阅和消费 binlog

MrSnake
2021-12-28 / 0 评论 / 121 阅读 / 正在检测是否收录...

Binlog

二进制的日志文件,记录着数据库中已经执行的 Sql 语句。

docker-compose部署mysql

version: '3'

services:
  mysql5.7:
    container_name: mysql5.7
    image: mysql:5.7
    ports:
      - "3307:3306"
    command: 
      --server_id=1513
      --default-authentication-plugin=mysql_native_password
      --log-bin=mysql-bin
      --binlog-ignore-db=mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: 123456
      TZ: Asia/Shanghai
    volumes:
      - ../../data/mysql5.7:/var/lib/mysql
docker-compose up -d # 后台运行
docker exec -it mysql5.7 mysql -u root -p
123456 # 输入密码

确保 binlog 开启,并且 binlog 日志格式 为 row

show variables like 'log_bin'; // 查看 binlog
show variables like 'binlog_format'; // 日志格式

Tips: 外部工具无法连接请配置

mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
mysql>FLUSH PRIVILEGES;
// 其中123456,可以修改为你的密码

常用Binlog的sql指令

show master logs;
show master status;
flush logs;        --刷新,创建一个新的binlog文件
reset master;    --清空日志文件

// 查询 mysql 的配置
mysql --help | grep 'Default options' -A 1
// 默认存储 binlog 的文件位置
> /var/lib/mysql

Binlog字段

Event_type

  • Query Event

记录删除表创建表修改表

  • Xid Event

commit 提交成功的 id

  • Table_map Event

记录下一个操作对映的数据库和表名

  • Rotate Event

记录生成一张新的binlog日志表

  • Write_rows Event # 插入
  • Update_rows Event # 更新
  • Delete_rows Event # 删除

Event_type官方文档

Go-Canal

通过 Canal 订阅和消费 mysql 的 binlog。

go get github.com/go-mysql-org/go-mysql@v1.4.0

canal包中的接口和用法

type EventHandler interface {
   OnRotate(roateEvent *replication.RotateEvent) error
   OnTableChanged(schema string, table string) error
   OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
   OnRow(e *RowsEvent) error
   OnXID(nextPos mysql.Position) error
   OnGTID(gtid mysql.GTIDSet) error
   OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
   String() string
}
  1. event 类型为 Write_rows, Update_rows, Delete_rows
type BinlogSync struct {
   canal.DummyEventHandler
}

//OnRow 获取 event_type 为 write_rows, update_rows, delete_rows 的数据
func (h *BinlogSync) OnRow(ev *canal.RowsEvent) error {
   rowData := make(map[string]interface{})
   rowList := make([]interface{}, len(ev.Rows))

   fmt.Println("原始数据:", ev.Rows)
   fmt.Printf("sql的操作行为:%s\t", ev.Action)

   for idx, _ := range ev.Table.PKColumns {
      fmt.Printf("主键为:%s\n", ev.Table.Columns[ev.Table.PKColumns[idx]].Name)
   }
   for idxRow, _ := range ev.Rows {
      for columnIndex, currColumn := range ev.Table.Columns {
         // 字段名和对应的值
         row := fmt.Sprintf("%v:%v", currColumn.Name, ev.Rows[idxRow][columnIndex])
         fmt.Println(row)
         rowData[currColumn.Name] = ev.Rows[idxRow][columnIndex]
         rowList[idxRow] = rowData
      }
   }

   rowJson, err := json.Marshal(rowList)
   if err != nil {
      return fmt.Errorf("序列化错误:%s", err)
   }

   fmt.Printf("序列化为json格式:%s\n\n", string(rowJson))
   return nil
}
  1. event 类型为 Rotate
// OnRotate 获取 binlog 下个日志文件名字和位置
func (h *BinlogSync) OnRotate(r *replication.RotateEvent) error {
    fmt.Printf("下一个日志为 %s 位置为 %d \n", string(r.NextLogName), r.Position)
    return nil
}
  1. event 类型为 Query
// OnTableChanged 在 OnDDL 之前执行
func (h *BinlogSync) OnTableChanged(schema string, table string) error {
    result := fmt.Sprintf("修改了数据库%s中表%s的结构", schema, table)
    fmt.Println(result)
    return nil
}
// OnDDL query 事件中的一些信息,如执行的 sql 语句
func (h *BinlogSync) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
    fmt.Println(string(queryEvent.Query))
    return nil
}
  1. Event 类型为 Xid
// OnXID 打印事件 Xid 的结束为止
func (h *BinlogSync) OnXID(m mysql.Position) error {
   fmt.Println("XID", m.Pos)
   return nil
}

执行

func main() {
    cfg := canal.NewDefaultConfig()
    cfg.Addr = "127.0.0.1:3307"
    cfg.User = "root"
    cfg.Password = "123456"
    // 数据库名
    cfg.Dump.TableDB = "mrsnake"
    cfg.ServerID = 1513
    // 表名
    cfg.Dump.Tables = []string{"bin_log_test"}
    cfg.Dump.ExecutionPath = ""

    c, err := canal.NewCanal(cfg)
    if err != nil {
        log.Fatal(err)
    }
    // Register a handler to handler Events
    c.SetEventHandler(&BinlogSync{})

    err = c.Run()
    if err != nil {
        log.Fatal(err)
    }

GTID 的方式执行

# ... 其他不变
startupGTID = "09e12c4e-6a25-21ec-bea1-04242ac180002"
set, _ := mysql.ParseGTIDSet("mysql", startupGTID)
err = c.StartFromGTID(set)
    if err != nil {
        log.Fatal(err)
    }
2

评论 (0)

取消