Несколько недель назад я написал статью о том, как разбить работу на потоки, чтобы повысить производительность средства проверки URL. Комментатор отметил, что параллельный драгоценный камень может быть использован для достижения того же результата, который вызвал у меня любопытство. Я исследовал драгоценный камень и обнаружил, что он не только может распараллеливаться на потоки, он также поддерживает разбиение работы на несколько процессов .
Интересно, как это работает?
Взломать его
Как всегда, я начинаю с копии кода, обычно находящейся на GitHub:
git clone https://github.com/grosser/parallel.git
README указывает нам на Parallel.map
как точку входа в код. Это легко найти, поскольку все находится в одном файле: lib/parallel.rb
. Отслеживание этого приводит нас к методам work_in_processes
и worker
, которые являются основой того, что мы пытаемся выяснить. Давайте начнем с вершины work_in_processes
с целью выяснить структуру.
# lib/parallel.rb def self.work_in_processes(items, options, &blk) workers = Array.new(options[:count]).map{ worker(items, options, &blk) } Parallel.kill_on_ctrl_c(workers.map{|worker| worker[:pid] })
Это порождает несколько рабочих, а затем регистрирует их корректное завершение, если элемент управления C отправляется процессу родительского менеджера. Без этого уничтожение вашего сценария приведет к тому, что дополнительные процессы останутся запущенными в вашей системе! worker
метод на самом деле создает новый процесс, и именно здесь все становится интересным.
def self.worker(items, options, &block) # use less memory on REE GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=) child_read, parent_write = IO.pipe parent_read, child_write = IO.pipe pid = Process.fork do begin parent_write.close parent_read.close process_incoming_jobs(child_read, child_write, items, options, &block) ensure child_read.close child_write.close end end child_read.close child_write.close {:read => parent_read, :write => parent_write, :pid => pid} end
Здесь есть три важных понятия, которые я расскажу по очереди. Первый — это вызов Process.fork
. Это вызов системного уровня (недоступен в Windows), который эффективно дублирует текущий процесс для создания нового процесса, называемого «дочерним». Для большинства целей и задач два процесса абсолютно одинаковы — одни и те же локальные переменные, одна и та же трассировка стека, одно и то же. Затем Ruby указывает исходному процессу пропустить блок, переданный Process.fork
, но новый блок для его ввода, что позволяет выполнять различное поведение в каждом процессе. Другими словами, блок, переданный Process.fork
выполняется только новым дочерним процессом.
Теперь, имея два процесса, нам нужен способ связи между ними, чтобы мы могли планировать работу. Вот где появляется вторая важная концепция: IO.pipe
. Поскольку два новых процесса являются отдельными, они не могут взаимодействовать путем изменения переменных, так как, хотя они изначально будут иметь одинаковые имена и значения в каждом процессе, они дублируются, так что изменения, внесенные в дочерний процесс, не будут видны родительским процессом , Канал — это метод связи между процессами (опять же, недоступен в Windows). Он действует как файл, в который один процесс может записывать, а другой — читать. Параллельно настраивает две трубы для обеспечения двунаправленной связи. Мы рассмотрим этот метод позже в этой статье, а пока просто признаем, что это настройка канала связи.
Последней важной концепцией является вызов copy_on_write_friendly
, который требует быстрого погружения в управление памятью для объяснения. Причина, по которой fork
настолько эффективен, заключается в том, что, хотя он и создает точную копию, он изначально не копирует память — оба процесса будут считывать одни и те же области памяти. Только когда процесс записывает в память, он копируется и дублируется. Это не только означает быстрое время запуска новых процессов, но также позволяет им иметь очень малые следы памяти, если они только читают и обрабатывают, как мы часто ожидаем, что будут делать наши параллельные работники.
Например, скажем, типичный процесс был 20 МБ. Запуск пяти экземпляров по отдельности приведет к использованию памяти в 100 МБ (5 лотов по 20), но запуск одного экземпляра и его разветвление приведет к использованию памяти по-прежнему всего 20 МБ! (На самом деле он немного выше из-за некоторых накладных расходов при создании нового процесса, но это незначительно.)
Однако у нас есть проблема — сборщик мусора в Ruby (как он управляет памятью). Он использует алгоритм, известный как «отметка и разметка», который состоит из двух этапов:
- Просмотрите все объекты в памяти и напишите им флаг, указывающий, используются они или нет.
- Очистите все объекты, не отмеченные как используемые.
Вы видели проблему в шаге 1? Как только Ruby сборщик мусора запускается, он выполняет запись в каждый объект, вызывая создание копии этой памяти! Даже с разветвлением пять экземпляров нашего 20-мегабайтного скрипта все равно будут использовать 100 Мб памяти.
Как указано в комментарии во фрагменте кода, некоторые очень умные люди решили эту проблему и выпустили ее как Ruby Enterprise Edition . Их FAQ содержит много подробностей, чтобы вы могли продолжить чтение, если вам интересно.
связь
О разветвлении говорить особо нечего, поэтому я хочу IO.pipe
оставшуюся часть статьи каналу связи: IO.pipe
. На дочерней стороне parallel
вилок результаты обработки отправляются обратно родителю путем записи соответствующим образом закодированных объектов Ruby — либо результата, либо исключения — в канал (см. Конец process_incoming_jobs
).
Родитель создает поток для каждого подпроцесса, который блокирует ожидание появления данных в канале, а затем сопоставляет результат перед отправкой дополнительной работы процессу. Это продолжается до тех пор, пока не будет больше запланированной работы.
# lib/parallel.rb:124 workers.each do |worker| listener_threads << Thread.new do begin while output = worker[:read].gets # store output from worker result_index, output = decode(output.chomp) if ExceptionWrapper === output exception = output.exception break elsif exception # some other thread failed break end result[result_index] = output # give worker next item next_index = Thread.exclusive{ current_index += 1 } break if next_index >= items.size write_to_pipe(worker[:write], next_index) end ensure worker[:read].close worker[:write].close end end end
Обратите внимание, что вместо использования Queue
как мы делали в предыдущей статье , parallel
Thread.exclusive
использует Thread.exclusive
чтобы сохранить потокобезопасный счетчик текущего индекса.
Завершение
Теперь у нас есть общее представление о том, как создавать и взаимодействовать между новыми процессами. Давайте попробуем проверить наши знания, создав игрушечное приложение для проверки методов, о которых мы узнали: fork
и pipe
.
reader, writer = IO.pipe process_id = Process.fork do writer.write "Hello" end if process_id # fork will return nil inside the child process # only the parent process wil execute this block puts "Message from child: #{reader.read}" end
На первый взгляд это выглядит нормально, но, запустив его, вы обнаружите, что процесс зависает. Мы упустили что-то важное! read
кажется, блокирует, потому что это не получает сигнал конца файла от канала. Мы можем подтвердить, что сообщение работает иначе, отправив новую строку и использовав вместо нее метод get (который gets
ввод до символа новой строки):
reader, writer = IO.pipe process_id = Process.fork do writer.write "Hello" end
if process_id puts "Message from child: #{reader.gets}" end
Этот скрипт работает как положено. Так почему же не работает наш первый скрипт? Ответ неочевиден, если вы не привыкли работать с параллельным кодом, но он аккуратно объяснен в документации по IO.pipe
:
Конец чтения канала не будет создавать условия завершения файла, если есть какие-либо записи с открытым каналом. В случае родительского процесса
read
никогда не вернется, если оно сначала не выдастwriter.close
.
Бинго! Хотя наш дочерний процесс закрывает свою копию writer
неявно, когда он выходит, наш родительский процесс все еще имеет исходную ссылку на writer
open! Мы можем исправить это, закрыв перед тем, как попытаться прочитать:
reader, writer = IO.pipe process_id = Process.fork do writer.write "Hello" end if process_id writer.close puts "Message from child: #{reader.read}" end
Вот еще несколько упражнений для вас:
- Расширьте наш пример сценария, чтобы родитель мог также отправлять сообщения ребенку.
- Какие преимущества / недостатки есть в использовании
Thread.exclusive
(как это делает параллель) вместоQueue
(как этоThread.exclusive
в нашей последней статье ) для планирования?
Дайте нам знать, как вы идете в комментариях. Настройтесь на следующую неделю для более захватывающих приключений в джунглях кода.