ExecutorService
— очень удобный инструмент для работы. Вы можете легко использовать его для параллельного выполнения нескольких задач (каждая запись в свою файловую систему). Юо также может сказать ему, чтобы он сдался после некоторого перерыва, и он прервет их для вас. Отлично, именно то, что нам нужно.
Леса выглядят так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
void testCopy() throws Exception { ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors .newCachedThreadPool(); final long start = System.currentTimeMillis(); Callable<Object> task = new Callable<Object>() { @Override public Object call() throws Exception { try { copy( "a.bin" , "b.bin" ); } catch (Exception e) { e.printStackTrace(); } System.out.println( "Call really finished after: " + (System.currentTimeMillis() - start)); return null ; } }; Collection<Callable<Object>> taskWrapper = Arrays.asList(task); List<Future<Object>> futures = exec.invokeAll(taskWrapper, 50 , TimeUnit.MILLISECONDS); System.out.println( "invokeAll finished after: " + (System.currentTimeMillis() - start)); System.out.println( "Future.isCancelled? " + futures.get( 0 ).isCancelled()); Thread.sleep( 20 ); System.out.println( "Threads still active: " + exec.getActiveCount()); } |
Чтобы имитировать реакцию на тайм-ауты в исправной системе с низкой нагрузкой, я использую файл размером 100 МБ и очень короткий тайм-аут. Задача всегда истекает, моя система не может скопировать 100 МБ за 50 мс.
Я ожидаю следующих результатов:
-
invokeAll
завершено примерно через 50 мс. -
Future.isCancelled?
правда. - Количество активных потоков равно 0. Спящий режим предназначен для устранения некоторых крайних случаев. Короче говоря, это дает функции копирования некоторое время для обнаружения прерывания.
- Вызов действительно заканчивается примерно через 50 мс. Это очень важно, я определенно не хочу, чтобы операции ввода-вывода продолжались после отмены задачи. При более высокой нагрузке это привело бы к слишком большому количеству потоков, которые застряли в поддельном вводе-выводе.
На всякий случай эти тесты запускались на 1.6 JVM от Oracle на 64-битной Windows 7.
Решение 1: потоковое копирование
Первая попытка, вероятно, прямолинейна — используйте цикл с буфером и классическим вводом-выводом, например так:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
private void copy(String in, String out) throws Exception { FileInputStream fin = new FileInputStream(in); FileOutputStream fout = new FileOutputStream(out); byte [] buf = new byte [ 4096 ]; int read; while ((read = fin.read(buf)) > - 1 ) { fout.write(buf, 0 , read); } fin.close(); fout.close(); } |
Это то, что делают все популярные библиотеки потокового копирования, включая IOUtils
из Apache Commons и ByteStreams
из Guava.
Это также с треском проваливается:
1
2
3
4
|
invokeAll finished after: 53 Future.isCancelled? true Threads still active: 1 Call really finished after: 338 |
Причина довольно очевидна: в цикле или где-либо еще не проверяется состояние прерывания потока, поэтому поток продолжается нормально.
Решение 2. Потоковое копирование с проверкой на прерывание
Давайте это исправим! Один из способов сделать это:
1
2
3
4
5
6
|
while ((read = fin.read(buf)) > - 1 ) { fout.write(buf, 0 , read); if (Thread.interrupted()) { throw new IOException( "Thread interrupted, cancelling" ); } } |
Теперь это работает, как ожидалось, печать:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
invokeAll finished after: 52 java.io.IOException: Thread interrupted, cancelling at TransferTest.copyInterruptingStream(TransferTest.java:75) at TransferTest.access$0(TransferTest.java:66) at TransferTest$1.call(TransferTest.java:25) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)Future.isCancelled? true at java.lang.Thread.run(Thread.java:662) Call really finished after: 53 Threads still active: 0 |
Хорошо, но я нахожу это неудовлетворительным. Это выглядит грязно, и я не особенно доволен наличием этого кода в моей библиотеке ввода-вывода. Должен быть лучший путь, который приводит нас к …
Решение 3: НИО с передачей
У NIO есть такая приятная особенность, что она действительно учитывает прерывания потоков. Если вы попытаетесь выполнить чтение или запись в канал после прерывания потока, вы получите ClosedByInterruptException
.
Это именно то, что мне нужно. По какой-то причине я также прочитал этот ответ в StackOverflow , говоря:
«Не используйте буфер, если вам это не нужно. Зачем копировать в память, если вашей целью является другой диск или сетевой адаптер? При использовании больших файлов задержка становится нетривиальной. (…) Используйте FileChannel.transferTo()
или FileChannel.transferFrom()
. Ключевым преимуществом здесь является то, что JVM использует доступ ОС к прямому доступу к памяти (если есть). (Это зависит от реализации, но современные версии Sun и IBM для процессоров общего назначения хороши.) Что происходит, если данные направляются прямо на диск / с диска, на шину, а затем в место назначения… путем прохождения любой цепи через RAM или процессор ».
Отлично, давайте сделаем это!
1
2
3
4
5
6
7
8
9
|
private void copy(String in, String out) throws Exception { FileChannel fin = new FileInputStream(in).getChannel(); FileChannel fout = new FileOutputStream(out).getChannel(); fout.transferFrom(fin, 0 , new File(in).length()); fin.close(); fout.close(); } |
Выход:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
invokeAll finished after: 52 Future.isCancelled? true Threads still active: 1 java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:304) at sun.nio.ch.FileChannelImpl.transferFrom(FileChannelImpl.java:587) at TransferTest.copyNioTransfer(TransferTest.java:91) at TransferTest.access$0(TransferTest.java:87) at TransferTest$1.call(TransferTest.java:27) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Call really finished after: 146 |
Все, что я делаю, это тривиальный звонок на transferFrom
. Он очень лаконичен и обещает такую большую поддержку со стороны оборудования и ОС… Но подождите, почему это заняло 146 мс? Я имею в виду, что 146 миллисекунд намного быстрее, чем 338 мс в первом тесте, но я ожидал, что он завершится примерно через 50 мс.
Давайте повторим тест для файла большего размера, около 1,5 ГБ:
1
2
3
4
5
6
7
|
invokeAll finished after: 9012 Future.isCancelled? true Threads still active: 1 java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java: 184 ) (...) Call really finished after: 9170 |
Насколько это ужасно? Это, наверное, худшее, что могло случиться:
- Задача не была прервана своевременно. 9 секунд это слишком долго, я ожидал около 50 миллис.
-
invokeAll
был заблокирован на все время операции — 9 секунд. Что за черт?
Решение 4 — NIO с буферизацией
Оказывается, мне нужна некоторая буферизация. Давайте попробуем с этим:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
private void copyNioBuffered(String in, String out) throws Exception { FileChannel fin = new FileInputStream(in).getChannel(); FileChannel fout = new FileOutputStream(out).getChannel(); ByteBuffer buff = ByteBuffer.allocate( 4096 ); while (fin.read(buff) != - 1 || buff.position() > 0 ) { buff.flip(); fout.write(buff); buff.compact(); } fin.close(); fout.close(); } |
Выход:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
invokeAll finished after: 52 Future.isCancelled? true java.nio.channels.ClosedByInterruptException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:203) at TransferTest.copyNioBuffered(TransferTest.java:105) at TransferTest.access$0(TransferTest.java:98) at TransferTest$1.call(TransferTest.java:29) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Call really finished after: 55 Threads still active: 0 |
Теперь это именно то, что мне было нужно. Он учитывает прерывания сам по себе, поэтому мне не нужны эти утомительные проверки по всей моей утилите ввода-вывода. Причуды: разные типы каналов
Если моя утилита ввода-вывода используется только для копирования файлов, которые она получает по имени, например:
1
|
static public void copy(String source, String destination) |
… Тогда довольно просто переписать метод для NIO.
Но что, если это более общая подпись, которая работает с потоками?
1
|
static public void copy(InputStream source, OutputStream destination) |
В NIO есть небольшая утилита Channels
с очень полезными методами, такими как:
1
2
|
public static ReadableByteChannel newChannel(InputStream in) public static WritableByteChannel newChannel(OutputStream out) |
Таким образом, кажется, что мы могли бы обернуть наши потоки с помощью этого помощника и извлечь выгоду из прерываемого NIO API. Пока мы не посмотрим на источник:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
public static WritableByteChannel newChannel( final OutputStream out) { if (out == null ) { throw new NullPointerException(); } if (out instanceof FileOutputStream && FileOutputStream. class .equals(out.getClass())) { return ((FileOutputStream)out).getChannel(); } return new WritableByteChannelImpl(out); } private static class WritableByteChannelImpl extends AbstractInterruptibleChannel // Not really interruptible implements WritableByteChannel { // ... Ignores interrupts completely |
Осторожно! Если ваши потоки являются файловыми потоками, они будут прерываемыми. В противном случае вам не повезло — это просто глупая оболочка, больше похожая на адаптер для совместимости API. Предположения убивают, всегда проверяйте источник.
Ссылка: IO vs. NIO — прерывания, тайм-ауты и буферы от нашего партнера JCG Конрада Гаруса в блоге Белки .