Традиционные программы имеют один поток выполнения: операторы или инструкции, составляющие программу, выполняются последовательно до тех пор, пока программа не завершится.
Многопоточная программа имеет более одного потока выполнения. Внутри каждого потока операторы выполняются последовательно, но сами потоки могут выполняться параллельно, например, на многоядерном процессоре. Часто на одной машине с ЦП несколько потоков фактически не выполняются параллельно, но параллелизм моделируется путем чередования выполнения потоков.
Ruby облегчает написание многопоточных программ с помощью класса Thread . Рубиновые потоки — это легкий и эффективный способ достижения параллелизма в вашем коде.
Создание рубиновых потоков
Чтобы начать новый поток, просто свяжите блок с вызовом Thread.new . Будет создан новый поток для выполнения кода в блоке, а исходный поток немедленно вернется из Thread.new и возобновит выполнение с помощью следующего оператора —
# Thread #1 is running here Thread.new { # Thread #2 runs this code } # Thread #1 runs this code
пример
Вот пример, который показывает, как мы можем использовать многопоточную программу Ruby.
#!/usr/bin/ruby def func1 i = 0 while i<=2 puts "func1 at: #{Time.now}" sleep(2) i = i+1 end end def func2 j = 0 while j<=2 puts "func2 at: #{Time.now}" sleep(1) j = j+1 end end puts "Started At #{Time.now}" t1 = Thread.new{func1()} t2 = Thread.new{func2()} t1.join t2.join puts "End at #{Time.now}"
Это даст следующий результат —
Started At Wed May 14 08:21:54 -0700 2008 func1 at: Wed May 14 08:21:54 -0700 2008 func2 at: Wed May 14 08:21:54 -0700 2008 func2 at: Wed May 14 08:21:55 -0700 2008 func1 at: Wed May 14 08:21:56 -0700 2008 func2 at: Wed May 14 08:21:56 -0700 2008 func1 at: Wed May 14 08:21:58 -0700 2008 End at Wed May 14 08:22:00 -0700 2008
Жизненный цикл темы
Новые темы создаются с помощью Thread.new . Вы также можете использовать синонимы Thread.start и Thread.fork .
Нет необходимости запускать поток после его создания, он начинает работать автоматически, когда становятся доступными ресурсы ЦП.
Класс Thread определяет ряд методов для запроса и управления потоком во время его работы. Поток запускает код в блоке, связанном с вызовом Thread.new, и затем останавливается.
Значение последнего выражения в этом блоке является значением потока и может быть получено путем вызова метода value объекта Thread. Если поток завершился, то значение сразу возвращает значение потока. В противном случае метод значения блокируется и не возвращается, пока поток не завершится.
Метод класса Thread.current возвращает объект Thread, представляющий текущий поток. Это позволяет потокам манипулировать собой. Метод класса Thread.main возвращает объект Thread, представляющий основной поток. Это начальный поток выполнения, который начался при запуске программы Ruby.
Вы можете дождаться завершения определенного потока, вызвав метод Thread.join этого потока. Вызывающий поток будет блокироваться, пока данный поток не будет завершен.
Потоки и исключения
Если исключение возникает в главном потоке и нигде не обрабатывается, интерпретатор Ruby печатает сообщение и завершает работу. В потоках, отличных от основного потока, необработанные исключения приводят к прекращению работы потока.
Если поток t завершается из-за необработанного исключения, а другой поток s вызывает t.join или t.value, то исключение, возникшее в t, вызывается в потоке s .
Если Thread.abort_on_exception имеет значение false , условие по умолчанию, необработанное исключение просто убивает текущий поток, а все остальные продолжают работать.
Если вы хотите, чтобы любое необработанное исключение в каком-либо потоке вызывало выход интерпретатора, установите для метода класса Thread.abort_on_exception значение true .
t = Thread.new { ... } t.abort_on_exception = true
Переменные потока
Поток может обычно обращаться к любым переменным, которые находятся в области видимости при создании потока. Переменные, локальные для блока потока, являются локальными для потока и не являются общими.
Класс потока имеет специальную возможность, позволяющую создавать локальные переменные потока и обращаться к ним по имени. Вы просто обрабатываете объект потока, как если бы это был Hash, записывая в элементы с помощью [] = и читая их обратно с помощью [].
В этом примере каждый поток записывает текущее значение счетчика переменных в локальную переменную с ключом mycount .
#!/usr/bin/ruby count = 0 arr = [] 10.times do |i| arr[i] = Thread.new { sleep(rand(0)/10.0) Thread.current["mycount"] = count count += 1 } end arr.each {|t| t.join; print t["mycount"], ", " } puts "count = #{count}"
Это дает следующий результат —
8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10
Основной поток ожидает завершения подпотоков, а затем выводит значение счетчика, захваченного каждым.
Приоритеты потоков
Первым фактором, влияющим на планирование потоков, является приоритет потоков: потоки с высоким приоритетом планируются перед потоками с низким приоритетом. Точнее, поток получит процессорное время, только если нет потоков с более высоким приоритетом, ожидающих запуска.
Вы можете установить и запросить приоритет объекта Ruby Thread с приоритетом = и приоритетом . Вновь созданный поток начинается с того же приоритета, что и поток, который его создал. Основной поток начинается с приоритета 0.
Нет никакого способа установить приоритет потока прежде, чем он начнет работать. Тем не менее, поток может повысить или понизить свой собственный приоритет как первое действие, которое он предпринимает.
Исключение потоков
Если два потока совместно используют доступ к одним и тем же данным, и хотя бы один из потоков изменяет эти данные, вы должны быть особенно внимательны, чтобы убедиться, что ни один поток не сможет увидеть данные в несогласованном состоянии. Это называется исключением потока .
Mutex — это класс, который реализует простую семафорную блокировку для взаимоисключающего доступа к некоторому общему ресурсу. То есть только один поток может удерживать блокировку в данный момент времени. Другие потоки могут ожидать очереди, чтобы блокировка стала доступной, или просто получать немедленную ошибку, указывающую, что блокировка недоступна.
Размещая все доступы к совместно используемым данным под контролем мьютекса , мы гарантируем согласованность и атомарную работу. Давайте попробуем привести примеры, первый без мутатакса, а второй с мутаксом —
Пример без Mutax
#!/usr/bin/ruby require 'thread' count1 = count2 = 0 difference = 0 counter = Thread.new do loop do count1 += 1 count2 += 1 end end spy = Thread.new do loop do difference += (count1 - count2).abs end end sleep 1 puts "count1 : #{count1}" puts "count2 : #{count2}" puts "difference : #{difference}"
Это даст следующий результат —
count1 : 1583766 count2 : 1583766 difference : 0
#!/usr/bin/ruby require 'thread' mutex = Mutex.new count1 = count2 = 0 difference = 0 counter = Thread.new do loop do mutex.synchronize do count1 += 1 count2 += 1 end end end spy = Thread.new do loop do mutex.synchronize do difference += (count1 - count2).abs end end end sleep 1 mutex.lock puts "count1 : #{count1}" puts "count2 : #{count2}" puts "difference : #{difference}"
Это даст следующий результат —
count1 : 696591 count2 : 696591 difference : 0
Обработка тупика
Когда мы начинаем использовать объекты Mutex для исключения потоков, мы должны быть осторожны, чтобы избежать тупиков . Deadlock — это состояние, которое возникает, когда все потоки ожидают получения ресурса, удерживаемого другим потоком. Поскольку все потоки заблокированы, они не могут снять блокировки, которые они удерживают. И поскольку они не могут снять блокировки, никакой другой поток не может получить эти блокировки.
Это где условные переменные входят в картину. Переменная условия — это просто семафор, который связан с ресурсом и используется в защите определенного мьютекса . Когда вам нужен ресурс, который недоступен, вы ждете переменную условия. Это действие снимает блокировку с соответствующего мьютекса . Когда какой-то другой поток сообщает, что ресурс доступен, исходный поток выходит из режима ожидания и одновременно восстанавливает блокировку в критической области.
пример
#!/usr/bin/ruby require 'thread' mutex = Mutex.new cv = ConditionVariable.new a = Thread.new { mutex.synchronize { puts "A: I have critical section, but will wait for cv" cv.wait(mutex) puts "A: I have critical section again! I rule!" } } puts "(Later, back at the ranch...)" b = Thread.new { mutex.synchronize { puts "B: Now I am critical, but am done with cv" cv.signal puts "B: I am still critical, finishing up" } } a.join b.join
Это даст следующий результат —
A: I have critical section, but will wait for cv (Later, back at the ranch...) B: Now I am critical, but am done with cv B: I am still critical, finishing up A: I have critical section again! I rule!
Состояния потоков
Существует пять возможных возвращаемых значений, соответствующих пяти возможным состояниям, как показано в следующей таблице. Метод status возвращает состояние потока.
Состояние потока | Возвращаемое значение |
---|---|
Runnable | бежать |
Спать | Спать |
Aborting | отбрасывание |
Завершается нормально | ложный |
Прекращено за исключением | ноль |
Методы класса потока
Следующие методы предоставляются классом Thread, и они применимы ко всем потокам, доступным в программе. Эти методы будут вызываться как использующие имя класса Thread следующим образом:
Thread.abort_on_exception = true
Thread.abort_on_exception
Возвращает статус глобального прерывания при условии исключения . По умолчанию установлено значение false . При значении true все потоки будут прерваны (процесс завершит работу (0)), если в каком-либо потоке возникнет исключение
Thread.abort_on_exception =
Если установлено значение true , все потоки будут прерваны, если возникнет исключение. Возвращает новое состояние.
Thread.critical
Возвращает состояние критического состояния глобального потока .
Thread.critical =
Устанавливает состояние критического состояния глобального потока и возвращает его. При значении true запрещает планирование любого существующего потока. Не блокирует создание и запуск новых потоков. Некоторые операции с потоками (такие как остановка или уничтожение потока, переход в спящий режим в текущем потоке и создание исключения) могут привести к планированию потока даже в критической секции.
Thread.current
Возвращает текущий исполняемый поток.
Thread.exit
Завершает текущий запущенный поток и планирует запуск другого потока. Если этот поток уже помечен для уничтожения, exit возвращает поток . Если это основной поток или последний поток, выйдите из процесса.
Thread.fork {блок}
Синоним для Thread.new.
Thread.kill (aThread)
Заставляет данную нить выйти
Thread.list
Возвращает массив объектов Thread для всех потоков, которые могут быть запущены или остановлены. Нить.
Thread.main
Возвращает основной поток для процесса.
Thread.new ([arg] *) {| арги | блок}
Создает новый поток для выполнения инструкций, указанных в блоке, и начинает его выполнение. Все аргументы, переданные в Thread.new , передаются в блок.
Thread.pass
Вызывает планировщик потока, чтобы передать выполнение другому потоку.
Thread.start ([аргументы] *) {| арги | блок}
В основном так же, как Thread.new . Однако, если класс Thread является подклассом, то вызов start в этом подклассе не вызовет метод инициализации подкласса.
Thread.stop
Останавливает выполнение текущего потока, переводит его в состояние сна и планирует выполнение другого потока. Сбрасывает критическое состояние на ложное.
Методы экземпляра потока
Эти методы применимы к экземпляру потока. Эти методы будут вызываться как использующие экземпляр Thread следующим образом:
ч [символ]
Ссылка на атрибут — возвращает значение локальной переменной потока, используя символ или имя aSymbol . Если указанная переменная не существует, возвращает nil .
thr [aSymbol] =
Назначение атрибута — Устанавливает или создает значение локальной переменной потока, используя символ или строку.
thr.abort_on_exception
Возвращает статус прерывания при условии исключения для thr . По умолчанию установлено значение false .
thr.abort_on_exception =
При значении true все потоки (включая основную программу) отменяются, если в thr вызывается исключение. Процесс будет эффективно завершен (0) .
thr.alive?
Возвращает true, если thr работает или спит.
thr.exit
Завершает работу thr и планирует запуск другого потока. Если этот поток уже помечен для уничтожения, exit возвращает поток. Если это основной поток или последний поток, выход из процесса.
thr.join
Вызывающий поток приостановит выполнение и запустит thr . Не возвращается, пока не выйдет. Любые не присоединенные потоки будут уничтожены при выходе из основной программы.
thr.key?
Возвращает true, если данная строка (или символ) существует как локальная переменная потока.
thr.kill
Синоним для Thread.exit .
thr.priority
Возвращает приоритет thr . По умолчанию ноль; потоки с более высоким приоритетом будут работать до потоков с более низким приоритетом.
thr.priority =
Устанавливает приоритет thr в Integer. Потоки с более высоким приоритетом будут работать до потоков с более низким приоритетом.
thr.raise (исключение)
Возникает исключение из тр . Звонящий не должен быть тр .
thr.run
Просыпается , делая его подходящим для планирования. Если не в критическом разделе, то вызывает планировщик.
thr.safe_level
Возвращает действующий безопасный уровень для thr .
thr.status
Возвращает состояние thr : sleep, если thr спит или ожидает ввода-вывода, запускается, если выполняется thr , false, если thr завершается нормально, и nil, если thr завершается с исключением.
thr.stop?
Возвращает true, если thr мертв или спит.
thr.value
Ожидает завершения thr через Thread.join и возвращает его значение.
thr.wakeup
Помечает как подходящее для планирования, однако может все еще оставаться заблокированным при вводе / выводе.