Нелинейные пайплайны: параллельность, ветвление | Курс по LangChain урок 5
Цель урока
Вы научитесь строить нелинейные пайплайны, запускать шаги параллельно с помощью RunnableParallel.
Делать ветвление логики по условию с помощьюRunnableBranch и встраивать функции в цепочку с помощью RunnableLambda.
Необходимые знания:
- Уроки 2-4 (LCEL, цепочки, шаблоны, история)
- Lambda функции и callable объекты в Python
Ключевые концепции:
RunnableParallel- параллельное выполнение нескольких RunnableRunnablePassthrough- пропуск данных без измененийRunnableLambda- произвольная функция Python как RunnableRunnableBranch- ветвление по условиюitemgetter- извлечение ключей из словаря в цепочке
Проблема линейных цепочек
До этого момента все цепочки выглядели так:
prompt | model | parser
Данные идут строго слева направо. Но реальные задачи сложнее:
- Нужно получить несколько независимых ответов параллельно и объединить их
- Нужно выбрать разный промпты в зависимости от типа входных данных
- Нужно вызвать функцию посередине цепочки (форматирование, обращение к БД, валидация)
Для этого LCEL предоставляет несколько дополнительных примитивов.
RunnableParallel: параллельное выполнение
RunnableParallel запускает несколько Runnable одновременно на одних и тех же входных данных и возвращает словарь с результатами.
Обе цепочки выполняются параллельно, время ответа определяется самой медленной из них, а не суммой.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
# Два разных промпта для одного кода
summary_prompt = ChatPromptTemplate.from_messages([
("system", "Кратко опиши, что делает функция. Одно предложение."),
("human", "{code}"),
])
review_prompt = ChatPromptTemplate.from_messages([
("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
("human", "{code}"),
])
summary_chain = summary_prompt | model | StrOutputParser()
review_chain = review_prompt | model | StrOutputParser()
# Запускаем оба параллельно
parallel = RunnableParallel(
summary=summary_chain,
review=review_chain,
)
result = parallel.invoke({"code": """
def get_user(user_id):
db = connect_to_db()
user = db.query(f"SELECT * FROM users WHERE id={user_id}")
return user
"""})
print(result["summary"]) # "Функция получает пользователя из БД по ID."
print(result["review"]) # ["SQL-инъекция", "Соединение с БД не закрывается", ...]
Использование словаря
RunnableParallel можно записать как обычный словарь, LCEL автоматически его преобразует.
parallel = {
"summary": summary_chain,
"review": review_chain,
}
# Это эквивалентно RunnableParallel(summary=summary_chain, review=review_chain)
chain = parallel | some_next_step
RunnablePassthrough
Когда нужно передать исходные данные вместе с результатами обработки,
RunnablePassthrough возвращает входные данные без изменений.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
summary_chain = ChatPromptTemplate.from_messages([
("system", "Кратко опиши, что делает функция. Одно предложение."),
("human", "{code}"),
]) | model | StrOutputParser()
review_chain = ChatPromptTemplate.from_messages([
("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
("human", "{code}"),
]) | model | StrOutputParser()
# Добавляем оригинальный код к результатам параллельной обработки
parallel = RunnableParallel(
code=RunnablePassthrough(), # передаем входной словарь как есть
summary=summary_chain,
review=review_chain,
)
result = parallel.invoke({"code": "def add(a, b): return a - b"})
print(result["code"]) # {"code": "def add(a, b): return a - b"}
print(result["summary"]) # "Функция складывает два числа."
print(result["review"]) # ["Ошибка: используется `-` вместо `+`"]
RunnablePassthrough.assign()
Удобный способ добавить новые ключи к существующему словарю, не теряя существующие.
.assign() самый чистый способ добавить в словарь результаты параллельных вызовов.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
summary_chain = ChatPromptTemplate.from_messages([
("system", "Кратко опиши, что делает функция. Одно предложение."),
("human", "{code}"),
]) | model | StrOutputParser()
review_chain = ChatPromptTemplate.from_messages([
("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
("human", "{code}"),
]) | model | StrOutputParser()
chain = RunnablePassthrough.assign(
summary=summary_chain,
review=review_chain,
)
result = chain.invoke({"code": "def add(a, b): return a - b"})
# result содержит и исходный "code", и новые "summary", "review"
print(result.keys()) # dict_keys(['code', 'summary', 'review'])
RunnableLambda: функция в цепочке
Любую функцию можно встроить в цепочку с помощью RunnableLambda.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
summary_chain = ChatPromptTemplate.from_messages([
("system", "Кратко опиши, что делает функция. Одно предложение."),
("human", "{code}"),
]) | model | StrOutputParser()
review_chain = ChatPromptTemplate.from_messages([
("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
("human", "{code}"),
]) | model | StrOutputParser()
def format_review(data: dict) -> str:
"""Форматирует результаты анализа в читаемый отчет."""
issues = "\n".join(f" - {issue}" for issue in data["review"].split("\n") if issue.strip())
return (
f"## Анализ кода\n\n"
f"**Что делает:**\n {data['summary']}\n\n"
f"**Проблемы:**\n{issues}\n\n"
f"**Исходный код:**\n```python\n{data['code']}\n```"
)
# Встраиваем функцию в цепочку
chain = (
RunnablePassthrough.assign(summary=summary_chain, review=review_chain)
| RunnableLambda(format_review)
)
report = chain.invoke({"code": "def add(a, b): return a - b"})
print(report)
Краткий синтаксис
LCEL автоматически оборачивает lambda и обычные функции в RunnableLambda,
если использовать их с оператором |.
# Эти два варианта эквивалентны:
chain = some_runnable | RunnableLambda(format_review)
chain = some_runnable | format_review # автообёртка в RunnableLambda
Когда использовать RunnableLambda
- Форматирование или трансформация данных между шагами
- Валидация входных данных
- Обращение к БД, кешу или внешнему API
- Любая логика, которую нельзя выразить через промпт или парсер
import os
from operator import itemgetter
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
summary_chain = ChatPromptTemplate.from_messages([
("system", "Кратко опиши, что делает функция. Одно предложение."),
("human", "{code}"),
]) | model | StrOutputParser()
review_chain = ChatPromptTemplate.from_messages([
("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
("human", "{code}"),
]) | model | StrOutputParser()
# itemgetter — удобный способ извлечь одно поле из словаря
chain = (
RunnablePassthrough.assign(summary=summary_chain, review=review_chain)
| itemgetter("summary") # извлекаем только summary
)
result = chain.invoke({"code": "def add(a, b): return a - b"})
print(result)
RunnableBranch: ветвление по условию
RunnableBranch выбирает одну из нескольких цепочек в зависимости от условия.
from langchain_core.runnables import RunnableBranch
branch = RunnableBranch(
(condition_fn_1, chain_1), # если условие 1 истинно — выполнить chain_1
(condition_fn_2, chain_2), # если условие 2 истинно — выполнить chain_2
default_chain, # иначе — выполнить default_chain
)
Пример, роутер вопросов по теме.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
# Три специализированных цепочки
python_chain = ChatPromptTemplate.from_messages([
("system", "Ты эксперт по Python. Отвечай с примерами кода."),
("human", "{question}"),
]) | model | StrOutputParser()
sql_chain = ChatPromptTemplate.from_messages([
("system", "Ты эксперт по SQL и базам данных. Приводи примеры запросов."),
("human", "{question}"),
]) | model | StrOutputParser()
general_chain = ChatPromptTemplate.from_messages([
("system", "Ты универсальный ассистент-разработчик."),
("human", "{question}"),
]) | model | StrOutputParser()
# Условия ветвления
def is_python_question(data: dict) -> bool:
keywords = ["python", "питон", "список", "словарь", "декоратор", "asyncio", "pip"]
return any(kw in data["question"].lower() for kw in keywords)
def is_sql_question(data: dict) -> bool:
keywords = ["sql", "база данных", "бд", "select", "join", "индекс", "транзакция"]
return any(kw in data["question"].lower() for kw in keywords)
branch = RunnableBranch(
(is_python_question, python_chain),
(is_sql_question, sql_chain),
general_chain,
)
print(branch.invoke({"question": "Как работает asyncio в Python?"}))
# → python_chain
print(branch.invoke({"question": "Как оптимизировать JOIN в SQL?"}))
# → sql_chain
print(branch.invoke({"question": "Что такое REST API?"}))
# → general_chain
Ветвление с помощью классификатора LLM
Ключевые слова, хрупкое решение. Надежнее доверить классификацию самой модели.
Классификатор LLM работает точнее ключевых слов, но добавляет один дополнительный вызов API.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel
from typing import Literal
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
# Три специализированных цепочки
python_chain = ChatPromptTemplate.from_messages([
("system", "Ты эксперт по Python. Отвечай с примерами кода."),
("human", "{question}"),
]) | model | StrOutputParser()
sql_chain = ChatPromptTemplate.from_messages([
("system", "Ты эксперт по SQL и базам данных. Приводи примеры запросов."),
("human", "{question}"),
]) | model | StrOutputParser()
general_chain = ChatPromptTemplate.from_messages([
("system", "Ты универсальный ассистент-разработчик."),
("human", "{question}"),
]) | model | StrOutputParser()
class QuestionType(BaseModel):
topic: Literal["python", "sql", "general"]
classifier_model = model.with_structured_output(QuestionType)
classifier_prompt = ChatPromptTemplate.from_messages([
("system", "Определи тему вопроса разработчика: python, sql или general."),
("human", "{question}"),
])
classifier_chain = classifier_prompt | classifier_model
def route(data: dict) -> str:
result = classifier_chain.invoke({"question": data["question"]})
return result.topic # "python", "sql" или "general"
# Используем результат классификации для выбора цепочки
def get_chain(data: dict):
topic = route(data)
chains = {
"python": python_chain,
"sql": sql_chain,
"general": general_chain,
}
return chains[topic].invoke(data)
smart_router = RunnableLambda(get_chain)
print(smart_router.invoke({"question": "Как работает GIL?"}))
Объединяем всё вместе в сложный пайплайн
Пример пайплайна, который анализирует код, параллельно строит несколько ответов, фильтрует результат и форматирует итоговый отчет.
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from pydantic import BaseModel, Field
from typing import List
load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)
# Шаг 1: классифицируем сложность кода
class ComplexityResult(BaseModel):
level: str = Field(description="уровень: simple, medium, complex")
reason: str = Field(description="краткое обоснование")
complexity_chain = ChatPromptTemplate.from_messages([
("system", "Оцени сложность кода."),
("human", "{code}"),
]) | model.with_structured_output(ComplexityResult)
# Шаги 2a, 2b: параллельные анализы
summary_chain = ChatPromptTemplate.from_messages([
("system", "Опиши в одном предложении, что делает код."),
("human", "{code}"),
]) | model | StrOutputParser()
issues_chain = ChatPromptTemplate.from_messages([
("system", "Перечисли проблемы кода. Если проблем нет, напиши 'Проблем не обнаружено'."),
("human", "{code}"),
]) | model | StrOutputParser()
# Шаг 3: форматирование итогового отчета
def build_report(data: dict) -> str:
complexity = data["complexity"]
return (
f"## Code Review Report\n\n"
f"**Сложность:**\n{complexity.level} — {complexity.reason}\n\n"
f"**Описание:**\n{data['summary']}\n\n"
f"**Проблемы:**\n{data['issues']}"
)
# Собираем пайплайн
pipeline = (
RunnablePassthrough.assign(complexity=complexity_chain)
| RunnablePassthrough.assign(summary=summary_chain, issues=issues_chain)
| RunnableLambda(build_report)
)
code = """
def process_orders(orders):
result = []
for order in orders:
if order['status'] == 'pending':
user = get_user(order['user_id'])
if user['is_active']:
result.append({
'order_id': order['id'],
'user': user['name'],
'total': sum([item['price'] for item in order['items']])
})
return result
"""
report = pipeline.invoke({"code": code})
print(report)
Распространенные ошибки
1. Несовместимые типы входных данных
# RunnableParallel ожидает один тип входа для всех веток
parallel = RunnableParallel(a=chain_a, b=chain_b)
parallel.invoke("строка") # ок, если обе цепочки принимают строку
parallel.invoke({"key": "val"}) # ок, если обе принимают словарь
# Ошибка возникает, когда ветки ожидают разные ключи словаря
2. RunnableBranch без default
branch = RunnableBranch(
(condition_1, chain_1),
(condition_2, chain_2),
# нет default, если ни одно условие не выполнено, выброшен ValueError
)
Всегда добавляй цепочку default последним аргументом.
3. Мутация входного словаря в RunnableLambda
def bad_transform(data: dict) -> dict:
data["new_key"] = "value" # мутируем входной словарь
return data
Лучше возвращать новый словарь: return {**data, "new_key": "value"}.
4. Последовательные вызовы вместо параллельных
# Медленно: вызовы идут последовательно
result = {"a": chain_a.invoke(x), "b": chain_b.invoke(x)}
# Быстро: вызовы параллельны
result = RunnableParallel(a=chain_a, b=chain_b).invoke(x)
Практическое задание
Создайте пайплайн для анализа Pull Request.
Требования:
1) Входные данные, словарь с полями:
title- заголовок PRdescription- описание PRdiff- код изменений (diff)
2) Параллельно выполните три анализа:
security- есть ли потенциальные уязвимостиcomplexity- насколько сложны изменения (low / medium / high)test_coverage- предложения по тест-кейсам
3) На основе поля complexity выберите одну из двух финальных цепочек ветвления:
- Если
high- сгенерируйте подробный комментарий с рекомендациями - Если
lowилиmedium- сгенерируйте краткий LGTM (Выглядит неплохо / Меня устраивает)
4) Итоговый вывод отформатированный отчет с помощью RunnableLambda
Ожидаемый результат: один вызов .invoke() возвращает готовый текст review.
Итоги урока
Мы научились строить нелинейные пайплайны, но все данные в них приходили от пользователя напрямую. В реальных приложениях модели нужно обращаться к внешним источникам: поисковику, базе данных, API. В следующем уроке разберем инструменты (Tools), как дать модели возможность вызывать функции и принимать решения на основе их результатов.
Подписывайтесь на мой Telegram канал
Если вам нужен ментор и вы хотите научиться разрабатывать AI агентов, пишите, обсудим условия
Авторизуйтесь, чтобы оставить комментарий.
Нет комментариев.
Тут может быть ваша реклама
Пишите info@aisferaic.ru