// FramedInputStream.java

/*
 * Copyright (c) 2020, Stein Eldar Johnsen
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package net.morimekta.io.sub;

import net.morimekta.io.BigEndianBinaryInputStream;

import java.io.IOException;
import java.io.InputStream;

import static java.lang.Math.min;

/**
 * An input stream that reads a fixed number of bytes (a frame) from an
 * underlying stream. The frame size can be read from the stream using a
 * {@link FrameSizeReader}, or provided as a constant.
 * <p>
 * The default frame size encoding is BEB128 (Big Endian Base128 integer),
 * which returns an empty frame if the stream is already at end of input.
 * The size octets always precede the data octets.
 * <pre>{@code
 * (size) (data)
 * }</pre>
 * <p>
 * Closing the framed stream consumes whatever is left of the frame, but does
 * <b>not</b> close the underlying stream, so the next frame can be read from
 * the same underlying stream afterwards.
 * <p>
 * Only {@link #mark(int)} and {@link #reset()} are synchronized; the read
 * methods are not.
 */
public class FramedInputStream extends InputStream {
    /**
     * Frame size reader. Used to inject the act of reading the frame size from
     * the buffer.
     */
    @FunctionalInterface
    public interface FrameSizeReader {
        /**
         * Read the frame size.
         *
         * @param in The input stream to read from.
         * @return The resulting frame size.
         * @throws IOException If unable to read frame size.
         */
        int readFrameSize(InputStream in) throws IOException;
    }

    /** Sentinel for {@link #remainingOnMark} meaning that no mark has been set. */
    private static final int NO_MARK = -1;
    /** Maximum size of the scratch buffer used to drain the frame on close. */
    private static final int SKIP_BUFFER_SIZE = 1024;

    private final InputStream in;
    private final int         frameSize;
    /** Number of bytes left in the frame. Never negative. */
    private       int         remaining;
    /** Value of {@link #remaining} when mark was called, or {@link #NO_MARK}. */
    private       int         remainingOnMark = NO_MARK;

    /**
     * Create a framed input stream, using a big-endian base-128 frame size.
     *
     * @param in Input stream to read from.
     * @throws IOException If unable to read frame size.
     */
    public FramedInputStream(InputStream in) throws IOException {
        this(in, i -> new BigEndianBinaryInputStream(i).readIntBase128());
    }

    /**
     * Create a framed input stream.
     *
     * @param in            Input stream to read from.
     * @param readFrameSize Specified frame size reader.
     * @throws IOException If unable to read frame size.
     */
    public FramedInputStream(InputStream in, FrameSizeReader readFrameSize) throws IOException {
        this(in, readFrameSize.readFrameSize(in));
    }

    /**
     * Create a framed input stream with a pre-specified frame size.
     *
     * @param in        Input stream to read from.
     * @param frameSize The number of bytes in the frame. A negative value is
     *                  treated as an empty frame when reading, but is still
     *                  reported as-is by {@link #getFrameSize()}.
     */
    public FramedInputStream(InputStream in, int frameSize) {
        this.in = in;
        this.frameSize = frameSize;
        // Clamp so that 'remaining' is never negative: keeps available()
        // non-negative and keeps NO_MARK unambiguous in reset().
        this.remaining = Math.max(0, frameSize);
    }

    /**
     * @return The frame size.
     */
    public int getFrameSize() {
        return frameSize;
    }

    /**
     * Read a single byte from the frame.
     *
     * @return The byte read, or -1 if the frame is exhausted or the underlying
     *         stream has reached its end.
     * @throws IOException If the underlying stream fails to read.
     */
    @Override
    public int read() throws IOException {
        if (remaining < 1) {
            return -1;
        }
        int b = in.read();
        // Only count bytes that were actually delivered; an early end of the
        // underlying stream must not consume frame budget.
        if (b >= 0) {
            --remaining;
        }
        return b;
    }

    /**
     * Read up to {@code len} bytes from the frame, never reading past the end
     * of the frame.
     *
     * @param b   Buffer to read into.
     * @param off Offset in the buffer to start writing at.
     * @param len Maximum number of bytes to read.
     * @return Number of bytes read, or -1 if the frame is exhausted or the
     *         underlying stream has reached its end.
     * @throws IOException If the underlying stream fails to read.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (remaining < 1) {
            return -1;
        }
        int read = in.read(b, off, min(remaining, len));
        if (read > 0) {
            remaining -= read;
        }
        return read;
    }

    /**
     * @return The number of bytes available without blocking, limited to what
     *         is left of the frame.
     * @throws IOException If the underlying stream fails.
     */
    @Override
    public int available() throws IOException {
        if (in == null) {
            return 0;
        }
        return min(remaining, in.available());
    }

    /**
     * Consume the rest of the frame from the underlying stream. The underlying
     * stream itself is left open.
     *
     * @throws IOException If the underlying stream fails to read.
     */
    @Override
    public void close() throws IOException {
        if (remaining > 0) {
            byte[] buf = new byte[min(remaining, SKIP_BUFFER_SIZE)];
            int r;
            while (remaining > 0 && (r = in.read(buf, 0, min(remaining, buf.length))) > 0) {
                remaining -= r;
            }
            // The underlying stream may have ended before the frame did;
            // either way the frame is done.
            remaining = 0;
        }
    }

    /**
     * @return True if the underlying stream supports mark and reset.
     */
    @Override
    public boolean markSupported() {
        return in.markSupported();
    }

    /**
     * Mark the current position in the underlying stream, and remember how
     * much of the frame is left at this point.
     *
     * @param readLimit Passed on to the underlying stream.
     */
    @Override
    public synchronized void mark(int readLimit) {
        in.mark(readLimit);
        remainingOnMark = remaining;
    }

    /**
     * Reset the underlying stream to the last mark, and restore the remaining
     * frame byte count from that point.
     *
     * @throws IOException If no mark has been set on this stream, or if the
     *                     underlying stream fails to reset.
     */
    @Override
    public synchronized void reset() throws IOException {
        if (remainingOnMark == NO_MARK) {
            // Without this check the sentinel would be copied into 'remaining'
            // when the underlying stream accepts reset without a mark, leaving
            // the frame looking exhausted.
            throw new IOException("Mark not set");
        }
        in.reset();
        remaining = remainingOnMark;
    }
}