client should release buffer as soon as possible
authorVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Mon, 16 Feb 2015 10:48:59 +0000 (11:48 +0100)
committerVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Mon, 16 Feb 2015 10:50:07 +0000 (11:50 +0100)
platform/built-in-server/testSrc/BinaryRequestHandlerTest.kt
platform/platform-impl/src/org/jetbrains/io/Decoder.java
platform/platform-impl/src/org/jetbrains/io/MessageDecoder.java

index 0659a9c0e9a32fad0e10db4d4f47a9bd9681807d..05fca4ada6aa2ca25b015927019dfb72e06df9a5 100644 (file)
@@ -8,7 +8,6 @@ import io.netty.buffer.ByteBuf
 import com.intellij.util.Consumer
 import java.util.UUID
 import io.netty.channel.ChannelHandler
-import com.intellij.openapi.util.AsyncResult
 import io.netty.util.CharsetUtil
 import org.jetbrains.io.ChannelExceptionHandler
 import org.jetbrains.io.NettyUtil
@@ -19,7 +18,11 @@ import org.junit.rules.RuleChain
 import org.junit.Rule
 import org.junit.Test
 import org.jetbrains.io.MessageDecoder
+import org.jetbrains.concurrency.AsyncPromise
+import org.jetbrains.concurrency.Promise
+import com.intellij.util.concurrency.Semaphore
 
+// we don't handle String in efficient way - because we want to test readContent/readChars also
 public class BinaryRequestHandlerTest {
   private val fixtureManager = FixtureRule()
 
@@ -32,7 +35,7 @@ public class BinaryRequestHandlerTest {
   Test
   public fun test() {
     val text = "Hello!"
-    val result = AsyncResult<String>()
+    val result = AsyncPromise<String>()
 
     val bootstrap = NettyUtil.oioClientBootstrap().handler(object : ChannelInitializer<Channel>() {
       override fun initChannel(channel: Channel) {
@@ -41,7 +44,7 @@ public class BinaryRequestHandlerTest {
             val requiredLength = 4 + text.length()
             val response = readContent(input, context, requiredLength) {(buffer, context, isCumulateBuffer) -> buffer.toString(buffer.readerIndex(), requiredLength, CharsetUtil.UTF_8) }
             if (response != null) {
-              result.setDone(response)
+              result.setResult(response)
             }
           }
         }, ChannelExceptionHandler.getInstance())
@@ -58,18 +61,27 @@ public class BinaryRequestHandlerTest {
 
     val message = Unpooled.copiedBuffer(text, CharsetUtil.UTF_8)
     buffer.writeShort(message.readableBytes())
-
     channel.write(buffer)
     channel.writeAndFlush(message).syncUninterruptibly()
 
     try {
-      result.doWhenRejected(object : Consumer<String> {
-        override fun consume(error: String) {
-          TestCase.fail(error)
+      result.rejected(object : Consumer<Throwable> {
+        override fun consume(error: Throwable) {
+          TestCase.fail(error.getMessage())
         }
       })
 
-      TestCase.assertEquals("got-" + text, result.getResultSync(5000))
+      if (result.getState() == Promise.State.PENDING) {
+        val semaphore = Semaphore()
+        semaphore.down()
+        result.processed { semaphore.up() }
+        if (!semaphore.waitForUnsafe(5000)) {
+          TestCase.fail("Time limit exceeded")
+          return
+        }
+      }
+
+      TestCase.assertEquals("got-" + text, result.get())
     }
     finally {
       channel.close()
index 99e93e526f46ab51892afd9499590c152e1f22d5..83b673997ff2beec6ad8aa3f0a43f4a705cdadea 100644 (file)
@@ -38,7 +38,10 @@ public abstract class Decoder extends ChannelInboundHandlerAdapter {
         messageReceived(context, input);
       }
       finally {
-        input.release();
+        // client should release buffer as soon as possible, so, input could be released already
+        if (input.refCnt() > 0) {
+          input.release();
+        }
       }
     }
     else {
index 152f62de93a5715608ec498268951aa91bdb0796..473ee989c3185a4133a6c58eb69afbf46bd61b11 100644 (file)
@@ -25,19 +25,21 @@ public abstract class MessageDecoder extends Decoder {
 
   @Nullable
   protected final CharSequence readChars(@NotNull ByteBuf input) throws CharacterCodingException {
-    if (!input.isReadable()) {
+    int readableBytes = input.readableBytes();
+    if (readableBytes == 0) {
+      input.release();
       return null;
     }
 
     int required = contentLength - consumedContentByteCount;
-    if (input.readableBytes() < required) {
+    if (readableBytes < required) {
       if (chunkedContent == null) {
         chunkedContent = CharBuffer.allocate((int)((float)contentLength * charsetDecoder.maxCharsPerByte()));
       }
 
-      int count = input.readableBytes();
-      ChannelBufferToString.readIntoCharBuffer(charsetDecoder, input, count, chunkedContent);
-      consumedContentByteCount += count;
+      ChannelBufferToString.readIntoCharBuffer(charsetDecoder, input, readableBytes, chunkedContent);
+      consumedContentByteCount += readableBytes;
+      input.release();
       return null;
     }
     else {