中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

深入理解 Kafka Connect:轉(zhuǎn)換器和序列化

2018-12-08    來源:raincent

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

 

作者,Robin Moffatt,譯者,薛命燈

Kafka Connect 是一個簡單但功能強大的工具,可用于 Kafka 和其他系統(tǒng)之間的集成。人們對 Kafka Connect 最常見的誤解之一是它的轉(zhuǎn)換器。這篇文章將告訴我們?nèi)绾握_地使用消息的序列化格式,以及如何在 Kafka Connect 連接器中對其進行標準化。

Kafka Connect 是 Apache Kafka 的一部分,為其他數(shù)據(jù)存儲和 Kafka 提供流式集成。對于數(shù)據(jù)工程師來說,他們只需要配置一下 JSON 文件就可以了。Kafka 提供了一些可用于常見數(shù)據(jù)存儲的連接器,如 JDBC、Elasticsearch、IBM MQ、S3 和 BigQuery,等等。

對于開發(fā)人員來說,Kafka Connect 提供了豐富的 API,如果有必要還可以開發(fā)其他連接器。除此之外,它還提供了用于配置和管理連接器的 REST API。

Kafka Connect 是一種模塊化組件,提供了一種非常強大的集成方法。一些關(guān)鍵組件包括:

♦ 連接器——定義如何與數(shù)據(jù)存儲集成的 JAR 文件;

♦ 轉(zhuǎn)換器——處理數(shù)據(jù)的序列化和反序列化;

♦ 變換——可選的運行時消息操作。

人們對 Kafka Connect 最常見的誤解與數(shù)據(jù)的序列化有關(guān)。Kafka Connect 使用轉(zhuǎn)換器處理數(shù)據(jù)序列化。接下來讓我們看看它們是如何工作的,并說明如何解決一些常見問題。

Kafka 消息都是字節(jié)

Kafka 消息被保存在主題中,每條消息就是一個鍵值對。當(dāng)它們存儲在 Kafka 中時,鍵和值都只是字節(jié)。Kafka 因此可以適用于各種場景,但這也意味著開發(fā)人員需要決定如何序列化數(shù)據(jù)。

在配置 Kafka Connect 時,序列化格式是最關(guān)鍵的配置選項之一。你需要確保從主題讀取數(shù)據(jù)時使用的序列化格式與寫入主題的序列化格式相同,否則就會出現(xiàn)混亂和錯誤!

 

 

序列化格式有很多種,常見的包括:

♦ JSON;

♦ Avro;

♦ Protobuf;

♦ 字符串分隔(如 CSV)。

選擇序列化格式

選擇序列化格式的一些指導(dǎo)原則:

♦ schema。很多時候,你的數(shù)據(jù)都有對應(yīng)的 schema。你可能不喜歡,但作為開發(fā)人員,你有責(zé)任保留和傳播 schema。schema 為服務(wù)之間提供了一種契約。某些消息格式(例如 Avro 和 Protobuf)具有強大的 schema 支持,而其他消息格式支持較少(JSON)或根本沒有(CVS)。

♦ 生態(tài)系統(tǒng)兼容性。Avro 是 Confluent 平臺的一等公民,擁有來自 Confluent Schema Registry、Kafka Connect、KSQL 的原生支持。另一方面,Protobuf 依賴社區(qū)為部分功能提供支持。

♦ 消息大小。JSON 是純文本的,并且依賴了 Kafka 本身的壓縮機制,Avro 和 Protobuf 都是二進制格式,序列化的消息體積更小。

♦ 語言支持。Avro 在 Java 領(lǐng)域得到了強大的支持,但如果你的公司不是基于 Java 的,那么可能會覺得它不太好用。

如果目標系統(tǒng)使用 JSON,Kafka 主題也必須使用 JSON 嗎?

完全不需要這樣。從數(shù)據(jù)源讀取數(shù)據(jù)或?qū)?shù)據(jù)寫入外部數(shù)據(jù)存儲的格式不需要與 Kafka 消息的序列化格式一樣。

Kafka Connect 中的連接器負責(zé)從源數(shù)據(jù)存儲(例如數(shù)據(jù)庫)獲取數(shù)據(jù),并以數(shù)據(jù)內(nèi)部表示將數(shù)據(jù)傳給轉(zhuǎn)換器。然后,Kafka Connect 的轉(zhuǎn)換器將這些源數(shù)據(jù)對象序列化到主題上。

在使用 Kafka Connect 作為接收器時剛好相反——轉(zhuǎn)換器將來自主題的數(shù)據(jù)反序列化為內(nèi)部表示,傳給連接器,以便能夠使用特定于目標的適當(dāng)方法將數(shù)據(jù)寫入目標數(shù)據(jù)存儲。

也就是說,主題數(shù)據(jù)可以是 Avro 格式,當(dāng)你將數(shù)據(jù)寫入 HDFS 時,指定接收器的連接器使用 HDFS 支持的格式即可。

配置轉(zhuǎn)換器

Kafka Connect 默認使用了 worker 級別的轉(zhuǎn)換器配置,連接器可以對其進行覆蓋。由于在整個管道中使用相同的序列化格式通常會更好,所以一般只需要在 worker 級別設(shè)置轉(zhuǎn)換器,而不需要在連接器中指定。但你可能需要從別人的主題拉取數(shù)據(jù),而他們使了用不同的序列化格式——對于這種情況,你需要在連接器配置中設(shè)置轉(zhuǎn)換器。即使你在連接器的配置中進行了覆蓋,它仍然是執(zhí)行實際任務(wù)的轉(zhuǎn)換器。

好的連接器一般不會序列化或反序列化存儲在 Kafka 中的消息,它會讓轉(zhuǎn)換器完成這項工作。

 

 

請記住,Kafka 消息是鍵值對字節(jié),你需要使用 key.converter 和 value.converter 為鍵和值指定轉(zhuǎn)換器。在某些情況下,你可以為鍵和值使用不同的轉(zhuǎn)換器。

 

 

這是使用 String 轉(zhuǎn)換器的一個示例。

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

有些轉(zhuǎn)換器有一些額外的配置。對于 Avro,你需要指定 Schema Registry。對于 JSON,你需要指定是否希望 Kafka Connect 將 schema 嵌入到 JSON 消息中。在指定特定于轉(zhuǎn)換器的配置時,請始終使用 key.converter. 或 value.converter. 前綴。例如,要將 Avro 用于消息載荷,你需要指定以下內(nèi)容:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

常見的轉(zhuǎn)換器包括:

♦ Avro——來自 Confluent 的開源項目

io.confluent.connect.avro.AvroConverter

♦ String——Apache Kafka 的一部分

org.apache.kafka.connect.storage.StringConverter

♦ JSON——Apache Kafka 的一部分

org.apache.kafka.connect.json.JsonConverter

♦ ByteArray——Apache Kafka 的一部分

org.apache.kafka.connect.converters.ByteArrayConverter

♦ Protobuf——來自社區(qū)的開源項目

com.blueapron.connect.protobuf.ProtobufConverter

JSON 和 schema

雖然 JSON 默認不支持嵌入 schema,但 Kafka Connect 提供了一種可以將 schema 嵌入到消息中的特定 JSON 格式。由于 schema 被包含在消息中,因此生成的消息大小可能會變大。

如果你正在設(shè)置 Kafka Connect 源,并希望 Kafka Connect 在寫入 Kafka 消息包含 schema,可以這樣:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

生成的 Kafka 消息看起來像下面這樣,其中包含 schema 和 payload 節(jié)點元素:

請注意消息的大小,消息由 playload 和 schema 組成。每條消息中都會重復(fù)這些數(shù)據(jù),這也就是為什么說 Avro 這樣的格式會更好,因為它的 schema 是單獨存儲的,消息中只包含載荷(并進行了壓縮)。

如果你正在使用 Kafka Connect 消費 Kafka 主題中的 JSON 數(shù)據(jù),那么就需要知道數(shù)據(jù)是否包含了 schema。如果包含了,并且它的格式與上述的格式相同,那么你可以這樣設(shè)置:

不過,如果你正在消費的 JSON 數(shù)據(jù)如果沒有 schema 加 payload 這樣的結(jié)構(gòu),例如:

那么你必須通過設(shè)置 schemas.enable = false 告訴 Kafka Connect 不要查找 schema:

和之前一樣,轉(zhuǎn)換器配置選項(這里是 schemas.enable)需要使用前綴 key.converter 或 value.converter。

常見錯誤

如果你錯誤地配置了轉(zhuǎn)換器,將會遇到以下的一些常見錯誤。這些消息將顯示在你為 Kafka Connect 配置的接收器中,因為你試圖在接收器中反序列化 Kafka 消息。這些錯誤會導(dǎo)致連接器失敗,主要錯誤消息如下所示:

在錯誤消息的后面,你將看到堆棧信息,描述了發(fā)生錯誤的原因。請注意,對于連接器中的任何致命錯誤,都會拋出上述異常,因此你可能會看到與序列化無關(guān)的錯誤。要快速查看錯誤配置可能會導(dǎo)致的錯誤,請參考下表:

 

 

問題:使用 JsonConverter 讀取非 JSON 數(shù)據(jù)

如果你的源主題上有非 JSON 數(shù)據(jù),但嘗試使用 JsonConverter 讀取它,你將看到:

這有可能是因為源主題使用了 Avro 或其他格式。

解決方案:如果數(shù)據(jù)是 Avro 格式的,那么將 Kafka Connect 接收器的配置改為:

或者,如果主題數(shù)據(jù)是通過 Kafka Connect 填充的,那么你也可以這么做,讓上游源也發(fā)送 JSON 數(shù)據(jù):

問題:使用 AvroConverter 讀取非 Avro 數(shù)據(jù)

這可能是我在 Confluent Community 郵件組和 Slack 組等地方經(jīng)?吹降腻e誤。當(dāng)你嘗試使用 Avro 轉(zhuǎn)換器從非 Avro 主題讀取數(shù)據(jù)時,就會發(fā)生這種情況。這包括使用 Avro 序列化器而不是 Confluent Schema Registry 的 Avro 序列化器(它有自己的格式)寫入的數(shù)據(jù)。

解決方案:檢查源主題的序列化格式,修改 Kafka Connect 接收器連接器,讓它使用正確的轉(zhuǎn)換器,或?qū)⑸嫌胃袷角袚Q為 Avro。如果上游主題是通過 Kafka Connect 填充的,則可以按如下方式配置源連接器的轉(zhuǎn)換器:

問題:沒有使用預(yù)期的 schema/payload 結(jié)構(gòu)讀取 JSON 消息

如前所述,Kafka Connect 支持包含載荷和 schema 的 JSON 消息。如果你嘗試讀取不包含這種結(jié)構(gòu)的 JSON 數(shù)據(jù),你將收到這個錯誤:

需要說明的是,當(dāng) schemas.enable=true 時,唯一有效的 JSON 結(jié)構(gòu)需要包含 schema 和 payload 這兩個頂級元素(如上所示)。

如果你只有簡單的 JSON 數(shù)據(jù),則應(yīng)將連接器的配置改為:

如果要在數(shù)據(jù)中包含 schema,可以使用 Avro(推薦),也可以修改上游的 Kafka Connect 配置,讓它在消息中包含 schema:

故障排除技巧

查看 Kafka Connect 日志

要在 Kafka Connect 中查找錯誤日志,你需要找到 Kafka Connect 工作程序的輸出。這個位置取決于你是如何啟動 Kafka Connect 的。有幾種方法可用于安裝 Kafka Connect,包括 Docker、Confluent CLI、systemd 和手動下載壓縮包。你可以這樣查找日志的位置:

♦ Docker:docker logs container_name;

♦ Confluent CLI:confluent log connect;

♦ systemd:日志文件在 /var/log/confluent/kafka-connect;

其他:默認情況下,Kafka Connect 將其輸出發(fā)送到 stdout,因此你可以在啟動 Kafka Connect 的終端中找到它們。

查看 Kafka Connect 配置文件

♦ Docker——設(shè)置環(huán)境變量,例如在 Docker Compose 中:

♦ Confluent CLI——使用配置文件 etc/schema-registry/connect-avro-distributed.properties;

♦ systemd(deb/rpm)——使用配置文件 /etc/kafka/connect-distributed.properties;

♦ 其他——在啟動 Kafka Connect 時指定工作程序的屬性文件,例如:

檢查 Kafka 主題

假設(shè)我們遇到了上述當(dāng)中的一個錯誤,并且想要解決 Kafka Connect 接收器無法從主題讀取數(shù)據(jù)的問題。

我們需要檢查正在被讀取的數(shù)據(jù),并確保它使用了正確的序列化格式。另外,所有消息都必須使用這種格式,所以不要假設(shè)你現(xiàn)在正在以正確的格式向主題發(fā)送消息就不會出問題。Kafka Connect 和其他消費者也會從主題上讀取已有的消息。

下面,我將使用命令行進行故障排除,當(dāng)然也可以使用其他的一些工具:

♦ Confluent Control Center 提供了可視化檢查主題內(nèi)容的功能;

♦ KSQL 的 PRINT 命令將主題的內(nèi)容打印到控制臺;

♦ Confluent CLI 工具提供了 consume 命令,可用于讀取字符串和 Avro 數(shù)據(jù)。

如果你的數(shù)據(jù)是字符串或 JSON 格式

你可以使用控制臺工具,包括 kafkacat 和 kafka-console-consumer。我個人的偏好是使用 kafkacat:

你也可以使用 jq 驗證和格式化 JSON:

 如果你得到一些“奇怪的”字符,你查看的很可能是二進制數(shù)據(jù),這些數(shù)據(jù)是通過 Avro 或 Protobuf 寫入的:

如果你的數(shù)據(jù)是 Avro 格式

你應(yīng)該使用專為讀取和反序列化 Avro 數(shù)據(jù)而設(shè)計的控制臺工具。我使用的是 kafka-avro-console-consumer。確保指定了正確的 Schema Registry URL:

內(nèi)部轉(zhuǎn)換器

在分布式模式下運行時,Kafka Connect 使用 Kafka 來存儲有關(guān)其操作的元數(shù)據(jù),包括連接器配置、偏移量等。

可以通過 internal.key.converter/internal.value.converter 讓這些 Kafka 使用不同的轉(zhuǎn)換器。不過這些設(shè)置只在內(nèi)部使用,實際上從 Apache Kafka 2.0 開始就已被棄用。你不應(yīng)該更改這些配置,從 Apache Kafka 2.0 版開始,如果你這么做了將會收到警告。

將 schema 應(yīng)用于沒有 schema 的消息

很多時候,Kafka Connect 會從已經(jīng)存在 schema 的地方引入數(shù)據(jù),并使用合適的序列化格式(例如 Avro)來保留這些 schema。然后,這些數(shù)據(jù)的所有下游用戶都可以使用這些 schema。但如果沒有提供顯式的 schema 該怎么辦?

或許你正在使用 FileSourceConnector 從普通文件中讀取數(shù)據(jù)(不建議用于生產(chǎn)環(huán)境中,但可用于 PoC),或者正在使用 REST 連接器從 REST 端點提取數(shù)據(jù)。由于它們都沒有提供 schema,因此你需要聲明它。

有時候你只想傳遞你從源讀取的字節(jié),并將它們保存在一個主題上。但大多數(shù)情況下,你需要 schema 來使用這些數(shù)據(jù)。在攝取時應(yīng)用一次 schema,而不是將問題推到每個消費者,這才是一種更好的處理方式。

你可以編寫自己的 Kafka Streams 應(yīng)用程序,將 schema 應(yīng)用于 Kafka 主題中的數(shù)據(jù)上,當(dāng)然你也可以使用 KSQL。下面讓我們來看一下將 schema 應(yīng)用于某些 CSV 數(shù)據(jù)的簡單示例。

假設(shè)我們有一個 Kafka 主題 testdata-csv,保存著一些 CSV 數(shù)據(jù),看起來像這樣:

我們可以猜測它有三個字段,可能是:

♦ ID
♦ Artist
♦ Song

如果我們將數(shù)據(jù)保留在這樣的主題中,那么任何想要使用這些數(shù)據(jù)的應(yīng)用程序——無論是 Kafka Connect 接收器還是自定義的 Kafka 應(yīng)用程序——每次都需要都猜測它們的 schema 是什么。或者,每個消費應(yīng)用程序的開發(fā)人員都需要向提供數(shù)據(jù)的團隊確認 schema 是否發(fā)生變更。正如 Kafka 可以解耦系統(tǒng)一樣,這種 schema 依賴讓團隊之間也有了硬性耦合,這并不是一件好事。

因此,我們要做的是使用 KSQL 將 schema 應(yīng)用于數(shù)據(jù)上,并使用一個新的派生主題來保存 schema。這樣你就可以通過 KSQL 檢查主題數(shù)據(jù):

前兩個字段(11/6/18 2:41:23 PM UTC 和 NULL)分別是 Kafka 消息的時間戳和鍵。其余字段來自 CSV 文件,F(xiàn)在讓我們用 KSQL 注冊這個主題并聲明 schema:

可以通過查詢 KSQL 流來檢查數(shù)據(jù)是否符合預(yù)期。請注意,這個時候我們只是作為現(xiàn)有 Kafka 主題的消費者——并沒有更改或復(fù)制任何數(shù)據(jù)。

最后,創(chuàng)建一個新的 Kafka 主題,使用帶有 schema 的數(shù)據(jù)進行填充。KSQL 查詢是持續(xù)的,因此除了將現(xiàn)有的數(shù)據(jù)從源主題發(fā)送到目標主題之外,KSQL 還將向目標主題發(fā)送未來將生成的數(shù)據(jù)。

使用 Avro 控制臺消費者驗證數(shù)據(jù):

你甚至可以在 Schema Registry 中查看已注冊的 schema:

寫入原始主題(testdata-csv)的任何新消息都由 KSQL 自動處理,并以 Avro 格式寫入新的 TESTDATA 主題,F(xiàn)在,任何想要使用這些數(shù)據(jù)的應(yīng)用程序或團隊都可以使用 TESTDATA 主題。你還可以更改主題的分區(qū)數(shù)、分區(qū)鍵和復(fù)制系數(shù)。

英文原文:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

標簽: 數(shù)據(jù)庫

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:ApsaraDB-HBase介紹及案例分析

下一篇:Gartner 2019基礎(chǔ)設(shè)施和運維十大趨勢:Serverless、邊緣計算、SaaS 變復(fù)雜等