Статьи

Обработка огромных файлов

Недавно мне пришлось обработать набор файлов, содержащих исторические данные о рынке тик-тика, и быстро понял, что ни один из них не может быть считан в память с использованием традиционного, InputStreamпоскольку каждый файл имеет размер более 4 гигабайт. Emacs не мог даже открыть их.

В этом конкретном случае я мог бы написать простой скрипт bash, который делит файлы на более мелкие части и читает их как обычно. Но я не хочу этого, поскольку двоичные форматы лишают законной силы этот подход.

Таким образом, способ правильно решить эту проблему — это постепенная обработка областей данных с использованием файлов с отображенной памятью . Что хорошо в файлах с отображением в памяти, так это то, что они не занимают виртуальную память или пространство подкачки, поскольку они поддерживаются данными файла на диске.

Окей, давайте посмотрим на эти файлы и извлечем некоторые данные. Похоже, они содержат текстовые строки ASCII с полями, разделенными запятыми.

Формат: [currency-pair],[timestamp],[bid-price],

Пример: EUR/USD,20120102 00:01:30.420,1.29451,1.2949

Справедливо, я мог бы написать программу для этого формата. Но чтение и анализ файлов — это ортогональные понятия; так что давайте сделаем шаг назад и подумаем об общем дизайне, который можно использовать повторно в случае, если в будущем мы столкнемся с подобной проблемой.

Проблема сводится к постепенному декодированию набора записей, закодированных в бесконечно длинном байтовом массиве без исчерпания памяти. Тот факт, что примерный формат закодирован в тексте, разделенном запятыми / строчными буквами, не имеет значения для общего решения, поэтому ясно, что для обработки различных форматов необходим интерфейс декодера.

Опять же, каждая запись не может быть проанализирована и сохранена в памяти до тех пор, пока не будет обработан весь файл, поэтому нам нужен способ постепенной передачи фрагментов записей, которые могут быть записаны в другом месте, на диске или в сети, до того, как они будут собраны сборщиком мусора. Итератор — хорошая абстракция для выполнения этого требования, потому что они действуют как курсоры, что и есть смысл. Каждая итерация пересылает указатель файла и позволяет нам что-то делать с данными.

Итак, сначала Decoderинтерфейс. Идея состоит в том, чтобы постепенно декодировать объекты из [a target = «_ blank» href = «http://docs.oracle.com/javase/1.4.2/docs/api/java/nio/MappedByteBuffer.html»] MappedByteBuffer или вернуть ноль, если в буфере не осталось объектов.

public interface Decoder<T> {
    public T decode(ByteBuffer buffer);
}

Затем приходит то, FileReaderчто реализует Iterable. Каждая итерация будет обрабатывать следующие 4096 байтов данных и декодировать их в список объектов, используя Decoder. Заметьте, что FileReaderпринимаете список файлов , что приятно, поскольку он позволяет обходить данные, не беспокоясь об агрегации между файлами . Кстати, 4096 байтовых кусков, вероятно, немного меньше для больших файлов .

public class FileReader implements Iterable<List<T>> {
  private static final long CHUNK_SIZE = 4096;
  private final Decoder<T> decoder;
  private Iterator<File> files;
 
  private FileReader(Decoder<T> decoder, File... files) {
    this(decoder, Arrays.asList(files));
  }
  private FileReader(Decoder<T> decoder, List<File> files) {
    this.files = files.iterator();
    this.decoder = decoder;
  }
  public static <T> FileReader<T> create(Decoder<T> decoder, List<File> files) {
    return new FileReader<T>(decoder, files);
  }

  public static <T> FileReader<T> create(Decoder<T> decoder, File... files) {
    return new FileReader<T>(decoder, files);
  }
  @Override
  public Iterator<List<T>> iterator() {
    return new Iterator<List<T>>() {
      private List<T> entries;
      private long chunkPos = 0;
      private MappedByteBuffer buffer;
      private FileChannel channel;
      @Override
      public boolean hasNext() {
        if (buffer == null || !buffer.hasRemaining()) {
          buffer = nextBuffer(chunkPos);
          if (buffer == null) {
            return false;
          }
        }
        T result = null;
        while ((result = decoder.decode(buffer)) != null) {
          if (entries == null) {
            entries = new ArrayList<T>();
          }
          entries.add(result);
        }
        // set next MappedByteBuffer chunk
        chunkPos += buffer.position();
        buffer = null;
        if (entries != null) {
          return true;
        } else {
          Closeables.closeQuietly(channel);
          return false;
        }
      }
 
      private MappedByteBuffer nextBuffer(long position) {
        try {
          if (channel == null || channel.size() == position) {
            if (channel != null) {
              Closeables.closeQuietly(channel);
              channel = null;
            }
            if (files.hasNext()) {
              File file = files.next();
              channel = new RandomAccessFile(file, "r").getChannel();
              chunkPos = 0;
              position = 0;
            } else {
              return null;
            }
          }
          long chunkSize = CHUNK_SIZE;
          if (channel.size() - position < chunkSize) {
            chunkSize = channel.size() - position;
          }
           return channel.map(FileChannel.MapMode.READ_ONLY, chunkPos, chunkSize);
        } catch (IOException e) {
           Closeables.closeQuietly(channel);
           throw new RuntimeException(e);
        }
      }
 
      @Override
      public List<T> next() {
        List<T> res = entries;
        entries = null;
        return res;
      }
 
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}

Следующая задача — написать, Decoderи я решил реализовать универсальный TextRowDecoderдля любого формата текстового файла с разделителями-запятыми, принимая количество полей в строке и разделитель полей и возвращая массив байтовых массивов. TextRowDecoderзатем может быть повторно использован специфичными для формата декодерами, которые могут обрабатывать разные наборы символов.

public class TextRowDecoder implements Decoder<byte[][]> {
  private static final byte LF = 10;
  private final int numFields;
  private final byte delimiter;
  public TextRowDecoder(int numFields, byte delimiter) {
   this.numFields = numFields;
   this.delimiter = delimiter;
  }
  @Override
  public byte[][] decode(ByteBuffer buffer) {
    int lineStartPos = buffer.position();
    int limit = buffer.limit();
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == LF) { // reached line feed so parse line
        int lineEndPos = buffer.position();
        // set positions for one row duplication
        if (buffer.limit() < lineEndPos + 1) {
          buffer.position(lineStartPos).limit(lineEndPos);
        } else {
          buffer.position(lineStartPos).limit(lineEndPos + 1);
        }
        byte[][] entry = parseRow(buffer.duplicate());
        if (entry != null) {
          // reset main buffer
          buffer.position(lineEndPos);
          buffer.limit(limit);
          // set start after LF
          lineStartPos = lineEndPos;
        }
        return entry;
      }
    }
    buffer.position(lineStartPos);
    return null;
  }
 
  public byte[][] parseRow(ByteBuffer buffer) {
    int fieldStartPos = buffer.position();
    int fieldEndPos = 0;
    int fieldNumber = 0;
    byte[][] fields = new byte[numFields][];
    while (buffer.hasRemaining()) {
      byte b = buffer.get();
      if (b == delimiter || b == LF) {
        fieldEndPos = buffer.position();
        // save limit
        int limit = buffer.limit();
        // set positions for one row duplication
        buffer.position(fieldStartPos).limit(fieldEndPos);
        fields[fieldNumber] = parseField(buffer.duplicate(), fieldNumber, fieldEndPos - fieldStartPos - 1);
        fieldNumber++;
        // reset main buffer
        buffer.position(fieldEndPos);
        buffer.limit(limit);
        // set start after LF
        fieldStartPos = fieldEndPos;
      }
      if (fieldNumber == numFields) {
        return fields;
      }
    }
    return null;
  }
 
  private byte[] parseField(ByteBuffer buffer, int pos, int length) {
    byte[] field = new byte[length];
    for (int i = 0; i < field.length; i++) {
      field[i] = buffer.get();
    }
    return field;
  }
}

And this is how files are processed. Each list contain elements decoded from a single buffer and each element is an array of byte arrays as specified by the TextRowDecoder.

TextRowDecoder decoder = new TextRowDecoder(4, comma);
FileReader<byte[][]> reader = FileReader.create(decoder, file.listFiles());
for (List<byte[][]> chunk : reader) {
  // do something with each chunk
}

We could stop here but there was one more requirement. Every row contain a timestamp and each batch must be grouped according to periods of time instead of buffers, day-by-day or hour-by-hour. I still want to iterate through each batch so the immediate reaction was to create a Iterable wrapper for FileReader that would implement this behaviour. One additional detail is that each element must to provide its timestamp to PeriodEntries by implementing the Timestamped interface (not shown here).

public class PeriodEntries<T extends Timestamped> implements Iterable<List<T>> {
  private final Iterator<List<T extends Timestamped>> entriesIt;
  private final long interval;
  private PeriodEntries(Iterable<List<T>> entriesIt, long interval) {
    this.entriesIt = entriesIt.iterator();
    this.interval = interval;
  }

  public static <T extends Timestamped> PeriodEntries<T> create(Iterable<List<T>> entriesIt, long interval) {
   return new PeriodEntries<T>(entriesIt, interval);
  }
 
  @Override
  public Iterator<List<T extends Timestamped>> iterator() {
    return new Iterator<List<T>>() {
      private Queue<List<T>> queue = new LinkedList<List<T>>();
      private long previous;
      private Iterator<T> entryIt;
 
      @Override
      public boolean hasNext() {
        if (!advanceEntries()) {
          return false;
        }
        T entry =  entryIt.next();
        long time = normalizeInterval(entry);
        if (previous == 0) {
          previous = time;
        }
        if (queue.peek() == null) {
          List<T> group = new ArrayList<T>();
          queue.add(group);
        }
        while (previous == time) {
          queue.peek().add(entry);
          if (!advanceEntries()) {
            break;
          }
          entry = entryIt.next();
          time = normalizeInterval(entry);
        }
        previous = time;
        List<T> result = queue.peek();
        if (result == null || result.isEmpty()) {
          return false;
        }
        return true;
      }
 
      private boolean advanceEntries() {
        // if there are no rows left
        if (entryIt == null || !entryIt.hasNext()) {
          // try get more rows if possible
          if (entriesIt.hasNext()) {
            entryIt = entriesIt.next().iterator();
            return true;
          } else {
            // no more rows
            return false;
          }
        }
        return true;
      }
 
      private long normalizeInterval(Timestamped entry) {
        long time = entry.getTime();
        int utcOffset = TimeZone.getDefault().getOffset(time);
        long utcTime = time + utcOffset;
        long elapsed = utcTime % interval;
        return time - elapsed;
      }
      @Override
      public List<T> next() {
        return queue.poll();
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
   };
  }
}

The final processing code did not change much by introducing this functionality, only one clean and tight for-loop that does not have to care about grouping elements across files, buffers and periods. PeriodEntries is also flexible enough to mange any length on the interval.

TrueFxDecoder decoder = new TrueFxDecoder();
FileReader<TrueFxData> reader = FileReader.create(decoder, file.listFiles());
long periodLength = TimeUnit.DAYS.toMillis(1);
PeriodEntries<TrueFxData> periods = PeriodEntries.create(reader, periodLength);
 
for (List<TrueFxData> entries : periods) {
   // data for each day
   for (TrueFxData entry : entries) {
     // process each entry
   }
}

As you may realize, it would not have been possible to solve this problem with collections; choosing iterators was a crucial design decision to be able to parse terabytes of data without consuming too much heap space.