FramedOutputStream.java

/*
 * Copyright (c) 2020, Stein Eldar Johnsen
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.io.sub;

import net.morimekta.io.BigEndianBinaryOutputStream;
import net.morimekta.io.ByteBufferInputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * A framed output stream that buffers all written data and prepends a frame
 * size when the stream is closed. The frame size is written using a
 * {@link FrameSizeWriter}, and the size octets precede the data octets.
 * See {@link FramedInputStream} for reading back the content.
 * <p>
 * The default frame size encoding is BEB128 (Big Endian Base128 integer).
 * Because the frame size must be written before the data, all content is
 * buffered in memory until the stream is closed. Closing this stream does
 * not close the underlying output stream.
 * <pre>{@code
 * (size) (data)
 * }</pre>
 */
public class FramedOutputStream extends OutputStream {
    /**
     * Default buffer size.
     */
    public static final int DEFAULT_BUFFER_SIZE = 8192;

    /**
     * Frame size writer. Used to inject the act of writing the frame size to
     * the output stream.
     */
    public interface FrameSizeWriter {
        /**
         * Write the frame size.
         *
         * @param out  The output stream to write to.
         * @param size The size to write.
         * @throws IOException If unable to write frame size.
         */
        void writeFrameSize(OutputStream out, int size) throws IOException;
    }

    private final FrameSizeWriter  frameSizeWriter;
    private final List<ByteBuffer> byteBuffers;
    private       OutputStream     out;
    private       ByteBuffer       current;

    /**
     * Create a framed output stream using a big-endian base-128 frame size.
     *
     * @param out Output stream to write to.
     */
    public FramedOutputStream(OutputStream out) {
        this(out, (os, size) -> new BigEndianBinaryOutputStream(os).writeBase128(size));
    }

    /**
     * Create a framed output stream using a big-endian base-128 frame size.
     *
     * @param out        Output stream to write to.
     * @param bufferSize Buffer data in chunks of this size.
     */
    public FramedOutputStream(OutputStream out, int bufferSize) {
        this(out, (os, size) -> new BigEndianBinaryOutputStream(os).writeBase128(size), bufferSize);
    }

    /**
     * Create a framed output stream with a custom frame size writer.
     *
     * @param out             Output stream to write to.
     * @param frameSizeWriter The frame size writer to use.
     */
    public FramedOutputStream(OutputStream out, FrameSizeWriter frameSizeWriter) {
        this(out, frameSizeWriter, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Create a framed output stream with a custom frame size writer and buffer size.
     *
     * @param out             Output stream to write to.
     * @param frameSizeWriter The frame size writer to use.
     * @param bufferSize      Buffer data in chunks of this size.
     */
    public FramedOutputStream(OutputStream out, FrameSizeWriter frameSizeWriter, int bufferSize) {
        this.out = out;
        this.frameSizeWriter = frameSizeWriter;
        this.current = ByteBuffer.allocateDirect(bufferSize);
        this.byteBuffers = new ArrayList<>();
    }

    @Override
    public void write(int i) throws IOException {
        if (out == null) throw new IOException("Writing to closed stream");
        if (!current.hasRemaining()) {
            byteBuffers.add(current);
            current = ByteBuffer.allocateDirect(current.capacity());
        }
        current.put((byte) i);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (out == null) throw new IOException("Writing to closed stream");
        if (current.remaining() < len) {
            int first = current.remaining();
            current.put(b, off, first);
            byteBuffers.add(current);
            current = ByteBuffer.allocateDirect(current.capacity());
            write(b, off + first, len - first);
        } else {
            current.put(b, off, len);
        }
    }

    @Override
    public void close() throws IOException {
        if (out != null) {
            try {
                int frameSize = current.position() + (byteBuffers.size() * current.capacity());
                this.frameSizeWriter.writeFrameSize(out, frameSize);
                for (ByteBuffer buffer : byteBuffers) {
                    buffer.flip();
                    new ByteBufferInputStream(buffer).transferTo(out);
                }
                current.flip();
                new ByteBufferInputStream(current).transferTo(out);
            } finally {
                out = null;
            }
        }
    }
}