Flink CDC 数据处理


简介

官方地址

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

Flink CDC Connectors 是 Apache Flink 的一组源连接器(Source Connector),用于捕获不同数据库中的变更数据(CDC,Change Data Capture)。Flink CDC Connectors 底层集成了 Debezium 作为变更数据捕获引擎。

Debezium 项目地址: https://github.com/debezium/debezium


  • Supported Connectors

| Database | Version |
| --- | --- |
| MySQL | Database: 5.7, 8.0.x; JDBC Driver: 8.0.16 |
| PostgreSQL | Database: 9.6, 10, 11, 12; JDBC Driver: 42.2.12 |
| MongoDB | Database: 3.6, 4.x, 5.0; MongoDB Driver: 4.3.1 |
| Oracle | Database: 11, 12, 19; Oracle Driver: 19.3.0.0 |

  • Supported Flink Versions

| Flink CDC Connector Version | Flink Version |
| --- | --- |
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
| 2.0.* | 1.13.* |
| 2.1.* | 1.13.* |

DataStream API

完整代码示例:

https://gitee.com/sxuboss_admin/FlinkDemoAll.git

dependency

     <!-- Flink CDC 1.x MySQL connector. The com.alibaba.ververica groupId matches the
          com.alibaba.ververica.cdc.* packages imported by the Java examples; version 1.3.0
          is listed in the compatibility table as targeting Flink 1.12.*. -->
     <dependency>
         <groupId>com.alibaba.ververica</groupId>
         <artifactId>flink-connector-mysql-cdc</artifactId>
         <version>1.3.0</version>
     </dependency>

code

package com.bbx.flink.dataStream_api_all;  import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  public class FlinkCDCDemo {      public static void main(String [] args) throws Exception {          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()                 .hostname("192.168.10.7")                 .port(3306)                 .username("root")                 .password("000000")                 .databaseList("maxwell_test1", "maxwell_test2")  //DB                 .tableList("maxwell_test1.test1", "maxwell_test1.test2", "maxwell_test2.test3")  //表                 .deserializer(new StringDebeziumDeserializationSchema())   //反序列化-此处可以自定义反序列化--详见下面自定义反序列化,利用 StringDebeziumDeserializationSchema 可以查看能获取到的信息,然后在进行自定义获取                 .startupOptions(StartupOptions.initial())                 .build();          env.addSource(sourceFunction)                 .setParallelism(1)                 .print();         env.execute("print CDC");     } }

运行前需先创建数据库 maxwell_test1、maxwell_test2,并创建表 test1、test2(位于 maxwell_test1)和 test3(位于 maxwell_test2),同时开启 MySQL binlog(Debezium 要求 binlog_format=ROW),相关配置可参考 Maxwell 的说明。

自定义反序列化

package com.bbx.flink.dataStream_api_all.CDC;  import cn.hutool.json.JSONObject; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord;  import java.util.List;  /**  * 自定义   debezium    反序列化  */ public class CustomerDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {     private static final long serialVersionUID = 9042745421910624673L;       @Override     public TypeInformation<String> getProducedType() {         return BasicTypeInfo.STRING_TYPE_INFO;     }      /**      * {      * "db":"",      * "table":"",      * "before":{"id:"","name":""...},      * "after":{"id:"","name":""...},      * "op":""      * }      *      * @param sourceRecord      * @param collector      * @throws Exception      */     @Override     public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {         JSONObject jsonObject = new JSONObject();         //获取 db 和table         String topic = sourceRecord.topic();         String[] split = topic.split("\\.");         //获取before,after         Struct value = (Struct) sourceRecord.value();         Object before = getStructInfo(value, "before");         Object after = getStructInfo(value, "after");          //获取操作类型         Envelope.Operation operation = Envelope.operationFor(sourceRecord);          jsonObject.set("db", split[0])                 .set("table", split[1])                 .set("op", operation)                 .set("before", before)                 .set("after", after);         collector.collect(jsonObject.toString());     }      public Object getStructInfo(Struct 
struct,String name){         Struct tmpStruct = struct.getStruct(name);         JSONObject jsonObject = new JSONObject();         if (tmpStruct != null) {             Schema schema = tmpStruct.schema();             List<Field> fields = schema.fields();             fields.forEach(i -> {                 jsonObject.set(i.name(), tmpStruct.get(i));             });         }         return jsonObject;     } }


Table API & SQL

code

package com.bbx.flink.dataStream_api_all.CDC;  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;  public class FlinkCDCTableDemo {      public static void main(String [] args) throws Exception {          //执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);          //获取table Env         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);          String test1Sql="        CREATE TABLE test1 (\n" +                 "                id INT,\n" +                 "                name STRING\n" +                 "                ) WITH (\n" +                 "                'connector' = 'mysql-cdc',\n" +                 "                'hostname' = '192.168.10.7',\n" +                 "                'port' = '3306',\n" +                 "                'username' = 'root',\n" +                 "                'password' = '123456',\n" +                 "                'database-name' = 'maxwell_test1',\n" +                 "                'table-name' = 'test1')";         tableEnv.executeSql(test1Sql);          String querySQl = "select * from test1";         tableEnv.executeSql(querySQl).print();     } }

版权声明:

作者: freeclashnode

链接: https://www.freeclashnode.com/news/article-1652.htm

来源: FreeClashNode

文章版权归作者所有,未经允许请勿转载。

免费节点实时更新

热门文章

最新文章

归档