国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Kafka實時數據即席查詢應用與實踐

大數據
在定位一些實時數據的Case時,如果沒有對實時數據進行歷史歸檔,在排查問題時,沒有日志追述,會很難定位是哪個環節的問題。因此,我們需要對處理的這些實時數據進行記錄歸檔并存儲。

一、背景

Kafka中的實時數據是以Topic的概念進行分類存儲,而Topic的數據是有一定時效性的,比如保存24小時、36小時、48小時等。

二、內容

2.1 案例分析

這里以i視頻和vivo短視頻實時數據為例,之前存在這樣的協作問題:

數據上游內容方提供實時Topic(存放i視頻和vivo短視頻相關實時數據),數據側對實時數據進行邏輯處理后,發送給下游工程去建庫實時索引,當任務執行一段時間后,工程側建索引偶爾會提出數據沒有發送過去的Case,前期由于沒有對數據做存儲,在定位問題的時候會比較麻煩,經常需求查看實時日志,需要花費很長的時間來分析這些Case是出現在哪個環節。

為了解決這個問題,我們可以將實時Topic中的數據,在發送給其他Topic的時候,添加跟蹤機制,進行數據分流,Sink到存儲介質(比如HDFS、Hive等)。這里,我們選擇使用Hive來進行存儲,主要是查詢方便,支持SQL來快速查詢。如下圖所示:

圖片

在實現優化后的方案時,有兩種方式可以實現跟蹤機制,它們分別是Flink SQL寫Hive、Flink DataStream寫Hive。接下來,分別對這兩種實現方案進行介紹和實踐。

2.2 方案一:Flink SQL寫Hive

這種方式比較直接,可以在Flink任務里面直接操作實時Topic數據后,將消費后的數據進行分流跟蹤,作為日志記錄寫入到Hive表中,具體實現步驟如下:

  • 構造Hive Catalog;
  • 創建Hive表;
  • 寫入實時數據到Hive表。

2.2.1 構造Hive Catalog

在構造Hive Catalog時,需要初始化Hive的相關信息,部分代碼片段如下所示:

// 設置執行環境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
 // 構造 Hive Catalog 名稱
 String name = "video-hive-catalog";
 // 初始化數據庫名
 String defaultDatabase = "comsearch";
 // Hive 配置文件路徑地址
 String hiveConfDir = "/appcom/hive/conf";
 // Hive 版本號
 String version = "3.1.2";
 // 實例化一個 HiveCatalog 對象
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
 // 注冊HiveCatalog
 tEnv.registerCatalog(name, hive);
 // 設定當前 HiveCatalog
 tEnv.useCatalog(name);
 // 設置執行SQL為Hive
 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 // 使用數據庫
 tEnv.useDatabase("db1");

在以上代碼中,我們首先設置了 Flink 的執行環境和表環境,然后創建了一個 HiveCatalog,并將其注冊到表環境中。

2.2.2 創建Hive表

如果Hive表不存在,可以通過在程序中執行建表語句,具體SQL見表語句代碼如下所示:

-- 創建表語句 
tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(
  `content_id` string,
  `status` int)
PARTITIONED BY (
  `dt` string,
  `h` string,
  `m` string)
stored as ORC
TBLPROPERTIES (
  'auto-compaction'='true',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)")

在創建Hive表時我們使用了IF NOT EXISTS關鍵字,如果Hive中該表不存在會自動在Hive上創建,也可以提前在Hive中創建好該表,Flink SQL中就無需再執行建表SQL,因為用了Hive的Catalog,Flink SQL運行時會找到表。這里,我們設置了auto-compaction屬性為true,用來使小文件自動合并,1.12版的新特性,解決了實時寫Hive產生的小文件問題。同時,指定metastore值是專門用于寫入Hive的,也需要指定success-file值,這樣CheckPoint觸發完數據寫入磁盤后會創建_SUCCESS文件以及Hive metastore上創建元數據,這樣Hive才能夠對這些寫入的數據可查。

2.2.3 寫入實時數據到Hive表

在準備完成2.2.1和2.2.2中的步驟后,接下來就可以在Flink任務中通過SQL來對實時數據進行操作了,具體實現代碼片段如下所示:

// 編寫業務SQL
 String insertSql = "insert into  xxx_table SELECT content_id, status, " +
                    " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM xxx_rt";
 // 執行 Hive SQL
 tEnv.executeSql(insertSql);
 // 執行任務
 env.execute();

將消費后的數據進行分類,編寫業務SQL語句,將消費的數據作為日志記錄,發送到Hive表進行存儲,這樣Kafka中的實時數據就存儲到Hive了,方便使用Hive來對Kafka數據進行即席分析。

2.2.4 避坑技巧

使用這種方式在處理的過程中,如果配置使用的是EventTime,在程序中配置'sink.partition-commit.trigger'='partition-time',最后會出現無法提交分區的情況。經過對源代碼PartitionTimeCommitTigger的分析,找到了出現這種異常情況的原因。

我們可以通過看

org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions

中的一個函數,來說明具體的問題,部分源代碼片段如下:

// PartitionTimeCommitTigger源代碼函數代碼片段
@Override
public List<String> committablePartitions(long checkpointId) {
 if (!watermarks.containsKey(checkpointId)) {
  throw new IllegalArgumentException(String.format(
    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
    checkpointId, watermarks));
 }
 long watermark = watermarks.get(checkpointId);
 watermarks.headMap(checkpointId, true).clear();
 
 List<String> needCommit = new ArrayList<>();
 Iterator<String> iter = pendingPartitions.iterator();
 while (iter.hasNext()) {
  String partition = iter.next();
  // 通過分區的值來獲取分區的時間
  LocalDateTime partTime = extractor.extract(
    partitionKeys, extractPartitionValues(new Path(partition)));
  // 判斷水印是否大于分區創建時間+延遲時間
  if (watermark > toMills(partTime) + commitDelay) {
   needCommit.add(partition);
   iter.remove();
  }
 }
 return needCommit;
}

通過分析上述代碼片段,我們可以知道系統通過分區值來抽取相應的分區來創建時間,然后進行比對,比如我們設置的時間 pattern 是 '$dt $h:$m:00' , 某一時刻我們正在往 /2022-02-26/18/20/ 這個分區下寫數據,那么程序根據分區值,得到的 pattern 將會是2022-02-26 18:20:00,這個值在SQL中是根據 DATA_FORMAT 函數獲取的。

而這個值是帶有時區的,比如我們的時區設置為東八區,2022-02-26 18:20:00這個時間是東八區的時間,換成標準 UTC 時間是減去8個小時,也就是2022-02-26 10:20:00,而在源代碼中的 toMills 函數在處理這個東八區的時間時,并沒有對時區進行處理,把這個其實應該是東八區的時間當做了 UTC 時間來處理,這樣計算出來的值就比實際值大8小時,導致一直沒有觸發分區的提交。

如果我們在數據源中構造的分區是 UTC 時間,也就是不帶分區的時間,那么這個邏輯就是沒有問題的,但是這樣又不符合我們的實際情況,比如對于分區2022-02-26 18:20:00,我希望我的分區肯定是東八區的時間,而不是比東八區小8個小時的UTC時間2022-02-26 10:20:00。

在明白了原因之后,我們就可以針對上述異常情況進行優化我們的實現方案,比如自定義一個分區類、或者修改缺省的時間分區類。比如,我們使用TimeZoneTableFunction類來實現一個自定義時區,部分參考代碼片段如下:

public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {
  private transient DateTimeFormatter formatter;
  private String timeZoneId;
  public CustomTimeZoneTableFunction(String timeZoneId) {
    this.timeZoneId = timeZoneId;
  }
  @Override
  public void open(FunctionContext context) throws Exception {
    // 初始化 DateTimeFormatter 對象
    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
    formatter = formatter.withZone(ZoneId.of(timeZoneId));
  }
  @Override
  public void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {
    // 將時間戳轉換為 LocalDateTime 對象
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
    // 將 LocalDateTime 對象轉換為指定時區下的 LocalDateTime 對象
    LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();
    // 將 LocalDateTime 對象轉換為 TimestampWithTimeZone 對象,并輸出到下游
    out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));
  }
}

2.3 方案二:Flink DataStream寫Hive

在一些特殊的場景下,Flink SQL如果無法實現我們復雜的業務需求,那么我們可以考慮使用Flink DataStream寫Hive這種實現方案。比如如下業務場景,現在需要實現這樣一個業務需求,內容方將實時數據寫入到Kafka消息隊列中,然后由數據側通過Flink任務消費內容方提供的數據源,接著對消費的數據進行分流處理(這里的步驟和Flink SQL寫Hive的步驟類似),每分鐘進行存儲到HDFS(MapReduce任務需要計算和重跑HDFS數據),然后通過MapReduce任務將HDFS上的這些日志數據生成Hive所需要格式,最后將這些Hive格式數據文件加載到Hive表中。實現Kafka數據到Hive的即席分析功能,具體實現流程細節如下圖所示:

圖片

具體核心實現步驟如下:

  • 消費內容方Topic實時數據;
  • 生成數據預處理策略;
  • 加載數據;
  • 使用Hive SQL對Kafka數據進行即席分析。

2.3.1 消費內容方Topic實時數據

編寫消費Topic的Flink代碼,這里不對Topic中的數據做邏輯處理,在后面統一交給MapReduce來做數據預處理,直接消費并存儲到HDFS上。具體實現代碼如下所示:

public class Kafka2Hdfs {
    public static void main(String[] args) {
        // 判斷參數是否有效
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        // 初始化Kafka連接地址和HDFS存儲地址以及Flink并行度
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);
 
        // 實例化一個Flink任務對象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Flink消費Topic中的數據
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
        // 實例化一個HDFS存儲對象
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
        // 自定義存儲到HDFS上的文件名,用小時和分鐘來命名,方便后面算策略
        sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));
        // 設置存儲HDFS的文件大小和存儲文件時間頻率
        sink.setBatchSize(1024 * 1024 * 4);
        sink.setBatchRolloverInterval(1000 * 30);
        transction.addSink(sink);
        env.execute("Kafka2Hdfs");
    }
    // 初始化Kafka對象連接信息
    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

注意事項:

  • 這里我們把時間窗口設置小一些,每30s做一次Checkpoint,如果該批次的時間窗口沒有數據過來,就生成一個文件落地到HDFS上;
  • 另外,我們重寫了Bucketer為DateTimeBucketer,邏輯并不復雜,在原有的方法上加一個年-月-日/時-分的文件生成路徑,例如在HDFS上的生成路徑:xxxx/2022-02-26/00-00。

具體DateTimeBucketer實現代碼如下所示:

public class DateMinuteBucketer implements Bucketer<String> {
    private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));
    }
}

2.3.2 生成數據預處理策略

這里,我們需要對落地到HDFS上的文件進行預處理,處理的邏輯是這樣的。比如,現在是2022-02-26 14:00,那么我們需要將當天的13:55,13:56,13:57,13:58,13:59這最近5分鐘的數據處理到一起,并加載到Hive的最近5分鐘的一個分區里面去。那么,我們需要生成這樣一個邏輯策略集合,用HH-mm作為key,與之最近的5個文件作為value,進行數據預處理合并。具體實現代碼步驟如下:

  • 步驟一:獲取小時循環策略;
  • 步驟二:獲取分鐘循環策略;
  • 步驟三:判斷是否為5分鐘的倍數;
  • 步驟四:對分鐘級別小于10的數字做0補齊(比如9補齊后變成09);
  • 步驟五:對小時級別小于10的數字做0補齊(比如1補齊后變成01);
  • 步驟六:生成時間范圍;
  • 步驟七:輸出結果。

其中,主要的邏輯是在生成時間范圍的過程中,根據小時和分鐘數的不同情況,生成不同的時間范圍,并輸出結果。在生成時間范圍時,需要注意前導0的處理,以及特殊情況(如小時為0、分鐘為0等)的處理。最后,將生成的時間范圍輸出即可。

根據上述步驟編寫對應的實現代碼,生成當天所有日期命名規則,預覽部分結果如下:

圖片

需要注意的是,如果發生了第二天00:00,那么我們需要用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55這5個文件中的數據來做預處理。

2.3.3 加載數據

在完成2.3.1和2.3.2里面的內容后,接下來,我們可以使用Hive的load命令直接加載HDFS上預處理后的文件,把數據加載到對應的Hive表中,具體實現命令如下:

-- 加載數據到Hive表
load data inpath '<hdfs_path_hfile>' overwrite into table xxx.table partition(day='2022-02-26',hour='14',min='05')

2.3.4 即席分析

之后,我們使用Hive SQL來對Kafka數據進行即席分析,示例SQL如下所示:

-- 查詢某5分鐘分區數據
select * from xxx.table where day='2022-02-26' and hour='14' and min='05'

2.4 Flink SQL與 Flink DataStream如何選擇

Flink SQL 和 Flink DataStream 都是 Flink 中用于處理數據的核心組件,我們可以根據自己實際的業務場景來選擇使用哪一種組件。

Flink SQL 是一種基于 SQL 語言的數據處理引擎,它可以將 SQL 查詢語句轉換為 Flink 的數據流處理程序。相比于 Flink DataStream,Flink SQL 更加易于使用和維護,同時具有更快的開發速度和更高的代碼復用性。Flink SQL 適用于需要快速開發和部署數據處理任務的場景,比如數據倉庫、實時報表、數據清洗等。

Flink DataStream API是Flink數據流處理標準API,SQL是Flink后期版本提供的新的數據處理操作接口。SQL的引入為提高了Flink使用的靈活性。可以認為Flink SQL是一種通過字符串來定義數據流處理邏輯的描述語言。

因此,在選擇 Flink SQL 和 Flink DataStream 時,需要根據具體的業務需求和數據處理任務的特點來進行選擇。如果需要快速開發和部署任務,可以選擇使用 Flink SQL;如果需要進行更為深入和定制化的數據處理操作,可以選擇使用 Flink DataStream。同時,也可以根據實際情況,結合使用 Flink SQL 和 Flink DataStream 來完成復雜的數據處理任務。

三、 總結

在實際應用中,Kafka實時數據即席查詢可以用于多種場景,如實時監控、實時報警、實時統計、實時分析等。具體應用和實踐中,需要注意以下幾點:

  • 數據質量:Kafka實時數據即席查詢需要保證數據質量,避免數據重復、丟失或錯誤等問題,需要進行數據質量監控和調優。
  • 系統復雜性:Kafka實時數據即席查詢需要涉及到多個系統和組件,包括Kafka、數據處理引擎(比如Flink)、查詢引擎(比如Hive)等,需要對系統進行配置和管理,增加了系統的復雜性。
  • 安全性:Kafka實時數據即席查詢需要加強數據安全性保障,避免數據泄露或數據篡改等安全問題,做好Hive的權限管控。
  • 性能優化:Kafka實時數據即席查詢需要對系統進行性能優化,包括優化數據處理引擎、查詢引擎等,提高系統的性能和效率。
責任編輯:龐桂玉 來源: vivo互聯網技術
相關推薦

2023-10-13 07:25:50

2021-07-22 18:29:58

AI

2023-08-29 10:20:00

2022-05-23 13:30:48

數據胡實踐

2023-05-06 07:19:48

數倉架構技術架構

2023-12-11 08:00:00

架構FlinkDruid

2021-11-30 07:49:00

大數據工具 Presto

2018-08-23 07:40:58

Spark流處理數據流

2022-07-14 15:29:26

數據庫實踐

2023-10-11 14:37:21

工具開發

2021-07-29 08:00:00

開源數據技術

2021-09-13 13:46:29

Apache HudiB 站數據湖

2022-06-27 09:09:34

快手Flink數倉建設

2025-05-20 10:03:59

數據倉庫Flink SQLPaimon

2023-07-27 07:44:07

云音樂數倉平臺

2021-08-31 10:18:34

Flink 數倉一體快手

2022-07-07 10:19:05

數據畫像

2024-12-19 09:45:24

2020-04-28 11:04:51

數據架構互聯網Flink

2020-05-29 17:10:15

數據架構數據一切數據體系
點贊
收藏

51CTO技術棧公眾號

国产精品成人午夜| 欧美午夜不卡影院在线观看完整版免费| 亚洲美女区一区| 久久久无码中文字幕久...| 伊人色综合久久| 欧美日韩在线网站| 国产日本欧美一区二区三区在线| 色哟哟国产精品免费观看| 亚洲色图15p| 国产丝袜精品视频| 免费人成在线观看视频播放| 国内精品99| 国产厕所精品在线观看| 激情五月色综合国产精品| 91视频免费在线观看| 国产一区二区精品在线观看| 亚洲黄页网在线观看| 国产午夜在线视频| 国产伦理精品不卡| 国内不卡一区二区三区| 日韩深夜福利| 欧美黑人国产人伦爽爽爽| 欧美三区四区| 日韩欧美一级精品久久| 天堂在线免费av| 国产精品午夜久久| av黄色免费在线| 欧美精品在线网站| 卡通动漫国产精品| 美女久久久久久久| 最近高清中文在线字幕在线观看| 亚洲高清免费视频| 先锋影音欧美性受| 欧美福利视频一区| 91一区二区三区在线| 亚洲成人黄色网址| sqte在线播放| 少妇久久久久久| 欧美极品在线观看| 国产精品裸体一区二区三区| 日韩高清在线不卡| 欧美深夜福利视频| 一区二区三区中文字幕电影| 国产尤物视频在线| 在线观看亚洲区| 成人3d精品动漫精品一二三| 国产日韩精品推荐| 久久久久久久久岛国免费| 四色永久免费网站| 欧美r级在线观看| 久久99精品久久久久久园产越南| 亚洲自拍偷拍第一页| 国产麻豆91精品| 波多野结衣天堂| 日韩一区二区不卡| 夜夜躁狠狠躁日日躁2021日韩| 日本在线观看不卡| 亚洲精选免费视频| 91麻豆精品国产91久久久更新资源速度超快| 国内精品久久影院| 丝袜脚交一区二区| 小香蕉视频在线| 欧美肥婆姓交大片| 久久99久久久久久久久久久| 裸体免费网站| 亚洲免费电影在线观看| 亚洲大全视频| 日本乱码一区二区三区不卡| 亚洲国产日韩欧美在线图片| 国产精品久久久久av蜜臀| 国产精品免费小视频| 怡红院成人在线| 高清欧美一区二区三区| 精品亚洲a∨| 日本精品久久久久久久| 乱人伦视频在线| 国产精品青草久久久久福利99| 青青国产91久久久久久| 黄色三级视频片| 精品久久久一区| 午夜激情一区| 国模精品娜娜一二三区| 日韩午夜电影| 欧美一区视久久| 国产经典欧美精品| 亚洲小视频在线播放| 免费在线观看精品| 91手机视频在线| 久久久综合激的五月天| 99re6在线视频| 亚洲女人初尝黑人巨大| 亚洲观看高清完整版在线观看| 在线中文字幕-区二区三区四区| 欧美疯狂xxxx大交乱88av| 美女www一区二区| 在线黄色av| 久久久久久网址| 乱一区二区av| 9色在线视频| 国产精品视频资源| 国产欧美中文在线| 欧美日韩大片| 久草热久草热线频97精品| 亚洲午夜精品17c| 老司机成人在线| 鲁一鲁一鲁一鲁一澡| 精品美女在线观看| 亚洲国产精品第一区二区三区 | 国产污视频在线| 欧美精品久久久久久久久久| 精品亚洲成av人在线观看| 午夜小视频在线| 波多野结衣成人在线| 亚洲h在线观看| 日韩影视在线观看| www日韩视频| 中文字幕亚洲欧美日韩2019| 精品一区二区三区影院在线午夜| 免费在线看黄| 黑人另类av| 欧美日韩一区二区三区高清| 久久亚洲在线| 天堂电影在线| 国产精品高清在线观看| 亚洲综合在线视频| 欧美亚洲国产精品久久| 色www免费视频| 亚洲18私人小影院| 欧美国产一区在线| 97久久亚洲| 免费高清成人| 66m—66摸成人免费视频| 国产婷婷色一区二区三区| 国语精品视频| 99热手机在线| 欧美影院在线播放| 一区二区欧美视频| 日韩av在线播放网址| 中文字幕高清在线观看| 亚洲自拍偷拍色片视频| 精品视频一区二区不卡| 亚洲一区二区毛片| 欧美一级鲁丝片| 成品人视频ww入口| 久久久久国产精品www| 中文字幕在线观看不卡| 日韩理论电影院| 欧美成人hd| 色哺乳xxxxhd奶水米仓惠香| 日韩在线观看av| 国产精品久久久久久久午夜片| 精品国产一级毛片| 在线视频自拍| 精品国产一区二区三区在线| 久久精品中文字幕免费mv| 中文字幕一区二区不卡| 小处雏高清一区二区三区| av在线之家电影网站| 亚洲欧美日韩精品久久久| 中文字幕日韩在线视频| 国产精品激情偷乱一区二区∴| 亚欧美无遮挡hd高清在线视频| 77777影视视频在线观看| 夜夜爽99久久国产综合精品女不卡 | 色噜噜久久综合| 国产精品毛片一区二区三区| 国产污视频在线播放| koreanbj精品视频一区| 国产精品久久久久国产a级| 在线综合视频播放| 97精品久久久久中文字幕| 国产欧美亚洲精品a| 二区三区在线观看| 日批视频在线免费看| 成人欧美一区二区三区黑人孕妇| 91精品国产一区二区三区蜜臀| 成人一区二区三区在线观看| 国产精品嫩模av在线| 午夜看片在线免费| 免费一级特黄特色毛片久久看| 国产精品久久久久久久久久尿 | 性欧美精品孕妇| 偷拍盗摄高潮叫床对白清晰| 国模精品视频一区二区| 欧美日韩中文精品| 久久亚洲欧美国产精品乐播 | 日本a级片电影一区二区| 欧美天堂一区二区三区| 99re视频精品| 亚洲成人在线| 日韩中文字幕视频网| 1区2区3区在线观看| 日日碰狠狠躁久久躁婷婷| 成人永久免费| 欧美成人在线影院| 欧美一区二区精品在线| 亚洲欧美一区二区三区国产精品| 性欧美长视频| 少妇精品久久久一区二区| 国产高清不卡|