DirectoryMessageSetStore.java

  1. package net.morimekta.providence.storage;

  2. import com.github.benmanes.caffeine.cache.Caffeine;
  3. import com.github.benmanes.caffeine.cache.LoadingCache;
  4. import net.morimekta.providence.PMessage;
  5. import net.morimekta.providence.descriptor.PMessageDescriptor;
  6. import net.morimekta.providence.serializer.Serializer;
  7. import net.morimekta.providence.storage.dir.FileManager;
  8. import net.morimekta.util.concurrent.ReadWriteMutex;
  9. import net.morimekta.util.concurrent.ReentrantReadWriteMutex;

  10. import javax.annotation.Nonnull;
  11. import java.io.BufferedInputStream;
  12. import java.io.BufferedOutputStream;
  13. import java.io.Closeable;
  14. import java.io.FileInputStream;
  15. import java.io.FileNotFoundException;
  16. import java.io.FileOutputStream;
  17. import java.io.IOException;
  18. import java.io.UncheckedIOException;
  19. import java.nio.file.Files;
  20. import java.nio.file.Path;
  21. import java.util.Collection;
  22. import java.util.HashMap;
  23. import java.util.HashSet;
  24. import java.util.Map;
  25. import java.util.Set;
  26. import java.util.function.Function;

  27. import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
  28. import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
  29. import static net.morimekta.util.collect.Unmodifiables.toMap;

  30. /**
  31.  * Simple file-based set storage of providence messages that keeps
  32.  * an in-memory key index, a message cache, and stores messages
  33.  * to individual files in a single directly.
  34.  * <p>
  35.  * Note that the directory store is <b>not</b> parallel compatible between
  36.  * instances, as all of them would be able to read, write etc all
  37.  * the files all the time.
  38.  * <p>
  39.  * <b>TL;DR Each directory can only have one
  40.  * {@link DirectoryMessageSetStore} instance active at a time.</b>
  41.  *
  42.  * @param <K> The key type.
  43.  * @param <M> The stored message type.
  44.  */
  45. public class DirectoryMessageSetStore<K, M extends PMessage<M>>
  46.         implements MessageSetStore<K,M>, Closeable {
  47.     private final ReadWriteMutex        mutex;
  48.     private final Set<K>                keyset;
  49.     private final FileManager<K>        manager;
  50.     private final Serializer            serializer;
  51.     private final PMessageDescriptor<M> descriptor;
  52.     private final LoadingCache<K, M>    cache;
  53.     private final Function<M, K>        messageToKey;

  54.     public DirectoryMessageSetStore(@Nonnull FileManager<K> manager,
  55.                                     @Nonnull Function<M, K> messageToKey,
  56.                                     @Nonnull PMessageDescriptor<M> descriptor,
  57.                                     @Nonnull Serializer serializer) {
  58.         this.messageToKey = messageToKey;
  59.         this.manager = manager;
  60.         this.mutex = new ReentrantReadWriteMutex();
  61.         this.keyset = new HashSet<>(manager.initialKeySet());
  62.         this.descriptor = descriptor;
  63.         this.serializer = serializer;
  64.         this.cache = Caffeine.newBuilder().build(this::read);
  65.     }

  66.     @Override
  67.     public boolean containsKey(@Nonnull K key) {
  68.         return mutex.lockForReading(() -> keyset.contains(key));
  69.     }

  70.     @Override @Nonnull
  71.     public Collection<K> keys() {
  72.         return mutex.lockForReading(() -> new HashSet<>(keyset));
  73.     }

  74.     @Override
  75.     public int size() {
  76.         return keyset.size();
  77.     }

  78.     @Override @Nonnull
  79.     public Map<K, M> getAll(@Nonnull Collection<K> keys) {
  80.         return mutex.lockForReading(() -> new HashMap<>(cache.getAll(keys)));
  81.     }

  82.     @Override
  83.     public void putAll(@Nonnull Collection<M> values) {
  84.         Map<K, M> map = values.stream().collect(toMap(messageToKey));
  85.         mutex.lockForWriting(() -> {
  86.             map.forEach(this::write);
  87.             cache.putAll(map);
  88.             keyset.addAll(map.keySet());
  89.         });
  90.     }

  91.     @Override
  92.     public void removeAll(Collection<K> keys) {
  93.         mutex.lockForWriting(() -> {
  94.             keyset.removeAll(keys);
  95.             cache.invalidateAll(keys);
  96.             keys.forEach(this::delete);
  97.         });
  98.     }

  99.     private M read(K key) {
  100.         try (FileInputStream fis = new FileInputStream(manager.getFileFor(key).toFile());
  101.              BufferedInputStream bis = new BufferedInputStream(fis)) {
  102.             M message = serializer.deserialize(bis, descriptor);
  103.             keyset.add(key);
  104.             return message;
  105.         } catch (FileNotFoundException fnf) {
  106.             return null;
  107.         } catch (IOException e) {
  108.             throw new UncheckedIOException(e.getMessage(), e);
  109.         }
  110.     }

  111.     private void write(K key, M message) {
  112.         try {
  113.             Path tmp  = manager.tmpFileFor(key);
  114.             Path file = manager.getFileFor(key);

  115.             if (!Files.deleteIfExists(tmp)) {
  116.                 Files.createDirectories(tmp.getParent());
  117.             }
  118.             try (FileOutputStream fos = new FileOutputStream(tmp.toFile(), false);
  119.                  BufferedOutputStream bos = new BufferedOutputStream(fos)) {
  120.                 serializer.serialize(bos, message);
  121.                 bos.flush();
  122.             } catch (IOException e) {
  123.                 throw new IOException("Unable to write " + key.toString(), e);
  124.             }
  125.             Files.createDirectories(file.getParent());
  126.             Files.move(tmp, file, REPLACE_EXISTING, ATOMIC_MOVE);
  127.         } catch (IOException e) {
  128.             throw new UncheckedIOException(e.getMessage(), e);
  129.         }
  130.     }

  131.     private void delete(K key) {
  132.         try {
  133.             Path tmp = manager.getFileFor(key);
  134.             Files.createDirectories(tmp.getParent());
  135.             Files.deleteIfExists(tmp);
  136.         } catch (IOException e) {
  137.             throw new UncheckedIOException(e.getMessage(), e);
  138.         }
  139.     }

  140.     @Override
  141.     public void close() {
  142.         cache.invalidateAll();
  143.         keyset.clear();
  144.     }
  145. }