Статьи

Обработка огромных файлов с помощью Java

Недавно мне пришлось обработать набор файлов, содержащих исторические данные о рынке тик-тика, и быстро понял, что ни один из них не может быть считан в память с использованием традиционного InputStream поскольку каждый файл имеет размер более 4 гигабайт. Emacs не мог даже открыть их.

В этом конкретном случае я мог бы написать простой скрипт bash, который делит файлы на более мелкие части и читает их как обычно. Но я не хочу этого, поскольку двоичные форматы лишают законной силы этот подход.

Таким образом, способ правильно решить эту проблему — это постепенная обработка областей данных с использованием файлов с отображенной памятью. Что хорошо в файлах с отображением в памяти, так это то, что они не занимают виртуальную память или пространство подкачки, поскольку они поддерживаются данными файла на диске.

Окей, давайте посмотрим на эти файлы и извлечем некоторые данные. Похоже, они содержат текстовые строки ASCII с полями, разделенными запятыми.

Формат: [currency-pair],[timestamp],[bid-price],[ask-price]

Пример: EUR/USD,20120102 00:01:30.420,1.29451,1.2949

Справедливо, я мог бы написать программу для этого формата. Но чтение и анализ файлов — это ортогональные понятия; так что давайте сделаем шаг назад и подумаем об общем дизайне, который можно использовать повторно в случае, если в будущем мы столкнемся с подобной проблемой.

Проблема сводится к постепенному декодированию набора записей, закодированных в бесконечно длинном байтовом массиве без исчерпания памяти. Тот факт, что примерный формат закодирован в тексте, разделенном запятыми / строчными буквами, не имеет значения для общего решения, поэтому ясно, что для обработки различных форматов необходим интерфейс декодера.

Опять же, каждая запись не может быть проанализирована и сохранена в памяти до тех пор, пока не будет обработан весь файл, поэтому нам нужен способ постепенной передачи фрагментов записей, которые могут быть записаны в другом месте, на диске или в сети, до их сбора мусора. Итератор — хорошая абстракция для выполнения этого требования, потому что они действуют как курсоры, и это именно то, что нужно. Каждая итерация пересылает указатель файла и позволяет нам что-то делать с данными.

Итак, сначала интерфейс Decoder . Идея состоит в том, чтобы постепенно декодировать объекты из MappedByteBuffer или возвращать нуль, если в буфере не осталось объектов.

1
2
3
public interface Decoder<T> {
    public T decode(ByteBuffer buffer);
}

Затем идет FileReader который реализует Iterable . Каждая итерация будет обрабатывать следующие 4096 байтов данных и декодировать их в список объектов, используя Decoder . Обратите внимание, что FileReader принимает список файлов, что приятно, так как он позволяет обходить данные, не беспокоясь об агрегации между файлами. Кстати, 4096 байтовых кусков, вероятно, немного меньше для больших файлов.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class FileReader implements Iterable<List<T>> {
  private static final long CHUNK_SIZE = 4096;
  private final Decoder<T> decoder;
  private Iterator<File> files;
  
  private FileReader(Decoder<T> decoder, File... files) {
    this(decoder, Arrays.asList(files));
  }
  private FileReader(Decoder<T> decoder, List<File> files) {
    this.files = files.iterator();
    this.decoder = decoder;
  }
  public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {
    return new FileReader<T>(decoder, files);
  }
 
  public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {
    return new FileReader<T>(decoder, files);
  }
  @Override
  public Iterator<List<T>> iterator() {
    return new Iterator<List<T>>() {
      private List<T> entries;
      private long chunkPos = 0;
      private MappedByteBuffer buffer;
      private FileChannel channel;
      @Override
      public boolean hasNext() {
        if (buffer == null || !buffer.hasRemaining()) {
          buffer = nextBuffer(chunkPos);
          if (buffer == null) {
            return false;
          }
        }
        T result = null;
        while ((result = decoder.decode(buffer)) != null) {
          if (entries == null) {
            entries = new ArrayList<T>();
          }
          entries.add(result);
        }
        // set next MappedByteBuffer chunk
        chunkPos += buffer.position();
        buffer = null;
        if (entries != null) {
          return true;
        } else {
          Closeables.closeQuietly(channel);
          return false;
        }
      }
  
      private MappedByteBuffer nextBuffer(long position) {
        try {
          if (channel == null || channel.size() == position) {
            if (channel != null) {
              Closeables.closeQuietly(channel);
              channel = null;
            }
            if (files.hasNext()) {
              File file = files.next();
              channel = new RandomAccessFile(file, "r").getChannel();
              chunkPos = 0;
              position = 0;
            } else {
              return null;
            }
          }
          long chunkSize = CHUNK_SIZE;
          if (channel.size() - position < chunkSize) {
            chunkSize = channel.size() - position;
          }
           return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);
        } catch (IOException e) {
           Closeables.closeQuietly(channel);
           throw new RuntimeException(e);
        }
      }
  
      @Override
      public List<T> next() {
        List<T> res = entries;
        entries = null;
        return res;
      }
  
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}

Следующая задача — написать Decoder и я решил реализовать универсальный TextRowDecoder для любого формата текстового файла с разделителями-запятыми, принимая количество полей в строке и разделитель полей и возвращая массив байтовых массивов. TextRowDecoder может быть повторно использован определенными форматными декодерами, которые могут обрабатывать разные наборы символов.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class TextRowDecoder implements Decoder<byte[][]> {
  private static final byte LF = 10;
  private final int numFields;
  private final byte delimiter;
  public TextRowDecoder(int numFields, byte delimiter) {
   this.numFields = numFields;
   this.delimiter = delimiter;
  }
  @Override
  public byte[][] decode(ByteBuffer buffer) {
    int lineStartPos = buffer.position();
    int limit = buffer.limit();
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == LF) { // reached line feed so parse line
        int lineEndPos = buffer.position();
        // set positions for one row duplication
        if (buffer.limit() < lineEndPos + 1) {
          buffer.position(lineStartPos).limit(lineEndPos);
        } else {
          buffer.position(lineStartPos).limit(lineEndPos + 1);
        }
        byte[][] entry = parseRow(buffer.duplicate());
        if (entry != null) {
          // reset main buffer
          buffer.position(lineEndPos);
          buffer.limit(limit);
          // set start after LF
          lineStartPos = lineEndPos;
        }
        return entry;
      }
    }
    buffer.position(lineStartPos);
    return null;
  }
  
  public byte[][] parseRow(ByteBuffer buffer) {
    int fieldStartPos = buffer.position();
    int fieldEndPos = 0;
    int fieldNumber = 0;
    byte[][] fields = new byte[numFields][];
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == delimiter || b == LF) {
        fieldEndPos = buffer.position();
        // save limit
        int limit = buffer.limit();
        // set positions for one row duplication
        buffer.position(fieldStartPos).limit(fieldEndPos);
        fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);
        fieldNumber++;
        // reset main buffer
        buffer.position(fieldEndPos);
        buffer.limit(limit);
        // set start after LF
        fieldStartPos = fieldEndPos;
      }
      if (fieldNumber == numFields) {
        return fields;
      }
    }
    return null;
  }
  
  private byte[] parseField(ByteBuffer buffer, int pos, int length) {
    byte[] field = new byte[length];
    for (int i = 0; i < field.length; i++) {
      field[i] = buffer.get();
    }
    return field;
  }
}

И вот как файлы обрабатываются. Каждый список содержит элементы, декодированные из одного буфера, и каждый элемент является массивом байтовых массивов, как указано в TextRowDecoder .

1
2
3
4
5
               TextRowDecoder decoder = new TextRowDecoder(4, comma);
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {
  // do something with each chunk
}

Мы могли бы остановиться здесь, но было еще одно требование. Каждая строка содержит метку времени, и каждый пакет должен быть сгруппирован по периодам времени, а не по буферам, день за днем ​​или час за часом. Я все еще хочу перебирать каждый пакет, поэтому немедленной реакцией было создание обертки Iterable для FileReader которая бы реализовала это поведение. Еще одна деталь состоит в том, что каждый элемент должен предоставлять свою метку времени для PeriodEntries путем реализации интерфейса меток Timestamped (здесь не показан).

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
                public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {
  private final Iterator<List<T extends Timestamped>> entriesIt;
  private final long interval;
  private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {
    this.entriesIt = entriesIt.iterator();
    this.interval = interval;
  }
 
  public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {
   return new PeriodEntries<T>(entriesIt, interval);
  }
  
  @Override
  public Iterator<List<T extends Timestamped>> iterator() {
    return new Iterator<List<T>>() {
      private Queue<List<T>> queue = new LinkedList<List<T>>();
      private long previous;
      private Iterator<T> entryIt;
  
      @Override
      public boolean hasNext() {
        if (!advanceEntries()) {
          return false;
        }
        T entry =  entryIt.next();
        long time = normalizeInterval(entry);
        if (previous == 0) {
          previous = time;
        }
        if (queue.peek() == null) {
          List<T> group = new ArrayList<T>();
          queue.add(group);
        }
        while (previous == time) {
          queue.peek().add(entry);
          if (!advanceEntries()) {
            break;
          }
          entry = entryIt.next();
          time = normalizeInterval(entry);
        }
        previous = time;
        List<T> result = queue.peek();
        if (result == null || result.isEmpty()) {
          return false;
        }
        return true;
      }
  
      private boolean advanceEntries() {
        // if there are no rows left
        if (entryIt == null || !entryIt.hasNext()) {
          // try get more rows if possible
          if (entriesIt.hasNext()) {
            entryIt = entriesIt.next().iterator();
            return true;
          } else {
            // no more rows
            return false;
          }
        }
        return true;
      }
  
      private long normalizeInterval(Timestamped entry) {
        long time = entry.getTime();
        int utcOffset = TimeZone.getDefault().getOffset(time);
        long utcTime = time + utcOffset;
        long elapsed = utcTime % interval;
        return time - elapsed;
      }
      @Override
      public List<T> next() {
        return queue.poll();
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
   };
  }
}

Код окончательной обработки не сильно изменился благодаря введению этой функциональности, только одного простого и понятного цикла for, который не должен заботиться о группировании элементов по файлам, буферам и периодам. PeriodEntries также достаточно гибок, чтобы управлять любой длиной интервала.

01
02
03
04
05
06
07
08
09
10
11
TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);
  
for (List<TrueFxData> entries : periods) {
   // data for each day
   for (TrueFxData entry : entries) {
     // process each entry
   }
}

Как вы понимаете, решить эту проблему с помощью коллекций было бы невозможно; выбор итераторов был решающим конструктивным решением, чтобы иметь возможность анализировать терабайты данных, не занимая слишком много места в куче.

Справка: Обработка огромных файлов с помощью Java от нашего партнера по JCG Кристоффера Сьогрена в блоге Deephacks .