DirWatcher.java
/*
* Copyright (c) 2017, Stein Eldar Johnsen
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.file;
import net.morimekta.file.internal.RunFileEventCallbacks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static java.util.Objects.requireNonNull;
import static net.morimekta.file.FileEvent.CREATED;
import static net.morimekta.file.FileEvent.DELETED;
import static net.morimekta.file.FileEvent.MODIFIED;
/**
 * A dir watcher uses a native watch service to watch directories
 * for any change. Each listener is registered to be listening to
 * individual directories, but can be registered for more than one.
 * The listener will then be notified of changes to the content of
 * the directory. Including:
 *
 * <ul>
 *   <li>Files, sub-directories or symlinks being created</li>
 *   <li>Files, sub-directories or symlinks being deleted</li>
 *   <li>
 *     Files being written to. This <i>MAY</i> cause multiple
 *     events, as each event is triggered by file-system writes.
 *   </li>
 *   <li>
 *     When a file is moved into or out from the directory.
 *     Handled as a {@link FileEvent#CREATED} or {@link FileEvent#DELETED}
 *     event respectively.
 *   </li>
 * </ul>
 *
 * It does <b>NOT</b> notify on:
 *
 * <ul>
 *   <li>The listened to directory being deleted itself.</li>
 *   <li>Changes to metadata and permissions</li>
 * </ul>
 *
 * Watch events are collected on one executor and handed over to a second
 * executor for listener notification. All internal bookkeeping maps are
 * guarded by a single internal mutex and are keyed by the <i>canonical</i>
 * path of the watched directory.
 */
public class DirWatcher implements Closeable {
    /**
     * Create a DirWatcher with the watch service of the default file system.
     *
     * @throws UncheckedIOException If the watch service could not be created.
     */
    public DirWatcher() {
        this(newWatchService());
    }

    /**
     * Create a DirWatcher using the provided watch service. Two single
     * threaded executors with daemon threads are created; one for polling
     * the watch service and one for running listener callbacks.
     *
     * @param watchService Watcher service to use.
     */
    public DirWatcher(WatchService watchService) {
        this(watchService,
             Executors.newSingleThreadExecutor(makeThreadFactory("DirWatcher")),
             Executors.newSingleThreadExecutor(makeThreadFactory("DirWatcherCallback")));
    }

    /**
     * Create a DirWatcher with explicit executors, and immediately submit
     * the watch loop to the watcher executor.
     *
     * @param watchService     Watcher service to use.
     * @param watcherExecutor  Executor running the watch service loop.
     * @param callbackExecutor Executor running the listener callbacks.
     */
    // @VisibleForTesting
    protected DirWatcher(WatchService watchService,
                         ExecutorService watcherExecutor,
                         ExecutorService callbackExecutor) {
        this.mutex = new Object();
        this.watchDirKeys = new HashMap<>();
        this.watchKeyDirs = new HashMap<>();
        this.watchDirListeners = new ConcurrentHashMap<>();
        this.watchService = requireNonNull(watchService, "watchService == null");
        this.callbackExecutor = requireNonNull(callbackExecutor, "callbackExecutor == null");
        this.watcherExecutor = requireNonNull(watcherExecutor, "watchExecutor == null");
        // All fields are assigned before the watch loop is handed to the executor.
        this.watcherExecutor.submit(this::watchFilesTask);
    }

    /**
     * Start watching a directory and notify the listener of changes to its
     * content. The listener is strongly referenced until it is removed or
     * the watcher is closed.
     *
     * @param dir      The directory to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws IllegalStateException    If the watcher is closed.
     * @throws IllegalArgumentException If the path is not a directory.
     * @throws UncheckedIOException     If the directory does not exist, or
     *                                  could not be resolved or registered.
     */
    public DirWatcher addWatcher(Path dir, FileEventListener listener) {
        requireNonNull(dir, "dir == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(dir).add(() -> listener);
        }
        return this;
    }

    /**
     * Start watching a directory and notify the listener of changes to its
     * content. The listener is only held through a {@link WeakReference},
     * so the registration does not by itself keep the listener alive.
     *
     * @param dir      The directory to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws IllegalStateException    If the watcher is closed.
     * @throws IllegalArgumentException If the path is not a directory.
     * @throws UncheckedIOException     If the directory does not exist, or
     *                                  could not be resolved or registered.
     */
    public DirWatcher weakAddWatcher(Path dir, FileEventListener listener) {
        requireNonNull(dir, "dir == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(dir).add(new WeakReference<>(listener)::get);
        }
        return this;
    }

    /**
     * Remove a listener from a single watched directory. If no listeners
     * remain for the directory, its watch key is cancelled.
     *
     * @param dir      The directory to no longer watch.
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     * @throws UncheckedIOException If the canonical path could not be resolved.
     */
    public boolean removeWatcher(Path dir, FileEventListener listener) {
        requireNonNull(dir, "dir == null");
        requireNonNull(listener, "listener == null");
        Path canonicalDir = readCanonicalPath(dir);
        synchronized (mutex) {
            List<Supplier<FileEventListener>> suppliers = watchDirListeners.get(canonicalDir);
            if (suppliers == null) {
                return false;
            }
            boolean removed = removeFromListeners(suppliers, listener);
            if (suppliers.isEmpty()) {
                watchDirListeners.remove(canonicalDir);
                cancelWatchKey(canonicalDir);
            }
            return removed;
        }
    }

    /**
     * Remove a listener from all watched directories. Directories left
     * without any listeners have their watch key cancelled.
     *
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     */
    public boolean removeWatcher(FileEventListener listener) {
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            boolean removed = false;
            Iterator<Map.Entry<Path, List<Supplier<FileEventListener>>>> iterator =
                    watchDirListeners.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Path, List<Supplier<FileEventListener>>> entry = iterator.next();
                if (removeFromListeners(entry.getValue(), listener)) {
                    removed = true;
                }
                if (entry.getValue().isEmpty()) {
                    iterator.remove();
                    cancelWatchKey(entry.getKey());
                }
            }
            return removed;
        }
    }

    /**
     * Shut down both executors, drop all registrations and close the watch
     * service, then wait up to 10 seconds for each executor to terminate.
     * Calling this on an already closed watcher does nothing.
     * <p>
     * If the calling thread is interrupted while waiting, the interruption
     * is logged and the thread's interrupt status is restored before return.
     */
    @Override
    public void close() {
        synchronized (mutex) {
            if (watcherExecutor.isShutdown()) {
                return;
            }
            watcherExecutor.shutdown();
            callbackExecutor.shutdown();
            watchDirListeners.clear();
            watchDirKeys.clear();
            watchKeyDirs.clear();
        }
        try {
            // Closing the service makes a blocked take() in the watch loop
            // fail with ClosedWatchServiceException, which ends the loop.
            watchService.close();
        } catch (IOException e) {
            LOGGER.error("WatchService did not close.", e);
        }

        // The interrupt status is restored only after both waits, so that an
        // interruption of the first wait does not immediately abort the second.
        boolean interrupted = false;
        try {
            if (!watcherExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                LOGGER.warn("WatcherExecutor failed to terminate in 10 seconds.");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            LOGGER.error("WatcherExecutor termination interrupted.", e);
        }
        try {
            if (!callbackExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                LOGGER.warn("CallbackExecutor failed to terminate in 10 seconds.");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            LOGGER.error("CallbackExecutor termination interrupted.", e);
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Remove the given listener from the supplier list, comparing by
     * identity. Suppliers that yield null (e.g. a cleared weak reference)
     * are pruned on the same pass.
     *
     * @param listeners The supplier list to remove from.
     * @param listener  The listener instance to remove.
     * @param <T>       The listener type.
     * @return True if the listener itself was found and removed.
     */
    private <T> boolean removeFromListeners(List<Supplier<T>> listeners,
                                            T listener) {
        Iterator<Supplier<T>> iterator = listeners.iterator();
        boolean removed = false;
        while (iterator.hasNext()) {
            T next = iterator.next().get();
            if (next == listener) {
                iterator.remove();
                removed = true;
            } else if (next == null) {
                iterator.remove();
            }
        }
        return removed;
    }

    /**
     * Cancel and forget the watch key registered for the directory, if any.
     * Must be called while holding the mutex.
     *
     * @param canonicalDir Canonical path of the directory to stop watching.
     */
    private void cancelWatchKey(Path canonicalDir) {
        WatchKey key = watchDirKeys.remove(canonicalDir);
        if (key != null) {
            key.cancel();
            watchKeyDirs.remove(key);
        }
    }

    /**
     * Make a thread factory creating daemon threads named
     * <code>{name}-{n}</code>, with n counting up from 1.
     *
     * @param name The thread name prefix.
     * @return The thread factory.
     */
    private static ThreadFactory makeThreadFactory(String name) {
        AtomicInteger idx = new AtomicInteger();
        return runnable -> {
            Thread thread = Executors.defaultThreadFactory().newThread(runnable);
            thread.setName(name + "-" + idx.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Make sure the directory is registered with the watch service and
     * return the (mutable) list of listener suppliers for it. Must be
     * called while holding the mutex.
     *
     * @param path The requested directory path.
     * @return The listener supplier list for the canonical directory.
     */
    private List<Supplier<FileEventListener>> startWatchingInternal(Path path) {
        if (watcherExecutor.isShutdown()) {
            throw new IllegalStateException("Starts to watch on closed FileWatcher");
        }
        Path canonicalPath = readCanonicalPath(path);
        if (!Files.exists(canonicalPath)) {
            throw new UncheckedIOException("No such directory: " + path.toString(),
                                           new FileNotFoundException(path.toString()));
        }
        if (!Files.isDirectory(canonicalPath)) {
            throw new IllegalArgumentException("Not a directory: " + path);
        }
        watchDirKeys.computeIfAbsent(canonicalPath, dir -> {
            try {
                WatchKey key = dir.register(
                        watchService,
                        ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE, OVERFLOW);
                watchKeyDirs.put(key, dir);
                return key;
            } catch (IOException e) {
                throw new UncheckedIOException(e.getMessage(), e);
            }
        });
        // Listeners are keyed by the canonical path, same as the watch keys
        // and the lookup in removeWatcher(Path, FileEventListener). Keying by
        // the requested path would make a listener added through a relative
        // or symlinked path impossible to look up again.
        return watchDirListeners.computeIfAbsent(canonicalPath, fp -> new ArrayList<>());
    }

    /**
     * Handle the watch service event loop. Runs until the watcher executor
     * is shut down, the thread is interrupted or the watch service is closed.
     */
    private void watchFilesTask() {
        while (!watcherExecutor.isShutdown()) {
            try {
                WatchKey key = watchService.take();
                Path parent;
                // watchKeyDirs is a plain HashMap modified under the mutex by
                // other threads, so it must be read under the mutex too.
                synchronized (mutex) {
                    parent = watchKeyDirs.get(key);
                }
                if (parent == null) {
                    // Not a key (or no longer a key) this watcher knows about.
                    key.reset();
                    continue;
                }
                Set<Map.Entry<Path, FileEvent>> updates = new LinkedHashSet<>();
                for (WatchEvent<?> ev : key.pollEvents()) {
                    WatchEvent.Kind<?> kind = ev.kind();
                    FileEvent event = forWatchEventKind(kind);
                    if (event == null) {
                        if (kind == OVERFLOW) {
                            LOGGER.warn("Overflow event, file updates may have been lost.");
                        }
                        continue;
                    }
                    // Events that map to a FileEvent are the ENTRY_* kinds,
                    // which carry a Path as context.
                    @SuppressWarnings("unchecked")
                    WatchEvent<Path> watchEvent = (WatchEvent<Path>) ev;
                    updates.add(new AbstractMap.SimpleImmutableEntry<>(
                            parent.resolve(watchEvent.context()), event));
                }
                // Ready the key again so it will signal more events.
                key.reset();
                if (!updates.isEmpty()) {
                    Map<Path, List<Supplier<FileEventListener>>> watcherMap;
                    synchronized (mutex) {
                        // Snapshot, so callbacks run without holding the mutex.
                        watcherMap = deepCopy(watchDirListeners);
                    }
                    callbackExecutor.submit(new RunFileEventCallbacks(updates, watcherMap));
                }
            } catch (InterruptedException interruptedEx) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                return;
            } catch (ClosedWatchServiceException closedEx) {
                // Expected on close(): the watch service was closed while
                // (or before) waiting for the next key.
                return;
            }
        }
    }

    /**
     * Copy the map and each of its lists, so the result can be read without
     * being affected by later changes to the registrations.
     *
     * @param in The map to copy.
     * @return The copy.
     */
    private static Map<Path, List<Supplier<FileEventListener>>> deepCopy(Map<Path, List<Supplier<FileEventListener>>> in) {
        Map<Path, List<Supplier<FileEventListener>>> out = new HashMap<>();
        in.forEach((path, suppliers) -> out.put(path, new ArrayList<>(suppliers)));
        return out;
    }

    /**
     * Resolve the canonical path using {@link FileUtil#readCanonicalPath(Path)},
     * wrapping IO failures in an unchecked exception.
     *
     * @param path The path to resolve.
     * @return The canonical path.
     */
    private static Path readCanonicalPath(Path path) {
        try {
            return FileUtil.readCanonicalPath(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Create a watch service for the default file system, wrapping IO
     * failures in an unchecked exception.
     *
     * @return The new watch service.
     */
    private static WatchService newWatchService() {
        try {
            return FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Map a watch event kind to the matching file event. Only file
     * modification, creation and deletion is interesting.
     *
     * @param kind The watch event kind.
     * @return The file event, or null for any other kind (e.g. overflow).
     */
    public static FileEvent forWatchEventKind(WatchEvent.Kind<?> kind) {
        if (kind == ENTRY_CREATE) {
            return CREATED;
        } else if (kind == ENTRY_MODIFY) {
            return MODIFIED;
        } else if (kind == ENTRY_DELETE) {
            return DELETED;
        }
        return null;
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(DirWatcher.class);

    // Guards watchDirListeners, watchDirKeys and watchKeyDirs.
    private final Object mutex;
    // Watched directories, as a map from canonical directory path to the
    // list of listener suppliers for that directory.
    private final Map<Path, List<Supplier<FileEventListener>>> watchDirListeners;
    // Canonical directory path to its registered watch key, and the reverse.
    private final Map<Path, WatchKey> watchDirKeys;
    private final Map<WatchKey, Path> watchKeyDirs;
    private final ExecutorService callbackExecutor;
    private final ExecutorService watcherExecutor;
    private final WatchService watchService;
}