LangChain

Нелинейные пайплайны: параллельность, ветвление | Курс по LangChain урок 5

Нелинейные пайплайны: параллельность, ветвление | Курс по LangChain урок 5
Mikhail
Автор
Mikhail
Опубликовано 23.02.2026
0,0
Views 4

Цель урока

Вы научитесь строить нелинейные пайплайны, запускать шаги параллельно с помощью RunnableParallel. Делать ветвление логики по условию с помощьюRunnableBranch и встраивать функции в цепочку с помощью RunnableLambda.

Необходимые знания:

  • Уроки 2-4 (LCEL, цепочки, шаблоны, история)
  • Lambda функции и callable объекты в Python

Ключевые концепции:

  • RunnableParallel - параллельное выполнение нескольких Runnable
  • RunnablePassthrough - пропуск данных без изменений
  • RunnableLambda - произвольная функция Python как Runnable
  • RunnableBranch - ветвление по условию
  • itemgetter - извлечение ключей из словаря в цепочке

Проблема линейных цепочек

До этого момента все цепочки выглядели так:

prompt | model | parser

Данные идут строго слева направо. Но реальные задачи сложнее:

  • Нужно получить несколько независимых ответов параллельно и объединить их
  • Нужно выбрать разный промпты в зависимости от типа входных данных
  • Нужно вызвать функцию посередине цепочки (форматирование, обращение к БД, валидация)

Для этого LCEL предоставляет несколько дополнительных примитивов.


RunnableParallel: параллельное выполнение

RunnableParallel запускает несколько Runnable одновременно на одних и тех же входных данных и возвращает словарь с результатами.

Обе цепочки выполняются параллельно, время ответа определяется самой медленной из них, а не суммой.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

# Два разных промпта для одного кода
summary_prompt = ChatPromptTemplate.from_messages([
    ("system", "Кратко опиши, что делает функция. Одно предложение."),
    ("human", "{code}"),
])

review_prompt = ChatPromptTemplate.from_messages([
    ("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
    ("human", "{code}"),
])

summary_chain = summary_prompt | model | StrOutputParser()
review_chain  = review_prompt  | model | StrOutputParser()

# Запускаем оба параллельно
parallel = RunnableParallel(
    summary=summary_chain,
    review=review_chain,
)

result = parallel.invoke({"code": """
def get_user(user_id):
    db = connect_to_db()
    user = db.query(f"SELECT * FROM users WHERE id={user_id}")
    return user
"""})

print(result["summary"])  # "Функция получает пользователя из БД по ID."
print(result["review"])   # ["SQL-инъекция", "Соединение с БД не закрывается", ...]

Использование словаря

RunnableParallel можно записать как обычный словарь, LCEL автоматически его преобразует.

parallel = {
    "summary": summary_chain,
    "review": review_chain,
}

# Это эквивалентно RunnableParallel(summary=summary_chain, review=review_chain)
chain = parallel | some_next_step

RunnablePassthrough

Когда нужно передать исходные данные вместе с результатами обработки, RunnablePassthrough возвращает входные данные без изменений.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

summary_chain = ChatPromptTemplate.from_messages([
    ("system", "Кратко опиши, что делает функция. Одно предложение."),
    ("human", "{code}"),
]) | model | StrOutputParser()

review_chain = ChatPromptTemplate.from_messages([
    ("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
    ("human", "{code}"),
]) | model | StrOutputParser()

# Добавляем оригинальный код к результатам параллельной обработки
parallel = RunnableParallel(
    code=RunnablePassthrough(),   # передаем входной словарь как есть
    summary=summary_chain,
    review=review_chain,
)

result = parallel.invoke({"code": "def add(a, b): return a - b"})
print(result["code"])     # {"code": "def add(a, b): return a - b"}
print(result["summary"])  # "Функция складывает два числа."
print(result["review"])   # ["Ошибка: используется `-` вместо `+`"]

RunnablePassthrough.assign()

Удобный способ добавить новые ключи к существующему словарю, не теряя существующие. .assign() самый чистый способ добавить в словарь результаты параллельных вызовов.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

summary_chain = ChatPromptTemplate.from_messages([
    ("system", "Кратко опиши, что делает функция. Одно предложение."),
    ("human", "{code}"),
]) | model | StrOutputParser()

review_chain = ChatPromptTemplate.from_messages([
    ("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
    ("human", "{code}"),
]) | model | StrOutputParser()

chain = RunnablePassthrough.assign(
    summary=summary_chain,
    review=review_chain,
)

result = chain.invoke({"code": "def add(a, b): return a - b"})
# result содержит и исходный "code", и новые "summary", "review"
print(result.keys())  # dict_keys(['code', 'summary', 'review'])

RunnableLambda: функция в цепочке

Любую функцию можно встроить в цепочку с помощью RunnableLambda.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

summary_chain = ChatPromptTemplate.from_messages([
    ("system", "Кратко опиши, что делает функция. Одно предложение."),
    ("human", "{code}"),
]) | model | StrOutputParser()

review_chain = ChatPromptTemplate.from_messages([
    ("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
    ("human", "{code}"),
]) | model | StrOutputParser()


def format_review(data: dict) -> str:
    """Форматирует результаты анализа в читаемый отчет."""
    issues = "\n".join(f"  - {issue}" for issue in data["review"].split("\n") if issue.strip())
    return (
        f"## Анализ кода\n\n"
        f"**Что делает:**\n {data['summary']}\n\n"
        f"**Проблемы:**\n{issues}\n\n"
        f"**Исходный код:**\n```python\n{data['code']}\n```"
    )


# Встраиваем функцию в цепочку
chain = (
    RunnablePassthrough.assign(summary=summary_chain, review=review_chain)
    | RunnableLambda(format_review)
)


report = chain.invoke({"code": "def add(a, b): return a - b"})
print(report)

Краткий синтаксис

LCEL автоматически оборачивает lambda и обычные функции в RunnableLambda, если использовать их с оператором |.

# Эти два варианта эквивалентны:
chain = some_runnable | RunnableLambda(format_review)
chain = some_runnable | format_review  # автообёртка в RunnableLambda

Когда использовать RunnableLambda

  • Форматирование или трансформация данных между шагами
  • Валидация входных данных
  • Обращение к БД, кешу или внешнему API
  • Любая логика, которую нельзя выразить через промпт или парсер
import os
from operator import itemgetter
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

summary_chain = ChatPromptTemplate.from_messages([
    ("system", "Кратко опиши, что делает функция. Одно предложение."),
    ("human", "{code}"),
]) | model | StrOutputParser()

review_chain = ChatPromptTemplate.from_messages([
    ("system", "Найди потенциальные проблемы в коде. Только список, без лишних слов."),
    ("human", "{code}"),
]) | model | StrOutputParser()

# itemgetter — удобный способ извлечь одно поле из словаря
chain = (
    RunnablePassthrough.assign(summary=summary_chain, review=review_chain)
    | itemgetter("summary")   # извлекаем только summary
)

result = chain.invoke({"code": "def add(a, b): return a - b"})
print(result)

RunnableBranch: ветвление по условию

RunnableBranch выбирает одну из нескольких цепочек в зависимости от условия.

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (condition_fn_1, chain_1),   # если условие 1 истинно — выполнить chain_1
    (condition_fn_2, chain_2),   # если условие 2 истинно — выполнить chain_2
    default_chain,               # иначе — выполнить default_chain
)

Пример, роутер вопросов по теме.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableBranch, RunnableLambda

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

# Три специализированных цепочки
python_chain = ChatPromptTemplate.from_messages([
    ("system", "Ты эксперт по Python. Отвечай с примерами кода."),
    ("human", "{question}"),
]) | model | StrOutputParser()

sql_chain = ChatPromptTemplate.from_messages([
    ("system", "Ты эксперт по SQL и базам данных. Приводи примеры запросов."),
    ("human", "{question}"),
]) | model | StrOutputParser()

general_chain = ChatPromptTemplate.from_messages([
    ("system", "Ты универсальный ассистент-разработчик."),
    ("human", "{question}"),
]) | model | StrOutputParser()


# Условия ветвления
def is_python_question(data: dict) -> bool:
    keywords = ["python", "питон", "список", "словарь", "декоратор", "asyncio", "pip"]
    return any(kw in data["question"].lower() for kw in keywords)


def is_sql_question(data: dict) -> bool:
    keywords = ["sql", "база данных", "бд", "select", "join", "индекс", "транзакция"]
    return any(kw in data["question"].lower() for kw in keywords)


branch = RunnableBranch(
    (is_python_question, python_chain),
    (is_sql_question,    sql_chain),
    general_chain,
)

print(branch.invoke({"question": "Как работает asyncio в Python?"}))
# → python_chain

print(branch.invoke({"question": "Как оптимизировать JOIN в SQL?"}))
# → sql_chain

print(branch.invoke({"question": "Что такое REST API?"}))
# → general_chain

Ветвление с помощью классификатора LLM

Ключевые слова, хрупкое решение. Надежнее доверить классификацию самой модели.

Классификатор LLM работает точнее ключевых слов, но добавляет один дополнительный вызов API.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
from pydantic import BaseModel
from typing import Literal

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)

# Три специализированных цепочки
python_chain = ChatPromptTemplate.from_messages([
    ("system", "Ты эксперт по Python. Отвечай с примерами кода."),
    ("human", "{question}"),
]) | model | StrOutputParser()

sql_chain = ChatPromptTemplate.from_messages([
    ("system", "Ты эксперт по SQL и базам данных. Приводи примеры запросов."),
    ("human", "{question}"),
]) | model | StrOutputParser()

general_chain = ChatPromptTemplate.from_messages([
    ("system", "Ты универсальный ассистент-разработчик."),
    ("human", "{question}"),
]) | model | StrOutputParser()


class QuestionType(BaseModel):
    topic: Literal["python", "sql", "general"]


classifier_model = model.with_structured_output(QuestionType)

classifier_prompt = ChatPromptTemplate.from_messages([
    ("system", "Определи тему вопроса разработчика: python, sql или general."),
    ("human", "{question}"),
])

classifier_chain = classifier_prompt | classifier_model


def route(data: dict) -> str:
    result = classifier_chain.invoke({"question": data["question"]})
    return result.topic  # "python", "sql" или "general"


# Используем результат классификации для выбора цепочки
def get_chain(data: dict):
    topic = route(data)
    chains = {
        "python":  python_chain,
        "sql":  sql_chain,
        "general": general_chain,
    }
    return chains[topic].invoke(data)


smart_router = RunnableLambda(get_chain)

print(smart_router.invoke({"question": "Как работает GIL?"}))

Объединяем всё вместе в сложный пайплайн

Пример пайплайна, который анализирует код, параллельно строит несколько ответов, фильтрует результат и форматирует итоговый отчет.

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda
from pydantic import BaseModel, Field
from typing import List

load_dotenv()
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"), temperature=0)


# Шаг 1: классифицируем сложность кода
class ComplexityResult(BaseModel):
    level: str = Field(description="уровень: simple, medium, complex")
    reason: str = Field(description="краткое обоснование")

complexity_chain = ChatPromptTemplate.from_messages([
    ("system", "Оцени сложность кода."),
    ("human", "{code}"),
]) | model.with_structured_output(ComplexityResult)

# Шаги 2a, 2b: параллельные анализы
summary_chain = ChatPromptTemplate.from_messages([
    ("system", "Опиши в одном предложении, что делает код."),
    ("human", "{code}"),
]) | model | StrOutputParser()

issues_chain = ChatPromptTemplate.from_messages([
    ("system", "Перечисли проблемы кода. Если проблем нет, напиши 'Проблем не обнаружено'."),
    ("human", "{code}"),
]) | model | StrOutputParser()


# Шаг 3: форматирование итогового отчета
def build_report(data: dict) -> str:
    complexity = data["complexity"]
    return (
        f"## Code Review Report\n\n"
        f"**Сложность:**\n{complexity.level}{complexity.reason}\n\n"
        f"**Описание:**\n{data['summary']}\n\n"
        f"**Проблемы:**\n{data['issues']}"
    )

# Собираем пайплайн
pipeline = (
    RunnablePassthrough.assign(complexity=complexity_chain)
    | RunnablePassthrough.assign(summary=summary_chain, issues=issues_chain)
    | RunnableLambda(build_report)
)

code = """
def process_orders(orders):
    result = []
    for order in orders:
        if order['status'] == 'pending':
            user = get_user(order['user_id'])
            if user['is_active']:
                result.append({
                    'order_id': order['id'],
                    'user': user['name'],
                    'total': sum([item['price'] for item in order['items']])
                })
    return result
"""


report = pipeline.invoke({"code": code})
print(report)

Распространенные ошибки

1. Несовместимые типы входных данных

# RunnableParallel ожидает один тип входа для всех веток
parallel = RunnableParallel(a=chain_a, b=chain_b)
parallel.invoke("строка")  # ок, если обе цепочки принимают строку
parallel.invoke({"key": "val"})  # ок, если обе принимают словарь
# Ошибка возникает, когда ветки ожидают разные ключи словаря

2. RunnableBranch без default

branch = RunnableBranch(
    (condition_1, chain_1),
    (condition_2, chain_2),
    # нет default, если ни одно условие не выполнено, выброшен ValueError
)

Всегда добавляй цепочку default последним аргументом.

3. Мутация входного словаря в RunnableLambda

def bad_transform(data: dict) -> dict:
    data["new_key"] = "value"  # мутируем входной словарь
    return data

Лучше возвращать новый словарь: return {**data, "new_key": "value"}.

4. Последовательные вызовы вместо параллельных

# Медленно: вызовы идут последовательно
result = {"a": chain_a.invoke(x), "b": chain_b.invoke(x)}

# Быстро: вызовы параллельны
result = RunnableParallel(a=chain_a, b=chain_b).invoke(x)

Практическое задание

Создайте пайплайн для анализа Pull Request.

Требования:

1) Входные данные, словарь с полями:

  • title - заголовок PR
  • description - описание PR
  • diff - код изменений (diff)

2) Параллельно выполните три анализа:

  • security - есть ли потенциальные уязвимости
  • complexity - насколько сложны изменения (low / medium / high)
  • test_coverage - предложения по тест-кейсам

3) На основе поля complexity выберите одну из двух финальных цепочек ветвления:

  • Если high - сгенерируйте подробный комментарий с рекомендациями
  • Если low или medium - сгенерируйте краткий LGTM (Выглядит неплохо / Меня устраивает)

4) Итоговый вывод отформатированный отчет с помощью RunnableLambda

Ожидаемый результат: один вызов .invoke() возвращает готовый текст review.


Итоги урока

Мы научились строить нелинейные пайплайны, но все данные в них приходили от пользователя напрямую. В реальных приложениях модели нужно обращаться к внешним источникам: поисковику, базе данных, API. В следующем уроке разберем инструменты (Tools), как дать модели возможность вызывать функции и принимать решения на основе их результатов.

<< Урок 4

Урок 6 >>


Подписывайтесь на мой Telegram канал

Если вам нужен ментор и вы хотите научиться разрабатывать AI агентов, пишите, обсудим условия

Авторизуйтесь, чтобы оставить комментарий.

Комментариев: 0

Нет комментариев.

Тут может быть ваша реклама

Пишите info@aisferaic.ru

Похожие туториалы