Timescale 代码快速入门指南旨在帮助您将 Timescale 集成到您自己的程序中。它们使用您最喜欢的编程语言来解释如何连接到 Timescale 数据库、创建和管理超表以及摄取和查询数据。

本快速入门指南将引导您完成以下步骤

在开始之前,请确保您已具备以下条件

在本节中,您将使用 PGX 驱动程序创建与 TimescaleDB 的连接。PGX 是一个工具包,旨在帮助 Go 开发人员直接使用 PostgreSQL。您可以使用它来帮助您的 Go 应用程序直接与 TimescaleDB 交互。

  1. 找到您的 TimescaleDB 凭据,并使用它们为 PGX 编写连接字符串。

    您将需要

    • 密码
    • 用户名
    • 主机 URL
    • 端口号
    • 数据库名称
  2. 使用此格式将您的连接字符串变量编写为 libpq 连接字符串

    connStr := "postgres://username:password@host:port/dbname"

    如果您使用的是 TimescaleDB 的托管版本,或者您需要 SSL 连接,请改用此格式

    connStr := "postgres://username:password@host:port/dbname?sslmode=require"
  3. 可选您可以检查是否已连接到数据库,使用这个 hello world 程序

    package main
    import (
    "context"
    "fmt"
    "os"
    "github.com/jackc/pgx/v5"
    )
    //connect to database using a single connection
    func main() {
    /***********************************************/
    /* Single Connection to TimescaleDB/ PostgreSQL */
    /***********************************************/
    ctx := context.Background()
    connStr := "yourConnectionStringHere"
    conn, err := pgx.Connect(ctx, connStr)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
    os.Exit(1)
    }
    defer conn.Close(ctx)
    //run a simple query to check our connection
    var greeting string
    err = conn.QueryRow(ctx, "select 'Hello, Timescale!'").Scan(&greeting)
    if err != nil {
    fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
    os.Exit(1)
    }
    fmt.Println(greeting)
    }

    如果您想将连接字符串指定为环境变量,您可以使用此语法来访问它,以代替 connStr 变量

    os.Getenv("DATABASE_CONNECTION_STRING")

或者,您可以使用连接池连接到 TimescaleDB。连接池对于节省计算资源很有用,并且还可以加快数据库查询速度

在本节中,您将创建一个名为 sensors 的表,其中包含虚构传感器的 ID、类型和位置。此外,您还将创建一个名为 sensor_data 的超表,其中包含这些传感器的测量值。测量值包含时间、传感器 ID、温度读数和传感器的 CPU 百分比。

  1. 编写一个字符串,其中包含用于创建关系表的 SQL 语句。此示例创建一个名为 sensors 的表,其中包含 ID、类型和位置的列

    queryCreateTable := `CREATE TABLE sensors (id SERIAL PRIMARY KEY, type VARCHAR(50), location VARCHAR(50));`
  2. 使用当前上下文的参数和您创建的语句字符串,在 dbpool 对象上使用 Exec() 函数执行 CREATE TABLE 语句

    package main
    import (
    "context"
    "fmt"
    "os"
    "github.com/jackc/pgx/v5/pgxpool"
    )
    func main() {
    ctx := context.Background()
    connStr := "yourConnectionStringHere"
    dbpool, err := pgxpool.New(ctx, connStr)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
    os.Exit(1)
    }
    defer dbpool.Close()
    /********************************************/
    /* Create relational table */
    /********************************************/
    //Create relational table called sensors
    queryCreateTable := `CREATE TABLE sensors (id SERIAL PRIMARY KEY, type VARCHAR(50), location VARCHAR(50));`
    _, err = dbpool.Exec(ctx, queryCreateTable)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to create SENSORS table: %v\n", err)
    os.Exit(1)
    }
    fmt.Println("Successfully created relational table SENSORS")
    }

创建关系表后,您可以创建超表。表和索引的创建、表的更改、数据的插入、数据的选择以及大多数其他任务都在超表上执行。

  1. 为超表的 CREATE TABLE SQL 语句创建一个变量。请注意超表是如何具有强制性的时间列的

    queryCreateTable := `CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER,
    temperature DOUBLE PRECISION,
    cpu DOUBLE PRECISION,
    FOREIGN KEY (sensor_id) REFERENCES sensors (id));
    `
  2. 制定 SELECT 语句,将表转换为超表。您必须指定要转换为超表的表名,以及其时间列名作为第二个参数。有关更多信息,请参阅 create_hypertable 文档

    queryCreateHypertable := `SELECT create_hypertable('sensor_data', by_range('time'));`
    注意

    by_range 维度构建器是 TimescaleDB 2.13 的新增功能。

  3. 执行 CREATE TABLE 语句和 SELECT 语句,将表转换为超表。您可以通过在 dbpool 对象上调用 Exec() 函数来执行此操作,使用当前上下文的参数以及 queryCreateTablequeryCreateHypertable 语句字符串

    package main
    import (
    "context"
    "fmt"
    "os"
    "github.com/jackc/pgx/v5/pgxpool"
    )
    func main() {
    ctx := context.Background()
    connStr := "yourConnectionStringHere"
    dbpool, err := pgxpool.New(ctx, connStr)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
    os.Exit(1)
    }
    defer dbpool.Close()
    /********************************************/
    /* Create Hypertable */
    /********************************************/
    // Create hypertable of time-series data called sensor_data
    queryCreateTable := `CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER,
    temperature DOUBLE PRECISION,
    cpu DOUBLE PRECISION,
    FOREIGN KEY (sensor_id) REFERENCES sensors (id));
    `
    queryCreateHypertable := `SELECT create_hypertable('sensor_data', by_range('time'));`
    //execute statement
    _, err = dbpool.Exec(ctx, queryCreateTable+queryCreateHypertable)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to create the `sensor_data` hypertable: %v\n", err)
    os.Exit(1)
    }
    fmt.Println("Successfully created hypertable `sensor_data`")
    }

您可以通过几种不同的方式将行插入到数据库中。以下每个示例都将来自两个数组 sensorTypessensorLocations 的数据插入到名为 sensors 的关系表中。

第一个示例一次插入一行数据。第二个示例插入多行数据。第三个示例使用批量插入来加快处理速度。

  1. 打开到数据库的连接池,然后使用预处理语句来制定 INSERT SQL 语句并执行它

    package main
    import (
    "context"
    "fmt"
    "os"
    "github.com/jackc/pgx/v5/pgxpool"
    )
    func main() {
    ctx := context.Background()
    connStr := "yourConnectionStringHere"
    dbpool, err := pgxpool.New(ctx, connStr)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
    os.Exit(1)
    }
    defer dbpool.Close()
    /********************************************/
    /* INSERT into relational table */
    /********************************************/
    //Insert data into relational table
    // Slices of sample data to insert
    // observation i has type sensorTypes[i] and location sensorLocations[i]
    sensorTypes := []string{"a", "a", "b", "b"}
    sensorLocations := []string{"floor", "ceiling", "floor", "ceiling"}
    for i := range sensorTypes {
    //INSERT statement in SQL
    queryInsertMetadata := `INSERT INTO sensors (type, location) VALUES ($1, $2);`
    //Execute INSERT command
    _, err := dbpool.Exec(ctx, queryInsertMetadata, sensorTypes[i], sensorLocations[i])
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to insert data into database: %v\n", err)
    os.Exit(1)
    }
    fmt.Printf("Inserted sensor (%s, %s) into database \n", sensorTypes[i], sensorLocations[i])
    }
    fmt.Println("Successfully inserted all sensors into database")
    }

您可以改用此过程来插入多行数据,而不是一次插入一行数据

使用此方法插入多行数据会执行与要插入的样本数量一样多的 insert 语句。这可能会使数据摄取速度变慢。为了加快摄取速度,您可以改为批量插入数据。

这是一个关于如何使用您在上一个过程中生成的示例数据执行此操作的示例模式。它使用 pgx Batch 对象

本节介绍如何针对数据库执行查询。

  1. 定义您想要在数据库上运行的 SQL 查询。此示例使用 SQL 查询,该查询结合了时间序列和关系数据。它返回位置为 ceiling 且类型为 a 的传感器每 5 分钟间隔的平均 CPU 值

    // Formulate query in SQL
    // Note the use of prepared statement placeholders $1 and $2
    queryTimebucketFiveMin := `
    SELECT time_bucket('5 minutes', time) AS five_min, avg(cpu)
    FROM sensor_data
    JOIN sensors ON sensors.id = sensor_data.sensor_id
    WHERE sensors.location = $1 AND sensors.type = $2
    GROUP BY five_min
    ORDER BY five_min DESC;
    `
  2. 使用 .Query() 函数执行查询字符串。请确保您指定了相关的占位符

    //Execute query on TimescaleDB
    rows, err := dbpool.Query(ctx, queryTimebucketFiveMin, "ceiling", "a")
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to execute query %v\n", err)
    os.Exit(1)
    }
    defer rows.Close()
    fmt.Println("Successfully executed query")
  3. 访问 .Query() 返回的行。创建一个结构,其中包含表示您期望返回的列的字段,然后使用 rows.Next() 函数迭代返回的行,并使用结构数组填充 results。这使用 rows.Scan() 函数,传入指向您要扫描结果的字段的指针。

    此示例打印出从查询返回的结果,但您可能希望将这些结果用于其他目的。扫描完返回的所有行后,您可以随意使用 results 数组。

    //Do something with the results of query
    // Struct for results
    type result2 struct {
    Bucket time.Time
    Avg float64
    }
    // Print rows returned and fill up results slice for later use
    var results []result2
    for rows.Next() {
    var r result2
    err = rows.Scan(&r.Bucket, &r.Avg)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to scan %v\n", err)
    os.Exit(1)
    }
    results = append(results, r)
    fmt.Printf("Time bucket: %s | Avg: %f\n", &r.Bucket, r.Avg)
    }
    // Any errors encountered by rows.Next or rows.Scan are returned here
    if rows.Err() != nil {
    fmt.Fprintf(os.Stderr, "rows Error: %v\n", rows.Err())
    os.Exit(1)
    }
    // use results here…
  4. 可选此示例程序运行查询,并访问该查询的结果

    package main
    import (
    "context"
    "fmt"
    "os"
    "time"
    "github.com/jackc/pgx/v5/pgxpool"
    )
    func main() {
    ctx := context.Background()
    connStr := "yourConnectionStringHere"
    dbpool, err := pgxpool.New(ctx, connStr)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
    os.Exit(1)
    }
    defer dbpool.Close()
    /********************************************/
    /* Execute a query */
    /********************************************/
    // Formulate query in SQL
    // Note the use of prepared statement placeholders $1 and $2
    queryTimebucketFiveMin := `
    SELECT time_bucket('5 minutes', time) AS five_min, avg(cpu)
    FROM sensor_data
    JOIN sensors ON sensors.id = sensor_data.sensor_id
    WHERE sensors.location = $1 AND sensors.type = $2
    GROUP BY five_min
    ORDER BY five_min DESC;
    `
    //Execute query on TimescaleDB
    rows, err := dbpool.Query(ctx, queryTimebucketFiveMin, "ceiling", "a")
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to execute query %v\n", err)
    os.Exit(1)
    }
    defer rows.Close()
    fmt.Println("Successfully executed query")
    //Do something with the results of query
    // Struct for results
    type result2 struct {
    Bucket time.Time
    Avg float64
    }
    // Print rows returned and fill up results slice for later use
    var results []result2
    for rows.Next() {
    var r result2
    err = rows.Scan(&r.Bucket, &r.Avg)
    if err != nil {
    fmt.Fprintf(os.Stderr, "Unable to scan %v\n", err)
    os.Exit(1)
    }
    results = append(results, r)
    fmt.Printf("Time bucket: %s | Avg: %f\n", &r.Bucket, r.Avg)
    }
    // Any errors encountered by rows.Next or rows.Scan are returned here
    if rows.Err() != nil {
    fmt.Fprintf(os.Stderr, "rows Error: %v\n", rows.Err())
    os.Exit(1)
    }
    }

现在您已经能够从 Go 应用程序连接、读取和写入 TimescaleDB 实例,请务必查看这些高级 TimescaleDB 教程

  • 有关 pgx 的更多信息,请参阅 pgx 文档
  • 通过 入门 教程开始运行 TimescaleDB。
  • 想要在 CSV 数据上快速插入?查看 TimescaleDB parallel copy,这是一个用 Go 编写的用于快速插入的工具。

关键词

在此页面上发现问题?报告问题 或 在 GitHub 上编辑此页