MessageUpserter.java

/*
 * Copyright 2018-2019 Providence Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.providence.jdbi.v2;

import net.morimekta.providence.PMessage;
import net.morimekta.providence.PMessageOrBuilder;
import net.morimekta.providence.descriptor.PField;
import net.morimekta.providence.descriptor.PMessageDescriptor;
import net.morimekta.util.collect.UnmodifiableList;
import net.morimekta.util.collect.UnmodifiableMap;
import net.morimekta.util.collect.UnmodifiableSet;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Update;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static net.morimekta.providence.jdbi.v2.MessageFieldArgument.getDefaultColumnType;

/**
 * Helper class to handle inserting content from messages into a table.
 * The helper will only select values form the message itself, not using
 * nested structure or anything like that.
 *
 * The inserter is built in  such a way that you can create the inserter
 * (even as a static field), and use it any number of times with a handle
 * to do the pre-programmed insert. The execute method is thread safe, as
 * long as none of the modification methods are called.
 *
 * <pre>{@code
 * class MyInserter {
 *     private static final MessageUpserter&lt;MyMessage,MyMessage._Field&gt; INSERTER =
 *             new MessageUpserter.Builder&lt;&gt;("my_message")
 *                     .set(MyMessage.UUID, MyMessage.NAME)
 *                     .set("amount", MyMessage.VALUE, Types.INTEGER)  // DOUBLE -&gt; INTEGER
 *                     .onDuplicateKeyUpdate(MyMessage.VALUE)
 *                     .build();
 *
 *     private final Jdbi dbi;
 *
 *     public MyInserter(Jdbi dbi) {
 *         this.dbi = dbi;
 *     }
 *
 *     int insert(HandleMyMessage... messages) {
 *         try (Handle handle = dbi.open()) {
 *             return INSERTER.execute(handle, messages);
 *         }
 *     }
 * }
 * }</pre>
 *
 * Or it can be handled in line where needed. The building process is pretty cheap,
 * so this should not be a problem unless it is called <i>a lot</i> for very small
 * message.
 *
 * <pre>{@code
 * class MyInserter {
 *     int insert(HandleMyMessage... messages) {
 *         try (Handle handle = dbi.open()) {
 *             return new MessageUpserter.Builder&lt;MyMessage,MyMessage._Field&gt;("my_message")
 *                     .set(MyMessage.UUID, MyMessage.NAME)
 *                     .set("amount", MyMessage.VALUE, Types.INTEGER)  // DOUBLE -&gt; INTEGER
 *                     .onDuplicateKeyUpdateAllExcept(MyMessage.UUID)
 *                     .build()
 *                     .execute(handle, messages);
 *         }
 *     }
 * }
 * }</pre>
 *
 * The rules for using this is pretty simple:
 *
 * <ul>
 *     <li>
 *         All fields set must be specified before onDuplicateKey* behavior.
 *     </li>
 *     <li>
 *         Only one of <code>onDuplicateKeyIgnore</code> and <code>onDuplicateKeyUpdate</code>
 *         can be set.
 *     </li>
 *     <li>
 *         <code>execute(...)</code> can be called any number of times, and is thread safe.
 *     </li>
 * </ul>
 */
public class MessageUpserter<M extends PMessage<M>> {
    private final String                 queryPrefix;
    private final String                 querySuffix;
    private final Map<String, PField<M>> columnToFieldMap;
    private final Map<String, Integer>   columnTypeMap;
    private final List<String>           columnOrder;
    private final String                 valueMarkers;

    private MessageUpserter(String queryPrefix,
                            String querySuffix,
                            List<String> columnOrder,
                            Map<String, PField<M>> columnToFieldMap,
                            Map<String, Integer> columnTypeMap) {
        this.queryPrefix = queryPrefix;
        this.querySuffix = querySuffix;
        this.columnOrder = UnmodifiableList.copyOf(columnOrder);
        this.columnToFieldMap = UnmodifiableMap.copyOf(columnToFieldMap);
        this.columnTypeMap = UnmodifiableMap.copyOf(columnTypeMap);
        this.valueMarkers = "(" + columnOrder.stream()
                                             .map(k -> "?")
                                             .collect(Collectors.joining(",")) + ")";
    }

    @Override
    public String toString() {
        return queryPrefix + " (...) " + querySuffix;
    }

    @SafeVarargs
    public final int execute(@Nonnull Handle handle, @Nonnull PMessageOrBuilder<M>... items) {
        return execute(handle, UnmodifiableList.copyOf(items));
    }

    public <MB extends PMessageOrBuilder<M>>
    int execute(@Nonnull Handle handle, @Nonnull Collection<MB> items) {
        if (items.isEmpty()) {
            return 0;
        }

        String query = queryPrefix + items.stream()
                                          .map(item -> valueMarkers)
                                          .collect(Collectors.joining(", ")) +
                       querySuffix;
        Update update = handle.createStatement(query);
        int    offset = 0;
        for (PMessageOrBuilder<M> item : items) {
            for (String column : columnOrder) {
                PField<M> field = columnToFieldMap.get(column);
                int type = columnTypeMap.get(column);
                update.bind(offset++, new MessageFieldArgument<>(item, field, type));
            }
        }
        return update.execute();
    }

    public static class Builder<M extends PMessage<M>> {
        private final String                 intoTable;
        private final Map<String, PField<M>> columnToFieldMap;
        private final Map<String, Integer>   columnTypeMap;
        private final Set<String>            onDuplicateUpdate;
        private final AtomicBoolean          onDuplicateIgnore;
        private final PMessageDescriptor<M>  descriptor;

        /**
         * Create a message inserter builder.
         *
         * @param descriptor The type descriptor.
         * @param intoTable The table name to insert info.
         */
        public Builder(@Nonnull PMessageDescriptor<M> descriptor,
                       @Nonnull String intoTable) {
            this.descriptor = descriptor;
            this.intoTable = intoTable;
            this.columnToFieldMap = new LinkedHashMap<>();
            this.columnTypeMap = new HashMap<>();
            this.onDuplicateUpdate = new TreeSet<>();
            this.onDuplicateIgnore = new AtomicBoolean();
        }

        /**
         * Set all fields not already handled with default name and type.
         *
         * @return The builder.
         */
        public final Builder<M> setAll() {
            for (PField<M> field : descriptor.getFields()) {
                // Not efficient, but should not be called often.
                if (!columnToFieldMap.containsValue(field)) {
                    set(field);
                }
            }
            return this;
        }

        /**
         * Set all fields with defaults.
         *
         * @param except Fields to exclude.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> setAllExcept(PField<M>... except) {
            return setAllExcept(UnmodifiableSet.copyOf(except));
        }

        /**
         * Set all fields with defaults.
         *
         * @param except Fields to exclude.
         * @return The builder.
         */
        public Builder<M> setAllExcept(Collection<PField<M>> except) {
            for (PField<M> field : descriptor.getFields()) {
                if (!except.contains(field)) {
                    set(field.getName(), field, getDefaultColumnType(field));
                }
            }
            return this;
        }

        /**
         * Set the specific fields with default name and type.
         *
         * @param fields The fields to be set.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> set(PField<M>... fields) {
            return set(UnmodifiableList.copyOf(fields));
        }

        /**
         * Set the specific fields with default name and type.
         *
         * @param fields The fields to be set.
         * @return The builder.
         */
        public final Builder<M> set(Collection<PField<M>> fields) {
            for (PField<M> field : fields) {
                set(field.getName(), field, getDefaultColumnType(field));
            }
            return this;
        }

        /**
         * Set the specific field with name and default type.
         *
         * @param column The column name to set.
         * @param field The field to be set.
         * @return The builder.
         */
        public final Builder<M> set(String column, PField<M> field) {
            return set(column, field, getDefaultColumnType(field));
        }

        /**
         * Set the specific field with specific type and default name.
         *
         * @param field The field to be set.
         * @param type The field type to set as.
         * @return The builder.
         */
        public final Builder<M> set(PField<M> field, int type) {
            return set(field.getName(), field, type);
        }

        /**
         * Set the specific field with specific name and type.
         *
         * @param column The column name to set.
         * @param field The field to be set.
         * @param type The field type to set as.
         * @return The builder.
         */
        public final Builder<M> set(String column, PField<M> field, int type) {
            if (columnToFieldMap.containsKey(column)) {
                throw new IllegalArgumentException("Column " + column + " already inserted");
            }
            if (onDuplicateIgnore.get() || onDuplicateUpdate.size() > 0) {
                throw new IllegalStateException("Duplicate key behavior already determined");
            }
            this.columnToFieldMap.put(column, field);
            this.columnTypeMap.put(column, type);
            return this;
        }

        /**
         * On duplicate keys update the given fields.
         *
         * @param fields The fields to update.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> onDuplicateKeyUpdate(PField<M>... fields) {
            return onDuplicateKeyUpdate(UnmodifiableList.copyOf(fields));
        }

        /**
         * On duplicate keys update the given fields.
         *
         * @param fields The fields to update.
         * @return The builder.
         */
        public final Builder<M> onDuplicateKeyUpdate(Collection<PField<M>> fields) {
            List<String> columns = new ArrayList<>(fields.size());
            fields.forEach(field -> {
                AtomicBoolean found = new AtomicBoolean();
                columnToFieldMap.forEach((column, f) -> {
                    if (f.equals(field)) {
                        columns.add(column);
                        found.set(true);
                    }
                });
                if (!found.get()) {
                    throw new IllegalArgumentException("Field " + field + " not inserted");
                }
            });
            return onDuplicateKeyUpdate(columns.toArray(new String[0]));
        }

        /**
         * On duplicate keys update all except the given fields.
         *
         * @param fields The fields to NOT update.
         * @return The builder.
         */
        @SafeVarargs
        public final Builder<M> onDuplicateKeyUpdateAllExcept(PField<M>... fields) {
            return onDuplicateKeyUpdateAllExcept(UnmodifiableList.copyOf(fields));
        }

        /**
         * On duplicate keys update all except the given fields.
         *
         * @param fields The fields to NOT update.
         * @return The builder.
         */
        public final Builder<M> onDuplicateKeyUpdateAllExcept(Collection<PField<M>> fields) {
            List<String> columns = new ArrayList<>(fields.size());
            fields.forEach(field -> {
                AtomicBoolean found = new AtomicBoolean();
                columnToFieldMap.forEach((column, f) -> {
                    if (f.equals(field)) {
                        columns.add(column);
                        found.set(true);
                    }
                });
                if (!found.get()) {
                    throw new IllegalArgumentException("Field " + field + " not inserted");
                }
            });
            return onDuplicateKeyUpdateAllExcept(columns.toArray(new String[0]));
        }

        /**
         * On duplicate keys update all except the given fields.
         *
         * @param exceptColumns The column names NOT to update.
         * @return The builder.
         */
        public final Builder<M> onDuplicateKeyUpdateAllExcept(String... exceptColumns) {
            TreeSet<String> columns = new TreeSet<>(columnToFieldMap.keySet());
            columns.removeAll(UnmodifiableList.copyOf(exceptColumns));
            return onDuplicateKeyUpdate(columns.toArray(new String[0]));
        }

        /**
         * On duplicate keys update the given columns.
         *
         * @param columns The column names NOT to update.
         * @return The builder.
         */
        public final Builder<M> onDuplicateKeyUpdate(String... columns) {
            if (onDuplicateIgnore.get()) {
                throw new IllegalStateException("Duplicate key behavior already set to ignore");
            }
            Collections.addAll(onDuplicateUpdate, columns);
            return this;
        }

        /**
         * On duplicate keys ignore updates.
         *
         * @return The builder.
         */
        public final Builder<M> onDuplicateKeyIgnore() {
            if (onDuplicateUpdate.size() > 0) {
                throw new IllegalStateException("Duplicate key behavior already set to update");
            }
            onDuplicateIgnore.set(true);
            return this;
        }

        /**
         * @return The final built inserter.
         */
        public MessageUpserter<M> build() {
            if (columnToFieldMap.isEmpty()) {
                throw new IllegalStateException("No columns inserted");
            }
            List<String> columnOrder = new ArrayList<>(columnToFieldMap.keySet());

            StringBuilder prefixBuilder = new StringBuilder("INSERT ");
            if (onDuplicateIgnore.get()) {
                prefixBuilder.append("IGNORE ");
            }
            prefixBuilder.append("INTO ")
                   .append(intoTable)
                   .append(" (")
                   .append(columnOrder.stream()
                                      .map(col -> "`" + col + "`")
                                      .collect(Collectors.joining(", ")))
                   .append(") VALUES ");

            StringBuilder suffixBuilder = new StringBuilder();

            if (onDuplicateUpdate.size() > 0) {
                suffixBuilder.append(" ON DUPLICATE KEY UPDATE");
                boolean first = true;
                for (String column : onDuplicateUpdate) {
                    if (first) {
                        first = false;
                    } else {
                        suffixBuilder.append(",");
                    }

                    suffixBuilder.append(" `")
                           .append(column)
                           .append("` = VALUES(`")
                           .append(column)
                           .append("`)");
                }
            }

            return new MessageUpserter<>(prefixBuilder.toString(),
                                         suffixBuilder.toString(),
                                         columnOrder,
                                         columnToFieldMap,
                                         columnTypeMap);
        }
    }
}