import com.intellij.util.Consumer
import java.util.UUID
import io.netty.channel.ChannelHandler
-import com.intellij.openapi.util.AsyncResult
import io.netty.util.CharsetUtil
import org.jetbrains.io.ChannelExceptionHandler
import org.jetbrains.io.NettyUtil
import org.junit.Rule
import org.junit.Test
import org.jetbrains.io.MessageDecoder
+import org.jetbrains.concurrency.AsyncPromise
+import org.jetbrains.concurrency.Promise
+import com.intellij.util.concurrency.Semaphore
+// we don't handle String in efficient way - because we want to test readContent/readChars also
public class BinaryRequestHandlerTest {
private val fixtureManager = FixtureRule()
Test
public fun test() {
val text = "Hello!"
- val result = AsyncResult<String>()
+ val result = AsyncPromise<String>()
val bootstrap = NettyUtil.oioClientBootstrap().handler(object : ChannelInitializer<Channel>() {
override fun initChannel(channel: Channel) {
val requiredLength = 4 + text.length()
val response = readContent(input, context, requiredLength) {(buffer, context, isCumulateBuffer) -> buffer.toString(buffer.readerIndex(), requiredLength, CharsetUtil.UTF_8) }
if (response != null) {
- result.setDone(response)
+ result.setResult(response)
}
}
}, ChannelExceptionHandler.getInstance())
val message = Unpooled.copiedBuffer(text, CharsetUtil.UTF_8)
buffer.writeShort(message.readableBytes())
-
channel.write(buffer)
channel.writeAndFlush(message).syncUninterruptibly()
try {
- result.doWhenRejected(object : Consumer<String> {
- override fun consume(error: String) {
- TestCase.fail(error)
+ result.rejected(object : Consumer<Throwable> {
+ override fun consume(error: Throwable) {
+ TestCase.fail(error.getMessage())
}
})
- TestCase.assertEquals("got-" + text, result.getResultSync(5000))
+ if (result.getState() == Promise.State.PENDING) {
+ val semaphore = Semaphore()
+ semaphore.down()
+ result.processed { semaphore.up() }
+ if (!semaphore.waitForUnsafe(5000)) {
+ TestCase.fail("Time limit exceeded")
+ return
+ }
+ }
+
+ TestCase.assertEquals("got-" + text, result.get())
}
finally {
channel.close()
@Nullable
protected final CharSequence readChars(@NotNull ByteBuf input) throws CharacterCodingException {
- if (!input.isReadable()) {
+ int readableBytes = input.readableBytes();
+ if (readableBytes == 0) {
+ input.release();
return null;
}
int required = contentLength - consumedContentByteCount;
- if (input.readableBytes() < required) {
+ if (readableBytes < required) {
if (chunkedContent == null) {
chunkedContent = CharBuffer.allocate((int)((float)contentLength * charsetDecoder.maxCharsPerByte()));
}
- int count = input.readableBytes();
- ChannelBufferToString.readIntoCharBuffer(charsetDecoder, input, count, chunkedContent);
- consumedContentByteCount += count;
+ ChannelBufferToString.readIntoCharBuffer(charsetDecoder, input, readableBytes, chunkedContent);
+ consumedContentByteCount += readableBytes;
+ input.release();
return null;
}
else {