HazelcastMessageListStorage.java

package net.morimekta.providence.storage.hazelcast;

import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.storage.MessageListStore;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static net.morimekta.util.collect.UnmodifiableList.copyOf;

/**
 * Note that the hazelcast message store is backed by the PMessage serializable
 * properties, which makes the message field not indexed. If that is needed,
 * use the {@link HazelcastMessageBuilderStorage} instead.
 */
public class HazelcastMessageListStorage<
        K, M extends PMessage<M>>
        implements MessageListStore<K,M> {
    private final IMap<K, List<M>> hazelcastMap;

    public HazelcastMessageListStorage(IMap<K, List<M>> hazelcastMap) {
        this.hazelcastMap = hazelcastMap;
    }

    @Override
    public void putAll(@Nonnull Map<K, List<M>> values) {
        Map<K, ICompletableFuture<List<M>>> futureMap = new HashMap<>();
        values.forEach((key, message) -> {
            // Make sure to use a serializable list.
            futureMap.put(key, hazelcastMap.putAsync(key, new ArrayList<>(message)));
        });
        futureMap.forEach((key, future) -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    @Override
    public void removeAll(Collection<K> keys) {
        Map<K, ICompletableFuture<List<M>>> futureMap = new HashMap<>();
        keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key)));
        futureMap.forEach((key, future) -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    @Nonnull
    @Override
    public Map<K, List<M>> getAll(@Nonnull Collection<K> keys) {
        Map<K, List<M>> ret = new HashMap<>();
        hazelcastMap.getAll(new HashSet<>(keys))
                    .forEach((k, v) -> {
                        if (v != null) {
                            ret.put(k, copyOf(v));
                        }
                    });
        return ret;
    }

    @Override
    public boolean containsKey(@Nonnull K key) {
        return hazelcastMap.containsKey(key);
    }

    @Nonnull
    @Override
    public Collection<K> keys() {
        return hazelcastMap.keySet();
    }

    @Override
    public int size() {
        return hazelcastMap.size();
    }
}