// DirectoryMessageListStore.java

package net.morimekta.providence.storage;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.descriptor.PMessageDescriptor;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.storage.dir.FileManager;
import net.morimekta.providence.streams.MessageCollectors;
import net.morimekta.providence.streams.MessageStreams;
import net.morimekta.util.collect.Unmodifiables;
import net.morimekta.util.concurrent.ReadWriteMutex;
import net.morimekta.util.concurrent.ReentrantReadWriteMutex;

import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static net.morimekta.util.collect.UnmodifiableList.copyOf;

/**
 * Simple file-based storage of lists of providence messages that keeps
 * an in-memory key index, a message cache, and stores message lists
 * to individual files in a single directory.
 * <p>
 * Note that the directory store is <b>not</b> safe to share between
 * parallel instances, as each instance would read and write all of
 * the files independently, with no coordination between them.
 * <p>
 * <b>TL;DR Each directory can only have one
 * {@link DirectoryMessageListStore} instance active at a time.</b>
 *
 * @param <K> The key type.
 * @param <M> The stored message type.
 */
public class DirectoryMessageListStore<K, M extends PMessage<M>>
        implements MessageListStore<K,M>, Closeable {
    private final ReadWriteMutex        mutex;
    private final Set<K>                keyset;
    private final FileManager<K>        manager;
    private final Serializer            serializer;
    private final PMessageDescriptor<M> descriptor;
    private final LoadingCache<K, List<M>> cache;

    public DirectoryMessageListStore(@Nonnull FileManager<K> manager,
                                     @Nonnull PMessageDescriptor<M> descriptor,
                                     @Nonnull Serializer serializer) {
        this.manager = manager;
        this.mutex = new ReentrantReadWriteMutex();
        this.keyset = new HashSet<>(manager.initialKeySet());
        this.descriptor = descriptor;
        this.serializer = serializer;
        this.cache = Caffeine.newBuilder().build(this::read);
    }

    @Override
    public boolean containsKey(@Nonnull K key) {
        return mutex.lockForReading(() -> keyset.contains(key));
    }

    @Override @Nonnull
    public Collection<K> keys() {
        return mutex.lockForReading(() -> new HashSet<>(keyset));
    }

    @Override
    public int size() {
        return keyset.size();
    }

    @Override @Nonnull
    public Map<K, List<M>> getAll(@Nonnull Collection<K> keys) {
        return mutex.lockForReading(() -> new HashMap<>(cache.getAll(keys)));
    }

    @Override
    public void putAll(@Nonnull Map<K, List<M>> values) {
        Map<K, List<M>> immutable = new HashMap<>();
        values.forEach((k, v) -> immutable.put(k, copyOf(v)));
        mutex.lockForWriting(() -> {
            immutable.forEach(this::write);
            cache.putAll(immutable);
            keyset.addAll(immutable.keySet());
        });
    }

    @Override
    public void removeAll(Collection<K> keys) {
        mutex.lockForWriting(() -> {
            keyset.removeAll(keys);
            cache.invalidateAll(keys);
            keys.forEach(this::delete);
        });
    }

    private List<M> read(K key) {
        try {
            List<M> message = MessageStreams
                    .file(manager.getFileFor(key).toFile(), serializer, descriptor)
                    .collect(Unmodifiables.toList());
            keyset.add(key);
            return message;
        } catch (FileNotFoundException fnf) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    private void write(K key, List<M> message) {
        try {
            Path tmp  = manager.tmpFileFor(key);
            Path file = manager.getFileFor(key);

            if (!Files.deleteIfExists(tmp)) {
                Files.createDirectories(tmp.getParent());
            }
            try {
                @SuppressWarnings("unused")
                int i = message.stream().collect(MessageCollectors.toPath(tmp, serializer));
                keyset.add(key);
            } catch (UncheckedIOException e) {
                throw new IOException("Unable to write " + key.toString(), e.getCause());
            }
            Files.createDirectories(file.getParent());
            Files.move(tmp, file, REPLACE_EXISTING, ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    private void delete(K key) {
        try {
            Path file = manager.getFileFor(key);
            Files.createDirectories(file.getParent());
            Files.deleteIfExists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    @Override
    public void close() {
        cache.invalidateAll();
        keyset.clear();
    }
}