Статьи

Реактивный мониторинг файловой системы с использованием актеров Akka

В этой статье мы обсудим:

  1. Мониторинг файловой системы с использованием Java NIO.2
  2. Общие подводные камни библиотеки Java по умолчанию
  3. Разработать простой монитор файловой системы на основе потоков
  4. Используйте вышеизложенное для разработки реактивного монитора файловой системы с использованием актерской модели

Примечание . Хотя все примеры кода здесь написаны на Scala, его можно переписать и на простом Java. Чтобы быстро ознакомиться с синтаксисом Scala, вот очень короткая и красивая таблица Scala . Чтобы получить более полное руководство по Scala для Java-программистов, обратитесь к нему (не обязательно следовать этой статье).

Для кратчайшего краткого описания, следующий код Java:

1
2
3
4
5
6
7
8
public void foo(int x, int y) {
  int z = x + y
  if (z == 1) {
    System.out.println(x);
  } else {
    System.out.println(y);
  }
}

эквивалентно следующему коду Scala:

1
2
3
4
5
6
7
def foo(x: Int, y: Int): Unit = {
  val z: Int = x + y
  z match {
   case 1 => println(x)
   case _ => println(y)
  }
}

Весь код, представленный здесь, доступен под лицензией MIT как часть библиотеки улучшенных файлов на GitHub .

Допустим, вам поручено создать кроссплатформенную систему поиска файлов на рабочем столе. Вы быстро понимаете, что после начальной индексации всех файлов вам также необходимо быстро переиндексировать любые новые файлы (или каталоги), которые были созданы или обновлены. Наивным способом было бы просто пересматривать всю файловую систему каждые несколько минут; но это было бы невероятно неэффективно, поскольку большинство операционных систем предоставляют API-интерфейсы уведомлений файловой системы, которые позволяют разработчику приложений регистрировать обратные вызовы для изменений, например, ionotify в Linux, FSEvenets в Mac и FindFirstChangeNotification в Windows.

Но теперь вы застряли с API-интерфейсами, специфичными для ОС! К счастью, начиная с Java SE 7, у нас есть независимая от платформы абстракция для наблюдения за изменениями файловой системы через API WatchService . API WatchService был разработан как часть Java NIO.2 под JSR-51, и вот пример «привет мира» его использования для просмотра заданного пути :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.nio.file._
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConversions._
 
def watch(directory: Path): Unit = {
  // First create the service
  val service: WatchService = directory.getFileSystem.newWatchService()
 
  // Register the service to the path and also specify which events we want to be notified about
  directory.register(service,  ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
 
  while (true) {
    val key: WatchKey = service.take()  // Wait for this key to be signalled
    for {event <- key.pollEvents()} {
      // event.context() is the path to the file that got changed 
      event.kind() match {
        case ENTRY_CREATE => println(s"${event.context()} got created")
        case ENTRY_MODIFY => println(s"${event.context()} got modified")
        case ENTRY_DELETE => println(s"${event.context()} got deleted")       
        case _ =>
          // This can happen when OS discards or loses an event.
          println(s"Unknown event $event happened at ${event.context()}")
      }
    }
    key.reset()  // Do not forget to do this!! See: http://stackoverflow.com/questions/20180547/
  }
}

Хотя вышесказанное является хорошей первой попыткой, в ней не хватает нескольких аспектов:

  1. Плохой дизайн . Приведенный выше код выглядит неестественно, и вам, вероятно, пришлось поискать его в StackOverflow, чтобы получить его правильно. Можем ли мы сделать лучше?
  2. Плохой дизайн : код не очень хорошо справляется с обработкой ошибок. Что происходит, когда мы сталкиваемся с файлом, который не можем открыть?
  3. Поправка : API Java позволяет нам только следить за каталогом на предмет изменений в его прямых потомках; он не рекурсивно смотрит каталог для вас.
  4. Понятно : API Java не позволяет нам просматривать один файл , только каталог.
  5. Понятно : даже если мы решим вышеупомянутые проблемы, Java API не начнет автоматически просматривать новый дочерний файл или каталог, созданный в корневом каталоге.
  6. Плохой дизайн : код, реализованный выше, предоставляет блокировку / опрос, основанную на потоках модель. Можем ли мы использовать лучшую абстракцию параллелизма?

Давайте начнем с каждой из вышеперечисленных проблем.

  • Лучший интерфейс : вот как должен выглядеть мой идеальный интерфейс:
1
2
3
4
5
6
7
abstract class FileMonitor(root: Path) {
  def start(): Unit
  def onCreate(path: Path): Unit
  def onModify(path: Path): Unit
  def onDelete(path: Path): Unit 
  def stop(): Unit
}

Таким образом, я могу просто написать пример кода как:

1
2
3
4
5
6
val watcher = new FileMonitor(myFile) {
  override def onCreate(path: Path) = println(s"$path got created"
  override def onModify(path: Path) = println(s"$path got modified")   
  override def onDelete(path: Path) = println(s"$path got deleted"
}
watcher.start()

Хорошо, давайте попробуем адаптировать первый пример, используя поток Java, чтобы мы могли представить «мой идеальный интерфейс»:

01
02
03
04
05
06
07
08
09
10
trait FileMonitor {                               // My ideal interface
  val root: Path                                  // starting file 
  def start(): Unit                               // start the monitor
  def onCreate(path: Path) = {}                   // on-create callback
  def onModify(path: Path) = {}                   // on-modify callback
  def onDelete(path: Path) = {}                   // on-delete callback
  def onUnknownEvent(event: WatchEvent[_]) = {}   // handle lost/discarded events
  def onException(e: Throwable) = {}              // handle errors e.g. a read error
  def stop(): Unit                                // stop the monitor
}

А вот очень базовая реализация на основе потоков:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor {
  setDaemon(true)        // daemonize this thread
  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)   
  })
 
  val service = root.getFileSystem.newWatchService()
 
  override def run() = Iterator.continually(service.take()).foreach(process)
 
  override def interrupt() = {
    service.close()
    super.interrupt()
  }
 
  override def start() = {
    watch(root)
    super.start()
  }
 
  protected[this] def watch(file: Path): Unit = {
    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
  }
 
  protected[this] def process(key: WatchKey) = {
    key.pollEvents() foreach {
      case event: WatchEvent[Path] => dispatch(event.kind(), event.context())     
      case event => onUnknownEvent(event)
    }
    key.reset()
  }
 
  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {
    eventType match {
      case ENTRY_CREATE => onCreate(file)
      case ENTRY_MODIFY => onModify(file)
      case ENTRY_DELETE => onDelete(file)
    }
  }
}

Выше выглядит намного чище! Теперь мы можем смотреть файлы в доступной форме, не вдаваясь в подробности JavaDocs, просто реализуя onCreate(path) , onModify(path) , onDelete(path) и т. Д.

  • Обработка исключений : это уже сделано выше. onException всякий раз, когда мы сталкиваемся с исключением, и вызывающий может решить, что делать дальше, внедрив его.
  • Рекурсивный просмотр : Java API не позволяет рекурсивный просмотр каталогов . Нам нужно изменить watch(file) чтобы рекурсивно прикреплять наблюдателя:
1
2
3
4
5
6
7
8
9
def watch(file: Path, recursive: Boolean = true): Unit = {
  if (Files.isDirectory(file)) {
    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)             
     // recursively call watch on children of this file 
     if (recursive) {
       Files.list(file).iterator() foreach {f => watch(f, recursive)}
     }
  }
}
  • Просмотр обычных файлов : как упоминалось ранее, API Java может только просматривать каталоги . Один способ взлома, который мы можем сделать для просмотра отдельных файлов, — это установить наблюдатель в его родительский каталог и реагировать только в том случае, если событие инициируется в самом файле.
1
2
3
4
5
6
7
8
override def start() = {
  if (Files.isDirectory(root)) {
    watch(root, recursive = true)
  } else {
    watch(root.getParent, recursive = false)
  }
  super.start()
}

И теперь в process(key) мы проверяем, реагируем ли мы либо на каталог, либо на этот файл:

1
def reactTo(target: Path) = Files.isDirectory(root) || (root == target)

И мы проверяем перед dispatch сейчас:

1
2
3
4
5
case event: WatchEvent[Path] =>
  val target = event.context()
  if (reactTo(target)) {
    dispatch(event.kind(), target)
  }
  • Автоматический просмотр новых элементов : Java API, не позволяет автоматически просматривать новые субфайлы . Мы можем решить эту проблему, подключив наблюдатель к process(key) при ENTRY_CREATE события ENTRY_CREATE :
1
2
3
4
5
6
if (reactTo(target)) {
  if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {
    watch(root.resolve(target))
  }
  dispatch(event.kind(), target)
}

Собрав все это вместе, мы получили наш последний FileMonitor.scala :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor {
  setDaemon(true) // daemonize this thread
  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)   
  })
 
  val service = root.getFileSystem.newWatchService()
 
  override def run() = Iterator.continually(service.take()).foreach(process)
 
  override def interrupt() = {
    service.close()
    super.interrupt()
  }
 
  override def start() = {
    if (Files.isDirectory(root)) {
      watch(root, recursive = true)
    } else {
      watch(root.getParent, recursive = false)
    }
    super.start()
  }
 
  protected[this] def watch(file: Path, recursive: Boolean = true): Unit = {
    if (Files.isDirectory(file)) {
      file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
      if (recursive) {
        Files.list(file).iterator() foreach {f => watch(f, recursive)}
      
    }
  }
 
  private[this] def reactTo(target: Path) = Files.isDirectory(root) || (root == target)
 
  protected[this] def process(key: WatchKey) = {
    key.pollEvents() foreach {
      case event: WatchEvent[Path] =>
        val target = event.context()
        if (reactTo(target)) {
          if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {
            watch(root.resolve(target))
          }
          dispatch(event.kind(), target)
        }
      case event => onUnknownEvent(event)
    }
    key.reset()
  }
 
  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {
    eventType match {
      case ENTRY_CREATE => onCreate(file)
      case ENTRY_MODIFY => onModify(file)
      case ENTRY_DELETE => onDelete(file)
    }
  }
}

Теперь, когда мы обратились ко всем ошибкам и дистанцировались от тонкостей API WatchService, мы все еще тесно связаны с API на основе потоков. Мы будем использовать вышеупомянутый класс, чтобы представить другую модель параллелизма, а именно модель актора, вместо этого, чтобы спроектировать реактивный, динамический и устойчивый наблюдатель файловой системы с использованием Akka . Хотя создание актеров Akka выходит за рамки данной статьи, мы представим очень простой актер, использующий ThreadFileMonitor :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.nio.file.{Path, WatchEvent}
 
import akka.actor._
 
class FileWatcher(file: Path) extends ThreadFileMonitor(file) with Actor {
  import FileWatcher._
 
  // MultiMap from Events to registered callbacks
  protected[this] val callbacks = newMultiMap[Event, Callback] 
 
  // Override the dispatcher from ThreadFileMonitor to inform the actor of a new event
  override def dispatch(event: Event, file: Path) = self ! Message.NewEvent(event, file) 
 
  // Override the onException from the ThreadFileMonitor
  override def onException(exception: Throwable) = self ! Status.Failure(exception)
 
  // when actor starts, start the ThreadFileMonitor
  override def preStart() = super.start()  
   
  // before actor stops, stop the ThreadFileMonitor
  override def postStop() = super.interrupt()
 
  override def receive = {
    case Message.NewEvent(event, target) if callbacks contains event =>
       callbacks(event) foreach {f => f(event -> target)}
 
    case Message.RegisterCallback(events, callback) =>
       events foreach {event => callbacks.addBinding(event, callback)}
 
    case Message.RemoveCallback(event, callback) =>
       callbacks.removeBinding(event, callback)
  }
}
 
object FileWatcher {
  type Event = WatchEvent.Kind[Path]
  type Callback = PartialFunction[(Event, Path), Unit]
 
  sealed trait Message
  object Message {
    case class NewEvent(event: Event, file: Path) extends Message
    case class RegisterCallback(events: Seq[Event], callback: Callback) extends Message
    case class RemoveCallback(event: Event, callback: Callback) extends Message
  }
}

Это позволяет нам динамически регистрировать и удалять обратные вызовы, чтобы реагировать на события файловой системы:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
// initialize the actor instance
val system = ActorSystem("mySystem")
val watcher: ActorRef = system.actorOf(Props(new FileWatcher(Paths.get("/home/pathikrit"))))
 
// util to create a RegisterCallback message for the actor
def when(events: Event*)(callback: Callback): Message = {
  Message.RegisterCallback(events.distinct, callback)
}
 
// send the register callback message for create/modify events
watcher ! when(events = ENTRY_CREATE, ENTRY_MODIFY) {  
  case (ENTRY_CREATE, file) => println(s"$file got created")
  case (ENTRY_MODIFY, file) => println(s"$file got modified")
}