В этой статье вы узнаете основы параллелизма в Elixir и узнаете, как создавать процессы, отправлять и получать сообщения, а также создавать длительные процессы. Также вы узнаете о GenServer, узнаете, как его можно использовать в вашем приложении, и узнаете о некоторых вкусностях, которые он предоставляет для вас.
Как вы, вероятно, знаете, Elixir — это функциональный язык, используемый для создания отказоустойчивых параллельных систем, которые обрабатывают множество одновременных запросов. BEAM (виртуальная машина Erlang) использует процессы для одновременного выполнения различных задач, что означает, например, что обслуживание одного запроса не блокирует другой. Процессы легки и изолированы, что означает, что они не разделяют какую-либо память, и даже если один процесс падает, другие могут продолжать работать.
Процессы BEAM сильно отличаются от процессов ОС . По сути, BEAM работает в одном процессе ОС и использует свои собственные планировщики . Каждый планировщик занимает одно ядро ЦП , работает в отдельном потоке и может одновременно обрабатывать тысячи процессов (которые по очереди выполняются). Вы можете прочитать немного больше о BEAM и многопоточности в StackOverflow.
Итак, как вы видите, процессы BEAM (я скажу только «процессы» с этого момента) очень важны в Elixir. Язык предоставляет вам несколько низкоуровневых инструментов для ручного запуска процессов, поддержания состояния и обработки запросов. Однако мало кто их использует — для этого более распространено использование платформы Open Telecom Platform (OTP) .
В настоящее время OTP не имеет ничего общего с телефонами — это универсальная структура для построения сложных параллельных систем. Он определяет, как должны быть структурированы ваши приложения, и предоставляет базу данных, а также набор очень полезных инструментов для создания серверных процессов, восстановления после ошибок, ведения журналов и т. Д. В этой статье мы поговорим о поведении сервера GenServer , которое предоставлено OTP.
Вы можете думать о GenServer как об абстракции или помощнике, который упрощает работу с серверными процессами. Во-первых, вы увидите, как порождать процессы, используя некоторые низкоуровневые функции. Затем мы переключимся на GenServer и посмотрим, как он упростит нам задачу, избавляя от необходимости каждый раз писать утомительный (и довольно общий) код. Давайте начнем!
Все начинается с появления
Если бы вы спросили меня, как создать процесс в Elixir, я бы ответил: породить его! spawn / 1 — это функция, определенная внутри модуля Kernel
которая возвращает новый процесс. Эта функция принимает лямбду, которая будет выполнена в созданном процессе. Как только выполнение завершится, процесс также завершится:
1
2
3
|
spawn(fn -> IO.puts(«hi») end) |> IO.inspect
# => hi
# => #PID<0.72.0>
|
Итак, здесь spawn
вернул новый идентификатор процесса. Если вы добавите задержку к лямбде, через некоторое время будет напечатана строка «hi»:
1
2
3
4
5
6
|
spawn(fn ->
:timer.sleep(5000)
IO.puts(«hi»)
end) |> IO.inspect
# => #PID<0.82.0>
# => (after 5 seconds) «hi»
|
Теперь мы можем порождать столько процессов, сколько захотим, и они будут выполняться одновременно:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
spawn_it = fn(num) ->
spawn(fn ->
:timer.sleep(5000)
IO.puts(«hi #{num}»)
end)
end
Enum.each(
1..10,
fn(_) -> spawn_it.(:rand.uniform(100)) end
)
# => (all printed out at the same time, after 5 seconds)
# => hi 5
# => hi 10 etc…
|
Здесь мы запускаем десять процессов и распечатываем тестовую строку со случайным числом. :rand
— это модуль, предоставленный Erlang, поэтому его имя — атом. Круто то, что все сообщения будут распечатаны одновременно, через пять секунд. Это происходит потому, что все десять процессов выполняются одновременно.
Сравните это со следующим примером, который выполняет ту же задачу, но без использования spawn/1
:
01
02
03
04
05
06
07
08
09
10
11
12
|
dont_spawn_it = fn(num) ->
:timer.sleep(5000)
IO.puts(«hi #{num}»)
end
Enum.each(
1..10,
fn(_) -> dont_spawn_it.(:rand.uniform(100)) end
)
# => (after 5 seconds) hi 70
# => (after another 5 seconds) hi 45
# => etc…
|
Пока этот код работает, вы можете пойти на кухню и приготовить еще одну чашку кофе, так как это займет около минуты. Каждое сообщение отображается последовательно, что, конечно, не оптимально!
Вы можете спросить: «Сколько памяти потребляет процесс?» Ну, это зависит, но изначально он занимает пару килобайт, что очень мало (даже мой старый ноутбук имеет 8 ГБ памяти, не говоря уже о крутых современных серверах).
Все идет нормально. Прежде чем мы начнем работать с GenServer, давайте обсудим еще одну важную вещь: передача и получение сообщений .
Работа с сообщениями
Неудивительно, что процессы (которые, как вы помните, изолированные) должны каким-то образом взаимодействовать, особенно когда речь идет о построении более или менее сложных систем. Чтобы достичь этого, мы можем использовать сообщения.
Сообщение может быть отправлено с помощью функции с совершенно очевидным именем: send / 2 . Он принимает пункт назначения (порт, идентификатор процесса или имя процесса) и фактическое сообщение. После отправки сообщения оно появляется в почтовом ящике процесса и может быть обработано. Как видите, общая идея очень похожа на нашу повседневную деятельность по обмену электронной почтой.
Почтовый ящик — это в основном очередь «первым пришел — первым обслужен» (FIFO) . После обработки сообщения оно удаляется из очереди. Чтобы начать получать сообщения, вам нужно — угадайте, что! — макрос приема . Этот макрос содержит одно или несколько предложений, и сообщение сопоставляется с ними. Если совпадение найдено, сообщение обрабатывается. В противном случае сообщение помещается обратно в почтовый ящик. Кроме того, вы можете установить необязательное условие after
, которое выполняется, если сообщение не было получено в данный момент времени. Вы можете прочитать больше о send/2
и receive
в официальных документах .
Ладно, хватит теории — давайте попробуем поработать с сообщениями. Прежде всего, отправьте что-нибудь текущему процессу:
1
|
send(self(), «hello!»)
|
Макрос self / 0 возвращает pid вызывающего процесса, и это именно то, что нам нужно. Не опускайте круглые скобки после функции, так как вы получите предупреждение о совпадении неоднозначности.
Теперь получите сообщение при установке предложения after
:
1
2
3
4
5
6
7
8
|
receive do
msg ->
IO.puts «Yay, a message: #{msg}»
msg
after 1000 -> IO.puts :stderr, «I want messages!»
end |> IO.puts
# => Yay, a message: hello!
# => hello!
|
Обратите внимание, что предложение возвращает результат оценки последней строки, поэтому мы получаем «привет!» строка.
Помните, что вы можете ввести столько пунктов, сколько необходимо:
01
02
03
04
05
06
07
08
09
10
|
send(self(), {:ok, «hello!»})
receive do
{:ok, msg} ->
IO.puts «Yay, a message: #{msg}»
msg
{:error, msg} -> IO.puts :stderr, «Oh no, something bad has happened: #{msg}»
_ -> IO.puts «I dunno what this message is…»
after 1000 -> IO.puts :stderr, «I want messages!»
end |> IO.puts
|
Здесь у нас есть четыре предложения: одно для обработки сообщения об успехе, другое для обработки ошибок, а затем предложение «отступления» и время ожидания.
Если сообщение не соответствует ни одному из пунктов, оно хранится в почтовом ящике, что не всегда желательно. Почему? Потому что всякий раз, когда приходит новое сообщение, старые обрабатываются в первом заголовке (поскольку почтовый ящик является очередью FIFO), что замедляет работу программы. Таким образом, «запасной» пункт может пригодиться.
Теперь, когда вы знаете, как создавать процессы, отправлять и получать сообщения, давайте рассмотрим несколько более сложный пример, который включает создание простого сервера, отвечающего на различные сообщения.
Работа с серверным процессом
В предыдущем примере мы отправили только одно сообщение, получили его и выполнили некоторую работу. Это хорошо, но не очень функционально. Обычно происходит то, что у нас есть сервер, который может отвечать на различные сообщения. Под «сервером» я подразумеваю длительный процесс, построенный с повторяющейся функцией. Например, давайте создадим сервер для выполнения некоторых математических уравнений. Он получит сообщение, содержащее запрошенную операцию и некоторые аргументы.
Начните с создания сервера и функции зацикливания:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
defmodule MathServer do
def start do
spawn &listen/0
end
defp listen do
receive do
{:sqrt, caller, arg} -> IO.puts arg
_ -> IO.puts :stderr, «Not implemented.»
end
listen()
end
end
|
Таким образом, мы запускаем процесс, который продолжает прослушивать входящие сообщения. После получения сообщения функция listen/0
вызывается снова, создавая бесконечный цикл. Внутри функции listen/0
мы добавляем поддержку сообщения :sqrt
, которое будет вычислять квадратный корень из числа. arg
будет содержать фактическое число для выполнения операции. Кроме того, мы определяем запасной пункт.
Теперь вы можете запустить сервер и присвоить его идентификатор процесса переменной:
1
2
3
|
math_server = MathServer.start
IO.inspect math_server
# => #PID<0.85.0>
|
Brilliant! Теперь давайте добавим функцию реализации для фактического выполнения вычисления:
1
2
3
4
5
6
|
defmodule MathServer do
# …
def sqrt(server, arg) do
send(:some_name, {:sqrt, self(), arg})
end
end
|
Используйте эту функцию сейчас:
1
2
|
MathServer.sqrt(math_server, 3)
# => 3
|
На данный момент он просто выводит переданный аргумент, так что настройте код следующим образом, чтобы выполнить математическую операцию:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
defmodule MathServer do
# …
defp listen do
receive do
{:sqrt, caller, arg} -> send(:some_name, {:result, do_sqrt(arg)})
_ -> IO.puts :stderr, «Not implemented.»
end
listen()
end
defp do_sqrt(arg) do
:math.sqrt(arg)
end
end
|
Теперь на сервер отправляется еще одно сообщение, содержащее результат вычисления.
Что интересно, функция sqrt/2
просто отправляет на сервер сообщение с просьбой выполнить операцию, не дожидаясь результата. Так что, в основном, он выполняет асинхронный вызов .
Очевидно, что мы хотим получить результат в какой-то момент времени, поэтому создайте другую публичную функцию:
1
2
3
4
5
6
|
def grab_result do
receive do
{:result, result} -> result
after 5000 -> IO.puts :stderr, «Timeout»
end
end
|
Теперь используйте это:
1
2
3
4
|
math_server = MathServer.start
MathServer.sqrt(math_server, 3)
MathServer.grab_result |> IO.puts
# => 1.7320508075688772
|
Оно работает! Конечно, вы даже можете создать пул серверов и распределять задачи между ними, достигая параллелизма. Это удобно, когда запросы не связаны друг с другом.
Познакомьтесь с GenServer
Хорошо, мы рассмотрели несколько функций, позволяющих нам создавать длительные процессы на сервере и отправлять и получать сообщения. Это здорово, но мы должны написать слишком много стандартного кода, который запускает серверный цикл ( start/0
), отвечает на сообщения ( listen/0
private function) и возвращает результат ( grab_result/0
). В более сложных ситуациях нам также может понадобиться поддерживать общее состояние или обрабатывать ошибки.
Как я сказал в начале статьи, нет необходимости изобретать велосипед. Вместо этого мы можем использовать поведение GenServer, которое уже предоставляет нам весь стандартный код и имеет отличную поддержку серверных процессов (как мы видели в предыдущем разделе).
Поведение в Elixir — это код, который реализует общий шаблон. Чтобы использовать GenServer, вам нужно определить специальный модуль обратного вызова, который удовлетворяет условиям контракта в соответствии с поведением. В частности, он должен реализовывать некоторые функции обратного вызова, и фактическая реализация зависит от вас. После того, как обратные вызовы написаны, модуль поведения может использовать их.
Как указано в документации , GenServer требует реализации шести обратных вызовов, хотя они также имеют реализацию по умолчанию. Это означает, что вы можете переопределять только те, которые требуют некоторой пользовательской логики.
Перво-наперво: нам нужно запустить сервер, прежде чем делать что-либо еще, поэтому перейдите к следующему разделу!
Запуск сервера
Чтобы продемонстрировать использование GenServer, давайте напишем CalcServer
, который позволит пользователям применять различные операции к аргументу. Результат операции будет сохранен в состоянии сервера , а затем к нему также может быть применена другая операция. Или пользователь может получить окончательный результат вычислений.
Прежде всего, используйте макрос использования для подключения GenServer:
1
2
3
|
defmodule CalcServer do
use GenServer
end
|
Теперь нам нужно переопределить некоторые обратные вызовы.
Первым является init / 1 , который вызывается при запуске сервера. Переданный аргумент используется для установки начального состояния сервера. В простейшем случае этот обратный вызов должен возвращать кортеж {:ok, initial_state}
, хотя существуют и другие возможные возвращаемые значения, такие как {:stop, reason}
, что приводит к немедленной остановке сервера.
Я думаю, что мы можем позволить пользователям определять начальное состояние для нашего сервера. Однако мы должны проверить, что переданный аргумент является числом. Так что используйте пункт охраны для этого:
01
02
03
04
05
06
07
08
09
10
11
|
defmodule CalcServer do
use GenServer
def init(initial_value) when is_number(initial_value) do
{:ok, initial_value}
end
def init(_) do
{:stop, «The value must be an integer!»}
end
end
|
Теперь просто запустите сервер с помощью функции start / 3 и предоставьте свой CalcServer
в качестве модуля обратного вызова (первый аргумент). Вторым аргументом будет начальное состояние:
1
2
|
GenServer.start(CalcServer, 5.1) |> IO.inspect
# => {:ok, #PID<0.85.0>}
|
Если вы попытаетесь передать не число в качестве второго аргумента, сервер не будет запущен, а это именно то, что нам нужно.
Большой! Теперь, когда наш сервер запущен, мы можем начать кодировать математические операции.
Обработка асинхронных запросов
Асинхронные запросы называются приведениями в терминах GenServer. Чтобы выполнить такой запрос, используйте функцию cast / 2 , которая принимает сервер и фактический запрос. Это похоже на функцию sqrt/2
которую мы кодировали, когда говорим о серверных процессах. Он также использует подход «запусти и забудь», что означает, что мы не ждем завершения запроса.
Для обработки асинхронных сообщений используется обратный вызов handle_cast / 2 . Он принимает запрос и состояние и должен ответить кортежем {:noreply, new_state}
в простейшем случае (или {:stop, reason, new_state}
чтобы остановить цикл сервера). Например, давайте обработаем асинхронное приведение :sqrt
:
1
2
3
|
def handle_cast(:sqrt, state) do
{:noreply, :math.sqrt(state)}
end
|
Вот так мы поддерживаем состояние нашего сервера. Первоначально номер (переданный при запуске сервера) был 5.1
. Теперь мы обновляем состояние и устанавливаем его :math.sqrt(5.1)
.
Код функции интерфейса, которая использует cast/2
:
1
2
3
|
def sqrt(pid) do
GenServer.cast(pid, :sqrt)
end
|
Мне это напоминает злого волшебника, который разыгрывает заклинание, но не заботится о воздействии, которое оно вызывает.
Обратите внимание, что нам требуется идентификатор процесса для выполнения приведения. Помните, что после успешного запуска сервера возвращается кортеж {:ok, pid}
. Поэтому давайте использовать сопоставление с образцом для извлечения идентификатора процесса:
1
2
|
{:ok, pid} = GenServer.start(CalcServer, 5.1)
CalcServer.sqrt(pid)
|
Ницца! Тот же подход может быть использован для реализации, скажем, умножения. Код будет немного сложнее, так как нам нужно будет передать второй аргумент, множитель:
1
2
3
|
def multiply(pid, multiplier) do
GenServer.cast(pid, {:multiply, multiplier})
end
|
Функция cast
поддерживает только два аргумента, поэтому мне нужно создать кортеж и передать туда дополнительный аргумент.
Теперь обратный звонок:
1
2
3
|
def handle_cast({:multiply, multiplier}, state) do
{:noreply, state * multiplier}
end
|
Мы также можем написать один обратный вызов handle_cast
который поддерживает работу, а также останавливает сервер, если операция неизвестна:
1
2
3
4
5
6
7
|
def handle_cast(operation, state) do
case operation do
:sqrt -> {:noreply, :math.sqrt(state)}
{:multiply, multiplier} -> {:noreply, state * multiplier}
_ -> {:stop, «Not implemented», state}
end
end
|
Теперь используйте новую функцию интерфейса:
1
|
CalcServer.multiply(pid, 2)
|
Отлично, но в настоящее время нет способа получить результат вычислений. Поэтому пришло время определить еще один обратный вызов.
Обработка синхронных запросов
Если асинхронные запросы являются приведениями, то синхронные запросы называются вызовами . Для выполнения таких запросов используйте функцию call / 3 , которая принимает сервер, запрос и необязательный таймаут, который по умолчанию равен пяти секундам.
Синхронные запросы используются, когда мы хотим подождать, пока ответ действительно не поступит с сервера. Типичным вариантом использования является получение некоторой информации, например, результатов вычислений, как в сегодняшнем примере (вспомните grab_result/0
из одного из предыдущих разделов).
Для обработки синхронных запросов используется обратный вызов handle_call/3
. Он принимает запрос, кортеж, содержащий pid сервера, и термин, идентифицирующий вызов, а также текущее состояние. В простейшем случае он должен ответить кортежем {:reply, reply, new_state}
.
Код этого обратного вызова сейчас:
1
2
3
|
def handle_call(:result, _, state) do
{:reply, state, state}
end
|
Как видите, ничего сложного. reply
и новое состояние соответствуют текущему состоянию, так как я не хочу ничего менять после того, как результат был возвращен.
Теперь result/1
интерфейса result/1
функция:
1
2
3
|
def result(pid) do
GenServer.call(pid, :result)
end
|
Это оно! Окончательное использование CalcServer показано ниже:
1
2
3
4
5
|
{:ok, pid} = GenServer.start(CalcServer, 5.1)
CalcServer.sqrt(pid)
CalcServer.multiply(pid, 2)
CalcServer.result(pid) |> IO.puts
# => 4.516635916254486
|
Aliasing
Становится несколько утомительным всегда предоставлять идентификатор процесса при вызове функций интерфейса. К счастью, вы можете дать вашему процессу имя или псевдоним . Это делается при запуске сервера путем установки name
:
1
2
3
4
|
GenServer.start(CalcServer, 5.1, name: :calc)
CalcServer.sqrt
CalcServer.multiply(2)
CalcServer.result |> IO.puts
|
Обратите внимание, что сейчас я не храню pid, хотя вы можете захотеть выполнить сопоставление с шаблоном, чтобы убедиться, что сервер действительно запущен.
Теперь интерфейсные функции стали немного проще:
01
02
03
04
05
06
07
08
09
10
11
|
def sqrt do
GenServer.cast(:calc, :sqrt)
end
def multiply(multiplier) do
GenServer.cast(:calc, {:multiply, multiplier})
end
def result do
GenServer.call(:calc, :result)
end
|
Только не забывайте, что вы не можете запустить два сервера с одинаковым псевдонимом.
В качестве альтернативы вы можете представить еще одну интерфейсную функцию start/1
внутри вашего модуля и воспользоваться макросом __MODULE __ / 0 , который возвращает имя текущего модуля в виде атома:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
defmodule CalcServer do
use GenServer
def start(initial_value) do
GenServer.start(CalcServer, initial_value, name: __MODULE__)
end
def sqrt do
GenServer.cast(__MODULE__, :sqrt)
end
def multiply(multiplier) do
GenServer.cast(__MODULE__, {:multiply, multiplier})
end
def result do
GenServer.call(__MODULE__, :result)
end
# …
end
CalcServer.start(6.1)
CalcServer.sqrt
CalcServer.multiply(2)
CalcServer.result |> IO.puts
|
прекращение
Другой обратный вызов, который можно переопределить в вашем модуле, называется terminate / 2 . Он принимает причину и текущее состояние и вызывается, когда сервер собирается завершить работу. Это может произойти, когда, например, вы передаете неверный аргумент интерфейсной функции multiply/1
:
1
2
|
# …
CalcServer.multiply(2)
|
Обратный вызов может выглядеть примерно так:
1
2
3
|
def terminate(_reason, _state) do
IO.puts «The server terminated»
end
|
Вывод
В этой статье мы рассмотрели основы параллелизма в Elixir и обсудили функции и макросы, такие как spawn
, receive
и send
. Вы узнали, что такое процессы, как их создавать и как отправлять и получать сообщения. Кроме того, мы увидели, как построить простой длительный процесс сервера, который отвечает как на синхронные, так и на асинхронные сообщения.
Кроме того, мы обсудили поведение GenServer и увидели, как он упрощает код, вводя различные обратные вызовы. Мы работали с обратными вызовами init
, terminate
, handle_call
и handle_cast
и создали простой вычислительный сервер. Если что-то показалось вам неясным, не стесняйтесь оставлять свои вопросы!
GenServer — это еще не все, и, конечно, невозможно описать все в одной статье. В моем следующем посте я объясню, что такое супервизоры и как вы можете использовать их для мониторинга ваших процессов и восстановления после ошибок. До тех пор, счастливого кодирования!