FileWatcher.java

/*
 * Copyright (c) 2017, Stein Eldar Johnsen
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.file;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;
import static net.morimekta.file.FileUtil.readCanonicalPath;

/**
 * File watcher helper for use with simple callbacks. It monitors two
 * types of changes: actual file changes, and symlink changes. Note that
 * whenever a file or symlink is added to the set of watched files, it may
 * remain monitored until the file or symlink is deleted.
 *
 * The {@link FileEventListener} listeners will be notified of all events for
 * the files they were registered for. Regardless of where the real change
 * happened, e.g. on a symlink in the middle of a symlink chain, the event is
 * always reported against the "requested" file, and it is triggered whenever
 * any of the following happens:
 *
 * <ul>
 *     <li>The current canonical file content changed.</li>
 *     <li>The link starts pointing to a different file, this
 *         is only calculated based on the file being a symlink,
 *         or being in a symlinked directory. Everything else
 *         is treated as canonical locations.</li>
 * </ul>
 *
 * <p>
 *
 * So if this is the case (ref configMaps in kubernetes):
 * <pre>{@code
 * /volume/map/..2018-01/config.txt (old file)
 * /volume/map/..2018-02/config.txt (new file)
 * /volume/map/..data     -> symlink to '..2018-01'
 * /volume/map/config.txt -> symlink to '..data/config.txt'
 * }</pre>
 *
 * If you listen to '/volume/map/config.txt', then you are notified if:
 *
 * <ul>
 *     <li>'/volume/map/..2018-01/config.txt' is changed</li>
 *     <li>'/volume/map/..data' symlink is updated to '..2018-02'</li>
 *     <li>'/volume/map/config.txt' symlink is updated to '..2018-02/config.txt'</li>
 * </ul>
 *
 * The important case here is the middle one, as this is the way kubernetes
 * handles its configMaps and Secrets. Note that the file-watcher is meant to
 * be used to watch individual files, not full directories. For that use the
 * {@link DirWatcher}.
 */
public class FileWatcher implements Closeable {
    /**
     * Create a FileWatcher with default watch service.
     */
    public FileWatcher() {
        this(new DirWatcher());
    }

    /**
     * Create a FileWatcher using the provided directory watcher. Note that the
     * watcher will be closed with the file watcher.
     *
     * @param dirWatcher Watcher service to use.
     * @throws NullPointerException if {@code dirWatcher} is null.
     */
    public FileWatcher(DirWatcher dirWatcher) {
        this.mutex = new Object();
        this.dirWatcher = requireNonNull(dirWatcher, "dirWatcher == null");
        this.dirListenerMap = new ConcurrentHashMap<>();
        this.watchedFiles = new ConcurrentHashMap<>();
        this.targetToRequests = new ConcurrentHashMap<>();
    }

    /**
     * Start watching file path and notify watcher for updates on that file.
     *
     * @param file     The file path to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws UncheckedIOException if the file does not exist, or its
     *                              directory / symlink chain cannot be resolved.
     */
    public FileWatcher addWatcher(Path file, FileEventListener listener) {
        requireNonNull(file, "file == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(file).add(() -> listener);
        }
        return this;
    }

    /**
     * Start watching file path and notify watcher for updates on that file.
     * The watcher will be kept in a weak reference and will allow GC to delete
     * the instance.
     *
     * @param file     The file path to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws UncheckedIOException if the file does not exist, or its
     *                              directory / symlink chain cannot be resolved.
     */
    public FileWatcher weakAddWatcher(Path file, FileEventListener listener) {
        requireNonNull(file, "file == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            // The WeakReference is created once; the supplier is its get() method.
            startWatchingInternal(file).add(new WeakReference<>(listener)::get);
        }
        return this;
    }

    /**
     * Remove a watcher from the list of listeners for a single file.
     *
     * The file path is matched against the registered path as-is, and also
     * after resolving it the same way as when it was added (absolute path with
     * canonical parent directory). Relative paths and paths through symlinked
     * directories therefore find the registration made via {@link #addWatcher}.
     *
     * @param file     The file path to no longer watch.
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     */
    public boolean removeWatcher(Path file, FileEventListener listener) {
        requireNonNull(file, "file == null");
        requireNonNull(listener, "listener == null");

        synchronized (mutex) {
            Path key = watchKey(file);
            List<Supplier<FileEventListener>> suppliers = watchedFiles.get(key);
            if (suppliers == null) {
                return false;
            }
            boolean removed = removeFromListeners(suppliers, listener);
            if (suppliers.isEmpty()) {
                stopWatchingInternal(key);
            }
            return removed;
        }
    }

    /**
     * Remove a watcher from the list of listeners of every watched file.
     *
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     */
    public boolean removeWatcher(FileEventListener listener) {
        requireNonNull(listener, "listener == null");

        synchronized (mutex) {
            // AtomicBoolean only because the flag is written from inside a lambda.
            AtomicBoolean removed = new AtomicBoolean(false);
            // ConcurrentHashMap.forEach tolerates removal of entries while iterating.
            watchedFiles.forEach((path, suppliers) -> {
                if (removeFromListeners(suppliers, listener)) {
                    removed.set(true);
                }
                if (suppliers.isEmpty()) {
                    // 'path' is already the normalized key, stop it directly.
                    stopWatchingInternal(path);
                }
            });
            return removed.get();
        }
    }

    @Override
    public void close() {
        synchronized (mutex) {
            targetToRequests.clear();
            watchedFiles.clear();
            dirListenerMap.clear();
        }
        dirWatcher.close();
    }

    /**
     * Remove every supplier yielding {@code listener} (by identity), and prune
     * suppliers whose weakly referenced listener has been collected.
     *
     * @return True if at least one supplier for {@code listener} was removed.
     */
    private <T> boolean removeFromListeners(List<Supplier<T>> listeners,
                                            T listener) {
        Iterator<Supplier<T>> iterator = listeners.iterator();
        boolean               removed  = false;
        while (iterator.hasNext()) {
            T next = iterator.next().get();
            if (next == listener) {
                iterator.remove();
                removed = true;
            } else if (next == null) {
                iterator.remove();
            }
        }
        return removed;
    }

    /**
     * Normalize a requested path: make it absolute and resolve its parent
     * directory to the canonical directory. The file name itself is NOT made
     * canonical: if it is a symlink, we want to listen to changes to the
     * symlink, not just the file it points to. If that is wanted, the file
     * should be made canonical (resolve symlinks) before starting to watch it.
     */
    private static Path requestPath(Path path) throws IOException {
        Path absolute = path.toAbsolutePath();
        Path parent = absolute.getParent();
        if (parent == null) {
            // File system root, there is no parent directory to canonicalize.
            return absolute;
        }
        return readCanonicalPath(parent).resolve(absolute.getFileName());
    }

    /**
     * Find the {@code watchedFiles} key for a path given to removeWatcher.
     * Falls back to the plain absolute path if it cannot be normalized
     * (e.g. its parent directory no longer exists).
     */
    private Path watchKey(Path file) {
        Path absolute = file.toAbsolutePath();
        if (watchedFiles.containsKey(absolute)) {
            return absolute;
        }
        try {
            return requestPath(absolute);
        } catch (IOException e) {
            // Cannot normalize, so no registration made via requestPath can match
            // anything but the absolute form.
            return absolute;
        }
    }

    /**
     * Register directory watchers and link-target mappings for a file, and
     * return its (possibly new) listener list. Must be called with the mutex held.
     */
    private List<Supplier<FileEventListener>> startWatchingInternal(Path file) {
        if (!Files.exists(file)) {
            throw new UncheckedIOException("No such file: " + file.toString(),
                                           new FileNotFoundException(file.toString()));
        }
        try {
            Path path = requestPath(file);
            Path parent = path.getParent();
            if (parent == null) {
                throw new IllegalArgumentException("Cannot watch the file system root: " + path);
            }

            addDirectoryWatcher(parent);
            // Resolve the symlink chain once, and use the same result both for the
            // directories to watch and for the target -> request mapping.
            List<Path> links = linkTargets(path);
            for (Path link : links) {
                Path linkParent = link.getParent();
                if (linkParent == null) {
                    continue;
                }
                if (Files.isSymbolicLink(linkParent)) {
                    // Watch the directory holding the symlinked directory, so a
                    // re-pointed directory link is noticed.
                    addDirectoryWatcher(linkParent.getParent());
                }
                addDirectoryWatcher(linkParent);
            }
            addLinkTarget(path, links);

            return watchedFiles.computeIfAbsent(path, fp -> new ArrayList<>());
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /** Watch a directory via the dir watcher, at most once per directory. */
    private void addDirectoryWatcher(Path directory) {
        dirListenerMap.computeIfAbsent(directory, dir -> {
            FileEventListener listener = this::onFileEvent;
            dirWatcher.addWatcher(dir, listener);
            return listener;
        });
    }

    /**
     * Stop watching a file. {@code requestPath} must be a {@code watchedFiles}
     * key, i.e. already normalized. It is deliberately not re-normalized, as the
     * canonical parent may have changed (or vanished) since it was registered.
     * Must be called with the mutex held.
     */
    private void stopWatchingInternal(Path requestPath) {
        removeLinkTarget(requestPath);
        watchedFiles.remove(requestPath);
    }

    /**
     * List of targets for a file or link, ending with the canonical target of
     * the link. The first entry is always the requested file / link, and the
     * last entry the canonical file.
     *
     * @throws IOException if a link cannot be read, or the chain is longer than
     *                     {@link #MAX_LINK_HOPS} (likely a symlink loop).
     */
    private List<Path> linkTargets(Path link) throws IOException {
        List<Path> out = new ArrayList<>();
        Path current = link.toAbsolutePath();
        for (int hops = 0; ; ++hops) {
            if (hops > MAX_LINK_HOPS) {
                throw new IOException("Too many levels of symbolic links: " + link);
            }
            out.add(current);

            Path parent = current.getParent();
            if (parent != null && Files.isSymbolicLink(parent)) {
                // The containing directory is a symlink, follow it first.
                current = parent.getParent()
                                .resolve(Files.readSymbolicLink(parent))
                                .resolve(current.getFileName())
                                .toAbsolutePath();
            } else if (Files.isSymbolicLink(current)) {
                // Relative link targets are relative to the link's own directory.
                current = parent.resolve(Files.readSymbolicLink(current))
                                .toAbsolutePath();
            } else {
                // Then resolve the rest.
                Path canonical = readCanonicalPath(current);
                if (!canonical.equals(current)) {
                    out.add(canonical);
                }
                return out;
            }
        }
    }

    /** Remove a request from all targets, dropping targets left with no requests. */
    private void removeLinkTarget(Path request) {
        Iterator<Map.Entry<Path, Set<Path>>> iterator = targetToRequests.entrySet().iterator();
        while (iterator.hasNext()) {
            Set<Path> requests = iterator.next().getValue();
            requests.remove(request);
            if (requests.isEmpty()) {
                iterator.remove();
            }
        }
    }

    /**
     * Map every path in the link chain (and every symlinked parent directory in
     * it) back to the requested path, so events on any of them reach the request.
     */
    private void addLinkTarget(Path request, List<Path> links) {
        for (Path link : links) {
            targetToRequests.computeIfAbsent(link, l -> new HashSet<>()).add(request);
            Path linkParent = link.getParent();
            if (linkParent != null && Files.isSymbolicLink(linkParent)) {
                targetToRequests.computeIfAbsent(linkParent, l -> new HashSet<>()).add(request);
            }
        }
    }

    /**
     * Snapshot the listener lists for the given requested paths under the mutex,
     * so listeners can be invoked without holding the lock.
     */
    private Map<Path, List<Supplier<FileEventListener>>> deepCopyOfWatchFiles(Set<Path> requestPaths) {
        synchronized (mutex) {
            Map<Path, List<Supplier<FileEventListener>>> out = new HashMap<>();
            for (Path path : requestPaths) {
                Optional.ofNullable(watchedFiles.get(path))
                        .ifPresent(suppliers -> out.put(path, new ArrayList<>(suppliers)));
            }
            return out;
        }
    }

    /**
     * Directory watcher callback: forward the event to the listeners of every
     * requested file mapped to the changed path. Listener exceptions are logged
     * and do not stop delivery to other listeners.
     */
    private void onFileEvent(Path file, FileEvent event) {
        Set<Path> requestPaths = targetToRequests.get(file);
        if (requestPaths == null) return;
        Map<Path, List<Supplier<FileEventListener>>> watchFiles = deepCopyOfWatchFiles(requestPaths);
        for (Map.Entry<Path, List<Supplier<FileEventListener>>> entry : watchFiles.entrySet()) {
            for (Supplier<FileEventListener> supplier : entry.getValue()) {
                FileEventListener listener = supplier.get();
                if (listener != null) {
                    try {
                        listener.onFileEvent(entry.getKey(), event);
                    } catch (Exception e) {
                        LOGGER.error("Exception on file update for {}.", entry.getKey(), e);
                    }
                }
            }
        }
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(FileWatcher.class);
    // Upper bound on symlink hops when resolving a link chain (same as Linux MAXSYMLINKS).
    private static final int MAX_LINK_HOPS = 40;

    // Guards all mutations of the maps and listener lists below.
    private final Object mutex;
    private final DirWatcher                              dirWatcher;
    // Watched directory -> the listener registered with the dir watcher for it.
    private final Map<Path, FileEventListener>            dirListenerMap;
    // Watched files, as a map from requested (normalized) file path to list of watchers.
    private final Map<Path, List<Supplier<FileEventListener>>> watchedFiles;
    // File Path -> Requested paths.
    private final Map<Path, Set<Path>>                    targetToRequests;
}