Kafka 0.9版本開始推出了Java版本的consumer,優(yōu)化了coordinator的設(shè)計以及擺脫了對zookeeper的依賴。社區(qū)最近也在探討正式用這套consumer API替換Scala版本的consumer的計劃。鑒于目前這方面的資料并不是很多,本文將嘗試給出一個利用KafkaConsumer編寫的多線程消費者實例,希望對大家有所幫助。
這套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer),普通的單線程使用方法官網(wǎng)API已有介紹,這里不再贅述了。因此,我們直奔主題——討論一下如何創(chuàng)建多線程的方式來使用KafkaConsumer。KafkaConsumer和KafkaProducer不同,后者是線程安全的,因此我們鼓勵用戶在多個線程中共享一個KafkaProducer實例,這樣通常都要比每個線程維護一個KafkaProducer實例效率要高。但對于KafkaConsumer而言,它不是線程安全的,所以實現(xiàn)多線程時通常由兩種實現(xiàn)方法:
1 每個線程維護一個KafkaConsumer
2 維護一個或多個KafkaConsumer,同時維護多個事件處理線程(worker thread)
當然,這種方法還可以有多個變種:比如每個worker線程有自己的處理隊列。consumer根據(jù)某種規(guī)則或邏輯將消息放入不同的隊列。不過總體思想還是相同的,故這里不做過多展開討論了。
下表總結(jié)了兩種方法的優(yōu)缺點:
優(yōu)點 | 缺點 | |
方法1(每個線程維護一個KafkaConsumer) | 方便實現(xiàn) 速度較快,因為不需要任何線程間交互 易于維護分區(qū)內(nèi)的消息順序 | 更多的TCP連接開銷(每個線程都要維護若干個TCP連接) consumer數(shù)受限于topic分區(qū)數(shù),擴展性差 頻繁請求導致吞吐量下降 線程自己處理消費到的消息可能會導致超時,從而造成rebalance |
方法2 (單個(或多個)consumer,多個worker線程) | 可獨立擴展consumer數(shù)和worker數(shù),伸縮性好 | 實現(xiàn)麻煩 通常難于維護分區(qū)內(nèi)的消息順序 處理鏈路變長,導致難以保證提交位移的語義正確性 |
下面我們分別實現(xiàn)這兩種方法。需要指出的是,下面的代碼都是最基本的實現(xiàn),并沒有考慮很多編程細節(jié),比如如何處理錯誤等。
方法1
網(wǎng)友評論