ConfigWatcher.java

/*
 * Copyright 2023 Morimekta Utils Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.config;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import net.morimekta.collect.util.LazyCachedSupplier;
import net.morimekta.config.readers.ConfigReader;
import net.morimekta.file.DirWatcher;
import net.morimekta.file.FileEvent;
import net.morimekta.file.FileEventListener;
import net.morimekta.file.FileUtil;
import net.morimekta.file.FileWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;
import static net.morimekta.config.ConfigChangeType.CREATED;
import static net.morimekta.config.ConfigChangeType.DELETED;
import static net.morimekta.config.ConfigChangeType.MODIFIED;
import static net.morimekta.config.ConfigEventListener.Status.ERROR;
import static net.morimekta.config.ConfigEventListener.Status.OK;
import static net.morimekta.config.ConfigEventListener.Status.PARSE_FAILED;
import static net.morimekta.config.ConfigEventListener.Status.READ_FAILED;
import static net.morimekta.config.ConfigException.asConfigException;
import static net.morimekta.file.FileUtil.readCanonicalPath;

/**
 * The config watcher watches the whole directory of config files of the same
 * type, and notifies listeners of any changes. The purpose of this watcher is
 * to manage config files serving different entities of the same type, where the
 * entity config for each can change live, be created and removed.
 */
public class ConfigWatcher<ConfigFileType> implements Closeable {
    /**
     * A config file entry cached in the watcher.
     *
     * @param <ConfigFileType> The parsed config file type.
     */
    public static class ConfigEntry<ConfigFileType> {
        /**
         * The file name of the config file.
         */
        public final Path           file;
        public final String         identifier;
        public final Instant        updatedAt;
        public final ConfigFileType config;

        /**
         * @param file       The config file.
         * @param identifier The config identifier.
         * @param updatedAt  The timestamp it was last successfully updated.
         * @param config     The current config content.
         */
        public ConfigEntry(Path file,
                           String identifier,
                           Instant updatedAt,
                           ConfigFileType config) {
            this.file = requireNonNull(file, "file == null");
            this.identifier = requireNonNull(identifier, "identifier == null");
            this.updatedAt = requireNonNull(updatedAt, "updatedAt == null");
            this.config = requireNonNull(config, "config == null");
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ConfigEntry<?> that = (ConfigEntry<?>) o;
            return file.equals(that.file) &&
                   identifier.equals(that.identifier) &&
                   updatedAt.equals(that.updatedAt) &&
                   config.equals(that.config);
        }

        @Override
        public int hashCode() {
            return Objects.hash(file, identifier, updatedAt, config);
        }

        @Override
        public String toString() {
            return "ConfigEntry{" +
                   "file=" + file +
                   ", identifier='" + identifier + '\'' +
                   ", updatedAt=" + updatedAt +
                   ", config=" + config +
                   '}';
        }
    }

    /**
     * Create a config watcher instance.
     *
     * @param configDir        The directory where the config files should be.
     * @param configReader     The config file reader used for parsing.
     * @param configIdentifier Get identifiers from the config file for
     *                         metrics and logging.
     */
    public ConfigWatcher(Path configDir,
                         ConfigReader<ConfigFileType> configReader,
                         Function<ConfigFileType, String> configIdentifier) {
        this(configDir, configReader, configIdentifier, Clock.systemUTC());
    }

    /**
     * Create a config watcher instance.
     *
     * @param configDir        The directory where the config files should be.
     * @param configReader     The config file reader used for parsing.
     * @param configIdentifier Get identifiers from the config file for
     *                         metrics and logging.
     * @param clock            Clock for knowing config update times.
     */
    public ConfigWatcher(Path configDir,
                         ConfigReader<ConfigFileType> configReader,
                         Function<ConfigFileType, String> configIdentifier,
                         Clock clock) {
        this(configDir, configReader, configIdentifier, clock, DIR_WATCHER.get(), FILE_WATCHER.get());
    }

    /**
     * Create a config watcher instance.
     *
     * @param configDir        The directory where the config files should be.
     * @param configReader     The config file reader used for parsing.
     * @param configIdentifier Get identifiers from the config file for
     *                         metrics and logging.
     * @param clock            Clock for knowing config update times.
     * @param dirWatcher       A directory watcher.
     * @param fileWatcher      A file watcher.
     */
    public ConfigWatcher(Path configDir,
                         ConfigReader<ConfigFileType> configReader,
                         Function<ConfigFileType, String> configIdentifier,
                         Clock clock,
                         DirWatcher dirWatcher,
                         FileWatcher fileWatcher) {
        this.clock = requireNonNull(clock, "clock == null");
        this.configDir = requireNonNull(configDir, "configDir == null");
        this.configReader = requireNonNull(configReader, "configReader == null");
        this.configIdentifier = requireNonNull(configIdentifier, "configIdentifier == null");
        this.dirWatcher = requireNonNull(dirWatcher, "dirWatcher == null");
        this.fileWatcher = requireNonNull(fileWatcher, "fileWatcher == null");

        if (!Files.isDirectory(configDir)) {
            throw new IllegalArgumentException("Not a directory: " + configDir);
        }

        this.eventListeners = new ArrayList<>();
        this.changeListeners = new ArrayList<>();
        this.loadedConfigMap = new HashMap<>();
        this.dirEventListener = this::onDirectoryEvent;
        this.fileEventListener = this::onFileEvent;
    }

    /**
     * @return The path to the config directory.
     */
    @JsonProperty("configDir")
    public Path getConfigDir() {
        return configDir;
    }

    /**
     * @return The currently loaded config map as map of absolute file path
     * to loaded config entry with metadata.
     */
    @JsonIgnore
    public Map<Path, ConfigEntry<ConfigFileType>> getLoadedConfigMap() {
        synchronized (mutex) {
            return Map.copyOf(loadedConfigMap);
        }
    }

    /**
     * Add a config event listener for events of what is happening on the config
     * watcher.
     *
     * @param listener Config Event Listener
     * @return The config watcher.
     */
    @JsonIgnore
    public ConfigWatcher<ConfigFileType> addEventListener(ConfigEventListener listener) {
        requireNonNull(listener);
        synchronized (mutex) {
            this.eventListeners.removeIf(it -> it == listener);
            this.eventListeners.add(listener);
        }
        return this;
    }

    /**
     * Add a config change listener for notification on changes to the loaded
     * config map.
     *
     * @param listener Config Event Listener
     * @return The config watcher.
     */
    @JsonIgnore
    public ConfigWatcher<ConfigFileType> addChangeListener(ConfigChangeListener<ConfigFileType> listener) {
        requireNonNull(listener);
        synchronized (mutex) {
            this.changeListeners.removeIf(it -> it == listener);
            this.changeListeners.add(listener);
        }
        return this;
    }

    /**
     * Start the NetworkConfigWatcher and throw exception if unable to read and
     * update config.
     *
     * @return The config watcher.
     * @throws ConfigException On any config read, parse or update issues.
     */
    @JsonIgnore
    public ConfigWatcher<ConfigFileType> start() throws ConfigException {
        return start(true);
    }

    /**
     * Start the NetworkConfigWatcher and throw exception if unable to read and
     * update config.
     *
     * @param failOnConfigException If the start sequence should fail on config exception. If this is set to false,
     *                              config exceptions will be logged, but otherwise ignored.
     * @return The config watcher.
     * @throws ConfigException On any config read, parse or update issues.
     */
    @JsonIgnore
    public ConfigWatcher<ConfigFileType> start(boolean failOnConfigException) throws ConfigException {
        synchronized (mutex) {
            this.dirWatcher.weakAddWatcher(configDir, dirEventListener);
            var ex = new ConfigExceptionBuffer();
            try {
                for (var file : FileUtil.list(configDir)) {
                    try {
                        var canonical = readCanonicalPath(file);
                        if (!Files.isHidden(file) && Files.isRegularFile(canonical)) {
                            fileWatcher.weakAddWatcher(file, fileEventListener);
                            onFileEventInternal(file, FileEvent.CREATED, true);
                        }
                    } catch (Exception e) {
                        if (failOnConfigException) {
                            LOGGER.error("Exception handling new config file {}", file.getFileName(), e);
                            ex.update(e);
                        } else {
                            LOGGER.warn("Exception handling new config file {}", file.getFileName(), e);
                        }
                    }
                }
            } catch (Exception e) {
                ex.update(e);
            }
            ex.throwIfPresent();
        }
        return this;
    }

    // --- Closable ---

    /**
     * Immediately remove all change listeners and stop watching files.
     */
    @Override
    public void close() {
        synchronized (mutex) {
            eventListeners.clear();
            changeListeners.clear();
            dirWatcher.removeWatcher(dirEventListener);
            fileWatcher.removeWatcher(fileEventListener);
        }
    }

    // --- Object ---

    @Override
    public String toString() {
        return "ConfigWatcher{dir=" + configDir + "}";
    }

    // *****************
    // **   PRIVATE   **
    // *****************

    private void onDirectoryEvent(Path file, FileEvent event) {
        try {
            Path canonical = readCanonicalPath(file);
            if (Files.isHidden(file) || !Files.isRegularFile(canonical)) {
                return;
            }
            if (event == FileEvent.CREATED) {
                synchronized (mutex) {
                    fileWatcher.weakAddWatcher(file, fileEventListener);
                    var config = readConfig(event, file, canonical);
                    var old = loadedConfigMap.get(file);
                    if (old != null && config.equals(old.config)) {
                        // Unlikely, but this may happen e.g. if the directory
                        // structure does not follow the writable config map
                        // pattern of kubernetes. Though That will have other
                        // unintended consequences too.
                        return;
                    }
                    var change = old == null ? CREATED : MODIFIED;
                    var identifier = configIdentifier.apply(config);
                    try {
                        LOGGER.info(
                                "{} config for '{}' from {}",
                                change.capitalizedAction(),
                                identifier,
                                file);
                        onConfigChange(change, config);
                        onConfigFileUpdate(change, file, identifier, OK);
                        loadedConfigMap.put(file, new ConfigEntry<>(
                                file, identifier, clock.instant(), config));
                    } catch (Throwable e) {
                        onConfigFileUpdate(change, file, identifier, ERROR);
                        LOGGER.error("Failed to update config: {}", file, e);
                    }
                }
            }
        } catch (Exception e) {
            LOGGER.error("Failed to update config: {}", file, e);
        }
    }

    private void onFileEvent(Path file, FileEvent event) {
        try {
            synchronized (mutex) {
                onFileEventInternal(file, event, false);
            }
        } catch (Throwable e) {
            LOGGER.error("Failed to update config: {}", file, e);
        }
    }

    private void onFileEventInternal(Path file, FileEvent event, boolean rethrow)
            throws IOException, ConfigException {
        if (event == FileEvent.CREATED || event == FileEvent.MODIFIED) {
            Path canonical = readCanonicalPath(file);
            if (!Files.exists(canonical)) {
                // Check for file exists, as sometimes the intermediate symlink may
                // be recreated (triggering event) before the source link file is
                // deleted. Ignore even in this case. This should only happen just
                // before the symlink is deleted triggering event below.
                return;
            }
            var config = readConfig(
                    loadedConfigMap.containsKey(file)
                    ? FileEvent.MODIFIED
                    : FileEvent.CREATED, file, canonical);
            var old = loadedConfigMap.get(file);
            if (old != null && config.equals(old.config)) {
                return;
            }
            var change = old == null ? CREATED : MODIFIED;
            var identifier = configIdentifier.apply(config);
            try {
                // it has become current locality.
                onConfigChange(change, config);
                onConfigFileUpdate(change, file, identifier, OK);
                loadedConfigMap.put(file, new ConfigEntry<>(
                        file, identifier, clock.instant(), config));
            } catch (Exception e) {
                onConfigFileUpdate(change, file, identifier, ERROR);
                if (rethrow) {
                    throw asConfigException(e);
                } else {
                    LOGGER.error(
                            "{} config for '{}' from {} failed: {}",
                            change.capitalizedAction(),
                            identifier,
                            file,
                            e.getMessage(),
                            e);
                }
            }
        } else if (event == FileEvent.DELETED) {
            if (Files.exists(file)) {
                // Ignore delete event if the file still exist. It refers to the file
                // that this pointed to before an update, which is deleted from disk
                // after an update. In some cases the update of the symlink related
                // to chained symlinks are slightly delayed, so we get stray delete
                // events when we still have a file.
                return;
            }
            var current = loadedConfigMap.get(file);
            if (current != null) {
                var identifier = configIdentifier.apply(current.config);
                try {
                    onConfigFileRead(FileEvent.DELETED, file, OK);
                    onConfigChange(DELETED, current.config);
                    onConfigFileUpdate(DELETED, file, identifier, OK);
                } catch (Exception e) {
                    LOGGER.error(
                            "Failed to remove network {} service from config: {}",
                            identifier,
                            file,
                            e);
                    onConfigFileUpdate(DELETED, file, identifier, ERROR);
                } finally {
                    // always remove from cache, should only trigger once,
                    // even on exception.
                    loadedConfigMap.remove(file);
                }
            }
        }
    }

    private ConfigFileType readConfig(FileEvent fileEvent, Path file, Path canonical) throws ConfigException {
        try {
            var config = configReader.readConfig(canonical);
            onConfigFileRead(fileEvent, file, OK);
            return config;
        } catch (ConfigException e) {
            onConfigFileRead(fileEvent, file, PARSE_FAILED);
            throw e;
        } catch (IOException e) {
            onConfigFileRead(fileEvent, file, READ_FAILED);
            throw new ConfigException("Unable to read config: " + e.getMessage(), e);
        }
    }

    private void onConfigChange(ConfigChangeType changeType, ConfigFileType config) throws ConfigException {
        var ex = new ConfigExceptionBuffer();
        for (var cl : changeListeners) {
            try {
                cl.onConfigChange(changeType, config);
            } catch (Exception e) {
                ex.update(e);
            }
        }
        ex.throwIfPresent();
    }

    private void onConfigFileRead(
            FileEvent fileEvent,
            Path file,
            ConfigEventListener.Status status) {
        for (var el : eventListeners) {
            try {
                el.onConfigFileRead(fileEvent, file, status);
            } catch (Exception e) {
                LOGGER.warn("Exception in config event listener: {}", e.getMessage(), e);
            }
        }
    }

    private void onConfigFileUpdate(
            ConfigChangeType changeType,
            Path file,
            String identifier,
            ConfigEventListener.Status status) {
        for (var el : eventListeners) {
            try {
                el.onConfigFileUpdate(changeType, file, identifier, status);
            } catch (Exception e) {
                LOGGER.warn("Exception in config event listener: {}", e.getMessage(), e);
            }
        }
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigWatcher.class);

    static final Supplier<DirWatcher>  DIR_WATCHER  = new LazyCachedSupplier<>(DirWatcher::new);
    static final Supplier<FileWatcher> FILE_WATCHER = new LazyCachedSupplier<>(() -> new FileWatcher(DIR_WATCHER.get()));

    private final ConfigReader<ConfigFileType>     configReader;
    private final Function<ConfigFileType, String> configIdentifier;

    private final List<ConfigEventListener>                  eventListeners;
    private final List<ConfigChangeListener<ConfigFileType>> changeListeners;
    private final Map<Path, ConfigEntry<ConfigFileType>>     loadedConfigMap;

    private final Clock             clock;
    private final Path              configDir;
    private final Object            mutex = new Object();
    private final DirWatcher        dirWatcher;
    private final FileWatcher       fileWatcher;
    private final FileEventListener fileEventListener;
    private final FileEventListener dirEventListener;
}