ChannelRegistrar — don't use netty channel group
authorVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Tue, 21 Jun 2016 13:20:08 +0000 (15:20 +0200)
committerVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Tue, 21 Jun 2016 13:43:18 +0000 (15:43 +0200)
java/compiler/impl/src/com/intellij/compiler/server/BuildManager.java
platform/built-in-server/src/org/jetbrains/io/SubServer.java
platform/platform-impl/src/org/jetbrains/io/BuiltInServer.java
platform/platform-impl/src/org/jetbrains/io/ChannelRegistrar.java

index 9727c3db46cf6e276e5ce37c0c428b4d899ce500..97aadd99d13916134867e06f08e9fa61a82ca323 100644 (file)
@@ -1292,10 +1292,9 @@ public class BuildManager implements Disposable {
     return 0;
   }
 
-  @NotNull
-  private Future<?> stopListening() {
+  private void stopListening() {
     myListenPort = -1;
-    return myChannelRegistrar.close();
+    myChannelRegistrar.close();
   }
 
   private int startListening() throws Exception {
@@ -1321,7 +1320,7 @@ public class BuildManager implements Disposable {
       }
     });
     Channel serverChannel = bootstrap.bind(InetAddress.getLoopbackAddress(), 0).syncUninterruptibly().channel();
-    myChannelRegistrar.add(serverChannel, isOwnEventLoopGroup);
+    myChannelRegistrar.setServerChannel(serverChannel, isOwnEventLoopGroup);
     return ((InetSocketAddress)serverChannel.localAddress()).getPort();
   }
 
index 62aab4d2fec3e928730a7628beb4494f5008a6e2..5fdf36683de5acbb717c15e10529178dc9e732ea 100644 (file)
@@ -76,7 +76,7 @@ public final class SubServer implements CustomPortServerManager.CustomPortServic
 
     try {
       bootstrap.localAddress(user.isAvailableExternally() ? new InetSocketAddress(port) : NetKt.loopbackSocketAddress(port));
-      channelRegistrar.add(bootstrap.bind().syncUninterruptibly().channel(), false);
+      channelRegistrar.setServerChannel(bootstrap.bind().syncUninterruptibly().channel(), false);
       return true;
     }
     catch (Exception e) {
index d432bcd9088e43507a8bba110246c58d418905d0..550e7446ac55f9a6b3a129fe418453872615365a 100644 (file)
@@ -148,7 +148,7 @@ public class BuiltInServer implements Disposable {
 
       ChannelFuture future = bootstrap.bind(address, port).awaitUninterruptibly();
       if (future.isSuccess()) {
-        channelRegistrar.add(future.channel(), isEventLoopGroupOwner);
+        channelRegistrar.setServerChannel(future.channel(), isEventLoopGroupOwner);
         return port;
       }
       else if (!tryAnyPort && i == portsCount - 1) {
@@ -159,7 +159,7 @@ public class BuiltInServer implements Disposable {
     Logger.getInstance(BuiltInServer.class).info("We cannot bind to our default range, so, try to bind to any free port");
     ChannelFuture future = bootstrap.bind(address, 0).awaitUninterruptibly();
     if (future.isSuccess()) {
-      channelRegistrar.add(future.channel(), isEventLoopGroupOwner);
+      channelRegistrar.setServerChannel(future.channel(), isEventLoopGroupOwner);
       return ((InetSocketAddress)future.channel().localAddress()).getPort();
     }
     ExceptionUtil.rethrowAll(future.cause());
index 9d9e8f5d877ffbaaf61871c3626d36f58561809e..7ccb5906b5e5de829845c8946ab723d1db18bee0 100644 (file)
 package org.jetbrains.io;
 
 import com.intellij.openapi.diagnostic.Logger;
+import com.intellij.util.containers.ContainerUtil;
 import io.netty.channel.*;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.ChannelGroupFuture;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.Arrays;
-import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 @ChannelHandler.Sharable
 public final class ChannelRegistrar extends ChannelInboundHandlerAdapter {
   private static final Logger LOG = Logger.getInstance(ChannelRegistrar.class);
 
-  private final ChannelGroup openChannels = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE, true);
+  private final AtomicReference<ServerChannel> serverChannel = new AtomicReference<>();
+  private final Set<Channel> clientChannels = ContainerUtil.newConcurrentSet();
+
   private boolean isEventLoopGroupOwner;
 
   public boolean isEmpty() {
-    return openChannels.isEmpty();
+    return serverChannel.get() == null && clientChannels.isEmpty();
   }
 
-  public void add(@NotNull Channel serverChannel, boolean isOwnEventLoopGroup) {
+  public void setServerChannel(@NotNull Channel channel, boolean isOwnEventLoopGroup) {
+    boolean isSet = serverChannel.compareAndSet(null, (ServerChannel)channel);
+    LOG.assertTrue(isSet);
+
     this.isEventLoopGroupOwner = isOwnEventLoopGroup;
-    assert serverChannel instanceof ServerChannel;
-    openChannels.add(serverChannel);
   }
 
   @Override
-  public void channelActive(ChannelHandlerContext context) throws Exception {
-    // we don't need to remove channel on close - ChannelGroup do it
-    openChannels.add(context.channel());
+  public void channelActive(@NotNull ChannelHandlerContext context) throws Exception {
+    clientChannels.add(context.channel());
 
     super.channelActive(context);
   }
 
-  @NotNull
-  public Future<?> close() {
-    return close(isEventLoopGroupOwner);
+  @Override
+  public void channelInactive(@NotNull ChannelHandlerContext context) throws Exception {
+    clientChannels.remove(context.channel());
+
+    super.channelInactive(context);
   }
 
-  @NotNull
-  private Future<?> close(boolean shutdownEventLoopGroup) {
-    EventLoopGroup eventLoopGroup = null;
-    if (shutdownEventLoopGroup) {
-      for (Channel channel : openChannels) {
-        if (channel instanceof ServerChannel) {
-          eventLoopGroup = channel.eventLoop().parent();
-          break;
-        }
-      }
+  public void close() {
+    close(isEventLoopGroupOwner);
+  }
+
+  private void close(boolean shutdownEventLoopGroup) {
+    ServerChannel serverChannel = this.serverChannel.get();
+    if (serverChannel == null) {
+      LOG.assertTrue(clientChannels.isEmpty());
+      return;
+    }
+    else if (!this.serverChannel.compareAndSet(serverChannel, null)) {
+      return;
     }
 
-    Future<?> result;
+    EventLoopGroup eventLoopGroup = shutdownEventLoopGroup ? serverChannel.eventLoop().parent() : null;
     try {
       long start = System.currentTimeMillis();
-      Object[] channels = openChannels.toArray(new Channel[]{});
-      ChannelGroupFuture groupFuture = openChannels.close();
-      // server channels are closed in first turn, so, small timeout is relatively ok
-      if (!groupFuture.awaitUninterruptibly(10, TimeUnit.SECONDS)) {
-        LOG.warn("Cannot close all channels for 10 seconds, channels: " + Arrays.toString(channels));
+      Channel[] clientChannels = this.clientChannels.toArray(new Channel[]{});
+      this.clientChannels.clear();
+
+      final CountDownLatch countDown = new CountDownLatch(clientChannels.length + 1);
+      GenericFutureListener<ChannelFuture> listener = new GenericFutureListener<ChannelFuture>() {
+        @Override
+        public void operationComplete(@NotNull ChannelFuture future) throws Exception {
+          try {
+            Throwable cause = future.cause();
+            if (cause != null) {
+              LOG.warn(cause);
+            }
+          }
+          finally {
+            countDown.countDown();
+          }
+        }
+      };
+      serverChannel.close().addListener(listener);
+      for (Channel channel : clientChannels) {
+        channel.close().addListener(listener);
+      }
+
+      try {
+        countDown.await(5, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e) {
+        LOG.warn("Cannot close all channels for 10 seconds, channels: " + Arrays.toString(clientChannels));
       }
-      result = groupFuture;
 
       long duration = System.currentTimeMillis() - start;
       if (duration > 1000) {
@@ -87,9 +115,8 @@ public final class ChannelRegistrar extends ChannelInboundHandlerAdapter {
     }
     finally {
       if (eventLoopGroup != null) {
-        result = eventLoopGroup.shutdownGracefully(1, 2, TimeUnit.NANOSECONDS);
+        eventLoopGroup.shutdownGracefully(1, 2, TimeUnit.NANOSECONDS);
       }
     }
-    return result;
   }
 }