Статьи

Загрузка файлов с обратным давлением — RxJava FAQ

Обработка файла в виде потока оказывается чрезвычайно эффективной и удобной. Многие люди, кажется, забывают, что с Java 8 (3+ года!) Мы можем очень легко превратить любой файл в поток строк:

1
2
3
4
5
6
7
8
String filePath = "foobar.txt";
try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
    reader.lines()
            .filter(line -> !line.startsWith("#"))
            .map(String::toLowerCase)
            .flatMap(line -> Stream.of(line.split(" ")))
            .forEach(System.out::println);
}

reader.lines() возвращает Stream<String> который вы можете в дальнейшем преобразовать. В этом примере мы отбрасываем строки, начинающиеся с "#" и разбиваем каждую строку, разбивая ее на слова. Таким образом, мы достигаем потока слов в отличие от потока строк. Работать с текстовыми файлами почти так же просто, как работать с обычными коллекциями Java. В RxJava мы уже узнали об операторе generate() . Это может также использоваться здесь, чтобы создать надежный поток строк из файла:

01
02
03
04
05
06
07
08
09
10
11
12
Flowable<String> file = Flowable.generate(
        () -> new BufferedReader(new FileReader(filePath)),
        (reader, emitter) -> {
            final String line = reader.readLine();
            if (line != null) {
               emitter.onNext(line);
            } else {
               emitter.onComplete();
            }
        },
        reader -> reader.close()
);

Оператор generate() в вышеупомянутом примере немного сложнее. Первый аргумент — государственная фабрика. Каждый раз, когда кто-то подписывается на этот поток, вызывается фабрика и создается BufferedReader сохранением состояния. Затем, когда последующие операторы или абоненты хотят получить некоторые данные, вызывается вторая лямбда (с двумя параметрами). Это лямбда-выражение пытается onNext() из файла ровно одну строку и отправить ее вниз по потоку ( onNext() ) или завершить при обнаружении конца файла. Это довольно просто. Третий необязательный аргумент метода generate() — это лямбда-выражение, которое может выполнять некоторую очистку с помощью state В нашем случае это очень удобно, так как мы должны закрывать файл не только тогда, когда достигнут конец файла, но и когда пользователи преждевременно отписываются.

Познакомьтесь с оператором Flowable.using ()

Это кажется большой работой, особенно когда у нас уже есть поток строк из JDK 8. Оказывается, есть похожий фабричный оператор с именем using() который очень удобен. Прежде всего, самый простой способ перевода Stream из Java в Flowable — это преобразование Stream в Iterator (проверенная обработка исключений игнорируется):

1
2
3
4
5
6
7
8
Flowable.fromIterable(new Iterable<String>() {
    @Override
    public Iterator<String> iterator() {
        final BufferedReader reader = new BufferedReader(new FileReader(filePath));
        final Stream<String> lines = reader.lines();
        return lines.iterator();
    }
});

Это может быть упрощено до:

1
2
3
4
5
Flowable.<String>fromIterable(() -> {
    final BufferedReader reader = new BufferedReader(new FileReader(filePath));
    final Stream<String> lines = reader.lines();
    return lines.iterator();
});

Но мы забыли о закрытии BufferedReader при этом FileReader при этом обрабатывает файл. Таким образом мы ввели утечку ресурсов. При таких обстоятельствах using() оператора using() работает как шарм. В некотором смысле это похоже на оператор try-with-resources . Вы можете создать поток на основе какого-либо внешнего ресурса. Жизненный цикл этого ресурса (создание и удаление) будет управляться для вас, когда кто-то подписывается или отписывается:

1
2
3
4
5
Flowable.using(
        () -> new BufferedReader(new FileReader(filePath)),
        reader -> Flowable.fromIterable(() -> reader.lines().iterator()),
        reader -> reader.close()
);

Это довольно похоже на последний пример generate() , однако самое важное лямбда-выражение в середине совсем другое. Мы получаем ресурс ( reader ) в качестве аргумента и должны возвращать Flowable (не один элемент). Эта лямбда вызывается только один раз, а не каждый раз, когда нижестоящий запрашивает новый элемент. Оператор using() дает нам управление жизненным циклом BufferedReaders . using() полезно, когда у нас есть часть состояния (точно так же как с generate() ), которая способна производить весь Flowable одновременно, а не один элемент за раз.

Потоковые XML-файлы

… или JSON в этом отношении. Представьте, что у вас есть очень большой XML-файл, который состоит из следующих записей, сотни тысяч из них:

1
2
3
4
5
6
7
8
9
<trkpt lat="52.23453" lon="21.01685">
    <ele>116</ele>
</trkpt>
<trkpt lat="52.23405" lon="21.01711">
    <ele>116</ele>
</trkpt>
<trkpt lat="52.23397" lon="21.0166">
    <ele>116</ele>
</trkpt>

Это фрагмент из стандартного формата обмена GPS, который может описывать географические маршруты произвольной длины. Каждый <trkpt> является отдельной точкой с широтой, долготой и высотой. Мы хотели бы иметь поток точек отслеживания (игнорируя возвышение для простоты), чтобы файл можно было использовать частично, а не загружать все сразу. У нас есть три варианта:

  • DOM / JAXB — все должно быть загружено в память и сопоставлено с объектами Java. Не будет работать с бесконечно длинными файлами (или даже очень большими)
  • SAX — библиотека на основе push, которая вызывает обратные вызовы всякий раз, когда обнаруживает открытие или закрытие XML-тега. Кажется немного лучше, но не может поддерживать обратное давление — это библиотека, которая решает, когда вызывать обратные вызовы, и нет способа замедлить ее
  • StAX — как SAX, но мы должны активно получать данные из XML-файла. Это важно для поддержки противодавления — мы решаем, когда читать следующую порцию данных

Давайте попробуем реализовать синтаксический анализ и потоковую передачу, возможно, очень большого XML-файла с использованием StAX и RxJava. Сначала мы должны научиться использовать StAX . Парсер называется XMLStreamReader и создается со следующей последовательностью заклинаний и проклятий:

1
2
3
4
XMLStreamReader staxReader(String name) throws XMLStreamException {
    final InputStream inputStream = new BufferedInputStream(new FileInputStream(name));
    return XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
}

Просто закройте глаза и убедитесь, что у вас всегда есть место для копирования и вставки приведенного выше фрагмента. Становится еще хуже. Чтобы прочитать первый <trkpt> включая его атрибуты, мы должны написать довольно сложный код:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import lombok.Value;
 
@Value
class Trackpoint {
    private final BigDecimal lat;
    private final BigDecimal lon;
}
 
Trackpoint nextTrackpoint(XMLStreamReader r) {
    while (r.hasNext()) {
        int event = r.next();
        switch (event) {
            case XMLStreamConstants.START_ELEMENT:
                if (r.getLocalName().equals("trkpt")) {
                    return parseTrackpoint(r);
                }
                break;
            case XMLStreamConstants.END_ELEMENT:
                if (r.getLocalName().equals("gpx")) {
                    return null;
                }
                break;
        }
    }
    return null;
}
 
Trackpoint parseTrackpoint(XMLStreamReader r) {
    return new Trackpoint(
            new BigDecimal(r.getAttributeValue("", "lat")),
            new BigDecimal(r.getAttributeValue("", "lon"))
    );
}

API — цитата низкого уровня и почти восхитительно старинная. Все происходит в гигантском цикле, который читает … что-то типа int . Это int может быть START_ELEMENT , END_ELEMENT или несколько других вещей, которые нас не интересуют. Помните, что мы читаем XML-файл, но не построчно или не символ за символом, а с помощью логических токенов XML (тегов). Итак, если мы обнаруживаем открытие элемента <trkpt> мы его анализируем, в противном случае мы продолжаем. Второе важное условие — когда мы обнаруживаем закрытие </gpx> которое должно быть последним в файле GPX. В этом случае мы возвращаем null , сигнализируя конец XML-файла.

Чувствует себя сложным? На самом деле это самый простой способ чтения большого XML с постоянным использованием памяти, независимо от размера файла. Как все это относится к RxJava? На данный момент мы можем очень легко построить Flowable<Trackpoint> . Да, Flowable , Не Observable (см .: Obsevable против Observable ). Такой поток будет иметь полную поддержку противодавления, то есть он будет читать файл с соответствующей скоростью:

01
02
03
04
05
06
07
08
09
10
11
12
13
Flowable<Trackpoint> trackpoints = generate(
        () -> staxReader("track.gpx"),
        this::pushNextTrackpoint,
        XMLStreamReader::close);
 
void pushNextTrackpoint(XMLStreamReader reader, Emitter<Trackpoint> emitter) {
    final Trackpoint trkpt = nextTrackpoint(reader);
    if (trkpt != null) {
        emitter.onNext(trkpt);
    } else {
        emitter.onComplete();
    }
}

Ух, так просто, такое противодавление! [1] Сначала мы создаем XMLStreamReader и XMLStreamReader его закрытие, когда файл заканчивается или кто-то отписывается. Помните, что каждый подписчик будет открывать и начинать анализ одного и того же файла снова и снова. Лямбда-выражение в середине просто принимает переменные состояния ( XMLStreamReader ) и испускает еще одну трекпоинт. Все это кажется довольно неясным, и это так! Но теперь у нас есть поток с обратной защитой, взятый из, возможно, очень большого файла, использующего очень мало ресурсов. Мы можем обрабатывать трекпоинт одновременно или комбинировать их с другими источниками данных. В следующей статье мы узнаем, как загрузить JSON очень похожим образом.