MongoDB Kafka 連接器


Apache Kafka 是一種開源的發布/訂閱消息系統。Kafka Connect,Apache Kafka的一個元件,面對將Apache Kafka與各種數據存儲連接的挑戰,包括 MongoDB。Kafka Connect 提供:

  • 傳輸數據到數據存儲的容錯運行時
  • Apache Kafka社區共享連接 Apache Kafka 到不同數據存儲解決方案的框架

在這篇文章中,我們將重點介紹如何將 MongoDB 作為數據湖。 MongoDB Kafka 接收連接器是從 Apache Kafka 讀取數據並將其寫入 MongoDB 的 Kafka Connect 連接器。官方的 MongoDB Kafka 連接器可以在這裏找到。

開始 Kafka 環境

這裡下載最新版的 Kafka。

curl https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz -o kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0

按照正確的順序運行以下命令來開始所有的服務。首先開始 ZooKeeper 服務。

bin/zookeeper-server-start.sh config/zookeeper.properties

在另一個終端會話中,開始 Kafka 代理服務:

bin/kafka-server-start.sh config/server.properties

所有服務成功啟動後,您將會擁有一個運行中的 Kafka 基礎環境。

安裝插件

這裡下載 JAR 文件,並導航至 /libs 目錄。

curl -L https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar -o plugin/mongo-kafka-connect-1.7.0-all.jar

編輯 config/connect-standalone.properties,並將 plugin.path 指向下載的 JAR 文件。

plugin.path=/home/ubuntu/kafka_2.13-3.2.0/libs/mongo-kafka-connect-1.7.0-all.jar

創建配置屬性

/config 文件夾中,創建一個名為 MongoSinkConnector.properties 的文件。

name=mongo-sink
topics=quickstart.sampleData
connector.class=com.mongodb.kafka.connect.MongoSinkConnector

消息類型

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

關於 MongoDB Sink 連接器的具體配置

connection.url=mongodb://localhost:27017
database=quickstart
collection=topicData
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

/config 文件夾中,創建一個名為 MongoSourceConnector.properties 的文件。

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector

連接和源配置

connection.uri=mongodb://localhost:27017
database=quickstart
collection=sampleData

安裝 MongoDB

運行以下命令導入 MongoDB 的公開 GPG 鑰匙:

wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -

創建 MongoDB 源列表

echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list

更新本地軟件包數據庫

sudo apt-get update

安裝 MongoDB 套件

sudo apt-get install -y mongodb-org

如果遇到任何與未滿足的依賴項相關的錯誤,使用以下命令修復它們:

echo "deb http://security.ubuntu.com/ubuntu impish-security main" | sudo tee /etc/apt/sources.list.d/impish-security.list
sudo apt-get update
sudo apt-get install libssl1.1

驗證 MongoDB 狀態

檢查 MongoDB 是否已成功啟動:

sudo systemctl status mongod

如果它是非活動的並需要重新啟動,運行:

sudo systemctl restart mongod

開始 Kafka Connect

要開始 Kafka Connect,執行以下命令:

bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties config/MongoSinkConnector.properties

將數據寫入 Topic

運行控制台生產者客戶端以將幾個事件寫入您的 Topic。您輸入的每行將導致一個單獨的事件被寫入 Topic。

$ bin/kafka-console-producer.sh --topic connect-test --bootstrap-server localhost:9092
This is my first event
This is my second event

通過您的連接器發送文件內容

要將文檔的內容通過您的連接器發送,插入一個文檔到您的源連接器從中讀取數據的 MongoDB 集合。使用以下 MongoDB shell 命令:

use quickstart
db.sampleData.insertOne({"hello":"world"})

插入文檔後,通過檢查 topicData 集合來驗證您的連接器是否已將變更處理。

db.topicData.find()

您應該會看到以下類似的輸出:

[
  {
    "_id": ObjectId(...),
    "hello": "world",
    "travel": "MongoDB Kafka Connector"
  }
]

參考

欲了解更多訊息,請參觀 MongoDB Kafka 連接器文檔