В первой статье мы рассмотрели, почему системный вызов fork()
Мы увидели, что, передав блок Kernel#fork
Process#fork
Кроме того, мы увидели, что, хотя разветвление относительно дорого, оно может конкурировать с многопоточностью, если реализация Ruby не расточает оптимизацию копирования при записи.
К сожалению, если метод вызывается в ответвлении, любые производимые им данные не будут доступны родительскому процессу из-за изоляции процесса. Нам нужен канал, по которому мы можем передавать данные между процессами. Здесь, во второй части, мы рассмотрим некоторые механизмы межпроцессного взаимодействия, а также другие способы использования fork
Код в этой статье доступен на github .
трубы
Каналы позволяют данным течь в одном направлении между парой файловых дескрипторов. Поскольку вилки наследуют дескрипторы открытых файлов, каналы могут использоваться для передачи данных между родительским и дочерним процессами. Создать трубу в Ruby легко с IO # pipe.
>> reader, writer = IO.pipe
=> [#<IO:fd 5>, #<IO:fd 6>]
Здесь писатель будет только писать, а читатель будет только читать. Звучит просто, но, к сожалению, есть некоторые нюансы. Если вы хотите использовать канал более одного раза, IO#puts
IO#gets
>> writer.puts("hello world")
>> reader.gets
=> "hello world\n"
>> writer.puts("hello world, again")
>> reader.gets
=> "hello world, again\n"
Каналы передают данные, используя потоки байтов, поэтому им нужны разделители, чтобы знать, когда прекратить чтение данных. IO#gets
В отличие от IO#puts
IO#write
IO#gets
IO#write
>>writer.write("this string is terminated\n")
>>reader.gets
=> "this string is terminated\n"
>> writer.write("this string is not terminated")
>> reader.gets
*waits indefinitely for \n*
В отличие от IO#gets
IO#read
Объекты IO сигнализируют EOF, когда они закрыты, поэтому вы будете использовать IO#read
>>reader,writer = IO.pipe
>>writer.write("hello world")
>>writer.close
>>reader.read
=> "hello world"
>>reader.read
=> ""
Итак, как бы мы связали канал между родительским процессом и одним из его дочерних элементов? Благодаря тому, что переменные и файловые дескрипторы будут общими, нам нужно создать канал только один раз. Однако, поскольку форк будет копировать оба конца канала (читатель и записывающий), нам нужно выбрать два дополнительных конца и закрыть их. Вот как вы бы отправляли данные от ребенка к родителю.
child_parent_pipe.rb
# creates a fork and pipes a string from child to parent
reader,writer = IO.pipe
fork do
reader.close
writer.puts "sent from child process"
end
writer.close
from_child = reader.gets
puts from_child
Мы также можем заставить его работать по-другому. Просто не забудьте закрыть писатель на конце, который будет читать, и читателя на конце, который будет писать.
parent_child_pipe.rb
# creates a fork and pipes a string from parent to child
reader,writer = IO.pipe
fork do
writer.close
from_parent = reader.gets
puts from_parent
end
reader.close
writer.write "sent from parent process"
UNIX-сокеты
Доменные сокеты UNIX можно представить как каналы с двумя большими преимуществами:
- UNIX-сокеты являются двунаправленными, а каналы — однонаправленными
- Каналы могут использовать только пары байтов, но в сокетах UNIX также могут использоваться дейтаграммы.
В отличие от обычных сокетов, они могут передавать данные только между двумя точками на одном компьютере, используя файловую систему в качестве своего адресного пространства. В результате они могут быть очень легкими, что делает их подходящими для межпроцессного взаимодействия.
unix_sockets.rb
# creates a pair of UNIX sockets that send and receive a string
require 'socket'
parent_socket, child_socket = UNIXSocket.pair
fork do
parent_socket.close
child_socket.send("sent from child (#{$$})", 0)
from_parent = child_socket.recv(100)
puts from_parent
end
child_socket.close
parent_socket.send("sent from parent (#{$$})", 0)
from_child = parent_socket.recv(100)
puts from_child
Распределенный Рубин
Оба канала и сокеты UNIX имеют несколько недостатков:
- Это низкоуровневые механизмы передачи байтов. Для сложного поведения вам нужно будет реализовать существующие протоколы или определить свой собственный.
- Они не могут общаться через барьер машины, ограничивая их удобство использования в масштабируемых сценариях.
Распределенный Ruby позволяет создавать и использовать так называемые «службы распределенных объектов». Эти службы позволяют выполнять код удаленно, отправляя сообщения распределенным объектам.
Мы выполняем код на удаленных машинах все время, но мы делаем это через некоторое косвенное обращение. Например, допустим, вы переходите на веб-адрес, например http://searchforallthestuffs.com/search?q=ponies. Ваш запрос попадает в маршрутизатор, который видит указанный вами маршрут и передает аргументы соответствующему контроллеру, который затем выполняет код, связанный с этим маршрутом (генерирует HTML-представление, JSON, XML и т. Д.).
Если ваша цель — выполнить код удаленно, это … отстой. Каждый раз, когда вы добавляете новую функцию, вам нужен маршрут и, возможно, новый контроллер. Это большая нагрузка, просто чтобы добавить метод в класс.
Распределенные объекты позволяют выполнять код удаленно, но с объектом, получающим сообщение, а не адрес.
distributed.rb
# Creates several worker processes and concurrently waits for the fastest one
require 'drb'
NUM_WORKERS = 4
class Worker
def calculate
time_to_work = rand(1..7)
sleep time_to_work
return time_to_work
end
def stop
DRb.stop_service
end
end
# Start object services
NUM_WORKERS.times do |i|
DRb.start_service("druby://:700#{i}", Worker.new)
puts "Worker running at #{DRb.uri}"
end
# Create a local end-point for each service
workers = NUM_WORKERS.times.map { |i| DRbObject.new nil, "druby://:700#{i}" }
# Concurrently wait for the fastest calculation
thread_pool = []
NUM_WORKERS.times do |i|
thread_pool << Thread.new do
answer = workers[i].calculate
puts "Worker #{i} finished in #{answer} seconds"
end
end
# Wait for every thread to get its answer
thread_pool.each(&:join)
# Shut down each worker
workers.each { |w| w.stop }
Перемещение объектов между процессами
Поскольку низкоуровневые коммуникационные конструкции, такие как каналы и сокеты, передают байты, а не объекты, вам нужно будет закодировать ваши объекты в байтовый формат — то есть сериализовать их — чтобы переместить их через барьер процесса.
К счастью, Ruby поставляется с Marshal
большинство объектов Ruby.
Из Ruby-Doc.org : «Некоторые объекты не могут быть выгружены: если объекты, которые должны быть выгружены, включают в себя привязки, объекты процедур или методов, экземпляры класса IO или одноэлементные объекты, будет вызвана ошибка TypeError ».
serialization.rb
# Ruby objects can be serialized by Marshal to move across pipes
# or sockets
Tire = Struct.new(:radius, :pressure)
reader, writer = IO.pipe
fork do
reader.close
tire = Tire.new(7, 28)
tire_data = Marshal.dump(tire)
writer.write tire_data
end
writer.close
tire_data = reader.gets
tire = Marshal.load(tire_data)
puts tire.inspect
Создание модуля для асинхронных вызовов методов
Если вы можете перемещать объекты между процессами, вы можете выполнить метод в другом процессе и получить результат обратно в оригинале. Это позволяет эффективно выполнять асинхронный метод.
Здесь у меня есть модуль с именем Forkable, который дает классу возможность выполнять методы параллельно. Единственное, что отличается от предыдущего — это то, что канал читается из нового потока, а то, что выходит из канала, передается блоку.
forkable.rb
# A module that lets you fork a method and get its result in a block
# Notes:
# 1. Forked methods can't take blocks in this implementation
# 2. Call back threads will be destroyed if the main process exits
# 3. Not production ready. For educational purposes only.
####################################################################
# If included in a class #fork_method will run a method in another process
# and yield the result to a block
module Forkable
def fork_method(method, *args)
reader, writer = IO.pipe
fork do
reader.close
result = self.send(method, *args)
child_data = Marshal.dump(result)
writer.puts(child_data)
end
writer.close
Thread.new do
data_from_child = reader.gets
yield Marshal.load(data_from_child) if block_given?
end
end
end
# An object that takes a random amount of time to instantiate
class ExpensiveObject
attr_reader :expense
def initialize(max_expense)
@expense = rand(1..max_expense)
sleep @expense
end
end
# A worker that forks, a...forker
class Forker
include Forkable
def calculate(max_expense)
return ExpensiveObject.new(max_expense)
end
end
# Create 3 ExpensiveObjects and print how long they took to create
f = Forker.new
3.times do
f.fork_method(:calculate, 7) do |result|
puts "result: #{result.inspect}"
end
end
puts "waiting on results..."
Process.waitall
puts "main process finished"
треска
Cod — это библиотека, цель которой — облегчить IPC в Ruby. В предыдущем разделе я продемонстрировал, как сериализовать объекты для передачи через механизмы передачи байтов, такие как каналы или сокеты. Треска позаботится об этом за нас. Он использует механизмы IPC более высокого уровня, которые он называет каналами. Мало того, что каналы не нужно закрывать на одном конце в каждом процессе, они также могут передавать объекты Ruby. Канал создается в Cod (как ни странно) с Cod#pipe
cod.rb
# The cod gem simplifies IPC. Ruby objects can be sent across channels.
# Installation:
# $ gem install cod
require 'cod'
Tire = Struct.new(:radius, :pressure)
channel = Cod.pipe
3.times do
fork do
radius = rand(8..14)
pressure = rand(24..33)
channel.put Tire.new(radius, pressure)
end
end
tires = 3.times.map { channel.get }
puts tires.inspect
Gem cod делает больше, чем я могу описать здесь (например, он имеет интеграцию beanstalkd). На сайте есть много примеров, так что обязательно посмотрите.
Предварительно разветвленные серверы
Давайте посмотрим на общий пример разветвления на практике: серверы. Некоторые серверы, использующие разветвление для достижения параллелизма, включают в себя:
- Apache (модуль многопроцессорной обработки Prefork)
- единорог
- Rainbows! (на основе единорога)
В частности, на серверах разветвления обычно используется предварительное разветвление. Концепция работает следующим образом: создание процесса может быть дешевле, чем копирование, но это не бесплатно. Поскольку работа веб-сервера обычно состоит в открытии и закрытии большого количества недолговечных соединений, может быть не очень хорошая идея ждать создания нового дочернего процесса, пока не будет принято новое соединение. Вместо этого мы можем несколько раз продолжить работу и дать вилкам ждать соединения. Вы часто увидите, что это описывается как запуск «пула процессов».
Так как это может выглядеть?
preforking.rb
# Starts an echo server that can service 3 clients in parallel
require 'socket'
server = TCPServer.new 'localhost', 3000
trap("EXIT") { socket.close }
trap("INT") { exit }
3.times do
fork do
sock = server.accept
sock.puts "You are connected to process #{$$}:"
while recv = sock.gets
sock.puts("ECHO:> #{recv}")
end
end
end
Process.waitall
Если вы запустите этот код, вы сможете подключиться к 3 отдельным терминалам.
telnet localhost 3000
Параллельная жемчужина
Параллельная жемчужина обеспечивает возможность достижения параллелизма в CRuby (MRI), не вдаваясь в мелочи вилок и IPC. Вот быстрый способ распределить все ваши ядра процессора параллельно.
parallel.rb
# Parallel iterates across collections with processes or threads
# Installation:
# $ gem install parallel
require 'parallel'
def calculate(magnitude)
x = 0
cycles = 10 ** magnitude
cycles.times do
x += 1.000001
end
return x
end
results = Parallel.map([6, 6, 6, 6, 7, 7, 7, 7]) do |mag|
calculate(mag)
end
puts results
У SitePoint уже есть отличная статья о параллельном геме, который исследует его внутреннюю работу (по крайней мере, в 2011 году).
Программисты часто избегают смотреть на исходный код, опасаясь, что он превратится в огромную потерю времени. Параллельный драгоценный камень, однако, является довольно маленькой, одной файловой библиотекой. Есть отличная возможность учиться и вносить свой вклад.
Вывод
Даже если другие интерпретаторы (JRuby, Rubinius) работают свободно без глобальной блокировки интерпретаторов, процессно-ориентированный подход к параллелизму все еще полезен. Чем более вертикальный подход к масштабированию, тем более ограниченным он будет. Запуск нескольких потоков на нескольких ядрах может быть более горизонтальным, чем увеличение тактовой частоты, но все же более вертикальным, чем запуск нескольких машин. Запуск 100 одноядерных экземпляров более детализирован, чем запуск 25 четырехъядерных экземпляров. Мышление с точки зрения процессов, отправляющих сообщения, позволяет вам принять такую гранулярность. Это будет непросто, но если он работает с кластером из 10 экземпляров, почему бы не 100? Или 1000? Почему бы не иметь 500 на одного провайдера и 500 на другом? Они просто процессы отправки сообщений.
Тратить кучу денег на ресурсы, которые вы можете или не можете использовать, представляет собой старый образ мышления. Зачем покупать больше, чем именно то, что вам нужно?