Недавно мне пришлось обработать набор файлов, содержащих исторические данные о рынке тик-тика, и быстро понял, что ни один из них не может быть считан в память с использованием традиционного InputStream
поскольку каждый файл имеет размер более 4 гигабайт. Emacs не мог даже открыть их.
В этом конкретном случае я мог бы написать простой скрипт bash, который делит файлы на более мелкие части и читает их как обычно. Но я не хочу этого, поскольку двоичные форматы лишают законной силы этот подход.
Таким образом, способ правильно решить эту проблему — это постепенная обработка областей данных с использованием файлов с отображенной памятью. Что хорошо в файлах с отображением в памяти, так это то, что они не занимают виртуальную память или пространство подкачки, поскольку они поддерживаются данными файла на диске.
Окей, давайте посмотрим на эти файлы и извлечем некоторые данные. Похоже, они содержат текстовые строки ASCII с полями, разделенными запятыми.
Формат: [currency-pair],[timestamp],[bid-price],[ask-price]
Пример: EUR/USD,20120102 00:01:30.420,1.29451,1.2949
Справедливо, я мог бы написать программу для этого формата. Но чтение и анализ файлов — это ортогональные понятия; так что давайте сделаем шаг назад и подумаем об общем дизайне, который можно использовать повторно в случае, если в будущем мы столкнемся с подобной проблемой.
Проблема сводится к постепенному декодированию набора записей, закодированных в бесконечно длинном байтовом массиве без исчерпания памяти. Тот факт, что примерный формат закодирован в тексте, разделенном запятыми / строчными буквами, не имеет значения для общего решения, поэтому ясно, что для обработки различных форматов необходим интерфейс декодера.
Опять же, каждая запись не может быть проанализирована и сохранена в памяти до тех пор, пока не будет обработан весь файл, поэтому нам нужен способ постепенной передачи фрагментов записей, которые могут быть записаны в другом месте, на диске или в сети, до их сбора мусора. Итератор — хорошая абстракция для выполнения этого требования, потому что они действуют как курсоры, и это именно то, что нужно. Каждая итерация пересылает указатель файла и позволяет нам что-то делать с данными.
Итак, сначала интерфейс Decoder
. Идея состоит в том, чтобы постепенно декодировать объекты из MappedByteBuffer или возвращать нуль, если в буфере не осталось объектов.
1
2
3
|
public interface Decoder<T> { public T decode(ByteBuffer buffer); } |
Затем идет FileReader
который реализует Iterable
. Каждая итерация будет обрабатывать следующие 4096 байтов данных и декодировать их в список объектов, используя Decoder
. Обратите внимание, что FileReader
принимает список файлов, что приятно, так как он позволяет обходить данные, не беспокоясь об агрегации между файлами. Кстати, 4096 байтовых кусков, вероятно, немного меньше для больших файлов.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
public class FileReader implements Iterable<List<T>> { private static final long CHUNK_SIZE = 4096 ; private final Decoder<T> decoder; private Iterator<File> files; private FileReader(Decoder<T> decoder, File... files) { this (decoder, Arrays.asList(files)); } private FileReader(Decoder<T> decoder, List<File> files) { this .files = files.iterator(); this .decoder = decoder; } public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) { return new FileReader<T>(decoder, files); } public static <T> FileReader<T> create(Decoder<T> decoder, File... files) { return new FileReader<T>(decoder, files); } @Override public Iterator<List<T>> iterator() { return new Iterator<List<T>>() { private List<T> entries; private long chunkPos = 0 ; private MappedByteBuffer buffer; private FileChannel channel; @Override public boolean hasNext() { if (buffer == null || !buffer.hasRemaining()) { buffer = nextBuffer(chunkPos); if (buffer == null ) { return false ; } } T result = null ; while ((result = decoder.decode(buffer)) != null ) { if (entries == null ) { entries = new ArrayList<T>(); } entries.add(result); } // set next MappedByteBuffer chunk chunkPos += buffer.position(); buffer = null ; if (entries != null ) { return true ; } else { Closeables.closeQuietly(channel); return false ; } } private MappedByteBuffer nextBuffer( long position) { try { if (channel == null || channel.size() == position) { if (channel != null ) { Closeables.closeQuietly(channel); channel = null ; } if (files.hasNext()) { File file = files.next(); channel = new RandomAccessFile(file, "r" ).getChannel(); chunkPos = 0 ; position = 0 ; } else { return null ; } } long chunkSize = CHUNK_SIZE; if (channel.size() - position < chunkSize) { chunkSize = channel.size() - position; } return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize); } catch (IOException e) { Closeables.closeQuietly(channel); throw new RuntimeException(e); } } @Override public List<T> next() { List<T> res = entries; entries = null ; return res; } @Override public void remove() { throw new UnsupportedOperationException(); } }; } } |
Следующая задача — написать Decoder
и я решил реализовать универсальный TextRowDecoder
для любого формата текстового файла с разделителями-запятыми, принимая количество полей в строке и разделитель полей и возвращая массив байтовых массивов. TextRowDecoder
может быть повторно использован определенными форматными декодерами, которые могут обрабатывать разные наборы символов.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
public class TextRowDecoder implements Decoder< byte [][]> { private static final byte LF = 10 ; private final int numFields; private final byte delimiter; public TextRowDecoder( int numFields, byte delimiter) { this .numFields = numFields; this .delimiter = delimiter; } @Override public byte [][] decode(ByteBuffer buffer) { int lineStartPos = buffer.position(); int limit = buffer.limit(); while (buffer.hasRemaining()) { byte b = buffer.get(); if (b == LF) { // reached line feed so parse line int lineEndPos = buffer.position(); // set positions for one row duplication if (buffer.limit() < lineEndPos + 1 ) { buffer.position(lineStartPos).limit(lineEndPos); } else { buffer.position(lineStartPos).limit(lineEndPos + 1 ); } byte [][] entry = parseRow(buffer.duplicate()); if (entry != null ) { // reset main buffer buffer.position(lineEndPos); buffer.limit(limit); // set start after LF lineStartPos = lineEndPos; } return entry; } } buffer.position(lineStartPos); return null ; } public byte [][] parseRow(ByteBuffer buffer) { int fieldStartPos = buffer.position(); int fieldEndPos = 0 ; int fieldNumber = 0 ; byte [][] fields = new byte [numFields][]; while (buffer.hasRemaining()) { byte b = buffer.get(); if (b == delimiter || b == LF) { fieldEndPos = buffer.position(); // save limit int limit = buffer.limit(); // set positions for one row duplication buffer.position(fieldStartPos).limit(fieldEndPos); fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1 ); fieldNumber++; // reset main buffer buffer.position(fieldEndPos); buffer.limit(limit); // set start after LF fieldStartPos = fieldEndPos; } if (fieldNumber == numFields) { return fields; } } return null ; } private byte [] parseField(ByteBuffer buffer, int pos, int length) { byte [] field = new byte [length]; for ( int i = 0 ; i < field.length; i++) { field[i] = buffer.get(); } return field; } } |
И вот как файлы обрабатываются. Каждый список содержит элементы, декодированные из одного буфера, и каждый элемент является массивом байтовых массивов, как указано в TextRowDecoder
.
1
2
3
4
5
|
TextRowDecoder decoder = new TextRowDecoder( 4 , comma); FileReader< byte [][]> reader = FileReader.create(decoder, file.listFiles()); for (List< byte [][]> chunk : reader) { // do something with each chunk } |
Мы могли бы остановиться здесь, но было еще одно требование. Каждая строка содержит метку времени, и каждый пакет должен быть сгруппирован по периодам времени, а не по буферам, день за днем или час за часом. Я все еще хочу перебирать каждый пакет, поэтому немедленной реакцией было создание обертки Iterable
для FileReader
которая бы реализовала это поведение. Еще одна деталь состоит в том, что каждый элемент должен предоставлять свою метку времени для PeriodEntries
путем реализации интерфейса меток Timestamped
(здесь не показан).
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> { private final Iterator<List<T extends Timestamped>> entriesIt; private final long interval; private PeriodEntries(Iterable<List<T>> entriesIt, long interval) { this .entriesIt = entriesIt.iterator(); this .interval = interval; } public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) { return new PeriodEntries<T>(entriesIt, interval); } @Override public Iterator<List<T extends Timestamped>> iterator() { return new Iterator<List<T>>() { private Queue<List<T>> queue = new LinkedList<List<T>>(); private long previous; private Iterator<T> entryIt; @Override public boolean hasNext() { if (!advanceEntries()) { return false ; } T entry = entryIt.next(); long time = normalizeInterval(entry); if (previous == 0 ) { previous = time; } if (queue.peek() == null ) { List<T> group = new ArrayList<T>(); queue.add(group); } while (previous == time) { queue.peek().add(entry); if (!advanceEntries()) { break ; } entry = entryIt.next(); time = normalizeInterval(entry); } previous = time; List<T> result = queue.peek(); if (result == null || result.isEmpty()) { return false ; } return true ; } private boolean advanceEntries() { // if there are no rows left if (entryIt == null || !entryIt.hasNext()) { // try get more rows if possible if (entriesIt.hasNext()) { entryIt = entriesIt.next().iterator(); return true ; } else { // no more rows return false ; } } return true ; } private long normalizeInterval(Timestamped entry) { long time = entry.getTime(); int utcOffset = TimeZone.getDefault().getOffset(time); long utcTime = time + utcOffset; long elapsed = utcTime % interval; return time - elapsed; } @Override public List<T> next() { return queue.poll(); } @Override public void remove() { throw new UnsupportedOperationException(); } }; } } |
Код окончательной обработки не сильно изменился благодаря введению этой функциональности, только одного простого и понятного цикла for, который не должен заботиться о группировании элементов по файлам, буферам и периодам. PeriodEntries
также достаточно гибок, чтобы управлять любой длиной интервала.
01
02
03
04
05
06
07
08
09
10
11
|
TrueFxDecoder decoder = new TrueFxDecoder(); FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles()); long periodLength = TimeUnit.DAYS.toMillis( 1 ); PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength); for (List<TrueFxData> entries : periods) { // data for each day for (TrueFxData entry : entries) { // process each entry } } |
Как вы понимаете, решить эту проблему с помощью коллекций было бы невозможно; выбор итераторов был решающим конструктивным решением, чтобы иметь возможность анализировать терабайты данных, не занимая слишком много места в куче.
Справка: Обработка огромных файлов с помощью Java от нашего партнера по JCG Кристоффера Сьогрена в блоге Deephacks .