Статьи

Infinispan Распределенные потоки

Теперь, когда Infinispan поддерживает Java 8, мы можем в полной мере использовать некоторые из новых функций. Одна из важных особенностей Java 8 — новые   классы Stream . Это переворачивает голову на обработку данных, так что вместо того, чтобы самим итерировать данные, базовый поток обрабатывает это, и вы просто предоставляете операции, которые над ним выполняются. Это прекрасно подходит для распределенной обработки, поскольку итерация полностью обрабатывается реализацией (в данном случае Infinispan).

Поэтому я рад представить для Infinispan 8 функцию распределенных потоков! Это позволяет выполнять любую операцию, которую вы можете выполнять в обычном потоке, а также в распределенном кэше (при условии, что операция и данные  маршализуемы ).

Marshallability

При использовании распределенного или реплицируемого кэша ключи и значения кэша должны быть маршаллируемыми. Это тот же случай для промежуточных и терминальных операций при использовании распределенных потоков. Обычно вам необходимо предоставить экземпляр какого-либо нового класса, который может быть Serializable или для него зарегистрирован Externalizer, как описано в разделе о маршализуемости руководства пользователя.

Однако в Java 8 также появились лямбда-выражения, которые очень легко можно определить как сериализуемые (хотя это немного неловко). Пример этой сериализации можно найти  здесь .

Некоторые из вас могут также знать о   классе Collectors, который используется с методом collect в потоке. К сожалению, все произведенные коллекционеры не могут быть собраны. Таким образом, Infinispan добавил  служебный класс, который может работать вместе с классом Collectors. Это позволяет вам по-прежнему использовать любую комбинацию классов Collectors и по-прежнему работать должным образом, когда все необходимо маршалировать.

параллелизм

Потоки Java 8 естественно имеют чувство параллелизма. То есть поток может быть помечен как параллельный. Это, в свою очередь, позволяет выполнять операции параллельно с использованием нескольких потоков. Самое приятное то, насколько просто это сделать. Поток может быть сделан параллельным при первом его извлечении, вызывая  параллельный  поток, или вы можете при желании включить его после извлечения потока, просто вызывая  параллельный .

Новые Распределенные потоки от Infinispan делают еще один шаг вперед, который я называю параллельным распределением. То есть, поскольку данные уже распределены по узлам, мы также можем разрешить одновременное выполнение операций на разных узлах. Эта опция включена по умолчанию. Однако это можно контролировать с помощью нового интерфейса CacheStream, который обсуждается чуть ниже. Кроме того, чтобы быть ясным, параллель Java 8 может использоваться в сочетании с параллельным распределением. Это просто означает, что у вас будут параллельные операции на нескольких узлах в нескольких потоках на каждом узле.

Интерфейс CacheStream

Предусмотрен новый интерфейс  Cachestream  , позволяющий управлять дополнительными параметрами при использовании Distributed Stream. Я подсвечиваю добавленные методы (примечание примечания были удалены из сущности)

public interface CacheStream<R> extends Stream<R> {
   CacheStream<R> sequentialDistribution();

   CacheStream<R> parallelDistribution();

   CacheStream<R> filterKeySegments(Set<Integer> segments);

   CacheStream<R> filterKeys(Set<?> keys);

   CacheStream<R> distributedBatchSize(int batchSize);

   CacheStream<R> segmentCompletionListener(SegmentCompletionListener listener);

   CacheStream<R> disableRehashAware();

   CacheStream<R> timeout(long timeout, TimeUnit unit);

   interface SegmentCompletionListener {
      void segmentCompleted(Set<Integer> segments);
   }
}

distributedBatchSize

Этот метод контролирует, сколько элементов возвращается за один раз для операций, которые являются ключом. Этими операциями являются (spl) итератор и forEach. Это полезно для настройки количества ключей, хранящихся в памяти от удаленного узла. Таким образом, это компромисс между производительностью (больше ключей) и памятью. По умолчанию используется размер порции, настроенный передачей состояния.

parallelDistribution / sequentialDistribution

Это обсуждалось в разделе параллелизма выше. Обратите внимание, что во всех командах это включено по умолчанию, кроме методов spl (iterator).

FilterKeys

Этот метод может использоваться, чтобы распределенный поток работал только с заданным набором ключей. Это сделано очень эффективным способом, поскольку он будет выполнять операцию только на узлах, которые владеют данными ключами. Использование заданного набора ключей также обеспечивает постоянное время доступа из контейнера / хранилища данных, поскольку кэш не должен просматривать каждую отдельную запись в кеше. 

filterKeySegments (только для опытных пользователей)

Это полезно для фильтрации экземпляров более производительным способом. Обычно вы можете использовать промежуточную операцию фильтра, но этот метод выполняется перед выполнением любой из операций, чтобы наиболее эффективно ограничить записи, представляемые для обработки потока. Например, если требуется только подмножество сегментов, возможно, нет необходимости отправлять удаленный запрос.

сегментCompletionListener (только для опытных пользователей)

Как и в предыдущем методе, это связано с ключевыми сегментами. Этот слушатель позволяет конечному пользователю быть уведомленным, когда сегмент был завершен для обработки. Это может быть полезно, если вы хотите отслеживать завершение и если этот узел выходит из строя, вы можете перезапустить обработку только с необработанными сегментами. В настоящее время этот слушатель поддерживается только для методов spl (iterator).

disableRehashAware (только для опытных пользователей)

По умолчанию все потоковые операции являются так называемыми. То есть, если узел присоединяется к кластеру или покидает его во время выполнения операции, кластер узнает об этом и гарантирует, что все данные обрабатываются должным образом без потерь (при условии, что данные не были потеряны).

Это можно отключить, вызвав disableRehashAware; однако, если в середине операции должна произойти перефразировка, возможно, что не все данные могут быть обработаны. Следует отметить, что данные не обрабатываются многократно с этим отключенным, может произойти только потеря данных.

Этот вариант обычно не рекомендуется, если у вас нет ситуации, когда вы можете позволить себе работать только с подмножеством данных. Компромисс в том, что операция может выполняться быстрее, особенно (spl) итератор и методы forEach.

Уменьшение карты

Вековой пример карты / сокращения — это всегда количество слов. Потоки позволяют вам сделать это также! Вот эквивалентный пример подсчета слов, при условии, что у вас есть кэш, содержащий ключи и значения String, и вы хотите подсчитать количество всех слов в значениях. Некоторым из вас может быть интересно, как это связано с нашей существующей платформой карты / сокращения. План состоит в том, чтобы отказаться от существующей карты / уменьшить и полностью заменить ее новыми распределенными потоками на более позднем этапе.

Map<String, Long> results = cache.entrySet().stream()
     .map((Serializable & Function<Map.Entry<String, String>, String[]>) e -> e.getValue().split("\\s+"))
     .flatMap((Serializable & Function<String[], Stream<String>>) Arrays::stream)
     .collect(CacheCollectors.serializableCollector(
        () -> Collectors.groupingBy(Function.identity(), Collectors.counting())));

Помните, что распределенные потоки могут делать гораздо больше, чем просто отображать / уменьшать. И уже есть много примеров для потоков. Чтобы использовать распределенные потоки, вам просто нужно убедиться, что ваши операции маршализуемы, и вы готовы к работе.

Вот несколько страниц с примерами использования потоков прямо из Oracle:

http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html
http: // www .oracle.com / technetwork / article / java / architect-streams-pt2-2227132.html 

Надеюсь, вам понравились распределенные потоки. Мы надеемся, что они изменят ваше взаимодействие с вашими данными в кластере!

Дайте нам знать, что вы думаете, какие-либо проблемы или использования вы хотели бы поделиться!

Ура,

Будет