Статьи

Java: Mergesort с использованием Fork / Join Framework

Цель этой записи — показать простой пример действия Fork / Join RecursiveAction, а не углубляться в возможные оптимизации сортировки слиянием или относительные преимущества использования Fork / Join Pool по сравнению с существующими реализациями на основе Java 6, такими как ExecutorService.

Ниже приведена типичная реализация алгоритма сортировки сверху вниз с помощью Java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.lang.reflect.Array;
 
public class MergeSort {
 public static <T extends Comparable<? super T>> void sort(T[] a) {
  @SuppressWarnings('unchecked')
  T[] helper = (T[])Array.newInstance(a[0].getClass() , a.length);
  mergesort(a, helper, 0, a.length-1);
 }
  
 private static <T extends Comparable<? super T>> void mergesort(T[] a, T[] helper, int lo, int hi){
  if (lo>=hi) return;
  int mid = lo + (hi-lo)/2;
  mergesort(a, helper, lo, mid);
  mergesort(a, helper, mid+1, hi);
  merge(a, helper, lo, mid, hi); 
 }
 
 private static <T extends Comparable<? super T>> void merge(T[] a, T[] helper, int lo, int mid, int hi){
  for (int i=lo;i<=hi;i++){
   helper[i]=a[i];
  }
  int i=lo,j=mid+1;
  for(int k=lo;k<=hi;k++){
   if (i>mid){
    a[k]=helper[j++];
   }else if (j>hi){
    a[k]=helper[i++];
   }else if(isLess(helper[i], helper[j])){
    a[k]=helper[i++];
   }else{
    a[k]=helper[j++];
   }
  }
 }
 
 private static <T extends Comparable<? super T>> boolean isLess(T a, T b) {
  return a.compareTo(b) < 0;
 }
}

Чтобы быстро описать алгоритм —
Следующие шаги выполняются рекурсивно:

  1. Входные данные делятся на 2 половины
  2. Каждая половина отсортирована
  3. Отсортированные данные затем объединяются

Сортировка слиянием является каноническим примером для реализации, использующей Java Fork / Join pool , и ниже приводится слепая реализация сортировки слиянием с использованием инфраструктуры Fork / Join:

Рекурсивная задача в сортировке слиянием может быть кратко выражена как реализация RecursiveAction

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private static class MergeSortTask<T extends Comparable<? super T>> extends RecursiveAction{
 private static final long serialVersionUID = -749935388568367268L;
 private final T[] a;
 private final T[] helper;
 private final int lo;
 private final int hi;
  
 public MergeSortTask(T[] a, T[] helper, int lo, int hi){
  this.a = a;
  this.helper = helper;
  this.lo = lo;
  this.hi = hi;
 }
 @Override
 protected void compute() {
  if (lo>=hi) return;
  int mid = lo + (hi-lo)/2;
  MergeSortTask<T> left = new MergeSortTask<>(a, helper, lo, mid);
  MergeSortTask<T> right = new MergeSortTask<>(a, helper, mid+1, hi);
  invokeAll(left, right);
  merge(this.a, this.helper, this.lo, mid, this.hi);
   
   
 }
 private void merge(T[] a, T[] helper, int lo, int mid, int hi){
  for (int i=lo;i<=hi;i++){
   helper[i]=a[i];
  }
  int i=lo,j=mid+1;
  for(int k=lo;k<=hi;k++){
   if (i>mid){
    a[k]=helper[j++];
   }else if (j>hi){
    a[k]=helper[i++];
   }else if(isLess(helper[i], helper[j])){
    a[k]=helper[i++];
   }else{
    a[k]=helper[j++];
   }
  }
 }
 private boolean isLess(T a, T b) {
  return a.compareTo(b) < 0;
 }
}

Выше MergeSortTask реализует метод вычисления, который принимает массив значений, разбивает его на две части, создает MergeSortTask из каждой из частей и выполняет еще две задачи (поэтому он называется RecursiveAction!). Специфическим API, используемым здесь для порождения задачи, является invokeAll, который возвращается только тогда, когда отправленные подзадачи помечены как выполненные. Поэтому, как только левая и правая подзадачи возвращаются, результат объединяется в процедуру слияния.

Учитывая это, единственной оставшейся работой является использование ForkJoinPool для отправки этой задачи. ForkJoinPool аналогичен ExecutorService, используемому для распределения задач в пуле потоков. Разница заключается в цитировании документации по API ForkJoinPool:

ForkJoinPool отличается от других видов ExecutorService в основном тем, что использует воровство работы: все потоки в пуле пытаются найти и выполнить подзадачи, созданные другими активными задачами (в конечном итоге блокируя ожидание работы, если ее не существует)

Вот как выглядит задача отправки задачи в Fork / Join Pool:

1
2
3
4
5
6
public static <T extends Comparable<? super T>> void sort(T[] a) {
 @SuppressWarnings('unchecked')
 T[] helper = (T[])Array.newInstance(a[0].getClass() , a.length);
 ForkJoinPool forkJoinPool = new ForkJoinPool(10);
 forkJoinPool.invoke(new MergeSortTask<T>(a, helper, 0, a.length-1));
}

Полный образец также доступен здесь: https://github.com/bijukunjummen/algos/blob/master/src/main/java/org/bk/algo/sort/algo04/merge/MergeSortForkJoin.java

Ссылка: Mergesort с использованием Fork / Join Framework от нашего партнера JCG Биджу Кунджуммен в блоге all and sundry.