Статьи

Как реализовать надежные и масштабируемые транзакции в документах с MongoDB

Транзакционный вопрос

Есть несколько веских причин, почему исторически базы данных обеспечивали поддержку транзакций между частями данных. Типичный сценарий состоит в том, что приложению необходимо изменить несколько независимых битов, и оно, скорее всего, перейдет в плохое состояние, если только некоторые из этих изменений действительно внесут его в хранилище данных. Отсюда и концепция давно уважаемой КИСЛОТЫ:

  • Атомарность: все изменения сделаны или нет
  • Согласованность: данные остаются в согласованном состоянии
  • Изоляция: другие клиенты не видят частичных изменений
  • Долговечность: после подтверждения транзакции обратно клиенту данные находятся в безопасном месте (обычно в журнале на диске)

С появлением баз данных NoSQL поддержка ACID-транзакций в документах, как правило, была исключена. Многие хранилища ключей / значений по-прежнему имеют ACID, но это относится только к одной записи. Основная причина его падения в том, что он просто не масштабируется ! Если документы распределены по нескольким серверам, транзакции становятся чрезвычайно сложными для выполнения и ресурсоемкими. Представьте себе, если транзакция распространяется на десятки серверов, некоторые из которых являются удаленными, некоторые из них ненадежными, какими бы трудными и медленными это ни было!

MongoDB поддерживает ACID на одном уровне документа . Точнее, вы получите «ACI» по умолчанию и получите «D», если включите опцию «j» WriteConcern. Mongo имеет богатый язык запросов, охватывающий все документы, и, следовательно, люди жаждут переносить несколько документов через свой код SQL. Один естественный обходной путь — использовать всю мощь документов: вместо множества строк и связей вы можете объединить все в один большой документ. Денормализация возвращает вам транзакции!

Этот метод фактически решает большое количество транзакционных проблем для отношений один к одному и некоторых отношений один ко многим . Это также делает ваше приложение проще, а хранилище данных быстрее, так что это беспроигрышный вариант! Но для других случаев, когда данные должны быть разделены, как вы можете справиться с этим?

Уменьшение КИСЛОТЫ

Это действительно сводится к этому для большинства приложений:

  • Атомность: на самом деле вы просто хотите, чтобы ВСЕ изменения были сделаны.
  • Последовательность: это хорошо, если система непоследовательна в течение короткого времени, пока она в конечном итоге непротиворечива
  • Изоляция: отсутствие изоляции обуславливает временную несогласованность, которая не является идеальной, но большинство пользователей привыкли к ней в мире онлайн-услуг (например, поддержка клиентов: «для распространения требуется несколько секунд»)
  • Долговечность: это важно и поддерживается

Проблема действительно сводится к тому, чтобы иметь надежную и масштабируемую конечную последовательность!

Решение 1: Поле синхронизации

Этот вариант использования является самым простым и наиболее распространенным: существуют поля, которые необходимо синхронизировать между документами. Например, скажем, у вас есть пользовательский документ с именем пользователя «Джон» и документы, представляющие комментарии, которые Джон опубликовал. Если пользователю разрешено изменять имя пользователя, это изменение необходимо распространить на все документы, даже если в середине процесса произошла ошибка приложения или базы данных.

Для этого проще всего использовать новое поле (например, «синхронизация») в основном документе (в данном случае это пользовательский документ). Установите в поле «синхронизация» метку даты в обновлении пользовательского документа:

db.user.update({ _id: userId }, { $set: { syncing: currentTime }, { rest of updates ... } })

Теперь приложение может изменять все документы комментариев. Когда это сделано, флаг должен быть снят:

db.user.update({ _id: userId }, { $unset: { syncing: 1 } })

Теперь представьте, что во время процесса произошел сбой: остались некоторые комментарии со старым именем пользователя. К счастью, флаг все еще установлен, и у приложения есть способ узнать, что процесс должен быть повторен. Для этого вам нужен фоновый поток, который проверяет наличие «синхронизирующих» документов, которые не были закончены в консервативное время (например, 1 час). Поиск этих документов может быть очень эффективным с помощью индекса по флагу «синхронизация». Этот индекс должен быть «разреженным», чтобы на самом деле индексировались только те немногие документы, для которых он установлен, что делает его очень маленьким.

db.user.ensureIndex({ syncing: 1 }, { sparse: true })

В результате ваша система обычно поддерживает синхронизацию в течение короткого периода времени или до 1 часа в случае сбоя системы. Если синхронизация не важна, вы можете даже заставить приложение исправлять документы лениво, если при чтении будет обнаружен флаг «синхронизации».

Решение 2. Очередь заданий

Приведенный выше принцип хорошо работает, если приложению не требуется много контекста, и он просто применяет общий процесс (например, копирование значения). В некоторые транзакции необходимо внести конкретные изменения, которые потом будет сложно идентифицировать. Например, скажем, пользовательский документ содержит список друзей:

{ _id: userId, friends: [ userId1, userId2, ... ]}

Теперь пользователи A и B решают стать друзьями: вам нужно добавить B в список A и наоборот. Хорошо, если это не происходит точно в одно и то же время (если это не раздражает одного из них :)). Решением этой и большинства проблем с транзакциями является использование очереди заданий, также хранящейся в MongoDB. Рабочий документ может выглядеть так:

{ _id: jobId, ts: timeStamp, state: "TODO", type: "ADD_FRIEND", details: { users: [ userA, userB ]} }

Либо исходный поток может вставить задание и продолжить вносить изменения, либо несколько «рабочих» потоков могут быть выделены для выбора заданий. Работник выбирает самое старое необработанное задание с помощью функции findAndModify (), которая является полностью атомарной. В операции он помечает задание как обрабатываемое, а также указывает имя работника и текущее время для отслеживания. Индекс {state: 1, ts: 1} делает эти вызовы очень быстрыми.

db.job.findAndModify({ query: { state: "TODO" }, sort: { ts: 1 }, update: { $set: { state: "PROCESSING", worker: { name: "worker1", ts: startTime } } } })

Затем работник вносит изменения в оба пользовательских документа таким образом, что он идемпотентен. Важно, чтобы эти изменения можно было повторно применять много раз с одинаковым эффектом! Здесь мы просто будем использовать $ addToSet для этой цели. Более общей альтернативой является добавление теста на стороне запроса, чтобы проверить, было ли изменение уже внесено или нет.

db.user.update({ _id: userA }, { $addToSet: { friends: userB } })

Последний шаг — удалить задание или пометить его как выполненное. Это может быть хорошо, чтобы сохранить работу на некоторое время в качестве меры безопасности. Единственным недостатком является то, что предыдущий индекс со временем увеличивается, хотя вы также можете использовать изящный разреженный индекс в специальном поле {undone: 1} (и соответственно изменять запросы)

db.job.update({ _id: jobId }, { $set: { state: "DONE" } })

Если процесс умирает в какой-то момент времени, задание все еще находится в очереди, но помечено как обработка. После периода бездействия фоновый поток может снова пометить задание как нуждающееся в обработке, и задание просто начинается заново с самого начала.

Решение 3: двухфазный коммит

Two Phase Commit — это хорошо известное решение, которое используется во многих распределенных системах. MongoDB фактически облегчает реализацию этого решения, поскольку благодаря гибкой схеме вы можете вставить все данные, необходимые для его выполнения, в документы. Несколько лет назад я написал статью на эту тему, которую можно найти в «Поваренной книге MongoDB« Выполнение двухфазных коммитов », а также в руководстве MongoDB« Выполнение двухфазных коммитов »» . Приятного чтения!

Решение 4: Выверка журнала

Одним из распространенных решений, которое используют большинство финансовых систем, является идея сверки данных. Идея состоит в том, чтобы записывать транзакции в виде простого журнала, что позволяет избежать сложности и возможных сбоев. Затем состояние учетной записи определяется путем агрегирования всех изменений с момента последнего известного исправного состояния. В крайнем случае, вы можете стереть аккаунт и воссоздать его с нуля, повторно применив все изменения, начиная с первого дня … страшно, но это просто работает :). Документ учетной записи просто нуждается в «кэшированном» балансе для повышения скорости чтения, а также в идентификаторе последовательности, по которому он был рассчитан:

{ _id: accountId, cache: { balance: 10000, seqId: 115 } }

Теперь для транзакции типичная финансовая система записывает запись для транзакции и одну запись «изменение счета» для каждого участвующего счета. Этот метод требует дополнительных гарантий записи, которые могут быть достигнуты с помощью решения «очередь заданий», где заданием является сама транзакция, пока не будут записаны все изменения учетной записи. Но здесь, с MongoDB, мы можем вместо этого написать один документ, который включает как транзакцию, так и изменения аккаунта. Документ будет вставлен в коллекцию TX, например:

{ _id: ObjectId, ts: timestamp , proc: "UNCOMMITTED", state: "VALID", changes: [ { account: 1234, type: "withdraw", value: -100, seqId: 801, cachedBal: null }, { account: 2345, type: "deposit", value: 100, seqId: 203, cachedBal: null } ] }

Области интересов:

  • proc: транзакция начинается в состоянии «UNCOMMITTED» и становится «COMMITTED», когда все предыдущие транзакции, связанные с этими счетами, также находятся в состоянии «COMMITTED». Это сигнализирует о том, что эта транзакция может использоваться в качестве «якоря» для расчета баланса.
  • состояние: может быть различным «VALID», «CANCELED» и т. д. Если не «VALID», транзакция игнорируется для расчетов сальдо, даже если «COMMITTED».
  • seqId: представляет уникальный идентификатор последовательности для этой учетной записи. Это дает детерминированный порядок изменения учетной записи
  • cachedBal: кэшированный баланс для учетной записи. Если транзакция находится в состоянии «COMMITTED», то кэшированный баланс (если установлен) является надежным числом.
  • обратите внимание, что мы используем уникальный индекс для {changes.account: 1, changes.seqId: 1}. Это необходимо для быстрой выверки, а также гарантирует отсутствие дубликатов последовательностей для учетной записи.

Ключевая часть заключается в том, чтобы обеспечить надежное вычисление / аннулирование кэшированного баланса, даже если порядок транзакции не гарантирован, а состояние транзакции может измениться. Для этой цели мы используем идентификатор последовательности для каждой учетной записи, который обеспечивает детерминированное упорядочение изменений учетной записи и необходим для избежания сложных блокировок. Перед записью транзакции приложение сначала выводит следующий seqId для каждой из учетных записей, выполняя простые запросы:

db.tx.find({ "changes.account": 1234 }, { "changes.$.seqId": 1 }).sort({ "changes.seqId": -1 }).limit(1)

Затем каждый seqId увеличивается локально и, наконец, записывается как часть транзакции. В случае, если другой поток получил тот же seqId одновременно, уникальный индекс гарантирует, что запись не удалась, и процесс просто оптимистично повторяет попытки, пока не завершится. Альтернативой является сохранение текущего seqId в коллекции учетных записей и получение следующего наверняка с помощью findAndModify (), который обычно медленнее, если у вас нет больших разногласий по учетным записям. Обратите внимание, что вполне возможно, что seqId будет пропущен, если каким-либо образом транзакция никогда не будет записана, но это не проблема, если нет дубликатов.

Теперь мы заложили основу для примирения! Фоновый процесс может гарантировать, что все незафиксированные транзакции будут обработаны. Транзакция не помечается как зафиксированная, если не были зафиксированы все транзакции с меньшим номером последовательности для этих учетных записей. Как только транзакция помечена как совершенная, она становится полностью неизменной. Теперь о хорошей части: давайте получим баланс счета! Сначала мы можем получить последний известный хороший баланс, который можно быстро найти, пройдя по индексу:

db.tx.find({ "changes.account": 1234, proc: "COMMITTED" }, { "changes.$": 1 }).sort({ "changes.seqId": -1 }).limit(1)

Ибо там мы можем легко получить все ожидающие изменения, получив транзакции с большим seqId:

db.tx.find({ "changes.account": 1234, "changes.seqId": { $gt: lastGoodSeqId } }, { "changes.$": 1 }).sort({ "changes.seqId": 1 })

Мы можем использовать эти результаты для отображения ожидающих платежей. Если мы просто хотим быстро определить, где находится ожидающий баланс, мы можем просто попросить MongoDB агрегировать изменения и дать нам общую сумму:

db.tx.aggregate([{ $match: { "changes.account": 1234, "changes.seqId": { $gt: lastGoodSeqId }, state: "VALID" }}, 
{ $unwind: "changes" }, 
{ $match: { "account": 1234 }}, 
{ $group: { _id: "total", total: { $sum: "$value" } }}])

Чтобы система оставалась быстрой, а вычисления — небольшими, важно, чтобы фоновые работники продолжали следить за тем, чтобы транзакции переходили в зафиксированное состояние и чтобы кэширование балансов. В идеале транзакция никогда не будет возвращена, и вместо этого будет отправлена ​​новая обратная транзакция. Тем не менее, можно реализовать отмену транзакции, если все остальные состояния транзакций и кэши сбрасываются правильно.

Решение 5: Управление версиями

Иногда вносятся изменения, которые слишком сложны для представления в JSON, и они могут просочиться ко многим документам со сложными связями (например, древовидная структура). Было бы слишком разрушительным, если изменения появляются только частично (например, сломанное дерево), поэтому в этом случае необходима изоляция. Одним из способов достижения этого является вставка новых документов с более высоким номером версии вместо обновления существующих. Номер новой версии можно легко и безопасно получить с помощью той же технологии, что и идентификатор последовательности выверки журнала. Обычно существует уникальный индекс {itemId: 1, version: 1}

Приложение вставляет документы, начиная с дочерних и заканчивая «основным» документом (например, корнем дерева). При доступе к данным приложение проверяет номер версии в основном документе и оттуда игнорирует любой документ с более высоким номером версии. Незавершенные транзакции можно оставить как есть и игнорировать, или очистить, если вы хотите быть аккуратными 🙂

Вывод

В заключение мы увидели несколько решений для реализации надежных и масштабируемых транзакций между документами:

  • Флаг синхронизации : лучше всего копировать данные из основного документа
  • Job Queue : очень общего назначения, решает 95% случаев. Большинству систем нужна хотя бы одна очередь заданий!
  • Двухфазная фиксация : этот метод гарантирует, что каждый объект всегда имеет всю информацию, необходимую для перехода в согласованное состояние
  • Согласование бревен : самый надежный метод, идеальный для финансовых систем
  • Управление версиями : обеспечивает изоляцию и поддерживает сложные структуры

Кроме того, много раз упоминалось, что MongoDB в конечном итоге будет поддерживать какие-то действительно атомарные и изолированные транзакции между документами. На самом деле это уже делается как часть шардинга , но пока это чисто внутреннее … Это будет возможно только для документов, которые находятся на одном и том же шарде, поскольку в противном случае это вернет нас в немасштабируемый мир SQL! Спасибо за чтение этого длинного поста, следите за обновлениями!