// ConfigWatcher.java
/*
* Copyright 2023 Morimekta Utils Authors
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import net.morimekta.collect.util.LazyCachedSupplier;
import net.morimekta.config.readers.ConfigReader;
import net.morimekta.file.DirWatcher;
import net.morimekta.file.FileEvent;
import net.morimekta.file.FileEventListener;
import net.morimekta.file.FileUtil;
import net.morimekta.file.FileWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
import static net.morimekta.config.ConfigChangeType.CREATED;
import static net.morimekta.config.ConfigChangeType.DELETED;
import static net.morimekta.config.ConfigChangeType.MODIFIED;
import static net.morimekta.config.ConfigEventListener.Status.ERROR;
import static net.morimekta.config.ConfigEventListener.Status.OK;
import static net.morimekta.config.ConfigEventListener.Status.PARSE_FAILED;
import static net.morimekta.config.ConfigEventListener.Status.READ_FAILED;
import static net.morimekta.config.ConfigException.asConfigException;
import static net.morimekta.file.FileUtil.readCanonicalPath;
/**
* The config watcher watches the whole directory of config files of the same
* type, and notifies listeners of any changes. The purpose of this watcher is
* to manage config files serving different entities of the same type, where the
* entity config for each can change live, be created and removed.
*/
public class ConfigWatcher<ConfigFileType> implements Closeable {
/**
 * Immutable record of one config file held in the watcher's cache: where it
 * came from, what it is called, when it was stored and what it contains.
 *
 * @param <ConfigFileType> The parsed config file type.
 */
public static class ConfigEntry<ConfigFileType> {
    /**
     * The config file this entry was loaded from.
     */
    public final Path file;
    /**
     * The identifier associated with the config.
     */
    public final String identifier;
    /**
     * The timestamp this entry was last successfully updated.
     */
    public final Instant updatedAt;
    /**
     * The parsed config content.
     */
    public final ConfigFileType config;

    /**
     * Create an entry. All arguments are mandatory.
     *
     * @param file       The config file.
     * @param identifier The config identifier.
     * @param updatedAt  The timestamp it was last successfully updated.
     * @param config     The current config content.
     * @throws NullPointerException If any of the arguments is null.
     */
    public ConfigEntry(Path file, String identifier, Instant updatedAt, ConfigFileType config) {
        this.file = requireNonNull(file, "file == null");
        this.identifier = requireNonNull(identifier, "identifier == null");
        this.updatedAt = requireNonNull(updatedAt, "updatedAt == null");
        this.config = requireNonNull(config, "config == null");
    }

    /**
     * Two entries are equal when they are of the exact same class and all
     * four fields are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        ConfigEntry<?> other = (ConfigEntry<?>) o;
        // Compared in declaration order, stopping at the first mismatch.
        if (!file.equals(other.file)) {
            return false;
        }
        if (!identifier.equals(other.identifier)) {
            return false;
        }
        if (!updatedAt.equals(other.updatedAt)) {
            return false;
        }
        return config.equals(other.config);
    }

    @Override
    public int hashCode() {
        return Objects.hash(file, identifier, updatedAt, config);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ConfigEntry{");
        text.append("file=").append(file);
        text.append(", identifier='").append(identifier).append('\'');
        text.append(", updatedAt=").append(updatedAt);
        text.append(", config=").append(config);
        text.append('}');
        return text.toString();
    }
}
/**
 * Create a config watcher that timestamps updates with
 * {@link Clock#systemUTC()}. Delegates to the constructor taking an
 * explicit clock.
 *
 * @param configDir        The directory where the config files should be.
 * @param configReader     The config file reader used for parsing.
 * @param configIdentifier Get identifiers from the config file for
 *                         metrics and logging.
 */
public ConfigWatcher(
        Path configDir,
        ConfigReader<ConfigFileType> configReader,
        Function<ConfigFileType, String> configIdentifier) {
    this(configDir, configReader, configIdentifier, Clock.systemUTC());
}
/**
 * Create a config watcher with an explicit clock, using the watchers
 * obtained from this class' {@code DIR_WATCHER} and {@code FILE_WATCHER}
 * suppliers.
 *
 * @param configDir        The directory where the config files should be.
 * @param configReader     The config file reader used for parsing.
 * @param configIdentifier Get identifiers from the config file for
 *                         metrics and logging.
 * @param clock            Clock for knowing config update times.
 */
public ConfigWatcher(
        Path configDir,
        ConfigReader<ConfigFileType> configReader,
        Function<ConfigFileType, String> configIdentifier,
        Clock clock) {
    this(configDir,
         configReader,
         configIdentifier,
         clock,
         DIR_WATCHER.get(),
         FILE_WATCHER.get());
}
/**
* Create a config watcher instance.
*
* @param configDir The directory where the config files should be.
* @param configReader The config file reader used for parsing.
* @param configIdentifier Get identifiers from the config file for
* metrics and logging.
* @param clock Clock for knowing config update times.
* @param dirWatcher A directory watcher.
* @param fileWatcher A file watcher.
*/
public ConfigWatcher(Path configDir,
ConfigReader<ConfigFileType> configReader,
Function<ConfigFileType, String> configIdentifier,
Clock clock,
DirWatcher dirWatcher,
FileWatcher fileWatcher) {
this.clock = requireNonNull(clock, "clock == null");
this.configDir = requireNonNull(configDir, "configDir == null");
this.configReader = requireNonNull(configReader, "configReader == null");
this.configIdentifier = requireNonNull(configIdentifier, "configIdentifier == null");
this.dirWatcher = requireNonNull(dirWatcher, "dirWatcher == null");
this.fileWatcher = requireNonNull(fileWatcher, "fileWatcher == null");
if (!Files.isDirectory(configDir)) {
throw new IllegalArgumentException("Not a directory: " + configDir);
}
this.eventListeners = new ArrayList<>();
this.changeListeners = new ArrayList<>();
this.loadedConfigMap = new HashMap<>();
this.dirEventListener = this::onDirectoryEvent;
this.fileEventListener = this::onFileEvent;
}
/**
 * Get the directory being watched, as given to the constructor.
 *
 * @return The path to the config directory.
 */
@JsonProperty("configDir")
public Path getConfigDir() {
    return this.configDir;
}
/**
 * Take a snapshot of the currently loaded configs. The copy is made while
 * holding the watcher's mutex and is unmodifiable.
 *
 * @return The currently loaded config map as map of file path to loaded
 *         config entry with metadata.
 */
@JsonIgnore
public Map<Path, ConfigEntry<ConfigFileType>> getLoadedConfigMap() {
    final Map<Path, ConfigEntry<ConfigFileType>> snapshot;
    synchronized (mutex) {
        snapshot = Map.copyOf(loadedConfigMap);
    }
    return snapshot;
}
/**
 * Register a listener for events describing what the config watcher is
 * doing (file reads and file updates). Registering the same instance
 * again moves it to the end of the listener list instead of adding it
 * twice.
 *
 * @param listener Config Event Listener
 * @return The config watcher.
 * @throws NullPointerException If the listener is null.
 */
@JsonIgnore
public ConfigWatcher<ConfigFileType> addEventListener(ConfigEventListener listener) {
    requireNonNull(listener);
    synchronized (mutex) {
        // Identity comparison on purpose: only this exact instance is
        // dropped before being appended again.
        eventListeners.removeIf(registered -> registered == listener);
        eventListeners.add(listener);
        return this;
    }
}
/**
 * Register a listener that is notified when a config is created, modified
 * or deleted in the loaded config map. Registering the same instance
 * again moves it to the end of the listener list instead of adding it
 * twice.
 *
 * @param listener Config Change Listener
 * @return The config watcher.
 * @throws NullPointerException If the listener is null.
 */
@JsonIgnore
public ConfigWatcher<ConfigFileType> addChangeListener(ConfigChangeListener<ConfigFileType> listener) {
    requireNonNull(listener);
    synchronized (mutex) {
        // Identity comparison on purpose: only this exact instance is
        // dropped before being appended again.
        changeListeners.removeIf(registered -> registered == listener);
        changeListeners.add(listener);
        return this;
    }
}
/**
 * Start the config watcher, failing if any config file cannot be read and
 * applied. Equivalent to {@code start(true)}.
 *
 * @return The config watcher.
 * @throws ConfigException On any config read, parse or update issues.
 */
@JsonIgnore
public ConfigWatcher<ConfigFileType> start() throws ConfigException {
    return this.start(true);
}
/**
 * Start the config watcher: register the directory listener, then load
 * every non-hidden regular file currently in the config directory and
 * register the file listener for each of them.
 *
 * @param failOnConfigException If true, an exception while handling a file is logged as error and
 *                              recorded, and reported once all files have been processed. If false,
 *                              such exceptions are logged as warnings, but otherwise ignored.
 * @return The config watcher.
 * @throws ConfigException On any recorded config read, parse or update issues. A failure to list
 *                         the config directory is recorded regardless of the flag.
 */
@JsonIgnore
public ConfigWatcher<ConfigFileType> start(boolean failOnConfigException) throws ConfigException {
    synchronized (mutex) {
        dirWatcher.weakAddWatcher(configDir, dirEventListener);
        var failures = new ConfigExceptionBuffer();
        try {
            for (var candidate : FileUtil.list(configDir)) {
                try {
                    var resolved = readCanonicalPath(candidate);
                    // Hidden files and anything not resolving to a regular
                    // file are not treated as config files.
                    if (Files.isHidden(candidate) || !Files.isRegularFile(resolved)) {
                        continue;
                    }
                    fileWatcher.weakAddWatcher(candidate, fileEventListener);
                    onFileEventInternal(candidate, FileEvent.CREATED, true);
                } catch (Exception e) {
                    if (!failOnConfigException) {
                        LOGGER.warn("Exception handling new config file {}", candidate.getFileName(), e);
                        continue;
                    }
                    LOGGER.error("Exception handling new config file {}", candidate.getFileName(), e);
                    failures.update(e);
                }
            }
        } catch (Exception e) {
            failures.update(e);
        }
        // Reported only after the whole directory has been processed, so
        // one bad file does not prevent the others from loading.
        failures.throwIfPresent();
    }
    return this;
}
// --- Closeable ---
/**
 * Immediately drop all event and change listeners and unregister this
 * watcher's directory and file listeners. The loaded config map is left
 * untouched.
 */
@Override
public void close() {
    synchronized (mutex) {
        this.eventListeners.clear();
        this.changeListeners.clear();
        this.dirWatcher.removeWatcher(this.dirEventListener);
        this.fileWatcher.removeWatcher(this.fileEventListener);
    }
}
// --- Object ---
/**
 * @return A short description naming the watched directory.
 */
@Override
public String toString() {
    return "ConfigWatcher{dir=".concat(String.valueOf(configDir)).concat("}");
}
// *****************
// ** PRIVATE **
// *****************
/**
 * Handle an event from the directory watcher. Only
 * {@link FileEvent#CREATED} is acted on: the new file gets the file
 * listener registered, is read, and is stored as a created (or, if the path
 * is already cached, modified) config. All failures are logged and never
 * propagated to the watcher that delivered the event.
 *
 * @param file  The file the event is about.
 * @param event The kind of event.
 */
private void onDirectoryEvent(Path file, FileEvent event) {
    if (event != FileEvent.CREATED) {
        // Nothing is done for other events here, so return before touching
        // the file system. Probing a file that may already be gone is
        // wasted work, and where those probes throw it would be logged as
        // a failed config update.
        return;
    }
    try {
        Path canonical = readCanonicalPath(file);
        if (Files.isHidden(file) || !Files.isRegularFile(canonical)) {
            return;
        }
        synchronized (mutex) {
            fileWatcher.weakAddWatcher(file, fileEventListener);
            var config = readConfig(event, file, canonical);
            var old = loadedConfigMap.get(file);
            if (old != null && config.equals(old.config)) {
                // Unlikely, but this may happen e.g. if the directory
                // structure does not follow the writable config map
                // pattern of kubernetes. Though that will have other
                // unintended consequences too.
                return;
            }
            var change = old == null ? CREATED : MODIFIED;
            var identifier = configIdentifier.apply(config);
            try {
                LOGGER.info(
                        "{} config for '{}' from {}",
                        change.capitalizedAction(),
                        identifier,
                        file);
                onConfigChange(change, config);
                onConfigFileUpdate(change, file, identifier, OK);
                // Cached only after the change listeners accepted it.
                loadedConfigMap.put(file, new ConfigEntry<>(
                        file, identifier, clock.instant(), config));
            } catch (Throwable e) {
                onConfigFileUpdate(change, file, identifier, ERROR);
                LOGGER.error("Failed to update config: {}", file, e);
            }
        }
    } catch (Exception e) {
        LOGGER.error("Failed to update config: {}", file, e);
    }
}
/**
 * Handle an event from the file watcher. The event is processed while
 * holding the mutex, and any failure is logged rather than propagated.
 *
 * @param file  The file the event is about.
 * @param event The kind of event.
 */
private void onFileEvent(Path file, FileEvent event) {
    try {
        synchronized (mutex) {
            // rethrow = false: problems applying the config are logged by
            // the internal handler instead of being thrown.
            this.onFileEventInternal(file, event, false);
        }
    } catch (Throwable failure) {
        LOGGER.error("Failed to update config: {}", file, failure);
    }
}
/**
 * Apply a file event to the loaded config map and notify listeners.
 * Reads and writes {@code loadedConfigMap} without locking; the callers in
 * this class invoke it while holding the mutex.
 *
 * <ul>
 *   <li>CREATED / MODIFIED: read the file and, if the content differs from
 *       the cached config, notify change listeners and cache it.</li>
 *   <li>DELETED: if the file is really gone and was cached, notify
 *       listeners and drop it from the cache.</li>
 * </ul>
 *
 * @param file    The file the event is about.
 * @param event   The kind of event.
 * @param rethrow If a failure applying a created or modified config should
 *                be thrown rather than only logged.
 * @throws IOException     If resolving the canonical path fails.
 * @throws ConfigException If reading the config fails, or if applying it
 *                         fails and {@code rethrow} is set.
 */
private void onFileEventInternal(Path file, FileEvent event, boolean rethrow)
        throws IOException, ConfigException {
    if (event == FileEvent.CREATED || event == FileEvent.MODIFIED) {
        Path canonical = readCanonicalPath(file);
        if (!Files.exists(canonical)) {
            // Check for file exists, as sometimes the intermediate symlink may
            // be recreated (triggering event) before the source link file is
            // deleted. Ignore event in this case. This should only happen just
            // before the symlink is deleted triggering event below.
            return;
        }
        // Single lookup: whether the path is already cached decides both
        // the read event reported and the change type.
        var old = loadedConfigMap.get(file);
        var config = readConfig(
                old == null ? FileEvent.CREATED : FileEvent.MODIFIED,
                file,
                canonical);
        if (old != null && config.equals(old.config)) {
            return;
        }
        var change = old == null ? CREATED : MODIFIED;
        var identifier = configIdentifier.apply(config);
        try {
            onConfigChange(change, config);
            onConfigFileUpdate(change, file, identifier, OK);
            // Cached only after the change listeners accepted it.
            loadedConfigMap.put(file, new ConfigEntry<>(
                    file, identifier, clock.instant(), config));
        } catch (Exception e) {
            onConfigFileUpdate(change, file, identifier, ERROR);
            if (rethrow) {
                throw asConfigException(e);
            } else {
                LOGGER.error(
                        "{} config for '{}' from {} failed: {}",
                        change.capitalizedAction(),
                        identifier,
                        file,
                        e.getMessage(),
                        e);
            }
        }
    } else if (event == FileEvent.DELETED) {
        if (Files.exists(file)) {
            // Ignore delete event if the file still exist. It refers to the file
            // that this pointed to before an update, which is deleted from disk
            // after an update. In some cases the update of the symlink related
            // to chained symlinks are slightly delayed, so we get stray delete
            // events when we still have a file.
            return;
        }
        var current = loadedConfigMap.get(file);
        if (current != null) {
            // Use the identifier stored when the entry was cached rather
            // than calling the identifier function again: a failure there
            // would happen before the try/finally below and leave the
            // deleted file in the cache.
            var identifier = current.identifier;
            try {
                onConfigFileRead(FileEvent.DELETED, file, OK);
                onConfigChange(DELETED, current.config);
                onConfigFileUpdate(DELETED, file, identifier, OK);
            } catch (Exception e) {
                LOGGER.error(
                        "Failed to remove network {} service from config: {}",
                        identifier,
                        file,
                        e);
                onConfigFileUpdate(DELETED, file, identifier, ERROR);
            } finally {
                // always remove from cache, should only trigger once,
                // even on exception.
                loadedConfigMap.remove(file);
            }
        }
    }
}
/**
 * Read and parse a config file, reporting the outcome of the read to the
 * event listeners.
 *
 * @param fileEvent The event reported to the listeners for this read.
 * @param file      The file path reported to the listeners.
 * @param canonical The path actually passed to the config reader.
 * @return The parsed config.
 * @throws ConfigException If the reader fails; an {@link IOException} is
 *                         wrapped with its message and cause.
 */
private ConfigFileType readConfig(FileEvent fileEvent, Path file, Path canonical) throws ConfigException {
    final ConfigFileType parsed;
    try {
        parsed = configReader.readConfig(canonical);
    } catch (ConfigException parseFailure) {
        onConfigFileRead(fileEvent, file, PARSE_FAILED);
        throw parseFailure;
    } catch (IOException readFailure) {
        onConfigFileRead(fileEvent, file, READ_FAILED);
        throw new ConfigException("Unable to read config: " + readFailure.getMessage(), readFailure);
    }
    onConfigFileRead(fileEvent, file, OK);
    return parsed;
}
/**
 * Notify every change listener, in registration order, about a config
 * change. A failing listener does not stop the remaining ones from being
 * called; failures are collected and reported once all have run.
 *
 * @param changeType The kind of change.
 * @param config     The config the change is about.
 * @throws ConfigException If any listener failed.
 */
private void onConfigChange(ConfigChangeType changeType, ConfigFileType config) throws ConfigException {
    var failures = new ConfigExceptionBuffer();
    var remaining = changeListeners.iterator();
    while (remaining.hasNext()) {
        var listener = remaining.next();
        try {
            listener.onConfigChange(changeType, config);
        } catch (Exception failure) {
            failures.update(failure);
        }
    }
    failures.throwIfPresent();
}
/**
 * Tell every event listener, in registration order, about the outcome of
 * a config file read. Exceptions thrown by a listener are logged as
 * warnings and do not affect the other listeners or the caller.
 *
 * @param fileEvent The file event that caused the read.
 * @param file      The file that was read.
 * @param status    The outcome of the read.
 */
private void onConfigFileRead(FileEvent fileEvent, Path file, ConfigEventListener.Status status) {
    var remaining = eventListeners.iterator();
    while (remaining.hasNext()) {
        var listener = remaining.next();
        try {
            listener.onConfigFileRead(fileEvent, file, status);
        } catch (Exception failure) {
            LOGGER.warn("Exception in config event listener: {}", failure.getMessage(), failure);
        }
    }
}
/**
 * Tell every event listener, in registration order, about the outcome of
 * applying a config change. Exceptions thrown by a listener are logged as
 * warnings and do not affect the other listeners or the caller.
 *
 * @param changeType The kind of change that was applied.
 * @param file       The file the config came from.
 * @param identifier The identifier of the config.
 * @param status     The outcome of the update.
 */
private void onConfigFileUpdate(ConfigChangeType changeType,
                                Path file,
                                String identifier,
                                ConfigEventListener.Status status) {
    var remaining = eventListeners.iterator();
    while (remaining.hasNext()) {
        var listener = remaining.next();
        try {
            listener.onConfigFileUpdate(changeType, file, identifier, status);
        } catch (Exception failure) {
            LOGGER.warn("Exception in config event listener: {}", failure.getMessage(), failure);
        }
    }
}
// Logger for this class.
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigWatcher.class);
// Supplier of the directory watcher used by the constructors that do not
// take explicit watchers.
// NOTE(review): presumably created on first get() and then reused, going by
// the LazyCachedSupplier name - confirm.
static final Supplier<DirWatcher> DIR_WATCHER = new LazyCachedSupplier<>(DirWatcher::new);
// Supplier of the file watcher used by the same constructors; it is built
// on top of the watcher obtained from DIR_WATCHER.
static final Supplier<FileWatcher> FILE_WATCHER = new LazyCachedSupplier<>(() -> new FileWatcher(DIR_WATCHER.get()));
// Reads and parses a config file into the config type.
private final ConfigReader<ConfigFileType> configReader;
// Derives the identifier used in logs and listener events from a config.
private final Function<ConfigFileType, String> configIdentifier;
// Listeners told about file reads and file updates.
private final List<ConfigEventListener> eventListeners;
// Listeners told about created, modified and deleted configs.
private final List<ConfigChangeListener<ConfigFileType>> changeListeners;
// Cache of the configs currently loaded, keyed by the watched file path.
private final Map<Path, ConfigEntry<ConfigFileType>> loadedConfigMap;
// Source of the updatedAt timestamp on cached entries.
private final Clock clock;
// The directory being watched.
private final Path configDir;
// Lock used around access to the listener lists and the loaded config map.
private final Object mutex = new Object();
// Watcher delivering events for the config directory.
private final DirWatcher dirWatcher;
// Watcher delivering events for the individual config files.
private final FileWatcher fileWatcher;
// Bound to onFileEvent; the same instance is registered and removed.
private final FileEventListener fileEventListener;
// Bound to onDirectoryEvent; the same instance is registered and removed.
private final FileEventListener dirEventListener;
}