你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

将 LangChain 与 Azure Database for PostgreSQL 配合使用

Azure Database for PostgreSQL 与领先的大型语言模型(LLM)业务流程包(如 LangChain)无缝集成。 此集成使开发人员能够在其应用程序中使用高级 AI 功能。 LangChain 可以简化 LLM 的管理和使用、嵌入模型和数据库,以便生成 AI 应用程序更易于开发。

本文介绍如何使用 Azure Database for PostgreSQL 中的集成 矢量数据库 通过 LangChain 在集合中存储和管理文档。 它还演示如何使用最近的相邻算法(如余弦距离、L2 距离和内部积)创建索引和执行矢量搜索查询,以查找靠近查询向量的文档。

矢量支持

可以使用 Azure Database for PostgreSQL 有效地存储和查询 PostgreSQL 中的数百万个矢量嵌入。 该服务可帮助你将 AI 用例从概念证明扩展到生产环境。 它提供以下优势:

  • 提供一个熟悉的 SQL 接口,用于查询矢量嵌入和关系数据。
  • 使用 DiskANN 索引算法通过 1 亿多个向量实现更快、更精确的相似性搜索,从而增强 pgvector
  • 通过将关系元数据、矢量嵌入和时序数据集成到单个数据库中来简化作。
  • 使用可靠的 PostgreSQL 生态系统和 Azure 云平台的强大功能实现企业级功能,包括复制和高可用性。

身份验证

Azure Database for PostgreSQL 支持基于密码和 Microsoft Entra (前 Azure Active Directory)身份验证。

Microsoft Entra 身份验证允许使用 Microsoft Entra ID 向 PostgreSQL 服务器进行身份验证。 Microsoft Entra ID 无需为数据库用户管理单独的用户名和密码。 它允许使用用于其他 Azure 服务的相同安全机制。

在本文中,可以使用任一身份验证方法。

设置

Azure Database for PostgreSQL 使用开源 LangChain Postgres 支持连接到 Azure Database for PostgreSQL。 首先,下载合作伙伴包:

%pip install -qU langchain_postgres
%pip install -qU langchain-openai
%pip install -qU azure-identity

在 Azure Database for PostgreSQL 上启用 pgvector

请参阅 Azure Database for PostgreSQL 中的“启用和使用 pgvector”。

设置凭据

需要获取 Azure Database for PostgreSQL 连接详细信息 ,并将其添加为环境变量。

如果要使用 Microsoft Entra 身份验证,请将 USE_ENTRA_AUTH 标志设置为 True。 如果使用 Microsoft Entra 身份验证,则需要提供唯一的主机和数据库名称。 如果使用密码身份验证,则还需要设置用户名和密码。

import getpass
import os

USE_ENTRA_AUTH = True

# Supply the connection details for the database
os.environ["DBHOST"] = "<server-name>"
os.environ["DBNAME"] = "<database-name>"
os.environ["SSLMODE"] = "require"

if not USE_ENTRA_AUTH:
    # If you're using a username and password, supply them here
    os.environ["DBUSER"] = "<username>"
    os.environ["DBPASSWORD"] = getpass.getpass("Database Password:")

设置 Azure OpenAI 嵌入功能

os.environ["AZURE_OPENAI_ENDPOINT"] = "<azure-openai-endpoint>"
os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass("Azure OpenAI API Key:")
AZURE_OPENAI_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
AZURE_OPENAI_API_KEY = os.environ["AZURE_OPENAI_API_KEY"]

from langchain_openai import AzureOpenAIEmbeddings

embeddings = AzureOpenAIEmbeddings(
    model="text-embedding-3-small",
    api_key=AZURE_OPENAI_API_KEY,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    azure_deployment="text-embedding-3-small",
)

初始化

使用 Microsoft Entra 身份验证

以下部分包含设置 LangChain 以使用 Microsoft Entra 身份验证的函数。 get_token_and_username 函数使用来自 azure.identity 库的 DefaultAzureCredential 检索 Azure Databases for PostgreSQL 服务的令牌。 它确保 SQLAlchemy 引擎具有一个有效的令牌,用于创建新连接。 它还分析令牌(即 JSON Web 令牌 (JWT),以提取用于连接到数据库的用户名。

create_postgres_engine 函数创建一个 SQLAlchemy 引擎,该引擎基于从令牌管理器提取的令牌动态设置用户名和密码。 此引擎可以传递到 PGVector LangChain 向量存储的 connection 参数中。

登录到 Azure

若要登录到 Azure,请确保已安装 Azure CLI 。 在终端中运行以下命令:

az login

登录后,以下代码将提取令牌:

import base64
import json
from functools import lru_cache

from azure.identity import DefaultAzureCredential
from sqlalchemy import create_engine, event
from sqlalchemy.engine.url import URL


@lru_cache(maxsize=1)
def get_credential():
    """Memoized function to create the Azure credential, which caches tokens."""
    return DefaultAzureCredential()


def decode_jwt(token):
    """Decode the JWT payload to extract claims."""
    payload = token.split(".")[1]
    padding = "=" * (4 - len(payload) % 4)
    decoded_payload = base64.urlsafe_b64decode(payload + padding)
    return json.loads(decoded_payload)


def get_token_and_username():
    """Fetches a token and returns the username and token."""
    # Fetch a new token and extract the username
    token = get_credential().get_token(
        "https://ossrdbms-aad.database.windows.net/.default"
    )
    claims = decode_jwt(token.token)
    username = claims.get("upn")
    if not username:
        raise ValueError("Could not extract username from token. Have you logged in?")

    return username, token.token


def create_postgres_engine():
    db_url = URL.create(
        drivername="postgresql+psycopg",
        username="",  # This will be replaced dynamically
        password="",  # This will be replaced dynamically
        host=os.environ["DBHOST"],
        port=os.environ.get("DBPORT", 5432),
        database=os.environ["DBNAME"],
    )

    # Create a SQLAlchemy engine
    engine = create_engine(db_url, echo=True)

    # Listen for the connection event to inject dynamic credentials
    @event.listens_for(engine, "do_connect")
    def provide_dynamic_credentials(dialect, conn_rec, cargs, cparams):
        # Fetch the dynamic username and token
        username, token = get_token_and_username()

        # Override the connection parameters
        cparams["user"] = username
        cparams["password"] = token

    return engine

使用密码身份验证

如果不使用 Microsoft Entra 身份验证, get_connection_uri 请提供一个连接 URI,用于从环境变量中提取用户名和密码:

import urllib.parse


def get_connection_uri():
    # Read URI parameters from the environment
    dbhost = os.environ["DBHOST"]
    dbname = os.environ["DBNAME"]
    dbuser = urllib.parse.quote(os.environ["DBUSER"])
    password = os.environ["DBPASSWORD"]
    sslmode = os.environ["SSLMODE"]

    # Construct the connection URI
    # Use Psycopg 3!
    db_uri = (
        f"postgresql+psycopg://{dbuser}:{password}@{dbhost}/{dbname}?sslmode={sslmode}"
    )
    return db_uri

创建矢量存储

from langchain_core.documents import Document
from langchain_postgres import PGVector
from langchain_postgres.vectorstores import PGVector

collection_name = "my_docs"

# The connection is either a SQLAlchemy engine or a connection URI
connection = create_postgres_engine() if USE_ENTRA_AUTH else get_connection_uri()

vector_store = PGVector(
    embeddings=embeddings,
    collection_name=collection_name,
    connection=connection,
    use_jsonb=True,
)

矢量存储的管理

将项添加到向量存储

按 ID 添加文档将覆盖与该 ID 匹配的任何现有文档。

docs = [
    Document(
        page_content="there are cats in the pond",
        metadata={"id": 1, "location": "pond", "topic": "animals"},
    ),
    Document(
        page_content="ducks are also found in the pond",
        metadata={"id": 2, "location": "pond", "topic": "animals"},
    ),
    Document(
        page_content="fresh apples are available at the market",
        metadata={"id": 3, "location": "market", "topic": "food"},
    ),
    Document(
        page_content="the market also sells fresh oranges",
        metadata={"id": 4, "location": "market", "topic": "food"},
    ),
    Document(
        page_content="the new art exhibit is fascinating",
        metadata={"id": 5, "location": "museum", "topic": "art"},
    ),
    Document(
        page_content="a sculpture exhibit is also at the museum",
        metadata={"id": 6, "location": "museum", "topic": "art"},
    ),
    Document(
        page_content="a new coffee shop opened on Main Street",
        metadata={"id": 7, "location": "Main Street", "topic": "food"},
    ),
    Document(
        page_content="the book club meets at the library",
        metadata={"id": 8, "location": "library", "topic": "reading"},
    ),
    Document(
        page_content="the library hosts a weekly story time for kids",
        metadata={"id": 9, "location": "library", "topic": "reading"},
    ),
    Document(
        page_content="a cooking class for beginners is offered at the community center",
        metadata={"id": 10, "location": "community center", "topic": "classes"},
    ),
]

vector_store.add_documents(docs, ids=[doc.metadata["id"] for doc in docs])

更新向量存储中的项

docs = [
    Document(
        page_content="Updated - cooking class for beginners is offered at the community center",
        metadata={"id": 10, "location": "community center", "topic": "classes"},
    )
]
vector_store.add_documents(docs, ids=[doc.metadata["id"] for doc in docs])

从向量存储中删除项

vector_store.delete(ids=["3"])

向量存储的查询

创建矢量存储并添加相关文档后,可以在链或代理中查询向量存储。

筛选支持

矢量存储支持一组筛选器,这些筛选器可以应用于文档的元数据字段:

操作员 含义/类别
$eq 相等性 (==)
$ne 不平等 (!=)
$lt 小于 (<)
$lte 小于或等于 (<=)
$gt 大于 (>)
$gte 大于或等于 (>=)
$in 特殊大小写 (in)
$nin 特殊大小写 (not in)
$between 特殊大小写 (between)
$like 文本 (like)
$ilike 文本 (case-insensitive like)
$and 逻辑 (and)
$or 逻辑 (or)

直接查询

可以执行简单的相似性搜索,如下所示:

results = vector_store.similarity_search(
    "kitty", k=10, filter={"id": {"$in": [1, 5, 2, 9]}}
)
for doc in results:
    print(f"* {doc.page_content} [{doc.metadata}]")
    * there are cats in the pond [{'id': 1, 'topic': 'animals', 'location': 'pond'}]
    * ducks are also found in the pond [{'id': 2, 'topic': 'animals', 'location': 'pond'}]
    * the new art exhibit is fascinating [{'id': 5, 'topic': 'art', 'location': 'museum'}]
    * the library hosts a weekly story time for kids [{'id': 9, 'topic': 'reading', 'location': 'library'}]

如果提供包含多个字段但没有运算符的字典,则顶级解释为逻辑 AND 筛选器:

vector_store.similarity_search(
    "ducks",
    k=10,
    filter={"id": {"$in": [1, 5, 2, 9]}, "location": {"$in": ["pond", "market"]}},
)
[Document(id='2', metadata={'id': 2, 'topic': 'animals', 'location': 'pond'}, page_content='ducks are also found in the pond'),
 Document(id='1', metadata={'id': 1, 'topic': 'animals', 'location': 'pond'}, page_content='there are cats in the pond')]
vector_store.similarity_search(
    "ducks",
    k=10,
    filter={
        "$and": [
            {"id": {"$in": [1, 5, 2, 9]}},
            {"location": {"$in": ["pond", "market"]}},
        ]
    },
)
[Document(id='2', metadata={'id': 2, 'topic': 'animals', 'location': 'pond'}, page_content='ducks are also found in the pond'),
 Document(id='1', metadata={'id': 1, 'topic': 'animals', 'location': 'pond'}, page_content='there are cats in the pond')]

如果要执行相似性搜索并接收相应的分数,可以运行:

results = vector_store.similarity_search_with_score(query="cats", k=1)
for doc, score in results:
    print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.528338] there are cats in the pond [{'id': 1, 'topic': 'animals', 'location': 'pond'}]

有关可以在PGVector 矢量存储上执行的搜索的完整列表,请参阅API 参考

转换为数据检索器

还可以将矢量存储转换为检索器,以便更轻松地在链中使用:

retriever = vector_store.as_retriever(search_type="mmr", search_kwargs={"k": 1})
retriever.invoke("kitty")
[Document(id='1', metadata={'id': 1, 'topic': 'animals', 'location': 'pond'}, page_content='there are cats in the pond')]

当前限制

  • langchain_postgres 仅适用于 Psycopg 3 (psycopg3)。 将连接字符串从postgresql+psycopg2://...更新到postgresql+psycopg://langchain:langchain@...
  • 嵌入存储和集合的架构已更改,以确保 add_documents 可以正确处理用户指定的 ID。
  • 现在您必须传递一个显式的连接对象。
  • 目前,没有支持在架构更改上轻松迁移数据 的机制 。 矢量存储中的任何架构更改都需要重新创建表并再次添加文档。