Flink CDC Data Processing
Introduction
Official documentation:
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
Flink CDC is a set of source connectors for Apache Flink that capture change data from different databases. The Flink CDC connectors embed Debezium as the underlying change-data-capture engine.
Debezium project: https://github.com/debezium/debezium
- Supported Connectors
| Database   | Version         |
| ---------- | --------------- |
| MySQL      | 5.7, 8.0.x      |
| PostgreSQL | 9.6, 10, 11, 12 |
| MongoDB    | 3.6, 4.x, 5.0   |
| Oracle     | 11, 12, 19      |
- Supported Flink Versions

| Flink CDC Connector Version | Flink Version |
| --------------------------- | ------------- |
| 1.0.0                       | 1.11.*        |
| 1.1.0                       | 1.11.*        |
| 1.2.0                       | 1.12.*        |
| 1.3.0                       | 1.12.*        |
| 1.4.0                       | 1.13.*        |
| 2.0.*                       | 1.13.*        |
| 2.1.*                       | 1.13.*        |
DataStream API
Complete code example:
https://gitee.com/sxuboss_admin/FlinkDemoAll.git
dependency
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>
code
package com.bbx.flink.dataStream_api_all;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.10.7")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("maxwell_test1", "maxwell_test2")                                  // databases to capture
                .tableList("maxwell_test1.test1", "maxwell_test1.test2", "maxwell_test2.test3")  // tables to capture
                // Deserializer: StringDebeziumDeserializationSchema dumps the raw record, which is handy for
                // inspecting what information is available; a custom schema can be plugged in instead
                // (see "Custom deserialization" below).
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        env.addSource(sourceFunction)
                .setParallelism(1)
                .print();

        env.execute("print CDC");
    }
}
Create the databases maxwell_test1 and maxwell_test2, create the tables test1, test2 and test3 in them, and enable the MySQL binlog; see the Maxwell notes for the binlog configuration.
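The example above runs without checkpointing. In practice the CDC source is usually combined with Flink checkpointing so that the binlog position is persisted and the job can resume after a failure. The snippet below is a minimal sketch of that setup; the interval and timeout values are illustrative assumptions, not taken from the original code.

// Additional import for the checkpointing mode.
import org.apache.flink.streaming.api.CheckpointingMode;

// Added in main() right after the StreamExecutionEnvironment is created.
// The concrete values below are assumptions, not part of the original example.
env.enableCheckpointing(5000L);                                                  // take a checkpoint every 5 s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  // exactly-once state consistency
env.getCheckpointConfig().setCheckpointTimeout(60000L);                          // abort checkpoints stuck longer than 60 s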
Custom deserialization
package com.bbx.flink.dataStream_api_all.CDC;

import cn.hutool.json.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * Custom Debezium deserialization schema.
 */
public class CustomerDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private static final long serialVersionUID = 9042745421910624673L;

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    /**
     * Emits records of the form:
     * {
     *   "db":     "",
     *   "table":  "",
     *   "before": {"id":"", "name":"", ...},
     *   "after":  {"id":"", "name":"", ...},
     *   "op":     ""
     * }
     *
     * @param sourceRecord the raw Debezium record
     * @param collector    the output collector
     * @throws Exception on deserialization failure
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject jsonObject = new JSONObject();

        // Extract the database and table names from the record topic,
        // which has the form <server-name>.<database>.<table>.
        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");

        // Extract the before and after images of the row.
        Struct value = (Struct) sourceRecord.value();
        Object before = getStructInfo(value, "before");
        Object after = getStructInfo(value, "after");

        // Extract the operation type (CREATE / UPDATE / DELETE / READ).
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);

        jsonObject.set("db", split[1])
                .set("table", split[2])
                .set("op", operation)
                .set("before", before)
                .set("after", after);

        collector.collect(jsonObject.toString());
    }

    public Object getStructInfo(Struct struct, String name) {
        Struct tmpStruct = struct.getStruct(name);
        JSONObject jsonObject = new JSONObject();
        if (tmpStruct != null) {
            Schema schema = tmpStruct.schema();
            List<Field> fields = schema.fields();
            fields.forEach(i -> jsonObject.set(i.name(), tmpStruct.get(i)));
        }
        return jsonObject;
    }
}
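To use this schema, swap it in for StringDebeziumDeserializationSchema in the DataStream example above. A minimal sketch reusing the same connection settings:

// Same imports and builder as in FlinkCDCDemo, but emitting the JSON layout shown above.
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("192.168.10.7")
        .port(3306)
        .username("root")
        .password("000000")
        .databaseList("maxwell_test1", "maxwell_test2")
        .tableList("maxwell_test1.test1", "maxwell_test1.test2", "maxwell_test2.test3")
        .deserializer(new CustomerDebeziumDeserializationSchema())  // custom schema instead of the raw string dump
        .startupOptions(StartupOptions.initial())
        .build();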
Table API & SQL
code
package com.bbx.flink.dataStream_api_all.CDC;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCDCTableDemo {

    public static void main(String[] args) throws Exception {
        // Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String test1Sql = "CREATE TABLE test1 (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '192.168.10.7',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'maxwell_test1',\n" +
                "  'table-name' = 'test1')";
        tableEnv.executeSql(test1Sql);

        String querySQl = "select * from test1";
        tableEnv.executeSql(querySQl).print();
    }
}
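The executeSql(...).print() call above streams the changelog rows to stdout. If the result is needed as a DataStream instead, the table can be bridged back with toRetractStream. The snippet below is a minimal sketch assuming the Flink 1.12/1.13 StreamTableEnvironment API; it is not part of the original example. The Boolean in each Tuple2 marks whether the row is an addition (true) or a retraction (false).

// Additional imports needed for the conversion.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Inside main(), after the CREATE TABLE statement has been executed:
Table test1Table = tableEnv.sqlQuery("select * from test1");

// Convert the continuously updating table into a retract stream and print it.
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(test1Table, Row.class);
retractStream.print();

// A DataStream sink requires an explicit execute() call.
env.execute("flink cdc table to stream");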