
How to reindex data to keep your vectorstore in-sync with the underlying data source

Prerequisites

This guide assumes familiarity with the following concepts:

Here, we will look at a basic indexing workflow using the LangChain indexing API.

The indexing API lets you load documents from any source into a vector store and keep them in sync. Specifically, it helps:

  • Avoid writing duplicated content into the vector store

  • Avoid re-writing unchanged content

  • Avoid re-computing embeddings over unchanged content

All of which should save you time and money, as well as improve your vector search results.

Crucially, the indexing API will work even with documents that have gone through several transformation steps (e.g., via text chunking) with respect to the original source documents.

How it works

LangChain indexing makes use of a record manager (RecordManager) that keeps track of document writes into the vector store.

When indexing content, hashes are computed for each document, and the following information is stored in the record manager:

  • the document hash (hash of both page content and metadata)

  • write time

  • the source ID - each document should include information in its metadata to allow us to determine the ultimate source of this document (see the sketch below)
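
To make this concrete, here is a minimal sketch of the bookkeeping described above. The record shape and hash recipe are illustrative assumptions rather than the RecordManager's actual internals; the source metadata key is the same one referenced by the sourceIdKey option in the quickstart below.

import { createHash } from "node:crypto";

// A document whose metadata identifies its ultimate source.
const doc = {
  pageContent: "kitty",
  metadata: { source: "kitty.txt" },
};

// Hypothetical record, for illustration only: a hash over both page
// content and metadata, a write timestamp, and the source ID taken
// from the document's metadata.
const record = {
  docHash: createHash("sha256")
    .update(doc.pageContent)
    .update(JSON.stringify(doc.metadata))
    .digest("hex"),
  writeTime: Date.now(),
  sourceId: doc.metadata.source,
};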

Deletion Modes

When indexing documents into a vector store, it's possible that some existing documents in the vector store should be deleted. In certain situations you may want to remove any existing documents that are derived from the same sources as the new documents being indexed. In others you may want to delete all existing documents wholesale. The indexing API deletion modes let you pick the behavior you want:

| Cleanup Mode | De-Duplicates Content | Parallelizable | Cleans Up Deleted Source Docs | Cleans Up Mutations of Source Docs and/or Derived Docs | Clean Up Timing |
| --- | --- | --- | --- | --- | --- |
| None | ✅ | ✅ | ❌ | ❌ | - |
| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |
| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |

None does not do any automatic clean up, allowing the user to manually do clean up of old content.

incremental and full offer the following automated clean up:

  • If the content of the source document or derived documents has changed, both incremental and full modes will clean up (delete) previous versions of the content.

  • If the source document has been deleted (meaning it is not included in the documents currently being indexed), the full cleanup mode will delete it from the vector store correctly, but the incremental mode will not.

When content is mutated (e.g., the source PDF file was revised) there will be a period of time during indexing when both the new and old versions may be returned to the user. This happens after the new content was written, but before the old version was deleted.

  • incremental indexing minimizes this period of time as it is able to do clean up continuously, as it writes.

  • full mode does the clean up after all batches have been written.
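
Concretely, the cleanup mode is selected via the cleanup option passed to the index function, shown in full in the quickstart below. A minimal sketch; docs, recordManager, and vectorStore are placeholders assumed to be set up as in the quickstart:

import { index } from "langchain/indexes";

// Sketch only: `docs`, `recordManager`, and `vectorStore` are assumed
// to be created as in the quickstart that follows.
await index({
  docsSource: docs,
  recordManager,
  vectorStore,
  options: {
    // undefined (no cleanup), "incremental", or "full"
    cleanup: "incremental",
    // metadata key identifying each document's ultimate source
    sourceIdKey: "source",
  },
});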

Requirements

  1. Do not use with a store that has been pre-populated with content independently of the indexing API, as the record manager will not know that records have been inserted previously.

  2. Only works with LangChain vectorstores that support (a) document addition by ID (the addDocuments method with an ids argument) and (b) deletion by ID (the delete method with an ids argument); see the sketch below.

Compatible Vectorstores: PGVector, Chroma, CloudflareVectorize, ElasticVectorSearch, FAISS, MariaDB, MomentoVectorIndex, Pinecone, SupabaseVectorStore, VercelPostgresVectorStore, Weaviate, Xata
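
For illustration, the surface the indexing API relies on looks roughly like the following. IndexableVectorStore is a hypothetical name used only for this sketch, not a type exported by LangChain:

import type { Document } from "@langchain/core/documents";

// Hypothetical interface summarizing the two requirements above;
// illustrative only, not part of LangChain's public API.
interface IndexableVectorStore {
  // (a) document addition by ID
  addDocuments(
    documents: Document[],
    options?: { ids?: string[] }
  ): Promise<string[] | void>;
  // (b) deletion by ID
  delete(params: { ids?: string[] }): Promise<void>;
}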

Caution

The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using full or incremental cleanup modes).

If two tasks run back-to-back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.

This is unlikely to be an issue in actual settings for the following reasons:

  1. The RecordManager uses higher resolution timestamps.

  2. The data would need to change between the first and the second task runs, which becomes unlikely if the time interval between the tasks is small.

  3. Indexing tasks typically take more than a few ms.

Quickstart

import { PostgresRecordManager } from "@langchain/community/indexes/postgres";
import { index } from "langchain/indexes";
import { PGVectorStore } from "@langchain/community/vectorstores/pgvector";
import { PoolConfig } from "pg";
import { OpenAIEmbeddings } from "@langchain/openai";
import { CharacterTextSplitter } from "@langchain/textsplitters";
import { BaseDocumentLoader } from "@langchain/core/document_loaders/base";

// First, follow set-up instructions at
// https://js.langchain.com/docs/modules/indexes/vector_stores/integrations/pgvector

const config = {
  postgresConnectionOptions: {
    type: "postgres",
    host: "127.0.0.1",
    port: 5432,
    user: "myuser",
    password: "ChangeMe",
    database: "api",
  } as PoolConfig,
  tableName: "testlangchain",
  columns: {
    idColumnName: "id",
    vectorColumnName: "vector",
    contentColumnName: "content",
    metadataColumnName: "metadata",
  },
};

const vectorStore = await PGVectorStore.initialize(
  new OpenAIEmbeddings(),
  config
);

// Create a new record manager
const recordManagerConfig = {
  postgresConnectionOptions: {
    type: "postgres",
    host: "127.0.0.1",
    port: 5432,
    user: "myuser",
    password: "ChangeMe",
    database: "api",
  } as PoolConfig,
  tableName: "upsertion_records",
};
const recordManager = new PostgresRecordManager(
  "test_namespace",
  recordManagerConfig
);

// Create the schema if it doesn't exist
await recordManager.createSchema();

// Index some documents
const doc1 = {
  pageContent: "kitty",
  metadata: { source: "kitty.txt" },
};

const doc2 = {
  pageContent: "doggy",
  metadata: { source: "doggy.txt" },
};

/**
 * Hacky helper method to clear content. See the `full` mode section to understand why it works.
 */
async function clear() {
  await index({
    docsSource: [],
    recordManager,
    vectorStore,
    options: {
      cleanup: "full",
      sourceIdKey: "source",
    },
  });
}

// No cleanup
await clear();
// This mode does not do automatic clean up of old versions of content; however, it still takes care of content de-duplication.

console.log(
  await index({
    docsSource: [doc1, doc1, doc1, doc1, doc1, doc1],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 1,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

await clear();

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 2,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

// Second time around all content will be skipped

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 0,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 2,
}
*/

// Updated content will be added, but old won't be deleted

const doc1Updated = {
  pageContent: "kitty updated",
  metadata: { source: "kitty.txt" },
};

console.log(
  await index({
    docsSource: [doc1Updated, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: undefined,
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 1,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 1,
}
*/

/*
Resulting records in the database:
[
  {
    pageContent: "kitty",
    metadata: { source: "kitty.txt" },
  },
  {
    pageContent: "doggy",
    metadata: { source: "doggy.txt" },
  },
  {
    pageContent: "kitty updated",
    metadata: { source: "kitty.txt" },
  }
]
*/

// Incremental mode
await clear();

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 2,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

// Indexing again should result in both documents getting skipped – also skipping the embedding operation!

console.log(
  await index({
    docsSource: [doc1, doc2],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 0,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 2,
}
*/

// If we provide no documents with incremental indexing mode, nothing will change.
console.log(
  await index({
    docsSource: [],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 0,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

// If we mutate a document, the new version will be written and all old versions sharing the same source will be deleted.
// This only affects the documents with the same source id!

const changedDoc1 = {
  pageContent: "kitty updated",
  metadata: { source: "kitty.txt" },
};
console.log(
  await index({
    docsSource: [changedDoc1],
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 1,
  numUpdated: 0,
  numDeleted: 1,
  numSkipped: 0,
}
*/

// Full mode
await clear();
// In full mode the user should pass the full universe of content that should be indexed into the indexing function.

// Any documents that are not passed into the indexing function and are present in the vectorStore will be deleted!

// This behavior is useful to handle deletions of source documents.
const allDocs = [doc1, doc2];
console.log(
  await index({
    docsSource: allDocs,
    recordManager,
    vectorStore,
    options: {
      cleanup: "full",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 2,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

// Say someone deleted the first doc:

const doc2Only = [doc2];

// Using full mode will clean up the deleted content as well.
// This affects all documents regardless of source id!

console.log(
  await index({
    docsSource: doc2Only,
    recordManager,
    vectorStore,
    options: {
      cleanup: "full",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 0,
  numUpdated: 0,
  numDeleted: 1,
  numSkipped: 1,
}
*/

await clear();

const newDoc1 = {
  pageContent: "kitty kitty kitty kitty kitty",
  metadata: { source: "kitty.txt" },
};

const newDoc2 = {
  pageContent: "doggy doggy the doggy",
  metadata: { source: "doggy.txt" },
};

const splitter = new CharacterTextSplitter({
  separator: "t",
  keepSeparator: true,
  chunkSize: 12,
  chunkOverlap: 2,
});

const newDocs = await splitter.splitDocuments([newDoc1, newDoc2]);
console.log(newDocs);
/*
[
  {
    pageContent: 'kitty kit',
    metadata: {source: 'kitty.txt'}
  },
  {
    pageContent: 'tty kitty ki',
    metadata: {source: 'kitty.txt'}
  },
  {
    pageContent: 'tty kitty',
    metadata: {source: 'kitty.txt'},
  },
  {
    pageContent: 'doggy doggy',
    metadata: {source: 'doggy.txt'},
  },
  {
    pageContent: 'the doggy',
    metadata: {source: 'doggy.txt'},
  }
]
*/

console.log(
  await index({
    docsSource: newDocs,
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);
/*
{
  numAdded: 5,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

const changedDoggyDocs = [
  {
    pageContent: "woof woof",
    metadata: { source: "doggy.txt" },
  },
  {
    pageContent: "woof woof woof",
    metadata: { source: "doggy.txt" },
  },
];

console.log(
  await index({
    docsSource: changedDoggyDocs,
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 2,
  numUpdated: 0,
  numDeleted: 2,
  numSkipped: 0,
}
*/

// Usage with document loaders

// Create a document loader
class MyCustomDocumentLoader extends BaseDocumentLoader {
  load() {
    return Promise.resolve([
      {
        pageContent: "kitty",
        metadata: { source: "kitty.txt" },
      },
      {
        pageContent: "doggy",
        metadata: { source: "doggy.txt" },
      },
    ]);
  }
}

await clear();

const loader = new MyCustomDocumentLoader();

console.log(
  await index({
    docsSource: loader,
    recordManager,
    vectorStore,
    options: {
      cleanup: "incremental",
      sourceIdKey: "source",
    },
  })
);

/*
{
  numAdded: 2,
  numUpdated: 0,
  numDeleted: 0,
  numSkipped: 0,
}
*/

// Closing resources
await recordManager.end();
await vectorStore.end();


Next steps

You've now learned how to use indexing in your RAG pipelines.

Next, check out some of the other sections on retrieval.