Обработка файла в виде потока оказывается чрезвычайно эффективной и удобной. Многие люди, кажется, забывают, что с Java 8 (3+ года!) Мы можем очень легко превратить любой файл в поток строк:
1
2
3
4
5
6
7
8
|
String filePath = "foobar.txt" ; try (BufferedReader reader = new BufferedReader( new FileReader(filePath))) { reader.lines() .filter(line -> !line.startsWith( "#" )) .map(String::toLowerCase) .flatMap(line -> Stream.of(line.split( " " ))) .forEach(System.out::println); } |
reader.lines()
возвращает Stream<String>
который вы можете в дальнейшем преобразовать. В этом примере мы отбрасываем строки, начинающиеся с "#"
и разбиваем каждую строку, разбивая ее на слова. Таким образом, мы достигаем потока слов в отличие от потока строк. Работать с текстовыми файлами почти так же просто, как работать с обычными коллекциями Java. В RxJava мы уже узнали об операторе generate()
. Это может также использоваться здесь, чтобы создать надежный поток строк из файла:
01
02
03
04
05
06
07
08
09
10
11
12
|
Flowable<String> file = Flowable.generate( () -> new BufferedReader( new FileReader(filePath)), (reader, emitter) -> { final String line = reader.readLine(); if (line != null ) { emitter.onNext(line); } else { emitter.onComplete(); } }, reader -> reader.close() ); |
Оператор generate()
в вышеупомянутом примере немного сложнее. Первый аргумент — государственная фабрика. Каждый раз, когда кто-то подписывается на этот поток, вызывается фабрика и создается BufferedReader
сохранением состояния. Затем, когда последующие операторы или абоненты хотят получить некоторые данные, вызывается вторая лямбда (с двумя параметрами). Это лямбда-выражение пытается onNext()
из файла ровно одну строку и отправить ее вниз по потоку ( onNext()
) или завершить при обнаружении конца файла. Это довольно просто. Третий необязательный аргумент метода generate()
— это лямбда-выражение, которое может выполнять некоторую очистку с помощью state В нашем случае это очень удобно, так как мы должны закрывать файл не только тогда, когда достигнут конец файла, но и когда пользователи преждевременно отписываются.
Познакомьтесь с оператором Flowable.using ()
Это кажется большой работой, особенно когда у нас уже есть поток строк из JDK 8. Оказывается, есть похожий фабричный оператор с именем using()
который очень удобен. Прежде всего, самый простой способ перевода Stream
из Java в Flowable
— это преобразование Stream
в Iterator
(проверенная обработка исключений игнорируется):
1
2
3
4
5
6
7
8
|
Flowable.fromIterable( new Iterable<String>() { @Override public Iterator<String> iterator() { final BufferedReader reader = new BufferedReader( new FileReader(filePath)); final Stream<String> lines = reader.lines(); return lines.iterator(); } }); |
Это может быть упрощено до:
1
2
3
4
5
|
Flowable.<String>fromIterable(() -> { final BufferedReader reader = new BufferedReader( new FileReader(filePath)); final Stream<String> lines = reader.lines(); return lines.iterator(); }); |
Но мы забыли о закрытии BufferedReader
при этом FileReader
при этом обрабатывает файл. Таким образом мы ввели утечку ресурсов. При таких обстоятельствах using()
оператора using()
работает как шарм. В некотором смысле это похоже на оператор try-with-resources
. Вы можете создать поток на основе какого-либо внешнего ресурса. Жизненный цикл этого ресурса (создание и удаление) будет управляться для вас, когда кто-то подписывается или отписывается:
1
2
3
4
5
|
Flowable.using( () -> new BufferedReader( new FileReader(filePath)), reader -> Flowable.fromIterable(() -> reader.lines().iterator()), reader -> reader.close() ); |
Это довольно похоже на последний пример generate()
, однако самое важное лямбда-выражение в середине совсем другое. Мы получаем ресурс ( reader
) в качестве аргумента и должны возвращать Flowable
(не один элемент). Эта лямбда вызывается только один раз, а не каждый раз, когда нижестоящий запрашивает новый элемент. Оператор using()
дает нам управление жизненным циклом BufferedReaders
. using()
полезно, когда у нас есть часть состояния (точно так же как с generate()
), которая способна производить весь Flowable
одновременно, а не один элемент за раз.
Потоковые XML-файлы
… или JSON в этом отношении. Представьте, что у вас есть очень большой XML-файл, который состоит из следующих записей, сотни тысяч из них:
1
2
3
4
5
6
7
8
9
|
< trkpt lat = "52.23453" lon = "21.01685" > < ele >116</ ele > </ trkpt > < trkpt lat = "52.23405" lon = "21.01711" > < ele >116</ ele > </ trkpt > < trkpt lat = "52.23397" lon = "21.0166" > < ele >116</ ele > </ trkpt > |
Это фрагмент из стандартного формата обмена GPS, который может описывать географические маршруты произвольной длины. Каждый <trkpt>
является отдельной точкой с широтой, долготой и высотой. Мы хотели бы иметь поток точек отслеживания (игнорируя возвышение для простоты), чтобы файл можно было использовать частично, а не загружать все сразу. У нас есть три варианта:
- DOM / JAXB — все должно быть загружено в память и сопоставлено с объектами Java. Не будет работать с бесконечно длинными файлами (или даже очень большими)
- SAX — библиотека на основе push, которая вызывает обратные вызовы всякий раз, когда обнаруживает открытие или закрытие XML-тега. Кажется немного лучше, но не может поддерживать обратное давление — это библиотека, которая решает, когда вызывать обратные вызовы, и нет способа замедлить ее
- StAX — как SAX, но мы должны активно получать данные из XML-файла. Это важно для поддержки противодавления — мы решаем, когда читать следующую порцию данных
Давайте попробуем реализовать синтаксический анализ и потоковую передачу, возможно, очень большого XML-файла с использованием StAX и RxJava. Сначала мы должны научиться использовать StAX . Парсер называется XMLStreamReader
и создается со следующей последовательностью заклинаний и проклятий:
1
2
3
4
|
XMLStreamReader staxReader(String name) throws XMLStreamException { final InputStream inputStream = new BufferedInputStream( new FileInputStream(name)); return XMLInputFactory.newInstance().createXMLStreamReader(inputStream); } |
Просто закройте глаза и убедитесь, что у вас всегда есть место для копирования и вставки приведенного выше фрагмента. Становится еще хуже. Чтобы прочитать первый <trkpt>
включая его атрибуты, мы должны написать довольно сложный код:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
import lombok.Value; @Value class Trackpoint { private final BigDecimal lat; private final BigDecimal lon; } Trackpoint nextTrackpoint(XMLStreamReader r) { while (r.hasNext()) { int event = r.next(); switch (event) { case XMLStreamConstants.START_ELEMENT: if (r.getLocalName().equals( "trkpt" )) { return parseTrackpoint(r); } break ; case XMLStreamConstants.END_ELEMENT: if (r.getLocalName().equals( "gpx" )) { return null ; } break ; } } return null ; } Trackpoint parseTrackpoint(XMLStreamReader r) { return new Trackpoint( new BigDecimal(r.getAttributeValue( "" , "lat" )), new BigDecimal(r.getAttributeValue( "" , "lon" )) ); } |
API — цитата низкого уровня и почти восхитительно старинная. Все происходит в гигантском цикле, который читает … что-то типа int
. Это int
может быть START_ELEMENT
, END_ELEMENT
или несколько других вещей, которые нас не интересуют. Помните, что мы читаем XML-файл, но не построчно или не символ за символом, а с помощью логических токенов XML (тегов). Итак, если мы обнаруживаем открытие элемента <trkpt>
мы его анализируем, в противном случае мы продолжаем. Второе важное условие — когда мы обнаруживаем закрытие </gpx>
которое должно быть последним в файле GPX. В этом случае мы возвращаем null
, сигнализируя конец XML-файла.
Чувствует себя сложным? На самом деле это самый простой способ чтения большого XML с постоянным использованием памяти, независимо от размера файла. Как все это относится к RxJava? На данный момент мы можем очень легко построить Flowable<Trackpoint>
. Да, Flowable
, Не Observable
(см .: Obsevable
против Observable
). Такой поток будет иметь полную поддержку противодавления, то есть он будет читать файл с соответствующей скоростью:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
Flowable<Trackpoint> trackpoints = generate( () -> staxReader( "track.gpx" ), this ::pushNextTrackpoint, XMLStreamReader::close); void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) { final Trackpoint trkpt = nextTrackpoint(reader); if (trkpt != null ) { emitter.onNext(trkpt); } else { emitter.onComplete(); } } |
Ух, так просто, такое противодавление! [1] Сначала мы создаем XMLStreamReader
и XMLStreamReader
его закрытие, когда файл заканчивается или кто-то отписывается. Помните, что каждый подписчик будет открывать и начинать анализ одного и того же файла снова и снова. Лямбда-выражение в середине просто принимает переменные состояния ( XMLStreamReader
) и испускает еще одну трекпоинт. Все это кажется довольно неясным, и это так! Но теперь у нас есть поток с обратной защитой, взятый из, возможно, очень большого файла, использующего очень мало ресурсов. Мы можем обрабатывать трекпоинт одновременно или комбинировать их с другими источниками данных. В следующей статье мы узнаем, как загрузить JSON очень похожим образом.
Ссылка: | Загрузка файлов с противодавлением — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге на Java и соседях . |