WatchService
— это удобный класс, который может уведомлять вас о любых изменениях файловой системы (создание / обновление / удаление файла) в заданном наборе каталогов. Это хорошо описано в официальной документации, поэтому я не буду писать еще одно введение. Вместо этого мы попытаемся объединить его с Akka, чтобы обеспечить полностью асинхронный, неблокирующий механизм уведомления об изменениях файловой системы. И мы будем масштабировать его как для нескольких каталогов, так и для нескольких… серверов! Просто для начала приведу простой, наглядный пример:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
val watchService = FileSystems.getDefault.newWatchService() Paths.get( "/foo/bar" ).register(watchService, ENTRY_CREATE, ENTRY_DELETE) while ( true ) { val key = watchService.take() key.pollEvents() foreach { event => event.kind() match { case ENTRY_CREATE => //... case ENTRY_DELETE => //... case x => logger.warn(s "Unknown event $x" ) } } key.reset() } |
Я знаю, что java.nio
означает « новый ввод / вывод », а не « неблокирующий ввод / вывод », но можно ожидать, что такой класс будет работать асинхронно. Вместо этого мы должны пожертвовать одним потоком, использовать неловкий цикл while(true)
и заблокировать watchService.take()
. Может быть, именно так работают базовые операционные системы (к счастью, WatchService
использует нативный API ОС, когда он доступен)? Неважно, мы должны жить с этим. К счастью, один WatchService
может отслеживать произвольное количество путей, поэтому нам нужен только один поток на каждое приложение, а не на каталог. Итак, давайте завернем это в Runnable
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
class WatchServiceTask2(notifyActor: ActorRef) extends Runnable with Logging { private val watchService = FileSystems.getDefault.newWatchService() def run() { try { while (!Thread.currentThread().isInterrupted) { val key = watchService.take() //coming soon... key.reset() } } catch { case e: InterruptedException => logger.info( "Interrupting, bye!" ) } finally { watchService.close() } } } |
Это скелетная реализация любого Runnable
который ждет / блокирует, за которым я хочу, чтобы вы следовали. Проверьте Thread.isInterrupted()
и Thread.isInterrupted()
основного цикла, когда возникнет InterruptedException
. Таким образом, позже вы можете безопасно закрыть поток, вызвав Thread.interrupt()
без каких-либо задержек. notifyActor
внимание на две вещи: нам нужна ссылка notifyActor
в конструкторе (понадобится позже, надеюсь, вы знаете, почему), и мы пока не отслеживаем никаких каталогов. К счастью, мы можем добавить отслеживаемые каталоги в любое время (но мы никогда не сможем удалить их впоследствии, ограничение API ?!) Однако есть одна проблема: WatchService
контролирует только данный каталог, но не подкаталоги (это не рекурсивно). К счастью, еще один новичок в блоке JDK, Files.walkFileTree()
, освобождает нас от утомительного рекурсивного алгоритма:
01
02
03
04
05
06
07
08
09
10
11
12
|
def watchRecursively(root: Path) { watch(root) Files.walkFileTree(root, new SimpleFileVisitor[Path] { override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes) = { watch(dir) FileVisitResult.CONTINUE } }) } private def watch(path: Path) = path.register(watchService, ENTRY_CREATE, ENTRY_DELETE) |
Посмотрите, как хорошо мы можем пройти по всему дереву каталогов, используя плоский FileVisitor
? Теперь последняя часть головоломки — это тело цикла выше (вы найдете полный исходный код на GitHub ):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
key.pollEvents() foreach { event => val relativePath = event.context().asInstanceOf[Path] val path = key.watchable().asInstanceOf[Path].resolve(relativePath) event.kind() match { case ENTRY_CREATE => if (path.toFile.isDirectory) { watchRecursively(path) } notifyActor ! Created(path.toFile) case ENTRY_DELETE => notifyActor ! Deleted(path.toFile) case x => logger.warn(s "Unknown event $x" ) } } |
Когда создается новая запись файловой системы, и она оказывается каталогом, мы также начинаем отслеживать этот каталог. Таким образом, если, например, мы запускаем мониторинг /tmp
, также отслеживается каждый отдельный подкаталог, как существующий при запуске, так и только что созданный. Классы сообщений довольно просты. Вы можете утверждать, что отдельные классы CreatedFile
и CreatedDirectory
могли бы быть лучшей идеей — в зависимости от вашего CreatedDirectory
использования, это было проще с точки зрения этой статьи:
1
2
3
4
5
|
sealed trait FileSystemChange case class Created(fileOrDir: File) extends FileSystemChange case class Deleted(fileOrDir: File) extends FileSystemChange case class MonitorDir(path: Path) |
Последнее сообщение MonitorDir
будет использовано всего за секунду. Давайте завернем нашу задачу Runnable
и инкапсулируем ее в актере. Я знаю, как плохо выглядит запуск потока внутри актера Akka, но Java API заставляет нас сделать это, и это будет нашим секретом, который никогда не ускользнет от этого конкретного актера:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class FileSystemActor extends Actor { val log = Logging(context.system, this ) val watchServiceTask = new WatchServiceTask(self) val watchThread = new Thread(watchServiceTask, "WatchService" ) override def preStart() { watchThread.setDaemon( true ) watchThread.start() } override def postStop() { watchThread.interrupt() } def receive = LoggingReceive { case MonitorDir(path) => watchServiceTask watchRecursively path case Created(file) => //e.g. forward or broadcast to other actors case Deleted(fileOrDir) => } } |
Несколько вещей, которые нужно иметь в виду: актер берет на себя полную ответственность за жизненный цикл потока "WatchService"
. Также посмотрите, как он обрабатывает сообщение MonitorDir
. Однако мы не следим ни за одним каталогом с самого начала. Это сделано снаружи:
1
2
3
4
5
|
val system = ActorSystem( "WatchFsSystem" ) val fsActor = system.actorOf(Props[FileSystemActor], "fileSystem" ) fsActor ! MonitorDir(Paths get "/home/john" ) //... system.shutdown() |
Очевидно, что вы можете отправлять любое количество сообщений MonitorDir
с разными каталогами, и все они контролируются одновременно — но вам не нужно отслеживать подкаталоги, это сделано для вас. Создание и удаление нового файла для проверки дыма нашего решения, и, очевидно, оно работает:
1
2
3
|
received handled message MonitorDir(/home/john/tmp) received handled message Created(/home/john/tmp/test.txt) received handled message Deleted(/home/john/tmp/test.txt) |
Есть одна интересная часть функциональности, которую мы получаем бесплатно. Если мы запустим это приложение в кластере и настроим одного действующего субъекта, который будет создан только в одном из экземпляров (см. « Удаленные действующие лица — подробный пример того, как настроить удаленные действующие лица», в Akka ), мы можем легко объединять изменения файловой системы с нескольких серверов! Просто найдите удаленный («синглтонный» в кластере) агрегатный FileSystemActor
в FileSystemActor
и перенаправьте на него события. Вышеупомянутая статья объясняет очень похожую архитектуру, поэтому я не буду вдаваться в подробности. Достаточно сказать, что с этой топологией можно легко отслеживать несколько серверов и собирать события изменений на всех из них.
Итак … у нас есть классное решение, давайте посмотрим на проблему. В установке с одним узлом FileSystemActor
обеспечивает хорошую абстракцию над блокировкой WatchService
. Другие участники, заинтересованные в изменениях файловой системы, могут зарегистрироваться в FileSystemActor
и быстро реагировать на изменения. В многоузловой кластерной настройке она работает примерно так же, но теперь мы можем легко управлять несколькими узлами. Одной из идей будет репликация файлов по узлам.