WatchService — это удобный класс, который может уведомлять вас о любых изменениях файловой системы (создание / обновление / удаление файла) в заданном наборе каталогов. Это хорошо описано в официальной документации, поэтому я не буду писать еще одно введение. Вместо этого мы попытаемся объединить его с Akka, чтобы обеспечить полностью асинхронный, неблокирующий механизм уведомления об изменениях файловой системы. И мы будем масштабировать его как для нескольких каталогов, так и для нескольких… серверов! Просто для начала приведу простой, наглядный пример:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
val watchService = FileSystems.getDefault.newWatchService()Paths.get("/foo/bar").register(watchService, ENTRY_CREATE, ENTRY_DELETE) while(true) { val key = watchService.take() key.pollEvents() foreach { event => event.kind() match { case ENTRY_CREATE => //... case ENTRY_DELETE => //... case x => logger.warn(s"Unknown event $x") } } key.reset()} |
Я знаю, что java.nio означает « новый ввод / вывод », а не « неблокирующий ввод / вывод », но можно ожидать, что такой класс будет работать асинхронно. Вместо этого мы должны пожертвовать одним потоком, использовать неловкий цикл while(true) и заблокировать watchService.take() . Может быть, именно так работают базовые операционные системы (к счастью, WatchService использует нативный API ОС, когда он доступен)? Неважно, мы должны жить с этим. К счастью, один WatchService может отслеживать произвольное количество путей, поэтому нам нужен только один поток на каждое приложение, а не на каталог. Итак, давайте завернем это в Runnable :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
class WatchServiceTask2(notifyActor: ActorRef) extends Runnable with Logging { private val watchService = FileSystems.getDefault.newWatchService() def run() { try { while (!Thread.currentThread().isInterrupted) { val key = watchService.take() //coming soon... key.reset() } } catch { case e: InterruptedException => logger.info("Interrupting, bye!") } finally { watchService.close() } }} |
Это скелетная реализация любого Runnable который ждет / блокирует, за которым я хочу, чтобы вы следовали. Проверьте Thread.isInterrupted() и Thread.isInterrupted() основного цикла, когда возникнет InterruptedException . Таким образом, позже вы можете безопасно закрыть поток, вызвав Thread.interrupt() без каких-либо задержек. notifyActor внимание на две вещи: нам нужна ссылка notifyActor в конструкторе (понадобится позже, надеюсь, вы знаете, почему), и мы пока не отслеживаем никаких каталогов. К счастью, мы можем добавить отслеживаемые каталоги в любое время (но мы никогда не сможем удалить их впоследствии, ограничение API ?!) Однако есть одна проблема: WatchService контролирует только данный каталог, но не подкаталоги (это не рекурсивно). К счастью, еще один новичок в блоке JDK, Files.walkFileTree() , освобождает нас от утомительного рекурсивного алгоритма:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
def watchRecursively(root: Path) { watch(root) Files.walkFileTree(root, new SimpleFileVisitor[Path] { override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes) = { watch(dir) FileVisitResult.CONTINUE } })} private def watch(path: Path) = path.register(watchService, ENTRY_CREATE, ENTRY_DELETE) |
Посмотрите, как хорошо мы можем пройти по всему дереву каталогов, используя плоский FileVisitor ? Теперь последняя часть головоломки — это тело цикла выше (вы найдете полный исходный код на GitHub ):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
key.pollEvents() foreach { event => val relativePath = event.context().asInstanceOf[Path] val path = key.watchable().asInstanceOf[Path].resolve(relativePath) event.kind() match { case ENTRY_CREATE => if (path.toFile.isDirectory) { watchRecursively(path) } notifyActor ! Created(path.toFile) case ENTRY_DELETE => notifyActor ! Deleted(path.toFile) case x => logger.warn(s"Unknown event $x") }} |
Когда создается новая запись файловой системы, и она оказывается каталогом, мы также начинаем отслеживать этот каталог. Таким образом, если, например, мы запускаем мониторинг /tmp , также отслеживается каждый отдельный подкаталог, как существующий при запуске, так и только что созданный. Классы сообщений довольно просты. Вы можете утверждать, что отдельные классы CreatedFile и CreatedDirectory могли бы быть лучшей идеей — в зависимости от вашего CreatedDirectory использования, это было проще с точки зрения этой статьи:
|
1
2
3
4
5
|
sealed trait FileSystemChangecase class Created(fileOrDir: File) extends FileSystemChangecase class Deleted(fileOrDir: File) extends FileSystemChange case class MonitorDir(path: Path) |
Последнее сообщение MonitorDir будет использовано всего за секунду. Давайте завернем нашу задачу Runnable и инкапсулируем ее в актере. Я знаю, как плохо выглядит запуск потока внутри актера Akka, но Java API заставляет нас сделать это, и это будет нашим секретом, который никогда не ускользнет от этого конкретного актера:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class FileSystemActor extends Actor { val log = Logging(context.system, this) val watchServiceTask = new WatchServiceTask(self) val watchThread = new Thread(watchServiceTask, "WatchService") override def preStart() { watchThread.setDaemon(true) watchThread.start() } override def postStop() { watchThread.interrupt() } def receive = LoggingReceive { case MonitorDir(path) => watchServiceTask watchRecursively path case Created(file) => //e.g. forward or broadcast to other actors case Deleted(fileOrDir) => }} |
Несколько вещей, которые нужно иметь в виду: актер берет на себя полную ответственность за жизненный цикл потока "WatchService" . Также посмотрите, как он обрабатывает сообщение MonitorDir . Однако мы не следим ни за одним каталогом с самого начала. Это сделано снаружи:
|
1
2
3
4
5
|
val system = ActorSystem("WatchFsSystem")val fsActor = system.actorOf(Props[FileSystemActor], "fileSystem")fsActor ! MonitorDir(Paths get "/home/john")//...system.shutdown() |
Очевидно, что вы можете отправлять любое количество сообщений MonitorDir с разными каталогами, и все они контролируются одновременно — но вам не нужно отслеживать подкаталоги, это сделано для вас. Создание и удаление нового файла для проверки дыма нашего решения, и, очевидно, оно работает:
|
1
2
3
|
received handled message MonitorDir(/home/john/tmp)received handled message Created(/home/john/tmp/test.txt)received handled message Deleted(/home/john/tmp/test.txt) |
Есть одна интересная часть функциональности, которую мы получаем бесплатно. Если мы запустим это приложение в кластере и настроим одного действующего субъекта, который будет создан только в одном из экземпляров (см. « Удаленные действующие лица — подробный пример того, как настроить удаленные действующие лица», в Akka ), мы можем легко объединять изменения файловой системы с нескольких серверов! Просто найдите удаленный («синглтонный» в кластере) агрегатный FileSystemActor в FileSystemActor и перенаправьте на него события. Вышеупомянутая статья объясняет очень похожую архитектуру, поэтому я не буду вдаваться в подробности. Достаточно сказать, что с этой топологией можно легко отслеживать несколько серверов и собирать события изменений на всех из них.
Итак … у нас есть классное решение, давайте посмотрим на проблему. В установке с одним узлом FileSystemActor обеспечивает хорошую абстракцию над блокировкой WatchService . Другие участники, заинтересованные в изменениях файловой системы, могут зарегистрироваться в FileSystemActor и быстро реагировать на изменения. В многоузловой кластерной настройке она работает примерно так же, но теперь мы можем легко управлять несколькими узлами. Одной из идей будет репликация файлов по узлам.