58同城實(shí)時(shí)計(jì)算平臺(tái)架構(gòu)實(shí)踐
導(dǎo)語
本文主要介紹58同城實(shí)時(shí)計(jì)算平臺(tái)技術(shù)演進(jìn),以及基于Flink打造的一站式實(shí)時(shí)計(jì)算平臺(tái)Wstream,涵蓋很多實(shí)踐經(jīng)驗(yàn)、干貨和方法論,希望對您有所幫助。
背景
58同城作為覆蓋生活全領(lǐng)域的服務(wù)平臺(tái),業(yè)務(wù)覆蓋招聘、房產(chǎn)、汽車、金融、二手及本地服務(wù)等各個(gè)方面。 豐富的業(yè)務(wù)線和龐大的用戶數(shù)每天產(chǎn)生海量用戶數(shù)據(jù)需要實(shí)時(shí)化的計(jì)算分析,實(shí)時(shí)計(jì)算平臺(tái)定位于為集團(tuán)海量數(shù)據(jù)提供高效、穩(wěn)定、分布式實(shí)時(shí)計(jì)算的基礎(chǔ)服務(wù)。 本文主要介紹58同城基于Flink打造的一站式實(shí)時(shí)計(jì)算平臺(tái)Wstream。
實(shí)時(shí)計(jì)算場景
和很多互聯(lián)網(wǎng)公司一樣,實(shí)時(shí)計(jì)算在58擁有豐富的場景需求,主要包括以下幾類:
1.實(shí)時(shí)數(shù)據(jù)ETL
實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)進(jìn)行清洗、轉(zhuǎn)換、結(jié)構(gòu)化處理用于下游計(jì)算處理。
2.實(shí)時(shí)數(shù)倉
實(shí)時(shí)化數(shù)據(jù)計(jì)算,倉庫模型加工和存儲(chǔ)。 實(shí)時(shí)分析業(yè)務(wù)及用戶各類指標(biāo),讓運(yùn)營更加實(shí)時(shí)化。
3.實(shí)時(shí)監(jiān)控
對系統(tǒng)和用戶行為進(jìn)行實(shí)時(shí)檢測和分析,如業(yè)務(wù)指標(biāo)實(shí)時(shí)監(jiān)控,運(yùn)維線上穩(wěn)定性監(jiān)控,金融 風(fēng)控等。
4.實(shí)時(shí)分析
特征平臺(tái),用戶畫像,實(shí)時(shí)個(gè)性化推薦等。
平臺(tái)演進(jìn)

在實(shí)時(shí)計(jì)算平臺(tái)建設(shè)過程中,主要是跟進(jìn)開源社區(qū)發(fā)展以及實(shí)際業(yè)務(wù)需求,計(jì)算框架經(jīng)歷了Storm到 Spark Streaming到 Flink的發(fā)展,同時(shí)建設(shè)一站式實(shí)時(shí)計(jì)算平臺(tái),旨在提升用戶實(shí)時(shí)計(jì)算需求開發(fā)上線管理監(jiān)控效率,優(yōu)化平臺(tái)管理。
實(shí)時(shí)計(jì)算引擎前期基于Storm和Spark Streaming構(gòu)建,很多情況下并不能很好的滿足業(yè)務(wù)需求,如商業(yè)部門基于Spark Streaming構(gòu)建的特征平臺(tái)希望將計(jì)算延遲由分鐘級(jí)降低到秒級(jí),提升用戶體驗(yàn),運(yùn)維監(jiān)控平臺(tái)基于Storm分析公司全量nginx日志對線上業(yè)務(wù)進(jìn)行監(jiān)控,需要秒級(jí)甚至毫秒級(jí)別的延遲,Storm的吞吐能力成為瓶頸。 同時(shí)隨著實(shí)時(shí)需求不斷增加,場景更加豐富,在追求任務(wù)高吞吐低延遲的基礎(chǔ)上,對計(jì)算過程中間狀態(tài)管理,靈活窗口支持,以及exactly once語義保障的訴求越來越多。 Apache Flink開源之后,支持高吞吐低延遲的架構(gòu)設(shè)計(jì)以及高可用的穩(wěn)定性,同時(shí)擁有實(shí)時(shí)計(jì)算場景一系列特性以及支持實(shí)時(shí)Sql模型,使我們決定采用 Flink作為新一代實(shí)時(shí)計(jì)算平臺(tái)的計(jì)算引擎。
平臺(tái)規(guī)模

實(shí)時(shí)計(jì)算平臺(tái)當(dāng)前主要基于Storm/Spark Streaming/Flink,集群共計(jì)500多臺(tái)機(jī)器,每天處理數(shù)據(jù)量6000億+,其中Flink經(jīng)過近一年的建設(shè),任務(wù)占比已經(jīng)達(dá)到50% 。
Flink穩(wěn)定性
Flink作為實(shí)時(shí)計(jì)算集群,可用性要求遠(yuǎn)高于離線計(jì)算集群。 為保障集群可用性,平臺(tái)主要采用任務(wù)隔離以及高可用集群架構(gòu)保障穩(wěn)定性。
任務(wù)隔離
在應(yīng)用層面主要基于業(yè)務(wù)線以及場景進(jìn)行機(jī)器隔離,隊(duì)列資源分配管理,避免集群抖動(dòng)造成全局影響。

集群架構(gòu)
Flink集群采用了ON YARN模式獨(dú)立部署,為減少集群維護(hù)工作量,底層HDFS利用公司統(tǒng)一HDFS Federation架構(gòu)下建立獨(dú)立的namespace,減少Flink任務(wù)在checkpoint采用hdfs/rocksdb作為狀態(tài)存儲(chǔ)后端場景下由于hdfs抖動(dòng)出現(xiàn)頻繁異常失敗。 在資源隔離層面,引入Node Label機(jī)制實(shí)現(xiàn)重要任務(wù)運(yùn)行在獨(dú)立機(jī)器,不同計(jì)算性質(zhì)任務(wù)運(yùn)行在合適的機(jī)器下,最大化機(jī)器資源的利用率。 同時(shí)在YARN資源隔離基礎(chǔ)上增加Cgroup進(jìn)行物理cpu隔離,減少任務(wù)間搶占影響,保障任務(wù)運(yùn)行穩(wěn)定性。

平臺(tái)化管理
Wstream是一套基于Apache Flink構(gòu)建的一站式、高性能實(shí)時(shí)大數(shù)據(jù)處理平臺(tái)。 提供SQL化流式數(shù)據(jù)分析能力,大幅降低數(shù)據(jù)實(shí)時(shí)分析門檻,支持通過DDL實(shí)現(xiàn)source/sink以及維表,支持UDF/UDAF/UDTF,為用戶提供更強(qiáng)大的數(shù)據(jù)實(shí)時(shí)處理能力。 支持多樣式應(yīng)用構(gòu)建方式FlinkJar/Stream SQL/Flink-Storm,以滿足不同用戶的開發(fā)需求,同時(shí)通過調(diào)試,監(jiān)控,診斷,探查結(jié)果等輔助手段完善任務(wù)生命周期管理。

流式sql能力建設(shè)
Stream SQL是平臺(tái)為了打造sql化實(shí)時(shí)計(jì)算能力,減小實(shí)時(shí)計(jì)算開發(fā)門檻,基于開源的 Flink,對底層sql模塊進(jìn)行擴(kuò)展實(shí)現(xiàn)以 下功能
1.支持自定義DDL語法(包括源表,輸出表,維表)
2.支持自定義UDF/UDTF/UDAF語法
3.實(shí)現(xiàn)了流與維表的join,雙流join
在支持大數(shù)據(jù)開源組件的同時(shí),也打通了公司主流的實(shí)時(shí)存儲(chǔ)平臺(tái)。 同時(shí)為用戶提供基于Sql client的cli方式以及在Wstream集成了對實(shí)時(shí)sql能力的支持,為用戶提供在線開發(fā)調(diào)試sql任務(wù)的編輯器,同時(shí)支持代碼高亮,智能提示,語法校驗(yàn)及運(yùn)行時(shí)校驗(yàn),盡可能避免用戶提交到集群的任務(wù)出現(xiàn)異常。 另外也為用戶提供了向?qū)Щ渲梅绞剑鉀Q用戶定義table需要了解復(fù)雜的參數(shù)設(shè)置,用戶只需關(guān)心業(yè)務(wù)邏輯處理,像開發(fā)離線Hive一樣使用sql開發(fā)實(shí)時(shí)任務(wù)。

Storm任務(wù)遷移Flink
在完善Flink平臺(tái)建設(shè)的同時(shí),我們也啟動(dòng)Storm任務(wù)遷移Flink計(jì)劃,旨在提升實(shí)時(shí)計(jì)算平臺(tái)整體效率,減少機(jī)器成本和運(yùn)維成本。 Flink-Storm作為官方提供Flink兼容Storm程序?yàn)槲覀儗?shí)現(xiàn)無縫遷移提供了可行性,但是作為beta版本,在實(shí)際使用過程中存在很多無法滿足現(xiàn)實(shí)場景的情況,因此我們進(jìn)行了大量改進(jìn),主要包括實(shí)現(xiàn)Storm任務(wù)on yarn ,遷移之后任務(wù)at least once語義保障,兼容Storm的 tick tuple機(jī)制等等。

通過對Fink-Storm的優(yōu)化,在無需用戶修改代碼的基礎(chǔ)上,我們已經(jīng)順利完成多個(gè)Storm版本集群任務(wù)遷移和集群下線,在保障實(shí)時(shí)性及吞吐量的基礎(chǔ)上可以節(jié)約計(jì)算資源40%以上,同時(shí)借助yarn統(tǒng)一管理實(shí)時(shí)計(jì)算平臺(tái)無需維護(hù)多套Storm集群,整體提升了平臺(tái)資源利用率,減輕平臺(tái)運(yùn)維工作量。
任務(wù)診斷
指標(biāo)監(jiān)控
Flink webUI 提供了大量的運(yùn)行時(shí)信息供用戶了解任務(wù)當(dāng)前運(yùn)行狀況,但是存在無法獲取歷史metrics的問題導(dǎo)致用戶無法了解任務(wù)歷史運(yùn)行狀態(tài),因此我們采用了Flink原生支持的Prometheus進(jìn)行實(shí)時(shí)指標(biāo)采集和存儲(chǔ),Prometheus是一個(gè)開源的監(jiān)控和報(bào)警系統(tǒng),通過pushgateway的方式實(shí)時(shí)上報(bào)metrics,Prometheus集群采用Fedration部署模式,meta節(jié)點(diǎn)定時(shí)抓取所有子節(jié)點(diǎn)指標(biāo)進(jìn)行匯總,方便統(tǒng)一數(shù)據(jù)源提供給Grafana進(jìn)行可視化以及告警配置。

任務(wù)延遲
吞吐能力和延遲作為衡量實(shí)時(shí)任務(wù)性能最重要的指標(biāo),我們經(jīng)常需要通過這兩個(gè)指標(biāo)來調(diào)整任務(wù)并發(fā)度和資源配置。 Flink Metrics提供latencyTrackingInterval參數(shù)啟用任務(wù)延遲跟蹤,打開會(huì)顯著影響集群和任務(wù)性能,官方高度建議只在debug下使用。 在實(shí)踐場景下,F(xiàn)link任務(wù)數(shù)據(jù)源基本都是Kafka,因此我們采用topic消費(fèi)堆積作為衡量任務(wù)延遲的指標(biāo),監(jiān)控模塊實(shí)時(shí)通過Flink rest獲取任務(wù)正在消費(fèi)topic的offset,同時(shí)通過Kafka JMX獲取對應(yīng)topic的logsize,采用logsize– offset作為topic的堆積。

日志檢索
Flink 作為分布式計(jì)算引擎,所有任務(wù)會(huì)由YARN統(tǒng)一調(diào)度到任意的計(jì)算節(jié)點(diǎn),因此任務(wù)的運(yùn)行日志會(huì)分布在不同的機(jī)器,用戶定位日志困難,我們通過調(diào)整log4j日志框架默認(rèn)機(jī)制,按天切分任務(wù)日志,定期清理過期日志,避免異常任務(wù)頻繁寫滿磁盤導(dǎo)致計(jì)算節(jié)點(diǎn)不可用的情況,同時(shí)在所有計(jì)算節(jié)點(diǎn)部署agent 實(shí)時(shí)采集日志,匯聚寫入Kafka,通過日志分發(fā)平臺(tái)實(shí)時(shí)將數(shù)據(jù)分發(fā)到ES,方便用戶進(jìn)行日志檢索和定位問題。
Flink優(yōu)化
在實(shí)際使用過程中, 我們也針對業(yè)務(wù)場景進(jìn)行了一些優(yōu)化和擴(kuò)展,主要包括:
1.Storm任務(wù)需要Storm引擎提供ack機(jī)制保障消息傳遞at least once語義,遷移到Flink無法使用ack機(jī)制,我們通過定制KafakSpout實(shí)現(xiàn)checkpoint相關(guān)接口,通過Flink checkpoint機(jī)制實(shí)現(xiàn)消息傳遞不丟失。 另外Flink-Storm默認(rèn)只能支持standalone的提交方式,我們通過實(shí)現(xiàn)yarn client相關(guān)接口增加了storm on yarn的支持。
2.Flink 1.6推薦的是一個(gè)TaskManager對應(yīng)一個(gè)slot的使用方式,在申請資源的時(shí)候根據(jù)最大并發(fā)度申請對應(yīng)數(shù)量的TaskManger,這樣導(dǎo)致的問題就是在任務(wù)設(shè)置task slots之后需要申請的資源大于實(shí)際資源。 我們通過在ResoureManager請求資源管理器SlotManager的時(shí)候增加TaskManagerSlot相關(guān)信息 ,用于維護(hù)申請到的待分配TaskManager和slot,之后對于SlotRequests請求不是直接申請TaskManager,而是先從SlotManager申請是否有足夠slot,沒有才會(huì)啟動(dòng)新的TaskManger,這樣就實(shí)現(xiàn)了申請資源等于實(shí)際消耗資源,避免任務(wù)在資源足夠的情況下無法啟動(dòng)。

3.Kafak Connector改造,增加自動(dòng)換行支持,另外針對08source無法設(shè)置client.id,通過將client.id生成機(jī)制優(yōu)化成更有標(biāo)識(shí)意義的id,便于Kafka層面管控
4.Flink提交任務(wù)無法支持第三方依賴jar包和配置文件供TaskManager使用,我們通過修改flink啟動(dòng)腳本,增加相關(guān)參數(shù)支持外部傳輸文件,之后在任務(wù)啟動(dòng)過程中通過將對應(yīng)的jar包和文件加入classpath,借助yarn的文件管理機(jī)制實(shí)現(xiàn)類似spark對應(yīng)的使用方式,方便用戶使用
5.業(yè)務(wù)場景存在大量實(shí)時(shí)寫入hdfs需求,F(xiàn)link 自帶BucketingSink默認(rèn)只支持string和avro格式,我們在此基礎(chǔ)上同時(shí)支持了LZO及Parquet格式寫入,極大提升數(shù)據(jù)寫入性能。
后續(xù)規(guī)劃
實(shí)時(shí)計(jì)算平臺(tái)當(dāng)前正在進(jìn)行Storm任務(wù)遷移Flink集群,目前已經(jīng)基本完成,大幅提升了平臺(tái)資源利用率和計(jì)算效率。 后續(xù)將繼續(xù)調(diào)研完善Flink相關(guān)能力,推動(dòng)Flink在更多的實(shí)時(shí)場景下的應(yīng)用,包括實(shí)時(shí)規(guī)則引擎,實(shí)時(shí)機(jī)器學(xué)習(xí)等。




























