// HazelcastMessageBuilderStorage.java

package net.morimekta.providence.storage.hazelcast;

import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.nio.serialization.Portable;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.PMessageBuilder;
import net.morimekta.providence.storage.MessageStore;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static net.morimekta.providence.util.MessageUtil.toBuilderValues;
import static net.morimekta.providence.util.MessageUtil.toMessageValues;

/**
 * A message store containing message builders. Note that there are
 * no 'list' variants of this type of store. The benefit of using
 * the {@link HazelcastMessageBuilderStorage} is that it can be
 * combined with using the hazelcast Portable indexing and query
 * systems.
 *
 * @see Portable And
 *     <a href="http://docs.hazelcast.org/docs/3.8.4/manual/html-single/index.html">Hazelcast Docs</a>
 *     for reference on how to utilize portable and querying the
 *     data grid.
 */
public class HazelcastMessageBuilderStorage<
        Key,
        Message extends PMessage<Message>,
        Builder extends PMessageBuilder<Message> & Portable>
        implements MessageStore<Key, Message> {
    private final IMap<Key, Builder> hazelcastMap;

    public HazelcastMessageBuilderStorage(IMap<Key, Builder> hazelcastMap) {
        this.hazelcastMap = hazelcastMap;
    }

    @Override
    public void putAll(@Nonnull Map<Key, Message> values) {
        putAllBuilders(toBuilderValues(values));
    }

    @Override
    @SuppressWarnings("unchecked")
    public <B extends PMessageBuilder<Message>> void putAllBuilders(@Nonnull Map<Key, B> builders) {
        Map<Key, ICompletableFuture<Builder>> futureMap = new HashMap<>();
        builders.forEach((key, builder) -> futureMap.put(key, hazelcastMap.putAsync(key, (Builder) builder)));
        futureMap.forEach((key, future) -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    @Override
    public void removeAll(Collection<Key> keys) {
        Map<Key, ICompletableFuture<Builder>> futureMap = new HashMap<>();
        keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key)));
        futureMap.forEach((key, builder) -> {
            try {
                builder.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    @Nonnull
    @Override
    public Map<Key, Message> getAll(@Nonnull Collection<Key> keys) {
        return toMessageValues(getAllBuilders(keys));
    }

    @Nonnull
    @Override
    @SuppressWarnings("unchecked")
    public <B extends PMessageBuilder<Message>> Map<Key, B> getAllBuilders(@Nonnull Collection<Key> keys) {
        Map<Key, B> out = new HashMap<>();
        hazelcastMap.getAll(new HashSet<>(keys))
                    .forEach((key, v) -> {
                        if (v != null) {
                            out.put(key, (B) v);
                        }
                    });
        return out;
    }

    @Override
    public boolean containsKey(@Nonnull Key key) {
        return hazelcastMap.containsKey(key);
    }

    @Nonnull
    @Override
    public Collection<Key> keys() {
        return hazelcastMap.keySet();
    }

    @Override
    public int size() {
        return hazelcastMap.size();
    }
}