实时同步MySQL数据到Hive
案例:将mysql中新增的数据实时同步到Hive中。
以上案例需要用到的处理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。
首先通过“CaptureChangeMySQL”读取MySQL中数据的变化(需要开启MySQL binlog日志),将Binlog中变化的数据同步到“RouteOnAttribute”处理器,通过此处理器获取上游数据属性,获取对应binlog操作类型,再将想要处理的数据路由到“EvaluateJsonPath”处理器,该处理器可以将json格式的binlog数据解析,通过自定义json 表达式获取json数据中的属性放入FlowFile属性,将FlowFile通过“ReplaceText”处理器获取上游FowFile属性,动态拼接sql替换所有的FlowFile内容,将拼接好的sql组成FlowFile路由到“PutHiveQL”将数据写入到Hive表。
一、开启MySQL的binlog日志
mysql-binlog是MySQL数据库的二进制日志,记录了所有的DDL和DML(除了数据查询语句)语句信息。一般来说开启二进制日志大概会有1%的性能损耗。这里需要开启MySQL的binlog日志方便后期使用“CaptureChangeMySQL”处理器来获取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。
1、登录mysql查看MySQL是否开启binlog日志

2 、开启mysql binlog日志
在/etc/my.cnf文件中[mysqld]下写入以下内容:
3、重启mysql 服务,重新查看binlog日志情况

二、配置“CaptureChangeMySQL”处理器
“CaptureChangeMySQL”主要是从MySQL数据库捕获CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作发生时的顺序输出为单独的FlowFile文件。
关于“CaptureChangeMySQL”处理器的“Properties”主要配置的说明如下:
配置步骤如下:
1、创建“CaptureChangeMySQL”处理器

2、配置“DistributeMapCacheServer”控制服务
监控mysql变化需要设置“DistributedMapCacheClient”控制服务,其对应的Server中存储处理器所需的各种表、列等信息,所以这里需要首先配置“DistributeMapCacheServer”控制服务。



3、配置“SCHEDULING”
由于这里使用“CaptureChangeMySQL”处理器监控“MySQL”中的数据,所以设置调度访问周期为“10s”,防止一直监听MySQL binlog数据,带来性能消耗。

4、配置“PROPERTIES”
在“CaptureChangeMySQL”处理器中配置“PROPERTIES”,配置如下:
注意:这里需要在每台NiFi节点上创建对应目录,上传mysql驱动包。
“PROPERTIES”配置如下:

此外,在“PROPERTIES”中还需要配置“Distributed Map Cache Client”控制服务,来读取“DistributeMapCacheServer”控制服务中的缓存数据:









暂无评论内容