Статьи

Простая, организованная очередь с Resque

После того, как я использовал много разных систем массового обслуживания, от почтенного BackgrounDRb до DelayedJob , для создания собственных решений, я остановился на превосходном Resque .

В уже известном посте в блоге Криса Вэнстрата из GitHub все сказано, и в README есть все, на что вы когда-либо могли надеяться в плане запуска и работы. Тем не менее, Resque оставляет много до воображения, как именно интегрировать его в ваше приложение. Это, конечно, одна из его сильных сторон, поскольку она обеспечивает высокую степень гибкости.

В этой статье я собираюсь представить одну из таких стратегий интеграции, которая мне показалась особенно полезной, чистой и, самое главное, простой.

Собственное пространство имен

Чтобы освободить зависимость от вашей базы данных, которая может привести к возникновению проблем с производительностью в будущем, Resque использует Redis для своего хранилища данных (примечание: если вы хотите узнать больше о том, как начать работу с Redis, прочитайте мой предыдущая статья здесь ).

Предполагая, что вы уже настроили Redis в своем приложении, у вас, вероятно, уже есть глобальная переменная $ redis для представления соединения. Resque по умолчанию использует гем redis-namespace, чтобы не загрязнять пространство ключей вашего сервера redis, но мне лично нравится контролировать такие важные детали, как этот.

К счастью, Resque позволяет это, поэтому инициализировать соединение так же просто, как добавить следующее в config/initializers/resque.rb :

Resque.redis = Redis::Namespace.new(:resque, :redis => $redis)

view raw
gistfile1.rb
hosted with ❤ by GitHub

Ставить рабочие места

Вдохновленный отложенным заданием, в Resque все, что вам нужно для запуска кода в фоновом режиме, это предоставить класс или модуль, который отвечает на метод execute. Эти объекты также указывают имя очереди, которая их обрабатывает.

Поэтому наиболее очевидным решением для интеграции Resque является наличие одного из этих объектов на фоновую задачу. Например, у вас может быть один класс для обработки загруженных пользователями изображений, другой класс для отправки вашего ежемесячного информационного бюллетеня всем пользователям и еще один для обновления поисковых индексов.

Как вы можете себе представить, число этих работников со временем будет увеличиваться. Кроме того, Resque устанавливает приоритеты очередей, основываясь исключительно на порядке, в котором они указаны для работника, поэтому вам нужно будет помнить, какие работники работают с какими очередями. Если у вас есть приложение, в котором новые фоновые задачи постоянно добавляются, удаляются или меняются по приоритетам, это не только сбивает с толку, но и потребует большого количества обслуживания.

Чистый подход

Вместо того чтобы сосредоточиться на том, что должно происходить в фоновом режиме, давайте сосредоточимся на том, когда . То есть мой подход заключается в том, чтобы приоритет был руководящим фактором при разработке интерфейса для Resque.

Первый шаг — отметить, что в 99% случаев фоновые задания попадают в один из трех возможных приоритетов: высокий, нормальный и низкий. Хотя будет проще добавить больше приоритетов позже, это произойдет только в очень редких случаях.

Наличие только трех приоритетов также значительно упрощает настройку рабочих. Каждый работник всегда назначается в эти три очереди, поэтому, за исключением количества работников, команда всегда одинакова:

QUEUE=high,normal,low rake resque:work

view raw
gistfile1.sh
hosted with ❤ by GitHub

Если необходимо, чтобы добавить больше вычислительной мощности в очередь, просто создайте больше рабочих:

COUNT=3 QUEUE=high,normal,low rake resque:workers

view raw
gistfile1.sh
hosted with ❤ by GitHub

Вместо этого ставьте методы в очередь

Следующим шагом к этому подходу является упрощение добавления любого метода экземпляра или класса в одну из этих очередей. Вот где динамическая природа Ruby очень поможет. Чтобы начать в конце, мы сможем поставить в очередь что-то вроде этого:

Queue::Normal.enqueue(some_object, :some_method, { ‘some_arg’ => 1, ‘some_other_arg’ => 2 })
Queue::High.enqueue(some_object, :some_really_imporant_method, { ‘some_arg’ => 1, ‘some_other_arg’ => 2 })
Queue::Low.enqueue(some_object, :some_method_that_can_take_its_time, { ‘some_arg’ => 1, ‘some_other_arg’ => 2 })

view raw
gistfile1.rb
hosted with ❤ by GitHub

Кроме того, не имеет some_object является ли some_object классом, модулем или экземпляром. Мы можем изменить приоритет метода, просто переключаясь между Queue::Normal , Queue::High , Queue::Low .

Давайте посмотрим, как мы можем это кодировать. Приведенный выше интерфейс четко диктует, что каждому классу нужно будет указать метод постановки в очередь. Мы также знаем, что каждому классу необходимо указать имя очереди и метод execute, так что это хорошее место для начала:

# app/models/queue/high.rb
class Queue::High
@queue = :high
def self.enqueue(object, method, *args)
# …to be continued
end
def self.perform
# …to be continued
end
end
# app/models/queue/normal.rb
class Queue::Normal
@queue = :normal
def self.enqueue(object, method, *args)
# …to be continued
end
def self.perform
# …to be continued
end
end
# app/models/queue/low.rb
class Queue::Low
@queue = :low
def self.enqueue(object, method, *args)
# …to be continued
end
def self.perform
# …to be continued
end
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Мы уже видим, что эти классы имеют один и тот же интерфейс, поэтому давайте преобразуем его в суперкласс:

# app/models/queue/base.rb
class Queue::Base
class << self
def enqueue(object, method, *args)
# …to be continued
end
def perform
# …to be continued
end
end
end
# app/models/queue/high.rb
class Queue::High < Queue::Base
@queue = :high
end
# app/models/queue/normal.rb
class Queue::Normal < Queue::Base
@queue = :normal
end
# app/models/queue/low.rb
class Queue::Low < Queue::Base
@queue = :low
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Единственная enqueue метода enqueue — предоставить данные для метода execute, который будет вызван Resque. Метод execute должен найти объект по идентификатору, вызвать метод в очереди и передать ему аргументы в очереди. Поскольку все, что передается в Resque, должно быть сериализуемым в JSON, нам нужно передать имя класса объекта, метод и идентификатор объекта в виде специального аргумента «meta»:

def enqueue(object, method, *args)
meta = { ‘class’ => object.class.name, ‘method’ => method, ‘id’ => object.id }
Resque.enqueue(self, meta, args)
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Затем в методе execute мы можем использовать расширение constantize Rails, чтобы получить класс по его имени, найти объект и отправить метод вместе с его аргументами:

def perform(meta = { }, *args)
if model = meta[‘class’].constantize.find_by_id(meta[‘id’])
model.send(meta[‘method’], *args)
end
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

И с этим мы готовы поставить в очередь любой метод экземпляра. Вот пример того, как может выглядеть большая картина:

# app/models/my_model.rb
class MyModel < ActiveRecord::Base
def async_background_method(arg1, arg2)
Queue::Normal.enqueue(self, :background_method, arg1, arg2)
end
# this happens in the background,
def background_method(arg1, arg2)
# do something that takes a long time…
end
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Это все, что нужно сделать. Единственное предостережение в том, что все аргументы, передаваемые background_method , будут сериализованы в JSON, а затем десериализованы обратно в Ruby. Обычно это не вызывает проблем, но одно большое отличие состоит в том, что все хэши с символьными ключами будут иметь строковые ключи в background_method .

Постановка в очередь методов класса или

Остался только один последний шаг. Мы также хотим поставить методы класса или класса в очередь. То есть нам не всегда нужно находить экземпляр по идентификатору для выполнения определенных фоновых задач. Например, мы можем отправить электронное письмо каждому зарегистрированному пользователю. Код будет выглядеть примерно так:

# app/models/user.rb
class User
class << self
def async_spam_all(email_text)
Queue::Normal.enqueue(self, :spam_all, email_text)
end
def spam_all(email_text)
find_each { |user| UserMailer.spam(user, email_text).deliver }
end
end
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Это потребует некоторой незначительной модификации методов enqueue и enqueue , поскольку нам нужно уметь различать класс или модуль в очереди и объект в очереди.

Для этого нам нужно посмотреть, отвечает ли класс объекта :find_by_id или нет. Это работает, потому что, если object является классом или модулем, его классом является Class , который не отвечает :find_by_id . Если object не является экземпляром модели, мы не добавляем ключ 'id' к метаинформации.

Таким образом, метод execute должен только проверять наличие этого ключа, чтобы определить, следует ли вызывать метод непосредственно для объекта или сначала найти экземпляр по идентификатору:

# app/models/queue/base.rb
class Queue::Base
class << self
def enqueue(object, method, *args)
meta = { ‘method’ => method }
if is_model?(object)
Resque.enqueue(self, meta.merge(‘class’ => object.class.name, ‘id’ => object.id), *args)
else
Resque.enqueue(self, meta.merge(‘class’ => object.name), *args)
end
end
def perform(meta = { }, *args)
if meta.has_key?(‘id’)
if model = meta[‘class’].constantize.find_by_id(meta[‘id’])
model.send(meta[‘method’], *args)
end
else
meta[‘class’].constantize.send(meta[‘method’], *args)
end
end
def is_model?(object)
object.class.respond_to?(:find_by_id)
end
end
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Примечание: эта статья предполагает использование ActiveRecord ORM. В зависимости от вашего приложения вам может потребоваться изменить определение is_model? чтобы более точно указать, что составляет экземпляр модели.

прочность

У нас есть рабочий интерфейс очереди, но все еще достаточно места для ошибок программиста. Метод enqueue идеале должен вызывать исключения, когда программист пытается поставить в очередь несуществующий метод или пытается передать слишком много или слишком мало аргументов этому методу.

Добавив несколько быстрых проверок, можно значительно сократить количество сбоев на стороне очереди. Давайте добавим метод, ensure_queueable! to Queue::Base которая вызывает исключение, если метод не существует и не передано соответствующее количество аргументов. С этими изменениями весь класс Queue::Base выглядит следующим образом:

# app/models/queue/base.rb
class Queue::Base
class << self
def enqueue(object, method, *args)
meta = { ‘method’ => method }
ensure_queueable!(object, method, *args)
if is_model?(object)
Resque.enqueue(self, meta.merge(‘class’ => object.class.name, ‘id’ => object.id), *args)
else
Resque.enqueue(self, meta.merge(‘class’ => object.name), *args)
end
end
def perform(meta = { }, *args)
if meta.has_key?(‘id’)
if model = meta[‘class’].constantize.find_by_id(meta[‘id’])
model.send(meta[‘method’], *args)
end
else
meta[‘class’].constantize.send(meta[‘method’], *args)
end
end
def is_model?(object)
object.class.respond_to?(:find_by_id)
end
private
def ensure_queueable!(object, method, *args)
ensure_responds_to!(object, method)
ensure_arity!(object, method, args.length)
end
def ensure_responds_to!(object, method)
unless object.respond_to?(method)
raise «object must respond to #{method}«
end
end
def ensure_arity!(object, method, arity)
required = object.method(method).arity
if required < 0 && arity < —required
raise «#{method}: #{arity} of #{required} arguments given»
elsif required >= 0 && required != arity
raise «#{method}: #{arity} of #{required} arguments given»
end
end
end
end

view raw
gistfile1.rb
hosted with ❤ by GitHub

Примечание: проверка арности метода несколько сложна, так как методы, которые принимают переменное число аргументов, возвращают отрицательное число. Смотрите Ruby документацию по Method для получения дополнительной информации.

Это все

Благодаря этому у нас есть простой и понятный интерфейс Resque, который позволяет ставить в очередь любой метод экземпляра или класса с минимальными усилиями. Мы также можем добавить новые приоритеты в несколько строк кода, просто определив новый подкласс Queue::Base .

Кроме того, мы предоставили единственную точку входа в очередь, которая использует Resque в качестве своей реализации, но если мы решим поменять Resque на другое решение в будущем, нам нужно будет только внести изменения в Queue::Base .

Я надеюсь, что эта статья была полезна. Веселитесь вместе с Resque и счастливой очереди!