Timescale 代码快速入门指南旨在帮助您将 Timescale 集成到您自己的程序中。它们使用您最喜欢的编程语言来解释如何连接到 Timescale 数据库、创建和管理超表以及摄取和查询数据。
本快速入门指南将引导您完成以下步骤
在开始之前,请确保您已具备以下条件
- 已安装 TimescaleDB。
- 已安装 Go。
- 已安装 Go 的 PGX 驱动程序。
在本节中,您将使用 PGX 驱动程序创建与 TimescaleDB 的连接。PGX 是一个工具包,旨在帮助 Go 开发人员直接使用 PostgreSQL。您可以使用它来帮助您的 Go 应用程序直接与 TimescaleDB 交互。
找到您的 TimescaleDB 凭据,并使用它们为 PGX 编写连接字符串。
您将需要
- 密码
- 用户名
- 主机 URL
- 端口号
- 数据库名称
使用此格式将您的连接字符串变量编写为 libpq 连接字符串
connStr := "postgres://username:password@host:port/dbname"如果您使用的是 TimescaleDB 的托管版本,或者您需要 SSL 连接,请改用此格式
connStr := "postgres://username:password@host:port/dbname?sslmode=require"可选您可以检查是否已连接到数据库,使用这个 hello world 程序
package mainimport ("context""fmt""os""github.com/jackc/pgx/v5")//connect to database using a single connectionfunc main() {/***********************************************//* Single Connection to TimescaleDB/ PostgreSQL *//***********************************************/ctx := context.Background()connStr := "yourConnectionStringHere"conn, err := pgx.Connect(ctx, connStr)if err != nil {fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)os.Exit(1)}defer conn.Close(ctx)//run a simple query to check our connectionvar greeting stringerr = conn.QueryRow(ctx, "select 'Hello, Timescale!'").Scan(&greeting)if err != nil {fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)os.Exit(1)}fmt.Println(greeting)}如果您想将连接字符串指定为环境变量,您可以使用此语法来访问它,以代替
connStr
变量os.Getenv("DATABASE_CONNECTION_STRING")
或者,您可以使用连接池连接到 TimescaleDB。连接池对于节省计算资源很有用,并且还可以加快数据库查询速度
在本节中,您将创建一个名为 sensors
的表,其中包含虚构传感器的 ID、类型和位置。此外,您还将创建一个名为 sensor_data
的超表,其中包含这些传感器的测量值。测量值包含时间、传感器 ID、温度读数和传感器的 CPU 百分比。
编写一个字符串,其中包含用于创建关系表的 SQL 语句。此示例创建一个名为
sensors
的表,其中包含 ID、类型和位置的列queryCreateTable := `CREATE TABLE sensors (id SERIAL PRIMARY KEY, type VARCHAR(50), location VARCHAR(50));`使用当前上下文的参数和您创建的语句字符串,在
dbpool
对象上使用Exec()
函数执行CREATE TABLE
语句package mainimport ("context""fmt""os""github.com/jackc/pgx/v5/pgxpool")func main() {ctx := context.Background()connStr := "yourConnectionStringHere"dbpool, err := pgxpool.New(ctx, connStr)if err != nil {fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)os.Exit(1)}defer dbpool.Close()/********************************************//* Create relational table *//********************************************///Create relational table called sensorsqueryCreateTable := `CREATE TABLE sensors (id SERIAL PRIMARY KEY, type VARCHAR(50), location VARCHAR(50));`_, err = dbpool.Exec(ctx, queryCreateTable)if err != nil {fmt.Fprintf(os.Stderr, "Unable to create SENSORS table: %v\n", err)os.Exit(1)}fmt.Println("Successfully created relational table SENSORS")}
创建关系表后,您可以创建超表。表和索引的创建、表的更改、数据的插入、数据的选择以及大多数其他任务都在超表上执行。
为超表的
CREATE TABLE SQL
语句创建一个变量。请注意超表是如何具有强制性的时间列的queryCreateTable := `CREATE TABLE sensor_data (time TIMESTAMPTZ NOT NULL,sensor_id INTEGER,temperature DOUBLE PRECISION,cpu DOUBLE PRECISION,FOREIGN KEY (sensor_id) REFERENCES sensors (id));`制定
SELECT
语句,将表转换为超表。您必须指定要转换为超表的表名,以及其时间列名作为第二个参数。有关更多信息,请参阅create_hypertable
文档queryCreateHypertable := `SELECT create_hypertable('sensor_data', by_range('time'));`注意
by_range
维度构建器是 TimescaleDB 2.13 的新增功能。执行
CREATE TABLE
语句和SELECT
语句,将表转换为超表。您可以通过在dbpool
对象上调用Exec()
函数来执行此操作,使用当前上下文的参数以及queryCreateTable
和queryCreateHypertable
语句字符串package mainimport ("context""fmt""os""github.com/jackc/pgx/v5/pgxpool")func main() {ctx := context.Background()connStr := "yourConnectionStringHere"dbpool, err := pgxpool.New(ctx, connStr)if err != nil {fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)os.Exit(1)}defer dbpool.Close()/********************************************//* Create Hypertable *//********************************************/// Create hypertable of time-series data called sensor_dataqueryCreateTable := `CREATE TABLE sensor_data (time TIMESTAMPTZ NOT NULL,sensor_id INTEGER,temperature DOUBLE PRECISION,cpu DOUBLE PRECISION,FOREIGN KEY (sensor_id) REFERENCES sensors (id));`queryCreateHypertable := `SELECT create_hypertable('sensor_data', by_range('time'));`//execute statement_, err = dbpool.Exec(ctx, queryCreateTable+queryCreateHypertable)if err != nil {fmt.Fprintf(os.Stderr, "Unable to create the `sensor_data` hypertable: %v\n", err)os.Exit(1)}fmt.Println("Successfully created hypertable `sensor_data`")}
您可以通过几种不同的方式将行插入到数据库中。以下每个示例都将来自两个数组 sensorTypes
和 sensorLocations
的数据插入到名为 sensors
的关系表中。
第一个示例一次插入一行数据。第二个示例插入多行数据。第三个示例使用批量插入来加快处理速度。
打开到数据库的连接池,然后使用预处理语句来制定
INSERT
SQL 语句并执行它package mainimport ("context""fmt""os""github.com/jackc/pgx/v5/pgxpool")func main() {ctx := context.Background()connStr := "yourConnectionStringHere"dbpool, err := pgxpool.New(ctx, connStr)if err != nil {fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)os.Exit(1)}defer dbpool.Close()/********************************************//* INSERT into relational table *//********************************************///Insert data into relational table// Slices of sample data to insert// observation i has type sensorTypes[i] and location sensorLocations[i]sensorTypes := []string{"a", "a", "b", "b"}sensorLocations := []string{"floor", "ceiling", "floor", "ceiling"}for i := range sensorTypes {//INSERT statement in SQLqueryInsertMetadata := `INSERT INTO sensors (type, location) VALUES ($1, $2);`//Execute INSERT command_, err := dbpool.Exec(ctx, queryInsertMetadata, sensorTypes[i], sensorLocations[i])if err != nil {fmt.Fprintf(os.Stderr, "Unable to insert data into database: %v\n", err)os.Exit(1)}fmt.Printf("Inserted sensor (%s, %s) into database \n", sensorTypes[i], sensorLocations[i])}fmt.Println("Successfully inserted all sensors into database")}
您可以改用此过程来插入多行数据,而不是一次插入一行数据
使用此方法插入多行数据会执行与要插入的样本数量一样多的 insert
语句。这可能会使数据摄取速度变慢。为了加快摄取速度,您可以改为批量插入数据。
这是一个关于如何使用您在上一个过程中生成的示例数据执行此操作的示例模式。它使用 pgx Batch
对象
本节介绍如何针对数据库执行查询。
定义您想要在数据库上运行的 SQL 查询。此示例使用 SQL 查询,该查询结合了时间序列和关系数据。它返回位置为
ceiling
且类型为a
的传感器每 5 分钟间隔的平均 CPU 值// Formulate query in SQL// Note the use of prepared statement placeholders $1 and $2queryTimebucketFiveMin := `SELECT time_bucket('5 minutes', time) AS five_min, avg(cpu)FROM sensor_dataJOIN sensors ON sensors.id = sensor_data.sensor_idWHERE sensors.location = $1 AND sensors.type = $2GROUP BY five_minORDER BY five_min DESC;`使用
.Query()
函数执行查询字符串。请确保您指定了相关的占位符//Execute query on TimescaleDBrows, err := dbpool.Query(ctx, queryTimebucketFiveMin, "ceiling", "a")if err != nil {fmt.Fprintf(os.Stderr, "Unable to execute query %v\n", err)os.Exit(1)}defer rows.Close()fmt.Println("Successfully executed query")访问
.Query()
返回的行。创建一个结构,其中包含表示您期望返回的列的字段,然后使用rows.Next()
函数迭代返回的行,并使用结构数组填充results
。这使用rows.Scan()
函数,传入指向您要扫描结果的字段的指针。此示例打印出从查询返回的结果,但您可能希望将这些结果用于其他目的。扫描完返回的所有行后,您可以随意使用 results 数组。
//Do something with the results of query// Struct for resultstype result2 struct {Bucket time.TimeAvg float64}// Print rows returned and fill up results slice for later usevar results []result2for rows.Next() {var r result2err = rows.Scan(&r.Bucket, &r.Avg)if err != nil {fmt.Fprintf(os.Stderr, "Unable to scan %v\n", err)os.Exit(1)}results = append(results, r)fmt.Printf("Time bucket: %s | Avg: %f\n", &r.Bucket, r.Avg)}// Any errors encountered by rows.Next or rows.Scan are returned hereif rows.Err() != nil {fmt.Fprintf(os.Stderr, "rows Error: %v\n", rows.Err())os.Exit(1)}// use results here…- package mainimport ("context""fmt""os""time""github.com/jackc/pgx/v5/pgxpool")func main() {ctx := context.Background()connStr := "yourConnectionStringHere"dbpool, err := pgxpool.New(ctx, connStr)if err != nil {fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)os.Exit(1)}defer dbpool.Close()/********************************************//* Execute a query *//********************************************/// Formulate query in SQL// Note the use of prepared statement placeholders $1 and $2queryTimebucketFiveMin := `SELECT time_bucket('5 minutes', time) AS five_min, avg(cpu)FROM sensor_dataJOIN sensors ON sensors.id = sensor_data.sensor_idWHERE sensors.location = $1 AND sensors.type = $2GROUP BY five_minORDER BY five_min DESC;`//Execute query on TimescaleDBrows, err := dbpool.Query(ctx, queryTimebucketFiveMin, "ceiling", "a")if err != nil {fmt.Fprintf(os.Stderr, "Unable to execute query %v\n", err)os.Exit(1)}defer rows.Close()fmt.Println("Successfully executed query")//Do something with the results of query// Struct for resultstype result2 struct {Bucket time.TimeAvg float64}// Print rows returned and fill up results slice for later usevar results []result2for rows.Next() {var r result2err = rows.Scan(&r.Bucket, &r.Avg)if err != nil {fmt.Fprintf(os.Stderr, "Unable to scan %v\n", err)os.Exit(1)}results = append(results, r)fmt.Printf("Time bucket: %s | Avg: %f\n", &r.Bucket, r.Avg)}// Any errors encountered by rows.Next or rows.Scan are returned hereif rows.Err() != nil {fmt.Fprintf(os.Stderr, "rows Error: %v\n", rows.Err())os.Exit(1)}}
现在您已经能够从 Go 应用程序连接、读取和写入 TimescaleDB 实例,请务必查看这些高级 TimescaleDB 教程
- 有关 pgx 的更多信息,请参阅 pgx 文档。
- 通过 入门 教程开始运行 TimescaleDB。
- 想要在 CSV 数据上快速插入?查看 TimescaleDB parallel copy,这是一个用 Go 编写的用于快速插入的工具。
关键词
在此页面上发现问题?报告问题 或 在 GitHub 上编辑此页。