Статьи

Как создать процессор RSS Feed с помощью Redis

Redis отлично подходит для многих вещей, таких как: работа в качестве кеша, хранение конфигурации вашего приложения и т. Д. В этой статье я собираюсь использовать атомарные и блокирующие средства Redis для создания многошагового процессора RSS-каналов. Кроме того, я надеюсь затронуть некоторые темы: приоритезация очереди, синхронизация между процессами, использование redis для корректного завершения процессов и несколько условий гонки, которые нужно учитывать.

Шаг 1. Подготовка и запуск базового процессора RSS

Вот что пытается сделать обработчик RSS-каналов:

  • 1: У него будет список URL фидов, которые он будет периодически посещать, и получать новые фиды.
  • 2: Когда он находит новый канал, он его обрабатывает . А пока давайте просто предположим, что это значит обрабатывать, где-то хранить в redis.

Запуск первого шага довольно прост. Я собираюсь использовать rss-библиотеку ruby ​​stdlib для обработки RSS-каналов. С точки зрения организации моей структуры данных redis, я буду использовать список под названием feeds_to_fetch, который я буду использовать для извлечения URL-адресов и определения, какие из них нужно получить дальше. Я также буду использовать другой список, который называется records_needing_processing, в который я буду вставлять записи RSS, которые необходимо обработать. Итак, учитывая то короткое вступление, вот как выглядит первый проход при создании этой вещи:

URLS = %W{http://www.npr.org/rss/rss.php?id=1001
          http://www.npr.org/rss/rss.php?id=100}

redis = Redis.new

URLS.each { |url| redis.rpush "feeds_to_fetch", url }

loop do
  queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0
  if queue == "feeds_to_fetch"
    # Fetch feeds
    feed_url = payload
    puts "fetching feed: #{feed_url}"
    content = open(feed_url) { |s| s.read }
    rss = RSS::Parser.parse content, false
    rss.items.each do |entry|
      redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry])
    end
    redis.rpush "feeds_to_fetch", feed_url
  else
    # Process entries
    entry = Marshal.load payload
    puts "processing entry: #{entry.url}"
    entry_id = redis.incr "entries_processed"
    redis.hmset "entry|#{entry_id}", "url", entry.url, "title", entry.title, 
                "published", entry.published, "description", entry.summary
    redis.sadd "entry_ids", entry_id
  end
end

Так что этот код выглядит достаточно просто. Есть несколько URL-адресов, которые помещаются в список feeds_to_fetch. Затем мы выполняем бесконечный цикл, делая BLPOP над несколькими списками (feeds_to_fetch и records_needing_processing). Из BLPOP мы разветвляемся на два пути кода:

  1. Если мы только что извлекли что-то из списка feeds_to_fetch , мы знаем, что значение, которое мы только что извлекли , является URL-адресом RSS-канала, и мы приступаем к его получению. Как только мы закончили извлечение канала, мы помещаем записи в список records_needing_processing .
  2. С другой стороны, если мы извлекли что-то из списка records_needing_processing , мы знаем, что это запись RSS, и мы продолжаем обрабатывать ее, то есть сохранять ее в Redis.

Хотя этот код выглядит так, как будто он должен работать, при запуске мы видим, что все, что мы делаем, это извлекаем feed_url и не обрабатываем какие-либо из наших записей. Вот пример вывода из моего прогона:

fetching feed: http://www.npr.org/rss/rss.php?id=1008
fetching feed: http://www.npr.org/rss/rss.php?id=100
fetching feed: http://www.npr.org/rss/rss.php?id=1001
fetching feed: http://www.npr.org/rss/rss.php?id=1006

Как видите, все, что мы делаем, это выбираем каналы без обработки записей. Добро пожаловать в первое состояние гонки!

Приоритизация очередей в BLPOP

Чтобы повторить поведение, которое мы наблюдаем, мы постоянно выбираем фиды и не обрабатываем какие-либо записи. Почему это происходит? Чтобы понять почему, нам нужно обратить более пристальное внимание на следующую строку:

  queue, payload = redis.blpop "feeds_to_fetch", "entries_needing_processing", 0

Здесь мы говорим redis, чтобы он выталкивал элемент из списков feeds_to_fetch или records_needing_processing в указанном порядке (аргумент конечного 0 просто означает блок навсегда). Для дальнейшего уточнения, мы говорим Redis попытаться дать мне что — нибудь из списка feeds_to_fetch, если вы не можете найти что — нибудь там , то попробуйте дать мне что — нибудь из entries_needing_processing списка. Итак, мы даем списку feeds_to_fetch более высокий порядок приоритета, чем списку records_needing_processing, указав его ранее. Это в сочетании с тем фактом, что когда мы выталкиваем элемент из списка feeds_to_fetch, мы отодвигаем его назад, означает, что мы никогда не собираемся выталкивать элемент из списка records_needing_processing. Как мы это исправим?

Одним из решений является не толкать FEED_URL обратно на список feeds_to_fetch немедленно. Скорее подождите, пока мы не обработаем все его записи, а затем вернем их обратно. Хотя это работает, это делает наш код немного более сложным. Теперь нам нужно отслеживать, когда обрабатываются все записи данного feed_url и т. Д. Более простое, гораздо более элегантное решение — просто переключить приоритеты списка в BLPOP на:

  queue, payload = redis.blpop "entries_needing_processing", "feeds_to_fetch",  0

Повышая значение entry_needing_processing вверх в порядке извлечения, мы гарантируем, что будем обрабатывать все записи и извлекать все каналы. Основная причина, по которой это устраняет условия гонки, заключается в том, что мы не помещаем запись обратно в список records_needing_processing после ее появления.

Распараллеливание выборки канала

Так что теперь у нас есть базовый скрипт выборки каналов, и мы можем выжать из него некоторую производительность. Один из наименьших результатов, который мы можем оптимизировать, — это увеличение количества процессов, которые извлекают данные из наших списков повторного просмотра. Учитывая это, предварительная попытка выглядит следующим образом:

NUMBER_OF_WORKERS = 4
NUMBER_OF_WORKERS.times do
  Process.fork do
    redis = Redis.new
    loop do
      queue, payload = redis.blpop "entries_needing_processing", "feeds_to_fetch", 0
      if queue == "feeds_to_fetch"
        # Fetch feeds
        feed_url = payload
        puts "fetching feed: #{feed_url}"
        content = open(feed_url) { |s| s.read }
        rss = RSS::Parser.parse content, false
        rss.items.each do |entry|
          redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry])
        end
        redis.rpush "feeds_to_fetch", feed_url
      else
        # Process entries
        entry = Marshal.load payload
        puts "processing entry: #{entry.url}"
        entry_id = redis.incr "entries_processed"
        redis.hmset "entry|#{entry_id}", "url", entry.url, "title", entry.title, 
                    "published", entry.published, "description", entry.summary
        redis.sadd "entry_ids", entry_id
      end
    end
  end
end

Process.waitall

Хотя этот код работает отлично, мы видим, что у нас нет возможности грациозно уволить наших работников.

Изящное отключение рабочих

Следующая задача, которую нам нужно решить, — это иметь возможность отключить все рабочие процессы и родительский процесс после запуска нашего сценария. Мы могли бы использовать что-то вроде сигналов для этого, но если родительский процесс скажет ребенку убить себя, это было бы гораздо лучше, чем redis.

NUMBER_OF_WORKERS = 4
NUMBER_OF_WORKERS.times do
  Process.fork do
    redis = Redis.new
    loop do

      queue, payload = redis.blpop "message_from_master", "entries_needing_processing", 
                                    "feeds_to_fetch", 0
      exit(0) if queue == "message_from_master" && payload == "DIE!"
      # rest of worker code
    end
  end
end

`echo #{Process.pid} > /tmp/feed_processor.pid`
puts "Parent process wrote PID to /tmp/loyalize_master.pid"

trap('QUIT') do
  NUMBER_OF_WORKERS.times do
    redis.lpush "message_from_master", "DIE!"
  end
end

Process.waitall

Интересные места в изменении здесь отмечают message_from_masterпервый список в порядке приоритета. Таким образом, когда работник завершает обработку канала или записи и готов получить следующую задачу из redis, сообщение от мастера (так называемого родительского процесса) имеет наивысший приоритет. В самом родительском процессе мы записываем PID в файл в / tmp dir и перехватываем сигнал QUIT. В обработчике сигналов родитель просто помещает сообщение DIE в очередь message_from_master столько раз, сколько рабочих процессов он породил. А поскольку BLPOP является атомарным, мы гарантируем, что каждый ребенок не собирается извлекать более одного из этих сообщений DIE, поэтому таким образом они все увидят это сообщение. Гоночные условия до свидания! Чтобы увидеть это в действии и отправить сигнал QUIT родительскому процессу, нам просто нужно выполнить:

kill -s QUIT `cat /tmp/feed_processor.pid`

Защита от капризов интернета

Мы приближаемся к тому, чтобы сделать это здесь. Одна вещь, которую мы можем сделать, это попытаться сделать сценарий более жестким, защищая часть сценария, извлекающую канал, заключив ее в предложение начала-обеспечения.

if queue == "feeds_to_fetch"
  # Fetch feeds
  feed_url = payload
  begin
    puts "fetching feed: #{feed_url}"
    content = open(feed_url) { |s| s.read }
    rss = RSS::Parser.parse content, false
    rss.items.each do |entry|
      redis.rpush "entries_needing_processing", Marshal.dump([feed_url,entry])
    end
  ensure
    redis.rpush "feeds_to_fetch", feed_url
  end
else

Вывод

Хотя это отнюдь не пуленепробиваемый сценарий, это интересное упражнение. Становится очевидным, насколько велика роль Redis в приложении, где он является основным или единственным хранилищем данных. Такие вещи, как синхронизация, мьютексы, разногласия в ресурсах, приоритетное распределение ресурсов — все это заботит вас Redis. Полный Gist вверх. Наслаждайтесь!