Статьи

Создание очень большого в памяти InputStream для целей тестирования

По какой-то причине мне понадобился очень большой, возможно, даже бесконечный InputStream , который просто возвращал бы один и тот же byte[] снова и снова. Таким образом, я мог бы создать безумно большой поток данных, повторяя небольшую выборку. Подобные функции можно найти в Guava: Iterable<T> Iterables.cycle(Iterable<T>) и Iterator<T> Iterators.cycle(Iterator<T>) . Например, если вам нужен бесконечный источник 0 и 1 , просто скажите Iterables.cycle(0, 1) и получите 0, 1, 0, 1, 0, 1... бесконечно. К сожалению, я не нашел такой утилиты для InputStream , поэтому я начал писать свою собственную. В этой статье описываются многие ошибки, которые я допустил в ходе этого процесса, в основном из-за чрезмерного усложнения и переосмысления простого решения.

Нам действительно не нужен бесконечный InputStream , достаточно создать очень большой (скажем, 32 ГиБ). Итак, мы следуем следующему методу:

1
public static InputStream repeat(byte[] sample, int times)

Он в основном берет sample массив байтов и возвращает InputStream возвращающий эти байты. Однако, когда sample заканчивается, он переворачивается, снова возвращая те же байты — этот процесс повторяется заданное количество раз, пока не InputStream сигналы InputStream . Одно решение, которое я на самом деле не пробовал, но которое кажется наиболее очевидным:

1
2
3
4
5
6
7
public static InputStream repeat(byte[] sample, int times) {
    final byte[] allBytes = new byte[sample.length * times];
    for (int i = 0; i < times; i++) {
        System.arraycopy(sample, 0, allBytes, i * sample.length, sample.length);
    }
    return new ByteArrayInputStream(allBytes);
}

Я вижу, ты там смеешься! Если sample составляет 100 байтов, и нам нужно 32 ГБ входных данных, повторяющих эти 100 байтов, сгенерированный InputStream не должен выделять 32 ГБ памяти, здесь мы должны быть более умными. На самом деле у repeat() выше есть еще одна тонкая ошибка. Массивы в Java ограничены 2 31 -1 записями ( int ), 32 ГиБ намного выше этого. Причина, по которой эта программа компилируется, заключается в тихом целочисленном переполнении здесь: sample.length * times . Это умножение не вписывается в int .

Хорошо, давайте попробуем то, что хотя бы теоретически может работать. Моя первая идея заключалась в следующем: что, если я создам много ByteArrayInputStream совместно использующих один и тот же byte[] sample (они не делают готовую копию), и каким-то образом объединю их вместе? Поэтому мне нужен был какой-то адаптер InputStream который мог бы взять произвольное количество базовых InputStream и связать их вместе — когда первый поток исчерпан, переключитесь на следующий. Это неловкий момент, когда вы ищете что-то в Apache Commons или Guava и, очевидно, это было в JDK навсегдаjava.io.SequenceInputStream почти идеален. Тем не менее, он может InputStream только два основных InputStream . Конечно, поскольку SequenceInputStream является самим InputStream , мы можем использовать его рекурсивно в качестве аргумента для внешнего SequenceInputStream . Повторяя этот процесс, мы можем связать произвольное число ByteArrayInputStream s вместе:

01
02
03
04
05
06
07
08
09
10
public static InputStream repeat(byte[] sample, int times) {
    if (times <= 1) {
        return new ByteArrayInputStream(sample);
    } else {
        return new SequenceInputStream(
                new ByteArrayInputStream(sample),
                repeat(sample, times - 1)
        );
    }
}

Если times равны 1, просто поместите sample в ByteArrayInputStream . В противном случае используйте SequenceInputStream рекурсивно. Я думаю, вы можете сразу заметить, что не так с этим кодом: слишком глубокая рекурсия. Уровень вложенности такой же, как аргумент times , который достигнет миллионов или даже миллиардов. Должен быть лучший способ. К счастью, незначительное улучшение меняет глубину рекурсии с O (n) на O (logn):

01
02
03
04
05
06
07
08
09
10
public static InputStream repeat(byte[] sample, int times) {
    if (times <= 1) {
        return new ByteArrayInputStream(sample);
    } else {
        return new SequenceInputStream(
                repeat(sample, times / 2),
                repeat(sample, times - times / 2)
        );
    }
}

Честно говоря, это была первая реализация, которую я попробовал. Это простое применение принципа « разделяй и властвуй» , где мы получаем результат, равномерно разделяя его на две меньшие подзадачи. Выглядит умно, но есть одна проблема: легко доказать, что мы создаем t ( t =times ) ByteArrayInputStreams и O (t) SequenceInputStream s. Хотя sample байтового массива является общим, миллионы различных экземпляров InputStream тратят память. Это приводит нас к альтернативной реализации, создавая только один InputStream , независимо от значения times :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
import com.google.common.collect.Iterators;
import org.apache.commons.lang3.ArrayUtils;
 
public static InputStream repeat(byte[] sample, int times) {
    final Byte[] objArray = ArrayUtils.toObject(sample);
    final Iterator<Byte> infinite = Iterators.cycle(objArray);
    final Iterator<Byte> limited = Iterators.limit(infinite, sample.length * times);
    return new InputStream() {
        @Override
        public int read() throws IOException {
            return limited.hasNext() ?
                    limited.next() & 0xFF :
                    -1;
        }
    };
}

Мы будем использовать Iterators.cycle() конце концов. Но прежде мы должны перевести byte[] в Byte[] поскольку итераторы могут работать только с объектами, а не с примитивами. Не существует идиоматического способа превратить массив примитивов в массив ArrayUtils.toObject(byte[]) типов, поэтому я использую ArrayUtils.toObject(byte[]) из Apache Commons Lang. Имея массив объектов, мы можем создать infinite итератор, который циклически перебирает значения sample . Поскольку нам не нужен бесконечный поток, мы обрезаем бесконечный итератор, используя Iterators.limit(Iterator<T>, int) , снова из Гуавы. Теперь нам просто нужно перейти от Iterator<Byte> к InputStream — ведь семантически они представляют одно и то же.

Это решение страдает двумя проблемами. Прежде всего, он производит тонны мусора из-за распаковки. Сбор мусора не так сильно беспокоит мертвые, недолговечные объекты, но все же кажется расточительным. Вторая проблема, с которой мы уже сталкивались ранее: sample.length * times может привести к переполнению целых чисел. Это не может быть исправлено, потому что Iterators.limit() принимает int , а не long — без веской причины. Кстати, мы избежали третьей проблемы, выполнив побитовое и 0xFF — в противном случае byte со значением -1 будет сигнализировать конец потока, что не так. x & 0xFF правильно переведено в беззнаковое 255 ( int ).

Так что, хотя реализация выше короткая и приятная, декларативная, а не обязательная, она слишком медленная и ограниченная. Если у вас есть опыт работы с Си, я могу представить, как неудобно вы видели, как я борюсь. В конце концов, самая простая, мучительно простая и низкоуровневая реализация была той, которую я придумал последней:

01
02
03
04
05
06
07
08
09
10
11
12
public static InputStream repeat(byte[] sample, int times) {
    return new InputStream() {
        private long pos = 0;
        private final long total = (long)sample.length * times;
 
        public int read() throws IOException {
            return pos < total ?
                    sample[(int)(pos++ % sample.length)] :
                    -1;
        }
    };
}

GC бесплатно, чистый JDK, быстрый и простой для понимания. Пусть это будет для вас уроком: начните с самого простого решения, которое приходит вам на ум, не переусердствуйте и не будьте слишком умными. Мои предыдущие решения, декларативные, функциональные, неизменяемые и т. Д. — возможно, они выглядели умными, но их не было ни быстро, ни легко понять.

Утилита, которую мы только что разработали, была не просто игрушечным проектом, она будет использована позже в следующей статье .