首页
视频
留言
友链
推荐
Github
常用工具
低端影视
Search
1
go-canal 订阅和消费 binlog
121 阅读
2
Go语言精进之路-白明
71 阅读
3
测试用
32 阅读
4
Docker 小记
32 阅读
5
Python | multiprocessing | 简单使用
29 阅读
All
Golang
Python
Docker
Daily
ReadingMinutes
go语言精进之路
Database
登录
/
注册
Search
标签搜索
go
database
binlog
multiprocessing
clickhouse
MrSnake
累计撰写
11
篇文章
累计收到
4
条评论
首页
栏目
All
Golang
Python
Docker
Daily
ReadingMinutes
go语言精进之路
Database
页面
视频
留言
友链
推荐
Github
常用工具
低端影视
搜索到
1
篇与
go
的结果
2021-12-28
go-canal 订阅和消费 binlog
Binlog二进制的日志文件,记录着数据库中已经执行的 Sql 语句。docker-compose部署mysqlversion: '3' services: mysql5.7: container_name: mysql5.7 image: mysql:5.7 ports: - "3307:3306" command: --server_id=1513 --default-authentication-plugin=mysql_native_password --log-bin=mysql-bin --binlog-ignore-db=mysql restart: always environment: MYSQL_ROOT_PASSWORD: 123456 TZ: Asia/Shanghai volumes: - ../../data/mysql5.7:/var/lib/mysqldocker-compose up -d # 后台运行 docker exec -it mysql5.7 mysql -u root -p 123456 # 输入密码确保 binlog 开启,并且 binlog 日志格式 为 rowshow variables like 'log_bin'; // 查看 binlog show variables like 'binlog_format'; // 日志格式Tips: 外部工具无法连接请配置mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION; mysql>FLUSH PRIVILEGES; // 其中123456,可以修改为你的密码常用Binlog的sql指令show master logs; show master status; flush logs; --刷新,创建一个新的binlog文件 reset master; --清空日志文件 // 查询 mysql 的配置 mysql --help | grep 'Default options' -A 1 // 默认存储 binlog 的文件位置 > /var/lib/mysqlBinlog字段Event_typeQuery Event记录删除表创建表修改表Xid Eventcommit 提交成功的 idTable_map Event记录下一个操作对映的数据库和表名Rotate Event记录生成一张新的binlog日志表Write_rows Event # 插入Update_rows Event # 更新Delete_rows Event # 删除Event_type官方文档Go-Canal通过 Canal 订阅和消费 mysql 的 binlog。go get github.com/go-mysql-org/go-mysql@v1.4.0canal包中的接口和用法type EventHandler interface { OnRotate(roateEvent *replication.RotateEvent) error OnTableChanged(schema string, table string) error OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error OnXID(nextPos mysql.Position) error OnGTID(gtid mysql.GTIDSet) error OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error String() string }event 类型为 Write_rows, Update_rows, Delete_rowstype BinlogSync struct { canal.DummyEventHandler } //OnRow 获取 event_type 为 write_rows, update_rows, delete_rows 的数据 func (h *BinlogSync) OnRow(ev *canal.RowsEvent) error { rowData := make(map[string]interface{}) rowList := make([]interface{}, len(ev.Rows)) fmt.Println("原始数据:", ev.Rows) fmt.Printf("sql的操作行为:%s\t", ev.Action) for idx, _ := range ev.Table.PKColumns { fmt.Printf("主键为:%s\n", ev.Table.Columns[ev.Table.PKColumns[idx]].Name) } for idxRow, _ := range ev.Rows { for columnIndex, currColumn := range ev.Table.Columns { // 字段名和对应的值 row := fmt.Sprintf("%v:%v", currColumn.Name, ev.Rows[idxRow][columnIndex]) fmt.Println(row) rowData[currColumn.Name] = ev.Rows[idxRow][columnIndex] rowList[idxRow] = rowData } } rowJson, err := json.Marshal(rowList) if err != nil { return fmt.Errorf("序列化错误:%s", err) } fmt.Printf("序列化为json格式:%s\n\n", string(rowJson)) return nil }event 类型为 Rotate// OnRotate 获取 binlog 下个日志文件名字和位置 func (h *BinlogSync) OnRotate(r *replication.RotateEvent) error { fmt.Printf("下一个日志为 %s 位置为 %d \n", string(r.NextLogName), r.Position) return nil }event 类型为 Query// OnTableChanged 在 OnDDL 之前执行 func (h *BinlogSync) OnTableChanged(schema string, table string) error { result := fmt.Sprintf("修改了数据库%s中表%s的结构", schema, table) fmt.Println(result) return nil } // OnDDL query 事件中的一些信息,如执行的 sql 语句 func (h *BinlogSync) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { fmt.Println(string(queryEvent.Query)) return nil }Event 类型为 Xid// OnXID 打印事件 Xid 的结束为止 func (h *BinlogSync) OnXID(m mysql.Position) error { fmt.Println("XID", m.Pos) return nil }执行func main() { cfg := canal.NewDefaultConfig() cfg.Addr = "127.0.0.1:3307" cfg.User = "root" cfg.Password = "123456" // 数据库名 cfg.Dump.TableDB = "mrsnake" cfg.ServerID = 1513 // 表名 cfg.Dump.Tables = []string{"bin_log_test"} cfg.Dump.ExecutionPath = "" c, err := canal.NewCanal(cfg) if err != nil { log.Fatal(err) } // Register a handler to handler Events c.SetEventHandler(&BinlogSync{}) err = c.Run() if err != nil { log.Fatal(err) }GTID 的方式执行# ... 其他不变 startupGTID = "09e12c4e-6a25-21ec-bea1-04242ac180002" set, _ := mysql.ParseGTIDSet("mysql", startupGTID) err = c.StartFromGTID(set) if err != nil { log.Fatal(err) }
2021年12月28日
121 阅读
0 评论
2 点赞