Недавно мне пришлось обработать набор файлов, содержащих исторические данные о рынке тик-тика, и быстро понял, что ни один из них не может быть считан в память с использованием традиционного, InputStream
поскольку каждый файл имеет размер более 4 гигабайт. Emacs не мог даже открыть их.
В этом конкретном случае я мог бы написать простой скрипт bash, который делит файлы на более мелкие части и читает их как обычно. Но я не хочу этого, поскольку двоичные форматы лишают законной силы этот подход.
Таким образом, способ правильно решить эту проблему — это постепенная обработка областей данных с использованием файлов с отображенной памятью . Что хорошо в файлах с отображением в памяти, так это то, что они не занимают виртуальную память или пространство подкачки, поскольку они поддерживаются данными файла на диске.
Окей, давайте посмотрим на эти файлы и извлечем некоторые данные. Похоже, они содержат текстовые строки ASCII с полями, разделенными запятыми.
Формат: [currency-pair],[timestamp],[bid-price],
Пример: EUR/USD,20120102 00:01:30.420,1.29451,1.2949
Справедливо, я мог бы написать программу для этого формата. Но чтение и анализ файлов — это ортогональные понятия; так что давайте сделаем шаг назад и подумаем об общем дизайне, который можно использовать повторно в случае, если в будущем мы столкнемся с подобной проблемой.
Проблема сводится к постепенному декодированию набора записей, закодированных в бесконечно длинном байтовом массиве без исчерпания памяти. Тот факт, что примерный формат закодирован в тексте, разделенном запятыми / строчными буквами, не имеет значения для общего решения, поэтому ясно, что для обработки различных форматов необходим интерфейс декодера.
Опять же, каждая запись не может быть проанализирована и сохранена в памяти до тех пор, пока не будет обработан весь файл, поэтому нам нужен способ постепенной передачи фрагментов записей, которые могут быть записаны в другом месте, на диске или в сети, до того, как они будут собраны сборщиком мусора. Итератор — хорошая абстракция для выполнения этого требования, потому что они действуют как курсоры, что и есть смысл. Каждая итерация пересылает указатель файла и позволяет нам что-то делать с данными.
Итак, сначала Decoder
интерфейс. Идея состоит в том, чтобы постепенно декодировать объекты из [a target = «_ blank» href = «http://docs.oracle.com/javase/1.4.2/docs/api/java/nio/MappedByteBuffer.html»] MappedByteBuffer или вернуть ноль, если в буфере не осталось объектов.
public interface Decoder<T> { public T decode(ByteBuffer buffer); }
Затем приходит то, FileReader
что реализует Iterable
. Каждая итерация будет обрабатывать следующие 4096 байтов данных и декодировать их в список объектов, используя Decoder
. Заметьте, что FileReader
принимаете список файлов , что приятно, поскольку он позволяет обходить данные, не беспокоясь об агрегации между файлами . Кстати, 4096 байтовых кусков, вероятно, немного меньше для больших файлов .
public class FileReader implements Iterable<List<T>> { private static final long CHUNK_SIZE = 4096; private final Decoder<T> decoder; private Iterator<File> files; private FileReader(Decoder<T> decoder, File... files) { this(decoder, Arrays.asList(files)); } private FileReader(Decoder<T> decoder, List<File> files) { this.files = files.iterator(); this.decoder = decoder; } public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) { return new FileReader<T>(decoder, files); } public static <T> FileReader<T> create(Decoder<T> decoder, File... files) { return new FileReader<T>(decoder, files); } @Override public Iterator<List<T>> iterator() { return new Iterator<List<T>>() { private List<T> entries; private long chunkPos = 0; private MappedByteBuffer buffer; private FileChannel channel; @Override public boolean hasNext() { if (buffer == null || !buffer.hasRemaining()) { buffer = nextBuffer(chunkPos); if (buffer == null) { return false; } } T result = null; while ((result = decoder.decode(buffer)) != null) { if (entries == null) { entries = new ArrayList<T>(); } entries.add(result); } // set next MappedByteBuffer chunk chunkPos += buffer.position(); buffer = null; if (entries != null) { return true; } else { Closeables.closeQuietly(channel); return false; } } private MappedByteBuffer nextBuffer(long position) { try { if (channel == null || channel.size() == position) { if (channel != null) { Closeables.closeQuietly(channel); channel = null; } if (files.hasNext()) { File file = files.next(); channel = new RandomAccessFile(file, "r").getChannel(); chunkPos = 0; position = 0; } else { return null; } } long chunkSize = CHUNK_SIZE; if (channel.size() - position < chunkSize) { chunkSize = channel.size() - position; } return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize); } catch (IOException e) { Closeables.closeQuietly(channel); throw new RuntimeException(e); } } @Override public List<T> next() { List<T> res = entries; entries = null; return res; } @Override public void remove() { throw new UnsupportedOperationException(); } }; } }
Следующая задача — написать, Decoder
и я решил реализовать универсальный TextRowDecoder
для любого формата текстового файла с разделителями-запятыми, принимая количество полей в строке и разделитель полей и возвращая массив байтовых массивов. TextRowDecoder
затем может быть повторно использован специфичными для формата декодерами, которые могут обрабатывать разные наборы символов.
public class TextRowDecoder implements Decoder<byte[][]> { private static final byte LF = 10; private final int numFields; private final byte delimiter; public TextRowDecoder(int numFields, byte delimiter) { this.numFields = numFields; this.delimiter = delimiter; } @Override public byte[][] decode(ByteBuffer buffer) { int lineStartPos = buffer.position(); int limit = buffer.limit(); while (buffer.hasRemaining()) { byte b = buffer.get(); if (b == LF) { // reached line feed so parse line int lineEndPos = buffer.position(); // set positions for one row duplication if (buffer.limit() < lineEndPos + 1) { buffer.position(lineStartPos).limit(lineEndPos); } else { buffer.position(lineStartPos).limit(lineEndPos + 1); } byte[][] entry = parseRow(buffer.duplicate()); if (entry != null) { // reset main buffer buffer.position(lineEndPos); buffer.limit(limit); // set start after LF lineStartPos = lineEndPos; } return entry; } } buffer.position(lineStartPos); return null; } public byte[][] parseRow(ByteBuffer buffer) { int fieldStartPos = buffer.position(); int fieldEndPos = 0; int fieldNumber = 0; byte[][] fields = new byte[numFields][]; while (buffer.hasRemaining()) { byte b = buffer.get(); if (b == delimiter || b == LF) { fieldEndPos = buffer.position(); // save limit int limit = buffer.limit(); // set positions for one row duplication buffer.position(fieldStartPos).limit(fieldEndPos); fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1); fieldNumber++; // reset main buffer buffer.position(fieldEndPos); buffer.limit(limit); // set start after LF fieldStartPos = fieldEndPos; } if (fieldNumber == numFields) { return fields; } } return null; } private byte[] parseField(ByteBuffer buffer, int pos, int length) { byte[] field = new byte[length]; for (int i = 0; i < field.length; i++) { field[i] = buffer.get(); } return field; } }
And this is how files are processed. Each list contain elements decoded from a single buffer and each element is an array of byte arrays as specified by the TextRowDecoder
.
TextRowDecoder decoder = new TextRowDecoder(4, comma); FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles()); for (List<byte[][]> chunk : reader) { // do something with each chunk }
We could stop here but there was one more requirement. Every row contain a timestamp and each batch must be grouped according to periods of time instead of buffers, day-by-day or hour-by-hour. I still want to iterate through each batch so the immediate reaction was to create a Iterable
wrapper for FileReader
that would implement this behaviour. One additional detail is that each element must to provide its timestamp to PeriodEntries
by implementing the Timestamped
interface (not shown here).
public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> { private final Iterator<List<T extends Timestamped>> entriesIt; private final long interval; private PeriodEntries(Iterable<List<T>> entriesIt, long interval) { this.entriesIt = entriesIt.iterator(); this.interval = interval; } public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) { return new PeriodEntries<T>(entriesIt, interval); } @Override public Iterator<List<T extends Timestamped>> iterator() { return new Iterator<List<T>>() { private Queue<List<T>> queue = new LinkedList<List<T>>(); private long previous; private Iterator<T> entryIt; @Override public boolean hasNext() { if (!advanceEntries()) { return false; } T entry = entryIt.next(); long time = normalizeInterval(entry); if (previous == 0) { previous = time; } if (queue.peek() == null) { List<T> group = new ArrayList<T>(); queue.add(group); } while (previous == time) { queue.peek().add(entry); if (!advanceEntries()) { break; } entry = entryIt.next(); time = normalizeInterval(entry); } previous = time; List<T> result = queue.peek(); if (result == null || result.isEmpty()) { return false; } return true; } private boolean advanceEntries() { // if there are no rows left if (entryIt == null || !entryIt.hasNext()) { // try get more rows if possible if (entriesIt.hasNext()) { entryIt = entriesIt.next().iterator(); return true; } else { // no more rows return false; } } return true; } private long normalizeInterval(Timestamped entry) { long time = entry.getTime(); int utcOffset = TimeZone.getDefault().getOffset(time); long utcTime = time + utcOffset; long elapsed = utcTime % interval; return time - elapsed; } @Override public List<T> next() { return queue.poll(); } @Override public void remove() { throw new UnsupportedOperationException(); } }; } }
The final processing code did not change much by introducing this functionality, only one clean and tight for-loop that does not have to care about grouping elements across files, buffers and periods. PeriodEntries
is also flexible enough to mange any length on the interval.
TrueFxDecoder decoder = new TrueFxDecoder(); FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles()); long periodLength = TimeUnit.DAYS.toMillis(1); PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength); for (List<TrueFxData> entries : periods) { // data for each day for (TrueFxData entry : entries) { // process each entry } }
As you may realize, it would not have been possible to solve this problem with collections; choosing iterators was a crucial design decision to be able to parse terabytes of data without consuming too much heap space.