Статьи

Code Safari: многопоточность Ruby

В течение недели у меня возникла проблема: дали список возможных URL-адресов, какие из них являются реальными? Казалось, идеальная работа для компьютера, чтобы решить. Я полез в свой набор инструментов Ruby и обнаружил несколько хитрых трюков по пути. В этом кодовом сафари мы будем делать что-то немного другое — вместо того, чтобы исследовать популярный гем, мы будем открывать саму стандартную библиотеку Ruby.

Есть несколько различных частей проблемы URL. Первое и наиболее очевидное — это возможность определить, активен ли URL. Мне не нужно было быть на 100% тщательным, поэтому я выбрал самый простой вариант — попытаться извлечь что-то из URL и посмотреть, что произойдет. Если не возникает никаких исключений, это, вероятно, нормально. В стандартной библиотеке Ruby есть несколько методов для извлечения контента из URL. Net/HTTP является наиболее распространенным и гибким, но для простых случаев я предпочитаю open-uri , который расположен поверх Net/HTTP

 require 'open-uri'

def link_valid?(url)
  open(url)
  true
rescue
  false
end

Обратите внимание, что вы можете использовать rescuebegin Это хитрый трюк, который может привести в порядок ваш код по уровню отступа. Я также использовал открытое rescue Вы должны четко указать, какие ошибки вы ожидаете. В этом случае, однако, существует так много различных возможных исключений, которые плохо документированы — таких как SocketErrorArgumentErrorErrno — и, учитывая, что мы охватываем только один единственный вызов Ruby, полного восстановления пока будет достаточно.

Теперь, когда у нас есть метод проверки правильности ссылки, давайте добавим некоторые данные.

 urls = %w(
  http://www.sitepoint.com
  http://xaviershay.com
  http://twitter.com
  not_a_link
  http://bogus.bogus
)
 
invalid_links = urls.reject {|x| link_valid?(x) }
 
puts invalid_links

Как и ожидалось, этот код распечатывает последние два поддельных URL.

Пожалуйста, прогресс

Это было бы концом истории, за исключением того, что потребовалось немало времени, чтобы проверить каждую ссылку, и никаких продвижений по пути не было. Для пяти URL это может быть хорошо, но мой список насчитывал тысячи. Как я узнал, что программа не просто зависла? Было бы легко просто бросить putsreject Давайте переместим код в метод, который дает нам возможность перемещаться и играть с этим кодом.

 def validate_urls(urls)
  urls.each do |url|
    yield url, link_valid?(url)
  end
end
 
validate_urls(urls) do |url, valid|
  puts "#{url} was #{valid ? "valid" : "invalid"}"
end

Теперь мы получаем обновления при проверке каждого URL.

Больше скорости

Это хорошо, но мы не решили основную проблему. Мы только что сделали видимым, насколько медленным этот код. Основная проблема заключается в том, что для open Поскольку мы обрабатываем URL-адреса последовательно, в то время как мы ожидаем один URL-адрес, мы не можем выполнять какую-либо другую работу, и наш код просто простаивает. Поскольку каждая проверка URL не зависит от других проверок, это идеальный вариант для распараллеливания или проверки более одной ссылки одновременно. Есть несколько способов сделать это (я рассмотрел подход, основанный на Fiber, несколько недель назад), но самым простым в этом случае является использование Thread

Если вы действительно заинтересованы в параллелизме Ruby или собираетесь интенсивно его использовать, стоит почитать о том, как ваша версия Ruby реализует свою модель потоков, поскольку существуют некоторые важные предостережения и различия между реализациями. Для этого случайного использования достаточно просто знать, что они позволяют нам делать больше, чем одну вещь одновременно.

Наша первая итерация выглядит так:

 def validate_urls(urls)
  number_per_thread = (urls.length / 3.0).round # 3 threads
 
  urls.each_slice(number_per_thread).map {|urls_subset|
    Thread.new do
      urls_subset.each do |url|
        yield url, link_valid?(url)
      end
    end
  }.each {|thread| thread.join }
end

Здесь происходит несколько вещей. Сначала я быстро объясню код потоков. Thread.new Этот блок теперь будет выполняться параллельно с остальным кодом. Если бы мы просто оставили его, у нас возникла бы проблема, так как метод validate_urls Вот где метод join Вызов joinпосле того, как мы создали все из них), предотвращает возврат validate_urls

Другой важный метод — each_slice Модуль Enumerable предоставляет богатый набор методов, доступных в ArrayHash Для любого вида алгоритмической задачи обычно есть готовое решение, которое вы можете использовать. В этом случае each_slicenchunkEnumerable Нам просто нужно немного подсчитать, чтобы выяснить, насколько большим должен быть каждый срез, который будет зависеть от количества URL-адресов и количества желаемых потоков (здесь три, но поиграйтесь с этим числом, чтобы найти лучшее значение для ваших данных и машины).

Этот код является улучшением, но он все еще имеет некоторые недостатки. Он не использует максимально доступные потоки — вполне возможно, что все «медленные» URL попадают в чанк первого потока, в то время как другой поток быстро просматривает список явно недопустимых, просто чтобы бездействовать, ожидая, пока первый поток делает всю тяжелую работу.

Объединение потоков

Нам нужен лучший способ планирования работы для потоков. Вместо того, чтобы делить всю работу на старте, прежде чем мы узнаем, сколько времени займет каждая задача, у нас могут быть потоки, которые постоянно берут URL-адреса из общего пула, когда они становятся доступными для выполнения дополнительной работы, пока пул не опустеет.

Наивная реализация выглядит так:

 def validate_urls(urls)
  urls = urls.dup
   
  3.times.map {
    Thread.new do
      while !urls.empty?
        url = urls.pop
        yield url, link_valid?(url)
      end
    end
  }.each {|thread| thread.join }
end

Сначала мы создаем копию переданного массива, так как не вежливо изменять аргументы, передаваемые вашему методу. Затем мы запускаем три потока, каждый из которых постоянно берет URL-адреса из списка, пока он не станет пустым. Но не все розы в многопоточном королевстве! Мы вступаем в болото параллелизма, где грязь отрыгивает от грязи, а все не так, как кажется на первый взгляд. Этот код концептуально выглядит хорошо, и в большинстве случаев он также будет работать нормально, но там, где когда-либо потоки совместно используют доступ к данным, мы должны гарантировать, что этот доступ к этим данным называется потокобезопасным . Предоставление двум потокам доступа к памяти волей-неволей может иметь катастрофические и непредсказуемые последствия, выходящие далеко за рамки всего, что вы или я могли бы предсказать .

Вы никогда не должны предполагать, что классы являются потокобезопасными, если они явно не задокументированы как таковые. (Вот отличная, но занудная дискуссия о безопасности потоков в базовых классах Ruby.) Моя мама всегда говорила лучше, чем потом сожалеть. У нас есть два варианта исправления нашего кода: либо мы используем поточно-ориентированную структуру данных, либо мы синхронизируем доступ к нашему массиву (то есть мы разрешаем доступ только одному потоку за раз). Первый вариант должен быть предпочтительным, так как он проще и менее подвержен ошибкам. Поиск «Ruby thread-safe array» привел меня к классу Queue в стандартной библиотеке, который обеспечивает точное поведение, которого мы придерживаемся, в поточно-ориентированном режиме — пример в документации практически такой же, как и наш вариант использования.

 require 'thread'

def validate_urls(urls)
  queue     = Queue.new
  urls.each {|url| queue << url }
&nbsp;
  3.times.map {
    Thread.new do
      while !queue.empty?
        url = queue.pop
        yield url, link_valid?(url)
      end
    end
  }.each {|thread| thread.join }
end

Бам, потокобезопасный. И последнее замечание: давайте сделаем так, чтобы в каждый момент времени происходил только один выход, чтобы пользователю этого метода не нужно было беспокоиться о каких-либо соображениях параллелизма. Что касается их, у них просто будет очень быстрый метод validate_urls Для этого мы будем использовать второй из наших двух вариантов выше: синхронизация. При этом используется другой встроенный класс Ruby Mutex Я включу это здесь в полный список всей программы:

 require 'open-uri'
require 'thread'
&nbsp;
def link_valid?(url)
  open(url)
  true
rescue
  false
end
&nbsp;
def validate_urls(urls)
  queue     = Queue.new
  semaphore = Mutex.new
  urls.each {|url| queue << url }
&nbsp;
  3.times.map {
    Thread.new do
      while !queue.empty?
        url = queue.pop
        valid = link_valid?(url)
        semaphore.synchronize {
          yield url, valid
        }
      end
    end
  }.each {|thread| thread.join }
end
&nbsp;
urls = %w(
  http://www.sitepoint.com
  http://xaviershay.com
  http://twitter.com
  not_a_link
  http://bogus.bogus
)
&nbsp;
validate_urls(urls) do |url, valid|
  puts "#{url} was #{valid ? "valid" : "invalid"}"
end

Метод synchronize Все объединяется, чтобы дать нам быструю и надежную проверку ссылок.

Завершение

Мы взяли простой код и сделали его довольно сложным. То, насколько далеко вы настроите свой собственный код, будет зависеть от ваших конкретных требований, но знание того, как выглядят концы спектра, позволяет вам сделать лучший выбор в ваших обстоятельствах.

Вот несколько вопросов для дальнейшего изучения:

  • Как бы выглядел этот код с использованием модели параллельного параллелизма? (Например, EventMachine)
  • Как бы вы расширили эту технику, чтобы написать программу-паук? (Тот, который находит ссылки на странице и следует за ними)

Дайте нам знать, как вы пойдете в комментариях, и настройтесь на следующую неделю для более захватывающих приключений в джунглях кода.