Redis отлично подходит для многих вещей, таких как: работа в качестве кеша, хранение конфигурации вашего приложения и т. Д. В этой статье я собираюсь использовать атомарные и блокирующие средства Redis для создания многошагового процессора RSS-каналов. Кроме того, я надеюсь затронуть некоторые темы: приоритезация очереди, синхронизация между процессами, использование redis для корректного завершения процессов и несколько условий гонки, которые нужно учитывать.
Шаг 1. Подготовка и запуск базового процессора RSS
Вот что пытается сделать обработчик RSS-каналов:
- 1: У него будет список URL фидов, которые он будет периодически посещать, и получать новые фиды.
- 2: Когда он находит новый канал, он его обрабатывает . А пока давайте просто предположим, что это значит обрабатывать, где-то хранить в redis.
Запуск первого шага довольно прост. Я собираюсь использовать rss-библиотеку ruby stdlib для обработки RSS-каналов. С точки зрения организации моей структуры данных redis, я буду использовать список под названием feeds_to_fetch, который я буду использовать для извлечения URL-адресов и определения, какие из них нужно получить дальше. Я также буду использовать другой список, который называется records_needing_processing, в который я буду вставлять записи RSS, которые необходимо обработать. Итак, учитывая то короткое вступление, вот как выглядит первый проход при создании этой вещи:
URLS = %W{http://www.npr.org/rss/rss.php?id=1001 http://www.npr.org/rss/rss.php?id=100} redis = Redis.new URLS.each { |url| redis.rpush "feeds_to_fetch", url } loop do queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0 if queue == "feeds_to_fetch" # Fetch feeds feed_url = payload puts "fetching feed: #{feed_url}" content = open(feed_url) { |s| s.read } rss = RSS::Parser.parse content, false rss.items.each do |entry| redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry]) end redis.rpush "feeds_to_fetch", feed_url else # Process entries entry = Marshal.load payload puts "processing entry: #{entry.url}" entry_id = redis.incr "entries_processed" redis.hmset "entry|#{entry_id}", "url", entry.url, "title", entry.title, "published", entry.published, "description", entry.summary redis.sadd "entry_ids", entry_id end end
Так что этот код выглядит достаточно просто. Есть несколько URL-адресов, которые помещаются в список feeds_to_fetch. Затем мы выполняем бесконечный цикл, делая BLPOP над несколькими списками (feeds_to_fetch и records_needing_processing). Из BLPOP мы разветвляемся на два пути кода:
- Если мы только что извлекли что-то из списка feeds_to_fetch , мы знаем, что значение, которое мы только что извлекли , является URL-адресом RSS-канала, и мы приступаем к его получению. Как только мы закончили извлечение канала, мы помещаем записи в список records_needing_processing .
- С другой стороны, если мы извлекли что-то из списка records_needing_processing , мы знаем, что это запись RSS, и мы продолжаем обрабатывать ее, то есть сохранять ее в Redis.
Хотя этот код выглядит так, как будто он должен работать, при запуске мы видим, что все, что мы делаем, это извлекаем feed_url и не обрабатываем какие-либо из наших записей. Вот пример вывода из моего прогона:
fetching feed: http://www.npr.org/rss/rss.php?id=1008 fetching feed: http://www.npr.org/rss/rss.php?id=100 fetching feed: http://www.npr.org/rss/rss.php?id=1001 fetching feed: http://www.npr.org/rss/rss.php?id=1006
Как видите, все, что мы делаем, это выбираем каналы без обработки записей. Добро пожаловать в первое состояние гонки!
Приоритизация очередей в BLPOP
Чтобы повторить поведение, которое мы наблюдаем, мы постоянно выбираем фиды и не обрабатываем какие-либо записи. Почему это происходит? Чтобы понять почему, нам нужно обратить более пристальное внимание на следующую строку:
queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0
Здесь мы говорим redis, чтобы он выталкивал элемент из списков feeds_to_fetch или records_needing_processing в указанном порядке (аргумент конечного 0 просто означает блок навсегда). Для дальнейшего уточнения, мы говорим Redis попытаться дать мне что — нибудь из списка feeds_to_fetch, если вы не можете найти что — нибудь там , то попробуйте дать мне что — нибудь из entries_needing_processing списка. Итак, мы даем списку feeds_to_fetch более высокий порядок приоритета, чем списку records_needing_processing, указав его ранее. Это в сочетании с тем фактом, что когда мы выталкиваем элемент из списка feeds_to_fetch, мы отодвигаем его назад, означает, что мы никогда не собираемся выталкивать элемент из списка records_needing_processing. Как мы это исправим?
Одним из решений является не толкать FEED_URL обратно на список feeds_to_fetch немедленно. Скорее подождите, пока мы не обработаем все его записи, а затем вернем их обратно. Хотя это работает, это делает наш код немного более сложным. Теперь нам нужно отслеживать, когда обрабатываются все записи данного feed_url и т. Д. Более простое, гораздо более элегантное решение — просто переключить приоритеты списка в BLPOP на:
queue, payload = redis.blpop "entries_needing_processing", "feeds_to_fetch", 0
Повышая значение entry_needing_processing вверх в порядке извлечения, мы гарантируем, что будем обрабатывать все записи и извлекать все каналы. Основная причина, по которой это устраняет условия гонки, заключается в том, что мы не помещаем запись обратно в список records_needing_processing после ее появления.
Распараллеливание выборки канала
Так что теперь у нас есть базовый скрипт выборки каналов, и мы можем выжать из него некоторую производительность. Один из наименьших результатов, который мы можем оптимизировать, — это увеличение количества процессов, которые извлекают данные из наших списков повторного просмотра. Учитывая это, предварительная попытка выглядит следующим образом:
NUMBER_OF_WORKERS = 4 NUMBER_OF_WORKERS.times do Process.fork do redis = Redis.new loop do queue, payload = redis.blpop "entries_needing_processing", "feeds_to_fetch", 0 if queue == "feeds_to_fetch" # Fetch feeds feed_url = payload puts "fetching feed: #{feed_url}" content = open(feed_url) { |s| s.read } rss = RSS::Parser.parse content, false rss.items.each do |entry| redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry]) end redis.rpush "feeds_to_fetch", feed_url else # Process entries entry = Marshal.load payload puts "processing entry: #{entry.url}" entry_id = redis.incr "entries_processed" redis.hmset "entry|#{entry_id}", "url", entry.url, "title", entry.title, "published", entry.published, "description", entry.summary redis.sadd "entry_ids", entry_id end end end end Process.waitall
Хотя этот код работает отлично, мы видим, что у нас нет возможности грациозно уволить наших работников.
Изящное отключение рабочих
Следующая задача, которую нам нужно решить, — это иметь возможность отключить все рабочие процессы и родительский процесс после запуска нашего сценария. Мы могли бы использовать что-то вроде сигналов для этого, но если родительский процесс скажет ребенку убить себя, это было бы гораздо лучше, чем redis.
NUMBER_OF_WORKERS = 4 NUMBER_OF_WORKERS.times do Process.fork do redis = Redis.new loop do queue, payload = redis.blpop "message_from_master", "entries_needing_processing", "feeds_to_fetch", 0 exit(0) if queue == "message_from_master" && payload == "DIE!" # rest of worker code end end end `echo #{Process.pid} > /tmp/feed_processor.pid` puts "Parent process wrote PID to /tmp/loyalize_master.pid" trap('QUIT') do NUMBER_OF_WORKERS.times do redis.lpush "message_from_master", "DIE!" end end Process.waitall
Интересные места в изменении здесь отмечают message_from_masterпервый список в порядке приоритета. Таким образом, когда работник завершает обработку канала или записи и готов получить следующую задачу из redis, сообщение от мастера (так называемого родительского процесса) имеет наивысший приоритет. В самом родительском процессе мы записываем PID в файл в / tmp dir и перехватываем сигнал QUIT. В обработчике сигналов родитель просто помещает сообщение DIE в очередь message_from_master столько раз, сколько рабочих процессов он породил. А поскольку BLPOP является атомарным, мы гарантируем, что каждый ребенок не собирается извлекать более одного из этих сообщений DIE, поэтому таким образом они все увидят это сообщение. Гоночные условия до свидания! Чтобы увидеть это в действии и отправить сигнал QUIT родительскому процессу, нам просто нужно выполнить:
kill -s QUIT `cat /tmp/feed_processor.pid`
Защита от капризов интернета
Мы приближаемся к тому, чтобы сделать это здесь. Одна вещь, которую мы можем сделать, это попытаться сделать сценарий более жестким, защищая часть сценария, извлекающую канал, заключив ее в предложение начала-обеспечения.
if queue == "feeds_to_fetch" # Fetch feeds feed_url = payload begin puts "fetching feed: #{feed_url}" content = open(feed_url) { |s| s.read } rss = RSS::Parser.parse content, false rss.items.each do |entry| redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry]) end ensure redis.rpush "feeds_to_fetch", feed_url end else
Вывод
Хотя это отнюдь не пуленепробиваемый сценарий, это интересное упражнение. Становится очевидным, насколько велика роль Redis в приложении, где он является основным или единственным хранилищем данных. Такие вещи, как синхронизация, мьютексы, разногласия в ресурсах, приоритетное распределение ресурсов — все это заботит вас Redis. Полный Gist вверх. Наслаждайтесь!