Встроенный middleware LangChain | Курс LangChain Agents урок 5
Цель урока: научиться использовать встроенный middleware для защиты агента от бесконечных циклов, сбоев инструментов и переполнения контекста.
Теория
Где встраивается middleware
В уроке 4 вы написали кастомные хуки @before_model и @after_model. Это
node-style хуки, они запускаются в конкретных точках цикла агента. Но есть и
второй стиль, wrap-style, он оборачивает сам вызов модели или инструмента.
Начало выполнения
→ before_agent (один раз)
→ перед каждым вызовом модели:
→ before_model (node-style)
→ [wrap_model_call] (wrap-style: оборачивает сам вызов)
← after_model (node-style)
→ перед каждым вызовом инструмента:
→ [wrap_tool_call] (wrap-style: оборачивает сам вызов)
← after_agent (один раз)
Конец выполнения
Node-style хуки видят состояние до или после вызова. Wrap-style хуки управляют самим вызовом, они могут повторить его, пропустить или заменить ответ.
Встроенный middleware
LangChain включает готовые middleware для самых частых задач. Их не нужно
реализовывать самому, достаточно сконфигурировать и передать в create_agent.
Примеры кода
ModelCallLimitMiddleware: защита от бесконечного цикла
Агент может застрять в петле, когда модель раз за разом вызывает инструменты
без прогресса. ModelCallLimitMiddleware ограничивает число вызовов модели.
import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import ModelCallLimitMiddleware
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
load_dotenv()
@tool
def search_web(query: str) -> str:
"""Поиск в интернете."""
return f"Результаты поиска по запросу: {query}"
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
agent = create_agent(
model=model,
tools=[search_web],
checkpointer=InMemorySaver(), # нужен для thread_limit
middleware=[
ModelCallLimitMiddleware(
run_limit=5, # не более 5 вызовов за один invoke
thread_limit=20, # не более 20 вызовов за всю сессию
exit_behavior="end", # завершить без ошибки
),
],
)
response = agent.invoke(
{"messages": [{"role": "user", "content": "Подробно исследуй тему LangChain."}]},
{"configurable": {"thread_id": "session-1"}},
)
print(response["messages"][-1].content)
run_limit - лимит на один вызов invoke().
thread_limit - лимит за всю
историю треда (требует checkpointer).
exit_behavior="end" завершает агента без исключения. Можно поставить "error", чтобы поймать исключение.
ToolRetryMiddleware: повтор при сбое инструмента
Внешние API падают, а сеть может сбоить. ToolRetryMiddleware автоматически
повторяет вызов инструмента с нарастающей задержкой.
import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import ToolRetryMiddleware
from langchain_openai import ChatOpenAI
from langchain.tools import tool
load_dotenv()
@tool
def get_stock_price(ticker: str) -> str:
"""Получить цену акции."""
# Симуляция нестабильного API
import random
if random.random() < 0.5:
raise ConnectionError("API временно недоступен")
return f"Цена {ticker}: $150.00"
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
agent = create_agent(
model=model,
tools=[get_stock_price],
middleware=[
ToolRetryMiddleware(
max_retries=3,
backoff_factor=2.0, # задержка: 1с, 2с, 4с
initial_delay=1.0,
on_failure="continue", # отдать ошибку модели, не упасть
),
],
)
response = agent.invoke({
"messages": [{"role": "user", "content": "Какая цена акции AAPL?"}]
})
print(response["messages"][-1].content)
Параметр on_failure="continue" означает: если все попытки исчерпаны,
вернуть сообщение об ошибке в контекст. Модель увидит ошибку и сможет
отреагировать, например предложить альтернативу. При on_failure="error"
агент упадёт с исключением.
SummarizationMiddleware: контекст не переполняется
Длинные исследовательские сессии упираются в лимит контекста модели. Когда
сообщений становится много, старые обрезаются, агент "забывает" что делал.
SummarizationMiddleware автоматически сжимает историю с помощью промежуточной
модели, сохраняя суть.
import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langchain_openai import ChatOpenAI
from langchain.tools import tool
load_dotenv()
@tool
def search_web(query: str) -> str:
"""Поиск информации в интернете."""
return f"Найдено по запросу '{query}': подробная информация..."
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
summary_model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
agent = create_agent(
model=model,
tools=[search_web],
middleware=[
SummarizationMiddleware(
model=summary_model,
trigger=("messages", 10), # сжать когда сообщений >= 10
keep=("messages", 4), # оставить 4 последних
),
],
)
response = agent.invoke({
"messages": [{"role": "user", "content": "Расскажи подробно об истории AI."}]
})
print(response["messages"][-1].content)
trigger и keep принимают кортеж из типа и значения. Три доступных типа:
("messages", 10)- по количеству сообщений("tokens", 4000)- по числу токенов("fraction", 0.8)- по доле контекстного окна модели
Для trigger можно передать список кортежей, тогда суммаризация сработает
при выполнении любого из условий.
ModelFallbackMiddleware: запасные модели
Если основная модель недоступна или вернула ошибку, ModelFallbackMiddleware
переключится на следующую из списка.
import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import ModelFallbackMiddleware
from langchain_openai import ChatOpenAI
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
agent = create_agent(
model=model,
tools=[],
middleware=[
ModelFallbackMiddleware(
"openai:gpt-4o-mini",
"anthropic:claude-haiku-4-5",
),
],
)
Модели fallback передаются как строки. Если основная модель упала, агент пробует первую fallback, затем вторую и так далее.
Wrap-style хук: @wrap_tool_call
Для более тонкого контроля над вызовом инструмента используется wrap-style хук. В отличие от node-style, он управляет самим вызовом и может перехватить запрос до отправки, повторить его или вернуть кешированный ответ.
import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call, ModelRequest, ModelResponse
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from typing import Callable
load_dotenv()
@tool
def search_web(query: str) -> str:
"""Поиск в интернете."""
return f"Результаты: {query}"
@wrap_tool_call
def log_tool_calls(request, handler: Callable):
"""Логирует каждый вызов инструмента."""
tool_name = request.tool_call["name"]
args = request.tool_call["args"]
print(f" [tool] {tool_name}({args})")
result = handler(request)
print(f" [tool] результат: {str(result)[:60]}")
return result
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
agent = create_agent(
model=model,
tools=[search_web],
middleware=[log_tool_calls],
)
agent.invoke({
"messages": [{"role": "user", "content": "Найди информацию о LangGraph."}]
})
handler(request) это вызов настоящего инструмента. Можно не вызывать его
(пропустить), вызвать несколько раз (повторить) или вернуть другое значение.
Сквозной проект: добавляем summarization агенту-аналитику
import asyncio
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import (
SummarizationMiddleware,
ModelCallLimitMiddleware,
ToolRetryMiddleware,
before_model,
)
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.runtime import Runtime
from typing import Any
load_dotenv()
SERVER_PATH = str(Path(__file__).parent.parent / "agents_course" / "tools_server.py")
@before_model
def log_step(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
print(f" [шаг] сообщений в стейте: {len(state['messages'])}")
return None
async def main():
client = MultiServerMCPClient({
"analyst_tools": {
"transport": "stdio",
"command": sys.executable,
"args": [SERVER_PATH],
}
})
tools = await client.get_tools()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
summary_model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
agent = create_agent(
model=model,
tools=tools,
checkpointer=InMemorySaver(),
middleware=[
log_step,
SummarizationMiddleware(
model=summary_model,
trigger=("messages", 12),
keep=("messages", 4),
),
ModelCallLimitMiddleware(run_limit=10, exit_behavior="end"),
ToolRetryMiddleware(max_retries=2, initial_delay=0.5),
],
)
config = {"configurable": {"thread_id": "research-1"}}
response = await agent.ainvoke(
{"messages": [{"role": "user", "content": "Найди информацию об Anthropic."}]},
config,
)
print(response["messages"][-1].content)
if __name__ == "__main__":
asyncio.run(main())
Частые ошибки
thread_limit без checkpointer
# Неправильно: thread_limit требует checkpointer для хранения счётчика
agent = create_agent(
model=model,
tools=[],
middleware=[ModelCallLimitMiddleware(thread_limit=10)],
)
# Правильно: добавить checkpointer
agent = create_agent(
model=model,
tools=[],
checkpointer=InMemorySaver(),
middleware=[ModelCallLimitMiddleware(thread_limit=10)],
)
SummarizationMiddleware с trigger=("fraction", ...) без профиля модели
# Может не сработать: fraction требует знать размер контекстного окна модели
SummarizationMiddleware(model=summary_model, trigger=("fraction", 0.8))
# Надёжнее: абсолютные значения токенов или сообщений
SummarizationMiddleware(model=summary_model, trigger=("tokens", 4000))
SummarizationMiddleware(model=summary_model, trigger=("messages", 10))
Порядок middleware при суммаризации
# Неправильно: limit стоит до summarization, агент завершится раньше чем
# суммаризация успеет сжать контекст
middleware=[ModelCallLimitMiddleware(run_limit=5), SummarizationMiddleware(...)]
# Правильно: summarization первым, limit последним
middleware=[SummarizationMiddleware(...), ModelCallLimitMiddleware(run_limit=5)]
Задание
Возьмите агента из урока 4. Добавьте три middleware:
SummarizationMiddlewareс триггером по количеству сообщенийModelCallLimitMiddlewareсrun_limit=8ToolRetryMiddlewareсmax_retries=2
Проведите длинный диалог из 5-6 запросов. Убедитесь в LangSmith, что суммаризация сработала и число вызовов не превысило лимит.
Подписывайтесь на мой Telegram канал
Если вам нужен ментор и вы хотите научиться разрабатывать AI агентов, пишите, обсудим условия
Авторизуйтесь, чтобы оставить комментарий.
Нет комментариев.
Тут может быть ваша реклама
Пишите info@aisferaic.ru