Django ORM

Продвинутые паттерны | Курс Django ORM урок 9.2

Продвинутые паттерны | Курс Django ORM урок 9.2
Mikhail
Автор
Mikhail
Опубликовано 16.03.2026
0,0
Views 11

Цель урока

Разобрать практические паттерны применяемые в Django ORM: логирование изменений моделей, денормализация данных для ускорения запросов, паттерн счетчиков и кэширование агрегатов.

Необходимые знания


Паттерн 1: логирование истории изменений

Задача: сохранять историю всех изменений полей модели.

Простая реализация через сигналы

import json
from django.db import models
from django.utils import timezone


class ChangeLog(models.Model):
    """История изменений любой модели."""
    content_type_name = models.CharField(max_length=100)  # "Product"
    object_id = models.PositiveIntegerField()
    changed_by = models.ForeignKey(
        "auth.User",
        null=True,
        on_delete=models.SET_NULL,
    )
    changed_at = models.DateTimeField(default=timezone.now)
    changes = models.JSONField()  # {"price": {"old": 100, "new": 150}}

    class Meta:
        indexes = [
            models.Index(fields=["content_type_name", "object_id"]),
        ]


class TrackChangesModel(models.Model):
    """Абстрактная модель с автоматическим логированием изменений."""
    TRACKED_FIELDS = []  # переопределить в дочерней модели

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._original_values = self._get_tracked_values()

    def _get_tracked_values(self):
        return {
            field: getattr(self, field, None)
            for field in self.TRACKED_FIELDS
        }

    def save(self, *args, **kwargs):
        if self.pk:  # обновление существующего объекта
            changes = {}
            current_values = self._get_tracked_values()
            for field in self.TRACKED_FIELDS:
                old = self._original_values.get(field)
                new = current_values.get(field)
                if old != new:
                    changes[field] = {"old": str(old), "new": str(new)}
        else:
            changes = {}

        super().save(*args, **kwargs)
        self._original_values = self._get_tracked_values()

        if changes:  # записываем лог после успешного сохранения
            ChangeLog.objects.create(
                content_type_name=self.__class__.__name__,
                object_id=self.pk,
                changes=changes,
            )


class Product(TrackChangesModel):
    TRACKED_FIELDS = ["price", "stock", "is_active"]
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    stock = models.IntegerField()
    is_active = models.BooleanField(default=True)


# Изменить цену
product = Product.objects.get(id=1)
product.price = Decimal("150.00")
product.save()

# В ChangeLog записано:
# {"price": {"old": "100.00", "new": "150.00"}}

Ограничения этого подхода

Логирование в save() не работает для:

  • QuerySet.update(), обходит save()
  • bulk_update(), обходит save()
  • Прямой SQL

Для полного аудита нужны либо триггеры PostgreSQL, либо специализированные библиотеки django-auditlog, django-simple-history.


Паттерн 2: денормализованные счетчики

Задача: быстро получить количество отзывов о продукте без COUNT при каждом запросе.

Вместо:

# Медленно на большой таблице, COUNT каждый запрос
Product.objects.annotate(review_count=Count("reviews"))

Хранить счетчик в поле модели:

class Product(models.Model):
    name = models.CharField(max_length=200)
    review_count = models.PositiveIntegerField(default=0, db_index=True)


class Review(models.Model):
    product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name="reviews")
    rating = models.PositiveSmallIntegerField()
    text = models.TextField()

Обновлять счетчик атомарно при создании и удалении отзыва:

from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.db.models import F


@receiver(post_save, sender=Review)
def increment_review_count(sender, instance, created, **kwargs):
    if created:
        Product.objects.filter(id=instance.product_id).update(
            review_count=F("review_count") + 1
        )


@receiver(post_delete, sender=Review)
def decrement_review_count(sender, instance, **kwargs):
    Product.objects.filter(id=instance.product_id).update(
        review_count=F("review_count") - 1
    )
-- При создании Review:
UPDATE shop_product SET review_count = review_count + 1 WHERE id = 5;

-- При удалении Review:
UPDATE shop_product SET review_count = review_count - 1 WHERE id = 5;


# Теперь мгновенное чтение
product = Product.objects.get(id=5)
print(product.review_count)  # без JOIN и COUNT

# Сортировка без агрегации
Product.objects.order_by("-review_count")[:10]
-- Без COUNT и JOIN
SELECT * FROM shop_product ORDER BY review_count DESC LIMIT 10;

Пересинхронизация счетчиков

Счетчики могут рассинхронизироваться при bulk операциях или прямом SQL. Нужна команда для пересинхронизации:

from django.core.management.base import BaseCommand
from django.db.models import Count

class Command(BaseCommand):
    help = "Пересинхронизировать денормализованные счетчики"

    def handle(self, *args, **options):
        from shop.models import Product

        # Пересчитать review_count для всех продуктов
        products = Product.objects.annotate(
            actual_count=Count("reviews")
        )

        updates = []
        for product in products:
            if product.review_count != product.actual_count:
                product.review_count = product.actual_count
                updates.append(product)

        if updates:
            Product.objects.bulk_update(updates, ["review_count"])
            self.stdout.write(f"Обновлено: {len(updates)} продуктов")

Паттерн 3: кэширование агрегатов

Задача: статистика продаж по категории (выручка, количество заказов) нужна на каждой странице каталога. Вычислять при каждом запросе, дорого.

class CategoryStats(models.Model):
    """Кэшированная статистика для категории."""
    category = models.OneToOneField(
        "Category",
        on_delete=models.CASCADE,
        related_name="stats",
    )
    total_revenue = models.DecimalField(max_digits=14, decimal_places=2, default=0)
    total_orders = models.PositiveIntegerField(default=0)
    avg_product_price = models.DecimalField(max_digits=10, decimal_places=2, default=0)
    last_updated = models.DateTimeField(auto_now=True)


def recalculate_category_stats(category_id):
    """Пересчитать статистику категории."""
    from django.db.models import Sum, Count, Avg

    stats = (
        OrderItem.objects
        .filter(product__category_id=category_id)
        .aggregate(
            total_revenue=Sum(F("price") * F("quantity")),
            total_orders=Count("order", distinct=True),
        )
    )
    avg_price = Product.objects.filter(
        category_id=category_id,
        is_active=True,
    ).aggregate(avg=Avg("price"))["avg"]

    CategoryStats.objects.update_or_create(
        category_id=category_id,
        defaults={
            "total_revenue": stats["total_revenue"] or 0,
            "total_orders": stats["total_orders"] or 0,
            "avg_product_price": avg_price or 0,
        }
    )


# Использование
category = Category.objects.select_related("stats").get(slug="phones")
print(category.stats.total_revenue)  # без JOIN на OrderItem

Запуск пересчета, по расписанию (Celery Beat) или при создании заказа через on_commit:

from django.db import transaction

def place_order(user, cart_items):
    with transaction.atomic():
        order = Order.objects.create(...)
        OrderItem.objects.bulk_create([...])

        # Пересчитать статистику затронутых категорий после commit
        affected_categories = {item.product.category_id for item in cart_items}
        for cat_id in affected_categories:
            transaction.on_commit(
                lambda cid=cat_id: recalculate_category_stats(cid)
            )

Паттерн 4: версионирование объектов

Хранить версии объекта для откатов:

class ProductVersion(models.Model):
    product = models.ForeignKey(
        Product,
        on_delete=models.CASCADE,
        related_name="versions",
    )
    version_number = models.PositiveIntegerField()
    name = models.CharField(max_length=200)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    description = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    created_by = models.ForeignKey("auth.User", null=True, on_delete=models.SET_NULL)

    class Meta:
        unique_together = [["product", "version_number"]]
        ordering = ["-version_number"]


def save_product_version(product, user=None):
    """Сохранить текущее состояние как версию."""
    last_version = product.versions.first()
    version_number = (last_version.version_number + 1) if last_version else 1

    ProductVersion.objects.create(
        product=product,
        version_number=version_number,
        name=product.name,
        price=product.price,
        description=product.description,
        created_by=user,
    )


def restore_product_version(product, version_number):
    """Восстановить продукт из версии."""
    version = product.versions.get(version_number=version_number)

    with transaction.atomic():
        product.name = version.name
        product.price = version.price
        product.description = version.description
        product.save(update_fields=["name", "price", "description", "updated_at"])

        # Сохранить новую версию (восстановление)
        save_product_version(product)

Паттерн 5: составные запросы с Union

Django поддерживает SQL UNION через QuerySet.union():

# Объединить заказы из двух разных статусов
pending = Order.objects.filter(status="pending").values("id", "total_price", "created_at")
processing = Order.objects.filter(status="processing").values("id", "total_price", "created_at")

active_orders = pending.union(processing).order_by("-created_at")


SELECT id, total_price, created_at FROM shop_order WHERE status = 'pending'
UNION
SELECT id, total_price, created_at FROM shop_order WHERE status = 'processing'
ORDER BY created_at DESC;

union() убирает дубли. union(all=True) оставляет дубли (как UNION ALL, быстрее).

Ограничения: после union() нельзя фильтровать через filter(), только order_by() и срезы.


Практическое задание

  1. Реализуйте TrackChangesModel и примените его к Product. Проверьте что при изменении price создается запись в ChangeLog. Убедитесь что при QuerySet.update() логирование не происходит (это ограничение, задокументируйте его).

  2. Добавьте денормализованный счетчик review_count в Product. Реализуйте сигналы для обновления. Напишите команду для пересинхронизации. Измерьте разницу в скорости: Product.objects.annotate(Count("reviews"))[:100] vs Product.objects.all()[:100] с готовым review_count.

  3. Реализуйте CategoryStats и функцию recalculate_category_stats(). Вызывайте пересчет через on_commit при создании заказа. Проверьте что статистика обновляется после оформления заказа.

  4. Добавьте версионирование к Product, при каждом save() сохранять версию. Реализуйте restore_to_version(version_number).

  5. Используйте union() для получения единого списка: последние 5 заказов пользователя + последние 5 отзывов пользователя. Верните отсортированный по дате список (используйте values() с общими полями: id, type, created_at).


Возможные ошибки

Денормализованные счетчики без F(), race condition

# Race condition
@receiver(post_save, sender=Review)
def update_count(sender, instance, created, **kwargs):
    if created:
        product = instance.product
        product.review_count += 1  # читаем, изменяем в Python
        product.save()  # два запроса, race condition

# Правильно атомарный UPDATE
Product.objects.filter(id=instance.product_id).update(
    review_count=F("review_count") + 1
)

Логирование изменений в save() с бесконечной рекурсией

class TrackChangesModel(models.Model):
    def save(self, *args, **kwargs):
        ChangeLog.objects.create(...)  # save вызывает signal, signal вызывает save?
        super().save(*args, **kwargs)

Если ChangeLog.post_save вызывает что-то, что снова вызывает Product.save(), рекурсия. Используйте guard флаги или update_fields, чтобы избежать повторного логирования.

union() с разными типами полей

orders = Order.objects.values("id", "total_price")  # DecimalField
products = Product.objects.values("id", "price")    # DecimalField

# Порядок и типы полей должны совпадать
orders.union(products)  # OK - id(int) + decimal, id(int) + decimal

# Но имена полей берутся из первого QuerySet
# Результат: поля называются "id" и "total_price" (не "price")

Связь со следующим уроком

В уроке 9.3 разберем async ORM, асинхронные методы Django 4.1+ для работы с базой данных в ASGI окружении. aget(), afilter(), aiterator() и их ограничения.


<< Урок 9.1

Урок 9.3 >>


Подписывайтесь на мой Telegram канал

Если вам нужен ментор и вы хотите научиться веб-разработке, узнать подробнее

Авторизуйтесь, чтобы оставить комментарий.

Комментариев: 0

Нет комментариев.

Тут может быть ваша реклама

Пишите info@aisferaic.ru

Похожие туториалы