Статьи

Краткий обзор Fork / Join Framework в Java

Введение

Фреймворк Fork / Join — это фреймворк для решения проблемы с использованием параллельного подхода «разделяй и властвуй» Они были введены в дополнение к существующему API параллелизма. До их внедрения существующие реализации ExecutorService были популярным выбором для запуска асинхронных задач, но они лучше всего работают, когда задачи однородны и независимы. Запуск зависимых задач и объединение их результатов с использованием этих реализаций было непростым делом. С введением инфраструктуры Fork / Join была сделана попытка устранить этот недостаток. В этой статье мы кратко рассмотрим API и решим пару простых проблем, чтобы понять, как они работают.

Решение неблокирующей задачи

Давайте сразу перейдем к коду. Давайте создадим задачу, которая будет возвращать сумму всех элементов списка. Следующие шаги представляют наш алгоритм в псевдокоде:

01. Найти средний индекс списка

02. Разделите список посередине

03. Рекурсивно создайте новую задачу, которая будет вычислять сумму левой части

04. Рекурсивно создайте новое задание, которое вычислит сумму правой части.

05. Добавьте результат левой суммы, среднего элемента и правой суммы

Вот код —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class ListSummer extends RecursiveTask<Integer> {
  private final List<Integer> listToSum;
 
  ListSummer(List<Integer> listToSum) {
    this.listToSum = listToSum;
  }
 
  @Override
  protected Integer compute() {
    if (listToSum.isEmpty()) {
      log.info("Found empty list, sum is 0");
      return 0;
    }
 
    int middleIndex = listToSum.size() / 2;
    log.info("List {}, middle Index: {}", listToSum, middleIndex);
 
    List<Integer> leftSublist = listToSum.subList(0, middleIndex);
    List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());
 
    ListSummer leftSummer = new ListSummer(leftSublist);
    ListSummer rightSummer = new ListSummer(rightSublist);
 
    leftSummer.fork();
    rightSummer.fork();
 
    Integer leftSum = leftSummer.join();
    Integer rightSum = rightSummer.join();
    int total = leftSum + listToSum.get(middleIndex) + rightSum;
    log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);
 
    return total;
  }
}

Во-первых, мы расширяем подтип RecursiveTask объекта ForkJoinTask . Этот тип расширяется, когда мы ожидаем, что наша параллельная задача вернет результат. Когда задача не возвращает результат, а только выполняет эффект, мы расширяем подтип RecursiveAction . Для большинства практических задач, которые мы решаем, этих двух подтипов достаточно.

Во-вторых, и RecursiveTask, и RecursiveAction определяют абстрактный метод вычисления. Это где мы ставим наши вычисления.

В-третьих, внутри нашего метода вычисления мы проверяем размер списка, который передается через конструктор. Если оно пустое, мы уже знаем результат суммы, равной нулю, и сразу возвращаемся. В противном случае мы разделяем наши списки на два подсписка и создаем два экземпляра нашего типа ListSummer. Затем мы вызываем метод fork () (определенный в ForkJoinTask) для этих двух экземпляров:

1
2
leftSummer.fork();
rightSummer.fork();

Что приводит к тому, что эти задачи планируются для асинхронного выполнения, точный механизм, который используется для этой цели, будет объяснен позже в этом посте.

После этого мы вызываем метод join () (также определенный в ForkJoinTask), чтобы дождаться результата этих двух частей.

1
2
Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();

Которые затем суммируются со средним элементом списка, чтобы получить окончательный результат.

Для облегчения понимания примера добавлено множество сообщений журнала. Тем не менее, когда мы обрабатываем список, содержащий тысячи записей, возможно, не стоит иметь такую ​​подробную запись в журнал, особенно запись всего списка.

Вот и все. Давайте создадим тестовый класс сейчас для тестового запуска —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ListSummerTest {
 
  @Test
  public void shouldSumEmptyList() {
    ListSummer summer = new ListSummer(List.of());
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);
 
    int result = summer.join();
 
    assertThat(result).isZero();
  }
 
  @Test
  public void shouldSumListWithOneElement() {
    ListSummer summer = new ListSummer(List.of(5));
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);
 
    int result = summer.join();
 
    assertThat(result).isEqualTo(5);
  }
 
  @Test
  public void shouldSumListWithMultipleElements() {
    ListSummer summer = new ListSummer(List.of(
        1, 2, 3, 4, 5, 6, 7, 8, 9
    ));
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);
 
    int result = summer.join();
 
    assertThat(result).isEqualTo(45);
  }
}

В тесте мы создаем экземпляр ForkJoinPool . ForkJoinPool — это уникальная реализация ExecutorService для запуска ForkJoinTasks. Он использует специальный алгоритм, известный как алгоритм кражи работы. В отличие от других реализаций ExecutorService, где существует только одна очередь, содержащая все задачи, которые должны быть выполнены, в реализации с кражей работы каждый рабочий поток получает свою рабочую очередь. Каждый поток начинает выполнять задачи из своей очереди.

Когда мы обнаруживаем, что ForkJoinTask можно разбить на несколько меньших подзадач, мы разбиваем их на более мелкие задачи и затем вызываем метод fork () для этих задач. Этот вызов приводит к тому, что подзадачи помещаются в очередь выполняющегося потока. Во время выполнения, когда один поток исчерпывает свою очередь / не имеет задач для выполнения, он может «красть» задачи из очереди другого потока (отсюда и название «кража работы»). Это поведение кражи — то, что приводит к лучшей пропускной способности, чем использование любых других реализаций ExecutorService.

Ранее, когда мы вызывали fork () в наших экземплярах задач leftSummer и rightSummer, они помещались в рабочую очередь исполняющего потока, после чего их «крали» другие активные потоки в пуле (и т. Д.), Так как они это делали больше ничего не нужно делать в этот момент.

Довольно круто, правда?

Решение задачи блокировки

Проблема, которую мы решили только сейчас, носит неблокирующий характер. Если мы хотим решить проблему, которая выполняет некоторую блокирующую операцию, то для повышения пропускной способности нам необходимо изменить нашу стратегию.

Давайте рассмотрим это на другом примере. Допустим, мы хотим создать очень простой веб-сканер. Этот сканер получит список HTTP-ссылок, выполнит GET-запросы для извлечения тел ответа, а затем вычислит длину ответа. Вот код —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {
  private final List<String> links;
 
  ResponseLengthCalculator(List<String> links) {
    this.links = links;
  }
 
  @Override
  protected Map<String, Integer> compute() {
    if (links.isEmpty()) {
      log.info("No more links to fetch");
      return Collections.emptyMap();
    }
 
    int middle = links.size() / 2;
    log.info("Middle index: {}", links, middle);
    ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));
    ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));
 
    log.info("Forking left partition");
    leftPartition.fork();
    log.info("Left partition forked, now forking right partition");
    rightPartition.fork();
    log.info("Right partition forked");
 
    String middleLink = links.get(middle);
    HttpRequester httpRequester = new HttpRequester(middleLink);
    String response;
    try {
      log.info("Calling managedBlock for {}", middleLink);
      ForkJoinPool.managedBlock(httpRequester);
      response = httpRequester.response;
    } catch (InterruptedException ex) {
      log.error("Error occurred while trying to implement blocking link fetcher", ex);
      response = "";
    }
 
    Map<String, Integer> responseMap = new HashMap<>(links.size());
 
    Map<String, Integer> leftLinks = leftPartition.join();
    responseMap.putAll(leftLinks);
    responseMap.put(middleLink, response.length());
    Map<String, Integer> rightLinks = rightPartition.join();
    responseMap.putAll(rightLinks);
 
    log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);
 
    return responseMap;
  }
 
  private static class HttpRequester implements ForkJoinPool.ManagedBlocker {
    private final String link;
    private String response;
 
    private HttpRequester(String link) {
      this.link = link;
    }
 
    @Override
    public boolean block() {
      HttpGet headRequest = new HttpGet(link);
      CloseableHttpClient client = HttpClientBuilder
          .create()
          .disableRedirectHandling()
          .build();
      try {
        log.info("Executing blocking request for {}", link);
        CloseableHttpResponse response = client.execute(headRequest);
        log.info("HTTP request for link {} has been executed", link);
        this.response = EntityUtils.toString(response.getEntity());
      } catch (IOException e) {
        log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());
        this.response = "";
      }
      return true;
    }
 
    @Override
    public boolean isReleasable() {
      return false;
    }
  }
}

Мы создаем реализацию ForkJoinPool.ManagedBlocker, куда мы помещаем блокирующий HTTP-вызов. Этот интерфейс определяет два метода — block () и isReleasable () . Метод block () — это место, где мы помещаем наш блокирующий вызов. После того, как мы закончим нашу операцию блокировки, мы возвращаем true, указывая, что дальнейшая блокировка не требуется. Мы возвращаем false из реализации isReleasable (), чтобы указать рабочему потоку fork-join, что реализация метода block () потенциально блокирует по своей природе. Реализация isReleasable () будет вызвана рабочим потоком fork-join прежде, чем он вызовет метод block (). Наконец, мы отправляем наш экземпляр HttpRequester в наш пул, вызывая статический метод ForkJoinPool.managedBlock () . После этого наша задача блокировки начнет выполняться. Когда он блокирует HTTP-запрос, метод ForkJoinPool.managedBlock () также организует активацию резервного потока, если это необходимо для обеспечения достаточного параллелизма.

Давайте возьмем эту реализацию для тест-драйва! Вот код —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ResponseLengthCalculatorTest {
 
  @Test
  public void shouldReturnEmptyMapForEmptyList() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());
    ForkJoinPool pool = new ForkJoinPool();
 
    pool.submit(responseLengthCalculator);
 
    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result).isEmpty();
  }
 
  @Test
  public void shouldHandle200Ok() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
        "http://httpstat.us/200"
    ));
    ForkJoinPool pool = new ForkJoinPool();
 
    pool.submit(responseLengthCalculator);
 
    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result)
        .hasSize(1)
        .containsKeys("http://httpstat.us/200")
        .containsValue(0);
  }
 
  @Test
  public void shouldFetchResponseForDifferentResponseStatus() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
        "http://httpstat.us/200",
        "http://httpstat.us/302",
        "http://httpstat.us/404",
        "http://httpstat.us/502"
    ));
    ForkJoinPool pool = new ForkJoinPool();
 
    pool.submit(responseLengthCalculator);
 
    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result)
        .hasSize(4);
  }
}

Вот и все на сегодня, ребята! Как всегда, любые отзывы / предложения по улучшению / комментарии высоко ценятся!

Все примеры, обсуждаемые здесь, можно найти на Github ( конкретный коммит ).

Огромный привет сервису http://httpstat.us , он очень помог при разработке простых тестов.

Опубликовано на Java Code Geeks с разрешения Саима Ахмеда, партнера нашей программы JCG . См. Оригинальную статью здесь: краткий обзор Fork / Join Framework в Java

Мнения, высказанные участниками Java Code Geeks, являются их собственными.