Если вы не обратили внимания на область крупномасштабных распределенных вычислений — революция продолжается! Становится все более очевидным, что программные экосистемы, построенные вокруг так называемых больших данных, находятся на переднем крае инноваций в облачных вычислениях. К сожалению, было больше
споров вокруг определения того, насколько большие большие данные на самом деле, а не определения общего набора требований для крупномасштабных вычислительных платформ больших данных.
Стивен О’Грэйди из RedMonk кратко резюмировал это явление: «Большие данные, как и NoSQL, стали предметом
ответственности в большинстве случаев. Оставляя в стороне отсутствие последовательного определения, этот термин мало полезен, потому что он одномерный. Большие наборы данных представляют собой уникальные вычислительные проблемы. Но структура, рабочая нагрузка, доступность и даже местоположение данных могут оказаться одинаково сложными ».
Зак Урлокер (Zack Urlocker), советник и член совета директоров нескольких стартап-компаний в области SaaS, также выразил свою
критику в отношении сложности существующих систем: «Вы должны быть почти на уровне гениальности, чтобы создавать системы на основе Cassandra, Hadoop и как сегодня. Это мощные инструменты, но очень низкого уровня, эквивалентные программированию клиент-серверных приложений на языке ассемблера. Когда он работает его [
так в оригинале ] велик, но усилие является значительным , и это , вероятно , выходит за рамки основных ИТ — организаций.»
Именно здесь мы позиционируем план действий Infinispan, так как объявляем о начальных шагах в области распределенного выполнения и инфраструктуры MapReduce, построенной на основе Infinispan. Распределенная сетка данных Infinispan является наиболее естественным для такой платформы. Мы уже создали инфраструктуру для практически неограниченного линейного масштабирования данных в памяти. Однако наличие такой сетки данных без возможности выполнения крупномасштабных вычислений на ней похоже на наличие Ferrari без водительских прав. Прислушиваясь к критике в отношении отсутствия направления в области больших данных и сложности существующих платформ распределенного исполнения, мы сосредоточились в первую очередь на простоте, не жертвуя мощью и богатым набором функций, которые должна иметь такая структура.
Простая распределенная модель исполнения
Основными интерфейсами для простого выполнения распределенных задач являются
DistributedCallable и
DistributedExecutorService . DistributedCallable, по сути, является версией существующего Callable из пакета java.util.concurrent, за исключением того, что DistributedCallable может выполняться в удаленной JVM и получать входные данные из кэша Infinispan. Основной алгоритм задач практически не меняется, меняется только входной источник.
Существующая реализация Callable, скорее всего, получает свои входные данные в виде некоторого Java-объекта / примитива, тогда как DistributedCallable получает свои входные данные из кэша Infinispan. Поэтому пользователи, которые уже реализовали интерфейс Callable для описания своих блоков задач, просто расширили бы DistributedCallable и использовали бы ключи из среды выполнения Infinispan в качестве входных данных для задачи. Реализация DistributedCallable может фактически поддерживать реализацию уже существующего Callable, в то же время быть готовой к распределенному выполнению, расширяя DistributedCallable.
public interface DistributedCallable extends Callable { /** * Invoked by execution environment after DistributedCallable * has been migrated for execution to * a specific Infinispan node. * * @param cache * cache whose keys are used as input data for * this DistributedCallable task * @param inputKeys * keys used as input for this DistributedCallable task */ public void setEnvironment(Cache cache, Set inputKeys); }
DistributedExecutorService — это простое расширение знакомого ExecutorService из пакета java.util.concurrent. Однако преимущества DistributedExecutorService не следует упускать из виду. Для существующих задач Callable, которые пользователи будут отправлять в ExecutorService, есть возможность отправить их для выполнения в кластере Infinispan. Среда выполнения Infinispan перенесет эту задачу на узел выполнения, запустит задачу и вернет результаты вызывающему узлу. Конечно, не все задачи Callable извлекут выгоду из этой функции. Отличные кандидаты — это длительные и сложные вычислительные задачи.
Второе преимущество DistributedExecutorService заключается в том, что он позволяет быстро и просто реализовать задачи, которые принимают входные данные от узлов кэша Infinispan, выполняют определенные вычисления и возвращают результаты вызывающей стороне. Пользователи будут указывать, какие ключи использовать в качестве входных данных для указанного DistributedCallable, и отправлять этот вызываемый элемент для выполнения на кластере Infinispan. Среда выполнения Infinispan найдет ключи appriate, перенесет DistributedCallable на целевой узел (узлы) выполнения и, наконец, вернет список результатов для каждого выполненного Callable. Конечно, пользователи могут не указывать ключи ввода, в этом случае Infinispan будет выполнять DistributedCallable для всех ключей для указанного кэша.
Модель MapReduce
Собственная модель Infinispan MapReduce является адаптацией оригинального Google MapReduce. В каждой карте есть четыре основных компонента сокращения: Mapper, Reducer, Collator и MapReduceTask.
public interface Mapper { /** * Invoked once for each input cache entry * K,V transforms that input into a result T. * * @param key * the kay * @param value * the value * @return result T */ T map(K key, V value); }
public interface Reducer { /** * Reduces a result T from map phase and return R. * Assume that on Infinispan node N, an instance * of Mapper was mapped and invoked on k many * key/value pairs. Each T(i) in the list of all * T's returned from map phase executed on * Infinispan node N is passed to reducer along * with previsouly computed R(i-1). Finally the last * invocation of reducer on T(k), R is returned to a * distributed task that originated map/reduce * request. * * @param mapResult * result T of map phase * @param previouslyReduced * previously accumulated reduced result * @return result R * */ R reduce(T mapResult, R previouslyReduced); }
public interface Collator { /** * Collates all results added so far and * returns result R to invoker of distributed task. * * @return final result of distributed task computation */ R collate(); /** * Invoked by runtime every time reduced result * R is received from executed Reducer on remote * nodes. * * @param remoteNode * address of the node where reduce phase occurred * @param remoteResult * the result R of reduce phase */ void reducedResultReceived(Address remoteNode, R remoteResult); }
mapped = list() for entry in cache.entries: t = mapper.map(entry.key, entry.value) mapped.add(t) r = null for t in mapped: r = reducer.reduce(t, r) return r to Infinispan node that invoked the task On Infinispan node invoking this task: reduced_results = invoke map reduce task on all nodes, retrieve map{address:result} for r in reduced_results.entries: remote_address = r.key remote_reduced_result = r.value collator.add(remote_address, remote_reduced_result) return collator.collate()
Примеры
Чтобы лучше понять каркас MapReduce, давайте взглянем на пример, связанный с файловой системой Grid Infinispan. Как бы мы рассчитали общий размер всех файлов в системе, используя инфраструктуру MapReduce? Легко! Посмотрите на GridFileSizeExample.
public class GridFileSizeExample { public static void main(String arg[]) throws Exception { Cache cache = null; MapReduceTask task = new MapReduceTask(cache); Long result = task.mappedWith(new Mapper() { @Override public Long map(String key, GridFile.Metadata value) { return (long) value.getLength(); } }).reducedWith(new Reducer() { @Override public Long reduce(Long mapResult, Long previouslyReduced) { return previouslyReduced == null ? mapResult : mapResult + previouslyReduced; } }).collate(new Collator(){ private Long result = 0L; @Override public Long collate() { return result; } @Override public void reducedResultReceived(Address remoteNode, Long remoteResult) { result += remoteResult; }}); System.out.println("Total filesystem size is " + result + " bytes"); } }
В заключение, это не идеальное и окончательное распределенное выполнение, а API MapReduce, который может удовлетворить требования всех пользователей, но это хорошее начало. По мере того, как мы продвигаемся вперед и делаем его более функциональным, сохраняя его простым, мы постоянно ищем ваши отзывы. Вместе мы можем достичь амбициозных целей, изложенных в начале этой статьи.