中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

說說 MQ 之 Kafka(三)

2018-10-31    來源:importnew

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

Kafka 副本和集群

在生產(chǎn)環(huán)境中,Kafka 總是以“集群+分區(qū)”方式運行的,以保證可靠性和性能。下面是一個3副本的 Kafka 集群實例。

首先,需要啟動3個 Kafka Broker,Broker 的配置文件分別如下,

broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs
broker.id=1
listeners=PLAINTEXT://192.168.232.23:9093
log.dirs=/tmp/kafka-logs-1
broker.id=1
listeners=PLAINTEXT://192.168.232.23:9094
log.dirs=/tmp/kafka-logs-2

雖然每個 Broker 只配置了一個端口,實際上,Kafka 會多占用一個,可能是用來 Broker 之間的復(fù)制的。另外,3個 Broker 都配置了,

zookeeper.connect=localhost:2181
delete.topic.enable=true

在同一個 Zookeeper 上的 Broker 會被歸類到一個集群中。注意,這些配置中并沒有指定哪一個 Broker 是主節(jié)點,哪些 Broker 是從節(jié)點,Kafka 采用的辦法是從可選的 Broker 中,選出每個分區(qū)的 Leader。也就是說,對某個 Topic 來說,可能0節(jié)點是 Leader,另外一些 Topic,可能1節(jié)點是 Leader;甚至,如果 topic1 有2個分區(qū)的話,分區(qū)1的 Leader 是0節(jié)點,分區(qū)2的 Leader 是1節(jié)點。

這種對等的設(shè)計,對于故障恢復(fù)是十分有用的,在節(jié)點崩潰的時候,Kafka 會自動選舉出可用的從節(jié)點,將其升級為主節(jié)點。在崩潰的節(jié)點恢復(fù),加入集群之后,Kafka 又會將這個節(jié)點加入到可用節(jié)點,并自動選舉出新的主節(jié)點。

實驗如下,先新建一個3副本,2分區(qū)的 Topic,

bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1

初始狀況下,topic1 的狀態(tài)如下,

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

對于上面的輸出,即使沒有文檔,也可以看懂大概:topic1 有2個分區(qū),Partition 0 和 Partition 1,Leader 分別在 Broker 0 和 1。Replicas 表示副本在哪些 Broker 上,Isr(In-Sync Replicas)表示處于同步狀態(tài)中的 Broker,如果有 Broker 宕機了,那么 Replicas 不會變,但是 Isr 會僅顯示沒有宕機的 Broker,詳見下面的實驗。

然后分2個線程,運行之前寫的 Producer 和 Consumer 的示例代碼,Producer 采用異步發(fā)送,消息采用同步復(fù)制。在有消息傳送的情況下,kill -9?停掉其中2個 Broker(Broker 0 和 Broker 1),模擬突然宕機。此時,topic1 狀態(tài)如下,

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2

可見,Kafka 已經(jīng)選出了新的 Leader,消息傳送沒有中斷。接著再啟動被停掉的那兩個 Broker,并查看 topic1 的狀態(tài),如下,

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2,1,0
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0
        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0

可以發(fā)現(xiàn), 有一個短暫的時間,topic1 的兩個分區(qū)的 Leader 都是 Broker 2,但是在 Kafka 重新選舉之后,分區(qū)1的 Leader 變?yōu)?Broker 1。說明 Kafka 傾向于用不同的 Broker 做分區(qū)的 Leader,這樣更能達到負載均衡的效果。

再來看看 Producer 和 Consumer 的日志,下面這個片段是2個 Broker 宕機前后的日志,

......
Send     message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms
Received message: (00438, Message_00438) at offset 216
Send     message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms
Send     message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms
Received message: (00441, Message_00441) at offset 221
Received message: (00439, Message_00439) at offset 217
Send     message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms
Send     message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms
Received message: (00440, Message_00440) at offset 218
Received message: (00443, Message_00443) at offset 219
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Received message: (00442, Message_00442) at offset 222
Send     message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms
Send     message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms
Send     message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms
Send     message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms
Send     message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms
Send     message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms
Send     message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms
Send     message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms
Send     message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms
Send     message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms
......
Send     message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms
Received message: (00631, Message_00631) at offset 310
Received message: (00633, Message_00633) at offset 311
Send     message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms
Received message: (00634, Message_00634) at offset 312
Send     message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms
Received message: (00639, Message_00639) at offset 313
Send     message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms
Received message: (00641, Message_00641) at offset 314
Send     message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms
Received message: (00643, Message_00643) at offset 315
......

出現(xiàn)錯誤的時候,Producer 拋出了?NetworkException?異常。其中有3589條 Received 日志,3583條 Send 日志,7條?NetworkException?異常日志,發(fā)送消息的最大序號是3590,接收消息的最大序號是3589,有以下幾個值得注意的地方,

  1. 宕機之前,消息的接收并不是順序的,這是因為 topic1 有2個分區(qū),Kafka 只保證分區(qū)上的有序;
  2. 宕機之后,出現(xiàn)了長段的發(fā)送日志而沒有接收日志,說明 Kafka 此時正在選舉,選舉的過程會阻塞消費者;
  3. 從接收消息的條數(shù)和序號來看,所有的消息都收到了,沒有丟(沒有收到3590的消息可能是因為強制退出 client 進程的原因),發(fā)送的過程的7個異常應(yīng)該只是虛警,7條異常對應(yīng)序號444~450,3583條 Send 消息再加上這7條,與總消息3590條一致;

從這個實驗中,可以看到,雖然 Kafka 不保證消息重復(fù)發(fā)送,但是卻在盡量保證沒有消息被重復(fù)發(fā)送,可能我的實驗場景還不夠極端,沒有做出消息重復(fù)的情況。

如之前所說,如果要保持完全順序性,需要使用單分區(qū);如果要避免拋出?NetworkException?異常,就使用 Producer 同步發(fā)送。下面,我們重做上面的例子,不同之處是使用單分區(qū)和 Producer 同步發(fā)送,截取一段 Broker 宕機時的日志如下,

......
Sent message: (118, Message_00118)
Received message: (00118, Message_00118) at offset 117
Received message: (00119, Message_00119) at offset 118
Sent message: (119, Message_00119)
Sent message: (120, Message_00120)
Received message: (00120, Message_00120) at offset 119
Sent message: (121, Message_00121)
Received message: (00121, Message_00121) at offset 120
Sent message: (122, Message_00122)
Sent message: (123, Message_00123)
Sent message: (124, Message_00124)
Sent message: (125, Message_00125)
Sent message: (126, Message_00126)
Sent message: (127, Message_00127)
......

可見,由于采用同步發(fā)送,Broker 宕機并沒有造成拋出異常,另外,由于使用單分區(qū),順序性也得到了保證,全局沒有出現(xiàn)亂序的情況。

綜上,是否使用多分區(qū)更多的是對順序性的要求,而使用 Producer 同步發(fā)送還是異步發(fā)送,更多是出于重復(fù)消息的考慮,如果異步發(fā)送拋出異常,在保證不丟消息的前提下,勢必要重發(fā)消息,這就會導(dǎo)致收到重復(fù)消息。多分區(qū)和 Producer 異步發(fā)送,會帶來性能的提升,但是也會引入非順序性,重復(fù)消息等問題,如何取舍要看應(yīng)用的需求。

Kafka 最佳實踐

Kafka 在一些應(yīng)用場景中,有一些前人總結(jié)的最佳實踐?8?9。對最佳實踐,我的看法是,對于自己比較熟悉,有把握的部分,可以按自己的步驟進行;對一些自己不清楚的領(lǐng)域,可以借鑒其中的一些內(nèi)容,至少不會錯的特別厲害。有文章10說,Kafka 在分區(qū)比較多的時候,相應(yīng)時間會變長,這個現(xiàn)象值得在實踐中注意。

后記

在 Kafka 與 RocketMQ 的對比中,RocketMQ 的一個核心功能就是可以支持同步刷盤,此時,即使突然斷電,也可以保證消息不丟;而 Kafka 采用的是異步刷盤,即使返回寫入成功,也只是寫入緩沖區(qū)成功,并非已經(jīng)持久化。因此,如果出現(xiàn)斷電或?kill -9?的情況,Kafka 內(nèi)存中的消息可能丟失。另外,同步刷盤的效率是比較低下的,一般生產(chǎn)中估計也不會使用,可以用優(yōu)雅關(guān)閉的方式來關(guān)閉進程。如果不考慮這些極端情況的話,Kafka 基本是一個很可靠的消息中間件。

參考文章

  1. http://kafka.apache.org/documentation.html?
  2. http://www.jianshu.com/p/453c6e7ff81c?
  3. http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章?
  4. http://developer.51cto.com/art/201501/464491.htm?
  5. https://segmentfault.com/q/1010000004292925?
  6. http://www.cnblogs.com/gnivor/p/5318319.html?
  7. http://www.cnblogs.com/davidwang456/p/4313784.html?
  8. http://www.jianshu.com/p/8689901720fd?
  9. http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/?
  10. http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/?

標(biāo)簽: 代碼

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:Git內(nèi)部原理之Git引用

下一篇:說說 MQ 之 Kafka(二)