HazelcastMessageSetBuilderStorage.java
package net.morimekta.providence.storage.hazelcast;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.nio.serialization.Portable;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.PMessageBuilder;
import net.morimekta.providence.storage.MessageSetStore;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import static net.morimekta.providence.util.MessageUtil.toBuilderAll;
import static net.morimekta.providence.util.MessageUtil.toMessageValues;
/**
* A message store containing message builders. Note that there are
* no 'list' variants of this type of store. The benefit of using
* the {@link HazelcastMessageSetBuilderStorage} is that it can be
* combined with using the hazelcast Portable indexing and query
* systems.
*
* @see Portable And
* <a href="http://docs.hazelcast.org/docs/3.8.4/manual/html-single/index.html">Hazelcast Docs</a>
* for reference on how to utilize portable and querying the
* data grid.
*/
public class HazelcastMessageSetBuilderStorage<
Key,
Message extends PMessage<Message>,
Builder extends PMessageBuilder<Message> & Portable>
implements MessageSetStore<Key, Message> {
private final IMap<Key, Builder> hazelcastMap;
private final Function<Message, Key> messageToKey;
public HazelcastMessageSetBuilderStorage(Function<Message, Key> messageToKey,
IMap<Key, Builder> hazelcastMap) {
this.messageToKey = messageToKey;
this.hazelcastMap = hazelcastMap;
}
@Override
public void putAll(@Nonnull Collection<Message> values) {
putAllBuilders(toBuilderAll(values));
}
@Override
@SuppressWarnings("unchecked")
public <B extends PMessageBuilder<Message>> void putAllBuilders(@Nonnull Collection<B> builders) {
Map<Key, ICompletableFuture<Builder>> futureMap = new HashMap<>();
builders.forEach((builder) -> {
Key key = messageToKey.apply(builder.build());
futureMap.put(key, hazelcastMap.putAsync(key, (Builder) builder));
});
futureMap.forEach((key, future) -> {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
});
}
@Override
public void removeAll(Collection<Key> keys) {
Map<Key, ICompletableFuture<Builder>> futureMap = new HashMap<>();
keys.forEach(key -> futureMap.put(key, hazelcastMap.removeAsync(key)));
futureMap.forEach((key, builder) -> {
try {
builder.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e.getMessage(), e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
});
}
@Nonnull
@Override
public Map<Key, Message> getAll(@Nonnull Collection<Key> keys) {
return toMessageValues(getAllBuilders(keys));
}
@Nonnull
@Override
@SuppressWarnings("unchecked")
public <B extends PMessageBuilder<Message>> Map<Key, B> getAllBuilders(@Nonnull Collection<Key> keys) {
Map<Key, B> out = new HashMap<>();
hazelcastMap.getAll(new HashSet<>(keys))
.forEach((key, v) -> {
if (v != null) {
out.put(key, (B) v);
}
});
return out;
}
@Override
public boolean containsKey(@Nonnull Key key) {
return hazelcastMap.containsKey(key);
}
@Nonnull
@Override
public Collection<Key> keys() {
return hazelcastMap.keySet();
}
@Override
public int size() {
return hazelcastMap.size();
}
}