DirectoryMessageListStore.java
package net.morimekta.providence.storage;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.descriptor.PMessageDescriptor;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.storage.dir.FileManager;
import net.morimekta.providence.streams.MessageCollectors;
import net.morimekta.providence.streams.MessageStreams;
import net.morimekta.util.collect.Unmodifiables;
import net.morimekta.util.concurrent.ReadWriteMutex;
import net.morimekta.util.concurrent.ReentrantReadWriteMutex;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static net.morimekta.util.collect.UnmodifiableList.copyOf;
/**
* Simple file-based storage of lists of providence messages that keeps
* an in-memory key index, a message cache, and stores message lists
* to individual files in a single directory.
* <p>
* Note that the directory store is <b>not</b> parallel compatible between
* instances, as all of them would be able to read, write etc all
* the files all the time.
* <p>
* <b>TL;DR Each directory can only have one
* {@link DirectoryMessageListStore} instance active at a time.</b>
*
* @param <K> The key type.
* @param <M> The stored message type.
*/
public class DirectoryMessageListStore<K, M extends PMessage<M>>
        implements MessageListStore<K,M>, Closeable {
    /** Guards the key index, the cache and the files against concurrent writers. */
    private final ReadWriteMutex mutex;
    /**
     * In-memory index of known keys. Backed by a concurrent set because
     * {@link #read(Object)} adds to it while only the shared read lock is held.
     */
    private final Set<K> keyset;
    /** Maps keys to their final and temporary file paths. */
    private final FileManager<K> manager;
    /** Serializer used both for reading and for writing the message files. */
    private final Serializer serializer;
    /** Descriptor of the stored message type, needed when reading files. */
    private final PMessageDescriptor<M> descriptor;
    /** Cache of message lists; misses are loaded from disk via {@link #read(Object)}. */
    private final LoadingCache<K, List<M>> cache;

    /**
     * Create a store on top of the given file manager.
     *
     * @param manager    Provides the initial key set and the file paths for each key.
     * @param descriptor Descriptor of the stored message type.
     * @param serializer Serializer used to read and write the message files.
     */
    public DirectoryMessageListStore(@Nonnull FileManager<K> manager,
                                     @Nonnull PMessageDescriptor<M> descriptor,
                                     @Nonnull Serializer serializer) {
        this.manager = manager;
        this.mutex = new ReentrantReadWriteMutex();
        this.keyset = ConcurrentHashMap.newKeySet();
        this.keyset.addAll(manager.initialKeySet());
        this.descriptor = descriptor;
        this.serializer = serializer;
        this.cache = Caffeine.newBuilder().build(this::read);
    }

    /**
     * @param key The key to look for.
     * @return True if the key is present in the in-memory key index.
     */
    @Override
    public boolean containsKey(@Nonnull K key) {
        return mutex.lockForReading(() -> keyset.contains(key));
    }

    /**
     * @return A snapshot copy of the in-memory key index.
     */
    @Override @Nonnull
    public Collection<K> keys() {
        return mutex.lockForReading(() -> new HashSet<>(keyset));
    }

    /**
     * @return The number of keys in the in-memory key index.
     */
    @Override
    public int size() {
        // Take the read lock like the other index queries, so the size is
        // never observed in the middle of a putAll / removeAll / close.
        return mutex.lockForReading(() -> keyset.size());
    }

    /**
     * Look up the message lists for the given keys, loading from disk on
     * cache misses.
     *
     * @param keys The keys to look up.
     * @return A new mutable map with an entry for each key that was found.
     */
    @Override @Nonnull
    public Map<K, List<M>> getAll(@Nonnull Collection<K> keys) {
        return mutex.lockForReading(() -> new HashMap<>(cache.getAll(keys)));
    }

    /**
     * Write all the given message lists to disk, and update the cache and
     * key index accordingly.
     *
     * @param values Map of key to message list to be stored.
     * @throws UncheckedIOException If writing any of the files failed. Entries
     *         written before the failure remain stored, cached and indexed.
     */
    @Override
    public void putAll(@Nonnull Map<K, List<M>> values) {
        // Take unmodifiable copies before locking, so later changes to the
        // caller's lists cannot leak into the cache.
        Map<K, List<M>> immutable = new HashMap<>();
        values.forEach((k, v) -> immutable.put(k, copyOf(v)));
        mutex.lockForWriting(() -> {
            // Update cache and index per entry, right after its file has been
            // replaced, so a failure on a later entry cannot leave stale cache
            // content for entries that are already on disk.
            immutable.forEach((key, messages) -> {
                write(key, messages);
                cache.put(key, messages);
                keyset.add(key);
            });
        });
    }

    /**
     * Delete the files for the given keys, and drop them from the cache and
     * key index.
     *
     * @param keys The keys to remove.
     * @throws UncheckedIOException If deleting any of the files failed. Keys
     *         whose file was not deleted remain indexed.
     */
    @Override
    public void removeAll(Collection<K> keys) {
        mutex.lockForWriting(() -> {
            // Delete the file first: only forget a key once its file is
            // actually gone, so index and disk stay consistent on failure.
            for (K key : keys) {
                delete(key);
                keyset.remove(key);
                cache.invalidate(key);
            }
        });
    }

    /**
     * Cache loader: read the message list for a key from its file.
     *
     * @param key The key to read the message list for.
     * @return The unmodifiable message list, or null if there is no file for
     *         the key (a null from the loader means "no value" to the cache).
     * @throws UncheckedIOException If reading the file failed.
     */
    private List<M> read(K key) {
        try {
            List<M> messages = MessageStreams
                    .file(manager.getFileFor(key).toFile(), serializer, descriptor)
                    .collect(Unmodifiables.toList());
            // The file exists and was readable, so make sure the key is indexed.
            // This may run under the shared read lock (see getAll), which is
            // why the key set is a concurrent set.
            keyset.add(key);
            return messages;
        } catch (FileNotFoundException fnf) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Write a message list to the file for the key. The content is first
     * written to the temporary file, which is then atomically moved in place.
     * Updating the cache and key index is left to the caller.
     *
     * @param key     The key to write the message list for.
     * @param message The messages to write.
     * @throws UncheckedIOException If writing or moving the file failed.
     */
    private void write(K key, List<M> message) {
        try {
            Path tmp = manager.tmpFileFor(key);
            Path file = manager.getFileFor(key);
            // If there was no leftover temp file its directory may be missing too.
            if (!Files.deleteIfExists(tmp)) {
                Files.createDirectories(tmp.getParent());
            }
            try {
                // The collector result is not needed; presumably it is assigned
                // only so the returned value is not reported as ignored.
                @SuppressWarnings("unused")
                int i = message.stream().collect(MessageCollectors.toPath(tmp, serializer));
            } catch (UncheckedIOException e) {
                throw new IOException("Unable to write " + key.toString(), e.getCause());
            }
            Files.createDirectories(file.getParent());
            Files.move(tmp, file, REPLACE_EXISTING, ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Delete the file for the key, if it exists.
     *
     * @param key The key to delete the file for.
     * @throws UncheckedIOException If deleting the file failed.
     */
    private void delete(K key) {
        try {
            // A missing file (or missing parent directory) simply means there
            // is nothing to delete, so no directories are created here.
            Files.deleteIfExists(manager.getFileFor(key));
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Drop all cached message lists and clear the in-memory key index. The
     * files on disk are left untouched.
     */
    @Override
    public void close() {
        // Exclude concurrent readers and writers while the state is cleared.
        mutex.lockForWriting(() -> {
            cache.invalidateAll();
            keyset.clear();
        });
    }
}