Timescale Cloud:性能、扩展、企业级

自托管产品

MST

本教程使用的数据集包含排名前 100 的交易量最大股票符号的每秒交易数据,存储在名为 stocks_real_time 的超表中。它还包括一个单独的公司符号和公司名称表,存储在名为 company 的常规 PostgreSQL 表中。

要按照本页的步骤操作

当您通过 WebSocket 连接到 Twelve Data API 时,您在计算机和 WebSocket 服务器之间建立持久连接。您需要设置 Python 环境,并传递两个参数来创建 WebSocket 对象并建立连接。

为本项目创建并激活新的 Python 虚拟环境。本教程所需的所有软件包都将安装在此环境中。

  1. 创建并激活 Python 虚拟环境

    virtualenv env
    source env/bin/activate
  2. 安装 Twelve Data Python 封装库,并支持 WebSocket。此库允许您向 API 发送请求并维护稳定的 WebSocket 连接。

    pip install twelvedata websocket-client
  3. 安装 Psycopg2以便您可以从 Python 脚本连接到 TimescaleDB。

    pip install psycopg2-binary

您的计算机和 WebSocket 服务器之间的持久连接用于在连接保持期间接收数据。您需要传递两个参数来创建 WebSocket 对象并建立连接。

  • on_event

    此参数必须是一个函数,每当从 WebSocket 接收到新数据记录时,就会调用该函数。

    def on_event(event):
    print(event) # prints out the data record (dictionary)

    您可以在此处实现摄取逻辑,以便每当有新数据可用时,就将其插入到数据库中。

  • symbols

    此参数需要是股票代码符号(例如 MSFT)或加密货币交易对(例如 BTC/USD)的列表。使用 WebSocket 连接时,您总是需要订阅您希望接收的事件。您可以通过使用 symbols 参数来完成此操作,或者如果您的连接已建立,您也可以使用 subscribe() 函数获取其他符号的数据。

  1. 创建一个名为 websocket_test.py 的新 Python 文件,并使用 <YOUR_API_KEY> 连接到 Twelve Data 服务器。

    import time
    from twelvedata import TDClient
    messages_history = []
    def on_event(event):
    print(event) # prints out the data record (dictionary)
    messages_history.append(event)
    td = TDClient(apikey="<YOUR_API_KEY>")
    ws = td.websocket(symbols=["BTC/USD", "ETH/USD"], on_event=on_event)
    ws.subscribe(['ETH/BTC', 'AAPL'])
    ws.connect()
    while True:
    print('messages received: ', len(messages_history))
    ws.heartbeat()
    time.sleep(10)
  2. 运行 Python 脚本

    python websocket_test.py
  3. 运行脚本后,您会收到服务器关于连接状态的响应。

    {'event': 'subscribe-status',
    'status': 'ok',
    'success': [
    {'symbol': 'BTC/USD', 'exchange': 'Coinbase Pro', 'mic_code': 'Coinbase Pro', 'country': '', 'type': 'Digital Currency'},
    {'symbol': 'ETH/USD', 'exchange': 'Huobi', 'mic_code': 'Huobi', 'country': '', 'type': 'Digital Currency'}
    ],
    'fails': None
    }

    建立与 WebSocket 服务器的连接后,等待几秒钟,您将看到类似这样的数据记录

    {'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438893, 'price': 30361.2, 'bid': 30361.2, 'ask': 30361.2, 'day_volume': 49153}
    {'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438896, 'price': 30380.6, 'bid': 30380.6, 'ask': 30380.6, 'day_volume': 49157}
    {'event': 'heartbeat', 'status': 'ok'}
    {'event': 'price', 'symbol': 'ETH/USD', 'currency_base': 'Ethereum', 'currency_quote': 'US Dollar', 'exchange': 'Huobi', 'type': 'Digital Currency', 'timestamp': 1652438899, 'price': 2089.07, 'bid': 2089.02, 'ask': 2089.03, 'day_volume': 193818}
    {'event': 'price', 'symbol': 'BTC/USD', 'currency_base': 'Bitcoin', 'currency_quote': 'US Dollar', 'exchange': 'Coinbase Pro', 'type': 'Digital Currency', 'timestamp': 1652438900, 'price': 30346.0, 'bid': 30346.0, 'ask': 30346.0, 'day_volume': 49167}

    每个价格事件都会为您提供有关给定交易对的多个数据点,例如交易所名称和当前价格。您偶尔还会看到响应中的 heartbeat(心跳)事件;这些事件表示连接随时间推移的健康状况。至此,WebSocket 连接已成功传输数据。

时序数据表示系统、过程或行为随时间变化的方式。超表使 TimescaleDB 能够高效处理时序数据。超表是 PostgreSQL 表,可根据时间自动对您的时序数据进行分区。每个超表都由称为“块”(chunks)的子表组成。每个块都被分配一个时间范围,并且只包含该范围内的数据。当您运行查询时,TimescaleDB 会识别正确的块并在其上运行查询,而不是遍历整个表。

Hypercore 是 Timescale 超表使用的混合行/列存储引擎。传统数据库在快速插入(基于行的存储)和高效分析(基于列的存储)之间存在权衡。Hypercore 消除了这种权衡,允许实时分析而不会牺牲事务能力。

Hypercore 在其生命周期中以最有效的格式动态存储数据。

  • 近期数据的行式存储:最新块(可能更多)总是存储在行存储中,确保快速插入、更新和低延迟的单记录查询。此外,行式存储用作列式存储的直写(writethrough)方式,用于插入和更新。
  • 分析性能的列式存储:块会自动压缩到列存储中,优化存储效率并加速分析查询。

与传统列式数据库不同,Hypercore 允许数据在任何阶段插入或修改,使其成为高摄取事务性工作负载和实时分析的灵活解决方案——都在单个数据库中。

因为 TimescaleDB 100% 兼容 PostgreSQL,您可以在超表旁边使用所有标准的 PostgreSQL 表、索引、存储过程和其他对象。这使得创建和使用超表类似于标准的 PostgreSQL。

  1. 连接到您的 Timescale Cloud 服务

    Timescale 控制台中打开SQL 编辑器。您也可以使用psql连接到您的服务。

  2. 创建一个超表以存储实时加密货币数据

    使用 CREATE TABLE 为您的时序数据创建超表。为了在列式存储中对数据进行高效查询,请记住根据您最常用于过滤数据的列进行 segmentby(分段)。

    CREATE TABLE crypto_ticks (
    "time" TIMESTAMPTZ,
    symbol TEXT,
    price DOUBLE PRECISION,
    day_volume NUMERIC
    ) WITH (
    tsdb.hypertable,
    tsdb.partition_column='time',
    tsdb.segmentby='symbol',
    tsdb.orderby='time DESC'
    );

    如果您自托管 TimescaleDB v2.19.3 及更早版本,请创建 PostgreSQL 关系表,然后使用create_hypertable进行转换。之后通过调用ALTER TABLE启用 Hypercore。

当您有增强时序数据的关系数据时,将这些数据存储在标准 PostgreSQL 关系表中。

  1. 添加一个表来在关系表中存储资产符号和名称

    CREATE TABLE crypto_assets (
    symbol TEXT UNIQUE,
    "name" TEXT
    );

您现在在 Timescale Cloud 服务中有两个表:一个名为 crypto_ticks 的超表,以及一个名为 crypto_assets 的普通 PostgreSQL 表。

当您将数据摄取到 Timescale 这样的事务型数据库时,批量插入数据比逐行插入数据效率更高。使用一个事务插入多行可以显著提高 Timescale 数据库的整体摄取容量和速度。

实现批量处理的常见做法是首先将新记录存储在内存中,然后在批次达到一定大小时,在一个事务中将所有记录从内存插入到数据库中。理想的批次大小并非普适的,但您可以尝试不同的批次大小(例如 100、1000、10000 等),看看哪种更适合您的用例。当从 Kafka、Kinesis 或 WebSocket 连接将数据摄取到 TimescaleDB 时,使用批量处理是一种相当常见的模式。

要将数据摄取到 Timescale 服务中,您需要实现 on_event 函数。

WebSocket 连接设置完成后,您可以使用 on_event 函数将数据摄取到数据库中。这是一个将实时金融数据摄取到 Timescale 服务中的数据管道。

您可以使用 Psycopg2 在 Python 中实现批量处理解决方案。您可以在 on_event 函数中实现摄取逻辑,然后将其传递给 WebSocket 对象。

此函数需要

  1. 检查项目是否为数据项,而非 WebSocket 元数据。
  2. 调整数据以使其符合数据库模式,包括数据类型和列的顺序。
  3. 将其添加到内存中的批次(即 Python 中的列表)。
  4. 如果批次达到一定大小,插入数据,然后重置或清空列表。
  1. 更新 Python 脚本,使其打印出当前批次大小,这样您就可以跟踪数据何时从内存摄取到数据库中。使用您要摄取数据的 Timescale 服务的 <HOST><PASSWORD><PORT> 详细信息,以及来自 Twelve Data 的 API 密钥。

    import time
    import psycopg2
    from twelvedata import TDClient
    from psycopg2.extras import execute_values
    from datetime import datetime
    class WebsocketPipeline():
    # name of the hypertable
    DB_TABLE = "stocks_real_time"
    # columns in the hypertable in the correct order
    DB_COLUMNS=["time", "symbol", "price", "day_volume"]
    # batch size used to insert data in batches
    MAX_BATCH_SIZE=100
    def __init__(self, conn):
    """Connect to the Twelve Data web socket server and stream
    data into the database.
    Args:
    conn: psycopg2 connection object
    """
    self.conn = conn
    self.current_batch = []
    self.insert_counter = 0
    def _insert_values(self, data):
    if self.conn is not None:
    cursor = self.conn.cursor()
    sql = f"""
    INSERT INTO {self.DB_TABLE} ({','.join(self.DB_COLUMNS)})
    VALUES %s;"""
    execute_values(cursor, sql, data)
    self.conn.commit()
    def _on_event(self, event):
    """This function gets called whenever there's a new data record coming
    back from the server.
    Args:
    event (dict): data record
    """
    if event["event"] == "price":
    # data record
    timestamp = datetime.utcfromtimestamp(event["timestamp"])
    data = (timestamp, event["symbol"], event["price"], event.get("day_volume"))
    # add new data record to batch
    self.current_batch.append(data)
    print(f"Current batch size: {len(self.current_batch)}")
    # ingest data if max batch size is reached then reset the batch
    if len(self.current_batch) == self.MAX_BATCH_SIZE:
    self._insert_values(self.current_batch)
    self.insert_counter += 1
    print(f"Batch insert #{self.insert_counter}")
    self.current_batch = []
    def start(self, symbols):
    """Connect to the web socket server and start streaming real-time data
    into the database.
    Args:
    symbols (list of symbols): List of stock/crypto symbols
    """
    td = TDClient(apikey="<YOUR_API_KEY")
    ws = td.websocket(on_event=self._on_event)
    ws.subscribe(symbols)
    ws.connect()
    while True:
    ws.heartbeat()
    time.sleep(10)
    onn = psycopg2.connect(database="tsdb",
    host="<HOST>",
    user="tsdbadmin",
    password="<PASSWORD>",
    port="<PORT>")
    symbols = ["BTC/USD", "ETH/USD", "MSFT", "AAPL"]
    websocket = WebsocketPipeline(conn)
    websocket.start(symbols=symbols)
    ```
  2. 运行脚本

    python websocket_test.py

您甚至可以创建单独的 Python 脚本来为不同类型的符号启动多个 WebSocket 连接,例如,一个用于股票,另一个用于加密货币价格。

如果您看到类似这样的错误消息

2022-05-13 18:51:41,976 - ws-twelvedata - ERROR - TDWebSocket ERROR: Handshake status 200 OK

则检查您是否使用了从 Twelve Data 获得的正确 API 密钥。

为了可视化查询结果,启用 Grafana 读取您服务中的数据。

  1. 登录 Grafana

    在浏览器中,登录以下任一地址

    • 自托管 Grafana:地址为 http://localhost:3000/。默认凭据是 adminadmin
    • Grafana Cloud:使用您创建账户时设置的 URL 和凭据。
  2. 将您的服务添加为数据源

    1. 打开 Connections > Data sources,然后点击 Add new data source

    2. 从列表中选择 PostgreSQL

    3. 配置连接

      • Host URLDatabase nameUsernamePassword

        使用您的连接详细信息进行配置。Host URL 的格式为 <host>:<port>

      • TLS/SSL Mode:选择 require

      • PostgreSQL options:启用 TimescaleDB

      • 其他所有字段保持默认设置。

    4. 点击 Save & test

      Grafana 检查您的详细信息是否设置正确。

关键词

页面上发现问题?报告问题 或 编辑此页面 在 GitHub 上。