国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

實時數倉 | 三分鐘搞定Flink Cdc

大數據
Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。

簡介

Flink CDC Connector 是ApacheFlink的一組數據源連接器,使用變化數據捕獲change data capture (CDC)從不同的數據庫中提取變更數據。Flink CDC連接器將Debezium集成為引擎來捕獲數據變更。因此,它可以充分利用Debezium的功能。

特點

  • 支持讀取數據庫快照,并且能夠持續讀取數據庫的變更日志,即使發生故障,也支持exactly-once 的處理語義
  • 對于DataStream API的CDC connector,用戶無需部署Debezium和Kafka,即可在單個作業中使用多個數據庫和表上的變更數據。
  • 對于Table/SQL API 的CDC connector,用戶可以使用SQL DDL創建CDC數據源,來監視單個表上的數據變更。

使用場景

  • 數據庫之間的增量數據同步
  • 審計日志
  • 數據庫之上的實時物化視圖
  • 基于CDC的維表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具體如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用過程中的注意點

使用MySQL CDC的注意點

如果要使用MySQL CDC connector,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-connector-mysql-cdc</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

使用canal-json的注意點

如果要使用Kafka的canal-json,對于程序而言,需要添加如下依賴:

  1. <!-- universal --> 
  2. <dependency> 
  3.     <groupId>org.apache.flink</groupId> 
  4.     <artifactId>flink-connector-kafka_2.11</artifactId> 
  5.     <version>1.11.0</version> 
  6. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。由于Flink1.11的安裝包 的lib目錄下并沒有提供該jar包,所以必須要手動添加依賴包,否則會報如下錯誤:

  1. [ERROR] Could not execute SQL statement. Reason: 
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 
  3.  
  4. Available factory identifiers are: 
  5.  
  6. datagen 
  7. mysql-cdc 

使用changelog-json的注意點

如果要使用Kafka的changelog-json Format,對于程序而言,需要添加如下依賴:

  1. <dependency> 
  2.   <groupId>com.alibaba.ververica</groupId> 
  3.   <artifactId>flink-format-changelog-json</artifactId> 
  4.   <version>1.0.0</version> 
  5. </dependency> 

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,將該jar包放在Flink安裝目錄的lib文件夾下即可。

mysql-cdc的操作實踐

創建MySQL數據源表

在創建MySQL CDC表之前,需要先創建MySQL的數據表,如下:

  1. -- MySQL 
  2. /*Table structure for table `order_info` */ 
  3. DROP TABLE IF EXISTS `order_info`; 
  4. CREATE TABLE `order_info` ( 
  5.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  6.   `consignee` varchar(100) DEFAULT NULL COMMENT '收貨人'
  7.   `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人電話'
  8.   `total_amount` decimal(10,2) DEFAULT NULL COMMENT '總金額'
  9.   `order_status` varchar(20) DEFAULT NULL COMMENT '訂單狀態,1表示下單,2表示支付'
  10.   `user_id` bigint(20) DEFAULT NULL COMMENT '用戶id'
  11.   `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式'
  12.   `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送貨地址'
  13.   `order_comment` varchar(200) DEFAULT NULL COMMENT '訂單備注'
  14.   `out_trade_no` varchar(50) DEFAULT NULL COMMENT '訂單交易編號(第三方支付用)'
  15.   `trade_body` varchar(200) DEFAULT NULL COMMENT '訂單描述(第三方支付用)'
  16.   `create_time` datetime DEFAULT NULL COMMENT '創建時間'
  17.   `operate_time` datetime DEFAULT NULL COMMENT '操作時間'
  18.   `expire_time` datetime DEFAULT NULL COMMENT '失效時間'
  19.   `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流單編號'
  20.   `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父訂單編號'
  21.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片路徑'
  22.   `province_id` int(20) DEFAULT NULL COMMENT '地區'
  23.   PRIMARY KEY (`id`) 
  24. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單表'
  25. -- ---------------------------- 
  26. -- Records of order_info 
  27. -- ---------------------------- 
  28. INSERT INTO `order_info`  
  29. VALUES (476, 'lAXjcL''13408115089', 433.00, '2', 10, '2''OYyAdSdLxedceqovndCD''ihjAYsSjrgJMQVdFQnSy''8728720206''''2020-06-18 02:21:38'NULLNULLNULLNULLNULL, 9); 
  30. INSERT INTO `order_info` 
  31. VALUES (477, 'QLiFDb''13415139984', 772.00, '1', 90, '2''OizYrQbKuWvrvdfpkeSZ''wiBhhqhMndCCgXwmWVQq''1679381473''''2020-06-18 09:12:25'NULLNULLNULLNULLNULL, 3); 
  32. INSERT INTO `order_info` 
  33. VALUES (478, 'iwKjQD''13320383859', 88.00, '1', 107, '1''cbXLKtNHWOcWzJVBWdAs''njjsnknHxsxhuCCeNDDi''0937074290''''2020-06-18 15:56:34'NULLNULLNULLNULLNULL, 7); 
  34.  
  35. /*Table structure for table `order_detail` */ 
  36. CREATE TABLE `order_detail` ( 
  37.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號'
  38.   `order_id` bigint(20) DEFAULT NULL COMMENT '訂單編號'
  39.   `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id'
  40.   `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名稱(冗余)'
  41.   `img_url` varchar(200) DEFAULT NULL COMMENT '圖片名稱(冗余)'
  42.   `order_price` decimal(10,2) DEFAULT NULL COMMENT '購買價格(下單時sku價格)'
  43.   `sku_num` varchar(200) DEFAULT NULL COMMENT '購買個數'
  44.   `create_time` datetime DEFAULT NULL COMMENT '創建時間'
  45.   PRIMARY KEY (`id`) 
  46. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='訂單明細表'
  47.  
  48. -- ---------------------------- 
  49. -- Records of order_detail 
  50. -- ---------------------------- 
  51. INSERT INTO `order_detail`  
  52. VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3''2020-06-18 02:21:38'); 
  53. INSERT INTO `order_detail`  
  54. VALUES (1330, 477, 9, '榮耀10 GT游戲加速 AIS手持夜景 6GB+64GB 幻影藍全網通 移動聯通電信''http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4''2020-06-18 09:12:25'); 
  55. INSERT INTO `order_detail` 
  56. VALUES (1331, 478, 4, '小米Play 流光漸變AI雙攝 4GB+64GB 夢幻藍 全網通4G 雙卡雙待 小水滴全面屏拍照游戲智能手機''http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1''2020-06-18 15:56:34'); 
  57. INSERT INTO `order_detail`  
  58. VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3''2020-06-18 15:56:34'); 
  59. INSERT INTO `order_detail`  
  60. VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移動聯通電信4G手機 雙卡雙待''http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1''2020-06-18 15:56:34'); 

Flink SQL Cli創建CDC數據源

啟動 Flink 集群,再啟動 SQL CLI,執行下面命令:

  1. -- 創建訂單信息表 
  2. CREATE TABLE order_info( 
  3.     id BIGINT
  4.     user_id BIGINT
  5.     create_time TIMESTAMP(0), 
  6.     operate_time TIMESTAMP(0), 
  7.     province_id INT
  8.     order_status STRING, 
  9.     total_amount DECIMAL(10, 5) 
  10.   ) WITH ( 
  11.     'connector' = 'mysql-cdc'
  12.     'hostname' = 'kms-1'
  13.     'port' = '3306'
  14.     'username' = 'root'
  15.     'password' = '123qwe'
  16.     'database-name' = 'mydw'
  17.     'table-name' = 'order_info' 
  18. ); 

在Flink SQL Cli中查詢該表的數據:result-mode: tableau,+表示數據的insert。

在SQL CLI中創建訂單詳情表:

  1. CREATE TABLE order_detail( 
  2.     id BIGINT
  3.     order_id BIGINT
  4.     sku_id BIGINT
  5.     sku_name STRING, 
  6.     sku_num BIGINT
  7.     order_price DECIMAL(10, 5), 
  8.  create_time TIMESTAMP(0) 
  9.  ) WITH ( 
  10.     'connector' = 'mysql-cdc'
  11.     'hostname' = 'kms-1'
  12.     'port' = '3306'
  13.     'username' = 'root'
  14.     'password' = '123qwe'
  15.     'database-name' = 'mydw'
  16.     'table-name' = 'order_detail' 
  17. ); 

查詢結果如下:

執行JOIN操作:

  1. SELECT 
  2.     od.id, 
  3.     oi.id order_id, 
  4.     oi.user_id, 
  5.     oi.province_id, 
  6.     od.sku_id, 
  7.     od.sku_name, 
  8.     od.sku_num, 
  9.     od.order_price, 
  10.     oi.create_time, 
  11.     oi.operate_time 
  12. FROM 
  13.    ( 
  14.     SELECT *  
  15.     FROM order_info 
  16.     WHERE  
  17.       order_status = '2'-- 已支付 
  18.    ) oi 
  19.    JOIN 
  20.   ( 
  21.     SELECT * 
  22.     FROM order_detail 
  23.   ) od  
  24.   ON oi.id = od.order_id; 

canal-json的操作實踐

關于cannal的使用方式,可以參考我的另一篇文章:基于Canal與Flink實現數據實時增量同步(一)。我已經將下面的表通過canal同步到了kafka,具體格式為:

  1.     "data":[ 
  2.         { 
  3.             "id":"1"
  4.             "region_name":"華北" 
  5.         }, 
  6.         { 
  7.             "id":"2"
  8.             "region_name":"華東" 
  9.         }, 
  10.         { 
  11.             "id":"3"
  12.             "region_name":"東北" 
  13.         }, 
  14.         { 
  15.             "id":"4"
  16.             "region_name":"華中" 
  17.         }, 
  18.         { 
  19.             "id":"5"
  20.             "region_name":"華南" 
  21.         }, 
  22.         { 
  23.             "id":"6"
  24.             "region_name":"西南" 
  25.         }, 
  26.         { 
  27.             "id":"7"
  28.             "region_name":"西北" 
  29.         } 
  30.     ], 
  31.     "database":"mydw"
  32.     "es":1597128441000, 
  33.     "id":102, 
  34.     "isDdl":false
  35.     "mysqlType":{ 
  36.         "id":"varchar(20)"
  37.         "region_name":"varchar(20)" 
  38.     }, 
  39.     "old":null
  40.     "pkNames":null
  41.     "sql":""
  42.     "sqlType":{ 
  43.         "id":12, 
  44.         "region_name":12 
  45.     }, 
  46.     "table":"base_region"
  47.     "ts":1597128441424, 
  48.     "type":"INSERT" 

在SQL CLI中創建該canal-json格式的表:

  1. CREATE TABLE region ( 
  2.   id BIGINT
  3.   region_name STRING 
  4. WITH ( 
  5.  'connector' = 'kafka'
  6.  'topic' = 'mydw.base_region'
  7.  'properties.bootstrap.servers' = 'kms-3:9092'
  8.  'properties.group.id' = 'testGroup'
  9.  'format' = 'canal-json' , 
  10.  'scan.startup.mode' = 'earliest-offset'  
  11. ); 

查詢結果如下:

changelog-json的操作實踐

創建MySQL數據源

參見上面的order_info

Flink SQL Cli創建changelog-json表

  1. CREATE TABLE order_gmv2kafka ( 
  2.   day_str STRING, 
  3.   gmv DECIMAL(10, 5) 
  4. WITH ( 
  5.     'connector' = 'kafka'
  6.     'topic' = 'order_gmv_kafka'
  7.     'scan.startup.mode' = 'earliest-offset'
  8.     'properties.bootstrap.servers' = 'kms-3:9092'
  9.     'format' = 'changelog-json' 
  10. ); 
  11.  
  12. INSERT INTO order_gmv2kafka 
  13. SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd'as day_str, SUM(total_amount) as gmv 
  14. FROM order_info 
  15. WHERE order_status = '2' -- 訂單已支付 
  16. GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');  

查詢表看一下結果:

再查一下kafka的數據:

  1. {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"

當將另外兩個訂單的狀態order_status更新為2時,總金額=443+772+88=1293再觀察數據:

再看kafka中的數據:

 

責任編輯:武曉燕 來源: 大數據技術與數倉
相關推薦

2009-11-05 16:04:19

Oracle用戶表

2020-11-20 08:36:59

Jpa數據代碼

2024-05-16 11:13:16

Helm工具release

2024-12-18 10:24:59

代理技術JDK動態代理

2009-11-09 12:55:43

WCF事務

2022-02-16 19:42:25

Spring配置開發

2009-11-12 09:16:15

ADO.NET數據庫連

2021-04-20 13:59:37

云計算

2024-08-30 08:50:00

2023-12-27 08:15:47

Java虛擬線程

2022-02-17 09:24:11

TypeScript編程語言javaScrip

2024-01-16 07:46:14

FutureTask接口用法

2013-06-28 14:30:26

棱鏡計劃棱鏡棱鏡監控項目

2025-10-27 01:35:00

2020-06-30 10:45:28

Web開發工具

2021-12-17 07:47:37

IT風險框架

2025-02-24 10:40:55

2020-06-29 07:42:20

邊緣計算云計算技術

2024-07-05 09:31:37

2024-10-15 09:18:30

點贊
收藏

51CTO技術棧公眾號

在线国产日本| 99热精品久久| 精品视频在线视频| 天天影视色香欲综合| 91在线云播放| 国内少妇毛片视频| 日韩电影在线观看网站| 久久99蜜桃综合影院免费观看| 国产一区二区三区四区五区| 国模精品视频一区二区| 久久精品亚洲一区二区三区浴池| 欧美一级久久久久久久大片| 日韩伦理一区二区| 亚洲国产成人精品女人久久久| 久久久久久久激情视频| av在线app| 欧美猛男gaygay网站| 92国产在线视频| 欧美午夜寂寞影院| 香蕉视频在线免费看| 日本韩国欧美国产| 亚洲麻豆精品| 欧美一级生活片| 蜜乳av一区| 亚洲欧美日韩直播| 国产精品66| 欧美高跟鞋交xxxxhd| 国产精品三p一区二区| 欧美亚洲午夜视频在线观看| 日本一区二区三区电影免费观看| 欧美乱大交xxxxx另类电影| 警花av一区二区三区| 97国产真实伦对白精彩视频8| 盗摄牛牛av影视一区二区| 日本在线精品视频| 亚洲91精品| 日韩中文字幕一区| 国产传媒久久文化传媒| 黑鬼大战白妞高潮喷白浆| 亚洲免费观看视频| 国产黄在线看| 欧美精品一区二| 国产成人精品亚洲日本在线观看| 久精品免费视频| 欧美日韩激情| 欧美色欧美亚洲另类七区| 精品一区二区三区在线播放 | 久久久久久一级片| 日本视频一二区| 欧美伊人久久大香线蕉综合69| 人人澡人人添人人爽一区二区| 女同性一区二区三区人了人一| 欧美二区在线看| 播五月开心婷婷综合| 男男视频在线观看网站| 在线观看网站黄不卡| 一本大道色婷婷在线| 91精品国产九九九久久久亚洲| 亚洲国产精品成人| 六月婷婷激情网| 亚洲国产成人av网| 美女一区网站| 国产精品电影一区| 黑人精品欧美一区二区蜜桃| 国产乱xxⅹxx国语对白| 欧美一级久久久久久久大片| 国产精品流白浆在线观看| 韩国成人动漫在线观看| 成人丝袜高跟foot| 国产主播福利在线| 麻豆乱码国产一区二区三区| 国产精品vip| 激情五月开心婷婷| 欧美日韩1234| 女同久久另类99精品国产| 欧美一二三四五区| 亚洲人成网站色在线观看| 亚洲欧美日本免费| 91一区二区在线| 日本一区二区三区四区视频| 懂色av一区二区夜夜嗨| 国产蜜臀在线| 久久久免费观看| 日韩av成人高清| 色噜噜狠狠永久免费| 亚洲欧洲在线免费| 免费高清成人在线| 五月伊人六月| 亚洲国产精品国自产拍av秋霞| 欧美高清另类hdvideosexjaⅴ | 中文字幕欧美视频在线| 国产欧美精品在线观看| 亚洲一二三四| 成人免费观看网站| 精品国产1区二区| 蜜臀91精品一区二区三区| 国产精品4hu.www| 成全视频全集| 中文字幕日韩在线播放| 寂寞少妇一区二区三区| videos性欧美另类高清| 亚洲一区精品视频| 永久555www成人免费| 中文字幕国产一区二区| 国产专区一区| 欧美视频第一| 91黑丝在线| 日韩中文字幕亚洲精品欧美| 精品国产_亚洲人成在线| 亚洲精品成人久久久| 色激情天天射综合网| 日本不卡视频在线| 精品视频在线一区二区在线| 亚洲视频第二页| 国产伦视频一区二区三区| 3d动漫啪啪精品一区二区免费 | 又黄又爽毛片免费观看| 欧美重口另类videos人妖| 欧美性猛交丰臀xxxxx网站| 久久狠狠一本精品综合网| 欧美疯狂party性派对| 黄网站app在线观看| 麻豆影视国产在线观看| 欧美.www| 最近最新中文字幕在线| 97在线视频免费播放| 99久久婷婷国产精品综合| 丝袜美腿一区| 91麻豆天美传媒在线| 精品久久久久久久久久久久包黑料| 欧美不卡一区| 欧美女v视频| 成人h猎奇视频网站| 精品久久久久久久久中文字幕| 禁果av一区二区三区| 激情六月婷婷| 国产乱人伦真实精品视频| 精品国产鲁一鲁一区二区张丽| 国产欧美日韩视频在线| 五月亚洲综合| 国产精品88a∨| 欧美特级www| 99伊人成综合| 精品精品导航| 亚洲国产一区二区精品视频 | 欧美性生活影院| 亚洲激情影院| 中文国产字幕在线观看| 亚洲一区二区免费视频软件合集| 亚洲精品美女免费| 国产成人免费在线视频| 成人精品国产亚洲| 成人亚洲精品777777大片| 欧洲一区二区视频| 色综合久久综合中文综合网| 久久久天天操| 日本少妇一区| 五月综合网站| 成人激情视频免费在线| 91麻豆精品国产| 岛国精品一区二区| 一区三区在线欧| 欧美三级理伦电影| 国产男女免费视频| 国产成人一区二区三区小说| 欧美色中文字幕| 国产一区二区三区四区在线观看| 国产精品一区免费在线| 操操操综合网| 色姑娘综合av| 九九精品视频在线观看| 色老综合老女人久久久| 老司机一区二区| 老汉色老汉首页av亚洲| 求av网址在线观看| 国产91在线免费| 91影视免费在线观看| 亚洲美女视频网站| 亚洲欧美日韩中文播放 | 成人av影院在线| 禁断一区二区三区在线| av在线免费网站| 久久久噜噜噜www成人网| 国产一区二区色| 日韩av在线一区| 亚洲一区在线免费观看| 国产综合久久久久影院| 日本成人小视频| 色婷婷综合久久久中字幕精品久久 | 日韩av高清| 欧美男插女视频| 欧美日韩一区三区| eeuss影院一区二区三区| 亚洲人metart人体| 日韩久久99| 18+激情视频在线| 激情视频国产| 最新中文字幕久久| 91大片在线观看| 欧美日韩国产成人高清视频|