Статьи

Параллелизм Django, блокировка базы данных и обновление объектов модели


Использование выражений, чтобы сделать некоторые из наших обновлений модели атомарными (как обсуждалось
ранее ), было недостаточно, чтобы сделать все наши операции безопасными для одновременных модификаций базы данных (хотя все еще полезно). Это потому, что, выбрав некоторые значения, мы хотели выполнить операции на основе этих значений, и они
не должны изменяться во время выполнения операций (потому что конечный результат будет записан обратно и перезапишет любые другие сделанные изменения).

Что нам действительно нужно было сделать — это заблокировать строки в базе данных, где строка соответствует конкретному экземпляру модели, чтобы никакая другая операция не могла изменить его во время изменения.

Блокировка строки базы данных обычно выполняется с помощью SELECT … FOR UPDATE; и, к сожалению, Django не имеет встроенной поддержки для этого. Надеюсь, он будет добавлен в Django 1.4.

Наше решение этого заключается в том, чтобы вручную изменить SQL-запрос, чтобы сделать его для обновления. Это получает блокировку строки, и любой другой запрос к этой строке будет блокироваться до тех пор, пока блокировка не будет снята. Это должно быть сделано внутри транзакции, поскольку блокировка снята, когда транзакция завершится (фиксация или откат).

Запись

С тех пор, как я начал писать эту запись в блоге (прерванную моей женой, которая незаметно родила нашу прекрасную дочь Ирину ), она была совершена. Новое в версии разработки Django: метод QuerySet select_for_update .

Наша первоначальная попытка создала запрос, который просто выбрал бы одно поле из конкретного экземпляра модели, который мы хотели заблокировать. Затем запрос преобразуется в запрос FOR UPDATE и выполняется для получения блокировки.

Этот запрос выбирает поле, которое мы хотим изменить. Как только мы получим блокировку, мы знаем, что ничто другое не может изменить поле, пока мы его используем. Нам нужно извлечь поле снова (и обновить поле в нашем экземпляре модели) после получения блокировки, потому что у объекта модели, который у нас уже есть, есть значение, но получение блокировки, возможно, заблокировало, в то время как другой запрос изменил значение (нам всегда нужно последнее значение на данный момент — поэтому, даже если мы не были заблокированы, мы хотим обновить после получения блокировки. Так как изменения в этом конкретном поле всегда защищены с помощью блокировки, на практике мы были бы заблокированы, если бы что-то еще меняло ее) ,

Наша начальная итерация интересовала только одно поле, которое могло измениться из-за одновременной модификации, поэтому код, который получает блокировку, обновляет это поле :

@transaction.commit_on_success
def do_something(self):
    self._get_lock()
    # do stuff

def _mangle_sql_for_locking(self, sql):
    # yeah, it's really this difficult
    return sql + ' FOR UPDATE'

def _get_lock(self):
    query = SomeModel.objects.filter(id=self.id).values_list('field')
    sql, params = query._as_sql(connection=connection)

    cursor = connection.cursor()
    cursor.execute(self._mangle_sql_for_locking(sql), params)

    # acquire the lock and update the field
    field = cursor.fetchone()[0]
    self.field = field

Это работало нормально, пока у нас не было больше одного интересующего нас поля. Наша наивная попытка изменить код выглядела так:

def _get_lock(self):
    values = ('field', 'other_field')
    query = SomeModel.objects.filter(id=self.id).values_list(*values)
    sql, params = query._as_sql(connection=connection)

Это взрывается в вызове _as_sql с этим исключением:


Невозможно использовать
многополевое ValuesListQuerySet в качестве значения фильтра.

Об этом не было никаких упоминаний в документации, и Google мало что дал, но потом мы вызываем приватный метод напрямую.

Оооо … как насчет метода, который обновит все поля с последней версией из БД? Я всегда был немного удивлен, что экземпляры модели не имеют этой встроенной возможности (может быть, я просто упустил ее?), Но, возможно, это немного против паттерна за пределами очень конкретных случаев использования.

Так что на случай, если у вас возникнет желание пройти по тем же кроличьим норам, что и я, вот метод обновления, который выбирает последнюю версию всех полей и обновляет экземпляр модели.

def refresh(self):
    updated = SomeModel.objects.filter(id=self.id)[0]
    fields = [f.name for f in self._meta._fields()
              if f.name != 'id']
    for field in fields:
        setattr(self, field, getattr(updated, field))

Это позволяет нам получить блокировку (все еще генерировать и выполнять SQL-код FOR UPDATE), но отбрасывать результат, поскольку все поля обновляются другим запросом (с дополнительными издержками, которые, конечно, подразумевают).

Обратите внимание, что для нашего кода, который выполняет блокировку, есть разумная причина того, что блокировка SQL, как бы тривиально это ни было, имеет свой собственный метод. Фактический код имеет дополнительный метод и вызова в нем:

def _mangle_sql_for_locking(self, sql):
    # yeah, it's really this difficult
    return sql + ' FOR UPDATE'

def _concurrency_poison(self):
    pass

def _get_lock(self):
    values = ('field', 'other_field')
    query = SomeModel.objects.filter(id=self.id).values_list(*values)
    sql, params = query._as_sql(connection=connection)

    cursor = connection.cursor()
    cursor.execute(self._mangle_sql_for_locking(sql), params)
    self._concurrency_poison()

Метод _concurrency_poison ничего не делает в рабочей среде , но он позволяет нам писать тесты, которые доказывают, что есть состояние гонки, и доказывают, что оно исправлено. В наших тестах мы исправляем _mangle_sql_for_locking с помощью функции, которая возвращает sql без изменений. Мы дополнительно исправляем _concurrency_poison с помощью функции, которая выполняет параллельную модификацию.

Без блокировки «одновременное изменение» будет перезаписано, а окончательное значение будет неправильным (одновременное изменение будет потеряно). Мы проверяем, что мы получаем неправильный конечный результат, который доказывает, что у нас есть состояние гонки.

Второй тест оставляет _mangle_sql_for_locking без изменений, но все же исправляет _concurrency_poison для внесения одновременных изменений. Поскольку теперь это должно блокировать ( _concurrency_poison вызывается после получения блокировки), параллельное изменение должно быть сделано из отдельного процесса или потока. Типичный пример (используется для обоих тестов) может выглядеть примерно так:

import subprocess
import time
from textwrap import dedent

from django.conf import settings
from mock import patch

ENV = {
    'PGHOST': settings.DATABASES['default']['HOST'],
    'PGUSER': settings.DATABASES['default']['USER'],
    'PGPASSWORD': settings.DATABASES['default']['PASSWORD'],
    'PGPORT': settings.DATABASES['default']['PORT'],
}

@patch('appname.models.Model._concurrency_poison')
def concurrently_modify(self, concurrency_mock):
    # Poison: modify the database in an inconvenient way at an
    # inconvenient time.
    database_name = settings.DATABASES['default']['NAME']
    proc = subprocess.Popen(['psql', database_name],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, env=ENV)

    def poison():
        proc.stdin.write(dedent('''
            UPDATE some_table
            SET field = field - 3;\n'''
        ))
        proc.stdin.flush()
        # give the database code a chance to execute
        time.sleep(1)

    concurrency_mock.side_effect = poison

    # call the code that obtains the lock here
    # it will automatically trigger the
    # concurrent operation

Во втором тесте одновременное изменение блокируется до тех пор, пока блокировка не будет снята. Вместо того, чтобы перезаписываться, параллельное изменение выполняется после логики в модели, поэтому результат будет отличаться от первого теста, и мы можем утверждать, что результат имеет это другое (правильное) значение.

Этот шаблон тестирования, который я считаю довольно классным, был разработан Дэвидом Оуэном, который является постоянным экспертом по базам данных в нашей команде. Он учит меня базам данных, хотя я рекомендую ему, чтобы тестирование было полезным …Улыбка