Apache Airflow® 是一个由社区创建的平台,用于以编程方式编写、调度和监控工作流。

DAG (有向无环图) 是 Airflow 的核心概念,它将 任务Task 集合在一起,并根据依赖关系和关联性组织起来,以说明它们应该如何运行。您可以在 Airflow 实例的 $AIRFLOW_HOME/dags 文件夹中的 Python 文件中声明 DAG。

本页介绍如何使用 DAG 中的 Python 连接器将 Apache Airflow 与 Timescale Cloud 服务集成。

集成前

此示例 DAG 使用您在 创建用于关系数据的常规 PostgreSQL 表 中创建的 company

要安装连接到 Timescale Cloud 所需的 Python 库

  1. 启用 Airflow 和 Timescale Cloud 之间的 PostgreSQL 连接

    pip install psycopg2-binary
  2. 在 Airflow UI 中启用 PostgreSQL 连接类型

    pip install apache-airflow-providers-postgres

在您的 Airflow 实例中,安全地连接到您的 Timescale Cloud 服务

  1. 运行 Airflow

    在您的开发机器上,运行以下命令

    airflow standalone

    Airflow UI 的用户名和密码显示在输出的 standalone | Login with username 行中。

  2. 从 Airflow 添加到您的 Timescale Cloud 服务的连接

    1. 在您的浏览器中,导航到 localhost:8080,然后选择 Admin > Connections
    2. 单击 + (添加新记录),然后使用您的 连接信息 填写表单。Connection TypePostgres

要在 Airflow 和您的 Timescale Cloud 服务之间交换数据

  1. 创建并执行 DAG

    要从 Airflow 在您的 Timescale Cloud 服务中插入数据

    1. $AIRFLOW_HOME/dags/timescale_dag.py 中,添加以下代码

      from airflow import DAG
      from airflow.operators.python_operator import PythonOperator
      from airflow.hooks.postgres_hook import PostgresHook
      from datetime import datetime
      def insert_data_to_timescale():
      hook = PostgresHook(postgres_conn_id='the ID of the connenction you created')
      conn = hook.get_conn()
      cursor = conn.cursor()
      """
      This could be any query. This example inserts data into the table
      you create in:
      https://docs.timescaledb.cn/getting-started/latest/tables-hypertables/#create-regular-postgresql-tables-for-relational-data
      """
      cursor.execute("INSERT INTO company (symbol, name) VALUES (%s, %s)",
      ('new_company_symbol', 'New Company Name'))
      conn.commit()
      cursor.close()
      conn.close()
      default_args = {
      'owner': 'airflow',
      'start_date': datetime(2023, 1, 1),
      'retries': 1,
      }
      dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily')
      insert_task = PythonOperator(
      task_id='insert_data',
      python_callable=insert_data_to_timescale,
      dag=dag,
      )

      此 DAG 使用在 创建用于关系数据的常规 PostgreSQL 表 中创建的 company 表。

    2. 在您的浏览器中,刷新 Airflow UI

    3. Search DAGS 中,键入 timescale_dag 并按 ENTER 键。

    4. 按下播放图标并触发 DAG

      daily eth volume of assets

  2. 验证数据是否出现在 Timescale Cloud 中

    1. Timescale 控制台 中,导航到您的服务并单击 SQL editor

    2. 运行查询以查看您的数据。例如:SELECT symbol, name FROM company;

      您会看到插入到表中的新行。

您已成功将 Apache Airflow 与 Timescale Cloud 集成,并创建了数据管道。

关键词

在此页面上发现问题?报告问题 或 在 GitHub 上编辑此页面