/*
- * Copyright 2000-2014 JetBrains s.r.o.
+ * Copyright 2000-2016 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@Override
protected void messageReceived(ChannelHandlerContext context, CmdlineRemoteProto.Message message) throws Exception {
- SessionData sessionData = context.attr(SESSION_DATA).get();
+ SessionData sessionData = context.channel().attr(SESSION_DATA).get();
UUID sessionId;
if (sessionData == null) {
sessionData = mySessionDescriptors.get(sessionId);
if (sessionData != null) {
sessionData.channel = context.channel();
- context.attr(SESSION_DATA).set(sessionData);
+ context.channel().attr(SESSION_DATA).set(sessionData);
}
if (myCanceledSessions.contains(sessionId)) {
context.channel().writeAndFlush(CmdlineProtoUtil.toMessage(sessionId, CmdlineProtoUtil.createCancelCommand()));
super.channelInactive(context);
}
finally {
- final SessionData sessionData = context.attr(SESSION_DATA).get();
+ final SessionData sessionData = context.channel().attr(SESSION_DATA).get();
if (sessionData != null) {
final BuilderMessageHandler handler = unregisterBuildMessageHandler(sessionData.sessionId);
if (handler != null) {
/*
- * Copyright 2000-2015 JetBrains s.r.o.
+ * Copyright 2000-2016 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
private class CompilationRequestsHandler extends SimpleChannelInboundHandler<JavacRemoteProto.Message> {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- JavacProcessDescriptor descriptor = ctx.attr(SESSION_DESCRIPTOR).get();
+ JavacProcessDescriptor descriptor = ctx.channel().attr(SESSION_DESCRIPTOR).get();
if (descriptor != null) {
descriptor.setDone();
}
@Override
public void channelRead0(final ChannelHandlerContext context, JavacRemoteProto.Message message) throws Exception {
- JavacProcessDescriptor descriptor = context.attr(SESSION_DESCRIPTOR).get();
+ JavacProcessDescriptor descriptor = context.channel().attr(SESSION_DESCRIPTOR).get();
UUID sessionId;
if (descriptor == null) {
descriptor = myMessageHandlers.get(sessionId);
if (descriptor != null) {
descriptor.channel = context.channel();
- context.attr(SESSION_DESCRIPTOR).set(descriptor);
+ context.channel().attr(SESSION_DESCRIPTOR).set(descriptor);
}
}
else {
<item name='io.netty.channel.ChannelInboundHandlerAdapter void channelInactive(io.netty.channel.ChannelHandlerContext) 0'>
<annotation name='org.jetbrains.annotations.NotNull'/>
</item>
+ <item name='io.netty.channel.ChannelInboundHandlerAdapter void channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) 0'>
+ <annotation name='org.jetbrains.annotations.NotNull'/>
+ </item>
+ <item name='io.netty.channel.ChannelInboundHandlerAdapter void channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) 1'>
+ <annotation name='org.jetbrains.annotations.NotNull'/>
+ </item>
<item name='io.netty.channel.ChannelInitializer void channelRegistered(io.netty.channel.ChannelHandlerContext) 0'>
<annotation name='org.jetbrains.annotations.NotNull'/>
</item>
import com.intellij.util.Url;
import com.intellij.util.UrlImpl;
import com.intellij.util.net.NetUtils;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import org.jetbrains.annotations.NonNls;
import org.jetbrains.annotations.NotNull;
return server;
}
+ @NotNull
+ public EventLoopGroup getEventLoopGroup() {
+ waitForStart();
+ assert server != null;
+ return server.getEventLoopGroup();
+ }
+
@Override
public boolean isOnBuiltInWebServer(@Nullable Url url) {
return url != null && !StringUtil.isEmpty(url.getAuthority()) && isOnBuiltInWebServerByAuthority(url.getAuthority());
import gnu.trove.THashSet
import gnu.trove.TObjectProcedure
import io.netty.buffer.ByteBuf
-import io.netty.channel.ChannelHandlerContext
+import io.netty.channel.Channel
import io.netty.util.AttributeKey
import org.jetbrains.concurrency.Promise
import org.jetbrains.io.webSocket.WebSocketServerOptions
})
}
- fun disconnectClient(context: ChannelHandlerContext, client: Client, closeChannel: Boolean): Boolean {
+ fun disconnectClient(channel: Channel, client: Client, closeChannel: Boolean): Boolean {
synchronized (clients) {
if (!clients.remove(client)) {
return false
}
try {
- context.attr(CLIENT).remove()
+ channel.attr(CLIENT).remove()
if (closeChannel) {
- context.channel().close()
+ channel.close()
}
client.rejectAsyncResults(exceptionHandler)
@Override
public ChannelHandler getInboundHandler(@NotNull ChannelHandlerContext context) {
SocketClient client = new SocketClient(context.channel());
- context.attr(ClientManagerKt.getCLIENT()).set(client);
+ context.channel().attr(ClientManagerKt.getCLIENT()).set(client);
clientManager.getValue().addClient(client);
connected(client, null);
return new MyDecoder(client);
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
- Client client = context.attr(ClientManagerKt.getCLIENT()).get();
+ Client client = context.channel().attr(ClientManagerKt.getCLIENT()).get();
// if null, so, has already been explicitly removed
if (client != null) {
- clientManager.getValue().disconnectClient(context, client, false);
+ clientManager.getValue().disconnectClient(context.channel(), client, false);
}
}
}
package org.jetbrains.io.webSocket;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.websocketx.*;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.io.ChannelBufferToString;
-import org.jetbrains.io.SimpleChannelInboundHandlerAdapter;
import org.jetbrains.io.jsonRpc.Client;
import org.jetbrains.io.jsonRpc.ClientManager;
import org.jetbrains.io.jsonRpc.ClientManagerKt;
import org.jetbrains.io.jsonRpc.MessageServer;
@ChannelHandler.Sharable
-final class MessageChannelHandler extends SimpleChannelInboundHandlerAdapter<WebSocketFrame> {
+final class MessageChannelHandler extends WebSocketProtocolHandler {
private final ClientManager clientManager;
private final MessageServer messageServer;
}
@Override
- protected void messageReceived(ChannelHandlerContext context, WebSocketFrame message) throws Exception {
- WebSocketClient client = (WebSocketClient)context.attr(ClientManagerKt.getCLIENT()).get();
- if (message instanceof CloseWebSocketFrame) {
- if (client != null) {
- try {
- clientManager.disconnectClient(context, client, false);
- }
- finally {
- message.retain();
- client.disconnect((CloseWebSocketFrame)message);
- }
+ protected void closeFrameReceived(@NotNull Channel channel, @NotNull CloseWebSocketFrame message) {
+ WebSocketClient client = (WebSocketClient)channel.attr(ClientManagerKt.getCLIENT()).get();
+ if (client == null) {
+ super.closeFrameReceived(channel, message);
+ }
+ else {
+ try {
+ clientManager.disconnectClient(channel, client, false);
+ }
+ finally {
+ client.disconnect(message);
}
}
- else if (message instanceof PingWebSocketFrame) {
- context.channel().writeAndFlush(new PongWebSocketFrame(message.content()));
+ }
+
+ @Override
+ protected void textFrameReceived(@NotNull Channel channel, @NotNull TextWebSocketFrame message) {
+ WebSocketClient client = (WebSocketClient)channel.attr(ClientManagerKt.getCLIENT()).get();
+ CharSequence chars;
+ try {
+ chars = ChannelBufferToString.readChars(message.content());
}
- else if (message instanceof TextWebSocketFrame) {
+ catch (Throwable e) {
try {
- messageServer.messageReceived(client, ChannelBufferToString.readChars(message.content()));
+ message.release();
}
- catch (Throwable e) {
+ finally {
clientManager.getExceptionHandler().exceptionCaught(e);
}
+ return;
+ }
+
+ try {
+ messageServer.messageReceived(client, chars);
}
- else if (!(message instanceof PongWebSocketFrame)) {
- throw new UnsupportedOperationException(message.getClass().getName() + " frame types not supported");
+ catch (Throwable e) {
+ clientManager.getExceptionHandler().exceptionCaught(e);
}
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
- Client client = context.attr(ClientManagerKt.getCLIENT()).get();
+ Client client = context.channel().attr(ClientManagerKt.getCLIENT()).get();
// if null, so, has already been explicitly removed
if (client != null) {
- clientManager.disconnectClient(context, client, false);
+ clientManager.disconnectClient(context.channel(), client, false);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+ public void exceptionCaught(@NotNull ChannelHandlerContext context, @NotNull Throwable cause) {
try {
clientManager.getExceptionHandler().exceptionCaught(cause);
}
finally {
- context.channel().close();
+ super.exceptionCaught(context, cause);
}
}
}
}
final Client client = new WebSocketClient(context.channel(), handshaker);
- context.attr(ClientManagerKt.getCLIENT()).set(client);
+ context.channel().attr(ClientManagerKt.getCLIENT()).set(client);
handshaker.handshake(context.channel(), request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
BuiltInServer.replaceDefaultHandler(context, messageChannelHandler);
ChannelHandlerContext messageChannelHandlerContext = context.pipeline().context(messageChannelHandler);
context.pipeline().addBefore(messageChannelHandlerContext.name(), "webSocketFrameAggregator", new WebSocketFrameAggregator(NettyUtil.MAX_CONTENT_LENGTH));
- messageChannelHandlerContext.attr(ClientManagerKt.getCLIENT()).set(client);
+ messageChannelHandlerContext.channel().attr(ClientManagerKt.getCLIENT()).set(client);
connected(client, uriDecoder.parameters());
}
}
--- /dev/null
+package org.jetbrains.io.webSocket
+package org.jetbrains.io.webSocket
+
+import io.netty.channel.Channel
+import io.netty.channel.ChannelHandlerContext
+import io.netty.channel.ChannelInboundHandlerAdapter
+import io.netty.handler.codec.http.FullHttpResponse
+import io.netty.handler.codec.http.websocketx.*
+import io.netty.util.CharsetUtil
+import io.netty.util.ReferenceCountUtil
+import org.jetbrains.builtInWebServer.LOG
+import org.jetbrains.io.NettyUtil
+
+abstract class WebSocketProtocolHandler : ChannelInboundHandlerAdapter() {
+ override final fun channelRead(context: ChannelHandlerContext, message: Any) {
+ // Pong frames need to get ignored
+ when (message) {
+ !is WebSocketFrame, is PongWebSocketFrame -> ReferenceCountUtil.release(message)
+ is PingWebSocketFrame -> context.channel().writeAndFlush(PongWebSocketFrame(message.content()))
+ is CloseWebSocketFrame -> closeFrameReceived(context.channel(), message)
+ is TextWebSocketFrame -> {
+ try {
+ textFrameReceived(context.channel(), message)
+ }
+ finally {
+ // client should release buffer as soon as possible, so, message could be released already
+ if (message.refCnt() > 0) {
+ message.release()
+ }
+ }
+ }
+ else -> throw UnsupportedOperationException("${message.javaClass.name} frame types not supported")
+ }
+ }
+
+ abstract protected fun textFrameReceived(channel: Channel, message: TextWebSocketFrame)
+
+ protected open fun closeFrameReceived(channel: Channel, message: CloseWebSocketFrame) {
+ channel.close()
+ }
+
+ @Suppress("OverridingDeprecatedMember")
+ override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
+ NettyUtil.logAndClose(cause, LOG, context.channel())
+ }
+}
+
+open class WebSocketProtocolHandshakeHandler(private val handshaker: WebSocketClientHandshaker) : ChannelInboundHandlerAdapter() {
+ override final fun channelRead(context: ChannelHandlerContext, message: Any) {
+ val channel = context.channel()
+ if (!handshaker.isHandshakeComplete) {
+ handshaker.finishHandshake(channel, message as FullHttpResponse)
+ context.pipeline().remove(this)
+ completed()
+ return
+ }
+
+ if (message is FullHttpResponse) {
+ throw IllegalStateException("Unexpected FullHttpResponse (getStatus=${message.status()}, content=${message.content().toString(CharsetUtil.UTF_8)})")
+ }
+
+ context.fireChannelRead(message)
+ }
+
+ open protected fun completed() {
+ }
+}
\ No newline at end of file
return isSupported(request) && !request.isWriteFromBrowserWithoutOrigin() && isAccessible(request) && process(urlDecoder, request, context)
}
- val prevHandlerAttribute = context.attr(PREV_HANDLER)
+ val prevHandlerAttribute = context.channel().attr(PREV_HANDLER)
val connectedHandler = prevHandlerAttribute.get()
if (connectedHandler != null) {
if (connectedHandler.checkAndProcess()) {
return false
}
+ @Suppress("OverridingDeprecatedMember")
override fun exceptionCaught(context: ChannelHandlerContext, cause: Throwable) {
try {
- context.attr(PREV_HANDLER).remove()
+ context.channel().attr(PREV_HANDLER).remove()
}
finally {
+ @Suppress("DEPRECATION")
super.exceptionCaught(context, cause)
}
}
int maxAttemptCount,
@NotNull Condition<Void> stopCondition) throws Throwable {
int attemptCount = 0;
- if (bootstrap.group() instanceof NioEventLoopGroup) {
+ if (bootstrap.config().group() instanceof NioEventLoopGroup) {
return connectNio(bootstrap, remoteAddress, promise, maxAttemptCount, stopCondition, attemptCount);
}
}
inline fun ChannelFuture.addChannelListener(crossinline listener: (future: ChannelFuture) -> Unit) {
- addListener(GenericFutureListener<io.netty.channel.ChannelFuture> { listener(it) })
+ addListener(GenericFutureListener<ChannelFuture> { listener(it) })
}
// if NIO, so, it is shared and we must not shutdown it
import com.intellij.ui.components.JBList
import com.intellij.util.io.socketConnection.ConnectionStatus
import io.netty.bootstrap.Bootstrap
+import io.netty.channel.ChannelFuture
+import io.netty.util.concurrent.GenericFutureListener
import org.jetbrains.concurrency.AsyncPromise
import org.jetbrains.concurrency.Promise
import org.jetbrains.concurrency.rejectedPromise
import org.jetbrains.concurrency.resolvedPromise
import org.jetbrains.debugger.Vm
import org.jetbrains.io.NettyUtil
-import org.jetbrains.io.addChannelListener
import org.jetbrains.io.connect
import org.jetbrains.rpc.LOG
import java.net.ConnectException
private val connectCancelHandler = AtomicReference<() -> Unit>()
+ protected val channelCloseListener = GenericFutureListener<ChannelFuture> {
+ close("Process disconnected unexpectedly", ConnectionStatus.DISCONNECTED)
+ }
+
abstract fun createBootstrap(address: InetSocketAddress, vmResult: AsyncPromise<Vm>): Bootstrap
@JvmOverloads
fun open(address: InetSocketAddress, stopCondition: Condition<Void>? = null): Promise<Vm> {
port = address.port
- isLocalAddress = address.getAddress().isAnyLocalAddress() || address.getAddress().isLoopbackAddress()
+ isLocalAddress = address.address.isAnyLocalAddress || address.address.isLoopbackAddress
setState(ConnectionStatus.WAITING_FOR_CONNECTION, "Connecting to ${address.hostName}:${port}")
val result = AsyncPromise<Vm>()
val future = ApplicationManager.getApplication().executeOnPooledThread {
createBootstrap(address, result)
.connect(address, connectionPromise, maxAttemptCount = if (stopCondition == null) NettyUtil.DEFAULT_CONNECT_ATTEMPT_COUNT else -1, stopCondition = stopCondition)
- ?.let {
- it.closeFuture().addChannelListener {
- close("Process disconnected unexpectedly", ConnectionStatus.DISCONNECTED)
- }
- }
+ ?.let { it.closeFuture().addListener(channelCloseListener) }
}
connectCancelHandler.set {