Timescale Cloud:性能、规模、企业级
自托管产品
MST
Apache Airflow® 是一个由社区创建的平台,用于以编程方式创作、调度和监控工作流。
DAG(有向无环图)是 Airflow 的核心概念,它将任务
组织在一起,并带有依赖关系和连接,以说明它们应如何运行。您可以在 Airflow 实例的
$AIRFLOW_HOME/dags
文件夹中的 Python 文件中声明 DAG。
本页面向您展示如何使用 DAG 中的 Python 连接器将 Apache Airflow 与 Timescale Cloud 服务集成。
要遵循本页的步骤:
创建一个目标Timescale Cloud 服务,并启用时序和分析功能。
您需要您的连接详情。此过程也适用于自托管 TimescaleDB。
确保您的 Airflow 实例可以网络访问 Timescale Cloud。
此示例 DAG 使用您在超表中优化时序数据中创建的 company
表。
要安装连接到 Timescale Cloud 所需的 Python 库:
启用 Airflow 和 Timescale Cloud 之间的 PostgreSQL 连接
pip install psycopg2-binary在 Airflow UI 中启用 PostgreSQL 连接类型
pip install apache-airflow-providers-postgres
在您的 Airflow 实例中,安全地连接到您的 Timescale Cloud 服务
运行 Airflow
在您的开发机器上,运行以下命令:
airflow standaloneAirflow UI 的用户名和密码显示在输出的
standalone | Login with username
行中。从 Airflow 添加到您的 Timescale Cloud 服务的连接
- 在您的浏览器中,导航到
localhost:8080
,然后选择Admin
>Connections
。 - 点击
+
(添加新记录),然后使用您的连接信息填写表格。Connection Type
(连接类型)为Postgres
。
- 在您的浏览器中,导航到
要在 Airflow 和您的 Timescale Cloud 服务之间交换数据:
创建并执行 DAG
要从 Airflow 向您的 Timescale Cloud 服务插入数据:
在
$AIRFLOW_HOME/dags/timescale_dag.py
中,添加以下代码:from airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom airflow.hooks.postgres_hook import PostgresHookfrom datetime import datetimedef insert_data_to_timescale():hook = PostgresHook(postgres_conn_id='the ID of the connenction you created')conn = hook.get_conn()cursor = conn.cursor()"""This could be any query. This example inserts data into the tableyou create in:https://docs.timescaledb.cn/getting-started/latest/try-key-features-timescale-products/#optimize-time-series-data-in-hypertables"""cursor.execute("INSERT INTO crypto_assets (symbol, name) VALUES (%s, %s)",('NEW/Asset','New Asset Name'))conn.commit()cursor.close()conn.close()default_args = {'owner': 'airflow','start_date': datetime(2023, 1, 1),'retries': 1,}dag = DAG('timescale_dag', default_args=default_args, schedule_interval='@daily')insert_task = PythonOperator(task_id='insert_data',python_callable=insert_data_to_timescale,dag=dag,)此 DAG 使用在为关系数据创建常规 PostgreSQL 表中创建的
company
表。在您的浏览器中,刷新Airflow UI。
在
Search DAGS
(搜索 DAG)中,键入timescale_dag
并按 Enter 键。点击播放图标并触发 DAG
验证数据是否出现在 Timescale Cloud 中
在Timescale 控制台
中,导航到您的服务并点击
SQL editor
(SQL 编辑器)。运行查询以查看您的数据。例如:
SELECT symbol, name FROM company;
。您会看到表中插入的新行。
您已成功将 Apache Airflow 与 Timescale Cloud 集成并创建了数据管道。
关键词