国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

大數(shù)據(jù)框架及流批一體怎么選?

大數(shù)據(jù)
在日常生活中,我們通常會(huì)先把數(shù)據(jù)存儲(chǔ)在一張表中,然后再進(jìn)行加工、分析,這里就涉及到一個(gè)時(shí)效性的問題。

背景

在日常生活中,我們通常會(huì)先把數(shù)據(jù)存儲(chǔ)在一張表中,然后再進(jìn)行加工、分析,這里就涉及到一個(gè)時(shí)效性的問題。

場(chǎng)景一:如果我們處理以年、月為單位的級(jí)別的數(shù)據(jù),針對(duì)這些大量數(shù)據(jù)的實(shí)時(shí)性要求并不高。

場(chǎng)景二:如果我們處理的是以天、小時(shí),甚至分鐘為單位的數(shù)據(jù),那么對(duì)數(shù)據(jù)的時(shí)效性要求就比較高。

在第二種場(chǎng)景下,如果我們?nèi)耘f采用傳統(tǒng)的數(shù)據(jù)處理方式,統(tǒng)一收集數(shù)據(jù),存儲(chǔ)到數(shù)據(jù)庫中,之后在進(jìn)行分析,就可能無法滿足時(shí)效性的要求。

數(shù)據(jù)的計(jì)算模式主要分為:

  • 批量計(jì)算(batch computing)、
  • 流式計(jì)算(stream computing)、
  • 交互計(jì)算(interactive computing)、
  • 圖計(jì)算(graph computing)等。

其中,流式計(jì)算和批量計(jì)算是兩種主要的大數(shù)據(jù)計(jì)算模式,分別適用于不同的大數(shù)據(jù)應(yīng)用場(chǎng)景。

流數(shù)據(jù)(或數(shù)據(jù)流)是指在時(shí)間分布和數(shù)量上無限的一系列動(dòng)態(tài)數(shù)據(jù)集合體,數(shù)據(jù)的價(jià)值隨著時(shí)間的流逝而降低,因此必須實(shí)時(shí)計(jì)算給出秒級(jí)響應(yīng)。流式計(jì)算,就是對(duì)數(shù)據(jù)流進(jìn)行處理,是實(shí)時(shí)計(jì)算。

批量計(jì)算則統(tǒng)一收集數(shù)據(jù),存儲(chǔ)到數(shù)據(jù)庫中,然后對(duì)數(shù)據(jù)進(jìn)行批量處理的數(shù)據(jù)計(jì)算方式。兩者的區(qū)別主要體現(xiàn)在以下幾個(gè)方面:

(1)數(shù)據(jù)時(shí)效性不同

  • 流式計(jì)算實(shí)時(shí)、低延遲;
  • 批量計(jì)算非實(shí)時(shí)、高延遲。

(2)數(shù)據(jù)特征不同

  • 流式計(jì)算的數(shù)據(jù)一般是動(dòng)態(tài)的、沒有邊界的;
  • 批處理的數(shù)據(jù)一般則是靜態(tài)數(shù)據(jù)。

(3)應(yīng)用場(chǎng)景不同

  • 流式計(jì)算應(yīng)用在實(shí)時(shí)場(chǎng)景,時(shí)效性要求比較高的場(chǎng)景,如實(shí)時(shí)推薦、業(yè)務(wù)監(jiān)控…。
  • 批量計(jì)算一般說批處理,應(yīng)用在實(shí)時(shí)性要求不高、離線計(jì)算的場(chǎng)景下,數(shù)據(jù)分析、離線報(bào)表等。

(4)運(yùn)行方式不同

  • 流式計(jì)算的任務(wù)持續(xù)進(jìn)行的;
  • 批量計(jì)算的任務(wù)則一次性完成。

流式計(jì)算框架平臺(tái)與相關(guān)產(chǎn)品

第一類,商業(yè)級(jí)流式計(jì)算平臺(tái)(IBM InfoSphere Streams、IBM StreamBase等);

第二類,開源流式計(jì)算框架(Twitter Storm、S4等);

第三類,公司為支持自身業(yè)務(wù)開發(fā)的流式計(jì)算框架。

(1)Strom:Twitter 開發(fā)的第一代流處理系統(tǒng)。

(2)Heron:Twitter 開發(fā)的第二代流處理系統(tǒng)。

(3)Spark streaming:是Spark核心API的一個(gè)擴(kuò)展,可以實(shí)現(xiàn)高吞吐量的、具備容錯(cuò)機(jī)制的實(shí)時(shí)流數(shù)據(jù)的處理。

(4)Flink:是一個(gè)針對(duì)流數(shù)據(jù)和批數(shù)據(jù)的分布式處理引擎。

(5)Apache Kafka:由Scala寫成。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高通量、低等待的平臺(tái)。

流式計(jì)算主要應(yīng)用場(chǎng)景

流式處理可以用于兩種不同場(chǎng)景:事件流和持續(xù)計(jì)算。

(1)事件流

事件流具能夠持續(xù)產(chǎn)生大量的數(shù)據(jù),這類數(shù)據(jù)最早出現(xiàn)與傳統(tǒng)的銀行和股票交易領(lǐng)域,也在互聯(lián)網(wǎng)監(jiān)控、無線通信網(wǎng)等領(lǐng)域出現(xiàn)、需要以近實(shí)時(shí)的方式對(duì)更新數(shù)據(jù)流進(jìn)行復(fù)雜分析如趨勢(shì)分析、預(yù)測(cè)、監(jiān)控等。簡(jiǎn)單來說,事件流采用的是查詢保持靜態(tài),語句是固定的,數(shù)據(jù)不斷變化的方式。

(2)持續(xù)計(jì)算

比如對(duì)于大型網(wǎng)站的流式數(shù)據(jù):網(wǎng)站的訪問PV/UV、用戶訪問了什么內(nèi)容、搜索了什么內(nèi)容等,實(shí)時(shí)的數(shù)據(jù)計(jì)算和分析可以動(dòng)態(tài)實(shí)時(shí)地刷新用戶訪問數(shù)據(jù),展示網(wǎng)站實(shí)時(shí)流量的變化情況,分析每天各小時(shí)的流量和用戶分布情況;比如金融行業(yè),毫秒級(jí)延遲的需求至關(guān)重要。一些需要實(shí)時(shí)處理數(shù)據(jù)的場(chǎng)景也可以應(yīng)用Storm,比如根據(jù)用戶行為產(chǎn)生的日志文件進(jìn)行實(shí)時(shí)分析,對(duì)用戶進(jìn)行商品的實(shí)時(shí)推薦等。

大數(shù)據(jù)流式計(jì)算可以廣泛應(yīng)用于金融銀行、互聯(lián)網(wǎng)、物聯(lián)網(wǎng)等諸多領(lǐng)域,如股市實(shí)時(shí)分析、插入式廣告投放、交通流量實(shí)時(shí)預(yù)警等場(chǎng)景,主要是為了滿足該場(chǎng)景下的實(shí)時(shí)應(yīng)用需求。數(shù)據(jù)往往以數(shù)據(jù)流的形式持續(xù)到達(dá)數(shù)據(jù)計(jì)算系統(tǒng),計(jì)算功能的實(shí)現(xiàn)是通過有向任務(wù)圖的形式進(jìn)行描述,數(shù)據(jù)流在有向任務(wù)圖中流過后,會(huì)實(shí)時(shí)產(chǎn)生相應(yīng)的計(jì)算結(jié)果。整個(gè)數(shù)據(jù)流的處理過程往往是在毫秒級(jí)的時(shí)間內(nèi)完成的。

通常情況下,大數(shù)據(jù)流式計(jì)算場(chǎng)景具有以下鮮明特征。

1)在流式計(jì)算環(huán)境中,數(shù)據(jù)是以元組為單位,以連續(xù)數(shù)據(jù)流的形態(tài),持續(xù)地到達(dá)大數(shù)據(jù)流式計(jì)算平臺(tái)。數(shù)據(jù)并不是一次全部可用,不能夠一次得到全量數(shù)據(jù),只能在不同的時(shí)間點(diǎn),以增量的方式,逐步得到相應(yīng)數(shù)據(jù)。

2)數(shù)據(jù)源往往是多個(gè),在進(jìn)行數(shù)據(jù)流重放的過程中,數(shù)據(jù)流中各個(gè)元組間的相對(duì)順序是不能控制的。也就是說,在數(shù)據(jù)流重放過程中,得到完全相同的數(shù)據(jù)流(相同的數(shù)據(jù)元組和相同的元組順序)是很困難的,甚至是不可能的。

3)數(shù)據(jù)流的流速是高速的,且隨著時(shí)間在不斷動(dòng)態(tài)變化。這種變化主要體現(xiàn)在兩個(gè)方面,一個(gè)方面是數(shù)據(jù)流流速大小在不同時(shí)間點(diǎn)的變化,這就需要系統(tǒng)可以彈性、動(dòng)態(tài)地適應(yīng)數(shù)據(jù)流的變化,實(shí)現(xiàn)系統(tǒng)中資源、能耗的高效利用;另一方面是數(shù)據(jù)流中各個(gè)元組內(nèi)容(語義)在不同時(shí)間點(diǎn)的變化,即概念漂移,這就需要處理數(shù)據(jù)流的有向任務(wù)圖可以及時(shí)識(shí)別、動(dòng)態(tài)更新和有效適應(yīng)這種語義層面上的變化。

4)實(shí)時(shí)分析和處理數(shù)據(jù)流是至關(guān)重要的,在數(shù)據(jù)流中,其生命周期的時(shí)效性往往很短,數(shù)據(jù)的時(shí)間價(jià)值也更加重要。所有數(shù)據(jù)流到來后,均需要實(shí)時(shí)處理,并實(shí)時(shí)產(chǎn)生相應(yīng)結(jié)果,進(jìn)行反饋,所有的數(shù)據(jù)元組也僅會(huì)被處理一次。雖然部分?jǐn)?shù)據(jù)可能以批量的形式被存儲(chǔ)下來,但也只是為了滿足后續(xù)其他場(chǎng)景下的應(yīng)用需求。

5)數(shù)據(jù)流是無窮無盡的,只要有數(shù)據(jù)源在不斷產(chǎn)生數(shù)據(jù),數(shù)據(jù)流就會(huì)持續(xù)不斷地到來。這也就需要流式計(jì)算系統(tǒng)永遠(yuǎn)在線運(yùn)行,時(shí)刻準(zhǔn)備接收和處理到來的數(shù)據(jù)流。在線運(yùn)行是流式計(jì)算系統(tǒng)的一個(gè)常態(tài),一旦系統(tǒng)上線后,所有對(duì)該系統(tǒng)的調(diào)整和優(yōu)化也將在在線環(huán)境中開展和完成。

6)多個(gè)不同應(yīng)用會(huì)通過各自的有向任務(wù)圖進(jìn)行表示,并將被部署在一個(gè)大數(shù)據(jù)計(jì)算平臺(tái)中,這就需要整個(gè)計(jì)算平臺(tái)可以有效地為各個(gè)有向任務(wù)圖分配合理資源,并保證滿足用戶服務(wù)級(jí)目標(biāo)。同時(shí)各個(gè)資源間需要公平地競(jìng)爭(zhēng)資源、合理地共享資源,特別是要滿足不同時(shí)間點(diǎn)各應(yīng)用間系統(tǒng)資源的公平使用。

什么是流批一體架構(gòu)?

流處理和批處理都是常用的數(shù)據(jù)處理方式,它們各有優(yōu)劣。流處理通常用于需要實(shí)時(shí)響應(yīng)的場(chǎng)景,如在線監(jiān)控和警報(bào)系統(tǒng)等。而批處理則通常用于離線數(shù)據(jù)分析和挖掘等大規(guī)模數(shù)據(jù)處理場(chǎng)景。選擇合適的處理方式取決于具體的業(yè)務(wù)需求和數(shù)據(jù)處理場(chǎng)景。

以前很多系統(tǒng)的架構(gòu)都是采用的Lambda架構(gòu),它將所有的數(shù)據(jù)分成了三個(gè)層次:批處理層、服務(wù)層和速率層,每個(gè)層次都有自己的功能和目的。

  • 批處理層:負(fù)責(zé)離線計(jì)算和歷史數(shù)據(jù)的存儲(chǔ)。
  • 服務(wù)層:負(fù)責(zé)在線查詢和實(shí)時(shí)數(shù)據(jù)的處理。
  • 速率層:負(fù)責(zé)對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行快速的處理和查詢。

這種架構(gòu),需要一套流處理平臺(tái)和一套批處理平臺(tái),這就可能導(dǎo)致了一些問題:

  1. 資源浪費(fèi):一般來說,白天是流計(jì)算的高峰期,此時(shí)需要更多的計(jì)算資源,相對(duì)來說,批計(jì)算就沒有嚴(yán)格的限制,可以選擇凌晨或者白天任意時(shí)刻,但是,流計(jì)算和批計(jì)算的資源無法進(jìn)行混合調(diào)度,無法對(duì)資源進(jìn)行錯(cuò)峰使用,這就會(huì)導(dǎo)致資源的浪費(fèi)。
  2. 成本高:流計(jì)算和批計(jì)算使用的是不同的技術(shù),意味著需要維護(hù)兩套代碼,不論是學(xué)習(xí)成本還是維護(hù)成本都會(huì)更高。
  3. 數(shù)據(jù)一致性:兩套平臺(tái)都是不一樣的,可能會(huì)導(dǎo)致數(shù)據(jù)不一致的問題。

因此,流批一體誕生了!

流批一體的技術(shù)理念最早是2015年提出的,初衷就是讓開發(fā)能用同一套代碼和API實(shí)現(xiàn)流計(jì)算和批計(jì)算,但是那時(shí)候?qū)嶋H落地的就少之又少,阿里巴巴在2020年雙十一首次實(shí)際落地。

Flink流批一體架構(gòu):

目前有哪些流處理的框架?

Kafka Stream

基于 Kafka 的一個(gè)輕量級(jí)流式計(jì)算框架,我們可以使用它從一個(gè)或多個(gè)輸入流中讀取數(shù)據(jù),對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換和處理,然后將結(jié)果寫入一個(gè)或多個(gè)輸出流中。

工作原理:讀取數(shù)據(jù)流 -> 數(shù)據(jù)轉(zhuǎn)換/時(shí)間窗口處理/狀態(tài)管理 -> 任務(wù)調(diào)度 -> 輸出結(jié)果

簡(jiǎn)單示例:統(tǒng)計(jì)20秒內(nèi)每個(gè)input的key輸入的次數(shù),典型的例子:統(tǒng)計(jì)網(wǎng)站20秒內(nèi)用戶的點(diǎn)擊次數(shù)。

public class WindowCountApplication {

    private static final String STREAM_INPUT_TOPIC = "streams-window-input";
    private static final String STREAM_OUTPUT_TOPIC = "streams-window-output";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(APPLICATION_ID_CONFIG, WindowCountApplication.class.getSimpleName());
        props.put(BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVERS);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(STREAM_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .peek((key, value) -> Console.log("[input] key={}, value={}", key, value))
                .groupByKey()
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(20)))
                .count()
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), value))
                .peek((key, value) -> Console.log("[output] key={}, value={}", key, value))
                .to(STREAM_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams kStreams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(kStreams::close));
        kStreams.start();
    }
}

運(yùn)行結(jié)果:{key}={value},發(fā)送了3次A=1,2次B=1,以及1次C=1,統(tǒng)計(jì)結(jié)果在預(yù)期之內(nèi),即A出現(xiàn)3次,B出現(xiàn)2次,C出現(xiàn)1次。

Pulsar Function

和 Kafka Stream 類似,也是輕量級(jí)的流處理框架,不過它是基于 Pulsar 實(shí)現(xiàn)的一個(gè)流處理框架,同樣的,也是從一個(gè)或多個(gè)輸入流中讀取數(shù)據(jù),對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換和處理,然后將結(jié)果寫入一個(gè)或多個(gè)輸出流中。感興趣的可以參考我之前寫的文章:Pulsar Function簡(jiǎn)介以及使用

工作原理:訂閱消息流 -> 處理消息 -> 發(fā)布處理結(jié)果

簡(jiǎn)單示例:LocalRunner模式,按照逗號(hào)“,”去切分 input topic 的消息,然后轉(zhuǎn)換成數(shù)字進(jìn)行求和,結(jié)果發(fā)送至 output topic。

public class IntSumFunction implements Function<String, Integer> {

    public static final String BROKER_SERVICE_URL = "pulsar://localhost:6650";
    public static final String INPUT_TOPIC = "persistent://public/default/int-sum-input";
    public static final String OUTPUT_TOPIC = "persistent://public/default/int-sum-output";
    public static final String LOG_TOPIC = "persistent://public/default/int-sum-log";

    @Override
    public Integer process(String input, Context context) {
        Console.log("input: {}", input);
        return Arrays.stream(input.split(","))
                .map(Integer::parseInt)
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName(IntSumFunction.class.getSimpleName());
        functionConfig.setClassName(IntSumFunction.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
        functionConfig.setOutput(OUTPUT_TOPIC);
        functionConfig.setLogTopic(LOG_TOPIC);

        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl(BROKER_SERVICE_URL)
                .functionConfig(functionConfig)
                .build();
        localRunner.start(true);
    }
}

運(yùn)行結(jié)果:1+2+3+4+5+6=21

Flink

  • 一種流處理框架,具有低延遲、高吞吐量和高可靠性的特性。
  • 支持流處理和批處理,并支持基于事件時(shí)間和處理時(shí)間的窗口操作、狀態(tài)管理、容錯(cuò)機(jī)制等。
  • 提供了豐富的算子庫和 API,支持復(fù)雜的數(shù)據(jù)流處理操作。

工作原理:接收數(shù)據(jù)流 -> 數(shù)據(jù)轉(zhuǎn)換 -> 數(shù)據(jù)處理 -> 狀態(tài)管理 -> 容錯(cuò)處理 -> 輸出結(jié)果

簡(jiǎn)單來說就是將數(shù)據(jù)流分成多個(gè)分區(qū),在多個(gè)任務(wù)中并行處理,同時(shí)維護(hù)狀態(tài)信息,實(shí)現(xiàn)高吞吐量、低延遲的流處理。

簡(jiǎn)單示例:從9966端口讀取數(shù)據(jù),將輸入的句子用空格分割成多個(gè)單詞,每隔5秒做一次單詞統(tǒng)計(jì)。

public class WindowSocketWordCount {

    private static final String REGEX = " ";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStreamSource = env.socketTextStream("localhost", 9966);

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = socketTextStreamSource
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (sentence, collector) -> {
                    for (String word : sentence.split(REGEX)) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        streamOperator.print();
        env.execute();
    }
}

運(yùn)行結(jié)果:

Storm

  • 一個(gè)開源的流處理引擎,旨在實(shí)現(xiàn)快速、可靠的數(shù)據(jù)流處理。
  • 是業(yè)界最早出現(xiàn)的一個(gè)流處理框架(2011年),但是現(xiàn)在已經(jīng)有許多其它優(yōu)秀的流處理框架了,所以它在現(xiàn)在并不是唯一選擇。

工作原理:將數(shù)據(jù)流分成多個(gè)小的流(也稱為tuple),并將這些小流通過一系列的操作(也稱為bolt)進(jìn)行處理。

簡(jiǎn)單示例:在本地模式,使用Storm內(nèi)置的RandomSentenceSpout充當(dāng)數(shù)據(jù)源進(jìn)行測(cè)試,用空格拆分生成的句子為多個(gè)單詞,統(tǒng)計(jì)每個(gè)單詞出現(xiàn)次數(shù)。

public class WindowedWordCountApplication {

    public static void main(String[] args) throws Exception {
        StreamBuilder builder = new StreamBuilder();
        builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
                .window(TumblingWindows.of(Duration.seconds(2)))
                .flatMap(sentence -> Arrays.asList(sentence.split(" ")))
                .peek(sentence -> Console.log("Random sentence: {}", sentence))
                .mapToPair(word -> Pair.of(word, 1))
                .countByKey()
                .peek(pair -> Console.log("Count word: ", pair.toString()));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("windowedWordCount", new Config(), builder.build());
        Utils.sleep(20000);
        cluster.shutdown();
    }
}

內(nèi)置的RandomSentenceSpout隨機(jī)生成數(shù)據(jù)關(guān)鍵源代碼:

@Override
public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{
        sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
        sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
    };
    final String sentence = sentences[rand.nextInt(sentences.length)];
    LOG.debug("Emitting tuple: {}", sentence);
    collector.emit(new Values(sentence));
}

運(yùn)行結(jié)果:隨機(jī)找一個(gè)單詞“nature”,統(tǒng)計(jì)的次數(shù)為10次。

Spark Streaming

基于 Spark API 的擴(kuò)展,支持對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行可擴(kuò)展、高吞吐量、容錯(cuò)的流處理。

工作原理:接收實(shí)時(shí)輸入數(shù)據(jù)流并將數(shù)據(jù)分成批次,然后由 Spark 引擎處理以批次生成最終結(jié)果流。

簡(jiǎn)單示例:從 kafka 的 spark-streaming topic 讀取數(shù)據(jù),按照空格“ ”拆分,統(tǒng)計(jì)每一個(gè)單詞出現(xiàn)的次數(shù)并打印。

public class JavaDirectKafkaWordCount {

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String KAFKA_GROUP_ID = "spark-consumer-group";
    private static final String KAFKA_TOPICS = "spark-streaming";
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        Configurator.setRootLevel(Level.WARN);
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("spark-streaming-word-count");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(KAFKA_TOPICS.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        JavaDStream<String> linesStream = messages.map(ConsumerRecord::value);
        JavaPairDStream<String, Integer> wordCountStream = linesStream
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        wordCountStream.print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

運(yùn)行結(jié)果:

如何選擇流處理框架?

  • 簡(jiǎn)單數(shù)據(jù)流處理
    如果只是輕量級(jí)使用的話,可以結(jié)合技術(shù)棧使用消息中間件自帶的流處理框架就更節(jié)省成本。
  • 使用的 Kafka 就用 Kafka Stream。
  • 使用的 Pulsar 就用 Pulsar Function。
  • 復(fù)雜數(shù)據(jù)流場(chǎng)景

綜上,可以結(jié)合數(shù)據(jù)規(guī)模、技術(shù)棧、處理延遲功能特性、未來的考慮、社區(qū)活躍度、成本和可用性等等進(jìn)行選擇。

責(zé)任編輯:華軒 來源: 數(shù)字化助推器
相關(guān)推薦

2019-07-01 15:40:53

大數(shù)據(jù)架構(gòu)流處理

2023-05-16 07:24:25

數(shù)據(jù)湖快手

2020-01-13 14:39:06

FlinkSQL無限流

2023-09-05 07:22:17

Hudi數(shù)據(jù)存儲(chǔ)

2022-06-30 09:30:36

FlinkSQL流批一體京東

2022-09-29 09:22:33

數(shù)據(jù)倉

2021-08-02 10:19:08

Dataphin 數(shù)倉架構(gòu)存儲(chǔ)計(jì)算分離

2023-03-30 07:40:03

FeatHub 項(xiàng)目特征工程開發(fā)

2021-11-18 21:09:50

流批場(chǎng)景引擎

2024-03-25 08:15:02

數(shù)據(jù)分析AI 一體化大數(shù)據(jù)

2016-11-07 12:36:18

2023-09-24 20:31:23

數(shù)字化

2014-02-12 09:15:17

Oracle大數(shù)據(jù)

2013-09-17 14:23:52

天云大數(shù)據(jù)一體機(jī)

2019-11-28 20:51:10

阿里云Alink開源

2017-04-01 10:10:07

桌面一體機(jī)采購

2020-11-24 10:26:08

2021-06-30 09:20:08

數(shù)倉FlinkHive

2013-05-31 10:19:12

XData大數(shù)據(jù)一體

2012-11-26 13:02:10

浪潮大數(shù)據(jù)一體機(jī)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

在线欧美日韩国产| 国产成人在线免费观看| 国产一级做a爰片久久| 大量国产精品视频| 国产精品国产三级国产普通话蜜臀| 国产日韩中文在线中文字幕| 日韩中文字幕二区| 欧美一区日韩一区| 日韩在线欧美| 成人午夜碰碰视频| www.综合| 日本成人中文字幕在线| 性欧美办公室18xxxxhd| 亚洲免费av在线| 视频在线观看91| 精品久久对白| 成人在线免费播放视频| 色哟哟精品丝袜一区二区| heyzo高清在线| 国产啪精品视频网站| 中文字幕一区二区在线观看| 亚洲v天堂v手机在线| av在线播放av| 97av视频在线观看| 韩国一区二区三区美女美女秀| 亚洲三级免费看| 日韩欧美在线观看视频| 国产成人h网站| 91久久久一线二线三线品牌| 在线观看成人一级片| 亚洲区小说区图片区qvod按摩| 粉嫩av一区二区三区天美传媒| 欧美一级在线亚洲天堂| 亚洲成人久久电影| 色综合天天在线| 亚洲欧美中日韩| 99re成人精品视频| 国产精品一区免费在线观看| 四虎精品欧美一区二区免费| 福利视频一二区| 99视频在线精品国自产拍免费观看| 成人自拍av| 日本电影在线观看网站| 开心丁香婷婷深爱五月| av丝袜天堂网| 久草热视频在线观看| 国产91在线亚洲| 国产精品视频自拍| 国产精品久久久久aaaa九色| 国产成人精品午夜| 成人深夜在线观看| 成人中文视频| 91精品福利观看| 日本在线播放一二三区| heyzo高清在线| 日韩激情电影免费看| 偷偷www综合久久久久久久| 日韩欧美激情在线| av中文字幕在线| 国产精品女同一区二区三区| 国产婷婷97碰碰久久人人蜜臀| 青青草97国产精品免费观看| 图片婷婷一区| 亚洲精品影片| 国产精品一区高清| 欧美人与牛zoz0性行为| 久久久久久久久国产一区| 久久国产精品亚洲人一区二区三区| jizz在线观看视频| a级黄色小视频| 日韩精品小视频| 日韩女优av电影在线观看| 一区二区三区四区视频| 国产精选一区二区三区| 欧美艳星brazzers| av在线免费观看网址| 天堂影院在线| zzzwww在线看片免费| 国产精品免费精品自在线观看| 91丝袜脚交足在线播放| 日韩欧美在线一区| 亚洲精品动漫久久久久| 国产美女精品视频免费观看| 一区二区高清在线| 久久精品久久久久久国产 免费| 九色在线观看视频| 一区二区不卡在线观看| 国产精品swag| 中文字幕久久综合| 香艳视频网站| 免费观看成人性生生活片 | 国产不卡在线视频| 美女100%一区| 欧美专区亚洲专区| 久久国产精品网| 国产女人水真多18毛片18精品 | 欧美激情极品视频| 国产精品视频一区二区三区经| 免费观看美女裸体网站| 蜜桃视频在线入口www| 久久久精品一区二区毛片免费看| 不卡一区综合视频| 久久av免费观看| 欧美精品麻豆| 欧美a在线视频| 欧美巨乳在线| 精品视频自拍| jizz一区二区| 欧美成人aa大片| 国产精品国产三级欧美二区| x88av蜜桃臀一区二区| 国产精品一区三区在线观看| 成人国产精品一区二区| 黄色在线看片| 色婷婷国产精品| 久久精品日产第一区二区三区高清版 | 日本新janpanese乱熟| 天堂8中文在线| 午夜日韩激情| 超碰在线caoporen| 国产精品麻豆视频| 三上悠亚激情av一区二区三区| 春暖花开亚洲一区二区三区| 日韩一区二区三区免费播放| 亚洲精品国产a| 91久久久久久久久久久| 免费在线看v| 久久xxxx精品视频| 亚洲欧美国产视频| 99热在线这里只有精品| 蜜桃成人av| 男人亚洲天堂| 麻豆精品视频| 欧美大人香蕉在线| 国产精品一区二区在线观看不卡 | 一本到高清视频免费精品| 97超碰人人看人人| www.久久ai| jvid福利写真一区二区三区| 欧美综合在线第二页| 9色视频在线观看| 精品女同一区二区| 久久野战av| 黄页网址在线观看| 欧美日韩视频免费观看| 日韩成人一级大片| 日韩丝袜美女视频| xxxx一级片| 日韩亚洲在线| 欧美国产日韩一区二区三区| 午夜爽爽视频| 久久精品国产秦先生| 欧美一级大片在线观看| 金瓶狂野欧美性猛交xxxx| 高清视频在线观看三级| 岛国av午夜精品| 免费一级电影| 九色porny蝌蚪视频在线观看| japanese色国产在线看视频| 久久久久青草大香线综合精品| 91精品久久久久久久| 欧美一区二区三区婷婷| 欧美色图在线观看| 亚洲成人av免费看| 国产精品一区在线| 国产欧美日韩综合一区在线观看| 欧美黄页在线免费观看| 大桥未久女教师av一区二区| 色一情一区二区三区四区| 九色|91porny| 亚洲美腿欧美偷拍| 蜜桃麻豆www久久国产精品| 荡女精品导航| 久久精品99久久久香蕉| 丰满的护士2在线观看高清| 91高清视频在线| 色视频在线看| 一区二区三区免费在线观看| 日本男人操女人| 日本成人片在线| 成人免费一区| 影音先锋成人资源网站| 亚洲精品国产suv一区88| 老司机精品视频在线观看6| 成人毛片视频在线观看| 91深夜福利视频| 国产成人影院| 国产精品久久久久久中文字| 国产精品男女| 国产精品久久久久免费a∨| 嫩草一区二区三区| 国产精品久久久久久久电影| 久久综合另类图片小说| 欧美黄色www| 97影院秋霞午夜在线观看| 精品免费一区二区| 美女主播精品视频一二三四| 在线观看你懂得| 欧美成人自拍| 国产亚洲福利一区|