LangChain

Продакшн: RAG, чат-бот на FastAPI со стримингом | Курс по LangChain урок 11

Продакшн: RAG, чат-бот на FastAPI со стримингом | Курс по LangChain урок 11
Mikhail
Автор
Mikhail
Опубликовано 23.02.2026
0,0
Views 4

Цель урока

Вы построите сервис на FastAPI и подключите его к проекту на LangChain. Настроите стриминг с помощью SSE, управление сессиями и обработку ошибок. Разберём структуру проекта для продакшна. И что учить дальше.

Необходимые знания:

  • Уроки 1–10 (весь курс)
  • FastAPI на базовом уровне (эндпоинты, Pydantic)
  • Понимание async/await в Python

Ключевые концепции:

  • Структура продакшн проекта
  • FastAPI + LangChain правильная инициализация
  • Server-Sent Events (SSE) для стриминга
  • Управление сессиями пользователей
  • Обработка ошибок и таймауты
  • Конфигурация через переменные окружения
  • Lifespan инициализация при старте приложения

Структура проекта

Хорошая структура основа поддерживаемого кода:

rag_chatbot/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI приложение, lifespan
│   ├── config.py            # Настройки через pydantic-settings
│   ├── dependencies.py      # Зависимости FastAPI (DI)
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── chat.py          # /chat эндпоинты
│   │   └── documents.py     # /documents эндпоинты
│   ├── services/
│   │   ├── __init__.py
│   │   ├── rag.py           # RAG цепочка и retriever
│   │   ├── indexer.py       # Загрузка и индексирование документов
│   │   └── session.py       # Управление сессиями
│   └── schemas/
│       ├── __init__.py
│       └── chat.py          # Pydantic схемы запросов/ответов
├── data/
│   └── knowledge_base/      # Документы для индексирования
├── indexes/                 # Сохранённые FAISS-индексы
├── .env
├── requirements.txt
└── README.md

Конфигурация

# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    # LLM
    openai_api_key: str
    openai_base_url: str = "https://api.openai.com/v1"
    model_name: str = "gpt-4o"

    # LangSmith (опционально)
    langchain_tracing_v2: bool = False
    langchain_api_key: str = ""
    langchain_project: str = "rag-chatbot"

    # RAG
    chunk_size: int = 1000
    chunk_overlap: int = 200
    retriever_k: int = 4
    index_path: str = "indexes/faiss_index"

    # Сессии
    max_history_messages: int = 20
    session_ttl_seconds: int = 3600  # 1 час

    # API
    max_tokens: int = 2000
    request_timeout: int = 60

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache()
def get_settings() -> Settings:
    return Settings()
# .env
OPENAI_API_KEY=rusgpt-...
OPENAI_BASE_URL=https://rus-gpt.com/api/v1
MODEL_NAME=anthropic/claude-haiku-4.5

LANGCHAIN_API_KEY=ls...
LANGCHAIN_TRACING_V2=true
LANGCHAIN_PROJECT=rag-chatbot-prod

Схемы запросов и ответов

# app/schemas/chat.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime


class ChatRequest(BaseModel):
    session_id: str = Field(..., description="ID сессии пользователя")
    question: str   = Field(..., min_length=1, max_length=2000)
    stream: bool    = Field(default=True, description="Стриминг ответа")


class ChatResponse(BaseModel):
    session_id: str
    answer: str
    sources: list[str] = []
    tokens_used: int = 0
    latency_ms: float = 0.0


class DocumentUploadResponse(BaseModel):
    message: str
    chunks_indexed: int
    index_size: int


class HealthResponse(BaseModel):
    status: str
    index_loaded: bool
    active_sessions: int
    timestamp: datetime

Сервис RAG

# app/services/rag.py
import os
from langchain_openai import ChatOpenAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain_classic.retrievers.ensemble import EnsembleRetriever
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.documents import Document
from app.config import Settings


class RAGService:
    def __init__(self, settings: Settings):
        self.settings = settings
        self.model = ChatOpenAI(
            model=settings.model_name,
            api_key=settings.openai_api_key,
            base_url=settings.openai_base_url,
            temperature=0,
            max_tokens=settings.max_tokens,
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.vectorstore: FAISS | None = None
        self.retriever = None
        self.chain = None
        self.chain_with_sources = None

    def load_index(self) -> bool:
        """Загружает FAISS-индекс с диска. Возвращает True если успешно."""
        if not os.path.exists(self.settings.index_path):
            return False
        try:
            self.vectorstore = FAISS.load_local(
                self.settings.index_path,
                self.embeddings,
                allow_dangerous_deserialization=True,
            )
            self._build_chain()
            return True
        except Exception as e:
            print(f"Ошибка загрузки индекса: {e}")
            return False

    def index_documents(self, docs: list[Document]) -> int:
        """Добавляет документы в индекс. Возвращает количество чанков."""
        from langchain_text_splitters import RecursiveCharacterTextSplitter

        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.settings.chunk_size,
            chunk_overlap=self.settings.chunk_overlap,
        )
        chunks = splitter.split_documents(docs)

        if self.vectorstore is None:
            self.vectorstore = FAISS.from_documents(chunks, self.embeddings)
        else:
            self.vectorstore.add_documents(chunks)

        os.makedirs(os.path.dirname(self.settings.index_path), exist_ok=True)
        self.vectorstore.save_local(self.settings.index_path)
        self._build_chain()
        return len(chunks)

    def _build_chain(self):
        """Собирает RAG-цепочку после загрузки/обновления индекса."""
        # Гибридный retriever
        all_docs = list(self.vectorstore.docstore._dict.values())
        bm25 = BM25Retriever.from_documents(all_docs)
        bm25.k = self.settings.retriever_k

        vector = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": self.settings.retriever_k, "fetch_k": 20},
        )

        self.retriever = EnsembleRetriever(
            retrievers=[bm25, vector],
            weights=[0.3, 0.7],
        )

        # Промпт с историей
        prompt = ChatPromptTemplate.from_messages([
            ("system", """Ты технический ассистент. Отвечай только на основе контекста.
Если ответа нет в контексте — скажи: "В базе знаний нет информации по этому вопросу."
Не выдумывай факты.

Контекст:
{context}"""),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{question}"),
        ])

        def format_docs(docs):
            return "\n\n".join(
                f"[Источник {i+1}] {d.page_content}"
                for i, d in enumerate(docs)
            )

        self.chain = (
            RunnablePassthrough.assign(
                context=lambda x: format_docs(self.retriever.invoke(x["question"]))
            )
            | prompt
            | self.model
            | StrOutputParser()
        )

        self.chain_with_sources = RunnableParallel(
            answer=self.chain,
            sources=lambda x: [
                d.metadata.get("source", f"doc_{i}")
                for i, d in enumerate(self.retriever.invoke(x["question"]))
            ],
        )

    @property
    def is_ready(self) -> bool:
        return self.chain is not None

Сервис сессий

# app/services/session.py
import time
from langchain_community.chat_message_histories import ChatMessageHistory
from app.config import Settings


class SessionService:
    def __init__(self, settings: Settings):
        self.settings = settings
        self._sessions: dict[str, dict] = {}

    def get_or_create(self, session_id: str) -> ChatMessageHistory:
        now = time.time()

        # Очищаем устаревшие сессии
        expired = [
            sid for sid, data in self._sessions.items()
            if now - data["last_active"] > self.settings.session_ttl_seconds
        ]
        for sid in expired:
            del self._sessions[sid]

        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "history": ChatMessageHistory(),
                "last_active": now,
            }

        self._sessions[session_id]["last_active"] = now
        history = self._sessions[session_id]["history"]

        # Обрезаем историю до лимита
        if len(history.messages) > self.settings.max_history_messages:
            history.messages = history.messages[-self.settings.max_history_messages:]

        return history

    def clear(self, session_id: str) -> bool:
        if session_id in self._sessions:
            del self._sessions[session_id]
            return True
        return False

    @property
    def active_count(self) -> int:
        return len(self._sessions)

Зависимости FastAPI

# app/dependencies.py
from functools import lru_cache
from app.config import get_settings, Settings
from app.services.rag import RAGService
from app.services.session import SessionService


# Синглтоны создаются один раз при старте
_rag_service: RAGService | None = None
_session_service: SessionService | None = None


def get_rag_service() -> RAGService:
    return _rag_service


def get_session_service() -> SessionService:
    return _session_service


def init_services(settings: Settings):
    global _rag_service, _session_service
    _rag_service = RAGService(settings)
    _session_service = SessionService(settings)
    return _rag_service, _session_service

Роутер чата

# app/routers/chat.py
import time
import asyncio
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage, AIMessage
from app.schemas.chat import ChatRequest, ChatResponse
from app.services.rag import RAGService
from app.services.session import SessionService
from app.dependencies import get_rag_service, get_session_service
import json


router = APIRouter(prefix="/chat", tags=["chat"])


@router.post("/", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    rag: RAGService = Depends(get_rag_service),
    sessions: SessionService = Depends(get_session_service),
):
    """Обычный (не стриминговый) эндпоинт."""
    if not rag.is_ready:
        raise HTTPException(status_code=503, detail="Индекс не загружен")

    history = sessions.get_or_create(request.session_id)
    start = time.time()

    try:
        result = await asyncio.wait_for(
            asyncio.to_thread(
                rag.chain_with_sources.invoke,
                {
                    "question": request.question,
                    "history": history.messages,
                }
            ),
            timeout=60.0,
        )
    except asyncio.TimeoutError:
        raise HTTPException(status_code=504, detail="Превышен таймаут запроса")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Ошибка генерации: {str(e)}")

    # Обновляем историю
    history.add_user_message(request.question)
    history.add_ai_message(result["answer"])

    return ChatResponse(
        session_id=request.session_id,
        answer=result["answer"],
        sources=result["sources"],
        latency_ms=round((time.time() - start) * 1000, 1),
    )


@router.get("/stream")
async def chat_stream(
    session_id: str,
    question: str,
    rag: RAGService = Depends(get_rag_service),
    sessions: SessionService = Depends(get_session_service),
):
    """Стриминговый эндпоинт через Server-Sent Events."""
    if not rag.is_ready:
        raise HTTPException(status_code=503, detail="Индекс не загружен")

    history = sessions.get_or_create(session_id)

    async def event_generator():
        full_answer = ""
        try:
            # Сначала находим источники (быстро)
            sources = await asyncio.to_thread(
                lambda: [
                    d.metadata.get("source", f"doc_{i}")
                    for i, d in enumerate(rag.retriever.invoke(question))
                ]
            )

            # Отправляем источники как первое событие
            yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"

            # Стримим ответ
            for chunk in rag.chain.stream({
                "question": question,
                "history": history.messages,
            }):
                if chunk:
                    full_answer += chunk
                    yield f"data: {json.dumps({'type': 'token', 'content': chunk})}\n\n"

            # Финальное событие
            yield f"data: {json.dumps({'type': 'done', 'full_answer': full_answer})}\n\n"

            # Обновляем историю после стриминга
            history.add_user_message(question)
            history.add_ai_message(full_answer)

        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # отключаем буферизацию в nginx
        },
    )


@router.delete("/{session_id}")
async def clear_session(
    session_id: str,
    sessions: SessionService = Depends(get_session_service),
):
    """Очищает историю сессии."""
    cleared = sessions.clear(session_id)
    return {"cleared": cleared, "session_id": session_id}

Роутер документов

# app/routers/documents.py
import os
import tempfile
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from app.schemas.chat import DocumentUploadResponse
from app.services.rag import RAGService
from app.dependencies import get_rag_service


router = APIRouter(prefix="/documents", tags=["documents"])


@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    rag: RAGService = Depends(get_rag_service),
):
    """Загружает документ и добавляет его в индекс."""
    allowed_extensions = {".pdf", ".txt", ".md"}

    _, ext = os.path.splitext(file.filename or "")
    suffix = ext.lower()

    if suffix not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Неподдерживаемый тип файла. Разрешены: {', '.join(allowed_extensions)}",
        )

    # Сохраняем во временный файл
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name

    try:
        # Загружаем документ
        if suffix == ".pdf":
            loader = PyPDFLoader(tmp_path)
        else:
            loader = TextLoader(tmp_path, autodetect_encoding=True)

        docs = loader.load()

        # Добавляем метаданные источника
        for doc in docs:
            doc.metadata["source"] = file.filename

        chunks_count = rag.index_documents(docs)

        return DocumentUploadResponse(
            message=f"Файл '{file.filename}' проиндексирован",
            chunks_indexed=chunks_count,
            index_size=rag.vectorstore.index.ntotal,
        )
    finally:
        os.unlink(tmp_path)  # удаляем временный файл


@router.get("/stats")
async def index_stats(rag: RAGService = Depends(get_rag_service)):
    """Статистика индекса."""
    if not rag.is_ready:
        return {"status": "not_loaded", "total_chunks": 0}
    return {
        "status": "ready",
        "total_chunks": rag.vectorstore.index.ntotal,
    }

Главный файл приложения

# app/main.py
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import get_settings
from app.dependencies import init_services, get_rag_service, get_session_service
from app.routers import chat, documents
from app.schemas.chat import HealthResponse


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Инициализация при старте, очистка при остановке."""
    settings = get_settings()
    rag_service, session_service = init_services(settings)

    # Пытаемся загрузить существующий индекс
    loaded = rag_service.load_index()
    if loaded:
        print(f"✅ Индекс загружен: {rag_service.vectorstore.index.ntotal} чанков")
    else:
        print("⚠️ Индекс не найден. Загрузите документы через /documents/upload")

    yield  # приложение работает

    # Здесь можно добавить очистку ресурсов
    print("🛑 Приложение остановлено")


app = FastAPI(
    title="RAG Chatbot API",
    description="Чат-бот с поиском по документам на LangChain + FastAPI",
    version="1.0.0",
    lifespan=lifespan,
)


# CORS для фронтенда
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # в продакшне — конкретные домены
    allow_methods=["*"],
    allow_headers=["*"],
)


# Подключаем роутеры
app.include_router(chat.router)
app.include_router(documents.router)


@app.get("/health", response_model=HealthResponse)
async def health():
    rag = get_rag_service()
    sessions = get_session_service()
    return HealthResponse(
        status="ok",
        index_loaded=rag.is_ready,
        active_sessions=sessions.active_count,
        timestamp=datetime.utcnow(),
    )

Запуск и тестирование

Установите зависимости

pip install fastapi uvicorn langchain langchain-openai langchain-community langchain-huggingface langchain-text-splitters sentence-transformers faiss-cpu pypdf rank-bm25 pydantic-settings python-multipart chardet

Запустите сервер

uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

После запуска FastAPI автоматически генерирует интерактивную документацию, откройте в браузере:

http://localhost:8000/docs

Там можно нажать на любой эндпоинт → Try it out → заполнить поля → Execute. Удобно для ручного тестирования без curl.

Тест с помощью curl:

# Загрузка документа
curl -X POST "http://localhost:8000/documents/upload" \
  -H "Content-Type: multipart/form-data" \
  -F "file=@./data/knowledge_base/asyncio_guide.txt"

# Обычный запрос
curl -X POST "http://localhost:8000/chat/" \
  -H "Content-Type: application/json" \
  -d '{"session_id": "user_1", "question": "Как работает asyncio.gather?"}'

# Стриминговый запрос
curl -N "http://localhost:8000/chat/stream?session_id=user_1&question=Как+работает+event+loop?"

# Health check
curl "http://localhost:8000/health"

Обработка ошибок

# app/middleware/error_handler.py, глобальный обработчик ошибок
from fastapi import Request
from fastapi.responses import JSONResponse
import logging


logger = logging.getLogger(__name__)


async def global_exception_handler(request: Request, exc: Exception):
    logger.error(f"Необработанная ошибка: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"detail": "Внутренняя ошибка сервера"},
    )

# В main.py:
# app.add_exception_handler(Exception, global_exception_handler)

requirements.txt

fastapi==0.130.0
uvicorn[standard]==0.41.0
langchain>=1.2.10
langchain-classic>=1.0.1
langchain-openai>=1.1.10
langchain-community>=0.4.1
langchain-huggingface>=1.2.0
langchain-text-splitters>=1.1.1
sentence-transformers>=5.2.3
faiss-cpu==1.13.2
rank-bm25==0.2.2
pypdf==6.7.2
pydantic-settings==2.13.1
python-multipart==0.0.22
python-dotenv==1.2.1
chardet>=6.0.0
httpx==0.28.1
tenacity==9.1.4
langsmith>=0.7.6

Распространённые ошибки

1. Инициализация модели при каждом запросе

# Плохо: ChatOpenAI() создаётся на каждый HTTP-запрос
@router.post("/chat")
async def chat(request: ChatRequest):
    model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o"))  # дорогая операция
    ...

# Хорошо: создаётся один раз в lifespan через сервис

2. Синхронный код в async эндпоинте

# Плохо: блокирует event loop FastAPI
@router.post("/chat")
async def chat(request: ChatRequest):
    result = rag_chain.invoke(...)  # LangChain синхронный — блокирует!

# Хорошо: оборачиваем в to_thread
result = await asyncio.to_thread(rag_chain.invoke, inputs)

3. Отсутствие таймаута

# Плохо: запрос может висеть вечно
result = await asyncio.to_thread(rag_chain.invoke, inputs)

# Хорошо: ограничиваем время
result = await asyncio.wait_for(
    asyncio.to_thread(rag_chain.invoke, inputs),
    timeout=60.0,
)

4. История без ограничений

# Плохо: история растёт бесконечно — растут стоимость и время
history.add_user_message(question)

# Хорошо: обрезаем в SessionService до max_history_messages

5. Глобальные переменные вместо DI

# Плохо: глобальные объекты трудно тестировать и менять
rag_chain = build_chain()  # на уровне модуля

# Хорошо: Depends() + сервисы через lifespan
rag: RAGService = Depends(get_rag_service)

Практическое задание

Соберите полноценный чат-бота с RAG, объединив знания всего курса.

Требования:

1) Следуйте структуре проекта из урока: app/, services/, routers/, schemas/

2) Два эндпоинта чата:

  • POST /chat/ - обычный, возвращает JSON с ответом и источниками
  • GET /chat/stream - SSE стриминг токенов

3) Загрузка документов -POST /documents/upload принимает PDF, TXT и MD

4) Память диалога - история сохраняется по session_id, лимит 20 сообщений, TTL 1 час

5) Продвинутый RAG - гибридный поиск (BM25 + векторный)

6) Устойчивость:

  • Таймаут 60 секунд на запрос
  • Ответ 503 если индекс не загружен

7) Мониторинг:

  • GET /health статус сервиса
  • LangSmith трейсинг (переменная окружения)
  • Логирование времени каждого запроса

Финальная проверка:

# 1. Загрузите документ
# 2. Отправьте 3 связанных вопроса (проверьте память)
# 3. Отправьте вопрос вне базы знаний (проверьте "не знаю")
# 4. Проверьте /health
# 5. (Опционально) Найдите трейсы в LangSmith

Итог курса

За 11 уроков мы прошли путь от первого chain.invoke() до продакшн-приложения:

Урок Тема
1 Вводный урок, установка зависимостей
2 LCEL: цепочки, операторы, стриминг
3 Промпты и структурированный вывод
4 Память и история диалога
5 Нелинейные пайплайны: параллельность и ветвление
6 Tools: инструменты и function calling
7 Агенты: автоматизация цикла tool use
8 RAG: поиск по документам
9 Продвинутый RAG: гибридный поиск и re-ranking
10 Трейсинг и оценка качества
11 Продакшн: FastAPI + стриминг

Что учить дальше:

  • LangGraph - сложные мультиагентные системы с состоянием
  • MCP (Model Context Protocol) - стандарт подключения инструментов к LLM
  • Векторные БД для продакшна: Qdrant, Weaviate, Pinecone
  • Fine-tuning vs RAG: когда что выбирать
  • Безопасность LLM приложений: prompt injection, guardrails

<< Урок 10


Подписывайтесь на мой Telegram канал

Если вам нужен ментор и вы хотите научиться разрабатывать AI агентов, пишите, обсудим условия

Авторизуйтесь, чтобы оставить комментарий.

Комментариев: 0

Нет комментариев.

Тут может быть ваша реклама

Пишите info@aisferaic.ru

Похожие туториалы