网站首页 > 技术文章 正文
canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES 。
1 集群模式
图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。
server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务。
instance 包含如下模块 :
- eventParser
- 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
- eventSink
- Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
- eventStore
- 数据存储
- metaManager
- 增量订阅 & 消费信息管理器
真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。
实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。
顺序消费:
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
2 MySQL配置
1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]brlog-bin=mysql-bin # 开启 binlogbrbinlog-format=ROW # 选择 ROW 模式brserver_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。
2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。
CREATE USER canal IDENTIFIED BY 'canal'; brGRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';br-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;brFLUSH PRIVILEGES;
3、创建数据库商品表 t_product 。
CREATE TABLE `t_product` (br `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,br `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,br `price` DECIMAL ( 10, 2 ) NOT NULL,br `status` TINYINT ( 4 ) NOT NULL,br `create_time` datetime NOT NULL,br `update_time` datetime NOT NULL,br PRIMARY KEY ( `id` ) br) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin
3 Elasticsearch配置
使用 Kibana 创建商品索引 。
PUT /t_productbr{br "settings": {br "number_of_shards": 2,br "number_of_replicas": 1br },br "mappings": {br "properties": {br "id": {br "type":"keyword"br },br "name": {br "type":"text"br },br "price": {br "type":"double"br },br "status": {br "type":"integer"br },br "createTime": {br "type": "date",br "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"br },br "updateTime": {br "type": "date",br "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"br }br }br }br}
执行完成,如图所示 :
4 RocketMQ 配置
创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。
5 canal 配置
我们选取 canal 版本 1.1.6 ,进入 conf 目录。
1、配置 canal.properties
#集群模式
zk地址brcanal.zkServers = localhost:2181#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQbrcanal.serverMode = rocketMQbr#instance 列表brcanal.destinations = product-synbr#conf root dirbrcanal.conf.dir = ../confbr#全局的spring配置方式的组件文件 生产环境,集群化部署brcanal.instance.global.spring.xml = classpath:spring/default-instance.xmlbrbr###### 以下部分是默认值 展示出来 br# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)brcanal.mq.canalBatchSize = 50br# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时brcanal.mq.canalGetTimeout = 100br# 是否为 flat json格式对象brcanal.mq.flatMessage = true
2、instance 配置文件
在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties。
# 按需修改成自己的数据库信息br#################################################br...brcanal.instance.master.address=192.168.1.20:3306br# username/password,数据库的用户名和密码br...brcanal.instance.dbUsername = canalbrcanal.instance.dbPassword = canalbr...brbr# table regex brcanal.instance.filter.regex=mytest.t_productbrbr# mq configbrcanal.mq.topic=product-syn-topicbr# 针对库名或者表名发送动态topicbr#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*brcanal.mq.partition=0br# hash partition configbr#canal.mq.partitionsNum=3br#库名.表名: 唯一主键,多个表之间用逗号分隔br#canal.mq.partitionHash=mytest.person:id,mytest.role:idbr#################################################
3、服务启动
启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。
修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。
6 消费者
1、产品索引操作服务
![](https://www.javayong.cn/pics/canal/productindexservice.png)
2、消费监听器
消费者逻辑重点有两点:
- 顺序消费监听器
- 将消息数据转换成 JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。
7 写到最后
canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。
推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。
这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。
https://github.com/makemyownlife/rocketmq4-learning
如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!
猜你喜欢
- 2024-10-29 你还在用 Date?快使用 LocalDateTime 了!
- 2024-10-29 Java修炼终极指南:79,80,81 签到终极修炼天赋
- 2024-10-29 硬核!最全的延迟任务实现方式汇总!附代码(强烈推荐)
- 2024-10-29 还在实体类中用Date?JDK8新的日期类型不香么?
- 2024-10-29 LocalDateTime 说:2020,是时候换个更好的日期时间类了
- 2024-10-29 程序员,你还在使用Date嘛?建议你使用LocalDateTime哦
- 2024-10-29 深度思考:在JDK8中,日期类型该如何使用?
- 2024-10-29 为什么建议使用你 LocalDateTime,而不是 Date?
- 2024-10-29 百度开源的分布式唯一ID生成器UidGenerator,解决了时钟回拨问题
- 2024-10-29 DeepLearning4j 实战:手写体数字识别的 GPU 实现与性能对比
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)