DirectoryMessageSetStore.java
package net.morimekta.providence.storage;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.descriptor.PMessageDescriptor;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.storage.dir.FileManager;
import net.morimekta.util.concurrent.ReadWriteMutex;
import net.morimekta.util.concurrent.ReentrantReadWriteMutex;
import javax.annotation.Nonnull;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static net.morimekta.util.collect.Unmodifiables.toMap;
/**
* Simple file-based set storage of providence messages that keeps
* an in-memory key index, a message cache, and stores messages
* to individual files in a single directory.
* <p>
* Note that the directory store is <b>not</b> parallel compatible between
* instances, as all of them would be able to read, write etc all
* the files all the time.
* <p>
* <b>TL;DR Each directory can only have one
* {@link DirectoryMessageSetStore} instance active at a time.</b>
*
* @param <K> The key type.
* @param <M> The stored message type.
*/
public class DirectoryMessageSetStore<K, M extends PMessage<M>>
        implements MessageSetStore<K,M>, Closeable {
    /** Guards the key index, the message cache and the files on disk. */
    private final ReadWriteMutex mutex;
    /**
     * In-memory index of the keys known to have a file. This is a concurrent
     * set because {@link #read(Object)} adds to it from the cache loader, which
     * runs while only the read lock is held (NOTE: the read lock is assumed to
     * be shareable between threads, as the mutex name suggests).
     */
    private final Set<K> keyset;
    /** Maps keys to the file and temporary file locations. */
    private final FileManager<K> manager;
    /** Serializer used for both reading and writing the message files. */
    private final Serializer serializer;
    /** Descriptor of the stored message type, needed for deserialization. */
    private final PMessageDescriptor<M> descriptor;
    /** Cache of messages, loaded from file on demand via {@link #read(Object)}. */
    private final LoadingCache<K, M> cache;
    /** Extracts the storage key from a message. */
    private final Function<M, K> messageToKey;

    /**
     * Create a directory message store.
     *
     * @param manager      File manager providing the initial key set and the
     *                     file locations for each key.
     * @param messageToKey Function extracting the key from a message.
     * @param descriptor   Descriptor of the stored message type.
     * @param serializer   Serializer used to read and write the files.
     */
    public DirectoryMessageSetStore(@Nonnull FileManager<K> manager,
                                    @Nonnull Function<M, K> messageToKey,
                                    @Nonnull PMessageDescriptor<M> descriptor,
                                    @Nonnull Serializer serializer) {
        this.messageToKey = messageToKey;
        this.manager = manager;
        this.mutex = new ReentrantReadWriteMutex();
        this.keyset = ConcurrentHashMap.newKeySet();
        this.keyset.addAll(manager.initialKeySet());
        this.descriptor = descriptor;
        this.serializer = serializer;
        this.cache = Caffeine.newBuilder().build(this::read);
    }

    /**
     * @param key The key to look up.
     * @return True if the key is in the in-memory key index.
     */
    @Override
    public boolean containsKey(@Nonnull K key) {
        return mutex.lockForReading(() -> keyset.contains(key));
    }

    /**
     * @return A snapshot copy of the in-memory key index.
     */
    @Override @Nonnull
    public Collection<K> keys() {
        return mutex.lockForReading(() -> new HashSet<>(keyset));
    }

    /**
     * @return The number of keys in the in-memory key index.
     */
    @Override
    public int size() {
        // Take the read lock like the other index readers, so the size is
        // never observed in the middle of a putAll / removeAll / close.
        return mutex.lockForReading(() -> keyset.size());
    }

    /**
     * Get the messages for the given keys, loading from file those that are
     * not already cached.
     *
     * @param keys The keys to look up.
     * @return A new mutable map with the entries returned by the cache.
     */
    @Override @Nonnull
    public Map<K, M> getAll(@Nonnull Collection<K> keys) {
        return mutex.lockForReading(() -> new HashMap<>(cache.getAll(keys)));
    }

    /**
     * Store all the given messages, each to its own file.
     *
     * @param values The messages to store.
     * @throws UncheckedIOException If writing any of the files failed. Messages
     *         written before the failure stay stored, cached and indexed.
     */
    @Override
    public void putAll(@Nonnull Collection<M> values) {
        Map<K, M> map = values.stream().collect(toMap(messageToKey));
        mutex.lockForWriting(() -> {
            // Update cache and index per key right after its file is written,
            // so a failure on a later key cannot leave a stale cached value
            // for a file that was already replaced on disk.
            map.forEach((key, message) -> {
                write(key, message);
                cache.put(key, message);
                keyset.add(key);
            });
        });
    }

    /**
     * Remove the files, cache entries and index entries for the given keys.
     *
     * @param keys The keys to remove.
     * @throws UncheckedIOException If deleting any of the files failed. Keys
     *         handled before the failure stay removed.
     */
    @Override
    public void removeAll(@Nonnull Collection<K> keys) {
        mutex.lockForWriting(() -> {
            // Delete the file first: if that fails the key stays indexed,
            // which matches what is still on disk.
            for (K key : keys) {
                delete(key);
                cache.invalidate(key);
                keyset.remove(key);
            }
        });
    }

    /**
     * Cache loader: read the message for the key from its file.
     *
     * @param key The key to read the message for.
     * @return The message, or null if the file could not be opened.
     * @throws UncheckedIOException If reading or deserializing failed.
     */
    private M read(K key) {
        try (FileInputStream fis = new FileInputStream(manager.getFileFor(key).toFile());
             BufferedInputStream bis = new BufferedInputStream(fis)) {
            M message = serializer.deserialize(bis, descriptor);
            // The file exists and is readable, so make sure it is indexed.
            keyset.add(key);
            return message;
        } catch (FileNotFoundException fnf) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Write the message to a temporary file and then move it over the real
     * file, so the real file is replaced in a single move operation.
     *
     * @param key     The key of the message.
     * @param message The message to write.
     * @throws UncheckedIOException If writing or moving the file failed.
     */
    private void write(K key, M message) {
        try {
            Path tmp = manager.tmpFileFor(key);
            Path file = manager.getFileFor(key);
            // If there was no leftover tmp file, its directory may be missing too.
            if (!Files.deleteIfExists(tmp)) {
                Files.createDirectories(tmp.getParent());
            }
            try (FileOutputStream fos = new FileOutputStream(tmp.toFile(), false);
                 BufferedOutputStream bos = new BufferedOutputStream(fos)) {
                serializer.serialize(bos, message);
                bos.flush();
            } catch (IOException e) {
                throw new IOException("Unable to write " + key.toString(), e);
            }
            Files.createDirectories(file.getParent());
            Files.move(tmp, file, REPLACE_EXISTING, ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Delete the file for the key, if it exists.
     *
     * @param key The key to delete the file for.
     * @throws UncheckedIOException If deleting the file failed.
     */
    private void delete(K key) {
        try {
            // No need to create the parent directory just to delete from it.
            Files.deleteIfExists(manager.getFileFor(key));
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    /**
     * Clear the message cache and the in-memory key index. Files on disk are
     * not touched.
     */
    @Override
    public void close() {
        // Hold the write lock so closing cannot interleave with an ongoing
        // read or write operation.
        mutex.lockForWriting(() -> {
            cache.invalidateAll();
            keyset.clear();
        });
    }
}