Учебники

Рубин – Многопоточность

Традиционные программы имеют один поток выполнения: операторы или инструкции, составляющие программу, выполняются последовательно до тех пор, пока программа не завершится.

Многопоточная программа имеет более одного потока выполнения. Внутри каждого потока операторы выполняются последовательно, но сами потоки могут выполняться параллельно, например, на многоядерном процессоре. Часто на одной машине с ЦП несколько потоков фактически не выполняются параллельно, но параллелизм моделируется путем чередования выполнения потоков.

Ruby облегчает написание многопоточных программ с помощью класса Thread . Рубиновые потоки – это легкий и эффективный способ достижения параллелизма в вашем коде.

Создание рубиновых потоков

Чтобы начать новый поток, просто свяжите блок с вызовом Thread.new . Будет создан новый поток для выполнения кода в блоке, а исходный поток немедленно вернется из Thread.new и возобновит выполнение с помощью следующего оператора –

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

пример

Вот пример, который показывает, как мы можем использовать многопоточную программу Ruby.

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

Это даст следующий результат –

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

Жизненный цикл темы

Новые темы создаются с помощью Thread.new . Вы также можете использовать синонимы Thread.start и Thread.fork .

Нет необходимости запускать поток после его создания, он начинает работать автоматически, когда становятся доступными ресурсы ЦП.

Класс Thread определяет ряд методов для запроса и управления потоком во время его работы. Поток запускает код в блоке, связанном с вызовом Thread.new, и затем останавливается.

Значение последнего выражения в этом блоке является значением потока и может быть получено путем вызова метода value объекта Thread. Если поток завершился, то значение сразу возвращает значение потока. В противном случае метод значения блокируется и не возвращается, пока поток не завершится.

Метод класса Thread.current возвращает объект Thread, представляющий текущий поток. Это позволяет потокам манипулировать собой. Метод класса Thread.main возвращает объект Thread, представляющий основной поток. Это начальный поток выполнения, который начался при запуске программы Ruby.

Вы можете дождаться завершения определенного потока, вызвав метод Thread.join этого потока. Вызывающий поток будет блокироваться, пока данный поток не будет завершен.

Потоки и исключения

Если исключение возникает в главном потоке и нигде не обрабатывается, интерпретатор Ruby печатает сообщение и завершает работу. В потоках, отличных от основного потока, необработанные исключения приводят к прекращению работы потока.

Если поток t завершается из-за необработанного исключения, а другой поток s вызывает t.join или t.value, то исключение, возникшее в t, вызывается в потоке s .

Если Thread.abort_on_exception имеет значение false , условие по умолчанию, необработанное исключение просто убивает текущий поток, а все остальные продолжают работать.

Если вы хотите, чтобы любое необработанное исключение в каком-либо потоке вызывало выход интерпретатора, установите для метода класса Thread.abort_on_exception значение true .

t = Thread.new { ... }
t.abort_on_exception = true

Переменные потока

Поток может обычно обращаться к любым переменным, которые находятся в области видимости при создании потока. Переменные, локальные для блока потока, являются локальными для потока и не являются общими.

Класс потока имеет специальную возможность, позволяющую создавать локальные переменные потока и обращаться к ним по имени. Вы просто обрабатываете объект потока, как если бы это был Hash, записывая в элементы с помощью [] = и читая их обратно с помощью [].

В этом примере каждый поток записывает текущее значение счетчика переменных в локальную переменную с ключом mycount .

Live Demo

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

Это дает следующий результат –

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

Основной поток ожидает завершения подпотоков, а затем выводит значение счетчика, захваченного каждым.

Приоритеты потоков

Первым фактором, влияющим на планирование потоков, является приоритет потоков: потоки с высоким приоритетом планируются перед потоками с низким приоритетом. Точнее, поток получит процессорное время, только если нет потоков с более высоким приоритетом, ожидающих запуска.

Вы можете установить и запросить приоритет объекта Ruby Thread с приоритетом = и приоритетом . Вновь созданный поток начинается с того же приоритета, что и поток, который его создал. Основной поток начинается с приоритета 0.

Нет никакого способа установить приоритет потока прежде, чем он начнет работать. Тем не менее, поток может повысить или понизить свой собственный приоритет как первое действие, которое он предпринимает.

Исключение потоков

Если два потока совместно используют доступ к одним и тем же данным, и хотя бы один из потоков изменяет эти данные, вы должны быть особенно внимательны, чтобы убедиться, что ни один поток не сможет увидеть данные в несогласованном состоянии. Это называется исключением потока .

Mutex – это класс, который реализует простую семафорную блокировку для взаимоисключающего доступа к некоторому общему ресурсу. То есть только один поток может удерживать блокировку в данный момент времени. Другие потоки могут ожидать очереди, чтобы блокировка стала доступной, или просто получать немедленную ошибку, указывающую, что блокировка недоступна.

Размещая все доступы к совместно используемым данным под контролем мьютекса , мы гарантируем согласованность и атомарную работу. Давайте попробуем привести примеры, первый без мутатакса, а второй с мутаксом –

Пример без Mutax

Live Demo

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

Это даст следующий результат –

count1 :  1583766
count2 :  1583766
difference : 0

Live Demo

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

Это даст следующий результат –

count1 :  696591
count2 :  696591
difference : 0

Обработка тупика

Когда мы начинаем использовать объекты Mutex для исключения потоков, мы должны быть осторожны, чтобы избежать тупиков . Deadlock – это состояние, которое возникает, когда все потоки ожидают получения ресурса, удерживаемого другим потоком. Поскольку все потоки заблокированы, они не могут снять блокировки, которые они удерживают. И поскольку они не могут снять блокировки, никакой другой поток не может получить эти блокировки.

Это где условные переменные входят в картину. Переменная условия – это просто семафор, который связан с ресурсом и используется в защите определенного мьютекса . Когда вам нужен ресурс, который недоступен, вы ждете переменную условия. Это действие снимает блокировку с соответствующего мьютекса . Когда какой-то другой поток сообщает, что ресурс доступен, исходный поток выходит из режима ожидания и одновременно восстанавливает блокировку в критической области.

пример

Live Demo

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

Это даст следующий результат –

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

Состояния потоков

Существует пять возможных возвращаемых значений, соответствующих пяти возможным состояниям, как показано в следующей таблице. Метод status возвращает состояние потока.

Состояние потока Возвращаемое значение
Runnable бежать
Спать Спать
Aborting отбрасывание
Завершается нормально ложный
Прекращено за исключением ноль

Методы класса потока

Следующие методы предоставляются классом Thread, и они применимы ко всем потокам, доступным в программе. Эти методы будут вызываться как использующие имя класса Thread следующим образом:

Thread.abort_on_exception = true

Thread.abort_on_exception

Возвращает статус глобального прерывания при условии исключения . По умолчанию установлено значение false . При значении true все потоки будут прерваны (процесс завершит работу (0)), если в каком-либо потоке возникнет исключение

Thread.abort_on_exception =

Если установлено значение true , все потоки будут прерваны, если возникнет исключение. Возвращает новое состояние.

Thread.critical

Возвращает состояние критического состояния глобального потока .

Thread.critical =

Устанавливает состояние критического состояния глобального потока и возвращает его. При значении true запрещает планирование любого существующего потока. Не блокирует создание и запуск новых потоков. Некоторые операции с потоками (такие как остановка или уничтожение потока, переход в спящий режим в текущем потоке и создание исключения) могут привести к планированию потока даже в критической секции.

Thread.current

Возвращает текущий исполняемый поток.

Thread.exit

Завершает текущий запущенный поток и планирует запуск другого потока. Если этот поток уже помечен для уничтожения, exit возвращает поток . Если это основной поток или последний поток, выйдите из процесса.

Thread.fork {блок}

Синоним для Thread.new.

Thread.kill (aThread)

Заставляет данную нить выйти

Thread.list

Возвращает массив объектов Thread для всех потоков, которые могут быть запущены или остановлены. Нить.

Thread.main

Возвращает основной поток для процесса.

Thread.new ([arg] *) {| арги | блок}

Создает новый поток для выполнения инструкций, указанных в блоке, и начинает его выполнение. Все аргументы, переданные в Thread.new , передаются в блок.

Thread.pass

Вызывает планировщик потока, чтобы передать выполнение другому потоку.

Thread.start ([аргументы] *) {| арги | блок}

В основном так же, как Thread.new . Однако, если класс Thread является подклассом, то вызов start в этом подклассе не вызовет метод инициализации подкласса.

Thread.stop

Останавливает выполнение текущего потока, переводит его в состояние сна и планирует выполнение другого потока. Сбрасывает критическое состояние на ложное.

Методы экземпляра потока

Эти методы применимы к экземпляру потока. Эти методы будут вызываться как использующие экземпляр Thread следующим образом:

ч [символ]

Ссылка на атрибут – возвращает значение локальной переменной потока, используя символ или имя aSymbol . Если указанная переменная не существует, возвращает nil .

thr [aSymbol] =

Назначение атрибута – Устанавливает или создает значение локальной переменной потока, используя символ или строку.

thr.abort_on_exception

Возвращает статус прерывания при условии исключения для thr . По умолчанию установлено значение false .

thr.abort_on_exception =

При значении true все потоки (включая основную программу) отменяются, если в thr вызывается исключение. Процесс будет эффективно завершен (0) .

thr.alive?

Возвращает true, если thr работает или спит.

thr.exit

Завершает работу thr и планирует запуск другого потока. Если этот поток уже помечен для уничтожения, exit возвращает поток. Если это основной поток или последний поток, выход из процесса.

thr.join

Вызывающий поток приостановит выполнение и запустит thr . Не возвращается, пока не выйдет. Любые не присоединенные потоки будут уничтожены при выходе из основной программы.

thr.key?

Возвращает true, если данная строка (или символ) существует как локальная переменная потока.

thr.kill

Синоним для Thread.exit .

thr.priority

Возвращает приоритет thr . По умолчанию ноль; потоки с более высоким приоритетом будут работать до потоков с более низким приоритетом.

thr.priority =

Устанавливает приоритет thr в Integer. Потоки с более высоким приоритетом будут работать до потоков с более низким приоритетом.

thr.raise (исключение)

Возникает исключение из тр . Звонящий не должен быть тр .

thr.run

Просыпается , делая его подходящим для планирования. Если не в критическом разделе, то вызывает планировщик.

thr.safe_level

Возвращает действующий безопасный уровень для thr .

thr.status

Возвращает состояние thr : sleep, если thr спит или ожидает ввода-вывода, запускается, если выполняется thr , false, если thr завершается нормально, и nil, если thr завершается с исключением.

thr.stop?

Возвращает true, если thr мертв или спит.

thr.value

Ожидает завершения thr через Thread.join и возвращает его значение.

thr.wakeup

Помечает как подходящее для планирования, однако может все еще оставаться заблокированным при вводе / выводе.