Продолжая с первой статьи , на этот раз мы напишем еще несколько полезных пользовательских коллекторов: для группировки по заданным критериям, выборочного ввода, пакетирования и скольжения с окном фиксированного размера.
Группировка (подсчет вхождений, гистограмма)
Представьте, что у вас есть коллекция некоторых предметов, и вы хотите вычислить, сколько раз каждый элемент (относительно equals()
) появляется в этой коллекции. Это может быть достигнуто с помощью CollectionUtils.getCardinalityMap()
Apache Commons Collections. Этот метод принимает Iterable<T>
и возвращает Map<T, Integer>
, считая, сколько раз каждый элемент появился в коллекции. Однако иногда вместо использования equals()
мы хотели бы сгруппировать по произвольному атрибуту ввода T
. Например, скажем, у нас есть список Person
объектов, и мы хотели бы вычислить количество мужчин против женщин (то есть Map<Sex, Integer>
) или, возможно, распределение по возрасту. Существует встроенный коллектор Collectors.groupingBy(Function<T, K> classifier)
— однако он возвращает карту от ключа ко всем элементам, сопоставленным с этим ключом. Увидеть:
import static java.util.stream.Collectors.groupingBy; //... final List<Person> people = //... final Map<Sex, List<Person>> bySex = people .stream() .collect(groupingBy(Person::getSex));
Это ценно, но в нашем случае излишне строит два List<Person>
. Я только хочу знать количество людей. Нет такого встроенного сборщика, но мы можем составить его довольно простым способом:
import static java.util.stream.Collectors.counting; import static java.util.stream.Collectors.groupingBy; //... final Map<Sex, Long> bySex = people .stream() .collect( groupingBy(Person::getSex, HashMap::new, counting()));
Эта перегруженная версия groupingBy()
принимает три параметра. Первая — это ключевая функция ( классификатор ), как и ранее. Второй аргумент создает новую карту, вскоре мы увидим, почему она полезна. counting()
является вложенным сборщиком, который собирает всех людей одного пола и объединяет их вместе — в нашем случае мы просто считаем их по прибытии. Возможность выбора реализации карты полезна, например, при построении гистограммы возраста. Мы хотели бы знать, сколько людей у нас в данном возрасте — но возрастные значения должны быть отсортированы:
final TreeMap<Integer, Long> byAge = people .stream() .collect( groupingBy(Person::getAge, TreeMap::new, counting())); byAge .forEach((age, count) -> System.out.println(age + ":\t" + count));
Мы закончили с TreeMap
возрастом (отсортировано) по количеству людей этого возраста.
Отбор проб, дозирование и раздвижное окно
IterableLike.sliding()
Метод в Scala позволяет просматривать коллекцию через скользящее окно фиксированного размера. Это окно начинается с начала и в каждой итерации перемещается на заданное количество элементов. Такая функциональность, отсутствующая в Java 8, позволяет нескольким полезным операторам, таким как вычисление скользящего среднего , разбивать большую коллекцию на партии (сравнивать с Lists.partition()
Guava ) или производить выборку каждого n-го элемента. Мы реализуем сборщик для Java 8, обеспечивающий подобное поведение. Давайте начнем с модульных тестов, которые должны кратко описать, чего мы хотим достичь:
import static com.nurkiewicz.CustomCollectors.sliding @Unroll class CustomCollectorsSpec extends Specification { def "Sliding window of #input with size #size and step of 1 is #output"() { expect: input.stream().collect(sliding(size)) == output where: input | size | output [] | 5 | [] [1] | 1 | [[1]] [1, 2] | 1 | [[1], [2]] [1, 2] | 2 | [[1, 2]] [1, 2] | 3 | [[1, 2]] 1..3 | 3 | [[1, 2, 3]] 1..4 | 2 | [[1, 2], [2, 3], [3, 4]] 1..4 | 3 | [[1, 2, 3], [2, 3, 4]] 1..7 | 3 | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]] 1..7 | 6 | [1..6, 2..7] } def "Sliding window of #input with size #size and no overlapping is #output"() { expect: input.stream().collect(sliding(size, size)) == output where: input | size | output [] | 5 | [] 1..3 | 2 | [[1, 2], [3]] 1..4 | 4 | [1..4] 1..4 | 5 | [1..4] 1..7 | 3 | [1..3, 4..6, [7]] 1..6 | 2 | [[1, 2], [3, 4], [5, 6]] } def "Sliding window of #input with size #size and some overlapping is #output"() { expect: input.stream().collect(sliding(size, 2)) == output where: input | size | output [] | 5 | [] 1..4 | 5 | [[1, 2, 3, 4]] 1..7 | 3 | [1..3, 3..5, 5..7] 1..6 | 4 | [1..4, 3..6] 1..9 | 4 | [1..4, 3..6, 5..8, 7..9] 1..10 | 4 | [1..4, 3..6, 5..8, 7..10] 1..11 | 4 | [1..4, 3..6, 5..8, 7..10, 9..11] } def "Sliding window of #input with size #size and gap of #gap is #output"() { expect: input.stream().collect(sliding(size, size + gap)) == output where: input | size | gap | output [] | 5 | 1 | [] 1..9 | 4 | 2 | [1..4, 7..9] 1..10 | 4 | 2 | [1..4, 7..10] 1..11 | 4 | 2 | [1..4, 7..10] 1..12 | 4 | 2 | [1..4, 7..10] 1..13 | 4 | 2 | [1..4, 7..10, [13]] 1..13 | 5 | 1 | [1..5, 7..11, [13]] 1..12 | 5 | 3 | [1..5, 9..12] 1..13 | 5 | 3 | [1..5, 9..13] } def "Sampling #input taking every #nth th element is #output"() { expect: input.stream().collect(sliding(1, nth)) == output where: input | nth | output [] | 1 | [] [] | 5 | [] 1..3 | 5 | [[1]] 1..6 | 2 | [[1], [3], [5]] 1..10 | 5 | [[1], [6]] 1..100 | 30 | [[1], [31], [61], [91]] } }
Using data driven tests in Spock I managed to write almost 40 test cases in no-time, succinctly describing all requirements. I hope these are clear for you, even if you haven’t seen this syntax before. I already assumed existence of handy factory methods:
public class CustomCollectors { public static <T> Collector<T, ?, List<List<T>>> sliding(int size) { return new SlidingCollector<>(size, 1); } public static <T> Collector<T, ?, List<List<T>>> sliding(int size, int step) { return new SlidingCollector<>(size, step); } }
The fact that collectors receive items one after another makes are job harder. Of course first collecting the whole list and sliding over it would have been easier, but sort of wasteful. Let’s build result iteratively. I am not even pretending this task can be parallelized in general, so I’ll leave combiner()
unimplemented:
public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> { private final int size; private final int step; private final int window; private final Queue<T> buffer = new ArrayDeque<>(); private int totalIn = 0; public SlidingCollector(int size, int step) { this.size = size; this.step = step; this.window = max(size, step); } @Override public Supplier<List<List<T>>> supplier() { return ArrayList::new; } @Override public BiConsumer<List<List<T>>, T> accumulator() { return (lists, t) -> { buffer.offer(t); ++totalIn; if (buffer.size() == window) { dumpCurrent(lists); shiftBy(step); } }; } @Override public Function<List<List<T>>, List<List<T>>> finisher() { return lists -> { if (!buffer.isEmpty()) { final int totalOut = estimateTotalOut(); if (totalOut > lists.size()) { dumpCurrent(lists); } } return lists; }; } private int estimateTotalOut() { return max(0, (totalIn + step - size - 1) / step) + 1; } private void dumpCurrent(List<List<T>> lists) { final List<T> batch = buffer.stream().limit(size).collect(toList()); lists.add(batch); } private void shiftBy(int by) { for (int i = 0; i < by; i++) { buffer.remove(); } } @Override public BinaryOperator<List<List<T>>> combiner() { return (l1, l2) -> { throw new UnsupportedOperationException("Combining not possible"); }; } @Override public Set<Characteristics> characteristics() { return EnumSet.noneOf(Characteristics.class); } }
I spent quite some time writing this implementation, especially correct finisher()
so don’t be frightened. The crucial part is a buffer
that collects items until it can form one sliding window. Then «oldest» items are discarded and window slides forward by step
. I am not particularly happy with this implementation, but tests are passing. sliding(N)
(synonym to sliding(N, 1)
) will allow calculating moving average of N
items.sliding(N, N)
splits input into batches of size N
. sliding(1, N)
takes every N-th element (samples). I hope you’ll find this collector useful, enjoy!