MessageUpserter.java
/*
* Copyright 2018-2019 Providence Authors
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.providence.jdbi.v2;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.PMessageOrBuilder;
import net.morimekta.providence.descriptor.PField;
import net.morimekta.providence.descriptor.PMessageDescriptor;
import net.morimekta.util.collect.UnmodifiableList;
import net.morimekta.util.collect.UnmodifiableMap;
import net.morimekta.util.collect.UnmodifiableSet;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Update;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static net.morimekta.providence.jdbi.v2.MessageFieldArgument.getDefaultColumnType;
/**
* Helper class to handle inserting content from messages into a table.
* The helper will only select values from the message itself, not using
* nested structure or anything like that.
*
* The inserter is built in such a way that you can create the inserter
* (even as a static field), and use it any number of times with a handle
* to do the pre-programmed insert. The execute method is thread safe, as
* long as none of the modification methods are called.
*
* <pre>{@code
* class MyInserter {
* private static final MessageUpserter<MyMessage> INSERTER =
* new MessageUpserter.Builder<>(MyMessage.kDescriptor, "my_message")
* .set(MyMessage.UUID, MyMessage.NAME)
* .set("amount", MyMessage.VALUE, Types.INTEGER) // DOUBLE -> INTEGER
* .onDuplicateKeyUpdate(MyMessage.VALUE)
* .build();
*
* private final Jdbi dbi;
*
* public MyInserter(Jdbi dbi) {
* this.dbi = dbi;
* }
*
* int insert(MyMessage... messages) {
* try (Handle handle = dbi.open()) {
* return INSERTER.execute(handle, messages);
* }
* }
* }
* }</pre>
*
* Or it can be handled in line where needed. The building process is pretty cheap,
* so this should not be a problem unless it is called <i>a lot</i> for very small
* messages.
*
* <pre>{@code
* class MyInserter {
* int insert(MyMessage... messages) {
* try (Handle handle = dbi.open()) {
* return new MessageUpserter.Builder<>(MyMessage.kDescriptor, "my_message")
* .set(MyMessage.UUID, MyMessage.NAME)
* .set("amount", MyMessage.VALUE, Types.INTEGER) // DOUBLE -> INTEGER
* .onDuplicateKeyUpdateAllExcept(MyMessage.UUID)
* .build()
* .execute(handle, messages);
* }
* }
* }
* }</pre>
*
* The rules for using this are pretty simple:
*
* <ul>
* <li>
* All fields set must be specified before onDuplicateKey* behavior.
* </li>
* <li>
* Only one of <code>onDuplicateKeyIgnore</code> and <code>onDuplicateKeyUpdate</code>
* can be set.
* </li>
* <li>
* <code>execute(...)</code> can be called any number of times, and is thread safe.
* </li>
* </ul>
*/
public class MessageUpserter<M extends PMessage<M>> {
    /** Statement text preceding the value tuples, ending in {@code VALUES }. */
    private final String queryPrefix;
    /** Statement text following the value tuples; empty when no update clause was built. */
    private final String querySuffix;
    /** Column name mapped to the message field that supplies its value. */
    private final Map<String, PField<M>> columnToFieldMap;
    /** Column name mapped to the type constant its value is bound as (e.g. {@code Types.INTEGER}). */
    private final Map<String, Integer> columnTypeMap;
    /** Columns in statement order, which is also the order values are bound in. */
    private final List<String> columnOrder;
    /** Placeholder tuple for one item, one {@code ?} per column, e.g. {@code (?,?,?)}. */
    private final String valueMarkers;

    /**
     * Create the upserter. Instances are only made through {@link Builder#build()}.
     *
     * @param queryPrefix      Statement text up to and including {@code VALUES }.
     * @param querySuffix      Statement text to append after the value tuples.
     * @param columnOrder      The columns in the order they appear in the prefix.
     * @param columnToFieldMap Column name to field mapping.
     * @param columnTypeMap    Column name to bind type mapping.
     */
    private MessageUpserter(String queryPrefix,
                            String querySuffix,
                            List<String> columnOrder,
                            Map<String, PField<M>> columnToFieldMap,
                            Map<String, Integer> columnTypeMap) {
        this.queryPrefix = queryPrefix;
        this.querySuffix = querySuffix;
        // Copies detach this instance from the builder, which may be modified further.
        this.columnOrder = UnmodifiableList.copyOf(columnOrder);
        this.columnToFieldMap = UnmodifiableMap.copyOf(columnToFieldMap);
        this.columnTypeMap = UnmodifiableMap.copyOf(columnTypeMap);
        this.valueMarkers = "(" + String.join(",", Collections.nCopies(columnOrder.size(), "?")) + ")";
    }

    @Override
    public String toString() {
        return queryPrefix + " (...) " + querySuffix;
    }

    /**
     * Execute the statement for the given items.
     *
     * @param handle The handle to create the statement on.
     * @param items  The messages or builders to take values from.
     * @return The result of {@link #execute(Handle, Collection)} for the items.
     */
    @SafeVarargs
    public final int execute(@Nonnull Handle handle, @Nonnull PMessageOrBuilder<M>... items) {
        return execute(handle, UnmodifiableList.copyOf(items));
    }

    /**
     * Execute the statement for the given items. All items are written with a
     * single statement containing one value tuple per item.
     *
     * @param handle The handle to create the statement on.
     * @param items  The messages or builders to take values from.
     * @param <MB>   The message or builder type.
     * @return 0 if there are no items (no statement is created), otherwise the
     *         value returned by {@link Update#execute()}.
     */
    public <MB extends PMessageOrBuilder<M>>
    int execute(@Nonnull Handle handle, @Nonnull Collection<MB> items) {
        if (items.isEmpty()) {
            return 0;
        }

        String tuples = String.join(", ", Collections.nCopies(items.size(), valueMarkers));
        Update update = handle.createStatement(queryPrefix + tuples + querySuffix);

        // Positional binding: the index keeps running across items, so values
        // must be bound item by item and, within an item, in column order.
        int offset = 0;
        for (PMessageOrBuilder<M> item : items) {
            for (String column : columnOrder) {
                PField<M> field = columnToFieldMap.get(column);
                int type = columnTypeMap.get(column);
                update.bind(offset++, new MessageFieldArgument<>(item, field, type));
            }
        }
        return update.execute();
    }

    /**
     * Builder for the upserter. Columns must be set before any of the
     * <code>onDuplicateKey*</code> methods are called.
     *
     * @param <M> The message type.
     */
    public static class Builder<M extends PMessage<M>> {
        /** Table name, placed in the statement as given. */
        private final String intoTable;
        /** Column name to field; insertion ordered, which determines the column order. */
        private final Map<String, PField<M>> columnToFieldMap;
        /** Column name to the type constant to bind the value as. */
        private final Map<String, Integer> columnTypeMap;
        /** Columns to update on duplicate key; sorted by name. */
        private final Set<String> onDuplicateUpdate;
        /** Set once {@link #onDuplicateKeyIgnore()} has been called. */
        private final AtomicBoolean onDuplicateIgnore;
        /** Descriptor used to enumerate the fields of the message type. */
        private final PMessageDescriptor<M> descriptor;

        /**
         * Create a message inserter builder.
         *
         * @param descriptor The type descriptor.
         * @param intoTable  The table name to insert into.
         */
        public Builder(@Nonnull PMessageDescriptor<M> descriptor,
                       @Nonnull String intoTable) {
            this.descriptor = descriptor;
            this.intoTable = intoTable;
            this.columnToFieldMap = new LinkedHashMap<>();
            this.columnTypeMap = new HashMap<>();
            this.onDuplicateUpdate = new TreeSet<>();
            this.onDuplicateIgnore = new AtomicBoolean();
        }

        /**
         * Set all fields not already handled with default name and type.
         *
         * @return The builder.
         */
        public final Builder<M> setAll() {
            for (PField<M> field : descriptor.getFields()) {
                // Linear scan per field. Not efficient, but should not be called often.
                if (!columnToFieldMap.containsValue(field)) {
                    set(field.getName(), field, getDefaultColumnType(field));
                }
            }
            return this;
        }

        /**
         * Set all fields with defaults.
         *
         * @param except Fields to exclude.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> setAllExcept(PField<M>... except) {
            return setAllExcept(UnmodifiableSet.copyOf(except));
        }

        /**
         * Set all fields with defaults. Unlike {@link #setAll()}, fields that were
         * already set are not skipped, so a field already set under its default
         * name must be listed in <code>except</code>.
         *
         * @param except Fields to exclude.
         * @return The builder.
         */
        public Builder<M> setAllExcept(Collection<PField<M>> except) {
            for (PField<M> field : descriptor.getFields()) {
                if (except.contains(field)) {
                    continue;
                }
                set(field.getName(), field, getDefaultColumnType(field));
            }
            return this;
        }

        /**
         * Set the specific fields with default name and type.
         *
         * @param fields The fields to be set.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> set(PField<M>... fields) {
            return set(UnmodifiableList.copyOf(fields));
        }

        /**
         * Set the specific fields with default name and type.
         *
         * @param fields The fields to be set.
         * @return The builder.
         */
        public final Builder<M> set(Collection<PField<M>> fields) {
            for (PField<M> field : fields) {
                set(field.getName(), field, getDefaultColumnType(field));
            }
            return this;
        }

        /**
         * Set the specific field with name and default type.
         *
         * @param column The column name to set.
         * @param field  The field to be set.
         * @return The builder.
         */
        public final Builder<M> set(String column, PField<M> field) {
            return set(column, field, getDefaultColumnType(field));
        }

        /**
         * Set the specific field with specific type and default name.
         *
         * @param field The field to be set.
         * @param type  The field type to set as.
         * @return The builder.
         */
        public final Builder<M> set(PField<M> field, int type) {
            return set(field.getName(), field, type);
        }

        /**
         * Set the specific field with specific name and type.
         *
         * @param column The column name to set.
         * @param field  The field to be set.
         * @param type   The field type to set as.
         * @return The builder.
         * @throws IllegalArgumentException If the column is already set.
         * @throws IllegalStateException    If duplicate key behavior has already been chosen.
         */
        public final Builder<M> set(String column, PField<M> field, int type) {
            if (columnToFieldMap.containsKey(column)) {
                throw new IllegalArgumentException("Column " + column + " already inserted");
            }
            if (onDuplicateIgnore.get() || !onDuplicateUpdate.isEmpty()) {
                throw new IllegalStateException("Duplicate key behavior already determined");
            }
            this.columnToFieldMap.put(column, field);
            this.columnTypeMap.put(column, type);
            return this;
        }

        /**
         * On duplicate keys update the given fields.
         *
         * @param fields The fields to update.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> onDuplicateKeyUpdate(PField<M>... fields) {
            return onDuplicateKeyUpdate(UnmodifiableList.copyOf(fields));
        }

        /**
         * On duplicate keys update the given fields.
         *
         * @param fields The fields to update.
         * @return The builder.
         * @throws IllegalArgumentException If any of the fields has not been set.
         */
        public final Builder<M> onDuplicateKeyUpdate(Collection<PField<M>> fields) {
            return onDuplicateKeyUpdate(columnsFor(fields).toArray(new String[0]));
        }

        /**
         * On duplicate keys update all except the given fields.
         *
         * @param fields The fields to NOT update.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> onDuplicateKeyUpdateAllExcept(PField<M>... fields) {
            return onDuplicateKeyUpdateAllExcept(UnmodifiableList.copyOf(fields));
        }

        /**
         * On duplicate keys update all except the given fields.
         *
         * @param fields The fields to NOT update.
         * @return The builder.
         * @throws IllegalArgumentException If any of the fields has not been set.
         */
        public final Builder<M> onDuplicateKeyUpdateAllExcept(Collection<PField<M>> fields) {
            return onDuplicateKeyUpdateAllExcept(columnsFor(fields).toArray(new String[0]));
        }

        /**
         * On duplicate keys update all except the given columns. The columns to
         * update are taken from those set so far; names that match no set column
         * have no effect.
         *
         * @param exceptColumns The column names NOT to update.
         * @return The builder.
         */
        public final Builder<M> onDuplicateKeyUpdateAllExcept(String... exceptColumns) {
            TreeSet<String> columns = new TreeSet<>(columnToFieldMap.keySet());
            columns.removeAll(UnmodifiableList.copyOf(exceptColumns));
            return onDuplicateKeyUpdate(columns.toArray(new String[0]));
        }

        /**
         * On duplicate keys update the given columns. The names are added as
         * given; they are not checked against the columns that have been set.
         *
         * @param columns The column names to update.
         * @return The builder.
         * @throws IllegalStateException If duplicate key behavior is already set to ignore.
         */
        public final Builder<M> onDuplicateKeyUpdate(String... columns) {
            if (onDuplicateIgnore.get()) {
                throw new IllegalStateException("Duplicate key behavior already set to ignore");
            }
            Collections.addAll(onDuplicateUpdate, columns);
            return this;
        }

        /**
         * On duplicate keys ignore updates.
         *
         * @return The builder.
         * @throws IllegalStateException If columns to update on duplicate key are already given.
         */
        public final Builder<M> onDuplicateKeyIgnore() {
            if (!onDuplicateUpdate.isEmpty()) {
                throw new IllegalStateException("Duplicate key behavior already set to update");
            }
            onDuplicateIgnore.set(true);
            return this;
        }

        /**
         * Build the upserter. The statement has the form
         * <code>INSERT [IGNORE] INTO table (`c1`, `c2`) VALUES ... [ON DUPLICATE KEY UPDATE
         * `c1` = VALUES(`c1`), ...]</code>, with column names wrapped in backticks
         * and the table name used as given.
         *
         * @return The final built inserter.
         * @throws IllegalStateException If no columns have been set.
         */
        public MessageUpserter<M> build() {
            if (columnToFieldMap.isEmpty()) {
                throw new IllegalStateException("No columns inserted");
            }
            List<String> columnOrder = new ArrayList<>(columnToFieldMap.keySet());

            String quotedColumns = columnOrder.stream()
                                              .map(col -> "`" + col + "`")
                                              .collect(Collectors.joining(", "));
            String prefix = "INSERT " + (onDuplicateIgnore.get() ? "IGNORE " : "") +
                            "INTO " + intoTable + " (" + quotedColumns + ") VALUES ";

            String suffix = "";
            if (!onDuplicateUpdate.isEmpty()) {
                // Each assignment carries its own leading space, so joining on a
                // bare comma yields "UPDATE `a` = VALUES(`a`), `b` = VALUES(`b`)".
                suffix = " ON DUPLICATE KEY UPDATE" +
                         onDuplicateUpdate.stream()
                                          .map(column -> " `" + column + "` = VALUES(`" + column + "`)")
                                          .collect(Collectors.joining(","));
            }

            return new MessageUpserter<>(prefix,
                                         suffix,
                                         columnOrder,
                                         columnToFieldMap,
                                         columnTypeMap);
        }

        /**
         * Find the columns the given fields have been set as. A field set under
         * more than one column name contributes all of those columns.
         *
         * @param fields The fields to look up.
         * @return The matching column names, grouped per field in the order given.
         * @throws IllegalArgumentException If a field has not been set as any column.
         */
        private List<String> columnsFor(Collection<PField<M>> fields) {
            List<String> columns = new ArrayList<>(fields.size());
            for (PField<M> field : fields) {
                boolean found = false;
                for (Map.Entry<String, PField<M>> entry : columnToFieldMap.entrySet()) {
                    if (entry.getValue().equals(field)) {
                        columns.add(entry.getKey());
                        found = true;
                    }
                }
                if (!found) {
                    throw new IllegalArgumentException("Field " + field + " not inserted");
                }
            }
            return columns;
        }
    }
}