Статьи

Создание чрезвычайно большого в памяти InputStream для целей тестирования

По некоторым причинам я нуждался в чрезвычайно большом, возможно даже бесконечном  InputStream , который просто возвращал бы то же самое  byte[]снова и снова. Таким образом, я мог бы создать безумно большой поток данных, повторяя небольшую выборку. Вроде схожей функциональности можно найти в Guava:  Iterable<T> Iterables.cycle(Iterable<T>) и  Iterator<T> Iterators.cycle(Iterator<T>). Например, если вам нужен бесконечный источник  0 и  1, просто скажите Iterables.cycle(0, 1) и получите  0, 1, 0, 1, 0, 1... бесконечно. К сожалению, я не нашел такой утилиты для InputStream, поэтому я начал писать свою собственную. В этой статье описываются многие ошибки, которые я допустил в ходе этого процесса, в основном из-за чрезмерного усложнения и переосмысления простого решения.

Нам не нужно бесконечное количество  InputStream, достаточно создать очень большой (скажем, 32 ГиБ). Итак, мы следуем следующему методу:

public static InputStream repeat(byte[] sample, int times)

Он в основном принимает  sample массив байтов и возвращает  InputStream возвращаемые эти байты. Однако, когда он  sample заканчивается, он переворачивается, снова возвращая те же байты — этот процесс повторяется заданное количество раз, пока не  InputStreamпрекратятся сигналы. Одно из решений, которое я не пробовал, но которое кажется наиболее очевидным:

public static InputStream repeat(byte[] sample, int times) {
    final byte[] allBytes = new byte[sample.length * times];
    for (int i = 0; i < times; i++) {
        System.arraycopy(sample, 0, allBytes, i * sample.length, sample.length);
    }
    return new ByteArrayInputStream(allBytes);
}

Я вижу, ты там смеешься! Если  sample это 100 байт, и нам нужно 32 ГиБ ввода, повторяющие эти 100 байт, сгенерированное InputStream не должно выделять 32 ГБ памяти, мы должны быть здесь более умными. На самом деле  repeat()выше есть еще одна тонкая ошибка. Массивы в Java ограничены 2 31 -1 записей ( int), 32 ГиБ намного выше этого. Причина эта программа компилируется молчаливое Целочисленное переполнение здесь:  sample.length * times. Это умножение не подходит  int.

Хорошо, давайте попробуем то, что хотя бы теоретически может работать. Моя первая идея заключалась в следующем: что, если я создам много ByteArrayInputStreamобщих папок  byte[] sample (они не делают рьяную копию) и каким-то образом объединят их? Таким образом, мне нужен был какой-то  InputStream адаптер, который мог бы принимать произвольное количество базовых InputStreams и соедините их вместе — когда первый поток исчерпан, переключитесь на следующий. Этот неловкий момент, когда вы ищете что-то в Apache Commons или Guava и, очевидно,  это было в JDK навсегда …  java.io.SequenceInputStream почти идеален. Тем не менее, он может связать только два основных  InputStreamэлемента. Конечно, поскольку  SequenceInputStreamэто  InputStream само по себе, мы можем использовать его рекурсивно в качестве аргумента для внешнего  SequenceInputStream. Повторяя этот процесс, мы можем связать произвольное число  ByteArrayInputStreams вместе:

public static InputStream repeat(byte[] sample, int times) {
    if (times <= 1) {
        return new ByteArrayInputStream(sample);
    } else {
        return new SequenceInputStream(
                new ByteArrayInputStream(sample),
                repeat(sample, times - 1)
        );
    }
}

Если  times это 1, просто завернуть  sample в  ByteArrayInputStream. В противном случае используйте  SequenceInputStream рекурсивно. Я думаю, вы можете сразу заметить, что не так с этим кодом: слишком глубокая рекурсия. Уровень вложенности такой же, как  times аргумент, который достигнет миллионов или даже миллиардов. Должен быть лучший способ. К счастью, незначительное улучшение меняет глубину рекурсии с O (n) на O (logn):

public static InputStream repeat(byte[] sample, int times) {
    if (times <= 1) {
        return new ByteArrayInputStream(sample);
    } else {
        return new SequenceInputStream(
                repeat(sample, times / 2),
                repeat(sample, times - times / 2)
        );
    }
}

Честно говоря, это была первая реализация, которую я попробовал. Это простое  применение  принципа «  разделяй и властвуй» , где мы получаем результат, равномерно разделяя его на две меньшие подзадачи. Выглядит умно, но есть одна проблема: легко доказать, что мы создаем t ( t = timesByteArrayInputStreams и O (t)  SequenceInputStreams. Хотя  sample байтовый массив используется совместно, миллионы различных  InputStream экземпляров тратят впустую память. Это приводит нас к альтернативной реализации, создавая только одну  InputStream, независимо от значения  times:

import com.google.common.collect.Iterators;
import org.apache.commons.lang3.ArrayUtils;

public static InputStream repeat(byte[] sample, int times) {
    final Byte[] objArray = ArrayUtils.toObject(sample);
    final Iterator<Byte> infinite = Iterators.cycle(objArray);
    final Iterator<Byte> limited = Iterators.limit(infinite, sample.length * times);
    return new InputStream() {
        @Override
        public int read() throws IOException {
            return limited.hasNext() ?
                    limited.next() & 0xFF :
                    -1;
        }
    };
}

Мы будем использовать в  Iterators.cycle() конце концов. Но прежде чем перейти  byte[] к  Byte[] такому, поскольку итераторы могут работать только с объектами, а не с примитивами. Не существует идиоматического способа превратить массив примитивов в массив штучных типов, поэтому я использую ArrayUtils.toObject (byte [])  из Apache Commons Lang. Имея массив объектов, мы можем создать  infiniteитератор, циклически изменяющий значения  sample. Так как мы не хотим бесконечный поток, мы отключили бесконечный итератор Iterators.limit(Iterator<T>, int), снова используя Guava. Теперь мы просто должны преодолеть от  Iterator<Byte> до InputStream — после того, как все семантически они представляют то же самое.

Это решение страдает двумя проблемами. Прежде всего, он производит тонны мусора из-за распаковки. Сбор мусора не так сильно беспокоит мертвые, недолговечные объекты, но все же кажется расточительным. Вторая проблема, с которой мы уже сталкивались ранее: sample.length * times умножение может вызвать целочисленное переполнение. Это не может быть исправлено, потому что  Iterators.limit() занимает int, а не  long — без веской причины. Кстати, мы избежали третьей проблемы, выполнив побитовое  и  с  0xFF — в противном случае  byte значение  -1 будет сигнализировать конец потока, что не так.  x & 0xFF правильно переведен в unsigned  255 ( int). 

Так что, хотя реализация выше короткая и приятная, декларативная, а не обязательная, она слишком медленная и ограниченная. Если у вас есть опыт работы с Си, я могу представить, как неудобно вы видели, как я борюсь. В конце концов, самая простая, мучительно простая и низкоуровневая реализация была той, которую я придумал последней:

public static InputStream repeat(byte[] sample, int times) {
    return new InputStream() {
        private long pos = 0;
        private final long total = (long)sample.length * times;

        public int read() throws IOException {
            return pos < total ?
                    sample[(int)(pos++ % sample.length)] :
                    -1;
        }
    };
}

GC бесплатно, чистый JDK, быстрый и простой для понимания. Пусть это будет для вас уроком: начните с самого простого решения, которое приходит вам на ум, не переусердствуйте и не будьте слишком умными. Мои предыдущие решения, декларативные, функциональные, неизменяемые и т. Д. — возможно, они выглядели умными, но их не было ни быстро, ни легко понять.

Утилита, которую мы только что разработали, была не просто игрушечным проектом, она будет использована позже в следующей статье.