HazelcastMessageSetStorage.java
package net.morimekta.providence.storage.hazelcast;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.storage.MessageSetStore;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
/**
* Note that the hazelcast message store is backed by the PMessage
* serializable property, which makes the message fields <b>not indexable</b>.
* If that is needed, use the {@link HazelcastMessageBuilderStorage} instead.
* <p>
* On the other hand, this type of map is somewhat more efficient, and does not
* require the message to be generated with hazelcast portable
* support.
*/
public class HazelcastMessageSetStorage<Key, Message extends PMessage<Message>>
implements MessageSetStore<Key, Message> {
private final IMap<Key, Message> hazelcastMap;
private final Function<Message, Key> messageToKey;
public HazelcastMessageSetStorage(Function<Message, Key> messageToKey,
IMap<Key, Message> hazelcastMap) {
this.messageToKey = messageToKey;
this.hazelcastMap = hazelcastMap;
}
@Override
public void putAll(@Nonnull Collection<Message> values) {
Map<Key, ICompletableFuture<Message>> futureMap = new HashMap<>();
values.forEach(message -> {
Key key = messageToKey.apply(message);
futureMap.put(key, hazelcastMap.putAsync(key, message));
});
futureMap.forEach((key, future) -> {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
});
}
@Override
public void removeAll(Collection<Key> keys) {
Map<Key, ICompletableFuture<Message>> futureMap = new HashMap<>();
keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key)));
futureMap.forEach((key, future) -> {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
});
}
@Nonnull
@Override
public Map<Key, Message> getAll(@Nonnull Collection<Key> keys) {
Map<Key, Message> out = new HashMap<>();
hazelcastMap.getAll(new HashSet<>(keys))
.forEach((key, v) -> {
if (v != null) {
out.put(key, v);
}
});
return out;
}
@Override
public boolean containsKey(@Nonnull Key key) {
return hazelcastMap.containsKey(key);
}
@Nonnull
@Override
public Collection<Key> keys() {
return hazelcastMap.keySet();
}
@Override
public int size() {
return hazelcastMap.size();
}
}