国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Flume-接入Hive數倉搭建流程

大數據
實時流接入數倉,基本在大公司都會有,在Flume1.8以后支持taildir source, 其有許多特點,而被廣泛使用。本文以開源Flume流為例,介紹流接入HDFS ,后面在其上面建立ods層外表。

實時流接入數倉,基本在大公司都會有,在Flume1.8以后支持taildir source, 其有以下幾個特點,而被廣泛使用:

  1. 使用正則表達式匹配目錄中的文件名
  2. 監控的文件中,一旦有數據寫入,Flume就會將信息寫入到指定的Sink
  3. 高可靠,不會丟失數據
  4. 不會對跟蹤文件有任何處理,不會重命名也不會刪除
  5. 不支持Windows,不能讀二進制文件。支持按行讀取文本文件

本文以開源Flume流為例,介紹流接入HDFS ,后面在其上面建立ods層外表。

1.1 taildir source配置

  1. a1.sources.r1.type = TAILDIR 
  2. a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json 
  3. a1.sources.r1.filegroups = f1 
  4. a1.sources.r1.filegroups.f1 =/opt/hoult/servers/logs/start/.*log 

1.2 hdfs sink 配置

  1. a1.sinks.k1.type = hdfs 
  2. a1.sinks.k1.hdfs.path = /user/data/logs/start/logs/start/%Y-%m-%d/ 
  3. a1.sinks.k1.hdfs.filePrefix = startlog. 
  4. # 配置文件滾動方式(文件大小32M) 
  5. a1.sinks.k1.hdfs.rollSize = 33554432 
  6. a1.sinks.k1.hdfs.rollCount = 0 
  7. a1.sinks.k1.hdfs.rollInterval = 0 
  8. a1.sinks.k1.hdfs.idleTimeout = 0 
  9. a1.sinks.k1.hdfs.minBlockReplicas = 1 
  10. # 向hdfs上刷新的event的個數 
  11. a1.sinks.k1.hdfs.batchSize = 100 
  12. # 使用本地時間 
  13. a1.sinks.k1.hdfs.useLocalTimeStamp = true  

1.3 Agent的配置

 

  1. a1.sources = r1 
  2. a1.sinks = k1 
  3. a1.channels = c1 
  4. # taildir source 
  5. a1.sources.r1.type = TAILDIR 
  6. a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json 
  7. a1.sources.r1.filegroups = f1 
  8. a1.sources.r1.filegroups.f1 = /user/data/logs/start/.*log 
  9. # memorychannel 
  10. a1.channels.c1.type = memory 
  11. a1.channels.c1.capacity = 100000 
  12. a1.channels.c1.transactionCapacity = 2000 
  13. # hdfs sink 
  14. a1.sinks.k1.type = hdfs 
  15. a1.sinks.k1.hdfs.path = /opt/hoult/servers/logs/start/%Y-%m-%d/ 
  16. a1.sinks.k1.hdfs.filePrefix = startlog. 
  17. # 配置文件滾動方式(文件大小32M) 
  18. a1.sinks.k1.hdfs.rollSize = 33554432 
  19. a1.sinks.k1.hdfs.rollCount = 0 
  20. a1.sinks.k1.hdfs.rollInterval = 0 
  21. a1.sinks.k1.hdfs.idleTimeout = 0 
  22. a1.sinks.k1.hdfs.minBlockReplicas = 1 
  23. # 向hdfs上刷新的event的個數 
  24. a1.sinks.k1.hdfs.batchSize = 1000 
  25. # 使用本地時間 
  26. a1.sinks.k1.hdfs.useLocalTimeStamp = true 
  27. # Bind the source and sink to the channel 
  28. a1.sources.r1.channels = c1 
  29. a1.sinks.k1.channel = c1  

/opt/hoult/servers/conf/flume-log2hdfs.conf

1.4 啟動

 

  1. flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console 
  2.  
  3. export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote" 
  4. # 要想使配置文件生效,還要在命令行中指定配置文件目錄 
  5. flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf -name a1 -Dflume.roog.logger=INFO,console 

要$FLUME_HOME/conf/flume-env.sh加下面的參數,否則會報錯誤如下:

1.5 使用自定義攔截器解決Flume Agent替換本地時間為日志里面的時間戳

使用netcat source → logger sink來測試

 

  1. # a1是agent的名稱。source、channel、sink的名稱分別為:r1 c1 k1 
  2. a1.sources = r1 
  3. a1.channels = c1 
  4. a1.sinks = k1 
  5. # source 
  6. a1.sources.r1.type = netcat 
  7. a1.sources.r1.bind = linux121 
  8. a1.sources.r1.port = 9999 
  9. a1.sources.r1.interceptors = i1 
  10. a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder 
  11. # channel 
  12. a1.channels.c1.type = memory 
  13. a1.channels.c1.capacity = 10000 
  14. a1.channels.c1.transactionCapacity = 100 
  15. # sink 
  16. a1.sinks.k1.type = logger 
  17. # source、channel、sink之間的關系 
  18. a1.sources.r1.channels = c1 
  19. a1.sinks.k1.channel = c1  

攔截器主要代碼如下:

 

  1. public class CustomerInterceptor implements Interceptor { 
  2.     private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); 
  3.  
  4.     @Override 
  5.     public void initialize() { 
  6.  
  7.     } 
  8.  
  9.     @Override 
  10.     public Event intercept(Event event) { 
  11.         // 獲得body的內容 
  12.         String eventBody = new String(event.getBody(), Charsets.UTF_8); 
  13.         // 獲取header的內容 
  14.         Map<String, String> headerMap = event.getHeaders(); 
  15.         final String[] bodyArr = eventBody.split("\\s+"); 
  16.         try { 
  17.             String jsonStr = bodyArr[6]; 
  18.             if (Strings.isNullOrEmpty(jsonStr)) { 
  19.                 return null
  20.             } 
  21.             // 將 string 轉成 json 對象 
  22.             JSONObject jsonObject = JSON.parseObject(jsonStr); 
  23.             String timestampStr = jsonObject.getString("time"); 
  24.             //將timestamp 轉為時間日期類型(格式 :yyyyMMdd) 
  25.             long timeStamp = Long.valueOf(timestampStr); 
  26.             String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault())); 
  27.             headerMap.put("logtime"date); 
  28.             event.setHeaders(headerMap); 
  29.         } catch (Exception e) { 
  30.             headerMap.put("logtime""unknown"); 
  31.             event.setHeaders(headerMap); 
  32.         } 
  33.         return event; 
  34.  
  35.     } 
  36.  
  37.     @Override 
  38.     public List<Event> intercept(List<Event> events) { 
  39.         List<Event> out = new ArrayList<>(); 
  40.         for (Event event : events) { 
  41.             Event outEvent = intercept(event); 
  42.             if (outEvent != null) { 
  43.                 out.add(outEvent); 
  44.             } 
  45.         } 
  46.         return out
  47.     } 
  48.  
  49.     @Override 
  50.     public void close() { 
  51.  
  52.     } 
  53.  
  54.     public static class Builder implements Interceptor.Builder { 
  55.         @Override 
  56.         public Interceptor build() { 
  57.             return new CustomerInterceptor(); 
  58.         } 
  59.  
  60.         @Override 
  61.         public void configure(Context context) { 
  62.         } 
  63.     } 

啟動

 

  1. flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf -name a1 -Dflume.roog.logger=INFO,console 
  2. ## 測試 
  3. telnet linux121 9999  

吳邪,小三爺,混跡于后臺,大數據,人工智能領域的小菜鳥。

責任編輯:未麗燕 來源: segmentfault.com
相關推薦

2025-06-11 02:45:00

2023-08-07 01:25:39

2022-01-02 23:02:16

數據中臺選型

2025-09-08 09:39:25

2021-01-31 23:54:23

數倉模型

2022-08-22 17:46:56

虛擬數倉Impala

2021-01-04 05:42:48

數倉模型設計

2022-07-26 15:38:58

數據倉數據治理數據團隊

2023-01-03 17:43:39

網易郵箱數倉

2022-11-25 10:07:12

數倉數據流開發

2021-08-02 17:24:37

數字化

2022-03-01 17:16:16

數倉建模ID Mapping

2021-12-02 08:41:30

數倉建模設計

2023-11-23 16:53:56

數據倉庫大數據

2022-02-18 09:02:04

數據倉庫治理

2022-01-13 10:45:48

數倉對象主題域

2025-05-27 00:15:07

2022-08-01 15:58:48

數據倉庫架構數據

2022-12-08 10:16:58

數據模型

2021-08-11 07:53:22

數倉維度建模
點贊
收藏

51CTO技術棧公眾號

亚洲第一会所001| 日韩一区精品字幕| 大荫蒂欧美视频另类xxxx| 国产精品88久久久久久妇女| 日韩高清电影免费| 日韩精品在线免费| 国产在线视频福利| 久久精品亚洲精品国产欧美 | 五月天激情综合| 日韩国产欧美精品| www.丝袜精品| 亚洲欧美日韩视频一区| 巨大荫蒂视频欧美大片| 亚洲午夜精品网| 黄色国产小视频| 国产精品77777| 日本一区不卡| 伊人久久综合| 国产精品日韩欧美综合| 国产精品zjzjzj在线观看| 国产一区二区三区网站| av在线播放观看| 欧美在线影院一区二区| 成人18免费| 国产成人午夜精品影院观看视频| 国产欧美一区二区视频| 日韩欧美在线中字| 欧美精品手机在线| 成人看片在线观看| 日韩精品在线观看一区| freexxx性亚洲精品| 717成人午夜免费福利电影| 亚洲嫩模一区| 亚洲一区二区不卡免费| 久草在线在线视频| 一区av在线播放| 国产九色在线| 亚洲精品97久久| 99久久久国产| 91精品久久久久久久久不口人| 久久久久美女| 亚洲精品成人自拍| www国产精品av| 免费看美女隐私的视频| 欧美午夜精品伦理| 日韩伦理av| www.久久久久久.com| 欧美禁忌电影| 精品九九九九| 91最新地址在线播放| 超碰在线首页| 日韩女同互慰一区二区| 精品午夜视频| 99精品国产一区二区| 三级欧美在线一区| 欧美三级午夜理伦三级富婆| 色综合久久中文字幕| 中国字幕a在线看韩国电影| 国内外成人免费激情在线视频| 欧美疯狂party性派对| 国产日韩在线一区二区三区| 成人性生交大片免费看中文网站| xxxx影院| 91麻豆精品国产91久久久久久| 美女100%一区| 久久久欧美一区二区| 亚洲国产精品成人| 性欧美精品一区二区三区在线播放 | 精品乱码一区二区三区| 男女男精品视频| 人体内射精一区二区三区| 欧美高清在线一区| 在线观看高清av| 精品日韩在线观看| 亚洲伦理久久| 国产在线拍偷自揄拍精品| 巨乳诱惑日韩免费av| 男人添女人下面高潮视频| 亚洲精选免费视频| 一二三四区在线观看| 在线观看欧美日韩国产| 国产a久久精品一区二区三区| 91文字幕巨乱亚洲香蕉| 久草中文综合在线| 成人漫画网站免费| 欧美精品tushy高清| 日本亚洲欧洲无免费码在线| 国产精品高潮呻吟久久av野狼 | 日韩亚洲欧美高清| 亚洲男人在线| 国产精品yjizz| 成人黄色在线视频| 中文字幕在线视频不卡| 亚洲欧美成人网| 成人精品亚洲| 国内自拍中文字幕| 婷婷国产v国产偷v亚洲高清| 国产乱码午夜在线视频 | 毛片在线播放网址| 亚洲天堂精品在线| 久久综合成人| 九九热只有这里有精品| 欧美影院精品一区| 少妇精品视频一区二区免费看| 欧美亚洲激情在线| 麻豆专区一区二区三区四区五区| 邪恶网站在线观看| 日韩精品一区二区三区三区免费 | 久久综合九色综合久99| 成人高清伦理免费影院在线观看| 精东影业在线观看| 在线亚洲国产精品网| 国内成人在线| 超碰超碰97| 中文字幕亚洲欧美日韩2019| 精品国产美女| 日韩精品视频一区二区在线观看| 欧美色精品在线视频| 98视频精品全部国产| 中国人体摄影一区二区三区| 欧美日韩国内自拍| 哺乳一区二区三区中文视频 | 亚洲成人一区在线| 精品国产欧美日韩一区二区三区| 97免费资源站| 亚洲免费三区一区二区| 日韩免费在线电影| 久久99精品久久久久子伦| 一区二区三区久久久| 国产区一区二| 三级在线免费观看| 欧美一区二区三区影视| 亚洲一区二区三区| 婷婷丁香六月天| 亚洲3p在线观看| 99久久精品99国产精品| 国产亚洲成av人片在线观看 | 免费在线观看一区二区| 欧美性猛交xxxx乱大交3| 久久免费视频66| 成人免费观看cn| 亚洲国产精品va在线看黑人| 日韩成人精品一区二区| 在线看的黄色网址| 久久精品国产亚洲精品| 国产精品77777竹菊影视小说| 女女色综合影院| 国产成人亚洲欧美| 在线亚洲免费视频| 888久久久| 夜色福利资源站www国产在线视频 夜色资源站国产www在线视频 | 99久热re在线精品996热视频| 一区二区三区视频在线看| 国产精品一线| 一路向西2在线观看| 久久免费视频网| 亚洲欧美欧美一区二区三区| 亚洲丝袜啪啪| 视频免费观看| 成人免费淫片aa视频免费| 欧美日韩国产在线看| 激情综合视频| 国产又色又爽又黄刺激在线视频| 免费在线国产精品| 亚洲精品一区二区三区蜜桃下载| 天堂成人免费av电影一区| 成人性生交大片免费看网站| 一区二区视频在线播放| 日韩电影免费观看中文字幕| 国产精品99久久久久久宅男| 三上悠亚激情av一区二区三区| 超级碰在线观看| 性欧美视频videos6一9| 91福利在线观看| 懂色av噜噜一区二区三区av| 欧美日韩一二三四| 在线观看欧美日韩电影| 日本免费一二区| 伊人色综合影院| 欧美中文在线视频| 欧美精品一区二区三区视频| 最新国产の精品合集bt伙计| 婷婷综合伊人| 国产情侣一区二区三区| 黄色网免费看| 久久九九视频| 影音先锋欧美精品| 一区二区三区中文字幕电影| 亚洲久久视频| 日本精品久久| 在线成人一区| 伊人久久青草| 2024亚洲男人天堂| 91精品国产综合久久蜜臀| 久久久国产精品不卡| 欧美+日本+国产+在线a∨观看| 2022成人影院| 国产裸舞福利在线视频合集| 亚洲一区在线不卡| 相泽南亚洲一区二区在线播放|