FramedOutputStream.java
/*
* Copyright (c) 2020, Stein Eldar Johnsen
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.io.sub;
import net.morimekta.io.BigEndianBinaryOutputStream;
import net.morimekta.io.ByteBufferInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* A framed output stream that buffers all written data and prepends a frame
* size when the stream is closed. The frame size is written using a
* {@link FrameSizeWriter}, and the size octets precede the data octets.
* See {@link FramedInputStream} for reading back the content.
* <p>
* The default frame size encoding is BEB128 (Big Endian Base128 integer).
* Because the frame size must be written before the data, all content is
* buffered in memory until the stream is closed. Closing this stream does
* not close the underlying output stream.
* <pre>{@code
* (size) (data)
* }</pre>
*/
public class FramedOutputStream extends OutputStream {
/**
* Default buffer size.
*/
public static final int DEFAULT_BUFFER_SIZE = 8192;
/**
* Frame size writer. Used to inject the act of writing the frame size to
* the output stream.
*/
public interface FrameSizeWriter {
/**
* Write the frame size.
*
* @param out The output stream to write to.
* @param size The size to write.
* @throws IOException If unable to write frame size.
*/
void writeFrameSize(OutputStream out, int size) throws IOException;
}
private final FrameSizeWriter frameSizeWriter;
private final List<ByteBuffer> byteBuffers;
private OutputStream out;
private ByteBuffer current;
/**
* Create a framed output stream using a big-endian base-128 frame size.
*
* @param out Output stream to write to.
*/
public FramedOutputStream(OutputStream out) {
this(out, (os, size) -> new BigEndianBinaryOutputStream(os).writeBase128(size));
}
/**
* Create a framed output stream using a big-endian base-128 frame size.
*
* @param out Output stream to write to.
* @param bufferSize Buffer data in chunks of this size.
*/
public FramedOutputStream(OutputStream out, int bufferSize) {
this(out, (os, size) -> new BigEndianBinaryOutputStream(os).writeBase128(size), bufferSize);
}
/**
* Create a framed output stream with a custom frame size writer.
*
* @param out Output stream to write to.
* @param frameSizeWriter The frame size writer to use.
*/
public FramedOutputStream(OutputStream out, FrameSizeWriter frameSizeWriter) {
this(out, frameSizeWriter, DEFAULT_BUFFER_SIZE);
}
/**
* Create a framed output stream with a custom frame size writer and buffer size.
*
* @param out Output stream to write to.
* @param frameSizeWriter The frame size writer to use.
* @param bufferSize Buffer data in chunks of this size.
*/
public FramedOutputStream(OutputStream out, FrameSizeWriter frameSizeWriter, int bufferSize) {
this.out = out;
this.frameSizeWriter = frameSizeWriter;
this.current = ByteBuffer.allocateDirect(bufferSize);
this.byteBuffers = new ArrayList<>();
}
@Override
public void write(int i) throws IOException {
if (out == null) throw new IOException("Writing to closed stream");
if (!current.hasRemaining()) {
byteBuffers.add(current);
current = ByteBuffer.allocateDirect(current.capacity());
}
current.put((byte) i);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (out == null) throw new IOException("Writing to closed stream");
if (current.remaining() < len) {
int first = current.remaining();
current.put(b, off, first);
byteBuffers.add(current);
current = ByteBuffer.allocateDirect(current.capacity());
write(b, off + first, len - first);
} else {
current.put(b, off, len);
}
}
@Override
public void close() throws IOException {
if (out != null) {
try {
int frameSize = current.position() + (byteBuffers.size() * current.capacity());
this.frameSizeWriter.writeFrameSize(out, frameSize);
for (ByteBuffer buffer : byteBuffers) {
buffer.flip();
new ByteBufferInputStream(buffer).transferTo(out);
}
current.flip();
new ByteBufferInputStream(current).transferTo(out);
} finally {
out = null;
}
}
}
}