MessageCollectors.java

/*
 * Copyright 2016 Providence Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.providence.streams;

import net.morimekta.providence.PMessage;
import net.morimekta.providence.descriptor.PField;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.serializer.SerializerException;

import javax.annotation.Nonnull;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collector;

/**
 * Collector helpers for writing a number of messages to a output stream, file etc.
 */
public class MessageCollectors {
    /**
     * Write stream of messages to file described by path.
     *
     * @param file The file path.
     * @param serializer The serializer to use.
     * @param <Message> The message type.
     * @return The collector.
     */
    @Nonnull
    public static <Message extends PMessage<Message>>
    Collector<Message, OutputStream, Integer> toPath(Path file,
                                                     Serializer serializer) {
        return toFile(file.toFile(), serializer);
    }

    /**
     * write stream of messages to file.
     *
     * @param file The file to write.
     * @param serializer The serializer to use.
     * @param <Message> The message type.
     * @return The collector.
     */
    @Nonnull
    public static <Message extends PMessage<Message>>
    Collector<Message, OutputStream, Integer> toFile(File file,
                                                     Serializer serializer) {
        final AtomicInteger result = new AtomicInteger(0);
        final AtomicReference<BufferedOutputStream> os = new AtomicReference<>();
        final Object mutex = new Object();

        return Collector.of(() -> {
            // Delay file creation until the write starts.
            return os.updateAndGet(bos -> {
                if (bos == null) {
                    try {
                        bos = new BufferedOutputStream(new FileOutputStream(file));
                    } catch (IOException e) {
                        throw new UncheckedIOException("Unable to open " + file.getName(), e);
                    }
                }
                return bos;
            });
        }, (outputStream, t) -> {
            try {
                synchronized (mutex) {
                    result.addAndGet(serializer.serialize(outputStream, t));
                    if (!serializer.binaryProtocol()) {
                        result.addAndGet(writeReadableSep(outputStream));
                    }
                }
            } catch (SerializerException e) {
                throw new UncheckedIOException("Bad data", e);
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to write to " + file.getName(), e);
            }
        }, (a, b) -> a, (outputStream) -> {
            try {
                outputStream.flush();
                outputStream.close();
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to close " + file.getName(), e);
            }
            return result.getAndSet(0);
        });
    }

    /**
     * Serialize stream of messages into stream.
     *
     * @param out The output stream to write to.
     * @param serializer The serializer to use.
     * @param <Message> The message type.
     * @return The collector.
     */
    @Nonnull
    public static <Message extends PMessage<Message>>
    Collector<Message, AtomicInteger, Integer> toStream(OutputStream out,
                                                        Serializer serializer) {
        return toStream(out, serializer, false);
    }

    /**
     * Serialize stream of messages into stream.
     *
     * @param out The output stream to write to.
     * @param serializer The serializer to use.
     * @param close Close the stream when ending.
     * @param <Message> The message type.
     * @return The collector.
     */
    @Nonnull
    public static <Message extends PMessage<Message>>
    Collector<Message, AtomicInteger, Integer> toStream(OutputStream out,
                                                        Serializer serializer,
                                                        boolean close) {
        return Collector.of(AtomicInteger::new, (counter, t) -> {
            try {
                synchronized (out) {
                    counter.addAndGet(serializer.serialize(out, t));
                    if (!serializer.binaryProtocol()) {
                        counter.addAndGet(writeReadableSep(out));
                    }
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e.getMessage(), e);
            }
        }, (a, b) -> {
            a.addAndGet(b.get());
            return a;
        }, i -> {
            try {
                out.flush();
                if (close) {
                    out.close();
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e.getMessage(), e);
            }
            return i.getAndSet(0);
        });
    }

    private static int writeReadableSep(OutputStream out) throws IOException {
        out.write('\n');
        return 1;
    }

    private MessageCollectors() {}
}