Timescale Cloud:性能、规模、企业级

自托管产品

MST

Apache Kafka 是一个分布式事件流平台,用于高性能数据管道、流式分析和数据集成。Apache Kafka Connect 是一种在 Apache Kafka® 和其他数据系统之间可扩展且可靠地传输数据的工具。Kafka Connect 是一个包含预编写和维护的 Kafka 生产者(源连接器)和 Kafka 消费者(接收器连接器)的生态系统,适用于数据库和消息代理等数据产品和平台。

本指南介绍了如何设置 Kafka 和 Kafka Connect,以将数据从 Kafka 主题流式传输到您的 Timescale Cloud 服务中。

要按照本页的步骤操作

要安装和配置 Apache Kafka

  1. 将 Kafka 二进制文件解压到本地文件夹

    curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf -
    cd kafka_2.13-3.9.0

    从现在开始,您解压 Kafka 二进制文件的文件夹称为 <KAFKA_HOME>

  2. 配置并运行 Apache Kafka

    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    ./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
    ./bin/kafka-server-start.sh config/kraft/reconfig-server.properties

    使用 -daemon 标志在后台运行此进程。

  3. 创建 Kafka 主题

    在另一个终端窗口中,导航到 <KAFKA_HOME>,然后调用 kafka-topics.sh 并创建以下主题

    • accounts:发布 JSON 消息,这些消息由 timescale-sink 连接器消费并插入到您的 Timescale Cloud 服务中。
    • deadletter:存储导致错误且 Kafka Connect 工作程序无法处理的消息。
    ./bin/kafka-topics.sh \
    --create \
    --topic accounts \
    --bootstrap-server localhost:9092 \
    --partitions 10
    ./bin/kafka-topics.sh \
    --create \
    --topic deadletter \
    --bootstrap-server localhost:9092 \
    --partitions 10
  4. 测试您的主题是否正常工作

    1. 运行 kafka-console-producer 将消息发送到 accounts 主题
      bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092
    2. 发送一些事件。例如,键入以下内容
      >Timescale Cloud
      >How Cool
    3. 在另一个终端窗口中,导航到 <KAFKA_HOME>,然后运行 kafka-console-consumer 来消费您刚刚发送的事件
      bin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092
      您将看到
      Timescale Cloud
      How Cool

保持这些终端打开,您稍后会使用它们来测试集成。

要设置 Kafka Connect 服务器、插件、驱动程序和连接器

  1. 安装 PostgreSQL 连接器

    在另一个终端窗口中,导航到 <KAFKA_HOME>,然后下载并配置 PostgreSQL 接收器和驱动程序。

    mkdir -p "plugins/camel-postgresql-sink-kafka-connector"
    curl https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-postgresql-sink-kafka-connector/3.21.0/camel-postgresql-sink-kafka-connector-3.21.0-package.tar.gz \
    | tar -xzf - -C "plugins/camel-postgresql-sink-kafka-connector" --strip-components=1
    curl -H "Accept: application/zip" https://jdbc.postgresql.ac.cn/download/postgresql-42.7.5.jar -o "plugins/camel-postgresql-sink-kafka-connector/postgresql-42.7.5.jar"
    echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-distributed.properties"
    echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-standalone.properties"
  2. 启动 Kafka Connect

    export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*
    ./bin/connect-standalone.sh config/connect-standalone.properties

    使用 -daemon 标志在后台运行此进程。

  3. 验证 Kafka Connect 是否正在运行

    在另一个终端窗口中,运行以下命令

    curl http://localhost:8083

    您将看到类似以下内容

    {"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"J-iy4IGXTbmiALHwPZEZ-A"}

要准备 Timescale Cloud 服务以进行 Kafka 集成

  1. 连接到您的 Timescale Cloud 服务

  2. 创建一个超表来摄取 Kafka 事件

    CREATE TABLE accounts (
    created_at TIMESTAMPTZ DEFAULT NOW(),
    name TEXT,
    city TEXT
    ) WITH (
    tsdb.hypertable,
    tsdb.partition_column='created_at'
    );

    如果您是自托管 TimescaleDB v2.19.3 及更早版本,请创建一个PostgreSQL 关系表,然后使用create_hypertable进行转换。然后通过调用ALTER TABLE启用 hypercore。

要在 Apache Kafka 中创建 Timescale Cloud 接收器

  1. 创建连接配置

    1. 在运行 Kafka Connect 的终端中,按 Ctrl+C 停止进程。

    2. 将以下配置写入 <KAFKA_HOME>/config/timescale-standalone-sink.properties,然后用您的连接详细信息更新 <properties>

      name=timescale-standalone-sink
      connector.class=org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector
      errors.tolerance=all
      errors.deadletterqueue.topic.name=deadletter
      tasks.max=10
      value.converter=org.apache.kafka.connect.storage.StringConverter
      key.converter=org.apache.kafka.connect.storage.StringConverter
      topics=accounts
      camel.kamelet.postgresql-sink.databaseName=<dbname>
      camel.kamelet.postgresql-sink.username=<user>
      camel.kamelet.postgresql-sink.password=<password>
      camel.kamelet.postgresql-sink.serverName=<host>
      camel.kamelet.postgresql-sink.serverPort=<port>
      camel.kamelet.postgresql-sink.query=INSERT INTO accounts (name,city) VALUES (:#name,:#city)
    3. 使用新配置重启 Kafka Connect

      export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*
      ./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-standalone-sink.properties
  2. 测试连接

    要查看您的接收器,请在 GET 请求中查询 /connectors 路由

    curl -X GET http://localhost:8083/connectors

    您将看到

    #["timescale-standalone-sink"]

要测试此集成,请向 accounts 主题发送一些消息。您可以使用 kafkacat 或 kcat 工具来完成此操作。

  1. 在运行 kafka-console-producer.sh 的终端中输入以下 JSON 字符串

    {"name":"Lola","city":"Copacabana"}
    {"name":"Holly","city":"Miami"}
    {"name":"Jolene","city":"Tennessee"}
    {"name":"Barbara Ann ","city":"California"}

    查看运行 kafka-console-consumer 的终端,以查看正在处理的消息。

  2. 查询您的 Timescale Cloud 服务中 accounts 表的所有行

    SELECT * FROM accounts;

    您将看到类似以下内容

    created_atnamecity
    2025-02-18 13:55:05.147261+00Lola科帕卡巴纳
    2025-02-18 13:55:05.216673+00霍莉迈阿密
    2025-02-18 13:55:05.283549+00乔琳田纳西
    2025-02-18 13:55:05.35226+00芭芭拉·安加利福尼亚

您已成功将 Apache Kafka 与 Timescale Cloud 集成。

关键词

此页面有问题?报告问题 或 编辑此页面 在 GitHub 中。