Статьи

IO vs. NIO — прерывания, тайм-ауты и буферы

Давайте представим систему, которая иногда должна копировать файл в несколько мест, но таким образом, чтобы отзывчивость была критической. Другими словами, если по какой-то причине файловая система перегружена и мы не можем записать наш файл менее чем за секунду, она должна сдаться.

ExecutorService — очень удобный инструмент для работы. Вы можете легко использовать его для параллельного выполнения нескольких задач (каждая запись в свою файловую систему). Юо также может сказать ему, чтобы он сдался после некоторого перерыва, и он прервет их для вас. Отлично, именно то, что нам нужно.

Леса выглядят так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void testCopy() throws Exception {
 ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors
   .newCachedThreadPool();
 final long start = System.currentTimeMillis();
 Callable<Object> task = new Callable<Object>() {
  @Override
  public Object call() throws Exception {
   try {
    copy("a.bin", "b.bin");
   } catch (Exception e) {
    e.printStackTrace();
   }
   System.out.println("Call really finished after: "
     + (System.currentTimeMillis() - start));
   return null;
  }
 };
 Collection<Callable<Object>> taskWrapper = Arrays.asList(task);
 List<Future<Object>> futures = exec.invokeAll(taskWrapper, 50,
   TimeUnit.MILLISECONDS);
 System.out.println("invokeAll finished after: "
   + (System.currentTimeMillis() - start));
 System.out.println("Future.isCancelled? "
   + futures.get(0).isCancelled());
 Thread.sleep(20);
 System.out.println("Threads still active: " + exec.getActiveCount());
}

Чтобы имитировать реакцию на тайм-ауты в исправной системе с низкой нагрузкой, я использую файл размером 100 МБ и очень короткий тайм-аут. Задача всегда истекает, моя система не может скопировать 100 МБ за 50 мс.

Я ожидаю следующих результатов:

  1. invokeAll завершено примерно через 50 мс.
  2. Future.isCancelled? правда.
  3. Количество активных потоков равно 0. Спящий режим предназначен для устранения некоторых крайних случаев. Короче говоря, это дает функции копирования некоторое время для обнаружения прерывания.
  4. Вызов действительно заканчивается примерно через 50 мс. Это очень важно, я определенно не хочу, чтобы операции ввода-вывода продолжались после отмены задачи. При более высокой нагрузке это привело бы к слишком большому количеству потоков, которые застряли в поддельном вводе-выводе.

На всякий случай эти тесты запускались на 1.6 JVM от Oracle на 64-битной Windows 7.

Решение 1: потоковое копирование

Первая попытка, вероятно, прямолинейна — используйте цикл с буфером и классическим вводом-выводом, например так:

01
02
03
04
05
06
07
08
09
10
11
12
13
private void copy(String in, String out) throws Exception {
 FileInputStream fin = new FileInputStream(in);
 FileOutputStream fout = new FileOutputStream(out);
 
 byte[] buf = new byte[4096];
 int read;
 while ((read = fin.read(buf)) > -1) {
  fout.write(buf, 0, read);
 }
 
 fin.close();
 fout.close();
}

Это то, что делают все популярные библиотеки потокового копирования, включая IOUtils из Apache Commons и ByteStreams из Guava.

Это также с треском проваливается:

1
2
3
4
invokeAll finished after: 53
Future.isCancelled? true
Threads still active: 1
Call really finished after: 338

Причина довольно очевидна: в цикле или где-либо еще не проверяется состояние прерывания потока, поэтому поток продолжается нормально.

Решение 2. Потоковое копирование с проверкой на прерывание

Давайте это исправим! Один из способов сделать это:

1
2
3
4
5
6
while ((read = fin.read(buf)) > -1) {
 fout.write(buf, 0, read);
 if (Thread.interrupted()) {
  throw new IOException("Thread interrupted, cancelling");
 }
}

Теперь это работает, как ожидалось, печать:

01
02
03
04
05
06
07
08
09
10
11
12
13
invokeAll finished after: 52
java.io.IOException: Thread interrupted, cancelling
 at TransferTest.copyInterruptingStream(TransferTest.java:75)
 at TransferTest.access$0(TransferTest.java:66)
 at TransferTest$1.call(TransferTest.java:25)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)Future.isCancelled? true
 at java.lang.Thread.run(Thread.java:662)
 
Call really finished after: 53
Threads still active: 0

Хорошо, но я нахожу это неудовлетворительным. Это выглядит грязно, и я не особенно доволен наличием этого кода в моей библиотеке ввода-вывода. Должен быть лучший путь, который приводит нас к …

Решение 3: НИО с передачей

У NIO есть такая приятная особенность, что она действительно учитывает прерывания потоков. Если вы попытаетесь выполнить чтение или запись в канал после прерывания потока, вы получите ClosedByInterruptException .

Это именно то, что мне нужно. По какой-то причине я также прочитал этот ответ в StackOverflow , говоря:

«Не используйте буфер, если вам это не нужно. Зачем копировать в память, если вашей целью является другой диск или сетевой адаптер? При использовании больших файлов задержка становится нетривиальной. (…) Используйте FileChannel.transferTo() или FileChannel.transferFrom() . Ключевым преимуществом здесь является то, что JVM использует доступ ОС к прямому доступу к памяти (если есть). (Это зависит от реализации, но современные версии Sun и IBM для процессоров общего назначения хороши.) Что происходит, если данные направляются прямо на диск / с диска, на шину, а затем в место назначения… путем прохождения любой цепи через RAM или процессор ».

Отлично, давайте сделаем это!

1
2
3
4
5
6
7
8
9
private void copy(String in, String out) throws Exception {
 FileChannel fin = new FileInputStream(in).getChannel();
 FileChannel fout = new FileOutputStream(out).getChannel();
 
 fout.transferFrom(fin, 0, new File(in).length());
 
 fin.close();
 fout.close();
}

Выход:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
invokeAll finished after: 52
Future.isCancelled? true
Threads still active: 1
java.nio.channels.ClosedByInterruptException
 at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
 at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:304)
 at sun.nio.ch.FileChannelImpl.transferFrom(FileChannelImpl.java:587)
 at TransferTest.copyNioTransfer(TransferTest.java:91)
 at TransferTest.access$0(TransferTest.java:87)
 at TransferTest$1.call(TransferTest.java:27)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)
Call really finished after: 146

Все, что я делаю, это тривиальный звонок на transferFrom . Он очень лаконичен и обещает такую ​​большую поддержку со стороны оборудования и ОС… Но подождите, почему это заняло 146 мс? Я имею в виду, что 146 миллисекунд намного быстрее, чем 338 мс в первом тесте, но я ожидал, что он завершится примерно через 50 мс.

Давайте повторим тест для файла большего размера, около 1,5 ГБ:

1
2
3
4
5
6
7
invokeAll finished after: 9012
Future.isCancelled? true
Threads still active: 1
java.nio.channels.ClosedByInterruptException
 at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
 (...)
Call really finished after: 9170

Насколько это ужасно? Это, наверное, худшее, что могло случиться:

  • Задача не была прервана своевременно. 9 секунд это слишком долго, я ожидал около 50 миллис.
  • invokeAll был заблокирован на все время операции — 9 секунд. Что за черт?

Решение 4 — NIO с буферизацией

Оказывается, мне нужна некоторая буферизация. Давайте попробуем с этим:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
private void copyNioBuffered(String in, String out) throws Exception {
 FileChannel fin = new FileInputStream(in).getChannel();
 FileChannel fout = new FileOutputStream(out).getChannel();
 
 ByteBuffer buff = ByteBuffer.allocate(4096);
 while (fin.read(buff) != -1 || buff.position() > 0) {
  buff.flip();
  fout.write(buff);
  buff.compact();
 }
 
 fin.close();
 fout.close();
}

Выход:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
invokeAll finished after: 52
Future.isCancelled? true
java.nio.channels.ClosedByInterruptException
 at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:203)
 at TransferTest.copyNioBuffered(TransferTest.java:105)
 at TransferTest.access$0(TransferTest.java:98)
 at TransferTest$1.call(TransferTest.java:29)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)
Call really finished after: 55
Threads still active: 0

Теперь это именно то, что мне было нужно. Он учитывает прерывания сам по себе, поэтому мне не нужны эти утомительные проверки по всей моей утилите ввода-вывода. Причуды: разные типы каналов

Если моя утилита ввода-вывода используется только для копирования файлов, которые она получает по имени, например:

1
static public void copy(String source, String destination)

… Тогда довольно просто переписать метод для NIO.

Но что, если это более общая подпись, которая работает с потоками?

1
static public void copy(InputStream source, OutputStream destination)

В NIO есть небольшая утилита Channels с очень полезными методами, такими как:

1
2
public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)

Таким образом, кажется, что мы могли бы обернуть наши потоки с помощью этого помощника и извлечь выгоду из прерываемого NIO API. Пока мы не посмотрим на источник:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
public static WritableByteChannel newChannel(final OutputStream out) {
 if (out == null) {
     throw new NullPointerException();
 }
 
 if (out instanceof FileOutputStream &&
  FileOutputStream.class.equals(out.getClass())) {
  return ((FileOutputStream)out).getChannel();
 }
 
 return new WritableByteChannelImpl(out);
}
 
private static class WritableByteChannelImpl
 extends AbstractInterruptibleChannel // Not really interruptible
 implements WritableByteChannel
{
// ... Ignores interrupts completely

Осторожно! Если ваши потоки являются файловыми потоками, они будут прерываемыми. В противном случае вам не повезло — это просто глупая оболочка, больше похожая на адаптер для совместимости API. Предположения убивают, всегда проверяйте источник.

Ссылка: IO vs. NIO — прерывания, тайм-ауты и буферы от нашего партнера JCG Конрада Гаруса в блоге Белки .