Статьи

Распараллеливание работы с Redis

Примечание куратора: Вот инструкции Redis от 2011 года. 

Любой, кто слышал о Redis , вероятно, также слышал о Resque , который является легкой системой очередей. Непосвященным может показаться странным или, возможно, даже невозможным построить систему очередей, используя только хранилище значений ключей. В этой статье я собираюсь разобрать некоторые из примитивов redis, которые делают построение системы очередей поверх нее тривиальной, и показать, что Redis гораздо больше, чем просто хранилище значений ключей.

Эта проблема

Допустим, вы математик и только что придумали этот суперэффективный способ вычисления числовых факторов. Вы решаете написать следующую услугу синатры:

def compute_factors(number)
  factors = crazy_performant_computation number
end

get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  RestClient.post post_back_url, factors => compute_factors(number).to_json
  "OK"
end

Вскоре вы начинаете видеть сумасшедший трафик и понимаете, что, поскольку ваш алгоритм вычисления коэффициентов эффективен, он недостаточно быстр, чтобы не отставать от скорости, с которой вы получаете запросы к вашему сервису.

Первый проход при оптимизации путем разветвления

Вы понимаете, что будет гораздо эффективнее раскошелиться на новый процесс или поток, который будет выполнять вычисления и отправлять результат назад. Итак, ваш код теперь меняется на:

get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  Process.fork do
    RestClient.post post_back_url, factors => compute_factors(number).to_json
  end
  "OK"
end

Хотя это здорово, вы скоро поймете, что заполнение таблицы процессов в вашей ОС — не очень хорошая идея.

Ограничение создания процесса с использованием пула процессов

Именно эту проблему должен был решить пул процессов. Основная идея заключается в том, что вы все равно хотели бы выполнять свою трудоемкую задачу в фоновом режиме, но хотели бы ограничить количество фоновых процессов, которые вы выполняете. Есть несколько превосходных библиотек, которые решают эту проблему, таких как Delayed Job и Resque. Тем не менее, будучи хакером, вы решаете сделать это самостоятельно. Однако есть ряд проблем, которые решают эти библиотеки, и вы решаете взять ручку и бумагу и записать их, чтобы убедиться, что вы ничего не пропустили:

Предел, сколько рабочих вы создаете

У вас должен быть способ ограничить количество созданных вами фоновых работников, чтобы у вас не было той же проблемы, с которой вы сталкивались ранее.

Контроль работника создания и уничтожения

Вы хотели бы иметь возможность загружаться и увольнять ваших работников изящно.

Обрабатывать условия гонки

Вы понимаете, что вращение новых процессов означает, что теперь вы должны обеспечить безопасность своего кода. Redis предоставляет несколько замечательных атомарных операций из коробки, так что это не должно быть слишком сложно.

Второй проход с использованием BRPOP

Redis поддерживает несколько интересных структур данных, включая списки, наборы и хэши. В списках Redis есть команда RPOP, которая позволяет вам выталкивать элемент из хвоста списка, по сути рассматривая его как очередь. Команда RPOP поставляется с блокирующим вариантом самого себя, называемым BRPOP, который блокирует при вызове выталкивание элемента из списка. Вы также можете указать время ожидания (в секундах), которое вы хотели бы заблокировать на вызове.

def compute_factors(number)
  factors = crazy_performant_computation number
end

NUMBER_OF_WORKERS = (ENV['NUMBER_OF_WORKERS'] || 50).to_i
NUMBER_OF_WORKERS.times do
  Process.fork do
    redis = Redis.new
    loop do
      val = redis.brpop "work_queue", 1
      unless val
        puts "Process: #{Process.pid} is exiting"
        exit 0
      end

      number, postback_url = Marshal.load val.last
      RestClient.post postback_url, factors => compute_factors(number).to_json
    end
  end
end

redis = Redis.new
get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  redis.lpush "work_queue", Marshal.dump([number, post_back_url])
  "OK"
end

Итак, вы теперь решили кучу проблем в этом новом подходе. У нас есть фиксированное количество рабочих, работающих для фоновой обработки, поэтому теперь заполнение нашей таблицы процессов не зависит от условий трафика. Условия гонки обрабатываются для нас Redis , поскольку BRPOP является атомарным и гарантирует, что двое рабочих не будут выполнять дублирующую работу. И, наконец, рабочие уничтожают себя, если выходят из вызова brpop из-за истечения времени ожидания, в данном случае 1 секунда. Итак, это довольно много проблем, которые были решены для нас благодаря использованию Redis, Мы скоро начнем, увидев другую проблему, хотя. Поскольку трафик на нашем сайте запаздывает, рабочие, похоже, отмирают, поскольку их тайм-аут сокращается. Нам бы очень хотелось, чтобы рабочие блокировались дольше, чем всего на 1 секунду, а также имели возможность убить их раньше, если нам это нужно. Таким образом, они не будут болтаться дольше, чем должны.

Изящно закрывая работников

Наш мандат теперь состоит в том, чтобы корректно завершить работу наших сотрудников, используя redis и немного магии сигналов UNIX (для примеров использования сигналов в этой области проверьте Unicorn Is Unix и веб-сервер Unicorn . Наш код теперь трансформируется в:

def compute_factors(number)
  factors = crazy_performant_computation number
end

NUMBER_OF_WORKERS = (ENV['NUMBER_OF_WORKERS'] || 50).to_i
NUMBER_OF_WORKERS.times do
  Process.fork do
    redis = Redis.new
    loop do
      val = redis.brpop "work_queue", 30
      unless val
        puts "Process: #{Process.pid} is signing off due to timeout!"
        exit 0
      end

      if val.last == "DIE!"
        puts "Process: #{Process.pid} has been asked to kill itself by parent"
        exit 0
      end

      number, postback_url = Marshal.load val.last
      RestClient.post postback_url, factors => compute_factors(number).to_json
    end
  end
end
redis = Redis.new
get "/compute_factors" do
  number, post_back_url = params[:number].to_i, params[:post_back_url]
  redis.lpush "work_queue", Marshal.dump([number, post_back_url])
  "OK"
end

`echo #{Process.pid} > /tmp/factors.pid`
puts "Parent process wrote PID to /tmp/factors.pid"

trap('QUIT') do
  NUMBER_OF_WORKERS.times do
    redis.lpush "work_queue", "DIE!"
  end
end

Теперь мы увеличили тайм-аут до 30 секунд, а также нашли способ мгновенно сбить рабочих. Это достигается тем, что веб-сервер перехватывает сигнал QUIT, а когда это происходит, он нажимает кнопку «УМИРАТЬ!» сообщение на Redis «work_queue». Это сообщение отправляется столько же раз, сколько NUMBER_OF_WORKERS. А поскольку BRPOP — это атомарная и одновременно безопасная операция, мы теперь поддерживаем сокращение рабочих с помощью Redis . Как это круто! Чтобы корректно завершить работу сервера и рабочих нам просто необходимо:

kill -s QUIT `cat /tmp/factors.pid`

Вывод

В следующий раз, когда вам нужно будет выполнить какое-то заднее фоновое задание, не позволяйте себе просто взять библиотеку. Вместо этого поиграйтесь с списками redis немного. Вы будете удивлены тем, как много вы сможете сделать с помощью простых примитивов redis .