Статьи

Что вы не получаете с ActiveJob

пересмотреть inox

Rails 4.2 включает ActiveJob , который является унифицированным API для организации очередей и написания фоновых заданий. ActiveJob предоставляет простые решения для двух проблем, с которыми сталкиваются разработчики Rails при написании фоновых заданий: работа с очередями (через ActiveJob API) и сериализация объектов ActiveRecord (через GlobalID).

Это, однако, не помогает с третьей проблемой: написание устойчивых заданий, которые могут выжить в производственной среде. Вот где этот пост приходит.

Во-первых, давайте кратко рассмотрим, что такое ActiveJob и что он предоставляет.

Что ActiveJob дает вам

ActiveJob — это адаптер для различных очередей заданий, таких как отложенное задание, Resque или Sidekiq. Хотя это позволяет вам «менять» системы очередей заданий, истинная сила ActiveJob заключается в том, что это единый API, который позволяет разрабатывать дополнительные библиотеки, инструменты и практические методы.

Например, вы можете разработать гем, который ставит в очередь задания, будучи уверенным в том, что он будет работать в любом приложении Rails.

Давайте посмотрим на пример.

Предположим, что у нас есть работа по начислению денег покупателю, называемая PurchaseJob . Очередь работы может быть выполнена через perform_later .

 PurchaseJob.perform_later(customer,amount) 

Чтобы реализовать задание, просто ActiveJob::Base подкласс ActiveJob::Base и предоставьте метод ActiveJob::Base .

 class PurchaseJob < ActiveJob::Base def perform(customer,amount) # job logic here end end 

Обычно вам нужно поставить в очередь идентификатор записи, с которой нужно работать (в данном случае это Customer ), и десериализовать ее как первый шаг вашей работы. GlobalID ActiveJob обрабатывает все это для вас.

Использование ActiveJob обеспечивает отличную основу для фоновой обработки, помогая избежать некоторых распространенных ошибок. Но вы все еще сами по себе, когда имеете дело с самой сложной частью фоновой обработки: управлением ошибками.

Поражение не вариант

Даже в небольших масштабах ваши фоновые задания потерпят неудачу. Поскольку основная причина запуска кода в фоновом задании заключается в том, что код долго выполняется, фоновые задания имеют тенденцию «привлекать» сбой. Такие проблемы, как сбои в работе сети, перезагрузка системы (Heroku Dynos, кто-нибудь?) И тайм-ауты, значительно влияют на фоновые задания.

Давайте посмотрим, как, представив реализацию нашей работы из предыдущего раздела, PurchaseJob . Реализация найдет кредитную карту клиента, попросит нашего стороннего обработчика платежей (сетевой вызов через Интернет) снять некоторую сумму с карты и, наконец, создать запись о покупке с результатами списания.

 class PurchaseJob < ActiveJob::Base def perform(customer,amount) credit_card = customer.credit_card result = PaymentProcessor.charge(credit_card.token,amount) if result.success? customer.purchases.create!(amount: amount, success: true) else customer.purchases.create!(amount: amount, success: false, reason: result.error_message) end end end 

Предположим, что во время звонка на PaymentProcessor.charge у нас PaymentProcessor.charge время ожидания в сети, и возникает исключение. Поскольку мы не знаем, прошел ли в конечном итоге платеж, мы не можем просто выполнить эту работу снова, или клиент может получить двойную плату. Но нам нужно как-то завершить процесс покупки.

Учитывая, как устроена наша работа, кто-то должен будет вмешаться вручную, чтобы завершить покупку. Это явно не масштабируется. Проблема в том, что наша работа не была разработана с учетом такого рода ошибок, но, скорее всего, эта ошибка будет происходить регулярно.

ActiveJob не предоставляет стратегию обработки ошибок по умолчанию. Это просто зависит от того, что будет делать основная очередь заданий. Даже если это действительно обеспечит стратегию обработки сбоев, такую ​​как автоматическое повторение неудачных заданий, это нам здесь не поможет.

Нам все еще нужно, чтобы наши рабочие места были рассчитаны на отказ.

Существует три распространенных метода, которые мы можем применять при проектировании наших работ для борьбы со сбоями: идемпотентные работы, декомпозиционные работы и работы, где отказ не имеет значения.

Идемпотент Джобс

Идемпотентность это :

свойство определенных операций в математике и информатике, которые могут применяться несколько раз без изменения результата за пределы первоначального применения.

Это почти точно соответствует проблеме, с которой мы сталкиваемся. Если задание может быть выполнено несколько раз без изменения результата помимо первоначального выполнения, мы можем просто повторять его бесконечно, пока оно не завершится.

Вот простой пример идемпотентной работы, которая генерирует квитанцию ​​в формате PDF.

 class ReceiptPdfJob < ActiveJob::Base def perform(customer_name,customer_email,amount,amount_tax) File.open("/receipts/#{purchase.id}.pdf","w") do pdf ReceiptPdf.new(for: "#{customer_name} (#{customer_email})" subtotal: amount, tax: amount_tax, grand_total: amount + amount_tax).write!(pdf) end end end 

Независимо от того, сколько раз мы выполняем этот код, он будет записывать в один и тот же файл с одинаковым содержимым в одно и то же место. Если бы в какой-то момент этой работы мы потерпели неудачу, мы могли бы просто выполнить ее заново.

Однако наша PurchaseJob не предназначена для этого.

Чтобы сделать эту работу идемпотентной, нам нужно кардинально изменить дизайн наших клиентов. Нам нужно «сохранять нашу работу» на каждом этапе, чтобы мы могли выбрать, где мы остановились.

Для этого нам понадобится еще немного бухгалтерии с нашей стороны, чтобы те магазины, где мы находимся в процессе. Если нас прерывают, мы можем посмотреть в нашей записи о текущей транзакции, чтобы выяснить, где можно забрать деньги.

Наша работа будет работать так:

  1. Создайте в нашей базе данных Transaction под названием « Transaction .
  2. Попробуйте оплатить с помощью нашего платежного процессора, включая идентификатор нашей Transaction .
  3. Когда заряд завершится, обновите нашу Transaction с результатами.
  4. Если нас прервут, мы сможем найти незавершенную Transaction и спросить обработчика платежа, завершена ли она с их стороны.
  5. Если это так, мы получаем результаты и обновляем нашу базу данных. Если нет, мы повторим попытку зарядки.

Это немного сложно.

 class PurchaseJob < ActiveJob::Base def perform(customer,amount) # See if there is an incomplete transaction for # this customer and amount transaction = Transaction.where(customer: customer, amount: amount, complete: false).first if transaction.present? # if there is, find the analagous transaction # in our payment processor's system existing_payment = PaymentProcessor.payment( custom: { transaction_id: transaction.id } ) else # if there isn't, create an in-progress one that we can find later transaction = Transaction.create!(customer: customer, amount: amount, complete: false) end unless existing_payment.present? # if there was no existing payment, actually charge the customer, # passing along our transaction ID as a customer data element existing_payment = PaymentProcessor.charge( credit_card.token, amount, custom: { transaction_id: transaction.id }) end # complete the transaction and update the purchase record transaction.complete!(existing_payment) customer.purchases.create_from_transaction!(transaction) end end 

Святая корова! Наш простой процесс зарядки просто превратился в гигантский беспорядок кода. Но теперь это пуленепробиваемое.

Вы можете мысленно пройтись по каждой строке кода и, независимо от того, что произойдет, перезапуск правильно подберет то место, где остановилось невыполненное задание. Это умственное упражнение является отличной техникой для проверки возможности повторного воспроизведения вашей работы. Предположим, что каждая строка кода может взорваться и выполнение возобновится в начале процедуры, как если бы за каждой строкой последовала rescue retry .

Если код будет работать во всех обстоятельствах, вы можете идти. Если нет, вам нужно изменить его, чтобы он мог.

Это может показаться болезненным, но для работы, которая имеет решающее значение, например, взимание денег с людей, вы должны проявлять особую осторожность.

Об этом изменении вы заметите, что мы взяли простую операцию (снятие средств с кредитной карты) и разбили ее на несколько небольших этапов. Мы можем использовать эту технику по-другому, чтобы сделать нашу работу устойчивой к сбоям.

Разложение рабочих мест на меньшие рабочие места

Когда сталкиваешься со сложным кодом, общепринятый метод состоит в том, чтобы разложить этот код на многократно используемые функции, которые упрощают выполнение общей процедуры. Мы можем сделать это и с нашими второстепенными заданиями.

«Счастливого пути» нашего нового эластичного кода оплаты покупки выглядит следующим образом:

  1. Создать текущую транзакцию
  2. Зарядите клиента на нашем платежном процессоре
  3. Обновите текущую транзакцию, чтобы показать завершенный платеж

Вместо того, чтобы создавать одну сложную работу, что, если мы создали три отдельных задания, каждое из которых ставило в очередь следующее, и каждое было разработано для повторной попытки?

Во-первых, у нас была бы работа, которая запускает процесс. Цель этой работы — найти или создать текущую транзакцию для клиента и суммы.

 class StartPurchaseTransactionJob < ActiveJob::Base def perform(customer,amount) transaction = Transaction.where(customer: customer, amount: amount, complete: false).first unless transaction.present? transaction = Transaction.create!(customer: customer, amount: amount, complete: false) end ChargeTransactionJob.perform_later(transaction) end end 

В конечном итоге, это задание ChargeTransactionJob очередь ChargeTransactionJob . Цель этой работы — получить полную оплату от обработчика платежей, либо запросив имеющуюся сумму, либо фактически взимая плату с клиента.

 class ChargeTransactionJob < ActiveJob::Base def perform(transaction) payment = PaymentProcessor.payment( custom: { transaction_id: transaction.id } ) unless payment.present? payment = PaymentProcessor.charge( transaction.credit_card.token, transaction.amount, custom: { transaction_id: transaction.id }) end transaction.complete!(payment) CompletePurchaseJob.perform_later(transaction) end end 

Наконец, CompletePurchaseJob обрабатывает «бизнес-логику» того, что делать после совершения покупки.

 class CompletePurchaseJob < ActiveJob::Base def perform(transaction) transaction.customer.purchases.create_from_transaction!(transaction) end end 

Вы заметите, что каждая работа идемпотентна и может быть безопасно воспроизведена, если она потерпит неудачу. Логика «выбора там, где мы остановились» все еще существует, но теперь у нас есть три простых задания, каждое из которых легко понять. Работы также можно использовать повторно. Если бы нам потребовалось взимать с клиентов ежемесячную плату за услугу подписки, ChargeTransactionJob можно было бы легко использовать повторно, поскольку его единственным вводом является transaction .

Существует третий способ борьбы с ошибками, который заключается в том, чтобы спроектировать нашу работу так, чтобы ее можно было просто игнорировать.

Вакансии, которые игнорируют провал

Игнорирование провала работы кажется плохой идеей. Эти задания были поставлены в очередь, потому что они должны быть выполнены и должны быть выполнены. Как мы можем просто игнорировать их неудачи?

Если работа может

а) выяснить, какую работу необходимо выполнить самостоятельно и
б) работать по регулярному расписанию

тогда отдельные сбои работы можно игнорировать. Это связано с тем, что при следующем запуске задания оно «наверстает упущенное» из-за невыполненного задания.

Посмотрите, как мы разбили наш процесс покупки в предыдущем разделе. Первая задача, StartPurchaseTransactionJob создает StartPurchaseTransactionJob «в процессе», а вторая, ChargeTransactionJob , завершает такие транзакции.

Мы можем изменить ChargeTransactionJob чтобы он обрабатывал все незавершенные транзакции, а не только конкретную. Разработанный таким образом, мы можем просто запланировать выполнение задания периодически и игнорировать сбои любого задания.

 class ChargeIncompleteTransactionsJob < ActiveJob::Job def perform Transaction.incomplete.find_each do |transaction| payment = PaymentProcessor.payment( custom: { transaction_id: transaction.id } ) unless payment.present? payment = PaymentProcessor.charge( transaction.credit_card.token, transaction.amount, custom: { transaction_id: transaction.id }) end transaction.complete!(payment) CompletePurchaseJob.perform_later(transaction) end end end 

Предполагая, что ChargeIncompleteTransactionsJob периодически ChargeIncompleteTransactionsJob в очередь, он будет обрабатывать все незавершенные транзакции, даже те, которые остались необработанными в результате сбоя предыдущего задания. Пока задание не часто завершается неудачей, все незавершенные транзакции в конечном итоге будут обработаны.

Не каждая задача может быть реализована таким образом, но это отличный шаблон для использования, когда запрос к базе данных может определить, какую работу необходимо выполнить и где эта работа не требует своевременного выполнения. Например, кэш можно обновить, запросив все элементы, обновленные с момента последнего запуска задания.

В заключение

ActiveJob устраняет некоторые трения, возникающие при написании фоновых заданий, предоставляя единый API для очередей и заданий на запись, однако разработчикам приходится решать сложную работу по устранению сбоев.

Имейте это в виду при разработке процессов для фоновой обработки. Будьте внимательны при извлечении встроенного кода в фоновые задания. Используя методы, обсужденные здесь, ваши фоновые задания могут выполняться безопасно и тихо, выдерживая трудности производственного использования.