Необходимость асинхронного выполнения задач вне цикла обработки HTTP-запросов рано или поздно появится во всех веб-приложениях. Один камень, который может помочь вам справиться с этой задачей, это delayed_job . Есть и другие, например, Resque и Sidekiq . Я использовал многие из них, и я должен сказать, что мне действительно нравится delayed_job
потому что он так хорошо интегрируется с моим бэкэндом RDBMS (обычно MySQL или PostgreSQL).
Было много сообщений о Задержке Работы, с тоннами полезной информации. В этой статье я расскажу о некоторых приемах, которые я применяю на работе при работе с отложенной работой, таких как:
- Улучшения в таблице, которая содержит отложенные задания
- Хорошая практика при постановке в очередь заданий, включая нестандартные отложенные задания
- Управление заданиями с помощью консоли Rails
- Управление заданиями с помощью веб-интерфейса
- Тестирование с отложенными заданиями
- Помеченное ведение журнала
Я буду использовать Rails и ActiveRecord в своем демонстрационном приложении, поэтому не стесняйтесь создавать приложение Rails и следуйте инструкциям. Вам нужно будет добавить delayed_job
в ваш Gemfile .
Стол для хранения отложенных заданий
Если вы запустите следующую команду:
rails generate delayed_job:active_record
вы получите следующую миграцию:
def self.up create_table :delayed_jobs, :force => true do |table| table.integer :priority, :default => 0, :null => false table.integer :attempts, :default => 0, :null => false table.text :handler, :null => false table.text :last_error table.datetime :run_at table.datetime :locked_at table.datetime :failed_at table.string :locked_by table.string :queue table.timestamps end add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority' end
Предлагаемые оптимизации для миграции
Добавить индекс в столбце очереди
Вам нужно будет запросить вашу работу по queue
. Если у вас много заданий в таблице базы данных, запрос по queue
выполнит полное сканирование таблицы и займет много времени. Индекс очень полезен, в этом случае.
# optimization #1 # def self.up create_table :delayed_jobs, :force => true do |table| # ... # end # ... # add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue' end
MySQL longtext
Оптимизации
Некоторые исключения, полученные Delayed Job, могут быть довольно продолжительными. Если вы используете MySQL, поля handler
и last_error
могут быть недостаточно длинными. Измените их тип данных на longtext
чтобы избежать этой проблемы. Если вы используете PostgreSQL, это не будет проблемой.
# optimization #2 # def self.up create_table :delayed_jobs, :force => true do |table| # ... # # replace the migration for column +handler+ with table.column :handler, :longtext, :null => false # replace the migration for column +last_error+ with table.column :last_error, :longtext # ... # end # ... # add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue' end
Столбцы для вашего отложенного объекта
Обычно задание создается для обработки фоновой задачи, связанной с бизнес-объектом. Например, чтобы отправить электронное письмо пользователю.
Я использую два столбца для хранения ссылки на этот экземпляр бизнес-объекта. Следовательно, я могу быстро запрашивать записи о работе, которые связаны с конкретными типами или экземплярами бизнес-объектов. Я также добавляю соответствующие индексы, чтобы эти запросы быстро заканчивались.
# optimization #3 # def self.up create_table :delayed_jobs, :force => true do |table| # ... # # replace the migration for column +handler+ with table.column :handler, :longtext, :null => false # replace the migration for column +last_error+ with table.column :last_error, :longtext # ... # table.integer :delayed_reference_id table.string :delayed_reference_type end # ... # add_index :delayed_jobs, [:queue], :name => 'delayed_jobs_queue' add_index :delayed_jobs, [:delayed_reference_id], :name => 'delayed_jobs_delayed_reference_id' add_index :delayed_jobs, [:delayed_reference_type], :name => 'delayed_jobs_delayed_reference_type' end
Позже я покажу вам, как я заполняю эти две колонки.
Работа в очереди
Не удаляйте неудачные задания
По умолчанию задержанные работники удаляют неудачные задания, как только достигают максимального числа попыток. Это может раздражать, если вы хотите найти корень проблемы и устранить неполадки. Я предлагаю оставить сбойные задания в базе данных и настроить процесс для обработки сбойных заданий.
Вы можете установить значение этого атрибута конфигурации с помощью следующего оператора:
Delayed::Worker.destroy_failed_jobs = false
внутри инициализатора отложенного задания.
Подумайте о максимальном значении времени выполнения
Вы можете подумать, какое значение должен иметь максимальный атрибут конфигурации времени выполнения. По умолчанию используется значение 4.hours
. Однако, если вы не ожидаете, что у вас будут такие длительные задачи, лучше уменьшить это значение. Это приведет к уничтожению задержанного работника, когда этот предел будет достигнут, и позволит сбою задания, чтобы другой работник мог его забрать. Кроме того, вы будете уведомлены о задачах, которые вы ожидали выполнить в короткие сроки, но заняли больше времени (посредством уведомлений по электронной почте об error
и error
).
С другой стороны, я работал с Delayed Job над проектами, где 4.hours
было недостаточно, и мне пришлось увеличить это значение. кашляет
Delayed::Worker.max_run_time = 15.minutes
установит этот лимит на 15 минут.
Не используйте одну очередь
Не используйте по default
или только одну очередь. Это не будет масштабироваться. Даже если у вас есть только несколько рабочих мест в начале вашего проекта, начните с того, что дайте осмысленные имена очередям. Распределите задания с разными бизнес-контекстами в разные очереди.
Например, если у вас есть электронные письма с регистрацией, поставьте их в очередь registration_emails
, тогда как уведомления по электронной почте пользователям вашего приложения могут быть поставлены в очередь в email_notifications
.
Если у вас разные очереди, ими легче управлять. Если есть исключение, которое вы хотите обработать вручную или с помощью сценария, вы можете остановить рабочих и удалить все email_notifications
очереди email_notifications
. Если все ваши задания находятся в одной и той же очереди, такие задачи сложнее решать.
Распределение ваших заданий по разным очередям также позволяет различным рабочим обрабатывать разные очереди. Мое предложение — иметь хотя бы одного работника в очереди. Следовательно, вы будете выполнять свои задания параллельно.
Использовать пользовательские отложенные задания
Вы можете использовать handle_asynchronously
чтобы объявить, что вызов метода должен обрабатываться асинхронно. Я редко использую эту технику. Я предпочитаю объявлять пользовательские объекты отложенных заданий в папке заданий моего проекта. Использование пользовательских отложенных заданий позволяет мне точно настроить то, что я храню в таблице delayed_jobs
.
Вот пример заказной работы. Предположим, у меня есть работа, которая обрабатывает видео (и мое приложение VideoStreamer
).
# app/jobs/video_streamer/process_video_job.rb # module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) # ... # end end
Я создаю папку video_streamer внутри папки заданий вместе с файлом processvideo_job.rb для хранения отложенного пользовательского кода задания. Класс для пользовательского задания имеет пространство имен с именем моего приложения. Также я добавляю к имени класса суффикс Job
. Используя Struct
, я сохраняю идентификатор видео (экземпляр бизнес-объекта, с которым связано это задание). Атрибут будет иметь имя video_id
.
Реализовать Enqueue Hook
Я регистрирую обработчик enqueue
ловушки enqueue
. Хук обрабатывает элементы, которые я хочу выполнять всякий раз, когда новое задание этого класса помещается в очередь.
# app/jobs/video_streamer/process_video_job.rb # module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) def enqueue(job) job.delayed_reference_id = video_id job.delayed_reference_type = 'VideoStreamer::Video' job.save! end end end
Как вы можете видеть, видео (бизнес-объект) сохраняется при постановке в очередь.
Конечно, бывают моменты, когда я хочу сделать более сложные вещи в enqueue
. В следующем примере я принимаю постановку в очередь, только если status
имеет правильное значение. Затем я обновляю этот status
до значения processing
чтобы указать, что конкретный экземпляр видео обрабатывается:
module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) def enqueue(job) check_and_update_status job.delayed_reference_id = video_id job.delayed_reference_type = 'VideoStreamer::Video' job.save! end private def check_and_update_status video = VideoStreamer::Video.find video_id raise StandardError.new("Video: #{video.id} is not on status 'new' (status: #{video.status}") unless video.status == 'new' video.status = 'processing' video.save! end end end
Внедрить Успех Крюк
Чтобы обновить статус после успешной обработки задания, я использую хук success
:
module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) # ... # def success(job) update_status('success') end private def update_status(status) video = VideoStreamer::Video.find video_id video.status = status video.save! end # ... # end end
Реализация ошибки Hook
Хук error
в вашей пользовательской работе может, например, отправить оповещение по электронной почте или изменить статус соответствующего бизнес-объекта. Метод error
имеет доступ к exception
, которое даст вам информацию об ошибке. Обратите внимание, что ошибка указывает на временный сбой, и, если остались попытки, другой работник попытается снова запустить фоновую задачу.
module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) def enqueue(job) # ... # end def success(job) # ... # end def error(job, exception) update_status('temp_error') # Send email notification / alert / alarm end private # ... # end end
Реализовать отказ крюка
Используйте ловушку сбоя, например, для отправки оповещения по электронной почте или для изменения статуса соответствующего бизнес-объекта, когда задание не выполняется должным образом и не будет повторено. Если вы настроили сбойные задания так, чтобы они оставались в таблице базы данных, вы можете повторить попытку вручную.
module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) def enqueue(job) # ... # end def success(job) # ... # end def error(job, exception) # ... # end def failure(job) update_status('failure') # Send email notification / alert / alarm / SMS / call ... whatever end private # ... # end end
Внедрить Perform Hook — Делегировать — Поднять
Этот крючок — самый важный, иначе работа ничего не сделает. Ваша реализация должна быть очень простой и делегировать фактическую работу модели или другому сервисному объекту. Реальная реализация не должна быть частью реализации выполнения. Это обеспечит выполнение логики реализации без необходимости наличия экземпляра отложенного задания. Кроме того, вам может быть проще проверить логику в модульном тесте. Итак, как минимум:
module VideoStreamer class ProcessVideoJob < Struct.new(:video_id) def enqueue(job) # ... # end def success(job) # ... # end def error(job, exception) # ... # end def failure(job) # ... # end def perform video = VideoSteamer::Video.find video_id video.process! end private # ... # end end
Абсолютно необходимо вызвать любые исключения, чтобы их мог обработать работник (что при необходимости вызовет error
и failure
). Не глотайте исключения, которые могут возникнуть. В приведенном выше примере video.process!
может вызвать исключение, которое я позволю всплыть. То же самое касается определения местоположения субъекта хозяйствования. Я использую #find()
и даю video_id
, который вызывает исключение, если бизнес-объект не найден. Не используйте, например, find_by_id()
который не вызывает такого исключения.
Если объект службы, выполняющий задачу, не выдает ошибку при необходимости (возможно, возвращает false
), perform
должно вызвать ошибку.
def perform video = VideoSteamer::Video.find video_id raise StandardError.new("Failed to process video with id: #{video.id}") unless video.process? end
Управление заданиями через консоль Rails
Задержка задания включает в себя интерфейс скрипта для запуска / остановки заданий. Но бывают случаи, когда я хочу прекратить работу рабочих и запускать / останавливать определенные задания вручную. Я обычно делаю это, когда нахожусь в среде разработки, где у меня редко работают фоновые бегуны. Я запускаю их вручную из консоли, используя определенные вызовы API Delayed::Job
. Кроме того, в производственной среде я был в ситуациях, когда мне приходилось останавливать работающих работников и выполнять мелкозернистую ручную работу, используя ту же технику.
Вот некоторые команды API Delayed::Job
которые я нахожу полезными в таких ситуациях:
Запрос таблицы delayed_jobs с соответствующей моделью
Я использую модель Delayed::Job
для запроса таблицы delayed_jobs
. Например, следующий email_notification
возвращает задания, принадлежащие очереди email_notification
, в которых возникла ошибка:
Delayed::Job.where(queue: 'email_notifications').where.not(last_error: nil)
Поймите, вы можете использовать любой из столбцов в вашей таблице delayed_jobs
для построения предложения where
.
Какой класс справится с моей работой?
Когда я хочу увидеть, какой class
будет обрабатывать задание (обычно с ошибкой), я запрашиваю handler
:
handler = Delayed::Job.last.handler
Это экземпляр сериализованного объекта YAML.
Пользовательские Задержки
Вот пример выходных данных для моего VideoStreamer::ProcessVideoJob
:
"--- !ruby/struct:VideoStreamer::ProcessVideoJob\nvideo_id: 68\n"
Эти выходные данные говорят мне, что конкретный экземпляр отложенной работы — это задача для обработки видео с ID = 68.
Прикладные программы
Если вы отправляете свои электронные письма асинхронно, то сериализация YAML будет немного другой:
"--- !ruby/object:Delayed::PerformableMailer\nobject: !ruby/class 'UserNotifierMailer'\nmethod_name: :new_user_registration\nargs:\n- 35\n"
Все сериализованные !rubyobject:Delayed::PerformableMailer
— это !rubyobject:Delayed::PerformableMailer
за которым следует !ruby/class 'UserNotifierMailer'
из вашего приложения (см. !ruby/class 'UserNotifierMailer'
). Вы можете увидеть фактический метод, который используется для отправки электронного письма ( new_user_registration
) и его аргументы ( 35
).
Разработать почтовые программы
Если вы отправляете свои электронные письма разработчикам асинхронно (для этого я использую гем devise-async ), то сериализация YAML выглядит примерно так:
"--- !ruby/object:Delayed::PerformableMethod\nobject: !ruby/object:Devise::Async::Backend::DelayedJob {}\nmethod_name: :perform\nargs:\n- :confirmation_instructions\n- User\n- '317'\n- daD9bVQ2d_kaR3abyS7X\n- {}\n"
Опять же, вы можете увидеть, какая электронная почта может давать сбой вместе с аргументами времени выполнения В приведенном выше примере ошибочное электронное письмо отправляет инструкции подтверждения пользователю с ID = 317.
Десериализация различных заданий в одной очереди
Если у вас разные типы заданий в одной и той же очереди, вам будет сложно управлять заданиями, сгруппированными по типу заданий. Например, вы можете посчитать количество заданий в очереди для каждого типа в очереди, которая называется solr_indexing
которая обрабатывает различные классы для фоновой индексации.
Если единственной информацией, которую вы имеете в своей таблице delayed_jobs
является handler
(то есть delayed_reference_type
не помогает), вам придется работать с… handler
.
Дело в том, что handler
хранит разные сериализованные объекты в соответствии с сериализованным классом, как мы обсуждали выше.
YAML.load(dj.handler)
десериализует ваш сериализованный объект.
Пользовательские Задержки
Если задание в очереди является пользовательским отложенным заданием
dj = Delayed::Job.last dj.handler # Assume: "--- !ruby/struct:VideoStreamer::ProcessVideoJob\nvideo_id: 68\n" job = YAML.load(dj.handler) # +job+ will be instance of +VideoStreamer::ProcessVideoJob+ struct with # +video_id+ attribute set to +68+
Прикладные программы
Если это почтовый объект, то конкретным заданием является Delayed::PerformableMailer
:
=> #<Delayed::PerformableMailer:0x0000000a050898 @object=UserNotifierMailer, @method_name=:new_user_registration, @args=[35]>
Этот обработчик, как вы можете видеть, отвечает на object
, который является фактическим экземпляром почтовой программы, method_name
, который является методом экземпляра почтовой программы, который будет использоваться для отправки электронной почты, и args
, которые содержат аргументы времени выполнения для метода.
Разработать почтовые программы
Когда обработчик является почтовой программой Devise
, существует еще один уровень абстракции. Задание имеет тип Delayed::PerformableMethod
:
=> #<Delayed::PerformableMethod:0x0000000a0bb800 @object=#<Devise::Async::Backend::DelayedJob:0x0000000a0bf090>, @method_name=:perform, @args=[:confirmation_instructions, "User", "317", "daD9bVQ2d_kaR3abyS7X", {}]>
Этот, как вы можете видеть, отвечает на object
, который является экземпляром Devise::Async::Backend::DelayedJob
, имя method_name
, который является методом для вызова в этом экземпляре, и args
, которые содержат время аргументы к этому методу. Этот массив args
содержит реальный метод электронной почты и его реальные аргументы.
YAML.load(dj.handler)
может возвращать разные типы объектов, и вам может понадобиться реализовать какую-то is_a?(....)
если вы хотите написать скрипт, который работает на всех или на части принадлежащих заданий в ту же очередь.
Что было исключением для ошибочной работы?
Когда я хочу просмотреть сведения об исключении для задания, я проверяю столбец last_error
.
Запустите задание из консоли Rails без очереди
Предполагая, что у вас есть задание, и вы хотите запустить его вручную с консоли rails, но вы не хотите, чтобы оно помещалось в цикл жизненного цикла очереди отложенных заданий :
# This will run your job but will not go through the delayed job lifecycle loop job = VideoStreamer::ProcessVideoJob.new(68) job.perform
Это будет работать, но зарегистрированные хуки не будут выполняться.
Я редко использую этот метод, но иногда он пригодится.
Создание работника в консоли Rails
Это очень удобно, когда вы хотите снова запустить сбойное задание, но хотите, чтобы оно выполнялось через консоль, а не через фоновых рабочих:
dw = Delayed::Worker.new
Легко, а?
Запустите сбойное задание из консоли Rails
Предполагая, что у вас есть сбойное задание, и вы хотите запустить его снова вручную из консоли:
dw = Delayed::Worker.new dj = Delayed::Job.last # assuming that the last job is the failed one, otherwise use a proper query to # locate it dw.run dj
Вот и все. Ваш работник запустит вызов, вызвав соответствующие ошибки и ошибки, если работа снова не удалась. Если это удастся, он удалит его из очереди.
Управление заданиями с помощью веб-интерфейса
Используйте интерфейс delayed_job_web, чтобы получить доступ к вашим работам в очереди. Добавьте следующую строку в ваши маршруты:
mount DelayedJobWeb => "/delayed_job"
Это позволит вам получить доступ к интерфейсу управления с помощью адреса, такого как https://www.myapp.com/delayed_job
.
У меня также есть инициализатор delayed_job_web.rb (в моей папке config / initializers ):
DelayedJobWeb.use Rack::Auth::Basic do |username, password| # authenticate user = User.find_by_username(username) return false unless user.authenticate(password) # authorize. I am using cancancan for authorization. You can use any other authorization gem you see fit. ability = Ability.new(user) can = ability.can? :manage, Delayed::Job raise CanCan::AccessDenied unless can true end
Это позволяет мне аутентифицировать и авторизовать запрос на доступ /delayed_job
.
Тестирование с отложенной работой
Не ставить в очередь или издеваться при тестировании
При тестировании кода приложения (с помощью любых тестов, модульных или интеграционных тестов или тестов пользовательского интерфейса) не ставьте в очередь и не высмеивайте задачи с задержкой. Это может показаться вам странной практикой, но для меня это оказалось бесценным. Мне нужно посмотреть, сломаются они или нет. Если в тех случаях, когда задача занимает слишком много времени или имеет рекурсию, я могу решить сделать эту работу. Но в целом я не издеваюсь над своей работой.
Чтобы отложенное задание просто выполняло вашу задачу, а не ставило ее в очередь, выполните следующие действия в инициализаторе:
Delayed::Worker.delay_jobs = !%w[ test ].include?(Rails.env)
Вышеуказанное не будет ставить задания в очередь в test
среде.
Отключить немедленное выполнение при тестировании
Однако могут быть тесты, в которых вы хотите проверить функциональность очередей. Я обертываю эти тесты специальными тегами и прошу отложенное задание поставить их в очередь.
RSpec
При использовании RSpec
меня есть конфигурация :delayed_job
around(:each)
которая позволяет использовать тег: :delayed_job
:
# spec/spec_helper.rb RSpec.configure do |config| # ... other config here ... # config.around(:each, :delayed_job) do |example| old_value = Delayed::Worker.delay_jobs Delayed::Worker.delay_jobs = true Delayed::Job.destroy_all example.run Delayed::Worker.delay_jobs = old_value end # ... other config here ... # end
Я включаю очередь заданий и удаляю все существующие задания до запуска примера. После запуска примера я установил отложенное задание в очередь обратно в прежнее состояние.
Теперь, когда я хочу написать спецификацию, которая использует отложенные очереди заданий, я делаю следующее:
it 'should queue the job`, delayed_job: true do ... end
Огурец
# features/support/hooks.rb Around('@delayed_job') do |scenario, block| old_value = Delayed::Worker.delay_jobs Delayed::Worker.delay_jobs = true Delayed::Job.destroy_all block.call Delayed::Worker.delay_jobs = old_value end
Подобно конфигурации RSpec
, я использую ловушку Around
с тегом @delayed_job
. Затем я помечаю Сценарии, которые я хочу использовать в реальной очереди:
@delayed_job Scenario: As a User when I sign up there is a new user registration email queued
Tagged Logging
Rails поддерживает теговое ведение журнала , как вы, наверное, знаете. Я настроил отложенное задание, чтобы использовать теговое ведение журнала тоже.
Для этого я использую технику Delayed::Plugin
:
module Delayed module Plugins class TaggedLogging < Delayed::Plugin Delayed::Worker.logger = Rails.logger callbacks do |lifecycle| lifecycle.around(:execute) do |worker, *args, &block| Rails.logger.tagged "Worker:#{worker.name_prefix.strip}", "Queues:#{worker.queues.join(',')}" do block.call(worker, *args) end end lifecycle.around(:invoke_job) do |job, *args, &block| Rails.logger.tagged "Job:#{job.id}" do block.call(job, *args) end end end end end end
Как вы можете видеть, я ловлю around(:invoke_job)
around(:execute)
и around(:invoke_job)
и использую Rails.logger
для реализации Rails.logger
ведения журнала. Я регистрирую имя работника, очереди и идентификатор задания.
Не забудьте зарегистрировать свой подкласс Delayed::Plugin
в инициализаторе delayed_job
с помощью:
Delayed::Worker.plugins << Delayed::Plugins::TaggedLogging
Кстати, если вы хотите узнать, какие события вы можете подключить, посмотрите эту строку здесь .
Вывод
В этой статье я представил некоторые практики, которые я использую для решения своих фоновых заданий с использованием гема отложенных заданий. Я надеюсь, что вы найдете некоторые, если не все, эти советы полезными.