Статьи

Форкинг и IPC в Ruby, часть II

Вилка из трех дорог

В первой статье мы рассмотрели, почему системный вызов fork() Мы увидели, что, передав блок Kernel#forkProcess#fork Кроме того, мы увидели, что, хотя разветвление относительно дорого, оно может конкурировать с многопоточностью, если реализация Ruby не расточает оптимизацию копирования при записи.

К сожалению, если метод вызывается в ответвлении, любые производимые им данные не будут доступны родительскому процессу из-за изоляции процесса. Нам нужен канал, по которому мы можем передавать данные между процессами. Здесь, во второй части, мы рассмотрим некоторые механизмы межпроцессного взаимодействия, а также другие способы использования fork

Код в этой статье доступен на github .

трубы

Каналы позволяют данным течь в одном направлении между парой файловых дескрипторов. Поскольку вилки наследуют дескрипторы открытых файлов, каналы могут использоваться для передачи данных между родительским и дочерним процессами. Создать трубу в Ruby легко с IO # pipe.

 >> reader, writer = IO.pipe
=> [#<IO:fd 5>, #<IO:fd 6>]

Здесь писатель будет только писать, а читатель будет только читать. Звучит просто, но, к сожалению, есть некоторые нюансы. Если вы хотите использовать канал более одного раза, IO#putsIO#gets

 >> writer.puts("hello world")
>> reader.gets
=> "hello world\n"

>> writer.puts("hello world, again")
>> reader.gets
=> "hello world, again\n"

Каналы передают данные, используя потоки байтов, поэтому им нужны разделители, чтобы знать, когда прекратить чтение данных. IO#gets В отличие от IO#putsIO#writeIO#getsIO#write

 >>writer.write("this string is terminated\n")
>>reader.gets
=> "this string is terminated\n"

>> writer.write("this string is not terminated")
>> reader.gets
*waits indefinitely for \n*

В отличие от IO#getsIO#read Объекты IO сигнализируют EOF, когда они закрыты, поэтому вы будете использовать IO#read

 >>reader,writer = IO.pipe
>>writer.write("hello world")
>>writer.close
>>reader.read
=> "hello world"
>>reader.read
=> ""

Итак, как бы мы связали канал между родительским процессом и одним из его дочерних элементов? Благодаря тому, что переменные и файловые дескрипторы будут общими, нам нужно создать канал только один раз. Однако, поскольку форк будет копировать оба конца канала (читатель и записывающий), нам нужно выбрать два дополнительных конца и закрыть их. Вот как вы бы отправляли данные от ребенка к родителю.

 child_parent_pipe.rb
# creates a fork and pipes a string from child to parent

reader,writer = IO.pipe

fork do
  reader.close
  writer.puts "sent from child process"
end

writer.close
from_child = reader.gets
puts from_child

Мы также можем заставить его работать по-другому. Просто не забудьте закрыть писатель на конце, который будет читать, и читателя на конце, который будет писать.

 parent_child_pipe.rb
# creates a fork and pipes a string from parent to child

reader,writer = IO.pipe

fork do
  writer.close
  from_parent = reader.gets
  puts from_parent
end

reader.close
writer.write "sent from parent process"

UNIX-сокеты

Доменные сокеты UNIX можно представить как каналы с двумя большими преимуществами:

  • UNIX-сокеты являются двунаправленными, а каналы — однонаправленными
  • Каналы могут использовать только пары байтов, но в сокетах UNIX также могут использоваться дейтаграммы.

В отличие от обычных сокетов, они могут передавать данные только между двумя точками на одном компьютере, используя файловую систему в качестве своего адресного пространства. В результате они могут быть очень легкими, что делает их подходящими для межпроцессного взаимодействия.

 unix_sockets.rb
# creates a pair of UNIX sockets that send and receive a string

require 'socket'
parent_socket, child_socket = UNIXSocket.pair

fork do
  parent_socket.close
  child_socket.send("sent from child (#{$$})", 0)
  from_parent = child_socket.recv(100)
  puts from_parent
end

child_socket.close
parent_socket.send("sent from parent (#{$$})", 0)
from_child = parent_socket.recv(100)
puts from_child

Распределенный Рубин

Оба канала и сокеты UNIX имеют несколько недостатков:

  1. Это низкоуровневые механизмы передачи байтов. Для сложного поведения вам нужно будет реализовать существующие протоколы или определить свой собственный.
  2. Они не могут общаться через барьер машины, ограничивая их удобство использования в масштабируемых сценариях.

Распределенный Ruby позволяет создавать и использовать так называемые «службы распределенных объектов». Эти службы позволяют выполнять код удаленно, отправляя сообщения распределенным объектам.

Мы выполняем код на удаленных машинах все время, но мы делаем это через некоторое косвенное обращение. Например, допустим, вы переходите на веб-адрес, например http://searchforallthestuffs.com/search?q=ponies. Ваш запрос попадает в маршрутизатор, который видит указанный вами маршрут и передает аргументы соответствующему контроллеру, который затем выполняет код, связанный с этим маршрутом (генерирует HTML-представление, JSON, XML и т. Д.).

Если ваша цель — выполнить код удаленно, это … отстой. Каждый раз, когда вы добавляете новую функцию, вам нужен маршрут и, возможно, новый контроллер. Это большая нагрузка, просто чтобы добавить метод в класс.

Распределенные объекты позволяют выполнять код удаленно, но с объектом, получающим сообщение, а не адрес.

 distributed.rb
# Creates several worker processes and concurrently waits for the fastest one

require 'drb'

NUM_WORKERS = 4

class Worker

  def calculate
    time_to_work = rand(1..7)
    sleep time_to_work
    return time_to_work
  end

  def stop
    DRb.stop_service
  end
end

# Start object services
NUM_WORKERS.times do |i|
  DRb.start_service("druby://:700#{i}", Worker.new)
  puts "Worker running at #{DRb.uri}"
end

# Create a local end-point for each service
workers = NUM_WORKERS.times.map { |i| DRbObject.new nil, "druby://:700#{i}" }

# Concurrently wait for the fastest calculation
thread_pool = []
NUM_WORKERS.times do |i|
  thread_pool << Thread.new do
    answer = workers[i].calculate
    puts "Worker #{i} finished in #{answer} seconds"
  end
end

# Wait for every thread to get its answer
thread_pool.each(&:join)

# Shut down each worker
workers.each { |w| w.stop }

Перемещение объектов между процессами

Поскольку низкоуровневые коммуникационные конструкции, такие как каналы и сокеты, передают байты, а не объекты, вам нужно будет закодировать ваши объекты в байтовый формат — то есть сериализовать их — чтобы переместить их через барьер процесса.

К счастью, Ruby поставляется с Marshalбольшинство объектов Ruby.

Из Ruby-Doc.org : «Некоторые объекты не могут быть выгружены: если объекты, которые должны быть выгружены, включают в себя привязки, объекты процедур или методов, экземпляры класса IO или одноэлементные объекты, будет вызвана ошибка TypeError ».

 serialization.rb
# Ruby objects can be serialized by Marshal to move across pipes
# or sockets

Tire = Struct.new(:radius, :pressure)

reader, writer = IO.pipe

fork do
  reader.close
  tire = Tire.new(7, 28)
  tire_data = Marshal.dump(tire)
  writer.write tire_data
end

writer.close
tire_data = reader.gets
tire = Marshal.load(tire_data)
puts tire.inspect

Создание модуля для асинхронных вызовов методов

Если вы можете перемещать объекты между процессами, вы можете выполнить метод в другом процессе и получить результат обратно в оригинале. Это позволяет эффективно выполнять асинхронный метод.

Здесь у меня есть модуль с именем Forkable, который дает классу возможность выполнять методы параллельно. Единственное, что отличается от предыдущего — это то, что канал читается из нового потока, а то, что выходит из канала, передается блоку.

 forkable.rb
# A module that lets you fork a method and get its result in a block
# Notes: 
#   1. Forked methods can't take blocks in this implementation
#   2. Call back threads will be destroyed if the main process exits
#   3. Not production ready. For educational purposes only.
####################################################################

# If included in a class #fork_method will run a method in another process
# and yield the result to a block
module Forkable
  def fork_method(method, *args)
    reader, writer = IO.pipe
    fork do
      reader.close
      result = self.send(method, *args)
      child_data = Marshal.dump(result)
      writer.puts(child_data)
    end

    writer.close
    Thread.new do
      data_from_child = reader.gets
      yield Marshal.load(data_from_child) if block_given?
    end
  end
end

# An object that takes a random amount of time to instantiate
class ExpensiveObject

  attr_reader :expense

  def initialize(max_expense)
    @expense = rand(1..max_expense)
    sleep @expense
  end
end

# A worker that forks, a...forker
class Forker
  include Forkable

  def calculate(max_expense)
    return ExpensiveObject.new(max_expense)
  end
end

# Create 3 ExpensiveObjects and print how long they took to create
f = Forker.new
3.times do 
  f.fork_method(:calculate, 7) do |result|
    puts "result: #{result.inspect}"
  end
end

puts "waiting on results..."

Process.waitall
puts "main process finished"

треска

Cod — это библиотека, цель которой — облегчить IPC в Ruby. В предыдущем разделе я продемонстрировал, как сериализовать объекты для передачи через механизмы передачи байтов, такие как каналы или сокеты. Треска позаботится об этом за нас. Он использует механизмы IPC более высокого уровня, которые он называет каналами. Мало того, что каналы не нужно закрывать на одном конце в каждом процессе, они также могут передавать объекты Ruby. Канал создается в Cod (как ни странно) с Cod#pipe

 cod.rb
# The cod gem simplifies IPC. Ruby objects can be sent across channels.
# Installation: 
#   $ gem install cod

require 'cod'

Tire = Struct.new(:radius, :pressure)

channel = Cod.pipe

3.times do
  fork do
    radius = rand(8..14)
    pressure = rand(24..33)
    channel.put Tire.new(radius, pressure)
  end
end

tires = 3.times.map { channel.get }
puts tires.inspect

Gem cod делает больше, чем я могу описать здесь (например, он имеет интеграцию beanstalkd). На сайте есть много примеров, так что обязательно посмотрите.

Предварительно разветвленные серверы

Давайте посмотрим на общий пример разветвления на практике: серверы. Некоторые серверы, использующие разветвление для достижения параллелизма, включают в себя:

  • Apache (модуль многопроцессорной обработки Prefork)
  • единорог
  • Rainbows! (на основе единорога)

В частности, на серверах разветвления обычно используется предварительное разветвление. Концепция работает следующим образом: создание процесса может быть дешевле, чем копирование, но это не бесплатно. Поскольку работа веб-сервера обычно состоит в открытии и закрытии большого количества недолговечных соединений, может быть не очень хорошая идея ждать создания нового дочернего процесса, пока не будет принято новое соединение. Вместо этого мы можем несколько раз продолжить работу и дать вилкам ждать соединения. Вы часто увидите, что это описывается как запуск «пула процессов».

Так как это может выглядеть?

 preforking.rb
# Starts an echo server that can service 3 clients in parallel

require 'socket'

server = TCPServer.new 'localhost', 3000

trap("EXIT") { socket.close }
trap("INT") { exit }

3.times do
  fork do
    sock = server.accept
    sock.puts "You are connected to process #{$$}:"
    while recv = sock.gets
      sock.puts("ECHO:> #{recv}")
    end
  end
end

Process.waitall

Если вы запустите этот код, вы сможете подключиться к 3 отдельным терминалам.

 telnet localhost 3000

Параллельная жемчужина

Параллельная жемчужина обеспечивает возможность достижения параллелизма в CRuby (MRI), не вдаваясь в мелочи вилок и IPC. Вот быстрый способ распределить все ваши ядра процессора параллельно.

 parallel.rb
# Parallel iterates across collections with processes or threads
# Installation: 
#   $ gem install parallel

require 'parallel'

def calculate(magnitude)
  x = 0
  cycles = 10 ** magnitude
  cycles.times do
    x += 1.000001
  end
  return x
end

results = Parallel.map([6, 6, 6, 6, 7, 7, 7, 7]) do |mag|
  calculate(mag)
end

puts results

У SitePoint уже есть отличная статья о параллельном геме, который исследует его внутреннюю работу (по крайней мере, в 2011 году).

Программисты часто избегают смотреть на исходный код, опасаясь, что он превратится в огромную потерю времени. Параллельный драгоценный камень, однако, является довольно маленькой, одной файловой библиотекой. Есть отличная возможность учиться и вносить свой вклад.

Вывод

Даже если другие интерпретаторы (JRuby, Rubinius) работают свободно без глобальной блокировки интерпретаторов, процессно-ориентированный подход к параллелизму все еще полезен. Чем более вертикальный подход к масштабированию, тем более ограниченным он будет. Запуск нескольких потоков на нескольких ядрах может быть более горизонтальным, чем увеличение тактовой частоты, но все же более вертикальным, чем запуск нескольких машин. Запуск 100 одноядерных экземпляров более детализирован, чем запуск 25 четырехъядерных экземпляров. Мышление с точки зрения процессов, отправляющих сообщения, позволяет вам принять такую ​​гранулярность. Это будет непросто, но если он работает с кластером из 10 экземпляров, почему бы не 100? Или 1000? Почему бы не иметь 500 на одного провайдера и 500 на другом? Они просто процессы отправки сообщений.

Тратить кучу денег на ресурсы, которые вы можете или не можете использовать, представляет собой старый образ мышления. Зачем покупать больше, чем именно то, что вам нужно?