首页
壁纸
Search
1
go-canal 订阅和消费 binlog
1,370 阅读
2
Go语言精进之路-白明
448 阅读
3
Go语言精进之路-白明 | 变长参数函数的妙用
388 阅读
4
Clickhouse 实战记录
308 阅读
5
测试用
284 阅读
All
Golang
Python
Docker
Daily
ReadingMinutes
go语言精进之路
Database
登录
Search
标签搜索
go
database
binlog
multiprocessing
clickhouse
MrSnake
累计撰写
11
篇文章
累计收到
32
条评论
首页
栏目
All
Golang
Python
Docker
Daily
ReadingMinutes
go语言精进之路
Database
页面
壁纸
搜索到
11
篇与
MrSnake
的结果
2022-01-09
2022 | 新年快乐 | 新的目标 | 新的开始
2022 | 更加努力 | 减少熬夜 | 敢想敢做
2022年01月09日
91 阅读
3 评论
1 点赞
2022-01-09
Windows | Docker 部署mysql5 | 宿主机已安装mysql
windows 上修改 docker 默认存放镜像的位置
2022年01月09日
118 阅读
2 评论
2 点赞
2022-01-09
Python | multiprocessing | 简单使用
Pool对象 进程池对象,提供一种快捷的方法,将输入数据分配给不同进程处理。使用进程池的方式快捷的进行一个多进程操作import time from multiprocessing import Process, Pipe, Pool def start(x): return x*x*x if __name__ == '__main__': with Pool(5) as p: # 创建一个大小为5的进程池 print(p.map(start,[i for i in range(4)])) # map方法,可以将第二参数的值传到第一个参数中,也就是函数中 Process类 创建对象,再用start()生成进程,与多线程类似。也可以用重载run()的方式自定义多进程简单的例子import time from multiprocessing import Process def example(name): print("l am a {}".format(name)) time.sleep(2) print('end') if __name__ == '__main__': # with Pool(5) as p: # 创建一个大小为5的进程池 # print(p.map(start,[i for i in range(4)])) # map方法,可以将第二参数的值传到第一个参数中,也就是函数中 print("mainProcess") process1 = Process(target=example, args=("snake",)) process2 = Process(target=example, args=("tom",)) # process1.daemon = process2.daemon = True # 如何只剩下守护进程程序退出 process1.start() process2.start() ------------------------------------------------------------------- mainProcess l am a tom l am a snake end end自定义,等同于上面class Start(Process): def __init__(self,name): Process.__init__(self) self.name = name def run(self): print("l am a {}".format(self.name)) time.sleep(2) print('end') if __name__ == '__main__': print("mainProcess") process1 = Process(target=example, args=("snake",)) process2 = Process(target=example, args=("tom",)) # process1.daemon = process2.daemon = True process1.start() process2.start()在进程之间交换对象队列 Queue类是一个近似queue.Queue的克隆from multiprocessing import Queue,Process def example2(q): q.put([38, "snake", 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=example2, args=(q,)) p.start() print(q.get()) # prints "[38, "snake", 'hello']" p.join()管道 Pipe()函数返回一个由管道连接的连接对象,默认为双工,返回的两个对象表示Pipe()表示管道的两端,每个连接对象都send()和recv()方法。并且两个进程不能同时写入和读取同一端。from multiprocessing import Pipe,Process def customers(c): """管道交换对象""" c.send('l am a snake') c.close() if __name__ == '__main__': producer, customer = Pipe() process = Process(target=customers, args=(customer,)) process.start() print(producer.recv()) process.join()进程间同步(进程锁)确保一次只有一个进程打印到标准输出。from multiprocessing import Process, Lock def f(l, i): l.acquire() # 上锁确保一次只有一个进程访问 try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() l = [] for num in range(10): # 创建10个进程 p = Process(target=f, args=(lock, num)) l.append(p) p.start() for p in l: # 等待子进程结束 p.join()进程间共享状态共享内存使用Value或Array将数据存储在共享内存映射中
2022年01月09日
169 阅读
2 评论
0 点赞
2021-12-28
go-canal 订阅和消费 binlog
Binlog二进制的日志文件,记录着数据库中已经执行的 Sql 语句。docker-compose部署mysqlversion: '3' services: mysql5.7: container_name: mysql5.7 image: mysql:5.7 ports: - "3307:3306" command: --server_id=1513 --default-authentication-plugin=mysql_native_password --log-bin=mysql-bin --binlog-ignore-db=mysql restart: always environment: MYSQL_ROOT_PASSWORD: 123456 TZ: Asia/Shanghai volumes: - ../../data/mysql5.7:/var/lib/mysqldocker-compose up -d # 后台运行 docker exec -it mysql5.7 mysql -u root -p 123456 # 输入密码确保 binlog 开启,并且 binlog 日志格式 为 rowshow variables like 'log_bin'; // 查看 binlog show variables like 'binlog_format'; // 日志格式Tips: 外部工具无法连接请配置mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION; mysql>FLUSH PRIVILEGES; // 其中123456,可以修改为你的密码常用Binlog的sql指令show master logs; show master status; flush logs; --刷新,创建一个新的binlog文件 reset master; --清空日志文件 // 查询 mysql 的配置 mysql --help | grep 'Default options' -A 1 // 默认存储 binlog 的文件位置 > /var/lib/mysqlBinlog字段Event_typeQuery Event记录删除表创建表修改表Xid Eventcommit 提交成功的 idTable_map Event记录下一个操作对映的数据库和表名Rotate Event记录生成一张新的binlog日志表Write_rows Event # 插入Update_rows Event # 更新Delete_rows Event # 删除Event_type官方文档Go-Canal通过 Canal 订阅和消费 mysql 的 binlog。go get github.com/go-mysql-org/go-mysql@v1.4.0canal包中的接口和用法type EventHandler interface { OnRotate(roateEvent *replication.RotateEvent) error OnTableChanged(schema string, table string) error OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error OnXID(nextPos mysql.Position) error OnGTID(gtid mysql.GTIDSet) error OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error String() string }event 类型为 Write_rows, Update_rows, Delete_rowstype BinlogSync struct { canal.DummyEventHandler } //OnRow 获取 event_type 为 write_rows, update_rows, delete_rows 的数据 func (h *BinlogSync) OnRow(ev *canal.RowsEvent) error { rowData := make(map[string]interface{}) rowList := make([]interface{}, len(ev.Rows)) fmt.Println("原始数据:", ev.Rows) fmt.Printf("sql的操作行为:%s\t", ev.Action) for idx, _ := range ev.Table.PKColumns { fmt.Printf("主键为:%s\n", ev.Table.Columns[ev.Table.PKColumns[idx]].Name) } for idxRow, _ := range ev.Rows { for columnIndex, currColumn := range ev.Table.Columns { // 字段名和对应的值 row := fmt.Sprintf("%v:%v", currColumn.Name, ev.Rows[idxRow][columnIndex]) fmt.Println(row) rowData[currColumn.Name] = ev.Rows[idxRow][columnIndex] rowList[idxRow] = rowData } } rowJson, err := json.Marshal(rowList) if err != nil { return fmt.Errorf("序列化错误:%s", err) } fmt.Printf("序列化为json格式:%s\n\n", string(rowJson)) return nil }event 类型为 Rotate// OnRotate 获取 binlog 下个日志文件名字和位置 func (h *BinlogSync) OnRotate(r *replication.RotateEvent) error { fmt.Printf("下一个日志为 %s 位置为 %d \n", string(r.NextLogName), r.Position) return nil }event 类型为 Query// OnTableChanged 在 OnDDL 之前执行 func (h *BinlogSync) OnTableChanged(schema string, table string) error { result := fmt.Sprintf("修改了数据库%s中表%s的结构", schema, table) fmt.Println(result) return nil } // OnDDL query 事件中的一些信息,如执行的 sql 语句 func (h *BinlogSync) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error { fmt.Println(string(queryEvent.Query)) return nil }Event 类型为 Xid// OnXID 打印事件 Xid 的结束为止 func (h *BinlogSync) OnXID(m mysql.Position) error { fmt.Println("XID", m.Pos) return nil }执行func main() { cfg := canal.NewDefaultConfig() cfg.Addr = "127.0.0.1:3307" cfg.User = "root" cfg.Password = "123456" // 数据库名 cfg.Dump.TableDB = "mrsnake" cfg.ServerID = 1513 // 表名 cfg.Dump.Tables = []string{"bin_log_test"} cfg.Dump.ExecutionPath = "" c, err := canal.NewCanal(cfg) if err != nil { log.Fatal(err) } // Register a handler to handler Events c.SetEventHandler(&BinlogSync{}) err = c.Run() if err != nil { log.Fatal(err) }GTID 的方式执行# ... 其他不变 startupGTID = "09e12c4e-6a25-21ec-bea1-04242ac180002" set, _ := mysql.ParseGTIDSet("mysql", startupGTID) err = c.StartFromGTID(set) if err != nil { log.Fatal(err) }
2021年12月28日
1,370 阅读
1 评论
3 点赞
2021-12-22
测试用
{dotted startColor="#ff6c6c" endColor="#1989fa"/}test{dotted startColor="#ff6c6c" endColor="#1989fa"/}test {abtn icon="" color="#ff6800" href="https://mrsnake.top" radius="" content="mrsnake"/} {card-list}{card-list-item} 列表一内容{/card-list-item}{card-list-item} 列表二内容{/card-list-item}{/card-list}{tabs}{tabs-pane label="标签一"} 标签一内容{/tabs-pane}{tabs-pane label="标签二"} 标签二内容{/tabs-pane}{/tabs}reflect包type t struct { name string } reflect.Typeof(t) // main.t reflect.Kind(t) // struct1.Typeof指变量所属的类型,Kind指变量所属的类别。const (Invalid Kind = iota,Bool,Int,Int8,Int16,Int32,Int64,Uint,Uint8,Uint16,Uint32,Uint64,Uintptr,Float32,Float64,Complex64,Complex128,Array,Chan,Func,Interface,Map,Ptr,Slice,String,Struct,UnsafePointer)2.修改接口实际变量的值Elem()获取指向原变量的指针reflect.ValueOf(a).Elem().SetInt(38)时间转换todayZero, _ := time.ParseInLocation("2006-01-02", "2021-11-26 15:22:22" time.Local)
2021年12月22日
284 阅读
2 评论
2 点赞
1
2
3