Streaming

Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.

Overview

Generating full responses from LLMs often incurs a delay of several seconds, which becomes more noticeable in complex applications with multiple model calls. Fortunately, LLMs generate responses iteratively, allowing intermediate results to be displayed as they are produced. By streaming these intermediate outputs, LangChain enables smoother UX in LLM-powered apps and offers built-in support for streaming at the core of its design.

In this guide, we'll discuss streaming in LLM applications and explore how LangChain's streaming APIs facilitate real-time output from various components in your application.

What to stream in LLM applications

In applications involving LLMs, several types of data can be streamed to improve user experience by reducing perceived latency and increasing transparency. These include:

1. Streaming LLM outputs

The most common and critical data to stream is the output generated by the LLM itself. LLMs often take time to generate full responses, and by streaming the output in real-time, users can see partial results as they are produced. This provides immediate feedback and helps reduce the wait time for users.

2. Streaming pipeline or workflow progress

Beyond just streaming LLM output, it's useful to stream progress through more complex workflows or pipelines, giving users a sense of how the application is progressing overall. This could include:

  • In LangGraph Workflows: With LangGraph, workflows are composed of nodes and edges that represent various steps. Streaming here involves tracking changes to the graph state as individual nodes request updates. This allows for more granular monitoring of which node in the workflow is currently active, giving real-time updates about the status of the workflow as it progresses through different stages.

  • In LCEL Pipelines: Streaming updates from an LCEL pipeline involves capturing progress from individual sub-runnables. For example, as different steps or components of the pipeline execute, you can stream which sub-runnable is currently running, providing real-time insight into the overall pipeline's progress.

Streaming pipeline or workflow progress is essential in providing users with a clear picture of where the application is in the execution process.

3. Streaming custom data

In some cases, you may need to stream custom data that goes beyond the information provided by the pipeline or workflow structure. This custom information is injected within a specific step in the workflow, whether that step is a tool or a LangGraph node. For example, you could stream updates about what a tool is doing in real-time, or the progress through a LangGraph node. This granular data, which is emitted directly from within the step, provides more detailed insights into the execution of the workflow and is especially useful in complex processes where more visibility is needed.
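As a rough illustration, a LangGraph node can push progress updates to the stream through the writer on its config. This is a minimal sketch, assuming a recent @langchain/langgraph version where nodes receive a LangGraphRunnableConfig with an optional writer; the node, state shape, and field names are hypothetical:

import type { LangGraphRunnableConfig } from "@langchain/langgraph";

// Hypothetical node that reports fine-grained progress while it works.
const summarizeNode = async (
  state: { documents: string[] },
  config: LangGraphRunnableConfig
) => {
  for (const [i, doc] of state.documents.entries()) {
    // Surfaced to consumers that stream the graph with streamMode "custom".
    config.writer?.({ progress: `document ${i + 1} of ${state.documents.length}` });
    // ... summarize `doc` with an LLM here ...
  }
  return { summary: "..." };
};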

Streaming APIs

LangChain provides two main APIs for streaming output in real-time. These APIs are supported by any component that implements the Runnable Interface, including LLMs, compiled LangGraph graphs, and any Runnable generated with LCEL.

  1. stream: Use this method to stream outputs from individual Runnables (e.g., a chat model) as they are generated, or to stream any workflow created with LangGraph.

  2. streamEvents: Use this API to get access to custom events and intermediate outputs from LLM applications built entirely with LCEL. Note that this API is available, but not needed when working with LangGraph.

note

In addition, there is a legacy streamLog API. This API is not recommended for new projects, as it is more complex and less feature-rich than the other streaming APIs.

stream()

The stream() method returns an async iterator that yields chunks of output as they are produced. You can use a for await loop to process each chunk in real time. For example, when using an LLM, this allows the output to be streamed incrementally as it is generated, reducing the wait time for users.

The type of chunk yielded by the stream() method depends on the component being streamed. For example, when streaming from an LLM, each chunk will be an AIMessageChunk; however, for other components, the chunk may be different.

For example:

for await (const chunk of await component.stream(someInput)) {
  // IMPORTANT: Keep the processing of each chunk as efficient as possible.
  // While you're processing the current chunk, the upstream component is
  // waiting to produce the next one. For example, if working with LangGraph,
  // graph execution is paused while the current chunk is being processed.
  // In extreme cases, this could even result in timeouts (e.g., when LLM
  // outputs are streamed from an API that has a timeout).
  console.log(chunk);
}

Usage with chat models

When using stream() with chat models, the output is streamed as AIMessageChunks as it is generated by the LLM. This allows you to present or process the LLM's output incrementally as it's being produced, which is particularly useful in interactive applications or interfaces.
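For instance, here is a minimal sketch (using the same Anthropic model as the streamEvents example later in this guide) that prints the content of each chunk as it arrives:

import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({ model: "claude-3-sonnet-20240229" });

// Each chunk is an AIMessageChunk carrying the newly generated tokens.
for await (const chunk of await model.stream("Tell me a joke about parrots")) {
  console.log(chunk.content);
}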

Usage with LangGraph

LangGraph compiled graphs are Runnables and support the standard streaming APIs.

When using the stream and streamEvents methods with LangGraph, you can specify one or more streaming modes, which allow you to control the type of output that is streamed (a short sketch follows the list below). The available streaming modes are:

  • "values":每一步都发出 state 的所有值。

    ¥"values": Emit all values of the state for each step.

  • "updates":每一步后仅发出节点名称和节点返回的更新。

    ¥"updates": Emit only the node name(s) and updates that were returned by the node(s) after each step.

  • "debug":每一步都发出调试事件。

    ¥"debug": Emit debug events for each step.

  • "messages":发出 LLM messages token-by-token

    ¥"messages": Emit LLM messages token-by-token.

For more information, please see the LangGraph streaming guides.
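As a sketch of how a streaming mode is selected, the hypothetical single-node graph below streams with the "updates" mode (a minimal example, assuming the standard @langchain/langgraph API; the graph itself is illustrative):

import { HumanMessage } from "@langchain/core/messages";
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, MessagesAnnotation, START } from "@langchain/langgraph";

const model = new ChatAnthropic({ model: "claude-3-sonnet-20240229" });

// A hypothetical one-node graph: the node calls the model and appends its reply.
const graph = new StateGraph(MessagesAnnotation)
  .addNode("model", async (state) => {
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  })
  .addEdge(START, "model")
  .compile();

// "updates" emits only each node's name and the update it returned.
for await (const chunk of await graph.stream(
  { messages: [new HumanMessage("hi")] },
  { streamMode: "updates" }
)) {
  console.log(chunk);
}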

Usage with LCEL

If you compose multiple Runnables using LangChain's Expression Language (LCEL), the stream() method will, by convention, stream the output of the last step in the chain. This allows the final processed result to be streamed incrementally. LCEL tries to optimize streaming latency in pipelines, such that the streaming results from the last step are available as soon as possible.
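For example, in a prompt-model-parser chain, stream() yields the string chunks produced by the final parser step. A minimal sketch, reusing the same components as the streamEvents example below:

import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({ model: "claude-3-sonnet-20240229" });
const prompt = ChatPromptTemplate.fromTemplate("tell me a joke about {topic}");
const chain = prompt.pipe(model).pipe(new StringOutputParser());

// Each chunk is a string, because the last step is a string output parser.
for await (const chunk of await chain.stream({ topic: "parrot" })) {
  process.stdout.write(chunk);
}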

streamEvents

tip

Use the streamEvents API to access custom data and intermediate outputs from LLM applications built entirely with LCEL.

While this API is available for use with LangGraph as well, it is usually not necessary when working with LangGraph, as the stream methods provide comprehensive streaming capabilities for LangGraph graphs.

For chains constructed using LCEL, the .stream() method only streams the output of the final step of the chain. This might be sufficient for some applications, but as you build more complex chains of several LLM calls together, you may want to use the intermediate values of the chain alongside the final output. For example, you may want to return sources alongside the final generation when building a chat-over-documents app.

There are ways to do this using callbacks, or by constructing your chain in such a way that it passes intermediate values to the end with something like chained .assign() calls, but LangChain also includes a .streamEvents() method that combines the flexibility of callbacks with the ergonomics of .stream(). When called, it returns an iterator which yields various types of events that you can filter and process according to the needs of your project.

Here's one small example that prints just events containing streamed chat model output:

import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { ChatAnthropic } from "@langchain/anthropic";

const model = new ChatAnthropic({ model: "claude-3-sonnet-20240229" });

const prompt = ChatPromptTemplate.fromTemplate("tell me a joke about {topic}");
const parser = new StringOutputParser();
const chain = prompt.pipe(model).pipe(parser);

for await (const event of await chain.streamEvents(
  { topic: "parrot" },
  { version: "v2" }
)) {
  if (event.event === "on_chat_model_stream") {
    console.log(event);
  }
}

You can roughly think of it as an iterator over callback events (though the format differs) - and you can use it on almost all LangChain components!
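Roughly, each yielded event is an object shaped like the following (field values abbreviated here; the exact name and metadata depend on the component and run):

// A single v2 stream event, abbreviated:
// {
//   event: "on_chat_model_stream",  // the event type
//   name: "ChatAnthropic",          // the component that emitted it
//   run_id: "...",                  // unique id for this run
//   tags: [],
//   metadata: {},
//   data: { chunk: /* an AIMessageChunk */ }
// }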

See this guide for more detailed information on how to use .streamEvents(), including a table listing available events.

Writing custom data to the stream

To write custom data to the stream, you will need to choose one of the following methods based on the component you are working with:

  1. dispatch_events can be used to write custom data that will be surfaced through the streamEvents API. See how to dispatch custom callback events for more information.
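As a minimal sketch, assuming the dispatchCustomEvent helper exported from @langchain/core/callbacks/dispatch (use the .../dispatch/web entrypoint outside of Node), a custom event dispatched inside a runnable surfaces in streamEvents as an "on_custom_event":

import { RunnableLambda } from "@langchain/core/runnables";
import { dispatchCustomEvent } from "@langchain/core/callbacks/dispatch";

// A hypothetical step that reports progress while it runs.
const step = RunnableLambda.from(async (value: string) => {
  await dispatchCustomEvent("my_progress_event", { status: "started" });
  return value.toUpperCase();
});

for await (const event of await step.streamEvents("hello", { version: "v2" })) {
  if (event.event === "on_custom_event") {
    console.log(event.name, event.data); // my_progress_event { status: "started" }
  }
}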

"自动流式传输" 聊天模型

¥"Auto-Streaming" Chat Models

LangChain simplifies streaming from chat models by automatically enabling streaming mode in certain cases, even when you're not explicitly calling the streaming methods. This is particularly useful when you use the non-streaming invoke method but still want to stream the entire application, including intermediate results from the chat model.

How It Works

When you call the invoke method on a chat model, LangChain will automatically switch to streaming mode if it detects that you are trying to stream the overall application.

Under the hood, invoke will use the stream method to generate its output. The result of the invocation will be the same as far as the code using invoke is concerned; however, while the chat model is being streamed, LangChain will take care of invoking on_llm_new_token events in LangChain's callback system. These callback events allow LangGraph stream and streamEvents to surface the chat model's output in real-time.

Example:

const node = async (state) => {
  ...
  // The code below uses the invoke method, but LangChain will
  // automatically switch to streaming mode when it detects that
  // the overall application is being streamed.
  const aiMessage = await model.invoke(state.messages);
  ...
};

for await (const chunk of await compiledGraph.stream(..., { streamMode: "messages" })) {
  // ... do something with the chunk
}

Related Resources

Please see the following how-to guides for specific examples of streaming in LangChain:

For writing custom data to the stream, please see the following resources: