DirectoryMessageSetStore.java
- package net.morimekta.providence.storage;
- import com.github.benmanes.caffeine.cache.Caffeine;
- import com.github.benmanes.caffeine.cache.LoadingCache;
- import net.morimekta.providence.PMessage;
- import net.morimekta.providence.descriptor.PMessageDescriptor;
- import net.morimekta.providence.serializer.Serializer;
- import net.morimekta.providence.storage.dir.FileManager;
- import net.morimekta.util.concurrent.ReadWriteMutex;
- import net.morimekta.util.concurrent.ReentrantReadWriteMutex;
- import javax.annotation.Nonnull;
- import java.io.BufferedInputStream;
- import java.io.BufferedOutputStream;
- import java.io.Closeable;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.UncheckedIOException;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Map;
- import java.util.Set;
- import java.util.function.Function;
- import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
- import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
- import static net.morimekta.util.collect.Unmodifiables.toMap;
- /**
- * Simple file-based set storage of providence messages that keeps
- * an in-memory key index, a message cache, and stores messages
- * to individual files in a single directly.
- * <p>
- * Note that the directory store is <b>not</b> parallel compatible between
- * instances, as all of them would be able to read, write etc all
- * the files all the time.
- * <p>
- * <b>TL;DR Each directory can only have one
- * {@link DirectoryMessageSetStore} instance active at a time.</b>
- *
- * @param <K> The key type.
- * @param <M> The stored message type.
- */
- public class DirectoryMessageSetStore<K, M extends PMessage<M>>
- implements MessageSetStore<K,M>, Closeable {
- private final ReadWriteMutex mutex;
- private final Set<K> keyset;
- private final FileManager<K> manager;
- private final Serializer serializer;
- private final PMessageDescriptor<M> descriptor;
- private final LoadingCache<K, M> cache;
- private final Function<M, K> messageToKey;
- public DirectoryMessageSetStore(@Nonnull FileManager<K> manager,
- @Nonnull Function<M, K> messageToKey,
- @Nonnull PMessageDescriptor<M> descriptor,
- @Nonnull Serializer serializer) {
- this.messageToKey = messageToKey;
- this.manager = manager;
- this.mutex = new ReentrantReadWriteMutex();
- this.keyset = new HashSet<>(manager.initialKeySet());
- this.descriptor = descriptor;
- this.serializer = serializer;
- this.cache = Caffeine.newBuilder().build(this::read);
- }
- @Override
- public boolean containsKey(@Nonnull K key) {
- return mutex.lockForReading(() -> keyset.contains(key));
- }
- @Override @Nonnull
- public Collection<K> keys() {
- return mutex.lockForReading(() -> new HashSet<>(keyset));
- }
- @Override
- public int size() {
- return keyset.size();
- }
- @Override @Nonnull
- public Map<K, M> getAll(@Nonnull Collection<K> keys) {
- return mutex.lockForReading(() -> new HashMap<>(cache.getAll(keys)));
- }
- @Override
- public void putAll(@Nonnull Collection<M> values) {
- Map<K, M> map = values.stream().collect(toMap(messageToKey));
- mutex.lockForWriting(() -> {
- map.forEach(this::write);
- cache.putAll(map);
- keyset.addAll(map.keySet());
- });
- }
- @Override
- public void removeAll(Collection<K> keys) {
- mutex.lockForWriting(() -> {
- keyset.removeAll(keys);
- cache.invalidateAll(keys);
- keys.forEach(this::delete);
- });
- }
- private M read(K key) {
- try (FileInputStream fis = new FileInputStream(manager.getFileFor(key).toFile());
- BufferedInputStream bis = new BufferedInputStream(fis)) {
- M message = serializer.deserialize(bis, descriptor);
- keyset.add(key);
- return message;
- } catch (FileNotFoundException fnf) {
- return null;
- } catch (IOException e) {
- throw new UncheckedIOException(e.getMessage(), e);
- }
- }
- private void write(K key, M message) {
- try {
- Path tmp = manager.tmpFileFor(key);
- Path file = manager.getFileFor(key);
- if (!Files.deleteIfExists(tmp)) {
- Files.createDirectories(tmp.getParent());
- }
- try (FileOutputStream fos = new FileOutputStream(tmp.toFile(), false);
- BufferedOutputStream bos = new BufferedOutputStream(fos)) {
- serializer.serialize(bos, message);
- bos.flush();
- } catch (IOException e) {
- throw new IOException("Unable to write " + key.toString(), e);
- }
- Files.createDirectories(file.getParent());
- Files.move(tmp, file, REPLACE_EXISTING, ATOMIC_MOVE);
- } catch (IOException e) {
- throw new UncheckedIOException(e.getMessage(), e);
- }
- }
- private void delete(K key) {
- try {
- Path tmp = manager.getFileFor(key);
- Files.createDirectories(tmp.getParent());
- Files.deleteIfExists(tmp);
- } catch (IOException e) {
- throw new UncheckedIOException(e.getMessage(), e);
- }
- }
- @Override
- public void close() {
- cache.invalidateAll();
- keyset.clear();
- }
- }