В этой статье мы обсудим:
- Мониторинг файловой системы с использованием Java NIO.2
- Общие подводные камни библиотеки Java по умолчанию
- Разработать простой монитор файловой системы на основе потоков
- Используйте вышеизложенное для разработки реактивного монитора файловой системы с использованием актерской модели
Примечание . Хотя все примеры кода здесь написаны на Scala, его можно переписать и на простом Java. Чтобы быстро ознакомиться с синтаксисом Scala, вот очень короткая и красивая таблица Scala . Чтобы получить более полное руководство по Scala для Java-программистов, обратитесь к нему (не обязательно следовать этой статье).
Для кратчайшего краткого описания, следующий код Java:
1
2
3
4
5
6
7
8
|
public void foo( int x, int y) { int z = x + y if (z == 1 ) { System.out.println(x); } else { System.out.println(y); } } |
эквивалентно следующему коду Scala:
1
2
3
4
5
6
7
|
def foo(x: Int, y: Int): Unit = { val z: Int = x + y z match { case 1 => println(x) case _ => println(y) } } |
Весь код, представленный здесь, доступен под лицензией MIT как часть библиотеки улучшенных файлов на GitHub .
Допустим, вам поручено создать кроссплатформенную систему поиска файлов на рабочем столе. Вы быстро понимаете, что после начальной индексации всех файлов вам также необходимо быстро переиндексировать любые новые файлы (или каталоги), которые были созданы или обновлены. Наивным способом было бы просто пересматривать всю файловую систему каждые несколько минут; но это было бы невероятно неэффективно, поскольку большинство операционных систем предоставляют API-интерфейсы уведомлений файловой системы, которые позволяют разработчику приложений регистрировать обратные вызовы для изменений, например, ionotify в Linux, FSEvenets в Mac и FindFirstChangeNotification в Windows.
Но теперь вы застряли с API-интерфейсами, специфичными для ОС! К счастью, начиная с Java SE 7, у нас есть независимая от платформы абстракция для наблюдения за изменениями файловой системы через API WatchService . API WatchService был разработан как часть Java NIO.2 под JSR-51, и вот пример «привет мира» его использования для просмотра заданного пути :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import java.nio.file._ import java.nio.file.StandardWatchEventKinds._ import scala.collection.JavaConversions._ def watch(directory: Path): Unit = { // First create the service val service: WatchService = directory.getFileSystem.newWatchService() // Register the service to the path and also specify which events we want to be notified about directory.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) while ( true ) { val key: WatchKey = service.take() // Wait for this key to be signalled for {event <- key.pollEvents()} { // event.context() is the path to the file that got changed event.kind() match { case ENTRY_CREATE => println(s "${event.context()} got created" ) case ENTRY_MODIFY => println(s "${event.context()} got modified" ) case ENTRY_DELETE => println(s "${event.context()} got deleted" ) case _ => // This can happen when OS discards or loses an event. // See: http://docs.oracle.com/javase/8/docs/api/java/nio/file/StandardWatchEventKinds.html#OVERFLOW println(s "Unknown event $event happened at ${event.context()}" ) } } } } |
Хотя вышесказанное является хорошей первой попыткой, в ней не хватает нескольких аспектов:
- Плохой дизайн . Приведенный выше код выглядит неестественно, и вам, вероятно, пришлось поискать его в StackOverflow, чтобы получить его правильно. Можем ли мы сделать лучше?
- Плохой дизайн : код не очень хорошо справляется с обработкой ошибок. Что происходит, когда мы сталкиваемся с файлом, который не можем открыть?
- Поправка : API Java позволяет нам только следить за каталогом на предмет изменений в его прямых потомках; он не рекурсивно смотрит каталог для вас.
- Понятно : API Java не позволяет нам просматривать один файл , только каталог.
- Понятно : даже если мы решим вышеупомянутые проблемы, Java API не начнет автоматически просматривать новый дочерний файл или каталог, созданный в корневом каталоге.
- Плохой дизайн : код, реализованный выше, предоставляет блокировку / опрос, основанную на потоках модель. Можем ли мы использовать лучшую абстракцию параллелизма?
Давайте начнем с каждой из вышеперечисленных проблем.
- Лучший интерфейс : вот как должен выглядеть мой идеальный интерфейс:
1
2
3
4
5
6
7
|
abstract class FileMonitor(root: Path) { def start(): Unit def onCreate(path: Path): Unit def onModify(path: Path): Unit def onDelete(path: Path): Unit def stop(): Unit } |
Таким образом, я могу просто написать пример кода как:
1
2
3
4
5
6
|
val watcher = new FileMonitor(myFile) { override def onCreate(path: Path) = println(s "$path got created" ) override def onModify(path: Path) = println(s "$path got modified" ) override def onDelete(path: Path) = println(s "$path got deleted" ) } watcher.start() |
Хорошо, давайте попробуем адаптировать первый пример, используя поток Java, чтобы мы могли представить «мой идеальный интерфейс»:
01
02
03
04
05
06
07
08
09
10
|
trait FileMonitor { // My ideal interface val root: Path // starting file def start(): Unit // start the monitor def onCreate(path: Path) = {} // on-create callback def onModify(path: Path) = {} // on-modify callback def onDelete(path: Path) = {} // on-delete callback def onUnknownEvent(event: WatchEvent[_]) = {} // handle lost/discarded events def onException(e: Throwable) = {} // handle errors e.g. a read error def stop(): Unit // stop the monitor } |
А вот очень базовая реализация на основе потоков:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor { setDaemon( true ) // daemonize this thread setUncaughtExceptionHandler( new Thread.UncaughtExceptionHandler { override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception) }) val service = root.getFileSystem.newWatchService() override def run() = Iterator.continually(service.take()).foreach(process) override def interrupt() = { service.close() super .interrupt() } override def start() = { watch(root) super .start() } protected [ this ] def watch(file: Path): Unit = { file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) } protected [ this ] def process(key: WatchKey) = { key.pollEvents() foreach { case event: WatchEvent[Path] => dispatch(event.kind(), event.context()) case event => onUnknownEvent(event) } key.reset() } def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = { eventType match { case ENTRY_CREATE => onCreate(file) case ENTRY_MODIFY => onModify(file) case ENTRY_DELETE => onDelete(file) } } } |
Выше выглядит намного чище! Теперь мы можем смотреть файлы в доступной форме, не вдаваясь в подробности JavaDocs, просто реализуя onCreate(path)
, onModify(path)
, onDelete(path)
и т. Д.
- Обработка исключений : это уже сделано выше.
onException
всякий раз, когда мы сталкиваемся с исключением, и вызывающий может решить, что делать дальше, внедрив его. - Рекурсивный просмотр : Java API не позволяет рекурсивный просмотр каталогов . Нам нужно изменить
watch(file)
чтобы рекурсивно прикреплять наблюдателя:
1
2
3
4
5
6
7
8
9
|
def watch(file: Path, recursive: Boolean = true ): Unit = { if (Files.isDirectory(file)) { file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) // recursively call watch on children of this file if (recursive) { Files.list(file).iterator() foreach {f => watch(f, recursive)} } } } |
- Просмотр обычных файлов : как упоминалось ранее, API Java может только просматривать каталоги . Один способ взлома, который мы можем сделать для просмотра отдельных файлов, — это установить наблюдатель в его родительский каталог и реагировать только в том случае, если событие инициируется в самом файле.
1
2
3
4
5
6
7
8
|
override def start() = { if (Files.isDirectory(root)) { watch(root, recursive = true ) } else { watch(root.getParent, recursive = false ) } super .start() } |
И теперь в process(key)
мы проверяем, реагируем ли мы либо на каталог, либо на этот файл:
1
|
def reactTo(target: Path) = Files.isDirectory(root) || (root == target) |
И мы проверяем перед dispatch
сейчас:
1
2
3
4
5
|
case event: WatchEvent[Path] => val target = event.context() if (reactTo(target)) { dispatch(event.kind(), target) } |
- Автоматический просмотр новых элементов : Java API, не позволяет автоматически просматривать новые субфайлы . Мы можем решить эту проблему, подключив наблюдатель к
process(key)
приENTRY_CREATE
событияENTRY_CREATE
:
1
2
3
4
5
6
|
if (reactTo(target)) { if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) { watch(root.resolve(target)) } dispatch(event.kind(), target) } |
Собрав все это вместе, мы получили наш последний FileMonitor.scala
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor { setDaemon( true ) // daemonize this thread setUncaughtExceptionHandler( new Thread.UncaughtExceptionHandler { override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception) }) val service = root.getFileSystem.newWatchService() override def run() = Iterator.continually(service.take()).foreach(process) override def interrupt() = { service.close() super .interrupt() } override def start() = { if (Files.isDirectory(root)) { watch(root, recursive = true ) } else { watch(root.getParent, recursive = false ) } super .start() } protected [ this ] def watch(file: Path, recursive: Boolean = true ): Unit = { if (Files.isDirectory(file)) { file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY) if (recursive) { Files.list(file).iterator() foreach {f => watch(f, recursive)} } } } private [ this ] def reactTo(target: Path) = Files.isDirectory(root) || (root == target) protected [ this ] def process(key: WatchKey) = { key.pollEvents() foreach { case event: WatchEvent[Path] => val target = event.context() if (reactTo(target)) { if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) { watch(root.resolve(target)) } dispatch(event.kind(), target) } case event => onUnknownEvent(event) } key.reset() } def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = { eventType match { case ENTRY_CREATE => onCreate(file) case ENTRY_MODIFY => onModify(file) case ENTRY_DELETE => onDelete(file) } } } |
Теперь, когда мы обратились ко всем ошибкам и дистанцировались от тонкостей API WatchService, мы все еще тесно связаны с API на основе потоков. Мы будем использовать вышеупомянутый класс, чтобы представить другую модель параллелизма, а именно модель актора, вместо этого, чтобы спроектировать реактивный, динамический и устойчивый наблюдатель файловой системы с использованием Akka . Хотя создание актеров Akka выходит за рамки данной статьи, мы представим очень простой актер, использующий ThreadFileMonitor
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
import java.nio.file.{Path, WatchEvent} import akka.actor._ class FileWatcher(file: Path) extends ThreadFileMonitor(file) with Actor { import FileWatcher._ // MultiMap from Events to registered callbacks protected [ this ] val callbacks = newMultiMap[Event, Callback] // Override the dispatcher from ThreadFileMonitor to inform the actor of a new event override def dispatch(event: Event, file: Path) = self ! Message.NewEvent(event, file) // Override the onException from the ThreadFileMonitor override def onException(exception: Throwable) = self ! Status.Failure(exception) // when actor starts, start the ThreadFileMonitor override def preStart() = super .start() // before actor stops, stop the ThreadFileMonitor override def postStop() = super .interrupt() override def receive = { case Message.NewEvent(event, target) if callbacks contains event => callbacks(event) foreach {f => f(event -> target)} case Message.RegisterCallback(events, callback) => events foreach {event => callbacks.addBinding(event, callback)} case Message.RemoveCallback(event, callback) => callbacks.removeBinding(event, callback) } } object FileWatcher { type Event = WatchEvent.Kind[Path] type Callback = PartialFunction[(Event, Path), Unit] sealed trait Message object Message { case class NewEvent(event: Event, file: Path) extends Message case class RegisterCallback(events: Seq[Event], callback: Callback) extends Message case class RemoveCallback(event: Event, callback: Callback) extends Message } } |
Это позволяет нам динамически регистрировать и удалять обратные вызовы, чтобы реагировать на события файловой системы:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
// initialize the actor instance val system = ActorSystem( "mySystem" ) val watcher: ActorRef = system.actorOf(Props( new FileWatcher(Paths.get( "/home/pathikrit" )))) // util to create a RegisterCallback message for the actor def when(events: Event*)(callback: Callback): Message = { Message.RegisterCallback(events.distinct, callback) } // send the register callback message for create/modify events watcher ! when(events = ENTRY_CREATE, ENTRY_MODIFY) { case (ENTRY_CREATE, file) => println(s "$file got created" ) case (ENTRY_MODIFY, file) => println(s "$file got modified" ) } |
- Полный источник:
FileWatcher.scala
Ссылка: | Реактивный мониторинг файловой системы с использованием актеров Akka от нашего партнера JCG Патикрита Боумика в блоге Java Advent Calendar . |