LangChain

Встроенный middleware LangChain | Курс LangChain Agents урок 5

Встроенный middleware LangChain | Курс LangChain Agents урок 5
Mikhail
Автор
Mikhail
Опубликовано 23.03.2026
0,0
Views 3

Цель урока: научиться использовать встроенный middleware для защиты агента от бесконечных циклов, сбоев инструментов и переполнения контекста.


Теория

Где встраивается middleware

В уроке 4 вы написали кастомные хуки @before_model и @after_model. Это node-style хуки, они запускаются в конкретных точках цикла агента. Но есть и второй стиль, wrap-style, он оборачивает сам вызов модели или инструмента.

Начало выполнения
  → before_agent          (один раз)
  → перед каждым вызовом модели:
      → before_model      (node-style)
      → [wrap_model_call] (wrap-style: оборачивает сам вызов)
      ← after_model       (node-style)
  → перед каждым вызовом инструмента:
      → [wrap_tool_call]  (wrap-style: оборачивает сам вызов)
  ← after_agent           (один раз)
Конец выполнения

Node-style хуки видят состояние до или после вызова. Wrap-style хуки управляют самим вызовом, они могут повторить его, пропустить или заменить ответ.

Встроенный middleware

LangChain включает готовые middleware для самых частых задач. Их не нужно реализовывать самому, достаточно сконфигурировать и передать в create_agent.


Примеры кода

ModelCallLimitMiddleware: защита от бесконечного цикла

Агент может застрять в петле, когда модель раз за разом вызывает инструменты без прогресса. ModelCallLimitMiddleware ограничивает число вызовов модели.

import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import ModelCallLimitMiddleware
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langgraph.checkpoint.memory import InMemorySaver

load_dotenv()

@tool
def search_web(query: str) -> str:
    """Поиск в интернете."""
    return f"Результаты поиска по запросу: {query}"

model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))

agent = create_agent(
    model=model,
    tools=[search_web],
    checkpointer=InMemorySaver(),  # нужен для thread_limit
    middleware=[
        ModelCallLimitMiddleware(
            run_limit=5,       # не более 5 вызовов за один invoke
            thread_limit=20,   # не более 20 вызовов за всю сессию
            exit_behavior="end",  # завершить без ошибки
        ),
    ],
)

response = agent.invoke(
    {"messages": [{"role": "user", "content": "Подробно исследуй тему LangChain."}]},
    {"configurable": {"thread_id": "session-1"}},
)
print(response["messages"][-1].content)

run_limit - лимит на один вызов invoke().
thread_limit - лимит за всю историю треда (требует checkpointer).
exit_behavior="end" завершает агента без исключения. Можно поставить "error", чтобы поймать исключение.


ToolRetryMiddleware: повтор при сбое инструмента

Внешние API падают, а сеть может сбоить. ToolRetryMiddleware автоматически повторяет вызов инструмента с нарастающей задержкой.

import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import ToolRetryMiddleware
from langchain_openai import ChatOpenAI
from langchain.tools import tool

load_dotenv()

@tool
def get_stock_price(ticker: str) -> str:
    """Получить цену акции."""
    # Симуляция нестабильного API
    import random
    if random.random() < 0.5:
        raise ConnectionError("API временно недоступен")
    return f"Цена {ticker}: $150.00"

model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))

agent = create_agent(
    model=model,
    tools=[get_stock_price],
    middleware=[
        ToolRetryMiddleware(
            max_retries=3,
            backoff_factor=2.0,   # задержка: 1с, 2с, 4с
            initial_delay=1.0,
            on_failure="continue",  # отдать ошибку модели, не упасть
        ),
    ],
)

response = agent.invoke({
    "messages": [{"role": "user", "content": "Какая цена акции AAPL?"}]
})
print(response["messages"][-1].content)

Параметр on_failure="continue" означает: если все попытки исчерпаны, вернуть сообщение об ошибке в контекст. Модель увидит ошибку и сможет отреагировать, например предложить альтернативу. При on_failure="error" агент упадёт с исключением.


SummarizationMiddleware: контекст не переполняется

Длинные исследовательские сессии упираются в лимит контекста модели. Когда сообщений становится много, старые обрезаются, агент "забывает" что делал. SummarizationMiddleware автоматически сжимает историю с помощью промежуточной модели, сохраняя суть.

import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langchain_openai import ChatOpenAI
from langchain.tools import tool

load_dotenv()

@tool
def search_web(query: str) -> str:
    """Поиск информации в интернете."""
    return f"Найдено по запросу '{query}': подробная информация..."

model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
summary_model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))

agent = create_agent(
    model=model,
    tools=[search_web],
    middleware=[
        SummarizationMiddleware(
            model=summary_model,
            trigger=("messages", 10),  # сжать когда сообщений >= 10
            keep=("messages", 4),      # оставить 4 последних
        ),
    ],
)

response = agent.invoke({
    "messages": [{"role": "user", "content": "Расскажи подробно об истории AI."}]
})
print(response["messages"][-1].content)

trigger и keep принимают кортеж из типа и значения. Три доступных типа:

  • ("messages", 10) - по количеству сообщений
  • ("tokens", 4000) - по числу токенов
  • ("fraction", 0.8) - по доле контекстного окна модели

Для trigger можно передать список кортежей, тогда суммаризация сработает при выполнении любого из условий.


ModelFallbackMiddleware: запасные модели

Если основная модель недоступна или вернула ошибку, ModelFallbackMiddleware переключится на следующую из списка.

import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import ModelFallbackMiddleware
from langchain_openai import ChatOpenAI

load_dotenv()

model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))

agent = create_agent(
    model=model,
    tools=[],
    middleware=[
        ModelFallbackMiddleware(
            "openai:gpt-4o-mini",
            "anthropic:claude-haiku-4-5",
        ),
    ],
)

Модели fallback передаются как строки. Если основная модель упала, агент пробует первую fallback, затем вторую и так далее.


Wrap-style хук: @wrap_tool_call

Для более тонкого контроля над вызовом инструмента используется wrap-style хук. В отличие от node-style, он управляет самим вызовом и может перехватить запрос до отправки, повторить его или вернуть кешированный ответ.

import os
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call, ModelRequest, ModelResponse
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from typing import Callable

load_dotenv()

@tool
def search_web(query: str) -> str:
    """Поиск в интернете."""
    return f"Результаты: {query}"

@wrap_tool_call
def log_tool_calls(request, handler: Callable):
    """Логирует каждый вызов инструмента."""
    tool_name = request.tool_call["name"]
    args = request.tool_call["args"]
    print(f"  [tool] {tool_name}({args})")
    result = handler(request)
    print(f"  [tool] результат: {str(result)[:60]}")
    return result

model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))

agent = create_agent(
    model=model,
    tools=[search_web],
    middleware=[log_tool_calls],
)

agent.invoke({
    "messages": [{"role": "user", "content": "Найди информацию о LangGraph."}]
})

handler(request) это вызов настоящего инструмента. Можно не вызывать его (пропустить), вызвать несколько раз (повторить) или вернуть другое значение.


Сквозной проект: добавляем summarization агенту-аналитику

import asyncio
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import (
    SummarizationMiddleware,
    ModelCallLimitMiddleware,
    ToolRetryMiddleware,
    before_model,
)
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.runtime import Runtime
from typing import Any

load_dotenv()

SERVER_PATH = str(Path(__file__).parent.parent / "agents_course" / "tools_server.py")

@before_model
def log_step(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"  [шаг] сообщений в стейте: {len(state['messages'])}")
    return None

async def main():
    client = MultiServerMCPClient({
        "analyst_tools": {
            "transport": "stdio",
            "command": sys.executable,
            "args": [SERVER_PATH],
        }
    })

    tools = await client.get_tools()
    model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))
    summary_model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o-mini"))

    agent = create_agent(
        model=model,
        tools=tools,
        checkpointer=InMemorySaver(),
        middleware=[
            log_step,
            SummarizationMiddleware(
                model=summary_model,
                trigger=("messages", 12),
                keep=("messages", 4),
            ),
            ModelCallLimitMiddleware(run_limit=10, exit_behavior="end"),
            ToolRetryMiddleware(max_retries=2, initial_delay=0.5),
        ],
    )

    config = {"configurable": {"thread_id": "research-1"}}

    response = await agent.ainvoke(
        {"messages": [{"role": "user", "content": "Найди информацию об Anthropic."}]},
        config,
    )
    print(response["messages"][-1].content)

if __name__ == "__main__":
    asyncio.run(main())

Частые ошибки

thread_limit без checkpointer

# Неправильно: thread_limit требует checkpointer для хранения счётчика
agent = create_agent(
    model=model,
    tools=[],
    middleware=[ModelCallLimitMiddleware(thread_limit=10)],
)

# Правильно: добавить checkpointer
agent = create_agent(
    model=model,
    tools=[],
    checkpointer=InMemorySaver(),
    middleware=[ModelCallLimitMiddleware(thread_limit=10)],
)

SummarizationMiddleware с trigger=("fraction", ...) без профиля модели

# Может не сработать: fraction требует знать размер контекстного окна модели
SummarizationMiddleware(model=summary_model, trigger=("fraction", 0.8))

# Надёжнее: абсолютные значения токенов или сообщений
SummarizationMiddleware(model=summary_model, trigger=("tokens", 4000))
SummarizationMiddleware(model=summary_model, trigger=("messages", 10))

Порядок middleware при суммаризации

# Неправильно: limit стоит до summarization, агент завершится раньше чем
# суммаризация успеет сжать контекст
middleware=[ModelCallLimitMiddleware(run_limit=5), SummarizationMiddleware(...)]

# Правильно: summarization первым, limit последним
middleware=[SummarizationMiddleware(...), ModelCallLimitMiddleware(run_limit=5)]

Задание

Возьмите агента из урока 4. Добавьте три middleware:

  1. SummarizationMiddleware с триггером по количеству сообщений
  2. ModelCallLimitMiddleware с run_limit=8
  3. ToolRetryMiddleware с max_retries=2

Проведите длинный диалог из 5-6 запросов. Убедитесь в LangSmith, что суммаризация сработала и число вызовов не превысило лимит.

<< урок 4

урок 6 >>


Подписывайтесь на мой Telegram канал

Если вам нужен ментор и вы хотите научиться разрабатывать AI агентов, пишите, обсудим условия

Авторизуйтесь, чтобы оставить комментарий.

Комментариев: 0

Нет комментариев.

Тут может быть ваша реклама

Пишите info@aisferaic.ru

Похожие туториалы