PgVectorizer lets you create vector embeddings from any data you already have stored in PostgreSQL. You can get more background in the blog post announcing this feature, as well as the "how we built it" post that goes into the details of the design.
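
The examples below assume that service_url already contains the connection string for your PostgreSQL service. A minimal sketch, with a placeholder value you would replace with your own:

# Hypothetical connection string; substitute your own service URL.
service_url = "postgres://user:password@host:5432/dbname"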

To create vector embeddings, simply attach PgVectorizer to any PostgreSQL table, and it automatically syncs that table's data with a set of embeddings stored in PostgreSQL. For example, say you have a blog table defined as follows:

import psycopg2
from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from timescale_vector import client, pgvectorizer
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores.timescalevector import TimescaleVector
from datetime import timedelta
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blog (
                id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
                title TEXT NOT NULL,
                author TEXT NOT NULL,
                contents TEXT NOT NULL,
                category TEXT NOT NULL,
                published_time TIMESTAMPTZ NULL --NULL if not yet published
            );
        ''')

You can insert some data as follows:

with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            INSERT INTO blog (title, author, contents, category, published_time) VALUES ('First Post', 'Matvey Arye', 'some super interesting content about cats.', 'AI', '2021-01-01');
        ''')

Now, say you want to embed these blogs and store the embeddings in PostgreSQL. First, you need to define an embed_and_write function that takes a set of blog posts, creates the embeddings, and writes them into TimescaleVector. For example, if you are using LangChain, it could look something like the following:

def get_document(blog):
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    docs = []
    for chunk in text_splitter.split_text(blog['contents']):
        content = f"Author {blog['author']}, title: {blog['title']}, contents:{chunk}"
        metadata = {
            "id": str(client.uuid_from_time(blog['published_time'])),
            "blog_id": blog['id'],
            "author": blog['author'],
            "category": blog['category'],
            "published_time": blog['published_time'].isoformat(),
        }
        docs.append(Document(page_content=content, metadata=metadata))
    return docs


def embed_and_write(blog_instances, vectorizer):
    embedding = OpenAIEmbeddings()
    vector_store = TimescaleVector(
        collection_name="blog_embedding",
        service_url=service_url,
        embedding=embedding,
        time_partition_interval=timedelta(days=30),
    )

    # Delete old embeddings for all ids in the work queue. locked_id is a special
    # column that is set to the primary key of the table being embedded. For rows
    # that have been deleted, it is the only key that is set.
    metadata_for_delete = [{"blog_id": blog['locked_id']} for blog in blog_instances]
    vector_store.delete_by_metadata(metadata_for_delete)

    documents = []
    for blog in blog_instances:
        # Skip blogs that are not yet published, or that have been deleted
        # (in which case the column is NULL).
        if blog['published_time'] is not None:
            documents.extend(get_document(blog))

    if len(documents) == 0:
        return

    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    ids = [d.metadata["id"] for d in documents]
    vector_store.add_texts(texts, metadatas, ids)
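
To see what get_document produces before wiring everything into the vectorizer, you can call it on a single row represented as a dictionary. The sample row below is hypothetical, shaped like a row from the blog table above:

from datetime import datetime, timezone

# Hypothetical sample row, for illustration only.
sample_blog = {
    'id': 1,
    'title': 'First Post',
    'author': 'Matvey Arye',
    'contents': 'some super interesting content about cats.',
    'category': 'AI',
    'published_time': datetime(2021, 1, 1, tzinfo=timezone.utc),
}

for doc in get_document(sample_blog):
    print(doc.page_content, doc.metadata)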

Then, all you need to do is run the following code in a scheduled job (cron job, Lambda job, etc.):

# This job should be run on a schedule.
vectorizer = pgvectorizer.Vectorize(service_url, 'blog')
# process() returns a positive number while there is still work to do,
# so keep calling it until the work queue is drained.
while vectorizer.process(embed_and_write) > 0:
    pass

Every time the job runs, it syncs the table with your embeddings. It syncs all inserts, updates, and deletes to an embeddings table named blog_embedding.
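
For example, if you edit or delete a post, the next run of the job updates or removes the matching rows in blog_embedding. A minimal sketch, reusing service_url, embed_and_write, and the vectorizer loop from above:

with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        # Edit an existing post; a DELETE would be synced the same way.
        cursor.execute('''
            UPDATE blog
            SET contents = 'some updated content about cats.'
            WHERE title = 'First Post';
        ''')

# On its next scheduled run (or when invoked manually), the job re-embeds the changed row.
vectorizer = pgvectorizer.Vectorize(service_url, 'blog')
while vectorizer.process(embed_and_write) > 0:
    pass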

Now, you can simply search the embeddings as follows (again using LangChain in the example):

embedding = OpenAIEmbeddings()
vector_store = TimescaleVector(
collection_name="blog_embedding",
service_url=service_url,
embedding=embedding,
time_partition_interval=timedelta(days=30),
)
res = vector_store.similarity_search_with_score("Blogs about cats")
res
[(Document(page_content='Author Matvey Arye, title: First Post, contents:some super interesting content about cats.', metadata={'id': '4a784000-4bc4-11eb-855a-06302dbc8ce7', 'author': 'Matvey Arye', 'blog_id': 1, 'category': 'AI', 'published_time': '2021-01-01T00:00:00+00:00'}),
0.12595687795193833)]
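
You can also narrow a search using the metadata stored with each embedding. The sketch below assumes the filter keyword argument of LangChain's TimescaleVector similarity-search methods; the category value is simply the one used in the sample row above:

# Restrict results to blogs in the 'AI' category.
res = vector_store.similarity_search_with_score(
    "Blogs about cats",
    k=5,
    filter={"category": "AI"},
)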
