FramedBufferOutputStream.java

  1. /*
  2.  * Copyright 2016 Providence Authors
  3.  *
  4.  * Licensed to the Apache Software Foundation (ASF) under one
  5.  * or more contributor license agreements. See the NOTICE file
  6.  * distributed with this work for additional information
  7.  * regarding copyright ownership. The ASF licenses this file
  8.  * to you under the Apache License, Version 2.0 (the
  9.  * "License"); you may not use this file except in compliance
  10.  * with the License. You may obtain a copy of the License at
  11.  *
  12.  *   http://www.apache.org/licenses/LICENSE-2.0
  13.  *
  14.  * Unless required by applicable law or agreed to in writing,
  15.  * software distributed under the License is distributed on an
  16.  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  17.  * KIND, either express or implied. See the License for the
  18.  * specific language governing permissions and limitations
  19.  * under the License.
  20.  */
  21. package net.morimekta.providence.thrift.io;

  22. import net.morimekta.util.io.BigEndianBinaryWriter;
  23. import net.morimekta.util.io.BinaryWriter;
  24. import net.morimekta.util.io.ByteBufferOutputStream;

  25. import javax.annotation.Nonnull;
  26. import java.io.IOException;
  27. import java.io.OutputStream;
  28. import java.nio.ByteBuffer;
  29. import java.nio.channels.WritableByteChannel;
  30. import java.util.Locale;

  31. /**
  32.  * Wrap an output stream in a framed buffer writer similar to the thrift
  33.  * TFramedTransport. The output stream will write everything in one single
  34.  * block when it is closed.
  35.  */
  36. public class FramedBufferOutputStream extends OutputStream {
  37.     private static final int MAX_BUFFER_SIZE = 16384;  // 16k.

  38.     private final ByteBuffer          frameSizeBuffer;
  39.     private final ByteBuffer          buffer;
  40.     private final WritableByteChannel out;

  41.     public FramedBufferOutputStream(WritableByteChannel out) {
  42.         this(out, MAX_BUFFER_SIZE);
  43.     }

  44.     public FramedBufferOutputStream(WritableByteChannel out, int maxBufferSize) {
  45.         this.out = out;
  46.         this.frameSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
  47.         this.buffer = ByteBuffer.allocateDirect(maxBufferSize);
  48.         this.buffer.limit(maxBufferSize);
  49.     }

  50.     @Override
  51.     public void write(int val) throws IOException {
  52.         if (!buffer.hasRemaining()) {
  53.             throw new IOException(String.format(Locale.US, "Frame size exceeded: 1 needed, 0 remaining, %d total",
  54.                                                 buffer.capacity()));
  55.         }
  56.         buffer.put((byte) val);
  57.     }

  58.     @Override
  59.     public void write(@Nonnull byte[] bytes) throws IOException {
  60.         if (buffer.remaining() < bytes.length) {
  61.             throw new IOException(String.format(Locale.US, "Frame size exceeded: %d needed, %d remaining, %d total",
  62.                                                 bytes.length, buffer.remaining(), buffer.capacity()));
  63.         }
  64.         buffer.put(bytes);
  65.     }

  66.     @Override
  67.     public void write(@Nonnull byte[] var1, int off, int len) throws IOException {
  68.         if (buffer.remaining() < len) {
  69.             throw new IOException(String.format(Locale.US, "Frame size exceeded: %d needed, %d remaining, %d total",
  70.                                                 len, buffer.remaining(), buffer.capacity()));
  71.         }
  72.         buffer.put(var1, off, len);
  73.     }

  74.     /**
  75.      * Write the frame at the current state, and reset the buffer to be able to
  76.      * generate a new frame.
  77.      *
  78.      * @throws IOException On failed write.
  79.      */
  80.     public void completeFrame() throws IOException {
  81.         int frameSize = buffer.position();
  82.         if (frameSize > 0) {
  83.             frameSizeBuffer.rewind();
  84.             try (ByteBufferOutputStream bos = new ByteBufferOutputStream(frameSizeBuffer);
  85.                  BinaryWriter writer = new BigEndianBinaryWriter(bos)) {
  86.                 writer.writeInt(frameSize);
  87.                 bos.flush();
  88.             }
  89.             frameSizeBuffer.flip();
  90.             buffer.flip();

  91.             synchronized (out) {
  92.                 out.write(frameSizeBuffer);
  93.                 while (buffer.hasRemaining()) {
  94.                     out.write(buffer);
  95.                 }
  96.             }

  97.             buffer.rewind();
  98.             buffer.limit(buffer.capacity());
  99.         }
  100.     }

  101.     @Override
  102.     public void close() throws IOException {
  103.         completeFrame();
  104.     }
  105. }