Продакшн: RAG, чат-бот на FastAPI со стримингом | Курс по LangChain урок 11
Цель урока
Вы построите сервис на FastAPI и подключите его к проекту на LangChain. Настроите стриминг с помощью SSE, управление сессиями и обработку ошибок. Разберём структуру проекта для продакшна. И что учить дальше.
Необходимые знания:
- Уроки 1–10 (весь курс)
- FastAPI на базовом уровне (эндпоинты, Pydantic)
- Понимание async/await в Python
Ключевые концепции:
- Структура продакшн проекта
- FastAPI + LangChain правильная инициализация
- Server-Sent Events (SSE) для стриминга
- Управление сессиями пользователей
- Обработка ошибок и таймауты
- Конфигурация через переменные окружения
- Lifespan инициализация при старте приложения
Структура проекта
Хорошая структура основа поддерживаемого кода:
rag_chatbot/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI приложение, lifespan
│ ├── config.py # Настройки через pydantic-settings
│ ├── dependencies.py # Зависимости FastAPI (DI)
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── chat.py # /chat эндпоинты
│ │ └── documents.py # /documents эндпоинты
│ ├── services/
│ │ ├── __init__.py
│ │ ├── rag.py # RAG цепочка и retriever
│ │ ├── indexer.py # Загрузка и индексирование документов
│ │ └── session.py # Управление сессиями
│ └── schemas/
│ ├── __init__.py
│ └── chat.py # Pydantic схемы запросов/ответов
├── data/
│ └── knowledge_base/ # Документы для индексирования
├── indexes/ # Сохранённые FAISS-индексы
├── .env
├── requirements.txt
└── README.md
Конфигурация
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# LLM
openai_api_key: str
openai_base_url: str = "https://api.openai.com/v1"
model_name: str = "gpt-4o"
# LangSmith (опционально)
langchain_tracing_v2: bool = False
langchain_api_key: str = ""
langchain_project: str = "rag-chatbot"
# RAG
chunk_size: int = 1000
chunk_overlap: int = 200
retriever_k: int = 4
index_path: str = "indexes/faiss_index"
# Сессии
max_history_messages: int = 20
session_ttl_seconds: int = 3600 # 1 час
# API
max_tokens: int = 2000
request_timeout: int = 60
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
@lru_cache()
def get_settings() -> Settings:
return Settings()
# .env
OPENAI_API_KEY=rusgpt-...
OPENAI_BASE_URL=https://rus-gpt.com/api/v1
MODEL_NAME=anthropic/claude-haiku-4.5
LANGCHAIN_API_KEY=ls...
LANGCHAIN_TRACING_V2=true
LANGCHAIN_PROJECT=rag-chatbot-prod
Схемы запросов и ответов
# app/schemas/chat.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
class ChatRequest(BaseModel):
session_id: str = Field(..., description="ID сессии пользователя")
question: str = Field(..., min_length=1, max_length=2000)
stream: bool = Field(default=True, description="Стриминг ответа")
class ChatResponse(BaseModel):
session_id: str
answer: str
sources: list[str] = []
tokens_used: int = 0
latency_ms: float = 0.0
class DocumentUploadResponse(BaseModel):
message: str
chunks_indexed: int
index_size: int
class HealthResponse(BaseModel):
status: str
index_loaded: bool
active_sessions: int
timestamp: datetime
Сервис RAG
# app/services/rag.py
import os
from langchain_openai import ChatOpenAI
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import BM25Retriever
from langchain_classic.retrievers.ensemble import EnsembleRetriever
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.documents import Document
from app.config import Settings
class RAGService:
def __init__(self, settings: Settings):
self.settings = settings
self.model = ChatOpenAI(
model=settings.model_name,
api_key=settings.openai_api_key,
base_url=settings.openai_base_url,
temperature=0,
max_tokens=settings.max_tokens,
)
self.embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
self.vectorstore: FAISS | None = None
self.retriever = None
self.chain = None
self.chain_with_sources = None
def load_index(self) -> bool:
"""Загружает FAISS-индекс с диска. Возвращает True если успешно."""
if not os.path.exists(self.settings.index_path):
return False
try:
self.vectorstore = FAISS.load_local(
self.settings.index_path,
self.embeddings,
allow_dangerous_deserialization=True,
)
self._build_chain()
return True
except Exception as e:
print(f"Ошибка загрузки индекса: {e}")
return False
def index_documents(self, docs: list[Document]) -> int:
"""Добавляет документы в индекс. Возвращает количество чанков."""
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.settings.chunk_size,
chunk_overlap=self.settings.chunk_overlap,
)
chunks = splitter.split_documents(docs)
if self.vectorstore is None:
self.vectorstore = FAISS.from_documents(chunks, self.embeddings)
else:
self.vectorstore.add_documents(chunks)
os.makedirs(os.path.dirname(self.settings.index_path), exist_ok=True)
self.vectorstore.save_local(self.settings.index_path)
self._build_chain()
return len(chunks)
def _build_chain(self):
"""Собирает RAG-цепочку после загрузки/обновления индекса."""
# Гибридный retriever
all_docs = list(self.vectorstore.docstore._dict.values())
bm25 = BM25Retriever.from_documents(all_docs)
bm25.k = self.settings.retriever_k
vector = self.vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": self.settings.retriever_k, "fetch_k": 20},
)
self.retriever = EnsembleRetriever(
retrievers=[bm25, vector],
weights=[0.3, 0.7],
)
# Промпт с историей
prompt = ChatPromptTemplate.from_messages([
("system", """Ты технический ассистент. Отвечай только на основе контекста.
Если ответа нет в контексте — скажи: "В базе знаний нет информации по этому вопросу."
Не выдумывай факты.
Контекст:
{context}"""),
MessagesPlaceholder(variable_name="history"),
("human", "{question}"),
])
def format_docs(docs):
return "\n\n".join(
f"[Источник {i+1}] {d.page_content}"
for i, d in enumerate(docs)
)
self.chain = (
RunnablePassthrough.assign(
context=lambda x: format_docs(self.retriever.invoke(x["question"]))
)
| prompt
| self.model
| StrOutputParser()
)
self.chain_with_sources = RunnableParallel(
answer=self.chain,
sources=lambda x: [
d.metadata.get("source", f"doc_{i}")
for i, d in enumerate(self.retriever.invoke(x["question"]))
],
)
@property
def is_ready(self) -> bool:
return self.chain is not None
Сервис сессий
# app/services/session.py
import time
from langchain_community.chat_message_histories import ChatMessageHistory
from app.config import Settings
class SessionService:
def __init__(self, settings: Settings):
self.settings = settings
self._sessions: dict[str, dict] = {}
def get_or_create(self, session_id: str) -> ChatMessageHistory:
now = time.time()
# Очищаем устаревшие сессии
expired = [
sid for sid, data in self._sessions.items()
if now - data["last_active"] > self.settings.session_ttl_seconds
]
for sid in expired:
del self._sessions[sid]
if session_id not in self._sessions:
self._sessions[session_id] = {
"history": ChatMessageHistory(),
"last_active": now,
}
self._sessions[session_id]["last_active"] = now
history = self._sessions[session_id]["history"]
# Обрезаем историю до лимита
if len(history.messages) > self.settings.max_history_messages:
history.messages = history.messages[-self.settings.max_history_messages:]
return history
def clear(self, session_id: str) -> bool:
if session_id in self._sessions:
del self._sessions[session_id]
return True
return False
@property
def active_count(self) -> int:
return len(self._sessions)
Зависимости FastAPI
# app/dependencies.py
from functools import lru_cache
from app.config import get_settings, Settings
from app.services.rag import RAGService
from app.services.session import SessionService
# Синглтоны создаются один раз при старте
_rag_service: RAGService | None = None
_session_service: SessionService | None = None
def get_rag_service() -> RAGService:
return _rag_service
def get_session_service() -> SessionService:
return _session_service
def init_services(settings: Settings):
global _rag_service, _session_service
_rag_service = RAGService(settings)
_session_service = SessionService(settings)
return _rag_service, _session_service
Роутер чата
# app/routers/chat.py
import time
import asyncio
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage, AIMessage
from app.schemas.chat import ChatRequest, ChatResponse
from app.services.rag import RAGService
from app.services.session import SessionService
from app.dependencies import get_rag_service, get_session_service
import json
router = APIRouter(prefix="/chat", tags=["chat"])
@router.post("/", response_model=ChatResponse)
async def chat(
request: ChatRequest,
rag: RAGService = Depends(get_rag_service),
sessions: SessionService = Depends(get_session_service),
):
"""Обычный (не стриминговый) эндпоинт."""
if not rag.is_ready:
raise HTTPException(status_code=503, detail="Индекс не загружен")
history = sessions.get_or_create(request.session_id)
start = time.time()
try:
result = await asyncio.wait_for(
asyncio.to_thread(
rag.chain_with_sources.invoke,
{
"question": request.question,
"history": history.messages,
}
),
timeout=60.0,
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="Превышен таймаут запроса")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Ошибка генерации: {str(e)}")
# Обновляем историю
history.add_user_message(request.question)
history.add_ai_message(result["answer"])
return ChatResponse(
session_id=request.session_id,
answer=result["answer"],
sources=result["sources"],
latency_ms=round((time.time() - start) * 1000, 1),
)
@router.get("/stream")
async def chat_stream(
session_id: str,
question: str,
rag: RAGService = Depends(get_rag_service),
sessions: SessionService = Depends(get_session_service),
):
"""Стриминговый эндпоинт через Server-Sent Events."""
if not rag.is_ready:
raise HTTPException(status_code=503, detail="Индекс не загружен")
history = sessions.get_or_create(session_id)
async def event_generator():
full_answer = ""
try:
# Сначала находим источники (быстро)
sources = await asyncio.to_thread(
lambda: [
d.metadata.get("source", f"doc_{i}")
for i, d in enumerate(rag.retriever.invoke(question))
]
)
# Отправляем источники как первое событие
yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"
# Стримим ответ
for chunk in rag.chain.stream({
"question": question,
"history": history.messages,
}):
if chunk:
full_answer += chunk
yield f"data: {json.dumps({'type': 'token', 'content': chunk})}\n\n"
# Финальное событие
yield f"data: {json.dumps({'type': 'done', 'full_answer': full_answer})}\n\n"
# Обновляем историю после стриминга
history.add_user_message(question)
history.add_ai_message(full_answer)
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # отключаем буферизацию в nginx
},
)
@router.delete("/{session_id}")
async def clear_session(
session_id: str,
sessions: SessionService = Depends(get_session_service),
):
"""Очищает историю сессии."""
cleared = sessions.clear(session_id)
return {"cleared": cleared, "session_id": session_id}
Роутер документов
# app/routers/documents.py
import os
import tempfile
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from app.schemas.chat import DocumentUploadResponse
from app.services.rag import RAGService
from app.dependencies import get_rag_service
router = APIRouter(prefix="/documents", tags=["documents"])
@router.post("/upload", response_model=DocumentUploadResponse)
async def upload_document(
file: UploadFile = File(...),
rag: RAGService = Depends(get_rag_service),
):
"""Загружает документ и добавляет его в индекс."""
allowed_extensions = {".pdf", ".txt", ".md"}
_, ext = os.path.splitext(file.filename or "")
suffix = ext.lower()
if suffix not in allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"Неподдерживаемый тип файла. Разрешены: {', '.join(allowed_extensions)}",
)
# Сохраняем во временный файл
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
# Загружаем документ
if suffix == ".pdf":
loader = PyPDFLoader(tmp_path)
else:
loader = TextLoader(tmp_path, autodetect_encoding=True)
docs = loader.load()
# Добавляем метаданные источника
for doc in docs:
doc.metadata["source"] = file.filename
chunks_count = rag.index_documents(docs)
return DocumentUploadResponse(
message=f"Файл '{file.filename}' проиндексирован",
chunks_indexed=chunks_count,
index_size=rag.vectorstore.index.ntotal,
)
finally:
os.unlink(tmp_path) # удаляем временный файл
@router.get("/stats")
async def index_stats(rag: RAGService = Depends(get_rag_service)):
"""Статистика индекса."""
if not rag.is_ready:
return {"status": "not_loaded", "total_chunks": 0}
return {
"status": "ready",
"total_chunks": rag.vectorstore.index.ntotal,
}
Главный файл приложения
# app/main.py
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import get_settings
from app.dependencies import init_services, get_rag_service, get_session_service
from app.routers import chat, documents
from app.schemas.chat import HealthResponse
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Инициализация при старте, очистка при остановке."""
settings = get_settings()
rag_service, session_service = init_services(settings)
# Пытаемся загрузить существующий индекс
loaded = rag_service.load_index()
if loaded:
print(f"✅ Индекс загружен: {rag_service.vectorstore.index.ntotal} чанков")
else:
print("⚠️ Индекс не найден. Загрузите документы через /documents/upload")
yield # приложение работает
# Здесь можно добавить очистку ресурсов
print("🛑 Приложение остановлено")
app = FastAPI(
title="RAG Chatbot API",
description="Чат-бот с поиском по документам на LangChain + FastAPI",
version="1.0.0",
lifespan=lifespan,
)
# CORS для фронтенда
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # в продакшне — конкретные домены
allow_methods=["*"],
allow_headers=["*"],
)
# Подключаем роутеры
app.include_router(chat.router)
app.include_router(documents.router)
@app.get("/health", response_model=HealthResponse)
async def health():
rag = get_rag_service()
sessions = get_session_service()
return HealthResponse(
status="ok",
index_loaded=rag.is_ready,
active_sessions=sessions.active_count,
timestamp=datetime.utcnow(),
)
Запуск и тестирование
Установите зависимости
pip install fastapi uvicorn langchain langchain-openai langchain-community langchain-huggingface langchain-text-splitters sentence-transformers faiss-cpu pypdf rank-bm25 pydantic-settings python-multipart chardet
Запустите сервер
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
После запуска FastAPI автоматически генерирует интерактивную документацию, откройте в браузере:
http://localhost:8000/docs
Там можно нажать на любой эндпоинт → Try it out → заполнить поля → Execute. Удобно для ручного тестирования без curl.
Тест с помощью curl:
# Загрузка документа
curl -X POST "http://localhost:8000/documents/upload" \
-H "Content-Type: multipart/form-data" \
-F "file=@./data/knowledge_base/asyncio_guide.txt"
# Обычный запрос
curl -X POST "http://localhost:8000/chat/" \
-H "Content-Type: application/json" \
-d '{"session_id": "user_1", "question": "Как работает asyncio.gather?"}'
# Стриминговый запрос
curl -N "http://localhost:8000/chat/stream?session_id=user_1&question=Как+работает+event+loop?"
# Health check
curl "http://localhost:8000/health"
Обработка ошибок
# app/middleware/error_handler.py, глобальный обработчик ошибок
from fastapi import Request
from fastapi.responses import JSONResponse
import logging
logger = logging.getLogger(__name__)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Необработанная ошибка: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "Внутренняя ошибка сервера"},
)
# В main.py:
# app.add_exception_handler(Exception, global_exception_handler)
requirements.txt
fastapi==0.130.0
uvicorn[standard]==0.41.0
langchain>=1.2.10
langchain-classic>=1.0.1
langchain-openai>=1.1.10
langchain-community>=0.4.1
langchain-huggingface>=1.2.0
langchain-text-splitters>=1.1.1
sentence-transformers>=5.2.3
faiss-cpu==1.13.2
rank-bm25==0.2.2
pypdf==6.7.2
pydantic-settings==2.13.1
python-multipart==0.0.22
python-dotenv==1.2.1
chardet>=6.0.0
httpx==0.28.1
tenacity==9.1.4
langsmith>=0.7.6
Распространённые ошибки
1. Инициализация модели при каждом запросе
# Плохо: ChatOpenAI() создаётся на каждый HTTP-запрос
@router.post("/chat")
async def chat(request: ChatRequest):
model = ChatOpenAI(model=os.getenv("MODEL_NAME", "gpt-4o")) # дорогая операция
...
# Хорошо: создаётся один раз в lifespan через сервис
2. Синхронный код в async эндпоинте
# Плохо: блокирует event loop FastAPI
@router.post("/chat")
async def chat(request: ChatRequest):
result = rag_chain.invoke(...) # LangChain синхронный — блокирует!
# Хорошо: оборачиваем в to_thread
result = await asyncio.to_thread(rag_chain.invoke, inputs)
3. Отсутствие таймаута
# Плохо: запрос может висеть вечно
result = await asyncio.to_thread(rag_chain.invoke, inputs)
# Хорошо: ограничиваем время
result = await asyncio.wait_for(
asyncio.to_thread(rag_chain.invoke, inputs),
timeout=60.0,
)
4. История без ограничений
# Плохо: история растёт бесконечно — растут стоимость и время
history.add_user_message(question)
# Хорошо: обрезаем в SessionService до max_history_messages
5. Глобальные переменные вместо DI
# Плохо: глобальные объекты трудно тестировать и менять
rag_chain = build_chain() # на уровне модуля
# Хорошо: Depends() + сервисы через lifespan
rag: RAGService = Depends(get_rag_service)
Практическое задание
Соберите полноценный чат-бота с RAG, объединив знания всего курса.
Требования:
1) Следуйте структуре проекта из урока: app/, services/, routers/, schemas/
2) Два эндпоинта чата:
POST /chat/- обычный, возвращает JSON с ответом и источникамиGET /chat/stream- SSE стриминг токенов
3) Загрузка документов -POST /documents/upload принимает PDF, TXT и MD
4) Память диалога - история сохраняется по session_id, лимит 20 сообщений, TTL 1 час
5) Продвинутый RAG - гибридный поиск (BM25 + векторный)
6) Устойчивость:
- Таймаут 60 секунд на запрос
- Ответ 503 если индекс не загружен
7) Мониторинг:
GET /healthстатус сервиса- LangSmith трейсинг (переменная окружения)
- Логирование времени каждого запроса
Финальная проверка:
# 1. Загрузите документ
# 2. Отправьте 3 связанных вопроса (проверьте память)
# 3. Отправьте вопрос вне базы знаний (проверьте "не знаю")
# 4. Проверьте /health
# 5. (Опционально) Найдите трейсы в LangSmith
Итог курса
За 11 уроков мы прошли путь от первого chain.invoke() до продакшн-приложения:
| Урок | Тема |
|---|---|
| 1 | Вводный урок, установка зависимостей |
| 2 | LCEL: цепочки, операторы, стриминг |
| 3 | Промпты и структурированный вывод |
| 4 | Память и история диалога |
| 5 | Нелинейные пайплайны: параллельность и ветвление |
| 6 | Tools: инструменты и function calling |
| 7 | Агенты: автоматизация цикла tool use |
| 8 | RAG: поиск по документам |
| 9 | Продвинутый RAG: гибридный поиск и re-ranking |
| 10 | Трейсинг и оценка качества |
| 11 | Продакшн: FastAPI + стриминг |
Что учить дальше:
- LangGraph - сложные мультиагентные системы с состоянием
- MCP (Model Context Protocol) - стандарт подключения инструментов к LLM
- Векторные БД для продакшна: Qdrant, Weaviate, Pinecone
- Fine-tuning vs RAG: когда что выбирать
- Безопасность LLM приложений: prompt injection, guardrails
Подписывайтесь на мой Telegram канал
Если вам нужен ментор и вы хотите научиться разрабатывать AI агентов, пишите, обсудим условия
Авторизуйтесь, чтобы оставить комментарий.
Нет комментариев.
Тут может быть ваша реклама
Пишите info@aisferaic.ru