Статьи

Когда Java 8 Streams API недостаточно

Java 8 была — как всегда — выпуском компромиссов и обратной совместимости. Выпуск, в котором экспертная группа JSR-335, возможно, не согласилась с объемом или осуществимостью определенных функций с некоторыми из аудитории . Посмотрите некоторые конкретные объяснения Брайана Гетца о том, почему …

Но сегодня мы собираемся сосредоточиться на «недостатках» Streams API, или, как сказал бы Брайан Гетц: вещи вне сферы видимости с учетом целей дизайна.

Параллельные потоки?

Параллельные вычисления — это сложно, и раньше это было больно. Людям не очень понравился новый (теперь уже старый) API Fork / Join , когда он впервые был поставлен с Java 7. И наоборот, Stream.parallel() и краткость вызова Stream.parallel() непобедимы.

Но многим людям на самом деле не нужны параллельные вычисления (не путать с многопоточностью!). В 95% всех случаев люди, вероятно, предпочли бы более мощный API-интерфейс Streams или, возможно, более мощный API-интерфейс Collections с множеством удивительных методов для различных подтипов Iterable .

Однако Iterable опасно. Даже простое преобразование Iterable в Stream помощью потенциального метода Iterable.stream() похоже, рискует открыть ящик Пандоры! ,

Последовательные потоки!

Так что, если JDK не отправит его, мы создадим его сами!

Потоки довольно крутые как таковые. Они потенциально бесконечны, и это крутая особенность. В основном — и особенно в случае функционального программирования — размер коллекции не имеет большого значения, поскольку мы преобразуем элемент за элементом с помощью функций.

Если мы допустим, что потоки являются чисто последовательными, то у нас может быть любой из этих довольно крутых методов (некоторые из которых также возможны с параллельными потоками):

  • cycle() — гарантированный способ сделать каждый поток бесконечным
  • duplicate() — дублирует поток в два эквивалентных потока
  • foldLeft() — последовательная и неассоциативная альтернатива foldLeft() reduce()
  • foldRight() — последовательная и неассоциативная альтернатива reduce()
  • limitUntil() — ограничить поток теми записями перед первой, чтобы удовлетворить предикат
  • limitWhile() — ограничить поток теми записями перед первой, чтобы не удовлетворить предикат
  • maxBy() — уменьшить поток до максимального отображаемого значения
  • minBy() — уменьшить поток до минимального отображаемого значения
  • partition() — разделить поток на два потока, один из которых удовлетворяет предикату, а другой не удовлетворяет одному и тому же предикату
  • reverse() — создать новый поток в обратном порядке
  • skipUntil() — пропустить записи, пока предикат не будет удовлетворен
  • skipWhile() — пропустить записи, если предикат удовлетворен
  • slice() — взять часть потока, т.е. объединить skip() и limit()
  • splitAt() — разделить поток на два потока в заданной позиции
  • unzip() — разбить поток пар на два потока
  • zip() — объединить два потока в один поток пар
  • zipWithIndex() — объединить поток с соответствующим ему потоком индексов в один поток пар

Новый тип Seq от jOOλ

jool-логотип-черный
Все вышеперечисленное является частью jOOλ. jOOλ (произносится как «драгоценность», или «dju-lambda», также пишется jOOL в URL-адресах и т. д.) — это лицензированная библиотека ASL 2.0, которая возникла из наших собственных потребностей в разработке при реализации интеграционных тестов jOOQ с Java 8. Java 8 исключительно хороша. подходит для написания тестов, которые рассуждают о множествах, кортежах, записях и всех вещах SQL

Но API-интерфейс Streams немного кажется недостаточным, поэтому мы обернули потоки JDK в наш собственный тип Seq (Seq для последовательности / последовательного потока):

1
2
3
4
5
// Wrap a stream in a sequence
Seq<Integer> seq1 = seq(Stream.of(1, 2, 3));
 
// Or create a sequence directly from values
Seq<Integer> seq2 = Seq.of(1, 2, 3);

Мы сделали Seq новым интерфейсом, расширяющим интерфейс JDK Stream , так что вы можете полностью использовать Seq с другими API-интерфейсами Java, оставив существующие методы без изменений:

1
2
3
4
5
6
7
8
9
public interface Seq<T> extends Stream<T> {
 
    /**
     * The underlying {@link Stream} implementation.
     */
    Stream<T> stream();
     
    // [...]
}

Теперь функциональное программирование — это только половина удовольствия, если у вас нет кортежей. К сожалению, в Java нет встроенных кортежей, и хотя создать библиотеку кортежей с помощью универсальных шаблонов легко, кортежи все еще являются синтаксическими гражданами второго сорта, сравнивая, например , Java с Scala или C # и даже VB.NET .

Тем не менее …

У JOOλ также есть кортежи

Мы запустили генератор кода для создания кортежей степени 1-8 (мы могли бы добавить еще в будущем, например, чтобы соответствовать «магическому» уровню 22 Scala и jOOQ ).

jooq-The-лучший способ к записи-SQL-в-Java-маленький

И если в библиотеке есть такие кортежи, ей также необходимы соответствующие функции. Суть этих TupleN и FunctionN TupleN к следующему:

01
02
03
04
05
06
07
08
09
10
11
12
public class Tuple3<T1, T2, T3>
implements
    Tuple,
    Comparable<Tuple3<T1, T2, T3>>,
    Serializable, Cloneable {
     
    public final T1 v1;
    public final T2 v2;
    public final T3 v3;
     
    // [...]
}

и

1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface Function3<T1, T2, T3, R> {
 
    default R apply(Tuple3<T1, T2, T3> args) {
        return apply(args.v1, args.v2, args.v3);
    }
 
    R apply(T1 v1, T2 v2, T3 v3);
}

В типах Tuple есть еще много функций, но давайте их оставим на сегодня.

Кстати, недавно у меня была интересная дискуссия с Гэвином Кингом (создателем Hibernate) о reddit . С точки зрения ORM, Java-классы кажутся подходящей реализацией для SQL / реляционных кортежей, и это действительно так. С точки зрения ORM.

Но классы и кортежи принципиально отличаются, что является очень тонкой проблемой для большинства ORM — например, как объяснил здесь Влад Михалча .

Кроме того, представление SQL о выражениях значений строк (т. Е. Кортежей) весьма отличается от того, что можно смоделировать с помощью классов Java. Эта тема будет освещена в следующем сообщении в блоге.

Некоторые примеры из jOOλ

Имея в виду вышеупомянутые цели, давайте посмотрим, как приведенный выше API может работать на примере:

Архивирование

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
// (tuple(1, "a"), tuple(2, "b"), tuple(3, "c"))
Seq.of(1, 2, 3).zip(Seq.of("a", "b", "c"));
 
// ("1:a", "2:b", "3:c")
Seq.of(1, 2, 3).zip(
    Seq.of("a", "b", "c"),
    (x, y) -> x + ":" + y
);
 
// (tuple("a", 0), tuple("b", 1), tuple("c", 2))
Seq.of("a", "b", "c").zipWithIndex();
 
// tuple((1, 2, 3), (a, b, c))
Seq.unzip(Seq.of(
    tuple(1, "a"),
    tuple(2, "b"),
    tuple(3, "c")
));

Это уже тот случай, когда кортежи стали очень удобными. Когда мы «упаковываем» два потока в один, нам нужен тип значения оболочки, который объединяет оба значения. Классически, люди могли использовать Object[] для быстрых и грязных решений, но массив не указывает типы атрибутов или степень.

К сожалению, компилятор Java не может определить эффективную границу типа <T> в Seq<T> . Вот почему у нас может быть только статический метод unzip() (вместо экземпляра), сигнатура которого выглядит следующим образом:

1
2
3
4
5
6
7
8
// This works
static <T1, T2> Tuple2<Seq<T1>, Seq<T2>>
    unzip(Stream<Tuple2<T1, T2>> stream) { ... }
     
// This doesn't work:
interface Seq<T> extends Stream<T> {
    Tuple2<Seq<???>, Seq<???>> unzip();
}

Пропуск и ограничение

01
02
03
04
05
06
07
08
09
10
11
// (3, 4, 5)
Seq.of(1, 2, 3, 4, 5).skipWhile(i -> i < 3);
 
// (3, 4, 5)
Seq.of(1, 2, 3, 4, 5).skipUntil(i -> i == 3);
 
// (1, 2)
Seq.of(1, 2, 3, 4, 5).limitWhile(i -> i < 3);
 
// (1, 2)
Seq.of(1, 2, 3, 4, 5).limitUntil(i -> i == 3);

Другие функциональные библиотеки, вероятно, используют термины, отличные от skip (например, drop) и limit (например, take). В конце концов, это не имеет значения. Мы выбрали термины, которые уже присутствуют в существующем Stream API: Stream.skip() и Stream.limit()

Раскладной

1
2
3
4
5
// "abc"
Seq.of("a", "b", "c").foldLeft("", (u, t) -> t + u);
 
// "cba"
Seq.of("a", "b", "c").foldRight("", (t, u) -> t + u);

Stream.reduce() предназначены для распараллеливания. Это означает, что переданные ему функции должны иметь следующие важные атрибуты:

Но иногда вы действительно хотите «уменьшить» поток с помощью функций, которые не имеют вышеуказанных атрибутов, и, следовательно, вам, вероятно, не важно, чтобы это сокращение было параллелизуемым. Это где «складывание» приходит.

Хорошее объяснение различных различий между уменьшением и складыванием (в Scala) можно увидеть здесь .

расщепляющий

1
2
3
4
5
6
7
8
// tuple((1, 2, 3), (1, 2, 3))
Seq.of(1, 2, 3).duplicate();
 
// tuple((1, 3, 5), (2, 4, 6))
Seq.of(1, 2, 3, 4, 5, 6).partition(i -> i % 2 != 0)
 
// tuple((1, 2), (3, 4, 5))
Seq.of(1, 2, 3, 4, 5).splitAt(2);

Все вышеперечисленные функции имеют одну общую черту: они работают в одном потоке для создания двух новых потоков, которые могут использоваться независимо.

Очевидно, это означает, что внутренне некоторая память должна быть использована для хранения буферов частично потребляемых потоков. Например

  • дублирование должно отслеживать все значения, которые были использованы в одном потоке, но не в другом
  • разделение должно быстро перейти к следующему значению, которое удовлетворяет (или не удовлетворяет) предикату, не теряя все пропущенные значения
  • разделение может потребоваться для быстрой перемотки к индексу разделения

Для реального функционального удовольствия давайте взглянем на возможную реализацию splitAt() :

01
02
03
04
05
06
07
08
09
10
static <T> Tuple2<Seq<T>, Seq<T>>
splitAt(Stream<T> stream, long position) {
    return seq(stream)
          .zipWithIndex()
          .partition(t -> t.v2 < position)
          .map((v1, v2) -> tuple(
              v1.map(t -> t.v1),
              v2.map(t -> t.v1)
          ));
}

… или с комментариями:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static <T> Tuple2<Seq<T>, Seq<T>>
splitAt(Stream<T> stream, long position) {
    // Add jOOλ functionality to the stream
    // -> local Type: Seq<T>
    return seq(stream)
     
    // Keep track of stream positions
    // with each element in the stream
    // -> local Type: Seq<Tuple2<T, Long>>
          .zipWithIndex()
       
    // Split the streams at position
    // -> local Type: Tuple2<Seq<Tuple2<T, Long>>,
    //                       Seq<Tuple2<T, Long>>>
          .partition(t -> t.v2 < position)
           
    // Remove the indexes from zipWithIndex again
    // -> local Type: Tuple2<Seq<T>, Seq<T>>
          .map((v1, v2) -> tuple(
              v1.map(t -> t.v1),
              v2.map(t -> t.v1)
          ));
}

Хорошо, не правда ли? Возможная реализация partition() , с другой стороны, немного сложнее. Здесь тривиально с помощью Iterator вместо нового Spliterator :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static <T> Tuple2<Seq<T>, Seq<T>> partition(
        Stream<T> stream,
        Predicate<? super T> predicate
) {
    final Iterator<T> it = stream.iterator();
    final LinkedList<T> buffer1 = new LinkedList<>();
    final LinkedList<T> buffer2 = new LinkedList<>();
 
    class Partition implements Iterator<T> {
 
        final boolean b;
 
        Partition(boolean b) {
            this.b = b;
        }
 
        void fetch() {
            while (buffer(b).isEmpty() && it.hasNext()) {
                T next = it.next();
                buffer(predicate.test(next)).offer(next);
            }
        }
 
        LinkedList<T> buffer(boolean test) {
            return test ? buffer1 : buffer2;
        }
 
        @Override
        public boolean hasNext() {
            fetch();
            return !buffer(b).isEmpty();
        }
 
        @Override
        public T next() {
            return buffer(b).poll();
        }
    }
 
    return tuple(
        seq(new Partition(true)),
        seq(new Partition(false))
    );
}

Я позволю вам выполнить упражнение и проверить приведенный выше код.

Получить и внести свой вклад в jOOλ, сейчас!

jool-логотип-черный Все вышеперечисленное является частью jOOλ , доступного бесплатно на GitHub. Уже есть частично готовая к использованию Java-8 полноценная библиотека с названием functionsjava , которая идет намного дальше, чем jOOλ.

Тем не менее, мы считаем, что все, чего не хватает в Streams API Java 8, — это всего лишь пара методов, которые очень полезны для последовательных потоков.

В предыдущем посте мы показали, как мы можем приводить лямбда-выражения к SQL на основе строк, используя простую оболочку для JDBC ( конечно, мы по-прежнему считаем, что вместо нее следует использовать jOOQ ).

Сегодня мы показали, как можно легко написать потрясающую функциональную и последовательную потоковую обработку с помощью jOOλ.

Оставайтесь с нами для еще большего совершенства в ближайшем будущем (и, конечно, приветствуются запросы на извлечение!)

Ссылка: Когда Java 8 Streams API не достаточно от нашего партнера по JCG Лукаса Эдера из блога JAVA, SQL и JOOQ .