После того, как я использовал много разных систем массового обслуживания, от почтенного BackgrounDRb до DelayedJob , для создания собственных решений, я остановился на превосходном Resque .
В уже известном посте в блоге Криса Вэнстрата из GitHub все сказано, и в README есть все, на что вы когда-либо могли надеяться в плане запуска и работы. Тем не менее, Resque оставляет много до воображения, как именно интегрировать его в ваше приложение. Это, конечно, одна из его сильных сторон, поскольку она обеспечивает высокую степень гибкости.
В этой статье я собираюсь представить одну из таких стратегий интеграции, которая мне показалась особенно полезной, чистой и, самое главное, простой.
Собственное пространство имен
Чтобы освободить зависимость от вашей базы данных, которая может привести к возникновению проблем с производительностью в будущем, Resque использует Redis для своего хранилища данных (примечание: если вы хотите узнать больше о том, как начать работу с Redis, прочитайте мой предыдущая статья здесь ).
Предполагая, что вы уже настроили Redis в своем приложении, у вас, вероятно, уже есть глобальная переменная $ redis для представления соединения. Resque по умолчанию использует гем redis-namespace, чтобы не загрязнять пространство ключей вашего сервера redis, но мне лично нравится контролировать такие важные детали, как этот.
К счастью, Resque позволяет это, поэтому инициализировать соединение так же просто, как добавить следующее в config/initializers/resque.rb
:
Resque.redis = Redis::Namespace.new(:resque, :redis => $redis) |
Ставить рабочие места
Вдохновленный отложенным заданием, в Resque все, что вам нужно для запуска кода в фоновом режиме, это предоставить класс или модуль, который отвечает на метод execute. Эти объекты также указывают имя очереди, которая их обрабатывает.
Поэтому наиболее очевидным решением для интеграции Resque является наличие одного из этих объектов на фоновую задачу. Например, у вас может быть один класс для обработки загруженных пользователями изображений, другой класс для отправки вашего ежемесячного информационного бюллетеня всем пользователям и еще один для обновления поисковых индексов.
Как вы можете себе представить, число этих работников со временем будет увеличиваться. Кроме того, Resque устанавливает приоритеты очередей, основываясь исключительно на порядке, в котором они указаны для работника, поэтому вам нужно будет помнить, какие работники работают с какими очередями. Если у вас есть приложение, в котором новые фоновые задачи постоянно добавляются, удаляются или меняются по приоритетам, это не только сбивает с толку, но и потребует большого количества обслуживания.
Чистый подход
Вместо того чтобы сосредоточиться на том, что должно происходить в фоновом режиме, давайте сосредоточимся на том, когда . То есть мой подход заключается в том, чтобы приоритет был руководящим фактором при разработке интерфейса для Resque.
Первый шаг — отметить, что в 99% случаев фоновые задания попадают в один из трех возможных приоритетов: высокий, нормальный и низкий. Хотя будет проще добавить больше приоритетов позже, это произойдет только в очень редких случаях.
Наличие только трех приоритетов также значительно упрощает настройку рабочих. Каждый работник всегда назначается в эти три очереди, поэтому, за исключением количества работников, команда всегда одинакова:
QUEUE=high,normal,low rake resque:work |
Если необходимо, чтобы добавить больше вычислительной мощности в очередь, просто создайте больше рабочих:
COUNT=3 QUEUE=high,normal,low rake resque:workers |
Вместо этого ставьте методы в очередь
Следующим шагом к этому подходу является упрощение добавления любого метода экземпляра или класса в одну из этих очередей. Вот где динамическая природа Ruby очень поможет. Чтобы начать в конце, мы сможем поставить в очередь что-то вроде этого:
Queue::Normal.enqueue(some_object, :some_method, { ‘some_arg’ => 1, ‘some_other_arg’ => 2 }) | |
Queue::High.enqueue(some_object, :some_really_imporant_method, { ‘some_arg’ => 1, ‘some_other_arg’ => 2 }) | |
Queue::Low.enqueue(some_object, :some_method_that_can_take_its_time, { ‘some_arg’ => 1, ‘some_other_arg’ => 2 }) |
Кроме того, не имеет some_object
является ли some_object
классом, модулем или экземпляром. Мы можем изменить приоритет метода, просто переключаясь между Queue::Normal
, Queue::High
, Queue::Low
.
Давайте посмотрим, как мы можем это кодировать. Приведенный выше интерфейс четко диктует, что каждому классу нужно будет указать метод постановки в очередь. Мы также знаем, что каждому классу необходимо указать имя очереди и метод execute, так что это хорошее место для начала:
# app/models/queue/high.rb | |
class Queue::High | |
@queue = :high | |
def self.enqueue(object, method, *args) | |
# …to be continued | |
end | |
def self.perform | |
# …to be continued | |
end | |
end | |
# app/models/queue/normal.rb | |
class Queue::Normal | |
@queue = :normal | |
def self.enqueue(object, method, *args) | |
# …to be continued | |
end | |
def self.perform | |
# …to be continued | |
end | |
end | |
# app/models/queue/low.rb | |
class Queue::Low | |
@queue = :low | |
def self.enqueue(object, method, *args) | |
# …to be continued | |
end | |
def self.perform | |
# …to be continued | |
end | |
end |
Мы уже видим, что эти классы имеют один и тот же интерфейс, поэтому давайте преобразуем его в суперкласс:
# app/models/queue/base.rb | |
class Queue::Base | |
class << self | |
def enqueue(object, method, *args) | |
# …to be continued | |
end | |
def perform | |
# …to be continued | |
end | |
end | |
end | |
# app/models/queue/high.rb | |
class Queue::High < Queue::Base | |
@queue = :high | |
end | |
# app/models/queue/normal.rb | |
class Queue::Normal < Queue::Base | |
@queue = :normal | |
end | |
# app/models/queue/low.rb | |
class Queue::Low < Queue::Base | |
@queue = :low | |
end |
Единственная enqueue
метода enqueue
— предоставить данные для метода execute, который будет вызван Resque. Метод execute должен найти объект по идентификатору, вызвать метод в очереди и передать ему аргументы в очереди. Поскольку все, что передается в Resque, должно быть сериализуемым в JSON, нам нужно передать имя класса объекта, метод и идентификатор объекта в виде специального аргумента «meta»:
def enqueue(object, method, *args) | |
meta = { ‘class’ => object.class.name, ‘method’ => method, ‘id’ => object.id } | |
Resque.enqueue(self, meta, args) | |
end |
Затем в методе execute мы можем использовать расширение constantize
Rails, чтобы получить класс по его имени, найти объект и отправить метод вместе с его аргументами:
def perform(meta = { }, *args) | |
if model = meta[‘class’].constantize.find_by_id(meta[‘id’]) | |
model.send(meta[‘method’], *args) | |
end | |
end |
И с этим мы готовы поставить в очередь любой метод экземпляра. Вот пример того, как может выглядеть большая картина:
# app/models/my_model.rb | |
class MyModel < ActiveRecord::Base | |
def async_background_method(arg1, arg2) | |
Queue::Normal.enqueue(self, :background_method, arg1, arg2) | |
end | |
# this happens in the background, | |
def background_method(arg1, arg2) | |
# do something that takes a long time… | |
end | |
end |
Это все, что нужно сделать. Единственное предостережение в том, что все аргументы, передаваемые background_method
, будут сериализованы в JSON, а затем десериализованы обратно в Ruby. Обычно это не вызывает проблем, но одно большое отличие состоит в том, что все хэши с символьными ключами будут иметь строковые ключи в background_method
.
Постановка в очередь методов класса или
Остался только один последний шаг. Мы также хотим поставить методы класса или класса в очередь. То есть нам не всегда нужно находить экземпляр по идентификатору для выполнения определенных фоновых задач. Например, мы можем отправить электронное письмо каждому зарегистрированному пользователю. Код будет выглядеть примерно так:
# app/models/user.rb | |
class User | |
class << self | |
def async_spam_all(email_text) | |
Queue::Normal.enqueue(self, :spam_all, email_text) | |
end | |
def spam_all(email_text) | |
find_each { |user| UserMailer.spam(user, email_text).deliver } | |
end | |
end | |
end |
Это потребует некоторой незначительной модификации методов enqueue
и enqueue
, поскольку нам нужно уметь различать класс или модуль в очереди и объект в очереди.
Для этого нам нужно посмотреть, отвечает ли класс объекта :find_by_id
или нет. Это работает, потому что, если object
является классом или модулем, его классом является Class
, который не отвечает :find_by_id
. Если object
не является экземпляром модели, мы не добавляем ключ 'id'
к метаинформации.
Таким образом, метод execute должен только проверять наличие этого ключа, чтобы определить, следует ли вызывать метод непосредственно для объекта или сначала найти экземпляр по идентификатору:
# app/models/queue/base.rb | |
class Queue::Base | |
class << self | |
def enqueue(object, method, *args) | |
meta = { ‘method’ => method } | |
if is_model?(object) | |
Resque.enqueue(self, meta.merge(‘class’ => object.class.name, ‘id’ => object.id), *args) | |
else | |
Resque.enqueue(self, meta.merge(‘class’ => object.name), *args) | |
end | |
end | |
def perform(meta = { }, *args) | |
if meta.has_key?(‘id’) | |
if model = meta[‘class’].constantize.find_by_id(meta[‘id’]) | |
model.send(meta[‘method’], *args) | |
end | |
else | |
meta[‘class’].constantize.send(meta[‘method’], *args) | |
end | |
end | |
def is_model?(object) | |
object.class.respond_to?(:find_by_id) | |
end | |
end | |
end |
Примечание: эта статья предполагает использование ActiveRecord ORM. В зависимости от вашего приложения вам может потребоваться изменить определение is_model?
чтобы более точно указать, что составляет экземпляр модели.
прочность
У нас есть рабочий интерфейс очереди, но все еще достаточно места для ошибок программиста. Метод enqueue
идеале должен вызывать исключения, когда программист пытается поставить в очередь несуществующий метод или пытается передать слишком много или слишком мало аргументов этому методу.
Добавив несколько быстрых проверок, можно значительно сократить количество сбоев на стороне очереди. Давайте добавим метод, ensure_queueable!
to Queue::Base
которая вызывает исключение, если метод не существует и не передано соответствующее количество аргументов. С этими изменениями весь класс Queue::Base
выглядит следующим образом:
# app/models/queue/base.rb | |
class Queue::Base | |
class << self | |
def enqueue(object, method, *args) | |
meta = { ‘method’ => method } | |
ensure_queueable!(object, method, *args) | |
if is_model?(object) | |
Resque.enqueue(self, meta.merge(‘class’ => object.class.name, ‘id’ => object.id), *args) | |
else | |
Resque.enqueue(self, meta.merge(‘class’ => object.name), *args) | |
end | |
end | |
def perform(meta = { }, *args) | |
if meta.has_key?(‘id’) | |
if model = meta[‘class’].constantize.find_by_id(meta[‘id’]) | |
model.send(meta[‘method’], *args) | |
end | |
else | |
meta[‘class’].constantize.send(meta[‘method’], *args) | |
end | |
end | |
def is_model?(object) | |
object.class.respond_to?(:find_by_id) | |
end | |
private | |
def ensure_queueable!(object, method, *args) | |
ensure_responds_to!(object, method) | |
ensure_arity!(object, method, args.length) | |
end | |
def ensure_responds_to!(object, method) | |
unless object.respond_to?(method) | |
raise «object must respond to #{method}« | |
end | |
end | |
def ensure_arity!(object, method, arity) | |
required = object.method(method).arity | |
if required < 0 && arity < —required | |
raise «#{method}: #{arity} of #{—required} arguments given» | |
elsif required >= 0 && required != arity | |
raise «#{method}: #{arity} of #{required} arguments given» | |
end | |
end | |
end | |
end |
Примечание: проверка арности метода несколько сложна, так как методы, которые принимают переменное число аргументов, возвращают отрицательное число. Смотрите Ruby документацию по Method
для получения дополнительной информации.
Это все
Благодаря этому у нас есть простой и понятный интерфейс Resque, который позволяет ставить в очередь любой метод экземпляра или класса с минимальными усилиями. Мы также можем добавить новые приоритеты в несколько строк кода, просто определив новый подкласс Queue::Base
.
Кроме того, мы предоставили единственную точку входа в очередь, которая использует Resque в качестве своей реализации, но если мы решим поменять Resque на другое решение в будущем, нам нужно будет только внести изменения в Queue::Base
.
Я надеюсь, что эта статья была полезна. Веселитесь вместе с Resque и счастливой очереди!