HazelcastMessageSetStorage.java

package net.morimekta.providence.storage.hazelcast;

import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.storage.MessageSetStore;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/**
 * Note that the hazelcast message store is backed by the PMessage
 * serializable property, which makes the message fields <b>not indexable</b>.
 * If that is needed, use the {@link HazelcastMessageBuilderStorage} instead.
 * <p>
 * On the other hand, this type of map is somewhat more efficient, and does not
 * require the message to be generated with hazelcast portable
 * support.
 */
public class HazelcastMessageSetStorage<Key, Message extends PMessage<Message>>
        implements MessageSetStore<Key, Message> {
    private final IMap<Key, Message> hazelcastMap;
    private final Function<Message, Key> messageToKey;

    public HazelcastMessageSetStorage(Function<Message, Key> messageToKey,
                                      IMap<Key, Message> hazelcastMap) {
        this.messageToKey = messageToKey;
        this.hazelcastMap = hazelcastMap;
    }

    @Override
    public void putAll(@Nonnull Collection<Message> values) {
        Map<Key, ICompletableFuture<Message>> futureMap = new HashMap<>();
        values.forEach(message -> {
            Key key = messageToKey.apply(message);
            futureMap.put(key, hazelcastMap.putAsync(key, message));
        });
        futureMap.forEach((key, future) -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    @Override
    public void removeAll(Collection<Key> keys) {
        Map<Key, ICompletableFuture<Message>> futureMap = new HashMap<>();
        keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key)));
        futureMap.forEach((key, future) -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    @Nonnull
    @Override
    public Map<Key, Message> getAll(@Nonnull Collection<Key> keys) {
        Map<Key, Message> out = new HashMap<>();
        hazelcastMap.getAll(new HashSet<>(keys))
                    .forEach((key, v) -> {
                        if (v != null) {
                            out.put(key, v);
                        }
                    });
        return out;
    }

    @Override
    public boolean containsKey(@Nonnull Key key) {
        return hazelcastMap.containsKey(key);
    }

    @Nonnull
    @Override
    public Collection<Key> keys() {
        return hazelcastMap.keySet();
    }

    @Override
    public int size() {
        return hazelcastMap.size();
    }
}