Статьи

Группировка, выборка и пакетирование — пользовательские сборщики в Java 8

Продолжая первую статью , на этот раз мы напишем еще несколько полезных пользовательских коллекторов: для группировки по заданным критериям, выборочного ввода, пакетирования и сдвига с окном фиксированного размера.

Группировка (подсчет вхождений, гистограмма)

Представьте, что у вас есть коллекция некоторых элементов, и вы хотите вычислить, сколько раз каждый элемент (относительно equals() ) появляется в этой коллекции. Это может быть достигнуто с помощью CollectionUtils.getCardinalityMap() из коллекций Apache Commons. Этот метод принимает Iterable<T> и возвращает Map<T, Integer> , считая, сколько раз каждый элемент появлялся в коллекции. Однако иногда вместо использования equals() мы хотели бы сгруппировать по произвольному атрибуту ввода T Например, скажем, у нас есть список объектов Person и мы хотели бы вычислить количество мужчин и женщин (т. Map<Sex, Integer> ) или, возможно, распределение по возрасту. Существует встроенный коллектор Collectors.groupingBy(Function<T, K> classifier) — однако он возвращает карту от ключа ко всем элементам, сопоставленным с этим ключом. Увидеть:

1
2
3
4
5
6
7
8
import static java.util.stream.Collectors.groupingBy;
 
//...
 
final List<Person> people = //...
final Map<Sex, List<Person>> bySex = people
        .stream()
        .collect(groupingBy(Person::getSex));

Это ценно, но в нашем случае излишне строит два List<Person> . Я только хочу знать количество людей. Нет такого встроенного сборщика, но мы можем составить его довольно простым способом:

1
2
3
4
5
6
7
8
9
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
 
//...
 
final Map<Sex, Long> bySex = people
        .stream()
        .collect(
                groupingBy(Person::getSex, HashMap::new, counting()));

Эта перегруженная версия groupingBy() принимает три параметра. Первая — это ключевая функция ( классификатор ), как и ранее. Второй аргумент создает новую карту, вскоре мы увидим, почему она полезна. counting() — это вложенный сборщик, который собирает всех людей одного пола и объединяет их вместе — в нашем случае мы просто считаем их по прибытии. Возможность выбора реализации карты полезна, например, при построении гистограммы возраста. Мы хотели бы знать, сколько людей у ​​нас в данном возрасте — но возрастные значения должны быть отсортированы:

1
2
3
4
5
6
7
8
final TreeMap<Integer, Long> byAge = people
    .stream()
    .collect(
            groupingBy(Person::getAge, TreeMap::new, counting()));
 
byAge
        .forEach((age, count) ->
                System.out.println(age + ":\t" + count));

В итоге мы TreeMap от возраста (отсортированного) до количества людей, имеющих этот возраст.

Отбор проб, дозирование и раздвижное окно

Метод IterableLike.sliding() в Scala позволяет просматривать коллекцию через скользящее окно фиксированного размера. Это окно начинается с начала и в каждой итерации перемещается на заданное количество элементов. Такая функциональность, отсутствующая в Java 8, позволяет нескольким полезным операторам, таким как вычисление скользящего среднего , разбиение большой коллекции на партии (сравните с Lists.partition() в Guava ) или выборку каждого n-го элемента. Мы реализуем сборщик для Java 8, обеспечивающий подобное поведение. Давайте начнем с модульных тестов, которые должны кратко описать, чего мы хотим достичь:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import static com.nurkiewicz.CustomCollectors.sliding
 
@Unroll
class CustomCollectorsSpec extends Specification {
 
    def "Sliding window of #input with size #size and step of 1 is #output"() {
        expect:
        input.stream().collect(sliding(size)) == output
 
        where:
        input  | size | output
        []     | 5    | []
        [1]    | 1    | [[1]]
        [1, 2] | 1    | [[1], [2]]
        [1, 2] | 2    | [[1, 2]]
        [1, 2] | 3    | [[1, 2]]
        1..3   | 3    | [[1, 2, 3]]
        1..4   | 2    | [[1, 2], [2, 3], [3, 4]]
        1..4   | 3    | [[1, 2, 3], [2, 3, 4]]
        1..7   | 3    | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
        1..7   | 6    | [1..6, 2..7]
    }
 
    def "Sliding window of #input with size #size and no overlapping is #output"() {
        expect:
        input.stream().collect(sliding(size, size)) == output
 
        where:
        input | size | output
        []    | 5    | []
        1..3  | 2    | [[1, 2], [3]]
        1..4  | 4    | [1..4]
        1..4  | 5    | [1..4]
        1..7  | 3    | [1..3, 4..6, [7]]
        1..6  | 2    | [[1, 2], [3, 4], [5, 6]]
    }
 
    def "Sliding window of #input with size #size and some overlapping is #output"() {
        expect:
        input.stream().collect(sliding(size, 2)) == output
 
        where:
        input | size | output
        []    | 5    | []
        1..4  | 5    | [[1, 2, 3, 4]]
        1..7  | 3    | [1..3, 3..5, 5..7]
        1..6  | 4    | [1..4, 3..6]
        1..9  | 4    | [1..4, 3..6, 5..8, 7..9]
        1..10 | 4    | [1..4, 3..6, 5..8, 7..10]
        1..11 | 4    | [1..4, 3..6, 5..8, 7..10, 9..11]
    }
 
    def "Sliding window of #input with size #size and gap of #gap is #output"() {
        expect:
        input.stream().collect(sliding(size, size + gap)) == output
 
        where:
        input | size | gap | output
        []    | 5    | 1   | []
        1..9  | 4    | 2   | [1..4, 7..9]
        1..10 | 4    | 2   | [1..4, 7..10]
        1..11 | 4    | 2   | [1..4, 7..10]
        1..12 | 4    | 2   | [1..4, 7..10]
        1..13 | 4    | 2   | [1..4, 7..10, [13]]
        1..13 | 5    | 1   | [1..5, 7..11, [13]]
        1..12 | 5    | 3   | [1..5, 9..12]
        1..13 | 5    | 3   | [1..5, 9..13]
    }
 
    def "Sampling #input taking every #nth th element is #output"() {
        expect:
        input.stream().collect(sliding(1, nth)) == output
 
        where:
        input  | nth | output
        []     | 1   | []
        []     | 5   | []
        1..3   | 5   | [[1]]
        1..6   | 2   | [[1], [3], [5]]
        1..10  | 5   | [[1], [6]]
        1..100 | 30  | [[1], [31], [61], [91]]
    }
}

Используя управляемые данными тесты в Споке, мне удалось написать почти 40 тестовых случаев в сжатые сроки, кратко описав все требования. Я надеюсь, что это понятно для вас, даже если вы не видели этот синтаксис раньше. Я уже предполагал существование удобных фабричных методов:

01
02
03
04
05
06
07
08
09
10
11
public class CustomCollectors {
 
    public static <T> Collector<T, ?, List<List<T>>> sliding(int size) {
        return new SlidingCollector<>(size, 1);
    }
 
    public static <T> Collector<T, ?, List<List<T>>> sliding(int size, int step) {
        return new SlidingCollector<>(size, step);
    }
 
}

Тот факт, что коллекционеры получают предметы один за другим, усложняет работу. Конечно, сначала собрать весь список и пролистать его было бы проще, но отчасти расточительно. Построим результат итеративно. Я даже не притворяюсь, что эту задачу можно распараллелить вообще, поэтому я оставлю combiner() реализованным:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {
 
    private final int size;
    private final int step;
    private final int window;
    private final Queue<T> buffer = new ArrayDeque<>();
    private int totalIn = 0;
 
    public SlidingCollector(int size, int step) {
        this.size = size;
        this.step = step;
        this.window = max(size, step);
    }
 
    @Override
    public Supplier<List<List<T>>> supplier() {
        return ArrayList::new;
    }
 
    @Override
    public BiConsumer<List<List<T>>, T> accumulator() {
        return (lists, t) -> {
            buffer.offer(t);
            ++totalIn;
            if (buffer.size() == window) {
                dumpCurrent(lists);
                shiftBy(step);
            }
        };
    }
 
    @Override
    public Function<List<List<T>>, List<List<T>>> finisher() {
        return lists -> {
            if (!buffer.isEmpty()) {
                final int totalOut = estimateTotalOut();
                if (totalOut > lists.size()) {
                    dumpCurrent(lists);
                }
            }
            return lists;
        };
    }
 
    private int estimateTotalOut() {
        return max(0, (totalIn + step - size - 1) / step) + 1;
    }
 
    private void dumpCurrent(List<List<T>> lists) {
        final List<T> batch = buffer.stream().limit(size).collect(toList());
        lists.add(batch);
    }
 
    private void shiftBy(int by) {
        for (int i = 0; i < by; i++) {
            buffer.remove();
        }
    }
 
    @Override
    public BinaryOperator<List<List<T>>> combiner() {
        return (l1, l2) -> {
            throw new UnsupportedOperationException("Combining not possible");
        };
    }
 
    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.noneOf(Characteristics.class);
    }
 
}

Я потратил довольно много времени на написание этой реализации, особенно правильной finisher() так что не пугайтесь. Важной частью является buffer который собирает элементы, пока он не может сформировать одно скользящее окно. Затем «самые старые» предметы отбрасываются, и окно скользит вперед step за step . Я не особенно доволен этой реализацией, но тесты проходят. sliding(N) (синоним к sliding(N, 1) ) позволит рассчитать скользящее среднее из N элементов. sliding(N, N) разбивает входные данные на партии размером N sliding(1, N) берет каждый N-й элемент (выборки). Я надеюсь, что вы найдете этот коллекционер полезным, наслаждайтесь!