// FramedInputStream.java

package net.morimekta.io.sub;

import net.morimekta.io.BigEndianBinaryInputStream;

import java.io.IOException;
import java.io.InputStream;

import static java.lang.Math.min;

/**
 * The framed input stream is a stream with a fixed size as part of the underlying
 * input stream. The frame size can be read from the stream using the {@link FrameSizeReader},
 * or be provided statically.
 * <p>
 * The default means of reading the frame size is BEB128 (Big Endian Base128 integer), which
 * will provide an empty frame if already at end of stream. The size octets are always
 * meant to precede the data octets.
 * <pre>{@code
 * (size) (data)
 * }</pre>
 */
public class FramedInputStream extends InputStream {
    /**
     * Frame size reader. Used to inject the act of reading the frame size from
     * the buffer.
     */
    @FunctionalInterface
    public interface FrameSizeReader {
        /**
         * Read the frame size.
         *
         * @param in The input stream to read from.
         * @return The resulting frame size.
         * @throws IOException If unable to read frame size.
         */
        int readFrameSize(InputStream in) throws IOException;
    }

    /** Sentinel stored in {@link #remainingOnMark} while no mark has been set. */
    private static final int NO_MARK = -1;

    /** The wrapped stream that the frame is read from. */
    private final InputStream in;
    /** Total number of bytes in the frame, as given or read at construction. */
    private final int         frameSize;
    /** Number of frame bytes not yet consumed from the wrapped stream. */
    private       int         remaining;
    /** Value of {@link #remaining} at the last {@link #mark(int)}, or {@link #NO_MARK}. */
    private       int         remainingOnMark = NO_MARK;

    /**
     * Create a framed input stream, using a big-endian base-128 frame size.
     *
     * @param in Input stream to read from.
     * @throws IOException If unable to read frame size.
     */
    public FramedInputStream(InputStream in) throws IOException {
        this(in, i -> new BigEndianBinaryInputStream(i).readIntBase128());
    }

    /**
     * Create a framed input stream, reading the frame size from the stream
     * with the supplied reader before any frame data is consumed.
     *
     * @param in            Input stream to read from.
     * @param readFrameSize Specified frame size reader.
     * @throws IOException If unable to read frame size.
     */
    public FramedInputStream(InputStream in, FrameSizeReader readFrameSize) throws IOException {
        this(in, readFrameSize.readFrameSize(in));
    }

    /**
     * Create a framed input stream with a pre-specified frame size.
     *
     * @param in        Input stream to read from.
     * @param frameSize Number of bytes in the frame.
     */
    public FramedInputStream(InputStream in, int frameSize) {
        this.in = in;
        this.frameSize = frameSize;
        this.remaining = frameSize;
    }

    /**
     * @return The frame size.
     */
    public int getFrameSize() {
        return frameSize;
    }

    /**
     * Read a single byte from the frame.
     *
     * @return The byte read, or -1 if the frame is exhausted or the
     *         underlying stream ended.
     * @throws IOException If the underlying stream fails to read.
     */
    @Override
    public int read() throws IOException {
        if (remaining < 1) {
            return -1;
        }
        int b = in.read();
        // Only count bytes that were actually delivered; a premature end of the
        // underlying stream must not be booked as consumed frame data.
        if (b >= 0) {
            --remaining;
        }
        return b;
    }

    /**
     * Read up to {@code len} bytes from the frame, never reading past the
     * end of the frame.
     *
     * @param b   Buffer to read into.
     * @param off Offset in the buffer of the first byte written.
     * @param len Maximum number of bytes to read.
     * @return Number of bytes read, or -1 if the frame is exhausted or the
     *         underlying stream ended.
     * @throws IOException If the underlying stream fails to read.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (remaining < 1) {
            return -1;
        }
        // Cap the request so the underlying stream is never read beyond the frame.
        int capped = min(remaining, len);
        int read = in.read(b, off, capped);
        if (read > 0) {
            remaining -= read;
        }
        return read;
    }

    /**
     * @return The number of bytes that can be read without blocking, limited
     *         to what is left of the frame. Never negative.
     * @throws IOException If the underlying stream fails to report availability.
     */
    @Override
    public int available() throws IOException {
        // An exhausted (or negatively sized) frame has nothing available,
        // regardless of what the underlying stream reports.
        if (in == null || remaining < 1) {
            return 0;
        }
        return min(remaining, in.available());
    }

    /**
     * Consume whatever is left of the frame, so the underlying stream is
     * positioned just after it. The underlying stream itself is not closed.
     *
     * @throws IOException If the underlying stream fails to read.
     */
    @Override
    public void close() throws IOException {
        if (remaining > 0) {
            byte[] buf = new byte[1024];
            int r;
            // Stop early if the underlying stream ends before the frame does.
            while (remaining > 0 && (r = in.read(buf, 0, min(remaining, buf.length))) > 0) {
                remaining -= r;
            }
            remaining = 0;
        }
    }

    /**
     * @return True if the underlying stream supports mark and reset.
     */
    @Override
    public boolean markSupported() {
        return in.markSupported();
    }

    /**
     * Mark the current position in the underlying stream, and remember how
     * much of the frame was left at that point.
     *
     * @param readLimit Read limit passed on to the underlying stream.
     */
    @Override
    public synchronized void mark(int readLimit) {
        in.mark(readLimit);
        // Clamp so a recorded mark can never be confused with NO_MARK.
        remainingOnMark = Math.max(0, remaining);
    }

    /**
     * Reset the underlying stream to the last mark, and restore the number
     * of remaining frame bytes to what it was when the mark was set.
     *
     * @throws IOException If no mark has been set on this stream, or if the
     *                     underlying stream fails to reset.
     */
    @Override
    public synchronized void reset() throws IOException {
        // Without a recorded mark there is no frame position to restore, so fail
        // before the underlying stream is repositioned.
        if (remainingOnMark == NO_MARK) {
            throw new IOException("Resetting to invalid mark");
        }
        in.reset();
        remaining = remainingOnMark;
    }
}