В этой статье мы обсудим:
- Мониторинг файловой системы с использованием Java NIO.2
- Общие подводные камни библиотеки Java по умолчанию
- Разработать простой монитор файловой системы на основе потоков
- Используйте вышеизложенное для разработки реактивного монитора файловой системы с использованием актерской модели
Примечание . Хотя все примеры кода здесь написаны на Scala, его можно переписать и на простом Java. Чтобы быстро ознакомиться с синтаксисом Scala, вот очень короткая и красивая таблица Scala . Чтобы получить более полное руководство по Scala для Java-программистов, обратитесь к нему (не обязательно следовать этой статье).
Для кратчайшего краткого описания, следующий код Java:
| 1 2 3 4 5 6 7 8 | publicvoidfoo(intx, inty) {  intz = x + y  if(z == 1) {    System.out.println(x);  } else{    System.out.println(y);  }} | 
эквивалентно следующему коду Scala:
| 1 2 3 4 5 6 7 | def foo(x: Int, y: Int): Unit = {  val z: Int = x + y  z match {   case1=> println(x)   case_ => println(y)  }} | 
Весь код, представленный здесь, доступен под лицензией MIT как часть библиотеки улучшенных файлов на GitHub .
Допустим, вам поручено создать кроссплатформенную систему поиска файлов на рабочем столе. Вы быстро понимаете, что после начальной индексации всех файлов вам также необходимо быстро переиндексировать любые новые файлы (или каталоги), которые были созданы или обновлены. Наивным способом было бы просто пересматривать всю файловую систему каждые несколько минут; но это было бы невероятно неэффективно, поскольку большинство операционных систем предоставляют API-интерфейсы уведомлений файловой системы, которые позволяют разработчику приложений регистрировать обратные вызовы для изменений, например, ionotify в Linux, FSEvenets в Mac и FindFirstChangeNotification в Windows.
Но теперь вы застряли с API-интерфейсами, специфичными для ОС! К счастью, начиная с Java SE 7, у нас есть независимая от платформы абстракция для наблюдения за изменениями файловой системы через API WatchService . API WatchService был разработан как часть Java NIO.2 под JSR-51, и вот пример «привет мира» его использования для просмотра заданного пути :
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | importjava.nio.file._importjava.nio.file.StandardWatchEventKinds._importscala.collection.JavaConversions._def watch(directory: Path): Unit = {  // First create the service  val service: WatchService = directory.getFileSystem.newWatchService()  // Register the service to the path and also specify which events we want to be notified about  directory.register(service,  ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)  while(true) {    val key: WatchKey = service.take()  // Wait for this key to be signalled    for{event <- key.pollEvents()} {      // event.context() is the path to the file that got changed        event.kind() match {        caseENTRY_CREATE => println(s"${event.context()} got created")        caseENTRY_MODIFY => println(s"${event.context()} got modified")        caseENTRY_DELETE => println(s"${event.context()} got deleted")                case_ =>           // This can happen when OS discards or loses an event.           // See: http://docs.oracle.com/javase/8/docs/api/java/nio/file/StandardWatchEventKinds.html#OVERFLOW          println(s"Unknown event $event happened at ${event.context()}")      }    }  }} | 
Хотя вышесказанное является хорошей первой попыткой, в ней не хватает нескольких аспектов:
- Плохой дизайн . Приведенный выше код выглядит неестественно, и вам, вероятно, пришлось поискать его в StackOverflow, чтобы получить его правильно. Можем ли мы сделать лучше?
- Плохой дизайн : код не очень хорошо справляется с обработкой ошибок. Что происходит, когда мы сталкиваемся с файлом, который не можем открыть?
- Поправка : API Java позволяет нам только следить за каталогом на предмет изменений в его прямых потомках; он не рекурсивно смотрит каталог для вас.
- Понятно : API Java не позволяет нам просматривать один файл , только каталог.
- Понятно : даже если мы решим вышеупомянутые проблемы, Java API не начнет автоматически просматривать новый дочерний файл или каталог, созданный в корневом каталоге.
- Плохой дизайн : код, реализованный выше, предоставляет блокировку / опрос, основанную на потоках модель. Можем ли мы использовать лучшую абстракцию параллелизма?
Давайте начнем с каждой из вышеперечисленных проблем.
- Лучший интерфейс : вот как должен выглядеть мой идеальный интерфейс:
| 1 2 3 4 5 6 7 | abstractclassFileMonitor(root: Path) {  def start(): Unit  def onCreate(path: Path): Unit  def onModify(path: Path): Unit  def onDelete(path: Path): Unit    def stop(): Unit} | 
Таким образом, я могу просто написать пример кода как:
| 1 2 3 4 5 6 | val watcher = newFileMonitor(myFile) {  override def onCreate(path: Path) = println(s"$path got created")    override def onModify(path: Path) = println(s"$path got modified")      override def onDelete(path: Path) = println(s"$path got deleted")  }watcher.start() | 
Хорошо, давайте попробуем адаптировать первый пример, используя поток Java, чтобы мы могли представить «мой идеальный интерфейс»:
| 01 02 03 04 05 06 07 08 09 10 | trait FileMonitor {                               // My ideal interface  val root: Path                                  // starting file    def start(): Unit                               // start the monitor   def onCreate(path: Path) = {}                   // on-create callback   def onModify(path: Path) = {}                   // on-modify callback   def onDelete(path: Path) = {}                   // on-delete callback   def onUnknownEvent(event: WatchEvent[_]) = {}   // handle lost/discarded events  def onException(e: Throwable) = {}              // handle errors e.g. a read error  def stop(): Unit                                // stop the monitor} | 
А вот очень базовая реализация на основе потоков:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | classThreadFileMonitor(val root: Path) extendsThread with FileMonitor {  setDaemon(true)        // daemonize this thread  setUncaughtExceptionHandler(newThread.UncaughtExceptionHandler {    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)      })  val service = root.getFileSystem.newWatchService()  override def run() = Iterator.continually(service.take()).foreach(process)  override def interrupt() = {    service.close()    super.interrupt()  }  override def start() = {    watch(root)    super.start()  }  protected[this] def watch(file: Path): Unit = {    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)  }  protected[this] def process(key: WatchKey) = {    key.pollEvents() foreach {      caseevent: WatchEvent[Path] => dispatch(event.kind(), event.context())            caseevent => onUnknownEvent(event)    }    key.reset()  }  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {    eventType match {      caseENTRY_CREATE => onCreate(file)      caseENTRY_MODIFY => onModify(file)      caseENTRY_DELETE => onDelete(file)    }  }} | 
  Выше выглядит намного чище!  Теперь мы можем смотреть файлы в доступной форме, не вдаваясь в подробности JavaDocs, просто реализуя onCreate(path) , onModify(path) , onDelete(path) и т. Д. 
-   Обработка исключений : это уже сделано выше.  onExceptionвсякий раз, когда мы сталкиваемся с исключением, и вызывающий может решить, что делать дальше, внедрив его.
-   Рекурсивный просмотр : Java API не позволяет рекурсивный просмотр каталогов .  Нам нужно изменить watch(file)чтобы рекурсивно прикреплять наблюдателя:
| 1 2 3 4 5 6 7 8 9 | def watch(file: Path, recursive: Boolean = true): Unit = {  if(Files.isDirectory(file)) {    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)                   // recursively call watch on children of this file       if(recursive) {        Files.list(file).iterator() foreach {f => watch(f, recursive)}     }   }} | 
- Просмотр обычных файлов : как упоминалось ранее, API Java может только просматривать каталоги . Один способ взлома, который мы можем сделать для просмотра отдельных файлов, — это установить наблюдатель в его родительский каталог и реагировать только в том случае, если событие инициируется в самом файле.
| 1 2 3 4 5 6 7 8 | override def start() = {  if(Files.isDirectory(root)) {    watch(root, recursive = true)   } else{    watch(root.getParent, recursive = false)  }  super.start()} | 
  И теперь в process(key) мы проверяем, реагируем ли мы либо на каталог, либо на этот файл: 
| 1 | def reactTo(target: Path) = Files.isDirectory(root) || (root == target) | 
  И мы проверяем перед dispatch сейчас: 
| 1 2 3 4 5 | caseevent: WatchEvent[Path] =>  val target = event.context()  if(reactTo(target)) {    dispatch(event.kind(), target)  } | 
-   Автоматический просмотр новых элементов : Java API, не позволяет автоматически просматривать новые субфайлы .  Мы можем решить эту проблему, подключив наблюдатель к process(key)приENTRY_CREATEсобытияENTRY_CREATE:
| 1 2 3 4 5 6 | if(reactTo(target)) {  if(Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {    watch(root.resolve(target))  }  dispatch(event.kind(), target)} | 
  Собрав все это вместе, мы получили наш последний FileMonitor.scala : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | classThreadFileMonitor(val root: Path) extendsThread with FileMonitor {  setDaemon(true) // daemonize this thread  setUncaughtExceptionHandler(newThread.UncaughtExceptionHandler {    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)      })  val service = root.getFileSystem.newWatchService()  override def run() = Iterator.continually(service.take()).foreach(process)  override def interrupt() = {    service.close()    super.interrupt()  }  override def start() = {    if(Files.isDirectory(root)) {      watch(root, recursive = true)     } else{      watch(root.getParent, recursive = false)    }    super.start()  }  protected[this] def watch(file: Path, recursive: Boolean = true): Unit = {    if(Files.isDirectory(file)) {      file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)      if(recursive) {        Files.list(file).iterator() foreach {f => watch(f, recursive)}      }      }  }  private[this] def reactTo(target: Path) = Files.isDirectory(root) || (root == target)  protected[this] def process(key: WatchKey) = {    key.pollEvents() foreach {      caseevent: WatchEvent[Path] =>        val target = event.context()        if(reactTo(target)) {          if(Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {            watch(root.resolve(target))          }          dispatch(event.kind(), target)        }      caseevent => onUnknownEvent(event)    }    key.reset()  }  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {    eventType match {      caseENTRY_CREATE => onCreate(file)      caseENTRY_MODIFY => onModify(file)      caseENTRY_DELETE => onDelete(file)    }  }} | 
  Теперь, когда мы обратились ко всем ошибкам и дистанцировались от тонкостей API WatchService, мы все еще тесно связаны с API на основе потоков.  Мы будем использовать вышеупомянутый класс, чтобы представить другую модель параллелизма, а именно модель актора, вместо этого, чтобы спроектировать реактивный, динамический и устойчивый наблюдатель файловой системы с использованием Akka .  Хотя создание актеров Akka выходит за рамки данной статьи, мы представим очень простой актер, использующий ThreadFileMonitor : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | importjava.nio.file.{Path, WatchEvent}importakka.actor._classFileWatcher(file: Path) extendsThreadFileMonitor(file) with Actor {  importFileWatcher._  // MultiMap from Events to registered callbacks  protected[this] val callbacks = newMultiMap[Event, Callback]    // Override the dispatcher from ThreadFileMonitor to inform the actor of a new event  override def dispatch(event: Event, file: Path) = self ! Message.NewEvent(event, file)    // Override the onException from the ThreadFileMonitor  override def onException(exception: Throwable) = self ! Status.Failure(exception)  // when actor starts, start the ThreadFileMonitor  override def preStart() = super.start()       // before actor stops, stop the ThreadFileMonitor  override def postStop() = super.interrupt()  override def receive = {    caseMessage.NewEvent(event, target) ifcallbacks contains event =>        callbacks(event) foreach {f => f(event -> target)}    caseMessage.RegisterCallback(events, callback) =>        events foreach {event => callbacks.addBinding(event, callback)}    caseMessage.RemoveCallback(event, callback) =>        callbacks.removeBinding(event, callback)  }}object FileWatcher {  type Event = WatchEvent.Kind[Path]  type Callback = PartialFunction[(Event, Path), Unit]  sealed trait Message  object Message {    caseclassNewEvent(event: Event, file: Path) extendsMessage    caseclassRegisterCallback(events: Seq[Event], callback: Callback) extendsMessage    caseclassRemoveCallback(event: Event, callback: Callback) extendsMessage  }} | 
Это позволяет нам динамически регистрировать и удалять обратные вызовы, чтобы реагировать на события файловой системы:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 | // initialize the actor instanceval system = ActorSystem("mySystem") val watcher: ActorRef = system.actorOf(Props(newFileWatcher(Paths.get("/home/pathikrit"))))// util to create a RegisterCallback message for the actordef when(events: Event*)(callback: Callback): Message = {  Message.RegisterCallback(events.distinct, callback)}// send the register callback message for create/modify eventswatcher ! when(events = ENTRY_CREATE, ENTRY_MODIFY) {     case(ENTRY_CREATE, file) => println(s"$file got created")  case(ENTRY_MODIFY, file) => println(s"$file got modified")} | 
-   Полный источник: FileWatcher.scala
| Ссылка: | Реактивный мониторинг файловой системы с использованием актеров Akka от нашего партнера JCG Патикрита Боумика в блоге Java Advent Calendar . |