首页
壁纸
Search
1
go-canal 订阅和消费 binlog
1,450 阅读
2
Go语言精进之路-白明
467 阅读
3
Go语言精进之路-白明 | 变长参数函数的妙用
396 阅读
4
Clickhouse 实战记录
315 阅读
5
测试用
293 阅读
All
Golang
Python
Docker
Daily
ReadingMinutes
go语言精进之路
Database
登录
Search
标签搜索
go
database
binlog
multiprocessing
clickhouse
MrSnake
累计撰写
11
篇文章
累计收到
32
条评论
首页
栏目
All
Golang
Python
Docker
Daily
ReadingMinutes
go语言精进之路
Database
页面
壁纸
搜索到
1
篇与
go
的结果
2021-12-28
go-canal 订阅和消费 binlog
Binlog二进制的日志文件,记录着数据库中已经执行的 Sql 语句。docker-compose部署mysqlversion: '3' services: mysql5.7: container_name: mysql5.7 image: mysql:5.7 ports: - "3307:3306" command: --server_id=1513 --default-authentication-plugin=mysql_native_password --log-bin=mysql-bin --binlog-ignore-db=mysql restart: always environment: MYSQL_ROOT_PASSWORD: 123456 TZ: Asia/Shanghai volumes: - ../../data/mysql5.7:/var/lib/mysqldocker-compose up -d # 后台运行 docker exec -it mysql5.7 mysql -u root -p 123456 # 输入密码确保 binlog 开启,并且 binlog 日志格式 为 rowshow variables like 'log_bin'; // 查看 binlog show variables like 'binlog_format'; // 日志格式Tips: 外部工具无法连接请配置mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION; mysql>FLUSH PRIVILEGES; // 其中123456,可以修改为你的密码常用Binlog的sql指令show master logs; show master status; flush logs; --刷新,创建一个新的binlog文件 reset master; --清空日志文件 // 查询 mysql 的配置 mysql --help | grep 'Default options' -A 1 // 默认存储 binlog 的文件位置 > /var/lib/mysqlBinlog字段Event_typeQuery Event记录删除表创建表修改表Xid Eventcommit 提交成功的 idTable_map Event记录下一个操作对映的数据库和表名Rotate Event记录生成一张新的binlog日志表Write_rows Event # 插入Update_rows Event # 更新Delete_rows Event # 删除Event_type官方文档Go-Canal通过 Canal 订阅和消费 mysql 的 binlog。go get github.com/go-mysql-org/go-mysql@v1.4.0canal包中的接口和用法type EventHandler interface { OnRotate(roateEvent *replication.RotateEvent) error OnTableChanged(schema string, table string) error OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error OnXID(nextPos mysql.Position) error OnGTID(gtid mysql.GTIDSet) error OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error String() string }event 类型为 Write_rows, Update_rows, Delete_rowstype BinlogSync struct { canal.DummyEventHandler } //OnRow 获取 event_type 为 write_rows, update_rows, delete_rows 的数据 func (h *BinlogSync) OnRow(ev *canal.RowsEvent) error { rowData := make(map[string]interface{}) rowList := make([]interface{}, len(ev.Rows)) fmt.Println("原始数据:", ev.Rows) fmt.Printf("sql的操作行为:%s\t", ev.Action) for idx, _ := range ev.Table.PKColumns { fmt.Printf("主键为:%s\n", ev.Table.Columns[ev.Table.PKColumns[idx]].Name) } for idxRow, _ := range ev.Rows { for columnIndex, currColumn := range ev.Table.Columns { // 字段名和对应的值 row := fmt.Sprintf("%v:%v", currColumn.Name, ev.Rows[idxRow][columnIndex]) fmt.Println(row) rowData[currColumn.Name] = ev.Rows[idxRow][columnIndex] rowList[idxRow] = rowData } } rowJson, err := json.Marshal(rowList) if err != nil { return fmt.Errorf("序列化错误:%s", err) } fmt.Printf("序列化为json格式:%s\n\n", string(rowJson)) return nil }event 类型为 Rotate// OnRotate 获取 binlog 下个日志文件名字和位置 func (h *BinlogSync) OnRotate(r *replication.RotateEvent) error { fmt.Printf("下一个日志为 %s 位置为 %d \n", string(r.NextLogName), r.Position) return nil }event 类型为 Query// OnTableChanged 在 OnDDL 之前执行 func (h *BinlogSync) OnTableChanged(schema string, table string) error { result := fmt.Sprintf("修改了数据库%s中表%s的结构", schema, table) fmt.Println(result) return nil } // OnDDL query 事件中的一些信息,如执行的 sql 语句 func (h *BinlogSync) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { fmt.Println(string(queryEvent.Query)) return nil }Event 类型为 Xid// OnXID 打印事件 Xid 的结束为止 func (h *BinlogSync) OnXID(m mysql.Position) error { fmt.Println("XID", m.Pos) return nil }执行func main() { cfg := canal.NewDefaultConfig() cfg.Addr = "127.0.0.1:3307" cfg.User = "root" cfg.Password = "123456" // 数据库名 cfg.Dump.TableDB = "mrsnake" cfg.ServerID = 1513 // 表名 cfg.Dump.Tables = []string{"bin_log_test"} cfg.Dump.ExecutionPath = "" c, err := canal.NewCanal(cfg) if err != nil { log.Fatal(err) } // Register a handler to handler Events c.SetEventHandler(&BinlogSync{}) err = c.Run() if err != nil { log.Fatal(err) }GTID 的方式执行# ... 其他不变 startupGTID = "09e12c4e-6a25-21ec-bea1-04242ac180002" set, _ := mysql.ParseGTIDSet("mysql", startupGTID) err = c.StartFromGTID(set) if err != nil { log.Fatal(err) }
2021年12月28日
1,450 阅读
1 评论
3 点赞