Java 8 была — как всегда — выпуском компромиссов и обратной совместимости. Выпуск, в котором экспертная группа JSR-335, возможно, не согласилась с объемом или осуществимостью определенных функций с некоторыми из аудитории . Посмотрите некоторые конкретные объяснения Брайана Гетца о том, почему …
- … «Final» не допускается в методах по умолчанию в Java 8
- … «Синхронизированный» не разрешен в методах Java 8 по умолчанию
Но сегодня мы собираемся сосредоточиться на «недостатках» Streams API, или, как сказал бы Брайан Гетц: вещи вне сферы видимости с учетом целей дизайна.
Параллельные потоки?
Параллельные вычисления — это сложно, и раньше это было больно. Людям не очень понравился новый (теперь уже старый) API Fork / Join , когда он впервые был поставлен с Java 7. И наоборот, Stream.parallel() и краткость вызова Stream.parallel() непобедимы.
Но многим людям на самом деле не нужны параллельные вычисления (не путать с многопоточностью!). В 95% всех случаев люди, вероятно, предпочли бы более мощный API-интерфейс Streams или, возможно, более мощный API-интерфейс Collections с множеством удивительных методов для различных подтипов Iterable .
Однако Iterable опасно. Даже простое преобразование Iterable в Stream помощью потенциального метода Iterable.stream() похоже, рискует открыть ящик Пандоры! ,
Последовательные потоки!
Так что, если JDK не отправит его, мы создадим его сами!
Потоки довольно крутые как таковые. Они потенциально бесконечны, и это крутая особенность. В основном — и особенно в случае функционального программирования — размер коллекции не имеет большого значения, поскольку мы преобразуем элемент за элементом с помощью функций.
Если мы допустим, что потоки являются чисто последовательными, то у нас может быть любой из этих довольно крутых методов (некоторые из которых также возможны с параллельными потоками):
-
cycle()— гарантированный способ сделать каждый поток бесконечным -
duplicate()— дублирует поток в два эквивалентных потока -
foldLeft()— последовательная и неассоциативная альтернативаfoldLeft()reduce() -
foldRight()— последовательная и неассоциативная альтернативаreduce() -
limitUntil()— ограничить поток теми записями перед первой, чтобы удовлетворить предикат -
limitWhile()— ограничить поток теми записями перед первой, чтобы не удовлетворить предикат -
maxBy()— уменьшить поток до максимального отображаемого значения -
minBy()— уменьшить поток до минимального отображаемого значения -
partition()— разделить поток на два потока, один из которых удовлетворяет предикату, а другой не удовлетворяет одному и тому же предикату -
reverse()— создать новый поток в обратном порядке -
skipUntil()— пропустить записи, пока предикат не будет удовлетворен -
skipWhile()— пропустить записи, если предикат удовлетворен -
slice()— взять часть потока, т.е. объединитьskip()иlimit() -
splitAt()— разделить поток на два потока в заданной позиции -
unzip()— разбить поток пар на два потока -
zip()— объединить два потока в один поток пар -
zipWithIndex()— объединить поток с соответствующим ему потоком индексов в один поток пар
Новый тип Seq от jOOλ

Все вышеперечисленное является частью jOOλ. jOOλ (произносится как «драгоценность», или «dju-lambda», также пишется jOOL в URL-адресах и т. д.) — это лицензированная библиотека ASL 2.0, которая возникла из наших собственных потребностей в разработке при реализации интеграционных тестов jOOQ с Java 8. Java 8 исключительно хороша. подходит для написания тестов, которые рассуждают о множествах, кортежах, записях и всех вещах SQL
Но API-интерфейс Streams немного кажется недостаточным, поэтому мы обернули потоки JDK в наш собственный тип Seq (Seq для последовательности / последовательного потока):
|
1
2
3
4
5
|
// Wrap a stream in a sequenceSeq<Integer> seq1 = seq(Stream.of(1, 2, 3));// Or create a sequence directly from valuesSeq<Integer> seq2 = Seq.of(1, 2, 3); |
Мы сделали Seq новым интерфейсом, расширяющим интерфейс JDK Stream , так что вы можете полностью использовать Seq с другими API-интерфейсами Java, оставив существующие методы без изменений:
|
1
2
3
4
5
6
7
8
9
|
public interface Seq<T> extends Stream<T> { /** * The underlying {@link Stream} implementation. */ Stream<T> stream(); // [...]} |
Теперь функциональное программирование — это только половина удовольствия, если у вас нет кортежей. К сожалению, в Java нет встроенных кортежей, и хотя создать библиотеку кортежей с помощью универсальных шаблонов легко, кортежи все еще являются синтаксическими гражданами второго сорта, сравнивая, например , Java с Scala или C # и даже VB.NET .
Тем не менее …
У JOOλ также есть кортежи
Мы запустили генератор кода для создания кортежей степени 1-8 (мы могли бы добавить еще в будущем, например, чтобы соответствовать «магическому» уровню 22 Scala и jOOQ ).
И если в библиотеке есть такие кортежи, ей также необходимы соответствующие функции. Суть этих TupleN и FunctionN TupleN к следующему:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
public class Tuple3<T1, T2, T3>implements Tuple, Comparable<Tuple3<T1, T2, T3>>, Serializable, Cloneable { public final T1 v1; public final T2 v2; public final T3 v3; // [...]} |
и
|
1
2
3
4
5
6
7
8
9
|
@FunctionalInterfacepublic interface Function3<T1, T2, T3, R> { default R apply(Tuple3<T1, T2, T3> args) { return apply(args.v1, args.v2, args.v3); } R apply(T1 v1, T2 v2, T3 v3);} |
В типах Tuple есть еще много функций, но давайте их оставим на сегодня.
Кстати, недавно у меня была интересная дискуссия с Гэвином Кингом (создателем Hibernate) о reddit . С точки зрения ORM, Java-классы кажутся подходящей реализацией для SQL / реляционных кортежей, и это действительно так. С точки зрения ORM.
Но классы и кортежи принципиально отличаются, что является очень тонкой проблемой для большинства ORM — например, как объяснил здесь Влад Михалча .
Кроме того, представление SQL о выражениях значений строк (т. Е. Кортежей) весьма отличается от того, что можно смоделировать с помощью классов Java. Эта тема будет освещена в следующем сообщении в блоге.
Некоторые примеры из jOOλ
Имея в виду вышеупомянутые цели, давайте посмотрим, как приведенный выше API может работать на примере:
Архивирование
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
// (tuple(1, "a"), tuple(2, "b"), tuple(3, "c"))Seq.of(1, 2, 3).zip(Seq.of("a", "b", "c"));// ("1:a", "2:b", "3:c")Seq.of(1, 2, 3).zip( Seq.of("a", "b", "c"), (x, y) -> x + ":" + y);// (tuple("a", 0), tuple("b", 1), tuple("c", 2))Seq.of("a", "b", "c").zipWithIndex();// tuple((1, 2, 3), (a, b, c))Seq.unzip(Seq.of( tuple(1, "a"), tuple(2, "b"), tuple(3, "c"))); |
Это уже тот случай, когда кортежи стали очень удобными. Когда мы «упаковываем» два потока в один, нам нужен тип значения оболочки, который объединяет оба значения. Классически, люди могли использовать Object[] для быстрых и грязных решений, но массив не указывает типы атрибутов или степень.
К сожалению, компилятор Java не может определить эффективную границу типа <T> в Seq<T> . Вот почему у нас может быть только статический метод unzip() (вместо экземпляра), сигнатура которого выглядит следующим образом:
|
1
2
3
4
5
6
7
8
|
// This worksstatic <T1, T2> Tuple2<Seq<T1>, Seq<T2>> unzip(Stream<Tuple2<T1, T2>> stream) { ... } // This doesn't work:interface Seq<T> extends Stream<T> { Tuple2<Seq<???>, Seq<???>> unzip();} |
Пропуск и ограничение
|
01
02
03
04
05
06
07
08
09
10
11
|
// (3, 4, 5)Seq.of(1, 2, 3, 4, 5).skipWhile(i -> i < 3);// (3, 4, 5)Seq.of(1, 2, 3, 4, 5).skipUntil(i -> i == 3);// (1, 2)Seq.of(1, 2, 3, 4, 5).limitWhile(i -> i < 3);// (1, 2)Seq.of(1, 2, 3, 4, 5).limitUntil(i -> i == 3); |
Другие функциональные библиотеки, вероятно, используют термины, отличные от skip (например, drop) и limit (например, take). В конце концов, это не имеет значения. Мы выбрали термины, которые уже присутствуют в существующем Stream API: Stream.skip() и Stream.limit()
Раскладной
|
1
2
3
4
5
|
// "abc"Seq.of("a", "b", "c").foldLeft("", (u, t) -> t + u);// "cba"Seq.of("a", "b", "c").foldRight("", (t, u) -> t + u); |
Stream.reduce() предназначены для распараллеливания. Это означает, что переданные ему функции должны иметь следующие важные атрибуты:
Но иногда вы действительно хотите «уменьшить» поток с помощью функций, которые не имеют вышеуказанных атрибутов, и, следовательно, вам, вероятно, не важно, чтобы это сокращение было параллелизуемым. Это где «складывание» приходит.
Хорошее объяснение различных различий между уменьшением и складыванием (в Scala) можно увидеть здесь .
расщепляющий
|
1
2
3
4
5
6
7
8
|
// tuple((1, 2, 3), (1, 2, 3))Seq.of(1, 2, 3).duplicate();// tuple((1, 3, 5), (2, 4, 6))Seq.of(1, 2, 3, 4, 5, 6).partition(i -> i % 2 != 0)// tuple((1, 2), (3, 4, 5))Seq.of(1, 2, 3, 4, 5).splitAt(2); |
Все вышеперечисленные функции имеют одну общую черту: они работают в одном потоке для создания двух новых потоков, которые могут использоваться независимо.
Очевидно, это означает, что внутренне некоторая память должна быть использована для хранения буферов частично потребляемых потоков. Например
- дублирование должно отслеживать все значения, которые были использованы в одном потоке, но не в другом
- разделение должно быстро перейти к следующему значению, которое удовлетворяет (или не удовлетворяет) предикату, не теряя все пропущенные значения
- разделение может потребоваться для быстрой перемотки к индексу разделения
Для реального функционального удовольствия давайте взглянем на возможную реализацию splitAt() :
|
01
02
03
04
05
06
07
08
09
10
|
static <T> Tuple2<Seq<T>, Seq<T>> splitAt(Stream<T> stream, long position) { return seq(stream) .zipWithIndex() .partition(t -> t.v2 < position) .map((v1, v2) -> tuple( v1.map(t -> t.v1), v2.map(t -> t.v1) ));} |
… или с комментариями:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
static <T> Tuple2<Seq<T>, Seq<T>> splitAt(Stream<T> stream, long position) { // Add jOOλ functionality to the stream // -> local Type: Seq<T> return seq(stream) // Keep track of stream positions // with each element in the stream // -> local Type: Seq<Tuple2<T, Long>> .zipWithIndex() // Split the streams at position // -> local Type: Tuple2<Seq<Tuple2<T, Long>>, // Seq<Tuple2<T, Long>>> .partition(t -> t.v2 < position) // Remove the indexes from zipWithIndex again // -> local Type: Tuple2<Seq<T>, Seq<T>> .map((v1, v2) -> tuple( v1.map(t -> t.v1), v2.map(t -> t.v1) ));} |
Хорошо, не правда ли? Возможная реализация partition() , с другой стороны, немного сложнее. Здесь тривиально с помощью Iterator вместо нового Spliterator :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
static <T> Tuple2<Seq<T>, Seq<T>> partition( Stream<T> stream, Predicate<? super T> predicate) { final Iterator<T> it = stream.iterator(); final LinkedList<T> buffer1 = new LinkedList<>(); final LinkedList<T> buffer2 = new LinkedList<>(); class Partition implements Iterator<T> { final boolean b; Partition(boolean b) { this.b = b; } void fetch() { while (buffer(b).isEmpty() && it.hasNext()) { T next = it.next(); buffer(predicate.test(next)).offer(next); } } LinkedList<T> buffer(boolean test) { return test ? buffer1 : buffer2; } @Override public boolean hasNext() { fetch(); return !buffer(b).isEmpty(); } @Override public T next() { return buffer(b).poll(); } } return tuple( seq(new Partition(true)), seq(new Partition(false)) );} |
Я позволю вам выполнить упражнение и проверить приведенный выше код.
Получить и внести свой вклад в jOOλ, сейчас!

Тем не менее, мы считаем, что все, чего не хватает в Streams API Java 8, — это всего лишь пара методов, которые очень полезны для последовательных потоков.
В предыдущем посте мы показали, как мы можем приводить лямбда-выражения к SQL на основе строк, используя простую оболочку для JDBC ( конечно, мы по-прежнему считаем, что вместо нее следует использовать jOOQ ).
Сегодня мы показали, как можно легко написать потрясающую функциональную и последовательную потоковую обработку с помощью jOOλ.
Оставайтесь с нами для еще большего совершенства в ближайшем будущем (и, конечно, приветствуются запросы на извлечение!)
| Ссылка: | Когда Java 8 Streams API не достаточно от нашего партнера по JCG Лукаса Эдера из блога JAVA, SQL и JOOQ . |
