ExecutorService — очень удобный инструмент для работы. Вы можете легко использовать его для параллельного выполнения нескольких задач (каждая запись в свою файловую систему). Юо также может сказать ему, чтобы он сдался после некоторого перерыва, и он прервет их для вас. Отлично, именно то, что нам нужно.
Леса выглядят так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
void testCopy() throws Exception { ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors .newCachedThreadPool(); final long start = System.currentTimeMillis(); Callable<Object> task = new Callable<Object>() { @Override public Object call() throws Exception { try { copy("a.bin", "b.bin"); } catch (Exception e) { e.printStackTrace(); } System.out.println("Call really finished after: " + (System.currentTimeMillis() - start)); return null; } }; Collection<Callable<Object>> taskWrapper = Arrays.asList(task); List<Future<Object>> futures = exec.invokeAll(taskWrapper, 50, TimeUnit.MILLISECONDS); System.out.println("invokeAll finished after: " + (System.currentTimeMillis() - start)); System.out.println("Future.isCancelled? " + futures.get(0).isCancelled()); Thread.sleep(20); System.out.println("Threads still active: " + exec.getActiveCount());} |
Чтобы имитировать реакцию на тайм-ауты в исправной системе с низкой нагрузкой, я использую файл размером 100 МБ и очень короткий тайм-аут. Задача всегда истекает, моя система не может скопировать 100 МБ за 50 мс.
Я ожидаю следующих результатов:
-
invokeAllзавершено примерно через 50 мс. -
Future.isCancelled?правда. - Количество активных потоков равно 0. Спящий режим предназначен для устранения некоторых крайних случаев. Короче говоря, это дает функции копирования некоторое время для обнаружения прерывания.
- Вызов действительно заканчивается примерно через 50 мс. Это очень важно, я определенно не хочу, чтобы операции ввода-вывода продолжались после отмены задачи. При более высокой нагрузке это привело бы к слишком большому количеству потоков, которые застряли в поддельном вводе-выводе.
На всякий случай эти тесты запускались на 1.6 JVM от Oracle на 64-битной Windows 7.
Решение 1: потоковое копирование
Первая попытка, вероятно, прямолинейна — используйте цикл с буфером и классическим вводом-выводом, например так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
private void copy(String in, String out) throws Exception { FileInputStream fin = new FileInputStream(in); FileOutputStream fout = new FileOutputStream(out); byte[] buf = new byte[4096]; int read; while ((read = fin.read(buf)) > -1) { fout.write(buf, 0, read); } fin.close(); fout.close();} |
Это то, что делают все популярные библиотеки потокового копирования, включая IOUtils из Apache Commons и ByteStreams из Guava.
Это также с треском проваливается:
|
1
2
3
4
|
invokeAll finished after: 53Future.isCancelled? trueThreads still active: 1Call really finished after: 338 |
Причина довольно очевидна: в цикле или где-либо еще не проверяется состояние прерывания потока, поэтому поток продолжается нормально.
Решение 2. Потоковое копирование с проверкой на прерывание
Давайте это исправим! Один из способов сделать это:
|
1
2
3
4
5
6
|
while ((read = fin.read(buf)) > -1) { fout.write(buf, 0, read); if (Thread.interrupted()) { throw new IOException("Thread interrupted, cancelling"); }} |
Теперь это работает, как ожидалось, печать:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
invokeAll finished after: 52java.io.IOException: Thread interrupted, cancelling at TransferTest.copyInterruptingStream(TransferTest.java:75) at TransferTest.access$0(TransferTest.java:66) at TransferTest$1.call(TransferTest.java:25) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)Future.isCancelled? true at java.lang.Thread.run(Thread.java:662)Call really finished after: 53Threads still active: 0 |
Хорошо, но я нахожу это неудовлетворительным. Это выглядит грязно, и я не особенно доволен наличием этого кода в моей библиотеке ввода-вывода. Должен быть лучший путь, который приводит нас к …
Решение 3: НИО с передачей
У NIO есть такая приятная особенность, что она действительно учитывает прерывания потоков. Если вы попытаетесь выполнить чтение или запись в канал после прерывания потока, вы получите ClosedByInterruptException .
Это именно то, что мне нужно. По какой-то причине я также прочитал этот ответ в StackOverflow , говоря:
«Не используйте буфер, если вам это не нужно. Зачем копировать в память, если вашей целью является другой диск или сетевой адаптер? При использовании больших файлов задержка становится нетривиальной. (…) Используйте FileChannel.transferTo() или FileChannel.transferFrom() . Ключевым преимуществом здесь является то, что JVM использует доступ ОС к прямому доступу к памяти (если есть). (Это зависит от реализации, но современные версии Sun и IBM для процессоров общего назначения хороши.) Что происходит, если данные направляются прямо на диск / с диска, на шину, а затем в место назначения… путем прохождения любой цепи через RAM или процессор ».
Отлично, давайте сделаем это!
|
1
2
3
4
5
6
7
8
9
|
private void copy(String in, String out) throws Exception { FileChannel fin = new FileInputStream(in).getChannel(); FileChannel fout = new FileOutputStream(out).getChannel(); fout.transferFrom(fin, 0, new File(in).length()); fin.close(); fout.close();} |
Выход:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
invokeAll finished after: 52Future.isCancelled? trueThreads still active: 1java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:304) at sun.nio.ch.FileChannelImpl.transferFrom(FileChannelImpl.java:587) at TransferTest.copyNioTransfer(TransferTest.java:91) at TransferTest.access$0(TransferTest.java:87) at TransferTest$1.call(TransferTest.java:27) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662)Call really finished after: 146 |
Все, что я делаю, это тривиальный звонок на transferFrom . Он очень лаконичен и обещает такую большую поддержку со стороны оборудования и ОС… Но подождите, почему это заняло 146 мс? Я имею в виду, что 146 миллисекунд намного быстрее, чем 338 мс в первом тесте, но я ожидал, что он завершится примерно через 50 мс.
Давайте повторим тест для файла большего размера, около 1,5 ГБ:
|
1
2
3
4
5
6
7
|
invokeAll finished after: 9012Future.isCancelled? trueThreads still active: 1java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) (...)Call really finished after: 9170 |
Насколько это ужасно? Это, наверное, худшее, что могло случиться:
- Задача не была прервана своевременно. 9 секунд это слишком долго, я ожидал около 50 миллис.
-
invokeAllбыл заблокирован на все время операции — 9 секунд. Что за черт?
Решение 4 — NIO с буферизацией
Оказывается, мне нужна некоторая буферизация. Давайте попробуем с этим:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
private void copyNioBuffered(String in, String out) throws Exception { FileChannel fin = new FileInputStream(in).getChannel(); FileChannel fout = new FileOutputStream(out).getChannel(); ByteBuffer buff = ByteBuffer.allocate(4096); while (fin.read(buff) != -1 || buff.position() > 0) { buff.flip(); fout.write(buff); buff.compact(); } fin.close(); fout.close();} |
Выход:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
invokeAll finished after: 52Future.isCancelled? truejava.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:203) at TransferTest.copyNioBuffered(TransferTest.java:105) at TransferTest.access$0(TransferTest.java:98) at TransferTest$1.call(TransferTest.java:29) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662)Call really finished after: 55Threads still active: 0 |
Теперь это именно то, что мне было нужно. Он учитывает прерывания сам по себе, поэтому мне не нужны эти утомительные проверки по всей моей утилите ввода-вывода. Причуды: разные типы каналов
Если моя утилита ввода-вывода используется только для копирования файлов, которые она получает по имени, например:
|
1
|
static public void copy(String source, String destination) |
… Тогда довольно просто переписать метод для NIO.
Но что, если это более общая подпись, которая работает с потоками?
|
1
|
static public void copy(InputStream source, OutputStream destination) |
В NIO есть небольшая утилита Channels с очень полезными методами, такими как:
|
1
2
|
public static ReadableByteChannel newChannel(InputStream in)public static WritableByteChannel newChannel(OutputStream out) |
Таким образом, кажется, что мы могли бы обернуть наши потоки с помощью этого помощника и извлечь выгоду из прерываемого NIO API. Пока мы не посмотрим на источник:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
public static WritableByteChannel newChannel(final OutputStream out) { if (out == null) { throw new NullPointerException(); } if (out instanceof FileOutputStream && FileOutputStream.class.equals(out.getClass())) { return ((FileOutputStream)out).getChannel(); } return new WritableByteChannelImpl(out);}private static class WritableByteChannelImpl extends AbstractInterruptibleChannel // Not really interruptible implements WritableByteChannel{// ... Ignores interrupts completely |
Осторожно! Если ваши потоки являются файловыми потоками, они будут прерываемыми. В противном случае вам не повезло — это просто глупая оболочка, больше похожая на адаптер для совместимости API. Предположения убивают, всегда проверяйте источник.
Ссылка: IO vs. NIO — прерывания, тайм-ауты и буферы от нашего партнера JCG Конрада Гаруса в блоге Белки .