FramedBufferOutputStream.java
- /*
- * Copyright 2016 Providence Authors
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
- package net.morimekta.providence.thrift.io;
- import net.morimekta.util.io.BigEndianBinaryWriter;
- import net.morimekta.util.io.BinaryWriter;
- import net.morimekta.util.io.ByteBufferOutputStream;
- import javax.annotation.Nonnull;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.nio.ByteBuffer;
- import java.nio.channels.WritableByteChannel;
- import java.util.Locale;
- /**
- * Wrap an output stream in a framed buffer writer similar to the thrift
- * TFramedTransport. The output stream will write everything in one single
- * block when it is closed.
- */
- public class FramedBufferOutputStream extends OutputStream {
- private static final int MAX_BUFFER_SIZE = 16384; // 16k.
- private final ByteBuffer frameSizeBuffer;
- private final ByteBuffer buffer;
- private final WritableByteChannel out;
- public FramedBufferOutputStream(WritableByteChannel out) {
- this(out, MAX_BUFFER_SIZE);
- }
- public FramedBufferOutputStream(WritableByteChannel out, int maxBufferSize) {
- this.out = out;
- this.frameSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
- this.buffer = ByteBuffer.allocateDirect(maxBufferSize);
- this.buffer.limit(maxBufferSize);
- }
- @Override
- public void write(int val) throws IOException {
- if (!buffer.hasRemaining()) {
- throw new IOException(String.format(Locale.US, "Frame size exceeded: 1 needed, 0 remaining, %d total",
- buffer.capacity()));
- }
- buffer.put((byte) val);
- }
- @Override
- public void write(@Nonnull byte[] bytes) throws IOException {
- if (buffer.remaining() < bytes.length) {
- throw new IOException(String.format(Locale.US, "Frame size exceeded: %d needed, %d remaining, %d total",
- bytes.length, buffer.remaining(), buffer.capacity()));
- }
- buffer.put(bytes);
- }
- @Override
- public void write(@Nonnull byte[] var1, int off, int len) throws IOException {
- if (buffer.remaining() < len) {
- throw new IOException(String.format(Locale.US, "Frame size exceeded: %d needed, %d remaining, %d total",
- len, buffer.remaining(), buffer.capacity()));
- }
- buffer.put(var1, off, len);
- }
- /**
- * Write the frame at the current state, and reset the buffer to be able to
- * generate a new frame.
- *
- * @throws IOException On failed write.
- */
- public void completeFrame() throws IOException {
- int frameSize = buffer.position();
- if (frameSize > 0) {
- frameSizeBuffer.rewind();
- try (ByteBufferOutputStream bos = new ByteBufferOutputStream(frameSizeBuffer);
- BinaryWriter writer = new BigEndianBinaryWriter(bos)) {
- writer.writeInt(frameSize);
- bos.flush();
- }
- frameSizeBuffer.flip();
- buffer.flip();
- synchronized (out) {
- out.write(frameSizeBuffer);
- while (buffer.hasRemaining()) {
- out.write(buffer);
- }
- }
- buffer.rewind();
- buffer.limit(buffer.capacity());
- }
- }
- @Override
- public void close() throws IOException {
- completeFrame();
- }
- }