Статьи

Apache Spark RDD и потоки Java

Несколько месяцев назад мне посчастливилось принять участие в нескольких PoC, которые использовали Apache Spark. Там я получил возможность использовать устойчивые распределенные наборы данных (для краткости RDD ), преобразования и действия.

Через несколько дней я понял, что, хотя Apache Spark и JDK являются очень разными платформами, между преобразованиями и действиями RDD и потоковыми промежуточными и терминальными операциями есть сходства. Я думаю, что это сходство может помочь начинающим (таким как я * ухмыляться ) начать работу с Apache Spark.

Java Stream Apache Spark RDD
Промежуточная операция преобразование
Терминальная операция действие

Обратите внимание, что Apache Spark и JDK являются
очень разные платформы. Apache Spark — это среда кластерных вычислений с открытым исходным кодом, которая помогает в обработке и анализе больших данных. JDK (Java Development Kit) включает в себя инструменты для разработки, отладки и мониторинга Java-приложений (не только обработки данных).

Java Streams

Давайте начнем с потоков. Java 8 была выпущена где-то в 2014 году. Возможно, самая значимая особенность, которую она принесла, — это Streams API (или просто потоки).

После создания Stream он предоставляет множество операций, которые можно сгруппировать в две категории:

  • промежуточный,
  • и терминал.

Промежуточные операции возвращают поток из предыдущего. Эти промежуточные операции могут быть связаны вместе, чтобы сформировать конвейер. Терминальные операции , с другой стороны, закрывают конвейер потока и возвращают результат.

Вот пример.

1
2
3
4
Stream.of(1, 2, 3)
        .peek(n -> System.out.println("Peeked at: " + n))
        .map(n -> n*n)
        .forEach(System.out::println);

Когда приведенный выше пример выполняется, он генерирует следующий вывод:

1
2
3
4
5
6
Peeked at: 1
1
Peeked at: 2
4
Peeked at: 3
9

Промежуточные операции ленивы. Фактическое выполнение не начинается, пока не встретится терминальная операция. Терминальная операция в этом случае — forEach() . Поэтому мы не видим следующего.

1
2
3
4
5
6
Peeked at: 1
Peeked at: 2
Peeked at: 3
1
4
9

Вместо этого мы видим, что операции peek() , map() и forEach() были объединены для формирования конвейера. На каждом проходе операция static of() возвращает один элемент из указанных значений. Затем вызывается конвейер: peek() который печатает строку «Peeked at: 1», затем map() и завершается forEach() которая печатает число «1». Затем с другим проходом, начинающимся с of() который возвращает следующий элемент из указанных значений, затем peek() , map() и так далее.

Выполнение промежуточной операции, такой как peek() , фактически не выполняет никакого просмотра, но вместо этого создает новый поток, который при прохождении содержит те же элементы исходного потока, но дополнительно выполняет предоставленное действие.

Apache Spark RDD

Теперь давайте обратимся к СДР Spark (устойчивый распределенный набор данных). Основная абстракция Spark для работы с данными — это устойчивый распределенный набор данных (RDD).

СДР — это просто распределенная коллекция элементов. В Spark вся работа выражается либо в создании новых RDD, либо в вызове операций над RDD для вычисления результата. Внутри Spark автоматически распределяет данные, содержащиеся в RDD, по всему кластеру и распараллеливает операции, которые вы над ними выполняете.

Созданные RDD предлагают два типа операций:

  • преобразования,
  • и действия.

Преобразования создают новый RDD из предыдущего. Действия , с другой стороны, вычисляют результат на основе RDD и либо возвращают его в программу драйвера, либо сохраняют его во внешней системе хранения (например, HDFS).

Вот пример с грубым эквивалентом с использованием Java Streams.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
SparkConf conf = new SparkConf().setAppName(...);
JavaSparkContext sc = new JavaSparkContext(conf);
 
List<Integer> squares = sc.parallelize(Arrays.asList(1, 2, 3))
        .map(n -> n*n)
        .collect();
 
System.out.println(squares.toString());
 
// Rough equivalent using Java Streams
List<Integer> squares2 = Stream.of(1, 2, 3)
        .map(n -> n*n)
        .collect(Collectors.toList());
 
System.out.println(squares2.toString());

После настройки контекста Spark мы вызываем parallelize() которая создает RDD из заданного списка элементов. map() — это преобразование, а collect() — это действие. Преобразования, такие как операции промежуточного потока в Java, лениво оцениваются. В этом примере Spark не начнет выполнять функцию, предусмотренную в вызове map() пока не увидит действие. Этот подход на первый взгляд может показаться необычным, но он имеет большой смысл при работе с огромными объемами данных (другими словами, большими данными). Это позволяет Spark разделять работу и выполнять их параллельно.

Пример подсчета слов

Давайте использовать количество слов в качестве примера. Здесь у нас есть две реализации: одна использует Apache Spark, а другая использует Java Streams.

Вот версия Java Stream.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
public class WordCountJava {
 
 private static final String REGEX = "\\s+";
  
 public Map<String, Long> count(URI uri) throws IOException {
  return Files.lines(Paths.get(uri))
   .map(line -> line.split(REGEX))
   .flatMap(Arrays::stream)
   .map(word -> word.toLowerCase())
   .collect(groupingBy(
    identity(), TreeMap::new, counting()));
 }
 
}

Здесь мы читаем исходный файл построчно и преобразуем каждую строку в последовательность слов (через промежуточную операцию map() ). Поскольку у нас есть последовательность слов для каждой строки и у нас много строк, мы конвертируем их в одну последовательность слов с помощью flatMap() . В конце мы группируем их по их identity() (т. Е. Идентификатором строки является сама строка) и считаем их.

При тестировании по текстовому файлу, который содержит две строки:

1
2
The quick brown fox jumps over the lazy dog
The quick brown fox jumps over the lazy dog

Выводит следующую карту:

1
{brown=2, dog=2, fox=2, jumps=2, lazy=2, over=2, quick=2, the=4}

А теперь вот версия Spark.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class WordCountSpark {
 
 private static final String REGEX = "\\s+";
  
 public List<Tuple2<String, Long>> count(URI uri, JavaSparkContext sc) throws IOException {
  JavaRDD<String> input = sc.textFile(Paths.get(uri).toString());
  return input.flatMap(
     line -> Arrays.asList(line.split(REGEX)).iterator())
    .map(word -> word.toLowerCase())
    .mapToPair(word -> new Tuple2<String, Long>(word, 1L))
    .reduceByKey((x, y) -> (Long) x + (Long) y)
    .sortByKey()
    .collect();
 }
 
}

При запуске с тем же двухстрочным текстовым файлом он выводит следующее:

1
[(brown,2), (dog,2), (fox,2), (jumps,2), (lazy,2), (over,2), (quick,2), (the,4)]

Первоначальная конфигурация JavaSparkContext была исключена для краткости. Мы создаем JavaRDD из текстового файла. Стоит отметить, что этот начальный RDD будет работать построчно из текстового файла. Вот почему мы разбиваем каждую строку на последовательность слов и flatMap() их. Затем мы преобразуем слово в кортеж ключ-значение со счетом один (1) для добавочного счета. Как только мы это сделаем, мы сгруппируем по словам ( reduceByKey() ) наши кортежи значения ключа из предыдущего RDD, и в конце мы сортируем их в естественном порядке.

В заключение

Как показано, обе реализации похожи. Внедрение Spark требует больше настройки и настройки и является более мощным. Изучение промежуточных и терминальных потоковых операций может помочь разработчику Java начать с понимания Apache Spark.

Спасибо Krischelle, RB и Juno за то, что позволили мне принять участие в PoC, которые использовали Apache Spark.

Ссылка: Apache Spark RDD и Java Streams от нашего партнера по JCG Лоренцо Ди из блога « Адаптация и обучение» .