Timescale Cloud:性能、规模、企业级
自托管产品
MST
Apache Kafka 是一个分布式事件流平台,用于高性能数据管道、流式分析和数据集成。Apache Kafka Connect
是一种在 Apache Kafka® 和其他数据系统之间可扩展且可靠地传输数据的工具。Kafka Connect 是一个包含预编写和维护的 Kafka 生产者(源连接器)和 Kafka 消费者(接收器连接器)的生态系统,适用于数据库和消息代理等数据产品和平台。
本指南介绍了如何设置 Kafka 和 Kafka Connect,以将数据从 Kafka 主题流式传输到您的 Timescale Cloud 服务中。
要按照本页的步骤操作
创建目标Timescale Cloud 服务,并启用时序和分析功能。
您需要您的连接详细信息。此过程也适用于自托管的 TimescaleDB。
- Java 8 或更高版本
才能运行 Apache Kafka
要安装和配置 Apache Kafka
将 Kafka 二进制文件解压到本地文件夹
curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf -cd kafka_2.13-3.9.0从现在开始,您解压 Kafka 二进制文件的文件夹称为
<KAFKA_HOME>
。配置并运行 Apache Kafka
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties./bin/kafka-server-start.sh config/kraft/reconfig-server.properties使用
-daemon
标志在后台运行此进程。创建 Kafka 主题
在另一个终端窗口中,导航到 <KAFKA_HOME>,然后调用
kafka-topics.sh
并创建以下主题accounts
:发布 JSON 消息,这些消息由 timescale-sink 连接器消费并插入到您的 Timescale Cloud 服务中。deadletter
:存储导致错误且 Kafka Connect 工作程序无法处理的消息。
./bin/kafka-topics.sh \--create \--topic accounts \--bootstrap-server localhost:9092 \--partitions 10./bin/kafka-topics.sh \--create \--topic deadletter \--bootstrap-server localhost:9092 \--partitions 10测试您的主题是否正常工作
- 运行
kafka-console-producer
将消息发送到accounts
主题bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092 - 发送一些事件。例如,键入以下内容>Timescale Cloud>How Cool
- 在另一个终端窗口中,导航到 <KAFKA_HOME>,然后运行
kafka-console-consumer
来消费您刚刚发送的事件您将看到bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092Timescale CloudHow Cool
- 运行
保持这些终端打开,您稍后会使用它们来测试集成。
要设置 Kafka Connect 服务器、插件、驱动程序和连接器
安装 PostgreSQL 连接器
在另一个终端窗口中,导航到 <KAFKA_HOME>,然后下载并配置 PostgreSQL 接收器和驱动程序。
mkdir -p "plugins/camel-postgresql-sink-kafka-connector"curl https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-postgresql-sink-kafka-connector/3.21.0/camel-postgresql-sink-kafka-connector-3.21.0-package.tar.gz \| tar -xzf - -C "plugins/camel-postgresql-sink-kafka-connector" --strip-components=1curl -H "Accept: application/zip" https://jdbc.postgresql.ac.cn/download/postgresql-42.7.5.jar -o "plugins/camel-postgresql-sink-kafka-connector/postgresql-42.7.5.jar"echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-distributed.properties"echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-standalone.properties"启动 Kafka Connect
export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*./bin/connect-standalone.sh config/connect-standalone.properties使用
-daemon
标志在后台运行此进程。验证 Kafka Connect 是否正在运行
在另一个终端窗口中,运行以下命令
curl http://localhost:8083您将看到类似以下内容
{"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"}
要准备 Timescale Cloud 服务以进行 Kafka 集成
连接到您的 Timescale Cloud 服务
创建一个超表来摄取 Kafka 事件
CREATE TABLE accounts (created_at TIMESTAMPTZ DEFAULT NOW(),name TEXT,city TEXT) WITH (tsdb.hypertable,tsdb.partition_column='created_at');如果您是自托管 TimescaleDB v2.19.3 及更早版本,请创建一个PostgreSQL 关系表
,然后使用create_hypertable进行转换。然后通过调用ALTER TABLE启用 hypercore。
要在 Apache Kafka 中创建 Timescale Cloud 接收器
创建连接配置
在运行 Kafka Connect 的终端中,按
Ctrl+C
停止进程。将以下配置写入
<KAFKA_HOME>/config/timescale-standalone-sink.properties
,然后用您的连接详细信息更新<properties>
。name=timescale-standalone-sinkconnector.class=org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnectorerrors.tolerance=allerrors.deadletterqueue.topic.name=deadlettertasks.max=10value.converter=org.apache.kafka.connect.storage.StringConverterkey.converter=org.apache.kafka.connect.storage.StringConvertertopics=accountscamel.kamelet.postgresql-sink.databaseName=<dbname>camel.kamelet.postgresql-sink.username=<user>camel.kamelet.postgresql-sink.password=<password>camel.kamelet.postgresql-sink.serverName=<host>camel.kamelet.postgresql-sink.serverPort=<port>camel.kamelet.postgresql-sink.query=INSERT INTO accounts (name,city) VALUES (:#name,:#city)使用新配置重启 Kafka Connect
export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-standalone-sink.properties
测试连接
要查看您的接收器,请在 GET 请求中查询
/connectors
路由curl -X GET http://localhost:8083/connectors您将看到
#["timescale-standalone-sink"]
要测试此集成,请向 accounts
主题发送一些消息。您可以使用 kafkacat 或 kcat 工具来完成此操作。
在运行
kafka-console-producer.sh
的终端中输入以下 JSON 字符串{"name":"Lola","city":"Copacabana"}{"name":"Holly","city":"Miami"}{"name":"Jolene","city":"Tennessee"}{"name":"Barbara Ann ","city":"California"}查看运行
kafka-console-consumer
的终端,以查看正在处理的消息。查询您的 Timescale Cloud 服务中
accounts
表的所有行SELECT * FROM accounts;您将看到类似以下内容
created_at name city 2025-02-18 13:55:05.147261+00 Lola 科帕卡巴纳 2025-02-18 13:55:05.216673+00 霍莉 迈阿密 2025-02-18 13:55:05.283549+00 乔琳 田纳西 2025-02-18 13:55:05.35226+00 芭芭拉·安 加利福尼亚
您已成功将 Apache Kafka 与 Timescale Cloud 集成。
关键词