Apache Airflow® 是一个由社区创建的平台,用于以编程方式编写、调度和监控工作流。
DAG (有向无环图) 是 Airflow 的核心概念,它将 任务Task 集合在一起,并根据依赖关系和关联性组织起来,以说明它们应该如何运行。您可以在 Airflow 实例的 $AIRFLOW_HOME/dags
文件夹中的 Python 文件中声明 DAG。
本页介绍如何使用 DAG 中的 Python 连接器将 Apache Airflow 与 Timescale Cloud 服务集成。
集成前
创建目标 Timescale Cloud 服务
您需要您的 连接详细信息 才能按照本页中的步骤操作。此过程也适用于 自托管 TimescaleDB。
确保您的 Airflow 实例具有对 Timescale Cloud 的网络访问权限。
此示例 DAG 使用您在 创建用于关系数据的常规 PostgreSQL 表 中创建的 company
表
要安装连接到 Timescale Cloud 所需的 Python 库
启用 Airflow 和 Timescale Cloud 之间的 PostgreSQL 连接
pip install psycopg2-binary在 Airflow UI 中启用 PostgreSQL 连接类型
pip install apache-airflow-providers-postgres
在您的 Airflow 实例中,安全地连接到您的 Timescale Cloud 服务
运行 Airflow
在您的开发机器上,运行以下命令
airflow standaloneAirflow UI 的用户名和密码显示在输出的
standalone | Login with username
行中。从 Airflow 添加到您的 Timescale Cloud 服务的连接
- 在您的浏览器中,导航到
localhost:8080
,然后选择Admin
>Connections
。 - 单击
+
(添加新记录),然后使用您的 连接信息 填写表单。Connection Type
为Postgres
。
- 在您的浏览器中,导航到
要在 Airflow 和您的 Timescale Cloud 服务之间交换数据
创建并执行 DAG
要从 Airflow 在您的 Timescale Cloud 服务中插入数据
在
$AIRFLOW_HOME/dags/timescale_dag.py
中,添加以下代码from airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom airflow.hooks.postgres_hook import PostgresHookfrom datetime import datetimedef insert_data_to_timescale():hook = PostgresHook(postgres_conn_id='the ID of the connenction you created')conn = hook.get_conn()cursor = conn.cursor()"""This could be any query. This example inserts data into the tableyou create in:https://docs.timescaledb.cn/getting-started/latest/tables-hypertables/#create-regular-postgresql-tables-for-relational-data"""cursor.execute("INSERT INTO company (symbol, name) VALUES (%s, %s)",('new_company_symbol', 'New Company Name'))conn.commit()cursor.close()conn.close()default_args = {'owner': 'airflow','start_date': datetime(2023, 1, 1),'retries': 1,}dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily')insert_task = PythonOperator(task_id='insert_data',python_callable=insert_data_to_timescale,dag=dag,)此 DAG 使用在 创建用于关系数据的常规 PostgreSQL 表 中创建的
company
表。在您的浏览器中,刷新 Airflow UI。
在
Search DAGS
中,键入timescale_dag
并按 ENTER 键。按下播放图标并触发 DAG
验证数据是否出现在 Timescale Cloud 中
在 Timescale 控制台 中,导航到您的服务并单击
SQL editor
。运行查询以查看您的数据。例如:
SELECT symbol, name FROM company;
。您会看到插入到表中的新行。
您已成功将 Apache Airflow 与 Timescale Cloud 集成,并创建了数据管道。
关键词
在此页面上发现问题?报告问题 或 在 GitHub 上编辑此页面。