Статьи

Изучение Эрланга с картой / уменьшить

В категории «Параллельное ориентированное программирование» Эрланг недавно получил хорошее внимание из-за заявленного инженерами Facebook успеха в использовании Erlang в крупномасштабных приложениях.

Соблазнившись выяснить основные составляющие Erlang, я решил потратить некоторое время на изучение языка.

 

Проблема многопоточности

Многопоточность выполнения — это распространенная модель программирования в современных языках, поскольку она позволяет более эффективно использовать вычислительные ресурсы, обеспечиваемые многоядерной и многокомпьютерной архитектурой. Однако один из вопросов, требующих ответа, заключается в том, как эти параллельные потоки выполнения взаимодействуют и работают совместно для решения проблемы приложения.

Есть в основном две модели для связи между одновременными выполнениями. Один основан на модели «Общая память», в которой один поток выполнения записывает информацию в общее место, откуда другие потоки будут читать. Модель потоков Java основана на такой семантике «разделяемой памяти». Типичная проблема этой модели заключается в том, что одновременное обновление требует очень сложной схемы защиты, в противном случае несогласованный доступ может привести к несогласованности данных.

К сожалению, эту схему защиты очень сложно проанализировать, когда несколько потоков начинают взаимодействовать в комбинаторном множестве различных способов. Трудно отладить проблему взаимоблокировки часто всплывают. Чтобы уменьшить сложность, обычно рекомендуется использовать грубую модель зернистой блокировки, но это может уменьшить параллелизм.

Эрланг выбрал другую модель, основанную на «передаче сообщений». В этой модели любая информация, которой необходимо поделиться, будет «скопирована» в сообщение и отправлена ​​в другие исполнения. В этой модели каждый поток выполнения имеет свое состояние «полностью локально» (недоступно для просмотра другим потоком выполнения). Их локальное состояние обновляется, когда они узнают, что происходит в других потоках, получая свои сообщения. Эта модель отражает то, как люди в реальной жизни взаимодействуют друг с другом.

 

Erlang последовательная обработка

Исходя из объектно-ориентированного опыта императивного программирования, есть несколько вещей, которые мне нужно отучиться / выучить в Erlang.

Erlang — это функциональный язык программирования, не имеющий концепций ОО. Код Erlang структурирован как «функция» в базовом блоке, сгруппированном под «модулем». Каждая «функция» принимает ряд входных параметров и выдает выходное значение. Как и многие функциональные языки программирования, Erlang поощряет использование «чистой функции», которая «не имеет побочных эффектов» и «детерминирована». «Без побочных эффектов» означает, что при выполнении функции не происходит никаких изменений состояния. «Детерминированный» означает, что один и тот же выход всегда будет получен из одного и того же входа.

Erlang имеет совершенно другую концепцию присваивания переменных в том смысле, что все переменные в Erlang неизменны. Другими словами, каждая переменная может быть назначена только один раз и с тех пор никогда не может быть изменена. Поэтому я не могу сделать X = X + 1, и мне нужно использовать новую переменную и присвоить ей измененное значение, например, Y = X + 1. Эта характеристика «неизменности» сильно упрощает отладку, потому что мне не нужно беспокоиться о том, как значение X изменяется в другой точке выполнения (оно просто не изменится).

Еще одна необычная вещь в Erlang заключается в том, что в языке нет конструкции «цикл цикла». Чтобы добиться эффекта зацикливания, вам необходимо кодировать функцию рекурсивным способом, в основном помещая терминальное предложение для проверки условия выхода, а также тщательно структурируя логику в режиме хвостовой рекурсии. В противном случае вам может не хватить памяти, если размер стека слишком велик. Хвостовая рекурсивная функция означает, что функция возвращает либо значение (но не выражение), либо рекурсивный вызов функции. Эрланг достаточно умен, чтобы выполнять хвостовую рекурсию по нескольким функциям, например, если funcA вызывает funcB, который вызывает funcC, который вызывает funcA. Хвостовая рекурсия особенно важна при написании серверного демона, который обычно выполняет саморекурсивный вызов после обработки запроса.

 

Erlang Параллельная обработка

Поток выполнения в Erlang называется «Процесс». Не путайте с процессами уровня ОС, процесс Эрланга чрезвычайно легок, намного легче, чем потоки Java. Процесс создается вызовом функции spawn (Node, Module, Function, Arguments) и завершается при возврате этой функции.

Erlang процессы общаются друг с другом, передавая сообщения. Идентификаторы процесса используются отправителем для указания адресов получателей. Вызов send происходит асинхронно и возвращается немедленно. Процесс приема сделает синхронный вызов приема и определит количество подходящих шаблонов. Поступающие сообщения, соответствующие шаблону, будут доставлены в процесс получения, в противном случае они останутся в очереди навсегда. Поэтому рекомендуется очистить шаблон мусора от соответствия шаблону. Вызов приема также принимает параметр тайм-аута, поэтому он будет возвращаться, если в течение периода тайм-аута не будет найдено ни одного совпадающего сообщения.

Обработка ошибок в Erlang также сильно отличается от других языков программирования. Хотя Erlang предоставляет модель try / catch, это не предпочтительный подход. Вместо того, чтобы отлавливать ошибку и обрабатывать ее в локальном процессе, процесс должен просто умереть и позволить другому процессу позаботиться о том, что должно быть сделано после его сбоя.

У Эрланга есть концепция «процессов», «связанных» друг с другом, и они следят за состоянием жизни между собой. При настройке по умолчанию умирающий процесс распространяет сигнал выхода на все процессы, на которые он ссылается (ссылки являются двунаправленными). Таким образом, возникает цепной эффект, когда один процесс умирает, вся цепочка процессов умирает. Однако процесс может переопределить свое поведение после получения сигнала выхода. Вместо того, чтобы «умирать», процесс может решить обработать ошибку (возможно, перезапустив мертвый процесс).

 

Другие особенности Erlang

Сопоставление с образцом — это распространенная программная конструкция во многих местах Erlang, а именно: «Вызовы функций», «Присвоение переменных», «Операторы Case» и «Получение сообщений». Требуется некоторое время, чтобы привыкнуть к этому стилю. После этого я чувствую, что эта конструкция очень мощная.

Еще одна интересная функция, которую предоставляет Erlang, — это горячая замена кода. Указав имя модуля при вызове функции, работающий процесс Erlang может выполнить самый последний код без перезапуска. Это мощные функции для эволюции кода, потому что вам не нужно останавливать виртуальную машину при развертывании нового кода.

Поскольку сама функция может быть передана в виде сообщения удаленному процессу, выполнить код удаленно в Erlang чрезвычайно просто. Проблема установки, развертывания практически отсутствует в Эрланге

 

Карта / Уменьшить используя Erlang

После изучения основных понятий мой следующий шаг — поиск проблемы и изучение языка. Основанная на модели рабочего разделения, агрегации и параллельной обработки, Map / Reduce, кажется, имеет характеристическую модель, которая очень хорошо согласуется с моделью параллельной обработки Эрланга. Поэтому я выбрал свой проект для реализации простой инфраструктуры Map / Reduce в Erlang.

Вот реализация Erlang …

 

 

Прежде всего, мне нужны некоторые вспомогательные функции

-module(mapreduce).
-export([reduce_task/2, map_task/2,
test_map_reduce/0]).

%%% Execute the function N times,
%%% and put the result into a list
repeat_exec(N,Func) ->
lists:map(Func, lists:seq(0, N-1)).


%%% Identify the reducer process by
%%% using the hashcode of the key
find_reducer(Processes, Key) ->
Index = erlang:phash(Key, length(Processes)),
lists:nth(Index, Processes).

%%% Identify the mapper process by random
find_mapper(Processes) ->
case random:uniform(length(Processes)) of
0 ->
find_mapper(Processes);
N ->
lists:nth(N, Processes)
end.

%%% Collect result synchronously from
%%% a reducer process
collect(Reduce_proc) ->
Reduce_proc ! {collect, self()},
receive
{result, Result} ->
Result
end.

Основная функция

Функция MapReduce () является точкой входа в систему.

  1. Сначала запускается все число R процессов Редуктора
  2. Он запускает все число процессов Ma Mapper, передавая им идентификаторы процессов редуктора R
  3. Для каждой строки входных данных он случайным образом выбирает один из процессов M mapper и отправляет ему строку
  4. Подождите, пока не закончится завершение
  5. Собрать результат от процессов R редуктора
  6. Вернуть собранный результат

Соответствующий код Эрланга выглядит следующим образом …

%%% The entry point of the map/reduce framework
map_reduce(M, R, Map_func,
Reduce_func, Acc0, List) ->

%% Start all the reducer processes
Reduce_processes =
repeat_exec(R,
fun(_) ->
spawn(mapreduce, reduce_task,
[Acc0, Reduce_func])
end),

io:format("Reduce processes ~w are started~n",
[Reduce_processes]),

%% Start all mapper processes
Map_processes =
repeat_exec(M,
fun(_) ->
spawn(mapreduce, map_task,
[Reduce_processes, Map_func])
end),

io:format("Map processes ~w are started~n",
[Map_processes]),

%% Send the data to the mapper processes
Extract_func =
fun(N) ->
Extracted_line = lists:nth(N+1, List),
Map_proc = find_mapper(Map_processes),
io:format("Send ~w to map process ~w~n",
[Extracted_line, Map_proc]),
Map_proc ! {map, Extracted_line}
end,

repeat_exec(length(List), Extract_func),

timer:sleep(2000),

%% Collect the result from all reducer processes
io:format("Collect all data from reduce processes~n"),
All_results =
repeat_exec(length(Reduce_processes),
fun(N) ->
collect(lists:nth(N+1, Reduce_processes))
end),
lists:flatten(All_results).

 

Процесс карты

Процессы Map после запуска будут выполнять следующие действия:

  1. Получить строку ввода
  2. Выполните предоставленную пользователем функцию карты, чтобы превратить в список пар ключ-значение
  3. Для каждого ключа и значения выберите процесс редуктора и отправьте ключ, значение к нему

Соответствующий код Эрланга будет следующим:

%%% The mapper process
map_task(Reduce_processes, MapFun) ->
receive
{map, Data} ->
IntermediateResults = MapFun(Data),
io:format("Map function produce: ~w~n",
[IntermediateResults ]),
lists:foreach(
fun({K, V}) ->
Reducer_proc =
find_reducer(Reduce_processes, K),
Reducer_proc ! {reduce, {K, V}}
end, IntermediateResults),

map_task(Reduce_processes, MapFun)
end.

 

Уменьшить процесс

С другой стороны, процессы редуктора будут выполняться следующим образом …

  1. Получите ключ, значение из процесса Mapper
  2. Получить текущее накопленное значение по ключу. Если накопленное значение не найдено, используйте начальное накопленное значение
  3. Вызвать предоставленную пользователем функцию уменьшения для расчета нового накопленного значения
  4. Сохраните новое накопленное значение под ключом

Соответствующий код Эрланга будет следующим:

 %%% The reducer process
reduce_task(Acc0, ReduceFun) ->
receive
{reduce, {K, V}} ->
Acc = case get(K) of
undefined ->
Acc0;
Current_acc ->
Current_acc
end,
put(K, ReduceFun(V, Acc)),
reduce_task(Acc0, ReduceFun);
{collect, PPid} ->
PPid ! {result, get()},
reduce_task(Acc0, ReduceFun)
end.

 

Пример подсчета слов

Чтобы проверить структуру Map / Reduce, используя пример подсчета слов.

%%% Testing of Map reduce using word count
test_map_reduce() ->
M_func = fun(Line) ->
lists:map(
fun(Word) ->
{Word, 1}
end, Line)
end,

R_func = fun(V1, Acc) ->
Acc + V1
end,

map_reduce(3, 5, M_func, R_func, 0,
[[this, is, a, boy],
[this, is, a, girl],
[this, is, lovely, boy]]).

 

Это результат выполнения тестовой программы

Erlang (BEAM) emulator version 5.6.1 [smp:2] [async-threads:0]

Eshell V5.6.1 (abort with ^G)
1> c (mapreduce).
{ok,mapreduce}
2>
2> mapreduce:test_map_reduce().
Reduce processes [<0.37.0>,<0.38.0>,<0.39.0>,<0.40.0>,<0.41.0>] are started
Map processes [<0.42.0>,<0.43.0>,<0.44.0>] are started
Send [this,is,a,boy] to map process <0.42.0>
Send [this,is,a,girl] to map process <0.43.0>
Map function produce: [{this,1},{is,1},{a,1},{boy,1}]
Send [this,is,lovely,boy] to map process <0.44.0>
Map function produce: [{this,1},{is,1},{a,1},{girl,1}]
Map function produce: [{this,1},{is,1},{lovely,1},{boy,1}]
Collect all data from reduce processes
[{is,3},{this,3},{boy,2},{girl,1},{a,2},{lovely,1}]
3>

 

Резюме

Из этого упражнения по реализации простой модели Map / Reduce с использованием Erlang я обнаружил, что Erlang очень силен в разработке распределенных систем.