// DirWatcher.java

/*
 * Copyright (c) 2017, Stein Eldar Johnsen
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.file;

import net.morimekta.file.internal.RunFileEventCallbacks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
import static java.util.Objects.requireNonNull;
import static net.morimekta.file.FileEvent.CREATED;
import static net.morimekta.file.FileEvent.DELETED;
import static net.morimekta.file.FileEvent.MODIFIED;

/**
 * A dir watcher uses a native watch service to watch directories
 * for any change. Each listener is registered to listen to
 * individual directories, but can be registered for more than one.
 * The listener will then be notified of changes to the content of
 * the directory, including:
 *
 * <ul>
 *     <li>Files, sub-directories or symlinks being created</li>
 *     <li>Files, sub-directories or symlinks being deleted</li>
 *     <li>
 *         Files being written to. This <i>MAY</i> cause multiple
 *         events, as each event is triggered by file-system writes.
 *     </li>
 *     <li>
 *         When a file is moved into or out from the directory.
 *         Handled as a {@link FileEvent#CREATED} or {@link FileEvent#DELETED}
 *         event respectively.
 *     </li>
 * </ul>
 *
 * It does <b>NOT</b> notify on:
 *
 * <ul>
 *     <li>The listened to directory being deleted itself.</li>
 *     <li>Changes to metadata and permissions</li>
 * </ul>
 *
 * All bookkeeping of watched directories and listeners is guarded by a
 * single internal mutex. Watch events are collected on one executor and
 * handed over to a second executor which runs the listener callbacks.
 */
public class DirWatcher implements Closeable {
    /**
     * Create a DirWatcher with the default watch service.
     */
    public DirWatcher() {
        this(newWatchService());
    }

    /**
     * Create a DirWatcher using the provided watch service.
     *
     * @param watchService Watcher service to use.
     */
    public DirWatcher(WatchService watchService) {
        this(watchService,
             Executors.newSingleThreadExecutor(makeThreadFactory("DirWatcher")),
             Executors.newSingleThreadExecutor(makeThreadFactory("DirWatcherCallback")));
    }

    /**
     * Create a DirWatcher with explicit executors. The watch loop is
     * submitted to the watcher executor as the last step of construction.
     *
     * @param watchService     Watcher service to use.
     * @param watcherExecutor  Executor running the watch service event loop.
     * @param callbackExecutor Executor running the listener callbacks.
     */
    // @VisibleForTesting
    protected DirWatcher(WatchService watchService,
                         ExecutorService watcherExecutor,
                         ExecutorService callbackExecutor) {
        this.mutex = new Object();
        this.watchDirKeys = new HashMap<>();
        this.watchKeyDirs = new HashMap<>();
        this.watchDirListeners = new ConcurrentHashMap<>();
        this.watchService = requireNonNull(watchService, "watchService == null");
        this.callbackExecutor = requireNonNull(callbackExecutor, "callbackExecutor == null");
        this.watcherExecutor = requireNonNull(watcherExecutor, "watchExecutor == null");
        // Must be last: the task reads the fields assigned above.
        this.watcherExecutor.submit(this::watchFilesTask);
    }

    /**
     * Start watching a directory and notify the listener of changes to
     * its content. The listener is strongly referenced until removed.
     *
     * @param dir      The directory to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws IllegalStateException    If the watcher is closed.
     * @throws IllegalArgumentException If the path is not a directory.
     * @throws UncheckedIOException     If the directory does not exist, or
     *                                  could not be resolved or registered.
     */
    public DirWatcher addWatcher(Path dir, FileEventListener listener) {
        requireNonNull(dir, "dir == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(dir).add(() -> listener);
        }
        return this;
    }

    /**
     * Start watching a directory and notify the listener of changes to
     * its content. The listener is only held through a
     * {@link WeakReference}, so registering it here does not keep it
     * from being garbage collected.
     *
     * @param dir      The directory to watch.
     * @param listener The listener to be notified.
     * @return The watcher.
     * @throws IllegalStateException    If the watcher is closed.
     * @throws IllegalArgumentException If the path is not a directory.
     * @throws UncheckedIOException     If the directory does not exist, or
     *                                  could not be resolved or registered.
     */
    public DirWatcher weakAddWatcher(Path dir, FileEventListener listener) {
        requireNonNull(dir, "dir == null");
        requireNonNull(listener, "listener == null");
        synchronized (mutex) {
            startWatchingInternal(dir).add(new WeakReference<>(listener)::get);
        }
        return this;
    }

    /**
     * Remove a listener from a single watched directory. If no listeners
     * remain for the directory, the directory is no longer watched.
     *
     * @param dir      The directory to no longer watch.
     * @param listener The listener to be removed.
     * @return True if the listener was removed from the list.
     * @throws UncheckedIOException If the directory path could not be resolved.
     */
    public boolean removeWatcher(Path dir, FileEventListener listener) {
        requireNonNull(dir, "dir == null");
        requireNonNull(listener, "listener == null");

        Path canonicalDir = readCanonicalPath(dir);
        synchronized (mutex) {
            List<Supplier<FileEventListener>> suppliers = watchDirListeners.get(canonicalDir);
            if (suppliers == null) {
                return false;
            }

            boolean removed = removeFromListeners(suppliers, listener);
            if (suppliers.isEmpty()) {
                watchDirListeners.remove(canonicalDir);
                stopWatchingInternal(canonicalDir);
            }
            return removed;
        }
    }

    /**
     * Remove a listener from all watched directories. Directories left
     * without any listeners are no longer watched.
     *
     * @param listener The listener to be removed.
     * @return True if the listener was removed from any list.
     */
    public boolean removeWatcher(FileEventListener listener) {
        requireNonNull(listener, "listener == null");

        synchronized (mutex) {
            boolean removed = false;
            Iterator<Map.Entry<Path, List<Supplier<FileEventListener>>>> iterator =
                    watchDirListeners.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Path, List<Supplier<FileEventListener>>> entry = iterator.next();

                if (removeFromListeners(entry.getValue(), listener)) {
                    removed = true;
                }
                if (entry.getValue().isEmpty()) {
                    iterator.remove();
                    stopWatchingInternal(entry.getKey());
                }
            }
            return removed;
        }
    }

    /**
     * Shut down both executors, drop all registrations and close the
     * watch service. Waits up to 10 seconds for each executor to
     * terminate. Calling close on an already closed watcher does nothing.
     * If the calling thread is interrupted while waiting, its interrupt
     * status is restored before returning.
     */
    @Override
    public void close() {
        synchronized (mutex) {
            if (watcherExecutor.isShutdown()) {
                return;
            }
            watcherExecutor.shutdown();
            callbackExecutor.shutdown();

            watchDirListeners.clear();
            watchDirKeys.clear();
            watchKeyDirs.clear();
        }
        try {
            // Closing the service wakes up the watch loop blocked in take().
            watchService.close();
        } catch (IOException e) {
            LOGGER.error("WatchService did not close.", e);
        }

        // Wait for both executors even if interrupted during the first wait,
        // and restore the interrupt status afterwards.
        boolean interrupted = awaitShutdown(watcherExecutor, "WatcherExecutor");
        interrupted |= awaitShutdown(callbackExecutor, "CallbackExecutor");
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Wait up to 10 seconds for the executor to terminate, logging if it
     * does not.
     *
     * @param executor The executor to wait for.
     * @param name     Name of the executor used in log messages.
     * @return True if the wait was interrupted.
     */
    private static boolean awaitShutdown(ExecutorService executor, String name) {
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                LOGGER.warn("{} failed to terminate in 10 seconds.", name);
            }
            return false;
        } catch (InterruptedException e) {
            LOGGER.error("{} termination interrupted.", name, e);
            return true;
        }
    }

    /**
     * Remove every supplier that provides the given listener (by
     * identity), and also prune suppliers that provide null.
     *
     * @param listeners The supplier list to remove from.
     * @param listener  The listener to remove.
     * @param <T>       The listener type.
     * @return True if the given listener was found and removed.
     */
    private static <T> boolean removeFromListeners(List<Supplier<T>> listeners,
                                                   T listener) {
        Iterator<Supplier<T>> iterator = listeners.iterator();
        boolean               removed  = false;
        while (iterator.hasNext()) {
            T next = iterator.next().get();
            if (next == listener) {
                iterator.remove();
                removed = true;
            } else if (next == null) {
                // The supplier no longer provides a listener (e.g. a cleared
                // weak reference), so drop it.
                iterator.remove();
            }
        }
        return removed;
    }

    /**
     * Make a thread factory creating named daemon threads.
     *
     * @param name The thread name prefix; threads are named "name-N".
     * @return The thread factory.
     */
    private static ThreadFactory makeThreadFactory(String name) {
        AtomicInteger idx = new AtomicInteger();
        return runnable -> {
            Thread thread = Executors.defaultThreadFactory().newThread(runnable);
            thread.setName(name + "-" + idx.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Make sure the directory is registered with the watch service, and
     * get the list of listener suppliers for it. Must be called while
     * holding the mutex.
     *
     * @param path The directory to watch.
     * @return The mutable list of listener suppliers for the directory.
     */
    private List<Supplier<FileEventListener>> startWatchingInternal(Path path) {
        if (watcherExecutor.isShutdown()) {
            throw new IllegalStateException("Starts to watch on closed FileWatcher");
        }

        Path canonicalPath = readCanonicalPath(path);
        if (!Files.exists(canonicalPath)) {
            throw new UncheckedIOException("No such directory: " + path.toString(),
                                           new FileNotFoundException(path.toString()));
        }
        if (!Files.isDirectory(canonicalPath)) {
            throw new IllegalArgumentException("Not a directory: " + path);
        }
        watchDirKeys.computeIfAbsent(canonicalPath, dir -> {
            try {
                WatchKey key = dir.register(
                        watchService,
                        ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE, OVERFLOW);
                watchKeyDirs.put(key, dir);
                return key;
            } catch (IOException e) {
                throw new UncheckedIOException(e.getMessage(), e);
            }
        });
        // Key listeners by the canonical path, the same key used for the
        // watch keys and for lookups in removeWatcher.
        return watchDirListeners.computeIfAbsent(canonicalPath, fp -> new ArrayList<>());
    }

    /**
     * Cancel and forget the watch key for the directory, if there is one.
     * Must be called while holding the mutex.
     *
     * @param canonicalDir The canonical directory path to stop watching.
     */
    private void stopWatchingInternal(Path canonicalDir) {
        WatchKey key = watchDirKeys.remove(canonicalDir);
        if (key != null) {
            key.cancel();
            watchKeyDirs.remove(key);
        }
    }

    /**
     * Handle the watch service event loop. Runs until the watcher
     * executor is shut down, the thread is interrupted, or the watch
     * service is closed.
     */
    private void watchFilesTask() {
        while (!watcherExecutor.isShutdown()) {
            try {
                WatchKey key = watchService.take();
                Path     parent;
                // The map is a plain HashMap mutated under the mutex by other
                // threads, so it must be read under the mutex too.
                synchronized (mutex) {
                    parent = watchKeyDirs.get(key);
                }
                if (parent == null) {
                    key.reset();
                    continue;
                }

                Set<Map.Entry<Path, FileEvent>> updates = new LinkedHashSet<>();
                for (WatchEvent<?> ev : key.pollEvents()) {
                    WatchEvent.Kind<?> kind  = ev.kind();
                    FileEvent          event = forWatchEventKind(kind);
                    if (event == null) {
                        if (kind == OVERFLOW) {
                            LOGGER.warn("Overflow event, file updates may have been lost.");
                        }
                        continue;
                    }

                    @SuppressWarnings("unchecked")
                    WatchEvent<Path> watchEvent = (WatchEvent<Path>) ev;
                    updates.add(new AbstractMap.SimpleImmutableEntry<>(
                            parent.resolve(watchEvent.context()), event));
                }
                // Ready the key again so it will signal more events.
                key.reset();

                if (!updates.isEmpty()) {
                    Map<Path, List<Supplier<FileEventListener>>> watcherMap;
                    synchronized (mutex) {
                        // Snapshot, so callbacks run without holding the mutex.
                        watcherMap = deepCopy(watchDirListeners);
                    }

                    callbackExecutor.submit(new RunFileEventCallbacks(updates, watcherMap));
                }
            } catch (InterruptedException interruptedEx) {
                // Keep the interrupt status for whoever owns the thread.
                Thread.currentThread().interrupt();
                return;
            } catch (ClosedWatchServiceException closedEx) {
                // The watch service was closed, which happens on close().
                return;
            }
        }
    }

    /**
     * Copy the listener map and each of its listener lists.
     *
     * @param in The map to copy.
     * @return A new map with new lists holding the same suppliers.
     */
    private static Map<Path, List<Supplier<FileEventListener>>> deepCopy(Map<Path, List<Supplier<FileEventListener>>> in) {
        Map<Path, List<Supplier<FileEventListener>>> out = new HashMap<>();
        in.forEach((path, suppliers) -> out.put(path, new ArrayList<>(suppliers)));
        return out;
    }

    /**
     * Resolve the canonical path using {@link FileUtil#readCanonicalPath(Path)},
     * wrapping any IO exception as unchecked.
     *
     * @param path The path to resolve.
     * @return The canonical path.
     */
    private static Path readCanonicalPath(Path path) {
        try {
            return FileUtil.readCanonicalPath(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Create a watch service for the default file system.
     *
     * @return The new watch service.
     */
    private static WatchService newWatchService() {
        try {
            return FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Get the file event matching a watch event kind. Only file
     * modification, creation and deletion is interesting.
     *
     * @param kind The watch event kind.
     * @return The matching file event, or null for any other kind.
     */
    public static FileEvent forWatchEventKind(WatchEvent.Kind<?> kind) {
        if (kind == ENTRY_CREATE) {
            return CREATED;
        } else if (kind == ENTRY_MODIFY) {
            return MODIFIED;
        } else if (kind == ENTRY_DELETE) {
            return DELETED;
        }
        return null;
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(DirWatcher.class);

    // Guards the three watch maps below.
    private final Object mutex;

    // Watched directories, as a map from canonical directory path to list of watchers.
    private final Map<Path, List<Supplier<FileEventListener>>> watchDirListeners;
    // Canonical directory path to its registered watch key, and the reverse.
    private final Map<Path, WatchKey>                          watchDirKeys;
    private final Map<WatchKey, Path>                          watchKeyDirs;

    private final ExecutorService callbackExecutor;
    private final ExecutorService watcherExecutor;
    private final WatchService    watchService;
}