HazelcastMessageListStorage.java
package net.morimekta.providence.storage.hazelcast;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.storage.MessageListStore;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static net.morimekta.util.collect.UnmodifiableList.copyOf;
/**
* Note that the Hazelcast message list store relies on the {@code PMessage}
* serializable properties, which means the message fields are not indexed.
* If indexing is needed, use the {@link HazelcastMessageBuilderStorage} instead.
*/
public class HazelcastMessageListStorage<
        K, M extends PMessage<M>>
        implements MessageListStore<K,M> {
    /** The hazelcast map holding one list of messages per key. */
    private final IMap<K, List<M>> hazelcastMap;

    /**
     * Create a message list store on top of the given hazelcast map.
     *
     * @param hazelcastMap The map used to hold the stored message lists.
     */
    public HazelcastMessageListStorage(IMap<K, List<M>> hazelcastMap) {
        this.hazelcastMap = hazelcastMap;
    }

    /**
     * Put all the given key to message list entries into the hazelcast map.
     * All the puts are started asynchronously, and the method then waits
     * for each of them to complete before returning.
     *
     * @param values The key to message list entries to store.
     * @throws RuntimeException If waiting for a put is interrupted, or the
     *         put operation itself failed.
     */
    @Override
    public void putAll(@Nonnull Map<K, List<M>> values) {
        List<ICompletableFuture<List<M>>> pending = new ArrayList<>(values.size());
        values.forEach((key, messages) -> {
            // Make sure to use a serializable list.
            pending.add(hazelcastMap.putAsync(key, new ArrayList<>(messages)));
        });
        awaitAll(pending);
    }

    /**
     * Remove the entries for all the given keys from the hazelcast map.
     * All the removals are started asynchronously, and the method then waits
     * for each of them to complete before returning.
     *
     * @param keys The keys to remove the entries for.
     * @throws RuntimeException If waiting for a removal is interrupted, or
     *         the remove operation itself failed.
     */
    @Override
    public void removeAll(Collection<K> keys) {
        // Futures are kept in a list (not keyed by entry key) so that a key
        // occurring more than once does not drop an earlier future unawaited.
        List<ICompletableFuture<List<M>>> pending = new ArrayList<>(keys.size());
        for (K key : keys) {
            pending.add(hazelcastMap.removeAsync(key));
        }
        awaitAll(pending);
    }

    /**
     * Get the message lists for all the given keys. Keys where the hazelcast
     * map has no value are not part of the result.
     *
     * @param keys The keys to look up.
     * @return Map from key to an unmodifiable copy of the stored list.
     */
    @Nonnull
    @Override
    public Map<K, List<M>> getAll(@Nonnull Collection<K> keys) {
        Map<K, List<M>> found = hazelcastMap.getAll(new HashSet<>(keys));
        Map<K, List<M>> ret = new HashMap<>();
        for (Map.Entry<K, List<M>> entry : found.entrySet()) {
            List<M> messages = entry.getValue();
            if (messages != null) {
                ret.put(entry.getKey(), copyOf(messages));
            }
        }
        return ret;
    }

    /**
     * Check if the hazelcast map contains the given key.
     *
     * @param key The key to look for.
     * @return True if the hazelcast map reports the key as present.
     */
    @Override
    public boolean containsKey(@Nonnull K key) {
        return hazelcastMap.containsKey(key);
    }

    /**
     * @return The key set as returned by the hazelcast map.
     */
    @Nonnull
    @Override
    public Collection<K> keys() {
        return hazelcastMap.keySet();
    }

    /**
     * @return The size as reported by the hazelcast map.
     */
    @Override
    public int size() {
        return hazelcastMap.size();
    }

    /**
     * Wait for every one of the given futures to complete, in list order.
     *
     * @param futures The futures to wait for.
     * @throws RuntimeException Wrapping the {@link InterruptedException} if
     *         the wait is interrupted (the interrupt flag is restored first),
     *         or the {@link ExecutionException} if an operation failed.
     */
    private void awaitAll(List<ICompletableFuture<List<M>>> futures) {
        for (ICompletableFuture<List<M>> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers up the stack can see it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}