PgVectorizer enables you to create vector embeddings from any data that already lives in PostgreSQL. You can find more background in the blog post announcing this feature, as well as in the "how we built it" post that describes the design in detail.
To create vector embeddings, simply attach PgVectorizer to any PostgreSQL table, and it automatically keeps that table's data in sync with a set of embeddings stored in PostgreSQL. For example, say you have a blog table defined as follows:
import psycopg2
from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from timescale_vector import client, pgvectorizer
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores.timescalevector import TimescaleVector
from datetime import timedelta
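The snippets below reference a service_url connection string for your PostgreSQL service but never define it. As a minimal sketch (the environment variable name here is only an assumption; any way of supplying a standard PostgreSQL connection string works):

import os

# Hypothetical: read the PostgreSQL service URL from an environment variable.
service_url = os.environ["TIMESCALE_SERVICE_URL"]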
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blog (
                id              INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
                title           TEXT NOT NULL,
                author          TEXT NOT NULL,
                contents        TEXT NOT NULL,
                category        TEXT NOT NULL,
                published_time  TIMESTAMPTZ NULL --NULL if not yet published
            );
        ''')
You can insert some data as follows:
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            INSERT INTO blog (title, author, contents, category, published_time)
            VALUES ('First Post', 'Matvey Arye', 'some super interesting content about cats.', 'AI', '2021-01-01');
        ''')
Now, say you want to embed these blogs and store the embeddings in PostgreSQL. First, you define an embed_and_write function that takes a set of blog posts, creates the embeddings, and writes them into TimescaleVector. For example, if you are using LangChain, it could look like the following.
def get_document(blog):
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    docs = []
    for chunk in text_splitter.split_text(blog['contents']):
        content = f"Author {blog['author']}, title: {blog['title']}, contents:{chunk}"
        metadata = {
            "id": str(client.uuid_from_time(blog['published_time'])),
            "blog_id": blog['id'],
            "author": blog['author'],
            "category": blog['category'],
            "published_time": blog['published_time'].isoformat(),
        }
        docs.append(Document(page_content=content, metadata=metadata))
    return docs

def embed_and_write(blog_instances, vectorizer):
    embedding = OpenAIEmbeddings()
    vector_store = TimescaleVector(
        collection_name="blog_embedding",
        service_url=service_url,
        embedding=embedding,
        time_partition_interval=timedelta(days=30),
    )

    # delete old embeddings for all ids in the work queue. locked_id is a special column
    # that is set to the primary key of the table being embedded. For items that are
    # deleted, it is the only key that is set.
    metadata_for_delete = [{"blog_id": blog['locked_id']} for blog in blog_instances]
    vector_store.delete_by_metadata(metadata_for_delete)

    documents = []
    for blog in blog_instances:
        # skip blogs that are not published yet, or are deleted (in which case the column is NULL)
        if blog['published_time'] != None:
            documents.extend(get_document(blog))

    if len(documents) == 0:
        return

    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    ids = [d.metadata["id"] for d in documents]
    vector_store.add_texts(texts, metadatas, ids)
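Note that get_document builds each document id with client.uuid_from_time(blog['published_time']). TimescaleVector stores the row's timestamp inside this time-based UUID, which is what allows the store above to place embeddings into time partitions (the time_partition_interval setting); this is why the id is derived from the published time rather than generated randomly.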
Then, all you need to do is run the following code in a scheduled job (cron job, Lambda job, etc.):
# this job should be run on a schedule
vectorizer = pgvectorizer.Vectorize(service_url, 'blog')
while vectorizer.process(embed_and_write) > 0:
    pass
Every time this job runs, it syncs the table with your embeddings. It syncs all inserts, updates, and deletes to an embeddings table named blog_embedding.
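For instance, if you later update or delete a row in the blog table, the next run of the same job picks up that change and re-embeds (or removes) the affected rows. A minimal sketch, reusing the Vectorize call from above:

# Update an existing post...
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            UPDATE blog
            SET contents = 'some updated content about cats and dogs.'
            WHERE title = 'First Post';
        ''')

# ...then the next scheduled run of the vectorizer re-embeds the changed row.
vectorizer = pgvectorizer.Vectorize(service_url, 'blog')
while vectorizer.process(embed_and_write) > 0:
    pass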
Now, you can simply search the embeddings as follows (the example again uses LangChain):
embedding = OpenAIEmbeddings()
vector_store = TimescaleVector(
    collection_name="blog_embedding",
    service_url=service_url,
    embedding=embedding,
    time_partition_interval=timedelta(days=30),
)

res = vector_store.similarity_search_with_score("Blogs about cats")
res
[(Document(page_content='Author Matvey Arye, title: First Post, contents:some super interesting content about cats.', metadata={'id': '4a784000-4bc4-11eb-855a-06302dbc8ce7', 'author': 'Matvey Arye', 'blog_id': 1, 'category': 'AI', 'published_time': '2021-01-01T00:00:00+00:00'}),0.12595687795193833)]
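Because each document carries metadata and the store is time-partitioned, you can also narrow the search. The sketch below is based on LangChain's TimescaleVector search options; the exact parameter names may vary between versions and are an assumption here:

from datetime import datetime

# Hypothetical refinements: restrict by metadata and by the time range encoded in the ids.
res = vector_store.similarity_search_with_score(
    "Blogs about cats",
    k=5,
    filter={"category": "AI"},        # metadata filter on the stored documents
    start_date=datetime(2021, 1, 1),  # only search partitions within this time range
    end_date=datetime(2021, 12, 31),
)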