Статьи

Изучите параллелизм путем реализации фьючерсов в Ruby

Абстрактные обои с бизнес-концепцией

Фьючерсы — это абстракция параллелизма, представляющая результат асинхронного вычисления . Это означает, что когда вы даете будущему какое-то вычисление для обработки, это делается в отдельном потоке. Поэтому основной поток может свободно выполнять другую обработку. В тот момент, когда вам нужен результат из будущего, вы можете попросить его. Если он все еще обрабатывает вычисления, основной поток блокируется. В противном случае результат возвращается.

В этой статье мы будем реализовывать нашу собственную библиотеку Futures в Ruby. Попутно вы узнаете больше о некоторых библиотеках параллелизма, которые предоставляет Ruby, и о некоторых забавных трюках с Ruby! Давайте погрузимся прямо в.

Как фьючерсы полезны?

Прежде чем мы начнем, это немного поможет понять, как Futures может быть полезен для нас. Фьючерсы являются идеальным кандидатом для одновременных HTTP-запросов. Давайте начнем с простого приложения Ruby, которое выбирает случайные шутки Чака Норриса из базы данных Чака Норриса в Интернете :

require 'open-uri' require 'json' require 'benchmark' class Chucky URL = 'http://api.icndb.com/jokes/random' def sequential open(URL) do |f| f.each_line { |line| puts JSON.parse(line)['value']['joke'] } end end end 

Чтобы запустить это приложение, сохраните вышеупомянутый файл как chucky.rb и запустите программу следующим образом:

 % irb > require "./chucky" => true > chucky = Chucky.new => #<Chucky:0x007fe02c046d98> > chucky.sequential Contrary to popular belief, the Titanic didn't hit an iceberg. The ship was off course and ran into Chuck Norris while he was doing the backstroke across the Atlantic. 

Каждый раз, когда вы выполняете chucky.sequential , программа будет извлекать случайную шутку Чака Норриса. (Предупреждение: это вызывает сильную зависимость!) Что произойдет, если мы захотим получить больше, чем, скажем, десять шуток? Наивное решение выглядит примерно так:

 10.times { chucky.sequential } 

К сожалению, это крайняя трата ресурсов процессора и вашего времени. Пока выполняется каждый запрос, основной поток блокируется и должен дождаться завершения запроса, прежде чем перейти к следующему. Мы исправим это, внедрив собственную абстракцию Futures (как это делают все хорошие разработчики).

Реализация вашего собственного будущего: сначала протестируйте!

Мы собираемся внедрить нашу драгоценность Futures с помощью Test-Driven Development (TDD). Давай сделаем это! Мы начнем с создания нового драгоценного камня Ruby, используя bundle gem <gem name> :

 % bundle gem futurama Creating gem 'futurama'... MIT License enabled in config create futurama/Gemfile create futurama/.gitignore create futurama/lib/futurama.rb create futurama/lib/futurama/version.rb create futurama/futurama.gemspec create futurama/Rakefile create futurama/README.md create futurama/bin/console create futurama/bin/setup create futurama/LICENSE.txt create futurama/.travis.yml create futurama/.rspec create futurama/spec/spec_helper.rb create futurama/spec/futurama_spec.rb Initializing git repo in futurama 

Затем запустите bin/setup чтобы ввести зависимости:

 % bin/setup Resolving dependencies... Using rake 10.4.2 Using bundler 1.10.6 Using diff-lcs 1.2.5 Using futurama 0.1.0 from source at . Using rspec-support 3.3.0 Using rspec-core 3.3.2 Using rspec-expectations 3.3.1 Using rspec-mocks 3.3.2 Using rspec 3.3.0 Bundle complete! 4 Gemfile dependencies, 9 gems now installed. Use `bundle show [gemname]` to see where a bundled gem is installed. 

Примечание. Если вы не видите установку гемов RSpec, добавьте spec.add_development_dependency "rspec" в файл futurama.gemspec .

Первые шаги

Наш первый тест преднамеренно прост:

 require 'spec_helper' require 'timeout' module Futurama describe 'Future' do it 'returns a value' do future = Future.new { 1 + 2 } expect(future).to eq(3) end end end 

Этот тест описывает интерфейс создания Future . Объект Future принимает блок вычислений. Кроме того, когда к объекту обращаются, возвращаемое значение является результатом вычисления в этом блоке (это второе условие не так просто, как кажется).

Затем создайте файл с именем future.rb в lib / futurama / future.rb . Затем убедитесь, что вам нужен файл future.rb в lib / futurama.rb примерно так:

 require "futurama/version" require "futurama/future" module Futurama end 

Чтобы пройти тест, объект Future должен:

  • принять блок
  • вернуть значение блока при его вызове

Удовлетворить первое условие достаточно просто:

 module Futurama class Future def initialize(&block) @block = block end end end 

Другое условие немного сложнее:

 require 'delegate' module Futurama class Future < Delegator # initialize was here def __getobj__ @block.call end end end 

Когда мы создаем подкласс встроенного класса Delegator , мы должны реализовать метод __getobj__ , иначе Ruby будет жаловаться. Но опять же, в этом весь смысл использования класса Delegator ! Возвращаемое значение этого метода в основном то, что возвращается при доступе к объекту. Другими словами, мы можем переопределить, что это означает, когда к объекту обращаются, что именно то, что нам нужно! Мы вызываем блок из метода __getobj__ и при запуске тестов:

 % rspec Future returns a value Finished in 0.00077 seconds (files took 0.06902 seconds to load) 1 example, 0 failures 

Большой успех!

Выполнение вычислений в фоновом режиме

Вернемся к нашим тестам. Будущее берет вычисления и запускает их в потоке. Чтобы проверить это, мы можем создать Future и дать ему поспать одну секунду перед возвратом значения. В главном потоке мы также моделируем некоторые вычисления, которые занимают секунду:

 module Futurama describe 'Future' do it 'executes the computation in the background' do future = Future.new { sleep(1); 42 } sleep(1) # do some computation Timeout::timeout(0.9) do expect(future).to eq(42) end end end end 

То, что мы утверждаем, интересно. Поскольку Future работает в фоновом режиме так же, как основной поток спит в течение секунды, теоретически должно пройти менее 1 секунды, прежде чем будет возвращен результат из будущего. Мы используем встроенную библиотеку Timeout (мы сделали обязательный require 'timeout' в верхней части spec / futurama / future.rb ), чтобы убедиться, что будущее выполняется в пределах установленной нами временной границы.

 module Futurama class Future < Delegator def initialize(&block) @block = block @thread = Thread.new { run_future } end def run_future @block.call end def __getobj__ @thread.value end end end 

Мы run_future вызов блока в метод run_future и run_future в поток. Эта ветка запускается в момент создания Будущего. Как только поток завершается, доступ к возвращаемому значению осуществляется с использованием Thread#value , как видно из модифицированной реализации __getobj__ .

Запустите тесты и все должно быть зеленым.

Обработка исключений

Далее мы обращаем наше внимание на обработку исключений. Поскольку фьючерсы асинхронны, мы не хотим никаких неприятных сюрпризов в случае внезапного провала будущего. Вместо этого было бы неплохо в будущем сохранить исключение и вызвать его только тогда, когда мы его нажмем. Вот тест, который выражает намерение:

 module Futurama describe 'Future' do it 'captures exceptions and re-raises them' do error_msg = 'Good news, everyone!' future = Future.new { raise error_msg } expect { future.inspect }.to raise_error RuntimeError, error_msg end end end 

К счастью, вам не нужно ничего делать, чтобы пройти тест! Есть предостережение, хотя. Если для Thread.abort_on_exception задано значение true , необработанные исключения в любом потоке приведут к выходу интерпретатора. Не смешно. Давайте выставим эту проблему в тесте:

 module Futurama describe 'Future' do it 'captures exceptions and re-raises them' do Thread.abort_on_exception = true error_msg = 'Good news, everyone!' future = Future.new { raise error_msg } expect { future.inspect }.to raise_error RuntimeError, error_msg end end end 

С этой новой строкой тест больше не проходит. Что делать? Оказывается, вам нужно сделать больше работы. (Извините!) Вот где мы подошли к сути нашей реализации.

Очередь!

Из предыдущего раздела мы теперь должны найти способ хранить исключения, когда они происходят, вместо того, чтобы полагаться на интерпретатор. Кроме того, нам необходимо повторно вызвать исключение при вызове будущего объекта.

Прежде чем мы перейдем к реализации, давайте еще раз подумаем о будущем. В каком-то смысле это структура данных, в которой хранится разрешенное значение или исключение. Другим очень важным фактором является обеспечение безопасности потоков. Как мы можем представить это ??

В то время как в Ruby не существует большого количества потоковобезопасных коллекционных классов, Queue является исключением. Из описания класса:

Этот класс обеспечивает способ синхронизации связи между потоками.

Для нашей реализации напомним, что мы храним разрешенное значение или исключение. Поэтому нам нужна Queue с максимальным размером один . SizedQueue на помощь!

 module Futurama class Future < Delegator def initialize(&block) @block = block @queue = SizedQueue.new(1) @thread = Thread.new { run_future } end def run_future @queue.push(value: @block.call) rescue Exception => ex @queue.push(exception: ex) end def __getobj__ # this will change in the next section end end end 

Добавлен новый экземпляр переменной, @queue , который является SizedQueue емкостью один. Затем мы модифицируем run_future чтобы выдвинуть результат блока или исключение, если оно произошло. Поскольку мы используем SizedQueue , мы гарантируем, что в очередь не будет SizedQueue два элемента.

Получение результата или исключения из очереди

Далее нам нужно решить проблему получения результата или исключения из SizedQueue . Еще одна вещь, которую нужно иметь в виду, это то, что как только будущее разрешает значение, это делается. В следующий раз, когда вы получите значение из будущего, результат будет немедленным. Другими словами, Future запоминает значение / исключение после его разрешения.

 module Futurama class Future < Delegator def initialize(&block) @block = block @queue = SizedQueue.new(1) @thread = Thread.new { run_future } @mutex = Mutex.new end def __getobj__ resolved_future_or_raise[:value] end def resolved_future_or_raise @resolved_future || @mutex.synchronize do @resolved_future ||= @queue.pop end Kernel.raise @resolved_future[:exception] if @resolved_future[:exception] @resolved_future end end end 

Давайте сосредоточимся на методе resolved_future_or_raise :

 @resolved_future || @mutex.synchronize do @resolved_future ||= @queue.pop end 

Здесь, прежде всего, проверьте, разрешено ли будущее. Это просто причудливый способ сказать, что будущее завершило вычисление значения или исключения. В противном случае @queue это значение / исключение из @queue . Нам нужно убедиться, что операции @resolved_future очереди и присвоения результата @resolved_future выполняются атомарно . Другими словами, мы должны гарантировать, что чередования никогда не произойдет. Поэтому мы @mutex операции в @mutex :

 Kernel.raise @resolved_future[:exception] if @resolved_future[:exception] @resolved_future 

Теперь проверьте, есть ли у @resolved_future исключение. Если так, поднимите это. Обратите внимание, что мы используем Kernel#raise . Без указания Kernel будет использоваться Thread#raise .

Наконец, если исключений нет, будет возвращено разрешенное значение. Запустите тесты снова, и все должно быть зеленого цвета!

Экономия на нажатиях клавиш (или: загрязнение пространства имен ядра)

Не нужно набирать Futurama::Future.new { } . Что если бы мы могли просто в future{ } . В Ruby это тривиально. Давайте сначала напишем тест для этого:

 module Futurama describe 'Future' do it 'pollutes the Kernel namespace' do msg = 'Do the Bender!' future = future { msg } expect(future).to eq(msg) end end end 

Запустите тесты, и это не удастся:

 NoMethodError: undefined method `future' for #<RSpec::ExampleGroups::Future:0x007fd274988390> ./spec/futurama/future_spec.rb:67:in `block (2 levels) in <module:Futurama>' -e:1:in `load' -e:1:in `<main>' 

Мы можем легко это исправить, создав файл kernel.rb в lib / futurama :

 require 'futurama' module Kernel def future(&block) Futurama::Future.new(&block) end end 

Добавьте оператор require для этого файла в lib / futurama.rb и снова запустите тесты. Мы вернулись к зеленому!

Получение ценности или исключения из будущего

В настоящее время доступ к значению или исключению можно получить из __getobj__ . Очевидно, мы не ожидаем, что клиентский код узнает о __getobj__ . Вместо этого мы можем псевдоним этого value :

 module Futurama describe 'Future' do it 'allows access to its value' do val = 10 future = Future.new { val } expect(future.value).to eq(val) end end end 

Неудивительно, что тест провалится с:

 NoMethodError: undefined method `value' for 10:Futurama::Future ./spec/futurama/future_spec.rb:76:in `block (2 levels) in <module:Futurama>' -e:1:in `load' -e:1:in `<main>' 

Код для прохождения теста является однострочным:

 require 'thread' module Futurama class Future < Delegator def __getobj__ resolved_future_or_raise[:value] end # place this *below* __getobj__ alias_method :value, :__getobj__ end end 

Теперь все должно быть зеленым!

Давайте попробуем несколько шуток Чака Норриса!

Давайте снова посмотрим на chucky.rb . Обратите внимание, что я поместил этот файл в sample / chucky.rb из futurama .

 require '../lib/futurama' require 'open-uri' require 'json' require 'benchmark' class Chucky URL = 'http://api.icndb.com/jokes/random' def sequential open(URL) do |f| f.each_line { |line| puts JSON.parse(line)['value']['joke'] } end end def concurrent future { sequential } end end 

Для выполнения одновременного поиска шутки Чака Норриса нужно всего лишь одно крошечное изменение:

 def concurrent future { sequential } end 

Это оно! Это может показаться немного анти-климатическим, но мы почти закончили. Чтобы проверить это, мы можем сравнить параллельную версию с последовательной:

 chucky = Chucky.new Benchmark.bm do |x| x.report('concurrent') { 10.times { chucky.concurrent } } x.report('sequential') { 10.times { chucky.sequential } } end 

Или, если вам лень, вы можете увидеть результаты на моей машине:

Ограничения, благодарности и где узнать больше

Создание неограниченного числа фьючерсов приведет к созданию неограниченного числа потоков. Это явно плохо. Такие языки, как Java, имеют исполнителей пула потоков, которые управляют пулом потоков. Это не очень сложно реализовать в Ruby.

Эта статья была вдохновлена ​​изучением исходного кода драгоценного камня Futuroscope . Фактически, его реализация имеет пул потоков, который передается в конструктор Future.

Если вы хотите узнать больше об абстракциях параллелизма, созданных в Ruby, не ищите ничего, кроме гема Concurrent Ruby . Он состоит из множества инструментов параллелизма, таких как Агенты, Пулы потоков, Супервизоры и, да, Futures! Предоставленная документация также очень удобочитаема.

Спасибо за прочтение!

Надеюсь, вам было интересно узнать о Futures и, надеюсь, больше о Ruby. Хотя Ruby по умолчанию не имеет зрелой библиотеки для параллелизма, такие гемы, как Concurrency Ruby, предоставляют отличную возможность узнать об этих инструментах параллелизма на вашем любимом языке.