Статьи

Java 8 пятница: 10 незначительных ошибок при использовании API потоков

В Data Geekery мы любим Java. И так как мы действительно входим в свободный API jOOQ и запросы DSL , мы абсолютно взволнованы тем, что Java 8 принесет в нашу экосистему.

Ява 8 Пятница

Каждую пятницу мы показываем вам пару замечательных новых функций Java 8 в виде учебника, которые используют лямбда-выражения, методы расширения и другие замечательные вещи. Вы найдете исходный код на GitHub .

10 тонких ошибок при использовании API Streams

Мы сделали все списки ошибок SQL:

Но мы еще не попали в список 10 ошибок с Java 8! По сегодняшнему случаю ( сегодня пятница, 13-е ) мы узнаем, что пойдет не так в ВАШЕМ приложении, когда вы работаете с Java 8. (с нами этого не случится, так как мы застряли с Java 6 для другое время)

1. Случайное повторное использование потоков

Хочу поспорить, что это случится со всеми по крайней мере один раз. Как и существующие «потоки» (например InputStream), вы можете использовать потоки только один раз. Следующий код не будет работать:

IntStream stream = IntStream.of(1, 2);
stream.forEach(System.out::println);
// That was fun! Let's do it again!
stream.forEach(System.out::println);

Вы получите

java.lang.IllegalStateException: 
  stream has already been operated upon or closed

Так что будьте осторожны при использовании вашего потока. Это можно сделать только один раз

2. Случайное создание «бесконечных» потоков

Вы можете создавать бесконечные потоки довольно легко, не замечая этого. Возьмите следующий пример:

// Will run indefinitely
IntStream.iterate(0, i -> i + 1)
.forEach(System.out::println);

Весь смысл потоков заключается в том, что они могут быть бесконечными, если вы их спроектируете. Единственная проблема в том, что вы, возможно, не хотели этого. Поэтому обязательно всегда ставьте правильные ограничения:

// That's better
IntStream.iterate(0, i -> i + 1)
.limit(10)
.forEach(System.out::println);

3. Случайно создавая «тонкие» бесконечные потоки

Мы не можем сказать этого достаточно. Вы БУДЕТЕ в конечном итоге создать бесконечный поток, случайно. Возьмите следующий поток, например:

IntStream.iterate(0, i -> ( i + 1) % 2)
.distinct()
.limit(10)
.forEach(System.out::println);

Так…

  • мы генерируем чередующиеся 0 и 1
  • тогда мы сохраняем только разные значения, то есть один 0 и один 1
  • тогда мы ограничим поток размером 10
  • тогда мы потребляем это

Хорошо … distinct()операция не знает, что функция, предоставленная iterate()методу, выдаст только два различных значения. Можно ожидать большего. Таким образом, он будет всегда потреблять новые значения из потока, и limit(10)он никогда не будет достигнут. Не повезло, ваше приложение глохнет.

4. Случайно создавая «тонкие» параллельные бесконечные потоки

Мы действительно должны настаивать на том, что вы можете случайно попытаться использовать бесконечный поток. Предположим, вы считаете, что distinct()операция должна выполняться параллельно. Вы могли бы написать это:

IntStream.iterate(0, i -> ( i + 1) % 2)
.parallel()
.distinct()
.limit(10)
.forEach(System.out::println);

Теперь мы уже видели, что это будет навсегда. Но ранее, по крайней мере, вы использовали только один процессор на вашем компьютере. Теперь вы, вероятно, потребите четыре из них, потенциально занимая почти всю вашу систему со случайным бесконечным потреблением потока. Это довольно плохо. После этого вы, вероятно, сможете перезагрузить свой сервер / компьютер для разработки. Посмотрите, как выглядел мой ноутбук до взрыва:

Если бы я был ноутбуком, я бы так хотел.

Если бы я был ноутбуком, я бы так хотел.

5. Перепутать порядок операций

Итак, почему мы настаивали на том, чтобы вы случайно создали бесконечные потоки? Это просто. Потому что вы можете просто случайно сделать это. Вышеуказанный поток может быть отлично использован, если вы переключите порядок limit()и distinct():

IntStream.iterate(0, i -> ( i + 1) % 2)
.limit(10)
.distinct()
.forEach(System.out::println);

Это теперь дает:

0
1

Зачем? Поскольку мы сначала ограничиваем бесконечный поток до 10 значений (0 1 0 1 0 1 0 1 0 1), прежде чем мы уменьшим ограниченный поток до различных значений, содержащихся в нем (0 1).

Конечно, это может больше не быть семантически правильным, потому что вы действительно хотели получить первые 10 различных значений из набора данных (вы просто «забыли», что данные бесконечны). Никто на самом деле не хочет 10 случайных значений, и только потом уменьшать их, чтобы они были различимы.

Если вы пришли из SQL-фона, вы можете не ожидать таких различий. Возьмите SQL Server 2012, например. Следующие два оператора SQL одинаковы:

-- Using TOP
SELECTDISTINCTTOP10 *
FROMi
ORDERBY..
-- Using FETCH
SELECT*
FROMi
ORDERBY..
OFFSET 0 ROWS
FETCHNEXT10 ROWSONLY

Так что, как человек SQL, вы можете не осознавать важность порядка операций с потоками.

jOOQ, лучший способ написать SQL на Java

6. Перепутать порядок операций (снова)

Говоря о SQL, если вы являетесь пользователем MySQL или PostgreSQL, вы можете использовать это LIMIT .. OFFSETпредложение. SQL полон тонких причуд, и это одна из них. Предложение OFFSETприменяется ПЕРВЫМ , как это предлагается в синтаксисе SQL Server 2012 (т.е. в стандарте SQL: 2008 ).

Если вы переведете диалект MySQL / PostgreSQL непосредственно в потоки, вы, вероятно, ошибетесь:

IntStream.iterate(0, i -> i + 1)
.limit(10) // LIMIT
.skip(5)  // OFFSET
.forEach(System.out::println);

Вышеуказанные урожаи

5
6
7
8
9

Да. Это не продолжается после 9, потому что limit()теперь применяется первым , производя (0 1 2 3 4 5 6 7 8 9). skip()применяется после уменьшения потока до (5 6 7 8 9). Не то, что вы, возможно, намеревались.

ОСТЕРЕГАЙТЕСЬ LIMIT .. OFFSETпротив "OFFSET .. LIMIT"ловушки!

7. Хождение по файловой системе с фильтрами

Мы уже писали об этом раньше . Хорошей идеей является обход файловой системы с использованием фильтров:

Files.walk(Paths.get("."))
.filter(p -> !p.toFile().getName().startsWith("."))
.forEach(System.out::println);

Вышеупомянутый поток, кажется, проходит только через не скрытые каталоги, то есть каталоги, которые не начинаются с точки. К сожалению, вы снова допустили ошибку № 5 и № 6. walk()уже подготовил весь поток подкаталогов текущего каталога. Хотя и лениво, но логически содержит все подпути . Теперь фильтр правильно отфильтрует пути, имена которых начинаются с точки «.». Например, .gitили .ideaне будет частью результирующего потока. Но эти пути будут: .\.git\refsили .\.idea\libraries. Не то, что вы хотели.

Не исправляйте это, написав следующее:

Files.walk(Paths.get("."))
.filter(p -> !p.toString().contains(File.separator + "."))
.forEach(System.out::println);

Несмотря на то, что это даст правильный вывод, он все равно будет делать это, обходя полное поддерево каталога, возвращаясь во все подкаталоги «скрытых» каталогов.

Я думаю, вам придется снова прибегнуть к старому доброму JDK 1.0 File.list(). Хорошие новости есть, FilenameFilterи FileFilterоба функциональных интерфейса.

8. Изменение резервной коллекции потока

Пока вы выполняете итерацию List, вы не должны изменять этот же список в теле итерации. Это было верно до Java 8, но это может стать более сложным с потоками Java 8. Рассмотрим следующий список из 0..9:

// Of course, we create this list using streams:
List<Integer> list =
IntStream.range(0, 10)
.boxed()
.collect(toCollection(ArrayList::new));

Теперь давайте предположим, что мы хотим удалить каждый элемент при его использовании:

list.stream()
// remove(Object), not remove(int)!
.peek(list::remove)
.forEach(System.out::println);

Интересно, что это будет работать для некоторых элементов! Вывод, который вы можете получить:

0
2
4
6
8
null
null
null
null
null
java.util.ConcurrentModificationException

Если мы проанализируем список после того, как поймал это исключение, то получится забавная находка. Мы получим:

[1, 3, 5, 7, 9]

Хех, это «сработало» для всех нечетных чисел. Это ошибка? Нет, это похоже на особенность. Если вы углубляетесь в код JDK, вы найдете этот комментарий в ArrayList.ArraListSpliterator:

/*
 * If ArrayLists were immutable, or structurally immutable (no
 * adds, removes, etc), we could implement their spliterators
 * with Arrays.spliterator. Instead we detect as much
 * interference during traversal as practical without
 * sacrificing much performance. We rely primarily on
 * modCounts. These are not guaranteed to detect concurrency
 * violations, and are sometimes overly conservative about
 * within-thread interference, but detect enough problems to
 * be worthwhile in practice. To carry this out, we (1) lazily
 * initialize fence and expectedModCount until the latest
 * point that we need to commit to the state we are checking
 * against; thus improving precision.  (This doesn't apply to
 * SubLists, that create spliterators with current non-lazy
 * values).  (2) We perform only a single
 * ConcurrentModificationException check at the end of forEach
 * (the most performance-sensitive method). When using forEach
 * (as opposed to iterators), we can normally only detect
 * interference after actions, not before. Further
 * CME-triggering checks apply to all other possible
 * violations of assumptions for example null or too-small
 * elementData array given its size(), that could only have
 * occurred due to interference.  This allows the inner loop
 * of forEach to run without any further checks, and
 * simplifies lambda-resolution. While this does entail a
 * number of checks, note that in the common case of
 * list.stream().forEach(a), no checks or other computation
 * occur anywhere other than inside forEach itself.  The other
 * less-often-used methods cannot take advantage of most of
 * these streamlinings.
 */

Теперь посмотрим, что происходит, когда мы сообщаем потоку о sorted()результатах:

list.stream()
.sorted()
.peek(list::remove)
.forEach(System.out::println);

Это теперь даст следующий, «ожидаемый» результат

0
1
2
3
4
5
6
7
8
9

А список после потребления потока? Пусто

[]

Итак, все элементы израсходованы и удалены правильно. sorted()Операция является «промежуточным состоянием операции» , что означает , что последующие операции больше не работают на сборе подложки, а на внутреннем состоянии. Теперь «безопасно» удалять элементы из списка!

Ну … мы можем на самом деле? Давайте продолжим parallel(), sorted()удаление:

list.stream()
.sorted()
.parallel()
.peek(list::remove)
.forEach(System.out::println);

Это теперь дает:

7
6
2
5
8
4
1
0
9
3

И список содержит

[8]

Ик. Мы не удалили все элементы !? Бесплатное пиво ( и наклейки JOOQ ) отправляются всем, кто решает эту головоломку с потоками!

Все это выглядит довольно случайным и тонким, мы можем только предположить, что вы никогда не изменяете резервную копию при использовании потока. Это просто не работает.

9. Забыв на самом деле потреблять поток

Как вы думаете, что делает следующий поток?

IntStream.range(1, 5)
.peek(System.out::println)
.peek(i -> {
if(i == 5)
thrownewRuntimeException("bang");
});

Когда вы читаете это, вы можете подумать, что он напечатает (1 2 3 4 5) и затем выдаст исключение. Но это не правильно. Это ничего не сделает. Поток просто сидит там, никогда не истощенный.

Как и в случае любого свободного API или DSL, вы можете забыть вызвать операцию «терминал». Это может быть особенно актуально, когда вы используете peek(), как peek()ужасно много похоже на forEach().

Это может произойти с jOOQ точно так же, когда вы забудете позвонить execute()или fetch():

DSL.using(configuration)
.update(TABLE)
.set(TABLE.COL1, 1)
.set(TABLE.COL2, "abc")
.where(TABLE.ID.eq(3));

К сожалению. нетexecute()

jOOQ, лучший способ написать SQL на Java

Да, «лучший» способ — с 1-2 оговорками ?

10. Параллельный поток тупик

Теперь это настоящее лакомство для конца!

Все параллельные системы могут застрять, если вы не синхронизируете их должным образом. Хотя нахождение реального примера не очевидно, нахождение принудительного примера есть. Следующий parallel()поток гарантированно заходит в тупик:

Object[] locks = { newObject(), newObject() };
IntStream
.range(1, 5)
.parallel()
.peek(Unchecked.intConsumer(i -> {
synchronized(locks[i % locks.length]) {
Thread.sleep(100);
synchronized(locks[(i + 1) % locks.length]) {
Thread.sleep(50);
}
}
}))
.forEach(System.out::println);

Обратите внимание на использование Unchecked.intConsumer(), которое преобразует функциональный IntConsumerинтерфейс в a org.jooq.lambda.fi.util.function.CheckedIntConsumer, которому разрешено генерировать отмеченные исключения.

Хорошо. Не повезло вашей машине. Эти темы будут заблокированы навсегда ?

Хорошей новостью является то, что никогда не было так легко привести пример тупика в учебнике на Java!

Для получения дополнительной информации см. Также ответ Брайана Гетца на этот вопрос о переполнении стека .

Вывод

С потоками и функциональным мышлением мы столкнемся с огромным количеством новых, тонких ошибок. Немногие из этих ошибок могут быть предотвращены, кроме как с помощью практики и концентрации внимания. Вы должны подумать о том, как заказать ваши операции. Вы должны подумать о том, могут ли ваши потоки быть бесконечными.

Потоки (и лямбды) — очень мощный инструмент. Но инструмент, который нам нужно освоить, в первую очередь.