WEB-21991 New Node.js debug protocol incompatibilities
authorVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Fri, 10 Jun 2016 13:13:22 +0000 (15:13 +0200)
committerVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Fri, 10 Jun 2016 13:15:46 +0000 (15:15 +0200)
13 files changed:
java/compiler/impl/src/com/intellij/compiler/server/BuildMessageDispatcher.java
jps/jps-builders/src/org/jetbrains/jps/javac/ExternalJavacManager.java
lib/annotations/netty/io/netty/channel/annotations.xml
platform/built-in-server/src/org/jetbrains/ide/BuiltInServerManagerImpl.java
platform/built-in-server/src/org/jetbrains/io/jsonRpc/ClientManager.kt
platform/built-in-server/src/org/jetbrains/io/jsonRpc/socket/RpcBinaryRequestHandler.java
platform/built-in-server/src/org/jetbrains/io/webSocket/MessageChannelHandler.java
platform/built-in-server/src/org/jetbrains/io/webSocket/WebSocketHandshakeHandler.java
platform/built-in-server/src/org/jetbrains/io/webSocket/WebSocketProtocolHandler.kt [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/DelegatingHttpRequestHandler.kt
platform/platform-impl/src/org/jetbrains/io/NettyUtil.java
platform/platform-impl/src/org/jetbrains/io/netty.kt
platform/script-debugger/debugger-ui/src/RemoteVmConnection.kt

index 096a113cf7f9b33dabbd19dc5b7a9d2cb0f2fed8..2624cde1af17b182b5c293e98c66108e8b110922 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2000-2014 JetBrains s.r.o.
+ * Copyright 2000-2016 JetBrains s.r.o.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -123,7 +123,7 @@ class BuildMessageDispatcher extends SimpleChannelInboundHandlerAdapter<CmdlineR
   
   @Override
   protected void messageReceived(ChannelHandlerContext context, CmdlineRemoteProto.Message message) throws Exception {
-    SessionData sessionData = context.attr(SESSION_DATA).get();
+    SessionData sessionData = context.channel().attr(SESSION_DATA).get();
 
     UUID sessionId;
     if (sessionData == null) {
@@ -134,7 +134,7 @@ class BuildMessageDispatcher extends SimpleChannelInboundHandlerAdapter<CmdlineR
       sessionData = mySessionDescriptors.get(sessionId);
       if (sessionData != null) {
         sessionData.channel = context.channel();
-        context.attr(SESSION_DATA).set(sessionData);
+        context.channel().attr(SESSION_DATA).set(sessionData);
       }
       if (myCanceledSessions.contains(sessionId)) {
         context.channel().writeAndFlush(CmdlineProtoUtil.toMessage(sessionId, CmdlineProtoUtil.createCancelCommand()));
@@ -199,7 +199,7 @@ class BuildMessageDispatcher extends SimpleChannelInboundHandlerAdapter<CmdlineR
       super.channelInactive(context);
     }
     finally {
-      final SessionData sessionData = context.attr(SESSION_DATA).get();
+      final SessionData sessionData = context.channel().attr(SESSION_DATA).get();
       if (sessionData != null) {
         final BuilderMessageHandler handler = unregisterBuildMessageHandler(sessionData.sessionId);
         if (handler != null) {
index bed7f6bf41a3bda6662c914af034c6199779db0b..bcb413016a94a851851a9d819dd95d827e67cf18 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2000-2015 JetBrains s.r.o.
+ * Copyright 2000-2016 JetBrains s.r.o.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -326,7 +326,7 @@ public class ExternalJavacManager {
   private class CompilationRequestsHandler extends SimpleChannelInboundHandler<JavacRemoteProto.Message> {
     @Override
     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-      JavacProcessDescriptor descriptor = ctx.attr(SESSION_DESCRIPTOR).get();
+      JavacProcessDescriptor descriptor = ctx.channel().attr(SESSION_DESCRIPTOR).get();
       if (descriptor != null) {
         descriptor.setDone();
       }
@@ -335,7 +335,7 @@ public class ExternalJavacManager {
 
     @Override
     public void channelRead0(final ChannelHandlerContext context, JavacRemoteProto.Message message) throws Exception {
-      JavacProcessDescriptor descriptor = context.attr(SESSION_DESCRIPTOR).get();
+      JavacProcessDescriptor descriptor = context.channel().attr(SESSION_DESCRIPTOR).get();
   
       UUID sessionId;
       if (descriptor == null) {
@@ -345,7 +345,7 @@ public class ExternalJavacManager {
         descriptor = myMessageHandlers.get(sessionId);
         if (descriptor != null) {
           descriptor.channel = context.channel();
-          context.attr(SESSION_DESCRIPTOR).set(descriptor);
+          context.channel().attr(SESSION_DESCRIPTOR).set(descriptor);
         }
       }
       else {
index 13f5c446cf4a545dd4483e931d123fbd013c44b4..121521d60f9d7eaa3dfd9e004f33f98e46d94ca2 100644 (file)
@@ -5,6 +5,12 @@
   <item name='io.netty.channel.ChannelInboundHandlerAdapter void channelInactive(io.netty.channel.ChannelHandlerContext) 0'>
     <annotation name='org.jetbrains.annotations.NotNull'/>
   </item>
+  <item name='io.netty.channel.ChannelInboundHandlerAdapter void channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) 0'>
+    <annotation name='org.jetbrains.annotations.NotNull'/>
+  </item>
+  <item name='io.netty.channel.ChannelInboundHandlerAdapter void channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) 1'>
+    <annotation name='org.jetbrains.annotations.NotNull'/>
+  </item>
   <item name='io.netty.channel.ChannelInitializer void channelRegistered(io.netty.channel.ChannelHandlerContext) 0'>
     <annotation name='org.jetbrains.annotations.NotNull'/>
   </item>
index 6030acfc5e5088994393d8ea3b92e788ebf27ec6..a080eb51214132d86510b2f276ee0562b56cf285 100644 (file)
@@ -14,6 +14,7 @@ import com.intellij.openapi.util.text.StringUtil;
 import com.intellij.util.Url;
 import com.intellij.util.UrlImpl;
 import com.intellij.util.net.NetUtils;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import org.jetbrains.annotations.NonNls;
 import org.jetbrains.annotations.NotNull;
@@ -123,6 +124,13 @@ public class BuiltInServerManagerImpl extends BuiltInServerManager {
     return server;
   }
 
+  @NotNull
+  public EventLoopGroup getEventLoopGroup() {
+    waitForStart();
+    assert server != null;
+    return server.getEventLoopGroup();
+  }
+
   @Override
   public boolean isOnBuiltInWebServer(@Nullable Url url) {
     return url != null && !StringUtil.isEmpty(url.getAuthority()) && isOnBuiltInWebServerByAuthority(url.getAuthority());
index 4eb035b48410c6785f27c533d55ab691a0256959..10561b9cbec880234a9efac50b4b3ecd278c2edb 100644 (file)
@@ -5,7 +5,7 @@ import com.intellij.openapi.util.SimpleTimer
 import gnu.trove.THashSet
 import gnu.trove.TObjectProcedure
 import io.netty.buffer.ByteBuf
-import io.netty.channel.ChannelHandlerContext
+import io.netty.channel.Channel
 import io.netty.util.AttributeKey
 import org.jetbrains.concurrency.Promise
 import org.jetbrains.io.webSocket.WebSocketServerOptions
@@ -64,7 +64,7 @@ class ClientManager(private val listener: ClientListener?, val exceptionHandler:
     })
   }
 
-  fun disconnectClient(context: ChannelHandlerContext, client: Client, closeChannel: Boolean): Boolean {
+  fun disconnectClient(channel: Channel, client: Client, closeChannel: Boolean): Boolean {
     synchronized (clients) {
       if (!clients.remove(client)) {
         return false
@@ -72,10 +72,10 @@ class ClientManager(private val listener: ClientListener?, val exceptionHandler:
     }
 
     try {
-      context.attr(CLIENT).remove()
+      channel.attr(CLIENT).remove()
 
       if (closeChannel) {
-        context.channel().close()
+        channel.close()
       }
 
       client.rejectAsyncResults(exceptionHandler)
index 8f0b44c7cfcd52ddf398861e53673fb601916184..5dfc7511143400a00ffa37f8923674aedb9c3f9a 100644 (file)
@@ -58,7 +58,7 @@ public class RpcBinaryRequestHandler extends BinaryRequestHandler implements Exc
   @Override
   public ChannelHandler getInboundHandler(@NotNull ChannelHandlerContext context) {
     SocketClient client = new SocketClient(context.channel());
-    context.attr(ClientManagerKt.getCLIENT()).set(client);
+    context.channel().attr(ClientManagerKt.getCLIENT()).set(client);
     clientManager.getValue().addClient(client);
     connected(client, null);
     return new MyDecoder(client);
@@ -127,10 +127,10 @@ public class RpcBinaryRequestHandler extends BinaryRequestHandler implements Exc
 
     @Override
     public void channelInactive(ChannelHandlerContext context) throws Exception {
-      Client client = context.attr(ClientManagerKt.getCLIENT()).get();
+      Client client = context.channel().attr(ClientManagerKt.getCLIENT()).get();
       // if null, so, has already been explicitly removed
       if (client != null) {
-        clientManager.getValue().disconnectClient(context, client, false);
+        clientManager.getValue().disconnectClient(context.channel(), client, false);
       }
     }
   }
index 1cb37b07287a86ce526ffd49a610fa27467db61b..8a94ed4807b36d7a3d90df594995c5f512fa1592 100644 (file)
@@ -1,18 +1,19 @@
 package org.jetbrains.io.webSocket;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.websocketx.*;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.io.ChannelBufferToString;
-import org.jetbrains.io.SimpleChannelInboundHandlerAdapter;
 import org.jetbrains.io.jsonRpc.Client;
 import org.jetbrains.io.jsonRpc.ClientManager;
 import org.jetbrains.io.jsonRpc.ClientManagerKt;
 import org.jetbrains.io.jsonRpc.MessageServer;
 
 @ChannelHandler.Sharable
-final class MessageChannelHandler extends SimpleChannelInboundHandlerAdapter<WebSocketFrame> {
+final class MessageChannelHandler extends WebSocketProtocolHandler {
   private final ClientManager clientManager;
   private final MessageServer messageServer;
 
@@ -22,51 +23,62 @@ final class MessageChannelHandler extends SimpleChannelInboundHandlerAdapter<Web
   }
 
   @Override
-  protected void messageReceived(ChannelHandlerContext context, WebSocketFrame message) throws Exception {
-    WebSocketClient client = (WebSocketClient)context.attr(ClientManagerKt.getCLIENT()).get();
-    if (message instanceof CloseWebSocketFrame) {
-      if (client != null) {
-        try {
-          clientManager.disconnectClient(context, client, false);
-        }
-        finally {
-          message.retain();
-          client.disconnect((CloseWebSocketFrame)message);
-        }
+  protected void closeFrameReceived(@NotNull Channel channel, @NotNull CloseWebSocketFrame message) {
+    WebSocketClient client = (WebSocketClient)channel.attr(ClientManagerKt.getCLIENT()).get();
+    if (client == null) {
+      super.closeFrameReceived(channel, message);
+    }
+    else {
+      try {
+        clientManager.disconnectClient(channel, client, false);
+      }
+      finally {
+        client.disconnect(message);
       }
     }
-    else if (message instanceof PingWebSocketFrame) {
-      context.channel().writeAndFlush(new PongWebSocketFrame(message.content()));
+  }
+
+  @Override
+  protected void textFrameReceived(@NotNull Channel channel, @NotNull TextWebSocketFrame message) {
+    WebSocketClient client = (WebSocketClient)channel.attr(ClientManagerKt.getCLIENT()).get();
+    CharSequence chars;
+    try {
+      chars = ChannelBufferToString.readChars(message.content());
     }
-    else if (message instanceof TextWebSocketFrame) {
+    catch (Throwable e) {
       try {
-        messageServer.messageReceived(client, ChannelBufferToString.readChars(message.content()));
+        message.release();
       }
-      catch (Throwable e) {
+      finally {
         clientManager.getExceptionHandler().exceptionCaught(e);
       }
+      return;
+    }
+
+    try {
+      messageServer.messageReceived(client, chars);
     }
-    else if (!(message instanceof PongWebSocketFrame)) {
-      throw new UnsupportedOperationException(message.getClass().getName() + " frame types not supported");
+    catch (Throwable e) {
+      clientManager.getExceptionHandler().exceptionCaught(e);
     }
   }
 
   @Override
   public void channelInactive(ChannelHandlerContext context) throws Exception {
-    Client client = context.attr(ClientManagerKt.getCLIENT()).get();
+    Client client = context.channel().attr(ClientManagerKt.getCLIENT()).get();
     // if null, so, has already been explicitly removed
     if (client != null) {
-      clientManager.disconnectClient(context, client, false);
+      clientManager.disconnectClient(context.channel(), client, false);
     }
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+  public void exceptionCaught(@NotNull ChannelHandlerContext context, @NotNull Throwable cause) {
     try {
       clientManager.getExceptionHandler().exceptionCaught(cause);
     }
     finally {
-      context.channel().close();
+      super.exceptionCaught(context, cause);
     }
   }
 }
index b7837792f5803c36a06a80279df1182cbc5b3c40..ecffa775317775bcf1312300d42fa00679c07744 100644 (file)
@@ -75,7 +75,7 @@ public abstract class WebSocketHandshakeHandler extends HttpRequestHandler imple
     }
 
     final Client client = new WebSocketClient(context.channel(), handshaker);
-    context.attr(ClientManagerKt.getCLIENT()).set(client);
+    context.channel().attr(ClientManagerKt.getCLIENT()).set(client);
     handshaker.handshake(context.channel(), request).addListener(new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
@@ -86,7 +86,7 @@ public abstract class WebSocketHandshakeHandler extends HttpRequestHandler imple
           BuiltInServer.replaceDefaultHandler(context, messageChannelHandler);
           ChannelHandlerContext messageChannelHandlerContext = context.pipeline().context(messageChannelHandler);
           context.pipeline().addBefore(messageChannelHandlerContext.name(), "webSocketFrameAggregator", new WebSocketFrameAggregator(NettyUtil.MAX_CONTENT_LENGTH));
-          messageChannelHandlerContext.attr(ClientManagerKt.getCLIENT()).set(client);
+          messageChannelHandlerContext.channel().attr(ClientManagerKt.getCLIENT()).set(client);
           connected(client, uriDecoder.parameters());
         }
       }
diff --git a/platform/built-in-server/src/org/jetbrains/io/webSocket/WebSocketProtocolHandler.kt b/platform/built-in-server/src/org/jetbrains/io/webSocket/WebSocketProtocolHandler.kt
new file mode 100644 (file)
index 0000000..035437d
--- /dev/null
@@ -0,0 +1,67 @@
+package org.jetbrains.io.webSocket
+package org.jetbrains.io.webSocket
+
+import io.netty.channel.Channel
+import io.netty.channel.ChannelHandlerContext
+import io.netty.channel.ChannelInboundHandlerAdapter
+import io.netty.handler.codec.http.FullHttpResponse
+import io.netty.handler.codec.http.websocketx.*
+import io.netty.util.CharsetUtil
+import io.netty.util.ReferenceCountUtil
+import org.jetbrains.builtInWebServer.LOG
+import org.jetbrains.io.NettyUtil
+
+abstract class WebSocketProtocolHandler : ChannelInboundHandlerAdapter() {
+  override final fun channelRead(context: ChannelHandlerContext, message: Any) {
+    // Pong frames need to get ignored
+    when (message) {
+      !is WebSocketFrame, is PongWebSocketFrame -> ReferenceCountUtil.release(message)
+      is PingWebSocketFrame -> context.channel().writeAndFlush(PongWebSocketFrame(message.content()))
+      is CloseWebSocketFrame -> closeFrameReceived(context.channel(), message)
+      is TextWebSocketFrame -> {
+        try {
+          textFrameReceived(context.channel(), message)
+        }
+        finally {
+          // client should release buffer as soon as possible, so, message could be released already
+          if (message.refCnt() > 0) {
+            message.release()
+          }
+        }
+      }
+      else -> throw UnsupportedOperationException("${message.javaClass.name} frame types not supported")
+    }
+  }
+
+  abstract protected fun textFrameReceived(channel: Channel, message: TextWebSocketFrame)
+
+  protected open fun closeFrameReceived(channel: Channel, message: CloseWebSocketFrame) {
+    channel.close()
+  }
+
+  @Suppress("OverridingDeprecatedMember")
+  override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
+    NettyUtil.logAndClose(cause, LOG, context.channel())
+  }
+}
+
+open class WebSocketProtocolHandshakeHandler(private val handshaker: WebSocketClientHandshaker) : ChannelInboundHandlerAdapter() {
+  override final fun channelRead(context: ChannelHandlerContext, message: Any) {
+    val channel = context.channel()
+    if (!handshaker.isHandshakeComplete) {
+      handshaker.finishHandshake(channel, message as FullHttpResponse)
+      context.pipeline().remove(this)
+      completed()
+      return
+    }
+
+    if (message is FullHttpResponse) {
+      throw IllegalStateException("Unexpected FullHttpResponse (getStatus=${message.status()}, content=${message.content().toString(CharsetUtil.UTF_8)})")
+    }
+
+    context.fireChannelRead(message)
+  }
+
+  open protected fun completed() {
+  }
+}
\ No newline at end of file
index 806d3e6a351b790f5a4a7c7aeab652b779fc539b..ee17b364b0d761eb3d4b1aff86b79764e5f55e47 100644 (file)
@@ -41,7 +41,7 @@ internal class DelegatingHttpRequestHandler : DelegatingHttpRequestHandlerBase()
       return isSupported(request) && !request.isWriteFromBrowserWithoutOrigin() && isAccessible(request) && process(urlDecoder, request, context)
     }
 
-    val prevHandlerAttribute = context.attr(PREV_HANDLER)
+    val prevHandlerAttribute = context.channel().attr(PREV_HANDLER)
     val connectedHandler = prevHandlerAttribute.get()
     if (connectedHandler != null) {
       if (connectedHandler.checkAndProcess()) {
@@ -79,11 +79,13 @@ internal class DelegatingHttpRequestHandler : DelegatingHttpRequestHandlerBase()
     return false
   }
 
+  @Suppress("OverridingDeprecatedMember")
   override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
     try {
-      context.attr(PREV_HANDLER).remove()
+      context.channel().attr(PREV_HANDLER).remove()
     }
     finally {
+      @Suppress("DEPRECATION")
       super.exceptionCaught(context, cause)
     }
   }
index 33bd2693a253254d1787846f4681c853ae6c57a5..7b779dfbc95a7f04a1b565f5ae24fe7b4e600374 100644 (file)
@@ -85,7 +85,7 @@ public final class NettyUtil {
                            int maxAttemptCount,
                            @NotNull Condition<Void> stopCondition) throws Throwable {
     int attemptCount = 0;
-    if (bootstrap.group() instanceof NioEventLoopGroup) {
+    if (bootstrap.config().group() instanceof NioEventLoopGroup) {
       return connectNio(bootstrap, remoteAddress, promise, maxAttemptCount, stopCondition, attemptCount);
     }
 
index 044302fdb57803d822326e9005784b30270f3bcc..653910718ef29347697cbeb505b7ff93c560905a 100644 (file)
@@ -68,7 +68,7 @@ fun oioClientBootstrap(): Bootstrap {
 }
 
 inline fun ChannelFuture.addChannelListener(crossinline listener: (future: ChannelFuture) -> Unit) {
-  addListener(GenericFutureListener<io.netty.channel.ChannelFuture> { listener(it) })
+  addListener(GenericFutureListener<ChannelFuture> { listener(it) })
 }
 
 // if NIO, so, it is shared and we must not shutdown it
index cfd13cf1a3b7ef9b9143d14af2d8ff643c6f8d60..43f91b442d6e10db2323734f0d1ff384a17f105f 100644 (file)
@@ -23,13 +23,14 @@ import com.intellij.ui.ColoredListCellRenderer
 import com.intellij.ui.components.JBList
 import com.intellij.util.io.socketConnection.ConnectionStatus
 import io.netty.bootstrap.Bootstrap
+import io.netty.channel.ChannelFuture
+import io.netty.util.concurrent.GenericFutureListener
 import org.jetbrains.concurrency.AsyncPromise
 import org.jetbrains.concurrency.Promise
 import org.jetbrains.concurrency.rejectedPromise
 import org.jetbrains.concurrency.resolvedPromise
 import org.jetbrains.debugger.Vm
 import org.jetbrains.io.NettyUtil
-import org.jetbrains.io.addChannelListener
 import org.jetbrains.io.connect
 import org.jetbrains.rpc.LOG
 import java.net.ConnectException
@@ -43,12 +44,16 @@ abstract class RemoteVmConnection : VmConnection<Vm>() {
 
   private val connectCancelHandler = AtomicReference<() -> Unit>()
 
+  protected val channelCloseListener = GenericFutureListener<ChannelFuture> {
+    close("Process disconnected unexpectedly", ConnectionStatus.DISCONNECTED)
+  }
+
   abstract fun createBootstrap(address: InetSocketAddress, vmResult: AsyncPromise<Vm>): Bootstrap
 
   @JvmOverloads
   fun open(address: InetSocketAddress, stopCondition: Condition<Void>? = null): Promise<Vm> {
     port = address.port
-    isLocalAddress = address.getAddress().isAnyLocalAddress() || address.getAddress().isLoopbackAddress()
+    isLocalAddress = address.address.isAnyLocalAddress || address.address.isLoopbackAddress
     setState(ConnectionStatus.WAITING_FOR_CONNECTION, "Connecting to ${address.hostName}:${port}")
     val result = AsyncPromise<Vm>()
     val future = ApplicationManager.getApplication().executeOnPooledThread {
@@ -77,11 +82,7 @@ abstract class RemoteVmConnection : VmConnection<Vm>() {
 
       createBootstrap(address, result)
           .connect(address, connectionPromise, maxAttemptCount = if (stopCondition == null) NettyUtil.DEFAULT_CONNECT_ATTEMPT_COUNT else -1, stopCondition = stopCondition)
-          ?.let {
-            it.closeFuture().addChannelListener {
-              close("Process disconnected unexpectedly", ConnectionStatus.DISCONNECTED)
-            }
-          }
+          ?.let { it.closeFuture().addListener(channelCloseListener) }
     }
 
     connectCancelHandler.set {