Статьи

Java 7: закрытие файловых каналов NIO.2 без потери данных

Закрытие асинхронного файлового канала может быть очень трудным. Если вы отправили задачи ввода-вывода в асинхронный канал, вы хотите быть уверены, что задачи выполняются правильно. Это может быть сложным требованием к асинхронным каналам по нескольким причинам. Группа каналов по умолчанию использует потоки deamon в качестве рабочих потоков, что не является хорошим выбором, потому что эти потоки просто прекращают работу при выходе из JVM. Если вы используете пользовательский пул потоков с не-деамонными потоками, вам нужно самим управлять жизненным циклом пула потоков. Если вы этого не сделаете, потоки просто остаются в живых, когда основной поток выходит. Следовательно, JVM на самом деле вообще не выходит, что вы можете сделать, это убить JVM.

Другая проблема при закрытии асинхронных каналов упоминается в javadoc AsynchronousFileChannel : «Завершение работы службы исполнителя при открытом канале приводит к неопределенному поведению». Это связано с тем, что операция close() в AsynchronousFileChannel выдает задачи связанной службе-исполнителю, которые имитируют сбой ожидающих операций ввода-вывода (в том же пуле потоков) с AsynchronousCloseException . Следовательно, вы получите RejectedExecutionException если будете выполнять close() для экземпляра асинхронного файлового канала, когда ранее закрывали связанную службу executor.

При этом предлагаемый способ безопасной настройки файлового канала и отключения этого канала выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SimpleChannelClose_AsynchronousCloseException {
 
  private static final String FILE_NAME = "E:/temp/afile.out";
  private static AsynchronousFileChannel outputfile;
  private static AtomicInteger fileindex = new AtomicInteger(0);
  private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
 
  public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
   outputfile = AsynchronousFileChannel.open(
   Paths.get(FILE_NAME),
   new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE,
              StandardOpenOption.CREATE,StandardOpenOption.DELETE_ON_CLOSE)), pool);
   List<Future<Integer>> futures = new ArrayList<>();
   for (int i = 0; i < 10000; i++) {
    futures.add(outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5));
   }
   outputfile.close();
   pool.shutdown();
   pool.awaitTermination(60, TimeUnit.SECONDS);
   for (Future<Integer> future : futures) {
    try {
     future.get();
    } catch (ExecutionException e) {
     System.out.println("Task wasn't executed!");
    }
   }
  }
}

Служба исполнения пользовательского пула потоков определяется в строках 6 и 7. Канал файла определяется в строках 10–13. В строках 18–20 асинхронный канал закрывается упорядоченным образом. Сначала сам канал закрывается, затем служба исполнителя закрывается и, что не менее важно, поток ожидает завершения исполнителя пула потоков.

Хотя это безопасный способ закрыть канал с помощью пользовательской службы исполнителя, появилась новая проблема. Клиенты отправляли задачи асинхронной записи (строка 16) и, возможно, захотят убедиться, что после их успешной отправки эти задачи обязательно будут выполнены. Всегда ожидать Future.get() (строка 23), это не вариант, потому что во многих случаях это приведет к * асинхронным * файловым каналам ad adsurdum. Фрагмент выше вернет партию «Задача не была выполнена!» сообщения о том, что канал закрыт сразу после того, как операции записи были переданы в канал (строка 18). Чтобы избежать такой «потери данных», вы можете реализовать свой собственный CompletionHandler и передать его запрошенной операции записи.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public class SimpleChannelClose_CompletionHandler {
...
 public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
...
   outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5, "", defaultCompletionHandler);
...
 }
 
 private static CompletionHandler<integer, string=""> defaultCompletionHandler = new CompletionHandler<Integer, String>() {
  @Override
  public void completed(Integer result, String attachment) {
   // NOP
  }
 
  @Override
  public void failed(Throwable exc, String attachment) {
  System.out.println("Do something to avoid data loss ...");
  }
 };
}

Метод CompletionHandler.failed() (строка 16) перехватывает любое исключение времени выполнения во время обработки задачи. Вы можете ввести любой код компенсации, чтобы избежать потери данных. Когда вы работаете с критически важными данными, тогда может быть хорошей идеей использовать CompletionHandler s. Но все же есть еще одна проблема. Клиенты могут отправлять задачи, но они не знают, будет ли пул успешно обрабатывать эти задачи. Успешный в этом контексте означает, что отправленные байты фактически достигают своего места назначения (файла на жестком диске). Если вы хотите быть уверены, что все представленные задачи действительно обработаны перед закрытием, это немного сложнее. Вам нужен «изящный» механизм закрытия, который ждет, пока рабочая очередь не станет пустой *, прежде чем * он фактически закроет канал и связанную службу исполнителя (это невозможно при использовании стандартных методов жизненного цикла).

Представляем GracefulAsynchronousChannel

Мои последние фрагменты представляют GracefulAsynchronousFileChannel . Вы можете получить полный код здесь, в моем Git-репозитории . Поведение этого канала выглядит следующим образом: гарантирует обработку всех успешно отправленных операций записи и генерирует NonWritableChannelException если канал готовит отключение. Для реализации этого поведения требуются две вещи. Во-первых, вам нужно реализовать afterExecute() в расширении ThreadPoolExecutor которое отправляет сигнал, когда очередь пуста. Это то, что делает DefensiveThreadPoolExecutor .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {
 
 public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
   LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);
 }
 
 /**
  * "Last" task issues a signal that queue is empty after task processing was completed.
  */
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
  if (state == PREPARE) {
   closeLock.lock(); // only one thread will pass when closer thread is awaiting signal
   try {
    if (getQueue().isEmpty() && state < SHUTDOWN) {
     System.out.println("Issueing signal that queue is empty ...");
     isEmpty.signal();
     state = SHUTDOWN; // -> no other thread can issue empty-signal
    }
   } finally {
    closeLock.unlock();
   }
  }
  super.afterExecute(r, t);
 }
}

Метод afterExecute() (строка 12) выполняется после каждой обработанной задачи потоком, который обработал эту задачу. Реализация посылает сигнал isEmpty в строке 18. Вторая часть, которую вам нужно два изящно закрыть канал, это пользовательская реализация метода close() AsynchronousFileChannel .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Method that closes this file channel gracefully without loosing any data.
 */
@Override
public void close() throws IOException {
 AsynchronousFileChannel writeableChannel = innerChannel;
 System.out.println("Starting graceful shutdown ...");
 closeLock.lock();
 try {
  state = PREPARE;
  innerChannel = AsynchronousFileChannel.open(Paths.get(uri),
    new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);
  System.out.println("Channel blocked for write access ...");
  if (!pool.getQueue().isEmpty()) {
   System.out.println("Waiting for signal that queue is empty ...");
   isEmpty.await();
   System.out.println("Received signal that queue is empty ... closing");
  } else {
   System.out.println("Don't have to wait, queue is empty ...");
  }
 } catch (InterruptedException e) {
  Thread.interrupted();
  throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);
 } catch (Exception e) {
  throw new RuntimeException("Unexpected error" + e);
 } finally {
  closeLock.unlock();
  writeableChannel.force(false);
  writeableChannel.close(); // close the writable channel
  innerChannel.close(); // close the read-only channel
  System.out.println("File closed ...");
  pool.shutdown(); // allow clean up tasks from previous close() operation to finish safely
  try {
   pool.awaitTermination(1, TimeUnit.MINUTES);
  } catch (InterruptedException e) {
   Thread.interrupted();
   throw new RuntimeException("Could not terminate thread pool!", e);
  }
  System.out.println("Pool closed ...");
 }
}

Изучите этот код некоторое время. Интересные биты находятся в строке 11, где innerChannel заменяется каналом только для чтения. Это приводит к тому, что любые последующие асинхронные запросы на NonWritableChannelException с ошибкой NonWritableChannelException . В строке 16 метод close() ожидает сигнала isEmpty . Когда этот сигнал отправляется после последней задачи записи, метод close() продолжает процедуру упорядоченного завершения работы (строка 27 и далее). По сути, код добавляет общее состояние жизненного цикла для файлового канала и связанного пула потоков. Таким образом, оба объекта могут обмениваться данными во время процедуры отключения и избежать потери данных.

Вот клиент ведения журнала, который использует GracefulAsynchronousFileChannel .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MyLoggingClient {
 private static AtomicInteger fileindex = new AtomicInteger(0);
 private static final String FILE_URI = "file:/E:/temp/afile.out";
 
 public static void main(String[] args) throws IOException {
  new Thread(new Runnable() { // arbitrary thread that writes stuff into an asynchronous I/O data sink
 
     @Override
     public void run() {
      try {
       for (;;) {
        GracefulAsynchronousFileChannel.get(FILE_URI).write(ByteBuffer.wrap("Hello".getBytes()),
          fileindex.getAndIncrement() * 5);
       }
      } catch (NonWritableChannelException e) {
       System.out.println("Deal with the fact that the channel was closed asynchronously ... "
         + e.toString());
      } catch (Exception e) {
       e.printStackTrace();
      }
     }
    }).start();
 
  Timer timer = new Timer(); // asynchronous channel closer
  timer.schedule(new TimerTask() {
   public void run() {
    try {
     GracefulAsynchronousFileChannel.get(FILE_URI).close();
     long size = Files.size(Paths.get("E:/temp/afile.out"));
     System.out.println("Expected file size (bytes): " + (fileindex.get() - 1) * 5);
     System.out.println("Actual file size (bytes): " + size);
     if (size == (fileindex.get() - 1) * 5)
      System.out.println("No write operation was lost!");
     Files.delete(Paths.get("E:/temp/afile.out"));
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }, 1000);
 
 
 }
}

Клиент запускает два потока, один поток выполняет операции записи в бесконечном цикле (строка 6 и далее). Другой поток закрывает канал файла асинхронно после одной секунды обработки (строка 25 и далее). Если вы запустите этот клиент, будет получен следующий вывод:

01
02
03
04
05
06
07
08
09
10
11
Starting graceful shutdown ...
Deal with the fact that the channel was closed asynchronously ... java.nio.channels.NonWritableChannelException
Channel blocked for write access ...
Waiting for signal that queue is empty ...
Issueing signal that queue is empty ...
Received signal that queue is empty ... closing
File closed ...
Pool closed ...
Expected file size (bytes): 400020
Actual file size (bytes): 400020
No write operation was lost!

Выходные данные показывают упорядоченную процедуру завершения участвующих потоков. Поток журналирования должен учитывать тот факт, что канал был закрыт асинхронно. После обработки поставленных в очередь задач ресурсы канала закрываются. Данные не были потеряны, все, что выдало клиент, действительно было записано в место назначения файла. Нет AsynchronousClosedException s или RejectedExecutionException s в такой изящной процедуре закрытия.

Это все с точки зрения безопасного закрытия асинхронных файловых каналов. Полный код находится здесь, в моем репозитории Git . Надеюсь, тебе это немного понравилось. Ждем ваших комментариев.

Ссылка: «Java 7: Закрытие файловых каналов NIO.2 без потери данных» от нашего партнера JCG Никласа.