FramedOutputStream.java

package net.morimekta.io.sub;

import net.morimekta.io.BigEndianBinaryOutputStream;
import net.morimekta.io.ByteBufferInputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * The framed input stream is a stream with a fixed size as pars of the underlying
 * input stream. The frame size is written to the stream using the {@link FrameSizeWriter}.
 * This writer can be used to write framed bytes if the data size is not known before
 * writing. See {@link FramedInputStream} for reading back the content.
 * <p>
 * The default mean to write frame size is BEB128 (Big Endian Base128 integer).
 * The size octets will precede the data octets. This means the written content must
 * be cached in memory until complete, and the size and data will be written when
 * this stream is closed. It will not close the contained output stream.
 * <pre>{@code
 * (size) (data)
 * }</pre>
 */
public class FramedOutputStream extends OutputStream {
    public static final int DEFAULT_BUFFER_SIZE = 8192;

    public interface FrameSizeWriter {
        void writeFrameSize(OutputStream out, int size) throws IOException;
    }

    private final FrameSizeWriter frameSizeWriter;
    private final List<ByteBuffer> byteBuffers;
    private OutputStream out;
    private ByteBuffer current;

    public FramedOutputStream(OutputStream out) {
        this(out, (os, size) -> new BigEndianBinaryOutputStream(out).writeBase128(size));
    }

    public FramedOutputStream(OutputStream out, int bufferSize) {
        this(out, (os, size) -> new BigEndianBinaryOutputStream(out).writeBase128(size), bufferSize);
    }

    public FramedOutputStream(OutputStream out, FrameSizeWriter frameSizeWriter) {
        this(out, frameSizeWriter, DEFAULT_BUFFER_SIZE);
    }

    public FramedOutputStream(OutputStream out, FrameSizeWriter frameSizeWriter, int bufferSize) {
        this.out = out;
        this.frameSizeWriter = frameSizeWriter;
        this.current = ByteBuffer.allocateDirect(bufferSize);
        this.byteBuffers = new ArrayList<>();
    }

    @Override
    public void write(int i) throws IOException {
        if (out == null) throw new IOException("Writing to closed stream");
        if (!current.hasRemaining()) {
            byteBuffers.add(current);
            current = ByteBuffer.allocateDirect(current.capacity());
        }
        current.put((byte) i);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (out == null) throw new IOException("Writing to closed stream");
        if (current.remaining() < len) {
            int first = current.remaining();
            current.put(b, off, first);
            byteBuffers.add(current);
            current = ByteBuffer.allocateDirect(current.capacity());
            write(b, off + first, len - first);
        } else {
            current.put(b, off, len);
        }
    }

    @Override
    public void close() throws IOException {
        if (out != null) {
            try {
                int frameSize = current.position() + (byteBuffers.size() * current.capacity());
                this.frameSizeWriter.writeFrameSize(out, frameSize);
                for (ByteBuffer buffer : byteBuffers) {
                    buffer.flip();
                    new ByteBufferInputStream(buffer).transferTo(out);
                }
                current.flip();
                new ByteBufferInputStream(current).transferTo(out);
            } finally {
                out = null;
            }
        }
    }
}