// ProvidenceServlet.java

/*
 * Copyright 2016-2017 Providence Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.providence.server;

import net.morimekta.providence.PApplicationException;
import net.morimekta.providence.PApplicationExceptionType;
import net.morimekta.providence.PProcessor;
import net.morimekta.providence.PServiceCall;
import net.morimekta.providence.PServiceCallInstrumentation;
import net.morimekta.providence.PServiceCallType;
import net.morimekta.providence.serializer.DefaultSerializerProvider;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.serializer.SerializerException;
import net.morimekta.providence.serializer.SerializerProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import static net.morimekta.providence.PServiceCallInstrumentation.NS_IN_MILLIS;

/**
 * A javax.servlet implementation for providence. Transfers data like the
 * Thrift's <code>org.apache.thrift.server.TServlet</code> server.
 * <p>
 * Each POST request is deserialized into a service call using the serializer
 * selected from the request's <code>Content-Type</code> (or the provider's
 * default serializer when no content type is given), handed to the processor
 * for the request, and the resulting call (if any) is serialized back using
 * the serializer selected from the <code>Accept</code> header, falling back to
 * the request serializer.
 */
public class ProvidenceServlet extends HttpServlet {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProvidenceServlet.class);

    /** Supplies the processor to use for each incoming request. */
    private final ProcessorProvider           processorProvider;
    /** Supplies serializers by media type, and the default serializer. */
    private final SerializerProvider          serializerProvider;
    /** Receives timing and outcome of each handled call. */
    private final PServiceCallInstrumentation instrumentation;
    /** If true the response is buffered so that content-length can be set. */
    private final boolean                     sizedOutput;

    /**
     * Creates a providence servlet that uses the same processor every time,
     * with the default serializer provider.
     *
     * @param processor The providence service processor.
     */
    public ProvidenceServlet(@Nonnull PProcessor processor) {
        // Default is to always use the same processor.
        this(processor, DefaultSerializerProvider.INSTANCE);
    }

    /**
     * Creates a providence servlet that uses the same processor every time.
     *
     * @param processor The providence service processor.
     * @param serializerProvider The serializer provider.
     */
    public ProvidenceServlet(@Nonnull PProcessor processor,
                             @Nonnull SerializerProvider serializerProvider) {
        // Default is to always use the same processor.
        this(r -> processor, serializerProvider);
    }

    /**
     * Creates a providence servlet that uses the same processor every time.
     * The output is written with content-length.
     *
     * @param processor The providence service processor.
     * @param serializerProvider The serializer provider.
     * @param instrumentation Instrumentation instance.
     */
    public ProvidenceServlet(@Nonnull PProcessor processor,
                             @Nonnull SerializerProvider serializerProvider,
                             @Nonnull PServiceCallInstrumentation instrumentation) {
        // Default is to always use the same processor.
        this(r -> processor, serializerProvider, instrumentation, true);
    }

    /**
     * Creates a providence servlet that uses a per request processor, with the
     * default serializer provider and no-op instrumentation.
     *
     * @param processorProvider The processor supplier.
     */
    public ProvidenceServlet(@Nonnull ProcessorProvider processorProvider) {
        this(processorProvider, DefaultSerializerProvider.INSTANCE, PServiceCallInstrumentation.NOOP);
    }

    /**
     * Creates a providence servlet that uses a per request processor, with
     * no-op instrumentation.
     *
     * @param processorProvider The processor supplier.
     * @param serializerProvider The serializer provider.
     */
    public ProvidenceServlet(@Nonnull ProcessorProvider processorProvider,
                             @Nonnull SerializerProvider serializerProvider) {
        this(processorProvider, serializerProvider, PServiceCallInstrumentation.NOOP);
    }

    /**
     * Creates a providence servlet that uses a per request processor.
     * The output is written with content-length.
     *
     * @param processorProvider The processor supplier.
     * @param serializerProvider The serializer provider.
     * @param instrumentation Instrumentation instance.
     */
    public ProvidenceServlet(@Nonnull ProcessorProvider processorProvider,
                             @Nonnull SerializerProvider serializerProvider,
                             @Nonnull PServiceCallInstrumentation instrumentation) {
        this(processorProvider, serializerProvider, instrumentation, true);
    }

    /**
     * Creates a providence servlet that uses a per request processor.
     *
     * @param processorProvider The processor supplier.
     * @param serializerProvider The serializer provider.
     * @param instrumentation Instrumentation instance.
     * @param sizedOutput If the output should have content-length. If false will
     *                    write output faster and using less memory, but will lose
     *                    some compatibility with apache thrift.
     */
    public ProvidenceServlet(@Nonnull ProcessorProvider processorProvider,
                             @Nonnull SerializerProvider serializerProvider,
                             @Nonnull PServiceCallInstrumentation instrumentation,
                             boolean sizedOutput) {
        this.processorProvider = processorProvider;
        this.serializerProvider = serializerProvider;
        this.instrumentation = instrumentation;
        this.sizedOutput = sizedOutput;
    }

    /**
     * Handles a single service call sent as the body of a POST request.
     * <p>
     * Responds with 400 if the request content type is not known to the
     * serializer provider, and with 500 (when the response is not yet
     * committed) on unhandled exceptions. Deserialization failures are
     * answered with status 200 and a serialized exception call. No exception
     * is propagated to the servlet container; failures are logged and reported
     * to the instrumentation instead.
     *
     * @param req The HTTP request holding the serialized service call.
     * @param resp The HTTP response to write the serialized reply to.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) {
        long startTime = System.nanoTime();

        PServiceCall request = null;
        PServiceCall response = null;
        try {
            PProcessor processor = processorProvider.processorForRequest(req);

            Serializer requestSerializer = serializerProvider.getDefault();
            String contentType = req.getContentType();
            if (contentType != null) {
                try {
                    requestSerializer = serializerFor(contentType);
                } catch (IllegalArgumentException e) {
                    resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Unknown content-type: " + contentType);
                    LOGGER.warn("Unknown content type in request", e);
                    return;
                }
            } else {
                LOGGER.debug("Request is missing content type.");
            }

            Serializer responseSerializer = responseSerializerFor(req.getHeader("Accept"), requestSerializer);

            try (InputStream in = req.getInputStream()) {
                request = requestSerializer.deserialize(in, processor.getDescriptor());
                requestSerializer.verifyEndOfContent(in);
                if (request.getType() == PServiceCallType.REPLY ||
                    request.getType() == PServiceCallType.EXCEPTION) {
                    // Only call-type messages are valid as requests; answer
                    // anything else with an application exception.
                    PApplicationException ex = new PApplicationException(
                            "Invalid service request call type: " + request.getType(),
                            PApplicationExceptionType.INVALID_MESSAGE_TYPE);
                    response = new PServiceCall(request.getMethod(), PServiceCallType.EXCEPTION, request.getSequence(), ex);
                } else {
                    response = processor.handleCall(request);
                }
            } catch (SerializerException e) {
                if (e.getMethodName() != null) {
                    LOGGER.error("Error when reading service call {}.{}()",
                                 processor.getDescriptor().getName(), e.getMethodName(), e);
                } else {
                    LOGGER.error("Error when reading service call {}",
                                 processor.getDescriptor().getName(), e);
                }
                PApplicationException ex = new PApplicationException(e.getMessage(), e.getExceptionType()).initCause(e);
                response = new PServiceCall(e.getMethodName(), PServiceCallType.EXCEPTION, e.getSequenceNo(), ex);
            }

            resp.setStatus(HttpServletResponse.SC_OK);
            if (response != null) {
                resp.setContentType(responseSerializer.mediaType());
                if (sizedOutput) {
                    // Buffer the whole response so the content-length is known
                    // before anything is written to the client.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    responseSerializer.serialize(baos, response);

                    resp.setContentLength(baos.size());
                    // writeTo avoids the extra array copy done by toByteArray().
                    baos.writeTo(resp.getOutputStream());
                } else {
                    responseSerializer.serialize(resp.getOutputStream(), response);
                }
            }
            resp.flushBuffer();

            reportComplete(startTime, request, response);
        } catch (EOFException e) {
            // output stream closed before write is complete.
            // So we cannot even try to respond.
            reportTransportException(e, startTime, request, response);
        } catch (Exception e) {
            LOGGER.warn("Unhandled exception in {}", req.getPathInfo(), e);
            if (!resp.isCommitted()) {
                try {
                    resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                                   "Internal server error: " + e.getMessage());
                    resp.flushBuffer();
                } catch (IOException ioEx) {
                    e.addSuppressed(ioEx);
                }
            }

            reportTransportException(e, startTime, request, response);
        }
    }

    /**
     * Removes media type parameters (everything from the first ';') and
     * surrounding whitespace, e.g. "text/plain; charset=utf-8" becomes
     * "text/plain".
     *
     * @param mediaType The media type, possibly with parameters.
     * @return The bare media type, possibly empty.
     */
    private static String stripParameters(String mediaType) {
        int paramStart = mediaType.indexOf(';');
        String bare = paramStart < 0 ? mediaType : mediaType.substring(0, paramStart);
        return bare.trim();
    }

    /**
     * Looks up the serializer for a media type header value. The exact
     * (trimmed) value is tried first, so anything the provider matched before
     * still matches; if that fails and the value carries parameters such as
     * "; charset=utf-8" or ";q=0.9", the lookup is retried without them.
     *
     * @param mediaType The media type header value.
     * @return The matching serializer.
     * @throws IllegalArgumentException If the provider knows neither form.
     */
    private Serializer serializerFor(String mediaType) {
        String exact = mediaType.trim();
        try {
            return serializerProvider.getSerializer(exact);
        } catch (IllegalArgumentException e) {
            String bare = stripParameters(exact);
            if (bare.isEmpty() || bare.equals(exact)) {
                // Nothing was stripped, so a retry cannot give another result.
                throw e;
            }
            return serializerProvider.getSerializer(bare);
        }
    }

    /**
     * Selects the response serializer from the Accept header. The first entry
     * that is either the wildcard media type or known to the serializer
     * provider decides; unknown entries are skipped.
     *
     * @param acceptHeader The Accept header value, or null if not present.
     * @param fallback The serializer to use when the header is missing, names
     *                 the wildcard first, or names no known media type.
     * @return The serializer to write the response with.
     */
    private Serializer responseSerializerFor(String acceptHeader, Serializer fallback) {
        if (acceptHeader == null) {
            return fallback;
        }
        for (String entry : acceptHeader.split(",")) {
            String mediaType = stripParameters(entry);
            if (mediaType.isEmpty()) {
                continue;
            }
            if ("*/*".equals(mediaType)) {
                // Then responding same as request is good.
                return fallback;
            }

            try {
                return serializerFor(entry);
            } catch (IllegalArgumentException ignored) {
                // Ignore. Bad header input is pretty common.
            }
        }
        return fallback;
    }

    /**
     * Calculates the time passed since the given start time.
     *
     * @param startTime Start time as given by {@link System#nanoTime()}.
     * @return The elapsed time, converted using NS_IN_MILLIS.
     */
    private static double elapsedMillis(long startTime) {
        long endTime = System.nanoTime();
        return ((double) (endTime - startTime)) / NS_IN_MILLIS;
    }

    /**
     * Reports a completed call to the instrumentation. Exceptions thrown by
     * the instrumentation are logged and not propagated.
     *
     * @param startTime When handling started, from {@link System#nanoTime()}.
     * @param request The request call, or null if it was never read.
     * @param response The response call, or null if there was none.
     */
    @SuppressWarnings("unchecked")
    private void reportComplete(long startTime, PServiceCall request, PServiceCall response) {
        double duration = elapsedMillis(startTime);
        try {
            instrumentation.onComplete(duration, request, response);
        } catch (Exception th) {
            LOGGER.error("Exception in service instrumentation", th);
        }
    }

    /**
     * Reports a failed call to the instrumentation. Exceptions thrown by the
     * instrumentation are logged and not propagated.
     *
     * @param cause The exception that made the call fail.
     * @param startTime When handling started, from {@link System#nanoTime()}.
     * @param request The request call, or null if it was never read.
     * @param response The response call, or null if none was produced.
     */
    @SuppressWarnings("unchecked")
    private void reportTransportException(Exception cause,
                                          long startTime,
                                          PServiceCall request,
                                          PServiceCall response) {
        double duration = elapsedMillis(startTime);
        try {
            instrumentation.onTransportException(cause, duration, request, response);
        } catch (Exception th) {
            LOGGER.error("Exception in service instrumentation", th);
        }
    }
}