Статьи

Параллельные потоки и сплитераторы

Сегодня мы рассмотрим один из аспектов, где использование потоков — это реальная победа — когда нам нужно работать с потоками. Помимо параллельных потоков, мы также рассмотрим Spliterator, который действует как механизм, который выталкивает элементы в конвейер.

Потоки используют технику, известную как внутренняя итерация. Это внутреннее, потому что Iterator (или в нашем случае Spliterator), который обеспечивает работу через наш поток, скрыт от нас. Чтобы использовать поток, все, что нам нужно сделать [когда у нас будет источник], это добавить этапы конвейера и предоставить функции, которые требуются для этих этапов. Нам не нужно знать, как данные передаются по конвейеру, просто это так. Преимущество заключается в том, что работа скрыта от нас, и мы можем больше сосредоточиться на работе, которая должна быть выполнена, а не на том, как это можно сделать.

Наоборот, внешняя итерация — это когда нам дается переменная цикла или итератор, и мы ищем значение и сами пропускаем его через код. Это, очевидно, дает нам преимущество в том, что у нас полный контроль и низкие накладные расходы. Недостатком является то, что мы должны выполнять всю работу, просматривая значения и передавая их через тело цикла. Это также будет означать больше тестового кода, и правильное тестирование тел цикла может быть сложным. При обычных циклах for мы также должны быть осторожны с одноразовыми ошибками.

Вопрос, который мы должны задать себе при рассмотрении итерационного метода: действительно ли нам нужен абсолютный контроль над задачей? Потоки делают некоторые вещи действительно хорошо, но идут с небольшим снижением производительности. Возможно, не потоковое (или даже не Java) решение больше подходит для высокопроизводительной работы. С другой стороны, сортировка и фильтрация файлов для отображения в, скажем, элементе «недавно доступный» не требует высокой производительности. В этом случае мы, вероятно, согласились бы на простой и быстрый способ сделать это, а не на лучший. Даже если мы выберем эффективное решение, потребуется некоторое сравнение, так как сюрпризы часто ждут. Таким образом, мы компенсируем удобство в отношении производительности, времени разработки и риска ошибок.

Как мы увидим, потоки легко распараллелить. Мы просто меняем тип потока на параллельный поток с помощью оператора parallel (). Сделать это с внутренней итерацией сложно, потому что настроено, что мы получаем один элемент за каждую итерацию. Лучшее, что мы можем сделать в этой среде — передать работу потокам. Чтобы сделать что-то эффективно, мы, вероятно, должны были бы перебрать все значения во внешнем цикле и посмотреть на разделение работы по-другому. Мы увидим способ сделать это.

Имея это в виду, мы рассмотрим генератор простых чисел. Во-первых, это не самый эффективный генератор простых чисел. Для демонстрации было полезно иметь хорошо известное приложение, которое легко понять, легко выполнять с потоками, и на его завершение потребовалось бы немало времени.

Давайте сначала посмотрим на внутреннюю версию итерации:

public class ForLoopPrimes
{
  public static Set<Integer> findPrimes(int maxPrimeTry)
  {
    Set<Integer> s = new HashSet<>();

    // The candidates to try (1 is not a prime number by definition!)
    outer:
    for (int i = 2; i <= maxPrimeTry; i++)
    {
      // Only need to try up to sqrt(i) - see notes
      int maxJ = (int) Math.sqrt(i);

      // Our divisor candidates
      for (int j = 2; j <= maxJ; j++)
      {
        // If we can divide exactly by j, i is not prime
        if (i / j * j == i)
        {
          continue outer;
        }
      }

      // If we got here, it's prime
      s.add(i);
    }

    return s;
  }

  public static void main(String args[])
  {
     int maxPrimeTry = 9999999;

     long startTime = System.currentTimeMillis();

     Set<Integer> s = findPrimes(maxPrimeTry);

     long timeTaken = System.currentTimeMillis() - startTime;

     s.stream().sorted().forEach(System.out::println);

     System.out.println("Time taken: " + timeTaken);
  }
}

Примечание: поскольку нам нужно найти только один делитель, а умножение является коммутативным, нам нужно только исчерпать все потенциальные пары факторов и проверить один из них [меньший]. Меньшее не может быть больше квадратного корня из простого кандидата и должно быть не менее 2.

Это пример алгоритма грубой силы. Мы пробуем каждую комбинацию, а не стелс или оптимизацию В этом случае мы также ожидаем, что внутренняя итерационная версия будет работать быстро, так как на каждую итерацию не так много работы.

Так почему мы должны демонстрировать это?

Предположим, мы хотим воспользоваться преимуществами аппаратного обеспечения в современных процессорах и наладить это. Как мы можем это сделать? До Java 7 и, конечно, до Java 5 это было бы настоящей болью. Мы должны разделить рабочую нагрузку, поддерживать пул потоков и сигнализировать им, что работа доступна, а затем собирать их обратно по завершении. Мы, вероятно, также хотим завершить рабочие потоки в конце, если у нас есть больше работы. Хотя это не ракетостроение, может быть трудно быстро исправить ошибки, и трудно обнаружить тонкие ошибки.

Java 7 делает это намного проще с фреймворком ForkJoin. Это все еще сложно и легко ошибиться. Мы будем использовать RecursiveAction, чтобы разбить внешний цикл на части, используя стратегию «разделяй и властвуй». Обратите внимание, что параллельные потоки делают это также.

public class ForkJoinPrimes
{
  private static int workSize;
  private static Queue<Results> resultsQueue;

  // Use this to collect work
  private static class Results
  {
    public final int minPrimeTry;
    public final int maxPrimeTry;
    public final Set resultSet;

    public Results(int minPrimeTry, int maxPrimeTry, Set resultSet)
    {
      this.minPrimeTry = minPrimeTry;
      this.maxPrimeTry = maxPrimeTry;
      this.resultSet = resultSet;
    }
  }

  private static class FindPrimes extends RecursiveAction
  {
    private final int start;
    private final int end;

    public FindPrimes(int start, int end)
    {
      this.start = start;
      this.end = end;
    }

    private Set<Integer> findPrimes(int minPrimeTry,
                                    int maxPrimeTry)
    {
      Set<Integer> s = new HashSet<>();

      // The candidates to try
      // (1 is not a prime number by definition!)
      outer:
      for (int i = minPrimeTry; i <= maxPrimeTry; i++)
      {
        // Only need to try up to sqrt(i) - see notes
        int maxJ = (int) Math.sqrt(i);

        // Our divisor candidates
        for (int j = 2; j <= maxJ; j++)
        {
          // If we can divide exactly by j, i is not prime
          if (i / j * j == i)
          {
            continue outer;
          }
        }

        // If we got here, it's prime
        s.add(i);
      }

      return s;
    }

    protected void compute()
    {
      // Small enough for us?
      if (end - start < workSize)
      {
        resultsQueue.offer(new Results(start, end,
                                 findPrimes(start, end)));
      }
      else
      {
        // Divide into two pieces
        int mid = (start + end) / 2;

        invokeAll(new FindPrimes(start, mid),
                            new FindPrimes(mid + 1, end));
      }
    }
  }

  public static void main(String args[])
  {
    int maxPrimeTry = 9999999;
    int maxWorkDivisor = 8;

    workSize = (maxPrimeTry + 1) / maxWorkDivisor;

    ForkJoinPool pool = new ForkJoinPool();

    resultsQueue = new ConcurrentLinkedQueue<>();

    long startTime = System.currentTimeMillis();

    pool.invoke(new FindPrimes(2, maxPrimeTry));

    long timeTaken = System.currentTimeMillis() - startTime;

    System.out.println("Number of tasks executed: " +
                       resultsQueue.size());

    while (resultsQueue.size() > 0)
    {
      Results results = resultsQueue.poll();

      Set<Integer> s = results.resultSet;

      s.stream().sorted().forEach(System.out::println);
    }

    System.out.println("Time taken: " + timeTaken);
  }
}

Это вполне узнаваемо, поскольку мы повторно использовали последовательный код для выполнения работы в подзадаче. Мы создаем два RecursiveActons, чтобы разбить рабочую нагрузку на две части. Мы продолжаем ломаться, пока рабочая нагрузка не станет ниже определенного размера, когда мы выполняем действие. Мы наконец собираем наши результаты в параллельную очередь. Обратите внимание, что здесь есть немало кода.

Давайте посмотрим на последовательное решение потоков Java 8:

public class SequentialStreamPrimes
{
  public static Set<Integer> findPrimes(int maxPrimeTry)
  {
    return IntStream.rangeClosed(2, maxPrimeTry)
                    .map(i -> IntStream.rangeClosed(2,
                                          (int) (Math.sqrt(i)))
                    .filter(j -> i / j * j == i).map(j -> 0)
                    .findAny().orElse(i))
                    .filter(i -> i != 0)
                    .mapToObj(i -> Integer.valueOf(i))
                    .collect(Collectors.toSet());
  }

  public static void main(String args[])
  {
    int maxPrimeTry = 9999999;

    long startTime = System.currentTimeMillis();

    Set<Integer> s = findPrimes(maxPrimeTry);

    long timeTaken = System.currentTimeMillis() - startTime;

    s.stream().sorted().forEach(System.out::println);

    System.out.println("Time taken: " + timeTaken);
  }
}

Мы видим, что решение для потоков достаточно хорошо согласуется с внешней версией итерации, за исключением нескольких необходимых трюков:

  • Поскольку нам нужен только один фактор, мы используем findAny (). Это действует как заявление о разрыве.
  • findAny () возвращает Optional, поэтому нам нужно развернуть его, чтобы получить наше значение. Если у нас нет значения (т. Е. Мы нашли простое число), мы будем хранить простое число (внешнее значение i), поместив его в предложение orElse.
  • Если внутренний IntStream находит фактор, мы можем отобразить 0 для хранения, поскольку мы никогда не будем проверять 0. К сожалению, это означает, что мы пытаемся сохранить что-то для каждого кандидата, который увеличивает накладные расходы.

Итак, давайте сделаем это с резьбой. Нам нужно лишь немного изменить метод findPrimes:

 public static Set<Integer> findPrimes(int maxPrimeTry)
  {
    return IntStream.rangeClosed(2, maxPrimeTry)
                    .parallel()
                    .map(i -> IntStream.rangeClosed(2,
                                          (int) (Math.sqrt(i)))
                    .filter(j -> i / j * j == i).map(j -> 0)
                    .findAny().orElse(i))
                    .filter(i -> i != 0)
                    .mapToObj(i -> Integer.valueOf(i))
                    .collect(Collectors.toSet());
  }

На этот раз нам не нужно возиться с алгоритмом. Просто добавив промежуточную стадию parallel () в поток, мы заставим его разделить работу. Parallel (), как и filter и map, является промежуточной операцией. Промежуточные операции также могут изменить поведение потока, а также повлиять на передаваемые значения. Другие промежуточные стадии, которые мы еще не видели:

  • sequential () — сделать поток последовательным
  • different () — только отдельные значения проходят
  • sorted () — отсортированный поток возвращается, при желании мы можем передать Comparator
  • unordered () — возвращает неупорядоченный поток

Если мы запустим jconsole во время работы и посмотрим на вкладку Threads, мы сможем сравнить последовательную и параллельную версии. В параллельной версии мы видим несколько потоков ForkJoin, выполняющих эту работу.

Я сделал несколько таймингов и получил следующие результаты [обратите внимание, что это не совсем точно, так как другие задачи могли выполняться в фоновом режиме на моем компьютере — значения с точностью до полсекунды].

  • Внешний, последовательный (для цикла): 8,5 секунд
  • Внешний, параллельный (ForkJoin): 2,5 секунды
  • Внутренний, последовательный (последовательный поток): 21 секунда
  • Внутренний, параллельный (параллельный поток): 6 секунд

Это, вероятно, как и ожидалось. Объем работы на итерацию во внутреннем цикле невелик, поэтому любые потоковые действия будут иметь относительно высокие издержки, как видно в версии с последовательным потоком, а также нам пришлось сохранять значение независимо от того, было ли оно простым или нет. Параллельный поток поступает немного быстрее, чем цикл for, но версия ForkJoin превосходит его в два с лишним раза. Обратите внимание, насколько проще была версия потоков [когда мы, конечно, получили представление о потоках) по сравнению с количеством код в версии ForkJoin.

Давайте посмотрим на рабочую лошадку этого рабочего распределения, Spliterator. Spliterator является интерфейсом, подобным Iterator, но вместо простого предоставления следующего значения он также может разделить работу на более мелкие части, которые выполняются ForkJoinTasks.

Когда мы создаем Spliterator, мы предоставляем подробную информацию о размере рабочей нагрузки и характеристиках, которые имеют значения. В некоторых типах Spliterator, таких как RangeIntSpliterator [который предоставляет IntRange], для получения характеристик используется метод характеристика (), а не их предоставление через конструктор, как это делает AbstractSpliterator.

Нам, очевидно, нужен размер рабочей нагрузки, чтобы мы могли разделить работу и знать, когда прекратить деление. Характеристики, которые мы можем предоставить, определяются в интерфейсе Spliterator следующим образом:

SIZED  — мы можем предоставить определенное количество значений, которые будут отправлены до обработки (в отличие от InfiniteSupplyingSpliterator)
SUBSIZED  — подразумевает, что любые Spliterators, которые создает trySplit (), будут SIZED и SUBSIZED. Не все РАЗРАБОТАННЫЕ РАЗДЕЛИТЕЛИ будут разделены на РАЗМЕЩЕННЫЕ разделители. API дает пример бинарного дерева, где мы можем знать, сколько элементов в дереве, но не в поддеревьях.
ORDERED  — мы предоставляем значения в последовательности, например, из списка
SORTED  — порядок следует за порядком сортировки. (а не последовательность); ORDERED также должен быть установлен
DISTINCT  — каждое значение отличается от любого другого, например, если мы поставляем из набора
NONNULL — значения, поступающие из источника, не будут нулевыми.
IMMUTABLE  — невозможно изменить источник (например, добавить или удалить значения) — если это не задано и не является CONCURRENT, мы советуем проверить документацию на предмет того, что происходит при модификации ( например, ConcurrentModificationException)
CONCURRENT  — источник может быть одновременно безопасно изменен, и мы рекомендуем проверить документацию по политике

Эти характеристики используются механизмом разделения, например, в классе ForEachOps (который используется для выполнения задач в конвейере, оканчивающемся на forEach). Обычно мы можем просто использовать предварительно созданный Spliterator [и часто даже не нужно беспокоиться об этом, потому что он предоставляется методом stream ()]. Помните, что структура потоков позволяет нам выполнять работу, не зная всех деталей ее выполнения. Мы должны волноваться только в тех редких случаях, когда возникает особая проблема или требуется максимальная производительность.

Разделение выполняется с помощью операции trySplit (). Это возвращает новый Spliterator. Для требований этой функции следует обратиться к документации API.

Когда мы используем содержимое части потока, используя Spliterator, вызывается операция forEachRemaining (action). Это берет исходные данные и вызывает следующее действие с помощью действия принять вызов. Например, если следующей операцией является filter, вызывается вызов accept для фильтра. Это вызывает тестовый метод содержащегося предиката, и если это так, вызывается метод accept следующего этапа. В какой-то момент конечный этап будет вызван [метод accept не вызывает никакого другого этапа], и окончательное значение будет использовано, уменьшено или собрано. Когда мы вызываем метод stream (), этот конвейер создается, и вызов промежуточных этапов связывает их до конца конвейера. Вызов финальной стадии потребления делает последнюю ссылку и отключает все.

В качестве альтернативы, когда нам нужно сгенерировать каждый элемент из не массового источника, используется функция tryAdvance (). Это переданное действие, которое вызывается, как и прежде. Однако мы возвращаем true, если мы хотим продолжить, и false, если мы этого не делаем. Например, InfiniteSupplyingSpliterator всегда возвращает true, но мы можем использовать AbstractSpliterator, если хотим это контролировать. Помните AbstractIntSpliterator из нашей SixGame в статье о конечных генераторах? Одна из наших функций tryAdvance была такой:

@Override
public boolean tryAdvance(Consumer action)
{
  if (action == null)
    throw new NullPointerException();
  if (done)
    return false;

  action.accept(rollDie());

  return true;
}

В этом случае, если мы бросаем кубик, мы всегда продолжаем. Это позволило бы установить готовую логику из другого места, если бы мы не хотели снова бросать кубик. Возможно, было бы немного лучше вернуться! Сделано вместо истины, чтобы немедленно прекратить генерацию, как только шестерка была брошена. Однако в этом случае прохождение другого цикла вряд ли было делом тяжелым.

Вот и все для обзора потоков. В следующей статье мы подробнее рассмотрим лямбда-выражения.