Статьи

WatchService в сочетании с актерами Akka

WatchService — это удобный класс, который может уведомлять вас о любых изменениях файловой системы (создание / обновление / удаление файла) в заданном наборе каталогов. Это хорошо описано в официальной документации, поэтому я не буду писать еще одно введение. Вместо этого мы попытаемся объединить его с Akka, чтобы обеспечить полностью асинхронный, неблокирующий механизм уведомления об изменениях файловой системы. И мы будем масштабировать его как для нескольких каталогов, так и для нескольких… серверов! Просто для начала приведу простой, наглядный пример:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
val watchService = FileSystems.getDefault.newWatchService()
Paths.get("/foo/bar").register(watchService, ENTRY_CREATE, ENTRY_DELETE)
  
while(true) {
    val key = watchService.take()
    key.pollEvents() foreach { event =>
        event.kind() match {
            case ENTRY_CREATE =>    //...
            case ENTRY_DELETE =>    //...
            case x =>
                logger.warn(s"Unknown event $x")
        }
    }
    key.reset()
}

Я знаю, что java.nio означает « новый ввод / вывод », а не « неблокирующий ввод / вывод », но можно ожидать, что такой класс будет работать асинхронно. Вместо этого мы должны пожертвовать одним потоком, использовать неловкий цикл while(true) и заблокировать watchService.take() . Может быть, именно так работают базовые операционные системы (к счастью, WatchService использует нативный API ОС, когда он доступен)? Неважно, мы должны жить с этим. К счастью, один WatchService может отслеживать произвольное количество путей, поэтому нам нужен только один поток на каждое приложение, а не на каталог. Итак, давайте завернем это в Runnable :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
class WatchServiceTask2(notifyActor: ActorRef) extends Runnable with Logging {
    private val watchService = FileSystems.getDefault.newWatchService()
  
    def run() {
        try {
            while (!Thread.currentThread().isInterrupted) {
                val key = watchService.take()
                //coming soon...
                key.reset()
            }
        } catch {
            case e: InterruptedException =>
                logger.info("Interrupting, bye!")
        } finally {
            watchService.close()
        }
    }
}

Это скелетная реализация любого Runnable который ждет / блокирует, за которым я хочу, чтобы вы следовали. Проверьте Thread.isInterrupted() и Thread.isInterrupted() основного цикла, когда возникнет InterruptedException . Таким образом, позже вы можете безопасно закрыть поток, вызвав Thread.interrupt() без каких-либо задержек. notifyActor внимание на две вещи: нам нужна ссылка notifyActor в конструкторе (понадобится позже, надеюсь, вы знаете, почему), и мы пока не отслеживаем никаких каталогов. К счастью, мы можем добавить отслеживаемые каталоги в любое время (но мы никогда не сможем удалить их впоследствии, ограничение API ?!) Однако есть одна проблема: WatchService контролирует только данный каталог, но не подкаталоги (это не рекурсивно). К счастью, еще один новичок в блоке JDK, Files.walkFileTree() , освобождает нас от утомительного рекурсивного алгоритма:

01
02
03
04
05
06
07
08
09
10
11
12
def watchRecursively(root: Path) {
    watch(root)
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def preVisitDirectory(dir: Path, attrs: BasicFileAttributes) = {
            watch(dir)
            FileVisitResult.CONTINUE
        }
    })
}
  
private def watch(path: Path) =
    path.register(watchService, ENTRY_CREATE, ENTRY_DELETE)

Посмотрите, как хорошо мы можем пройти по всему дереву каталогов, используя плоский FileVisitor ? Теперь последняя часть головоломки — это тело цикла выше (вы найдете полный исходный код на GitHub ):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
key.pollEvents() foreach {
    event =>
        val relativePath = event.context().asInstanceOf[Path]
        val path = key.watchable().asInstanceOf[Path].resolve(relativePath)
        event.kind() match {
            case ENTRY_CREATE =>
                if (path.toFile.isDirectory) {
                    watchRecursively(path)
                }
                notifyActor ! Created(path.toFile)
            case ENTRY_DELETE =>
                notifyActor ! Deleted(path.toFile)
            case x =>
                logger.warn(s"Unknown event $x")
        }
}

Когда создается новая запись файловой системы, и она оказывается каталогом, мы также начинаем отслеживать этот каталог. Таким образом, если, например, мы запускаем мониторинг /tmp , также отслеживается каждый отдельный подкаталог, как существующий при запуске, так и только что созданный. Классы сообщений довольно просты. Вы можете утверждать, что отдельные классы CreatedFile и CreatedDirectory могли бы быть лучшей идеей — в зависимости от вашего CreatedDirectory использования, это было проще с точки зрения этой статьи:

1
2
3
4
5
sealed trait FileSystemChange
case class Created(fileOrDir: File) extends FileSystemChange
case class Deleted(fileOrDir: File) extends FileSystemChange
  
case class MonitorDir(path: Path)

Последнее сообщение MonitorDir будет использовано всего за секунду. Давайте завернем нашу задачу Runnable и инкапсулируем ее в актере. Я знаю, как плохо выглядит запуск потока внутри актера Akka, но Java API заставляет нас сделать это, и это будет нашим секретом, который никогда не ускользнет от этого конкретного актера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
class FileSystemActor extends Actor {
    val log = Logging(context.system, this)
    val watchServiceTask = new WatchServiceTask(self)
    val watchThread = new Thread(watchServiceTask, "WatchService")
  
    override def preStart() {
        watchThread.setDaemon(true)
        watchThread.start()
    }
  
    override def postStop() {
        watchThread.interrupt()
    }
  
    def receive = LoggingReceive {
        case MonitorDir(path) =>
            watchServiceTask watchRecursively path
        case Created(file) =>
            //e.g. forward or broadcast to other actors
        case Deleted(fileOrDir) =>
    }
}

Несколько вещей, которые нужно иметь в виду: актер берет на себя полную ответственность за жизненный цикл потока "WatchService" . Также посмотрите, как он обрабатывает сообщение MonitorDir . Однако мы не следим ни за одним каталогом с самого начала. Это сделано снаружи:

1
2
3
4
5
val system = ActorSystem("WatchFsSystem")
val fsActor = system.actorOf(Props[FileSystemActor], "fileSystem")
fsActor ! MonitorDir(Paths get "/home/john")
//...
system.shutdown()

Очевидно, что вы можете отправлять любое количество сообщений MonitorDir с разными каталогами, и все они контролируются одновременно — но вам не нужно отслеживать подкаталоги, это сделано для вас. Создание и удаление нового файла для проверки дыма нашего решения, и, очевидно, оно работает:

1
2
3
received handled message MonitorDir(/home/john/tmp)
received handled message Created(/home/john/tmp/test.txt)
received handled message Deleted(/home/john/tmp/test.txt)

Есть одна интересная часть функциональности, которую мы получаем бесплатно. Если мы запустим это приложение в кластере и настроим одного действующего субъекта, который будет создан только в одном из экземпляров (см. « Удаленные действующие лица — подробный пример того, как настроить удаленные действующие лица», в Akka ), мы можем легко объединять изменения файловой системы с нескольких серверов! Просто найдите удаленный («синглтонный» в кластере) агрегатный FileSystemActor в FileSystemActor и перенаправьте на него события. Вышеупомянутая статья объясняет очень похожую архитектуру, поэтому я не буду вдаваться в подробности. Достаточно сказать, что с этой топологией можно легко отслеживать несколько серверов и собирать события изменений на всех из них.

Итак … у нас есть классное решение, давайте посмотрим на проблему. В установке с одним узлом FileSystemActor обеспечивает хорошую абстракцию над блокировкой WatchService . Другие участники, заинтересованные в изменениях файловой системы, могут зарегистрироваться в FileSystemActor и быстро реагировать на изменения. В многоузловой кластерной настройке она работает примерно так же, но теперь мы можем легко управлять несколькими узлами. Одной из идей будет репликация файлов по узлам.

Ссылка: WatchService в сочетании с актерами Akka от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседстве .