1.概述

  目前,隨著大數(shù)據(jù)的浪潮,Kafka 被越來越多的企業(yè)所認(rèn)可,如今的Kafka已發(fā)展到0.10.x,其優(yōu)秀的特性也帶給我們解決實際業(yè)務(wù)的方案。對于數(shù)據(jù)分流來說,既可以分流到離線存儲平臺(HDFS),離線計算平臺(Hive倉庫),也可以分流實時流水計算(Storm,Spark)等,同樣也可以分流到海量數(shù)據(jù)查詢(HBase),或是及時查詢(ElasticSearch)。而今天筆者給大家分享的就是Kafka 分流數(shù)據(jù)到 ElasticSearch。

2.內(nèi)容

  我們知道,ElasticSearch是有其自己的套件的,簡稱ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch負(fù)責(zé)存儲,Logstash負(fù)責(zé)收集數(shù)據(jù)來源,Kibana負(fù)責(zé)可視化數(shù)據(jù),分工明確。想要分流Kafka中的消息數(shù)據(jù),可以使用Logstash的插件直接消費,但是需要我們編寫復(fù)雜的過濾條件,和特殊的映射處理,比如系統(tǒng)保留的`_uid`字段等需要我們額外的轉(zhuǎn)化。今天我們使用另外一種方式來處理數(shù)據(jù),使用Kafka的消費API和ES的存儲API來處理分流數(shù)據(jù)。通過編寫Kafka消費者,消費對應(yīng)的業(yè)務(wù)數(shù)據(jù),將消費的數(shù)據(jù)通過ES存儲API,通過創(chuàng)建對應(yīng)的索引的,存儲到ES中。其流程如下圖所示:

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動開發(fā)培訓(xùn)

  上圖可知,消費收集的數(shù)據(jù),通過ES提供的存儲接口進(jìn)行存儲。存儲的數(shù)據(jù),這里我們可以規(guī)劃,做定時調(diào)度。最后,我們可以通過Kibana來可視化ES中的數(shù)據(jù),對外提供業(yè)務(wù)調(diào)用接口,進(jìn)行數(shù)據(jù)共享。

3.實現(xiàn)

  下面,我們開始進(jìn)行實現(xiàn)細(xì)節(jié)處理,這里給大家提供實現(xiàn)的核心代碼部分,實現(xiàn)代碼如下所示:

3.1 定義ES格式

  我們以插件的形式進(jìn)行消費,從Kafka到ES的數(shù)據(jù)流向,只需要定義插件格式,如下所示:

iOS培訓(xùn),Swift培訓(xùn),蘋果開發(fā)培訓(xùn),移動開發(fā)培訓(xùn)

{    "job": {        "content": {            "reader": {                "name": "kafka",                "parameter": {