select_for_update и блокировки | Курс Django ORM урок 6.2
Цель урока
Разобрать select_for_update(), механизм явной блокировки строк в PostgreSQL. Понять когда F() недостаточно для защиты от race condition, как работают режимы NOWAIT и SKIP LOCKED, и как избежать deadlock.
Необходимые знания
- Урок 3.2: F expressions, race conditions
- Урок 6.1: transaction.atomic()
- Базовое понимание блокировок в реляционных БД
Когда F() недостаточно
В уроке 3.2 мы разобрали что F() решает race condition при простом обновлении счетчика:
Product.objects.filter(id=1).update(stock=F("stock") - quantity)
Один атомарный UPDATE, race condition невозможен.
Но что если логика сложнее и требует принятия решений в Python?
# Задача: проверить что товара достаточно, создать заказ, списать stock
def place_order(product_id, quantity):
product = Product.objects.get(id=product_id)
if product.stock < quantity:
raise ValueError("Недостаточно товара") # проверка в Python
# Между проверкой и созданием заказа, окно для race condition
order = Order.objects.create(...)
product.stock -= quantity
product.save()
Два процесса одновременно:
Процесс A: SELECT stock = 5
Процесс B: SELECT stock = 5
Процесс A: stock >= 3 - OK, создаем заказ
Процесс B: stock >= 4 - OK, создаем заказ
Процесс A: UPDATE stock = 5 - 3 = 2
Процесс B: UPDATE stock = 5 - 4 = 1 (но реальный stock = 2, не 5!)
Итог: stock = 1, но нужно -7 (выдано больше чем было)
Трюк с filter(stock__gte=quantity).update(stock=F("stock") - quantity) решает проблему для простых случаев. Но если после проверки нужно создать несколько объектов, отправить событие в очередь, вызвать несколько UPDATE, нам нужна блокировка строки.
select_for_update() - блокировка строки
select_for_update() добавляет к SELECT запросу FOR UPDATE, PostgreSQL блокирует выбранные строки до конца транзакции. Другие транзакции, пытающиеся заблокировать те же строки, будут ждать.
from django.db import transaction
def place_order(product_id, quantity, user):
with transaction.atomic():
# Получить строку с блокировкой
product = Product.objects.select_for_update().get(id=product_id)
if product.stock < quantity:
raise ValueError("Недостаточно товара")
# Теперь мы единственные кто работает с этой строкой
order = Order.objects.create(
user=user,
status="pending",
)
OrderItem.objects.create(
order=order,
product=product,
quantity=quantity,
price=product.price,
)
product.stock -= quantity
product.save(update_fields=["stock"])
BEGIN;
SELECT shop_product.* FROM shop_product
WHERE id = 1
FOR UPDATE; -- блокировка строки
-- Если другая транзакция уже заблокировала эту строку
-- текущая транзакция ждет
INSERT INTO shop_order (...) VALUES (...);
INSERT INTO shop_orderitem (...) VALUES (...);
UPDATE shop_product SET stock = 4 WHERE id = 1;
COMMIT; -- блокировка снимается
select_for_update() работает только внутри transaction.atomic(). Вне транзакции Django бросает TransactionManagementError.
NOWAIT - не ждать заблокированную строку
По умолчанию транзакция ждет пока строка разблокируется. Это может привести к очередям.
nowait=True говорит PostgreSQL сразу вернуть ошибку если строка заблокирована:
from django.db import OperationalError
with transaction.atomic():
try:
product = Product.objects.select_for_update(nowait=True).get(id=product_id)
except OperationalError:
# Строка занята другой транзакцией
raise ValueError("Товар сейчас обрабатывается, попробуйте позже")
# Дальнейшая логика
SELECT shop_product.* FROM shop_product
WHERE id = 1
FOR UPDATE NOWAIT;
-- Если строка заблокирована: ERROR: could not obtain lock on row in relation "shop_product"
NOWAIT полезен в API где нельзя ждать, лучше сразу вернуть ошибку пользователю чем держать соединение ожидающим.
SKIP LOCKED - пропустить заблокированные строки
skip_locked=True заставляет PostgreSQL пропустить строки, которые заблокированы другими транзакциями. Выборка возвращает только доступные строки.
Use case, очередь задач:
def process_next_order():
with transaction.atomic():
# Взять следующий необработанный заказ, пропустить те что уже обрабатываются
order = (
Order.objects
.select_for_update(skip_locked=True)
.filter(status="pending")
.order_by("created_at")
.first()
)
if order is None:
return # нет свободных заказов
order.status = "processing"
order.save(update_fields=["status"])
# Обработать заказ вне транзакции
process_order(order)
SELECT shop_order.* FROM shop_order
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
FOR UPDATE SKIP LOCKED;
Несколько воркер-процессов могут выполнять process_next_order() параллельно, каждый возьмет свой заказ без конфликтов. SKIP LOCKED делает PostgreSQL эффективной очередью задач без Redis или Celery.
of - блокировать только конкретные таблицы
При запросе JOIN select_for_update() по умолчанию блокирует строки во всех таблицах. Параметр of ограничивает блокировку конкретными таблицами:
# Заблокировать только product, не category
product = (
Product.objects
.select_related("category")
.select_for_update(of=("self",)) # "self" = основная модель
.get(id=product_id)
)
SELECT shop_product.*, shop_category.*
FROM shop_product
INNER JOIN shop_category ON shop_product.category_id = shop_category.id
WHERE shop_product.id = 1
FOR UPDATE OF shop_product; -- блокировать только shop_product
Без of, блокируются строки и в shop_product и в shop_category. Это может создать лишние конкурентные блокировки.
of=("self",), блокировать основную модель.
Можно передать имена связанных моделей: of=("self", "category").
Deadlock - тупиковая ситуация
Deadlock возникает когда две транзакции блокируют строки в разном порядке и ждут друг друга:
Транзакция A: блокирует product_id=1
Транзакция B: блокирует product_id=2
Транзакция A: пытается заблокировать product_id=2 - ждет B
Транзакция B: пытается заблокировать product_id=1 - ждет A
Deadlock!
PostgreSQL обнаруживает deadlock, выбирает "жертву" и откатывает её транзакцию с ошибкой:
django.db.utils.OperationalError: deadlock detected
DETAIL: Process 42 waits for ShareLock on transaction 100; blocked by process 43.
Process 43 waits for ShareLock on transaction 101; blocked by process 42.
HINT: See server log for query details.
Решение, блокировать строки в одинаковом порядке:
# Плохо когда разный порядок в разных местах кода
# В одном месте:
product_a = Product.objects.select_for_update().get(id=1)
product_b = Product.objects.select_for_update().get(id=2)
# В другом месте:
product_b = Product.objects.select_for_update().get(id=2)
product_a = Product.objects.select_for_update().get(id=1)
# Правильно, всегда блокировать в одном порядке (по id)
def lock_products(product_ids):
return list(
Product.objects
.select_for_update()
.filter(id__in=product_ids)
.order_by("id") # сортировка гарантирует порядок блокировки
)
SELECT shop_product.*
FROM shop_product
WHERE id IN (1, 2)
ORDER BY id
FOR UPDATE;
-- Всегда блокирует сначала id=1, потом id=2, независимо от порядка в коде
Обработка deadlock:
from django.db import OperationalError
import time
def place_order_with_retry(product_id, quantity, max_retries=3):
for attempt in range(max_retries):
try:
with transaction.atomic():
product = Product.objects.select_for_update().get(id=product_id)
# ... логика
return
except OperationalError as e:
if "deadlock" in str(e) and attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1)) # экспоненциальная задержка
continue
raise
Оптимистичная блокировка - альтернатива select_for_update
select_for_update(), пессимистичная блокировка: строка блокируется заранее, другие ждут. При высокой конкуренции это очередь.
Оптимистичная блокировка работает иначе, никого не блокируем, но при сохранении проверяем что данные не изменились. Реализуется через поле version:
class Product(models.Model):
name = models.CharField(max_length=200)
price = models.DecimalField(max_digits=10, decimal_places=2)
stock = models.IntegerField()
version = models.PositiveIntegerField(default=0) # счетчик версий
def update_stock_optimistic(product_id, quantity, max_retries=3):
for attempt in range(max_retries):
product = Product.objects.get(id=product_id)
if product.stock < quantity:
raise ValueError("Недостаточно товара")
# UPDATE выполняется только если version не изменилась
updated = Product.objects.filter(
id=product_id,
version=product.version, # проверяем что никто не менял объект
).update(
stock=product.stock - quantity,
version=product.version + 1,
)
if updated == 1:
return # успешно обновили
# updated == 0: кто-то успел изменить строку, повторяем
# attempt < max_retries - 1 позволяет повторить
raise RuntimeError("Не удалось обновить stock после нескольких попыток")
-- UPDATE успешен только если version совпадает
UPDATE shop_product
SET stock = 2, version = 4
WHERE id = 1 AND version = 3;
-- Если version уже 4 (кто-то успел) - affected rows = 0, повторяем
Оптимистичная блокировка лучше когда конфликты редки, большинство запросов проходят без ожидания. select_for_update лучше когда конфликты часты, повторные попытки дороже блокировки.
select_for_update() vs F() - когда что
| Ситуация | Решение |
|---|---|
| Инкремент счетчика (просмотры, лайки) | F() в update() |
| Списание stock без бизнес-логики в Python | filter(stock__gte=qty).update(stock=F()-qty) |
| Списание stock с созданием связанных объектов | select_for_update() |
| Обработка элементов очереди несколькими воркерами | select_for_update(skip_locked=True) |
| Быстрый ответ при конкурентном доступе | select_for_update(nowait=True) |
| Редкие конфликты, высокая параллельность | Оптимистичная блокировка через version |
Практическое задание
1) Реализуйте функцию book_product(product_id, quantity, user_id) которая:
- Блокирует строку продукта
- Проверяет наличие stock
- Создает Order и OrderItem
- Списывает stock
- Бросает
ValueErrorесли товара недостаточно
2) Напишите функцию get_next_pending_order() которая атомарно берет первый заказ со статусом "pending" и меняет его статус на "processing". Используйте SKIP LOCKED чтобы несколько воркеров могли работать параллельно.
3) Смоделируйте deadlock в коде: создайте два продукта, напишите два потока которые блокируют их в разном порядке. Запустите в concurrent.futures.ThreadPoolExecutor. Что происходит?
4) Перепиши функцию из задания 3 чтобы избежать deadlock через сортировку order_by("id").
5) Измерьте разницу в производительности между тремя подходами для 100 параллельных запросов на создание заказа с одним продуктом:
product.stock -= 1; product.save()(без защиты)filter(id=pk, stock__gte=1).update(stock=F("stock") - 1)(F expression)select_for_update().get(id=pk)+ Python логика
Возможные ошибки
select_for_update вне транзакции
# TransactionManagementError
product = Product.objects.select_for_update().get(id=1)
# Правильно
with transaction.atomic():
product = Product.objects.select_for_update().get(id=1)
Долгие операции под блокировкой
with transaction.atomic():
product = Product.objects.select_for_update().get(id=1)
# Строка заблокирована на всё это время:
response = requests.post("https://payment.api/charge", ...) # 2-5 секунд
product.stock -= 1
product.save()
Другие транзакции ждут. Выноси внешние вызовы за пределы блокировки где возможно.
Блокировка без транзакции через AUTOCOMMIT
# Даже с select_for_update в autocommit режиме блокировка снимается сразу
# т.к. каждый SELECT это отдельная транзакция
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM shop_product FOR UPDATE")
# блокировка снята сразу после SELECT
Забыть order_by при SKIP LOCKED
# Без order_by PostgreSQL может вернуть разные строки в разных запросах
# нет гарантии FIFO порядка
Order.objects.select_for_update(skip_locked=True).filter(status="pending").first()
# С order_by - FIFO гарантирован
Order.objects.select_for_update(skip_locked=True).filter(status="pending").order_by("created_at").first()
Связь со следующим уроком
В уроке 6.3 разберем constraints и валидацию: UniqueConstraint, CheckConstraint, разницу между валидацией на уровне Django и на уровне БД. Ограничения БД, последний рубеж защиты целостности данных.
Подписывайтесь на мой Telegram канал
Если вам нужен ментор и вы хотите научиться веб-разработке, узнать подробнее
Авторизуйтесь, чтобы оставить комментарий.
Нет комментариев.
Тут может быть ваша реклама
Пишите info@aisferaic.ru