FileWatcher.java
/*
* Copyright (c) 2017, Stein Eldar Johnsen
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.file;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
import static net.morimekta.file.FileUtil.readCanonicalPath;
/**
* File watcher helper for use with simple callbacks. It monitors two
* types of changes: actual file changes, and symlink changes. Note that
* once a file or symlink has been added to the watcher, it may be
* monitored until the file or symlink is deleted.
*
* The {@link FileEvent} listeners will be notified of all events for the
* files they were registered for. Regardless of where the real change
* happened, e.g. on a symlink in the middle of a symlink chain, the event
* will always resolve back to the "requested" file, and it is triggered
* whenever any of the following happens:
*
* <ul>
* <li>The current canonical file content changed.</li>
* <li>The link starts pointing to a different file, this
* is only calculated based on the file being a symlink,
* or being in a symlinked directory. Everything else
* is treated as canonical locations.</li>
* </ul>
*
* <p>
*
* So if this is the case (ref configMaps in kubernetes):
* <pre>{@code
* /volume/map/..2018-01/config.txt (old file)
* /volume/map/..2018-02/config.txt (new file)
* /volume/map/..data -> symlink to '..2018-01'
* /volume/map/config.txt -> symlink to '..data/config.txt'
* }</pre>
*
* If you listen to '/volume/map/config.txt', then you are notified if:
*
* <ul>
* <li>'/volume/map/..2018-01/config.txt' is changed</li>
* <li>'/volume/map/..data' symlink is updated to '..2018-02'</li>
* <li>'/volume/map/config.txt' symlink is updated to '..2018-02/config.txt'</li>
* </ul>
*
* The important case here is the middle one, as this is the way kubernetes
* handles its configMaps and Secrets. Note that the file-watcher is meant to
* be used to watch individual files, not full directories. For that use the
* {@link DirWatcher}.
*/
public class FileWatcher implements Closeable {
    /**
     * Create a FileWatcher with default watch service.
     */
    public FileWatcher() {
        this(new DirWatcher());
    }

    /**
     * Create a FileWatcher using the provided directory watcher. Note that the
     * watcher will be closed with the file watcher.
     *
     * @param dirWatcher Watcher service to use.
     */
    public FileWatcher(DirWatcher dirWatcher) {
        this.mutex = new Object();
        this.dirWatcher = requireNonNull(dirWatcher, "dirWatcher == null");
        this.dirListenerMap = new ConcurrentHashMap<>();
        this.watchedFiles = new ConcurrentHashMap<>();
        this.targetToRequests = new ConcurrentHashMap<>();
    }

    /**
     * Start watching file path and notify watcher for updates on that file.
     * The listener is strongly referenced until it is removed or the watcher
     * is closed.
     *
     * @param file The file path to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws UncheckedIOException If the file does not exist, or its
     *                              location could not be resolved.
     * @throws IllegalArgumentException If the path has no parent directory.
     */
    public FileWatcher addWatcher(Path file, FileEventListener listener) {
        requireNonNull(file, "file == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(file).add(() -> listener);
        }
        return this;
    }

    /**
     * Start watching file path and notify watcher for updates on that file.
     * The watcher will be kept in a weak reference and will allow GC to delete
     * the instance. Entries whose listener has been collected are skipped when
     * events are dispatched, and are purged the next time listeners are removed
     * from the same file.
     *
     * @param file The file path to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws UncheckedIOException If the file does not exist, or its
     *                              location could not be resolved.
     * @throws IllegalArgumentException If the path has no parent directory.
     */
    public FileWatcher weakAddWatcher(Path file, FileEventListener listener) {
        requireNonNull(file, "file == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(file).add(new WeakReference<>(listener)::get);
        }
        return this;
    }

    /**
     * Remove a watcher from the list of listeners. The file path is matched
     * both as given and in the normalized form used when the watcher was
     * added (absolute, with the parent directory made canonical), so the same
     * path that was passed when adding the watcher can be used to remove it.
     *
     * @param file The file path to no longer watch.
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     */
    public boolean removeWatcher(Path file, FileEventListener listener) {
        requireNonNull(file, "file == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            Optional<Path> key = findWatchedKey(file);
            if (!key.isPresent()) {
                return false;
            }
            List<Supplier<FileEventListener>> suppliers = watchedFiles.get(key.get());
            boolean removed = removeFromListeners(suppliers, listener);
            if (suppliers.isEmpty()) {
                // Nobody is listening on this file any more.
                stopWatchingInternal(key.get());
            }
            return removed;
        }
    }

    /**
     * Remove a watcher from the list of listeners, for all files it was
     * registered on.
     *
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     */
    public boolean removeWatcher(FileEventListener listener) {
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            AtomicBoolean removed = new AtomicBoolean(false);
            // Collect the files left without listeners first, so the map is
            // not modified while it is being traversed.
            List<Path> emptied = new ArrayList<>();
            watchedFiles.forEach((path, suppliers) -> {
                if (removeFromListeners(suppliers, listener)) {
                    removed.set(true);
                }
                if (suppliers.isEmpty()) {
                    emptied.add(path);
                }
            });
            emptied.forEach(this::stopWatchingInternal);
            return removed.get();
        }
    }

    /**
     * Forget all watched files and listeners, and close the underlying
     * directory watcher.
     */
    @Override
    public void close() {
        synchronized (mutex) {
            targetToRequests.clear();
            watchedFiles.clear();
            dirListenerMap.clear();
        }
        dirWatcher.close();
    }

    /**
     * Remove the given listener from the list of suppliers. Suppliers that no
     * longer provide a listener (returning null, e.g. a cleared weak reference)
     * are removed along the way.
     *
     * @param listeners The suppliers to be modified.
     * @param listener The listener instance to remove, compared by identity.
     * @param <T> The listener type.
     * @return True if the listener was found and removed.
     */
    private <T> boolean removeFromListeners(List<Supplier<T>> listeners,
                                            T listener) {
        Iterator<Supplier<T>> iterator = listeners.iterator();
        boolean removed = false;
        while (iterator.hasNext()) {
            T next = iterator.next().get();
            if (next == listener) {
                iterator.remove();
                removed = true;
            } else if (next == null) {
                iterator.remove();
            }
        }
        return removed;
    }

    /**
     * Normalize a requested path into the form used as key for the watched
     * files: absolute, with the parent directory made canonical.
     * The file name itself is not made canonical: if it is a symlink we want
     * to listen to changes to the symlink, not just the file it points to. If
     * that is wanted, the path should be made canonical (resolve symlinks)
     * before it is passed to the watcher.
     *
     * @param file The requested file path.
     * @return The normalized path. A path without parent or file name (e.g.
     *         the file system root) is only made absolute.
     * @throws IOException If resolving the canonical parent directory failed.
     */
    private static Path normalizeRequestPath(Path file) throws IOException {
        Path absolute = file.toAbsolutePath();
        Path parent = absolute.getParent();
        Path fileName = absolute.getFileName();
        if (parent == null || fileName == null) {
            return absolute;
        }
        return readCanonicalPath(parent).resolve(fileName);
    }

    /**
     * Find the key under which the file is currently watched. Must be called
     * while holding the mutex.
     *
     * @param file The file path as given by the caller.
     * @return The key into the watched files, or empty if not watched.
     */
    private Optional<Path> findWatchedKey(Path file) {
        if (watchedFiles.containsKey(file)) {
            return Optional.of(file);
        }
        try {
            Path normalized = normalizeRequestPath(file);
            if (watchedFiles.containsKey(normalized)) {
                return Optional.of(normalized);
            }
        } catch (IOException e) {
            // The location could not be resolved, so only the path as given
            // could have matched, and it did not.
            LOGGER.debug("Unable to resolve watched path for {}.", file, e);
        }
        return Optional.empty();
    }

    /**
     * Register everything needed to get events for the requested file: a
     * directory watcher for each directory in its chain of link targets, and
     * the mapping from each target back to the request. Must be called while
     * holding the mutex.
     *
     * @param path The requested file path.
     * @return The mutable list of listener suppliers for the file.
     */
    private List<Supplier<FileEventListener>> startWatchingInternal(Path path) {
        if (!Files.exists(path)) {
            throw new UncheckedIOException("No such file: " + path.toString(),
                                           new FileNotFoundException(path.toString()));
        }
        try {
            Path request = normalizeRequestPath(path);
            Path parent = request.getParent();
            if (parent == null) {
                throw new IllegalArgumentException("Not a watchable file path: " + path);
            }
            // Resolve the chain once, so the directory watchers and the target
            // mapping are based on the same view of the file system.
            List<Path> targets = linkTargets(request);

            addDirectoryWatcher(parent);
            for (Path target : targets) {
                Path targetParent = target.getParent();
                if (Files.isSymbolicLink(targetParent)) {
                    // The directory is itself a symlink, so also watch the
                    // directory that contains the link.
                    addDirectoryWatcher(targetParent.getParent());
                }
                addDirectoryWatcher(targetParent);
            }
            addLinkTarget(request, targets);
            return watchedFiles.computeIfAbsent(request, fp -> new ArrayList<>());
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Make sure the directory is watched by the directory watcher. Each
     * directory is registered only once.
     *
     * @param directory The directory to watch.
     */
    private void addDirectoryWatcher(Path directory) {
        dirListenerMap.computeIfAbsent(directory, dir -> {
            FileEventListener listener = this::onFileEvent;
            dirWatcher.addWatcher(dir, listener);
            return listener;
        });
    }

    /**
     * Stop tracking a watched file. Must be called while holding the mutex.
     *
     * @param request The key of the file in the watched files, i.e. the
     *                already normalized request path. It is used as is, so
     *                that removal needs no file system access and works even
     *                if the location has changed since it was registered.
     */
    private void stopWatchingInternal(Path request) {
        removeLinkTarget(request);
        watchedFiles.remove(request);
    }

    /**
     * List of targets for a file or link, ending with the canonical target of
     * the link. The first entry is always the requested file / link, and the
     * last entry the canonical file.
     *
     * @param link The file or link to resolve.
     * @return The chain of paths from the requested to the canonical file.
     * @throws IOException If reading a link or resolving a path failed.
     */
    private List<Path> linkTargets(Path link) throws IOException {
        link = link.toAbsolutePath();
        List<Path> out = new ArrayList<>();
        out.add(link);
        if (Files.isSymbolicLink(link.getParent())) {
            // The file is located in a symlinked directory: follow the
            // directory link, and look for the same file name there.
            Path target = link.getParent()
                              .getParent()
                              .resolve(Files.readSymbolicLink(link.getParent()))
                              .resolve(link.getFileName())
                              .toAbsolutePath();
            out.addAll(linkTargets(target));
        } else if (Files.isSymbolicLink(link)) {
            // The file itself is a symlink: follow it relative to its directory.
            Path target = link.getParent()
                              .resolve(Files.readSymbolicLink(link))
                              .toAbsolutePath();
            out.addAll(linkTargets(target));
        } else {
            // Then resolve the rest.
            Path canonical = readCanonicalPath(link);
            if (!canonical.equals(link)) {
                out.add(canonical);
            }
        }
        return out;
    }

    /**
     * Remove the request from all targets pointing back to it, and drop the
     * targets that no longer have any request.
     *
     * @param request The normalized request path.
     */
    private void removeLinkTarget(Path request) {
        Iterator<Set<Path>> iterator = targetToRequests.values().iterator();
        while (iterator.hasNext()) {
            Set<Path> requests = iterator.next();
            requests.remove(request);
            if (requests.isEmpty()) {
                iterator.remove();
            }
        }
    }

    /**
     * Map each path in the link chain back to the request, so events on any
     * of them can be resolved to the requested file. Symlinked directories in
     * the chain are mapped too, so a change of the directory link itself is
     * also resolved to the request.
     *
     * @param request The normalized request path.
     * @param links The link chain of the request.
     */
    private void addLinkTarget(Path request, List<Path> links) {
        for (Path path : links) {
            targetToRequests.computeIfAbsent(path, l -> new HashSet<>()).add(request);
            if (Files.isSymbolicLink(path.getParent())) {
                targetToRequests.computeIfAbsent(path.getParent(), l -> new HashSet<>()).add(request);
            }
        }
    }

    /**
     * Make a snapshot of the listener suppliers for the requested paths, so
     * the listeners can be notified without holding the mutex.
     *
     * @param requestPaths The requested paths to get listeners for.
     * @return Map from requested path to a copy of its listener suppliers.
     */
    private Map<Path, List<Supplier<FileEventListener>>> deepCopyOfWatchFiles(Set<Path> requestPaths) {
        synchronized (mutex) {
            Map<Path, List<Supplier<FileEventListener>>> out = new HashMap<>();
            for (Path path : requestPaths) {
                List<Supplier<FileEventListener>> suppliers = watchedFiles.get(path);
                if (suppliers != null) {
                    out.put(path, new ArrayList<>(suppliers));
                }
            }
            return out;
        }
    }

    /**
     * Callback registered with the directory watcher. Resolves the changed
     * file back to the requested files, and notifies their listeners with the
     * requested path. An exception from one listener is logged, and does not
     * prevent the other listeners from being notified.
     *
     * @param file The file the event was triggered for.
     * @param event The event type.
     */
    private void onFileEvent(Path file, FileEvent event) {
        Set<Path> requestPaths = targetToRequests.get(file);
        if (requestPaths == null) {
            return;
        }
        Map<Path, List<Supplier<FileEventListener>>> watchFiles = deepCopyOfWatchFiles(requestPaths);
        for (Map.Entry<Path, List<Supplier<FileEventListener>>> entry : watchFiles.entrySet()) {
            for (Supplier<FileEventListener> supplier : entry.getValue()) {
                // Null means the weakly referenced listener is gone.
                FileEventListener listener = supplier.get();
                if (listener != null) {
                    try {
                        listener.onFileEvent(entry.getKey(), event);
                    } catch (Exception e) {
                        LOGGER.error("Exception on file update for {}.", entry.getKey(), e);
                    }
                }
            }
        }
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(FileWatcher.class);

    // Guards all modifications of the watched files and target mappings.
    private final Object mutex;
    private final DirWatcher dirWatcher;
    // Watched directory -> the listener registered with the dir watcher.
    private final Map<Path, FileEventListener> dirListenerMap;
    // Watched files, as a map from requested file path to list of watchers.
    private final Map<Path, List<Supplier<FileEventListener>>> watchedFiles;
    // File Path -> Requested paths.
    private final Map<Path, Set<Path>> targetToRequests;
}