Этот вариант использования встречается довольно редко, когда вы хотите отменить запущенную задачу исполнителя. Например, у вас есть текущие загрузки, которые вы хотите остановить, или у вас есть текущее копирование файлов, которое вы хотите отменить. Итак, вы делаете:
ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(new Runnable() { @Override public void run() { // Time-consuming or possibly blocking I/O } }); .... executor.shutdownNow(); // or future.cancel();
К сожалению, это не работает. Звонит shutdownNow()
или cencel()
не останавливает текущий работоспособный. То, что делают эти методы, это просто вызывает .interrupt()
соответствующие потоки. Проблема в том, что ваш runnable не обрабатывает InterruptedException
(и не может). Это довольно распространенная проблема, описанная во многих книгах и статьях, но все же это немного нелогично.
Ну так что ты делаешь? вам нужен способ остановить медленную или блокирующую операцию. Если у вас есть длинный / бесконечный цикл, вы можете просто добавить условие Thread.currentThread().isInterrupted()
и не продолжать, если оно есть. Однако, как правило, блокировка происходит вне вашего кода, поэтому вы должны дать команду основному коду остановиться. Обычно это происходит путем закрытия потока или отключения соединения. Но чтобы сделать это, вам нужно сделать немало вещей.
- простираться
Runnable
- Сделайте «отменяемые» ресурсы (например, входной поток) полем экземпляра, которое
- предоставить
cancel
метод для вашего расширенного runnable, где вы получите «отменяемый» ресурс и отмените его (например, вызовinputStream.close()
) - Реализуйте пользовательский интерфейс,
ThreadFactory
который, в свою очередь, создает пользовательскиеThread
экземпляры, которые переопределяютinterrupt()
метод, и вызываетcancel()
метод в вашем расширенномRunnable
- Создание экземпляра исполнителя с помощью фабрики пользовательских потоков (статические методы фабрики принимают его в качестве аргумента)
- Обработайте внезапное закрытие / остановку / отключение ваших ресурсов блокировки, в
run()
методе
Плохая новость заключается в том, что у вас должен быть доступ к конкретному отменяемому исполняемому файлу в вашей фабрике потоков. Вы не можете использовать, instanceof
чтобы проверить, подходит ли он к соответствующему типу, потому что исполнители переносят исполняемые файлы, которые вы им отправляете, в Worker
экземпляры, которые не предоставляют свои основные исполняемые объекты.
Для однопоточных исполнителей это просто — вы просто держите в своем внешнем классе ссылку на представленный в настоящий момент исполняемый файл и обращаетесь к нему в interrupt
методе, например:
private final CancellableRunnable runnable; ... runnable = new CancellableRunnable() { private MutableBoolean bool = new MutableBoolean(); @Override public void run() { bool.setValue(true); while (bool.booleanValue()) { // emulating a blocking operation with an endless loop } } @Override public void cancel() { bool.setValue(false); // usually here you'd have inputStream.close() or connection.disconnect() } }; ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r) { @Override public void interrupt() { super.interrupt(); runnable.cancel(); } }; } }); Future<?> future = executor.submit(runnable); ... future.cancel();
( CancellableRunnable
это пользовательский интерфейс, который просто определяет cancel()
метод)
Но что произойдет, если ваш исполнитель должен выполнить несколько задач одновременно? Если вы хотите отменить все из них, то вы можете сохранить список представленных CancellableRunnable
экземпляров и просто отменить все из них при прерывании. Таким образом, runnables будет отменяться несколько раз, поэтому вы должны учитывать это.
Если вам нужен мелкозернистый контроль, например, путем отмены определенных фьючерсов, то нет простого решения. Вы не можете даже расширить, ThreadPoolExecutor
потому что addWorker
метод является частным. Вы должны скопировать и вставить его.
Единственный вариант не полагаться на future.cancel()
или executor.shutdownAll()
и вместо того, чтобы держать свой собственный список CancellableFuture
экземпляров и сопоставить их с их соответствующими фьючерсами. Поэтому, когда вы хотите отменить некоторые (или все) runnables, вы делаете это наоборот: получите нужный runnable, который хотите отменить, позвоните .cancel()
(как показано выше), затем получите соответствующий Future
и отмените его. Что-то типа:
Map<CancellableRunnable, Future<?>> cancellableFutures = new HashMap<>(); Future<?> future = executor.submit(runnable); cancellableFutures.put(runnable, future); //now you want to abruptly cancel a particular task runnable.cancel(); cancellableFutures.get(runnable).cancel(true);
(Вместо того чтобы использовать runnable в качестве ключа, вы можете использовать некоторый идентификатор, который имеет смысл в вашем сценарии использования, и сохранить как runnable и future в качестве значения этого ключа)
Это хороший обходной путь, но в любом случае я отправил запрос на улучшение пакета java.util.concurrent, чтобы в будущем выпуске у нас была возможность управлять этим вариантом использования.