Статьи

Прерывание задач исполнителя

Этот вариант использования встречается довольно редко, когда вы хотите отменить запущенную задачу исполнителя. Например, у вас есть текущие загрузки, которые вы хотите остановить, или у вас есть текущее копирование файлов, которое вы хотите отменить. Итак, вы делаете:

ExecutorService executor = Executors.newSingleThreadExecutor(); 
Future<?> future = executor.submit(new Runnable() {
    @Override
    public void run() {
        // Time-consuming or possibly blocking I/O
    }
});
....
executor.shutdownNow();
// or
future.cancel();

К сожалению, это не работает. Звонит  shutdownNow() или  cencel() не останавливает текущий работоспособный. То, что делают эти методы, это просто вызывает  .interrupt() соответствующие потоки. Проблема в том, что ваш runnable не обрабатывает  InterruptedException (и не может). Это довольно распространенная проблема, описанная во многих книгах и статьях, но все же это немного нелогично.

Ну так что ты делаешь? вам нужен способ остановить медленную или блокирующую операцию. Если у вас есть длинный / бесконечный цикл, вы можете просто добавить условие Thread.currentThread().isInterrupted() и не продолжать, если оно есть. Однако, как правило, блокировка происходит вне вашего кода, поэтому вы должны дать команду основному коду остановиться. Обычно это происходит путем закрытия потока или отключения соединения. Но чтобы сделать это, вам нужно сделать немало вещей.

  • простираться Runnable
  • Сделайте «отменяемые» ресурсы (например, входной поток) полем экземпляра, которое
  • предоставить  cancel метод для вашего расширенного runnable, где вы получите «отменяемый» ресурс и отмените его (например, вызов  inputStream.close())
  • Реализуйте пользовательский интерфейс,  ThreadFactory который, в свою очередь, создает пользовательские  Thread экземпляры, которые переопределяют  interrupt() метод, и вызывает  cancel() метод в вашем расширенном Runnable
  • Создание экземпляра исполнителя с помощью фабрики пользовательских потоков (статические методы фабрики принимают его в качестве аргумента)
  • Обработайте внезапное закрытие / остановку / отключение ваших ресурсов блокировки, в  run()методе

Плохая новость заключается в том, что у вас должен быть доступ к конкретному отменяемому исполняемому файлу в вашей фабрике потоков. Вы не можете использовать,  instanceof чтобы проверить, подходит ли он к соответствующему типу, потому что исполнители переносят исполняемые файлы, которые вы им отправляете, в  Worker экземпляры, которые не предоставляют свои основные исполняемые объекты.

Для однопоточных исполнителей это просто — вы просто держите в своем внешнем классе ссылку на представленный в настоящий момент исполняемый файл и обращаетесь к нему в  interrupt методе, например:

private final CancellableRunnable runnable;
...
 
runnable = new CancellableRunnable() {
    private MutableBoolean bool = new MutableBoolean();
    @Override
    public void run() {
        bool.setValue(true);
        while (bool.booleanValue()) {
            // emulating a blocking operation with an endless loop
        }
    }
     
    @Override
    public void cancel() {
        bool.setValue(false);
        // usually here you'd have inputStream.close() or connection.disconnect()
    }
};
 
ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
       return new Thread(r) {
           @Override
           public void interrupt() {
               super.interrupt();
               runnable.cancel();
           }
       };
    }
}); 
 
Future<?> future = executor.submit(runnable);
...
future.cancel();

( CancellableRunnable это пользовательский интерфейс, который просто определяет  cancel() метод)

Но что произойдет, если ваш исполнитель должен выполнить несколько задач одновременно? Если вы хотите отменить все из них, то вы можете сохранить список представленных  CancellableRunnable экземпляров и просто отменить все из них при прерывании. Таким образом, runnables будет отменяться несколько раз, поэтому вы должны учитывать это.

Если вам нужен мелкозернистый контроль, например, путем отмены определенных фьючерсов, то нет простого решения. Вы не можете даже расширить,  ThreadPoolExecutor потому что  addWorker метод является частным. Вы должны скопировать и вставить его.

Единственный вариант не полагаться на  future.cancel() или  executor.shutdownAll() и вместо того, чтобы держать свой собственный список  CancellableFuture экземпляров и сопоставить их с их соответствующими фьючерсами. Поэтому, когда вы хотите отменить некоторые (или все) runnables, вы делаете это наоборот: получите нужный runnable, который хотите отменить, позвоните  .cancel() (как показано выше), затем получите соответствующий  Futureи отмените его. Что-то типа:

Map<CancellableRunnable, Future<?>> cancellableFutures = new HashMap<>();
Future<?> future = executor.submit(runnable);
cancellableFutures.put(runnable, future);
 
//now you want to abruptly cancel a particular task
runnable.cancel();
cancellableFutures.get(runnable).cancel(true);

(Вместо того чтобы использовать runnable в качестве ключа, вы можете использовать некоторый идентификатор, который имеет смысл в вашем сценарии использования, и сохранить как runnable и future в качестве значения этого ключа)

Это хороший обходной путь, но в любом случае я отправил запрос на улучшение пакета java.util.concurrent, чтобы в будущем выпуске у нас была возможность управлять этим вариантом использования.