move JsonRpcServer to platform-impl module
authorVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Wed, 21 Jan 2015 11:52:29 +0000 (12:52 +0100)
committerVladimir Krivosheev <vladimir.krivosheev@jetbrains.com>
Wed, 21 Jan 2015 11:52:29 +0000 (12:52 +0100)
21 files changed:
platform/platform-impl/src/org/jetbrains/io/ChannelBufferToString.java [moved from platform/script-debugger/backend/src/org/jetbrains/rpc/ChannelBufferToString.java with 98% similarity]
platform/platform-impl/src/org/jetbrains/io/JsonReaderEx.java [moved from platform/script-debugger/protocol/protocol-reader-runtime/src/org/jetbrains/io/JsonReaderEx.java with 100% similarity]
platform/platform-impl/src/org/jetbrains/io/JsonUtil.java [moved from platform/script-debugger/protocol/protocol-reader-runtime/src/org/jetbrains/io/JsonUtil.java with 100% similarity]
platform/platform-impl/src/org/jetbrains/io/jsonRpc/IdeaAwareJsonRpcServer.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/jsonRpc/JsonRpcServer.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/jsonRpc/JsonServiceInvocator.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/jsonRpc/protocol.txt [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/Client.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/ExceptionHandler.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/ExceptionHandlerImpl.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/MessageChannelHandler.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/MessageServer.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketClient.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketHandshakeHandler.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServer.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerListener.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerListenerAdapter.java [new file with mode: 0644]
platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerOptions.java [new file with mode: 0644]
platform/script-debugger/backend/src/org/jetbrains/rpc/MessageDecoder.java
platform/script-debugger/protocol/protocol-reader-runtime/protocol-reader-runtime.iml
platform/script-debugger/protocol/protocol-reader/protocol-reader.iml

similarity index 98%
rename from platform/script-debugger/backend/src/org/jetbrains/rpc/ChannelBufferToString.java
rename to platform/platform-impl/src/org/jetbrains/io/ChannelBufferToString.java
index bb776e55ea2089b76adeb595571f6c2b272fe66c..dc2105352ef995831a55f0a867b9e26d4f44fead 100644 (file)
@@ -1,4 +1,4 @@
-package org.jetbrains.rpc;
+package org.jetbrains.io;
 
 import com.intellij.util.text.StringFactory;
 import io.netty.buffer.ByteBuf;
diff --git a/platform/platform-impl/src/org/jetbrains/io/jsonRpc/IdeaAwareJsonRpcServer.java b/platform/platform-impl/src/org/jetbrains/io/jsonRpc/IdeaAwareJsonRpcServer.java
new file mode 100644 (file)
index 0000000..dca60b3
--- /dev/null
@@ -0,0 +1,38 @@
+package org.jetbrains.io.jsonRpc;
+
+import com.intellij.openapi.application.ApplicationManager;
+import com.intellij.openapi.util.AsyncResult;
+import com.intellij.openapi.util.Pair;
+import com.intellij.util.Consumer;
+import com.intellij.util.concurrency.QueueProcessor;
+import io.netty.buffer.ByteBuf;
+import org.jetbrains.io.webSocket.Client;
+import org.jetbrains.io.webSocket.ExceptionHandler;
+import org.jetbrains.io.webSocket.WebSocketServer;
+
+import java.util.List;
+
+public class IdeaAwareJsonRpcServer extends JsonRpcServer {
+  private final QueueProcessor<ByteBuf> messageQueueProcessor = new QueueProcessor<ByteBuf>(new Consumer<ByteBuf>() {
+    @Override
+    public void consume(ByteBuf message) {
+      IdeaAwareJsonRpcServer.super.doSend(-1, null, message);
+    }
+  });
+
+  public IdeaAwareJsonRpcServer(WebSocketServer webSocketServer, ExceptionHandler exceptionHandler) {
+    super(webSocketServer, exceptionHandler);
+  }
+
+  @Override
+  protected <T> void doSend(int messageId, List<AsyncResult<Pair<Client, T>>> results, ByteBuf message) {
+    if (ApplicationManager.getApplication().isDispatchThread()) {
+      // it is bad idea to hide knowledge about real work thread from client of this rpc server, so, currently, it works only for particular case
+      LOG.assertTrue(messageId == -1 && results == null);
+      messageQueueProcessor.add(message);
+    }
+    else {
+      super.doSend(messageId, results, message);
+    }
+  }
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/jsonRpc/JsonRpcServer.java b/platform/platform-impl/src/org/jetbrains/io/jsonRpc/JsonRpcServer.java
new file mode 100644 (file)
index 0000000..f1fb050
--- /dev/null
@@ -0,0 +1,367 @@
+package org.jetbrains.io.jsonRpc;
+
+import com.google.gson.*;
+import com.google.gson.internal.Streams;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import com.intellij.openapi.diagnostic.Logger;
+import com.intellij.openapi.util.AsyncResult;
+import com.intellij.openapi.util.NotNullLazyValue;
+import com.intellij.openapi.util.Pair;
+import com.intellij.openapi.util.Ref;
+import com.intellij.util.ArrayUtil;
+import com.intellij.util.ArrayUtilRt;
+import com.intellij.util.Consumer;
+import com.intellij.util.SmartList;
+import com.intellij.util.text.CharSequenceBackedByArray;
+import gnu.trove.THashMap;
+import gnu.trove.TIntArrayList;
+import gnu.trove.TIntProcedure;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.CharsetUtil;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.io.JsonReaderEx;
+import org.jetbrains.io.JsonUtil;
+import org.jetbrains.io.webSocket.Client;
+import org.jetbrains.io.webSocket.ExceptionHandler;
+import org.jetbrains.io.webSocket.MessageServer;
+import org.jetbrains.io.webSocket.WebSocketServer;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class JsonRpcServer implements MessageServer {
+  protected static final Logger LOG = Logger.getInstance(JsonRpcServer.class);
+
+  private static final TypeAdapterFactory INT_LIST_TYPE_ADAPTER_FACTORY = new TypeAdapterFactory() {
+    private IntArrayListTypeAdapter<TIntArrayList> typeAdapter;
+
+    @Nullable
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+      if (type.getType() != TIntArrayList.class) {
+        return null;
+      }
+      if (typeAdapter == null) {
+        typeAdapter = new IntArrayListTypeAdapter<TIntArrayList>();
+      }
+      //noinspection unchecked
+      return (TypeAdapter<T>)typeAdapter;
+    }
+  };
+
+  private final AtomicInteger messageIdCounter = new AtomicInteger();
+  private final WebSocketServer webSocketServer;
+  private final ExceptionHandler exceptionHandler;
+  private final Gson gson;
+
+  private final Map<String, NotNullLazyValue> domains = new THashMap<String, NotNullLazyValue>();
+
+  public JsonRpcServer(@NotNull WebSocketServer webSocketServer, @NotNull ExceptionHandler exceptionHandler) {
+    this.webSocketServer = webSocketServer;
+    this.exceptionHandler = exceptionHandler;
+    gson = new GsonBuilder().registerTypeAdapter(CharSequenceBackedByArray.class, new JsonSerializer<CharSequenceBackedByArray>() {
+      @Override
+      public JsonElement serialize(CharSequenceBackedByArray src, Type typeOfSrc, JsonSerializationContext context) {
+        return new JsonPrimitive(src.toString());
+      }
+    }).registerTypeAdapterFactory(INT_LIST_TYPE_ADAPTER_FACTORY).disableHtmlEscaping().create();
+  }
+
+  public void registerDomain(@NotNull String name, @NotNull NotNullLazyValue commands) {
+    registerDomain(name, commands, false);
+  }
+
+  public void registerDomain(@NotNull String name, @NotNull NotNullLazyValue commands, boolean overridable) {
+    if (domains.containsKey(name)) {
+      if (overridable) {
+        return;
+      }
+      else {
+        throw new IllegalArgumentException(name + " is already registered");
+      }
+    }
+
+    domains.put(name, commands);
+  }
+
+  @Override
+  public void message(@NotNull Client client, String message) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("IN " + message);
+    }
+
+    JsonReaderEx reader = new JsonReaderEx(message);
+    reader.beginArray();
+    int messageId = reader.peek() == JsonToken.NUMBER ? reader.nextInt() : -1;
+    String domainName = reader.nextString();
+    if (domainName.length() == 1) {
+      AsyncResult asyncResult = webSocketServer.removeAsyncResult(client, messageId);
+      if (domainName.charAt(0) == 'r') {
+        if (asyncResult == null) {
+          LOG.error("Response with id " + messageId + " was already processed");
+          return;
+        }
+        //noinspection unchecked
+        asyncResult.setDone(JsonUtil.nextAny(reader));
+      }
+      else {
+        asyncResult.setRejected();
+      }
+      return;
+    }
+
+    NotNullLazyValue domainHolder = domains.get(domainName);
+    if (domainHolder == null) {
+      LOG.error("Cannot find domain " + domainName);
+      return;
+    }
+
+    Object domain = domainHolder.getValue();
+    String command = reader.nextString();
+    if (domain instanceof JsonServiceInvocator) {
+      ((JsonServiceInvocator)domain).invoke(command, client, reader, message, messageId);
+      return;
+    }
+
+    Object[] parameters;
+    if (reader.hasNext()) {
+      List<Object> list = new SmartList<Object>();
+      JsonUtil.readListBody(reader, list);
+      parameters = list.toArray(new Object[list.size()]);
+    }
+    else {
+      parameters = ArrayUtilRt.EMPTY_OBJECT_ARRAY;
+    }
+
+    reader.endArray();
+    LOG.assertTrue(reader.peek() == JsonToken.END_DOCUMENT);
+
+    try {
+      boolean isStatic = domain instanceof Class;
+      Method[] methods;
+      if (isStatic) {
+        methods = ((Class)domain).getDeclaredMethods();
+      }
+      else {
+        methods = domain.getClass().getMethods();
+      }
+      for (Method method : methods) {
+        if (method.getName().equals(command)) {
+          method.setAccessible(true);
+          Object result = method.invoke(isStatic ? null : domain, parameters);
+          if (messageId != -1) {
+            ByteBuf response = encodeMessage(messageId, null, null, new Object[]{result});
+            if (response != null) {
+              webSocketServer.sendResponse(client, response);
+            }
+          }
+          return;
+        }
+      }
+
+      throw new NoSuchMethodException(command);
+    }
+    catch (Throwable e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void sendResponse(int messageId, @NotNull Client client, @Nullable CharSequence rawMessage) {
+    ByteBuf response = encodeMessage(messageId, null, null, rawMessage, ArrayUtil.EMPTY_OBJECT_ARRAY);
+    assert response != null;
+    webSocketServer.sendResponse(client, response);
+  }
+
+  public void sendErrorResponse(int messageId, @NotNull Client client, @Nullable CharSequence rawMessage) {
+    ByteBuf response = encodeMessage(messageId, "e", null, rawMessage, ArrayUtil.EMPTY_OBJECT_ARRAY);
+    assert response != null;
+    webSocketServer.sendResponse(client, response);
+  }
+
+  public void send(String domain, String name) {
+    send(domain, name, null);
+  }
+
+  public <T> void send(String domain, String command, @Nullable final List<AsyncResult<Pair<Client, T>>> results, Object... params) {
+    if (webSocketServer.hasClients()) {
+      send(results == null ? -1 : messageIdCounter.getAndIncrement(), domain, command, results, params);
+    }
+  }
+
+  @Nullable
+  private ByteBuf encodeMessage(int messageId, @Nullable String domain, @Nullable String command, Object[] params) {
+    return encodeMessage(messageId, domain, command, null, params);
+  }
+
+  @Nullable
+  private ByteBuf encodeMessage(int messageId,
+                                @Nullable String domain,
+                                @Nullable String command,
+                                @Nullable CharSequence rawMessage,
+                                @Nullable Object[] params) {
+    StringBuilder sb = new StringBuilder();
+    try {
+      encodeCall(sb, messageId, domain, command, params, rawMessage);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("OUT " + sb.toString());
+      }
+      return Unpooled.copiedBuffer(sb, CharsetUtil.UTF_8);
+    }
+    catch (IOException e) {
+      exceptionHandler.exceptionCaught(e);
+      return null;
+    }
+  }
+
+  public boolean sendWithRawPart(Client client, String domain, String command, @Nullable CharSequence rawMessage, Object... params) {
+    ByteBuf message = encodeMessage(-1, domain, command, rawMessage, params);
+    if (message != null) {
+      webSocketServer.send(client, -1, message);
+    }
+    return message != null;
+  }
+
+  public void send(Client client, String domain, String command, Object... params) {
+    sendWithRawPart(client, domain, command, null, params);
+  }
+
+  public <T> AsyncResult<T> call(Client client, String domain, String command, Object... params) {
+    int messageId = messageIdCounter.getAndIncrement();
+    ByteBuf message = encodeMessage(messageId, domain, command, params);
+    if (message == null) {
+      return new AsyncResult.Rejected<T>();
+    }
+
+    AsyncResult<T> result = webSocketServer.send(client, messageId, message);
+    return result == null ? new AsyncResult.Rejected<T>() : result;
+  }
+
+  private <T> void send(int messageId, @Nullable String domain, @Nullable String command, @Nullable final List<AsyncResult<Pair<Client, T>>> results, Object[] params) {
+    ByteBuf message = encodeMessage(messageId, domain, command, params);
+    if (message != null) {
+      doSend(messageId, results, message);
+    }
+  }
+
+  protected  <T> void doSend(int messageId, List<AsyncResult<Pair<Client, T>>> results, ByteBuf message) {
+    webSocketServer.send(messageId, message, results);
+  }
+
+  private void encodeCall(StringBuilder sb, int id, @Nullable String domain, @Nullable String command, @Nullable Object[] params, @Nullable CharSequence rawData)
+    throws IOException {
+    sb.append('[');
+    boolean hasPrev = false;
+    if (id != -1) {
+      sb.append(id);
+      hasPrev = true;
+    }
+
+    if (domain != null) {
+      if (hasPrev) {
+        sb.append(',');
+      }
+      sb.append('"').append(domain).append("\",\"");
+
+      if (command == null) {
+        if (rawData != null) {
+          sb.append(rawData);
+        }
+        sb.append('"');
+        return;
+      }
+      else {
+        sb.append(command).append('"');
+      }
+    }
+
+    encodeParameters(sb, params == null ? ArrayUtil.EMPTY_OBJECT_ARRAY : params, rawData);
+    sb.append(']');
+  }
+
+  private void encodeParameters(StringBuilder sb, Object[] params, @Nullable CharSequence rawData) throws IOException {
+    if (params.length == 0 && rawData == null) {
+      return;
+    }
+
+    JsonWriter writer = null;
+    sb.append(',').append('[');
+    boolean hasPrev = false;
+    for (Object param : params) {
+      if (hasPrev) {
+        sb.append(',');
+      }
+      else {
+        hasPrev = true;
+      }
+
+      // gson - SOE if param has type class com.intellij.openapi.editor.impl.DocumentImpl$MyCharArray, so, use hack
+      if (param instanceof CharSequence) {
+        JsonUtil.escape(((CharSequence)param), sb);
+      }
+      else if (param == null) {
+        sb.append("null");
+      }
+      else if (param instanceof Number || param instanceof Boolean) {
+        sb.append(param.toString());
+      }
+      else if (param instanceof Consumer) {
+        //noinspection unchecked
+        ((Consumer<StringBuilder>)param).consume(sb);
+      }
+      else {
+        if (writer == null) {
+          writer = new JsonWriter(Streams.writerForAppendable(sb));
+        }
+        //noinspection unchecked
+        ((TypeAdapter<Object>)gson.getAdapter(param.getClass())).write(writer, param);
+      }
+    }
+
+    if (rawData != null) {
+      if (hasPrev) {
+        sb.append(',');
+      }
+      sb.append(rawData);
+    }
+    sb.append(']');
+  }
+
+  private static class IntArrayListTypeAdapter<T> extends TypeAdapter<T> {
+    @Override
+    public void write(final JsonWriter out, T value) throws IOException {
+      final Ref<IOException> error = new Ref<IOException>();
+      out.beginArray();
+      ((TIntArrayList)value).forEach(new TIntProcedure() {
+        @Override
+        public boolean execute(int value) {
+          try {
+            out.value(value);
+          }
+          catch (IOException e) {
+            error.set(e);
+          }
+          return error.isNull();
+        }
+      });
+
+      if (!error.isNull()) {
+        throw error.get();
+      }
+
+      out.endArray();
+    }
+
+    @Override
+    public T read(@SuppressWarnings("UnnecessaryFullyQualifiedName") com.google.gson.stream.JsonReader in) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/jsonRpc/JsonServiceInvocator.java b/platform/platform-impl/src/org/jetbrains/io/jsonRpc/JsonServiceInvocator.java
new file mode 100644 (file)
index 0000000..70c4303
--- /dev/null
@@ -0,0 +1,11 @@
+package org.jetbrains.io.jsonRpc;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.io.JsonReaderEx;
+import org.jetbrains.io.webSocket.Client;
+
+import java.io.IOException;
+
+public interface JsonServiceInvocator {
+  void invoke(@NotNull String command, @NotNull Client client, @NotNull JsonReaderEx reader, @NotNull String message, int messageId) throws IOException;
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/jsonRpc/protocol.txt b/platform/platform-impl/src/org/jetbrains/io/jsonRpc/protocol.txt
new file mode 100644 (file)
index 0000000..a899cbf
--- /dev/null
@@ -0,0 +1 @@
+[(id:Int)?, domain:String, command:String, (parameters:Array<Any>)?]
\ No newline at end of file
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/Client.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/Client.java
new file mode 100644 (file)
index 0000000..e8af041
--- /dev/null
@@ -0,0 +1,87 @@
+package org.jetbrains.io.webSocket;
+
+import com.intellij.openapi.util.AsyncResult;
+import com.intellij.openapi.util.UserDataHolderBase;
+import com.intellij.util.containers.ConcurrentIntObjectMap;
+import com.intellij.util.containers.ContainerUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoop;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Enumeration;
+
+public abstract class Client extends UserDataHolderBase {
+  protected final Channel channel;
+
+  final ConcurrentIntObjectMap<AsyncResult> messageCallbackMap = ContainerUtil.createConcurrentIntObjectMap();
+
+  protected Client(@NotNull Channel channel) {
+    this.channel = channel;
+  }
+
+  @NotNull
+  public EventLoop getEventLoop() {
+    return channel.eventLoop();
+  }
+
+  protected abstract ChannelFuture send(@NotNull ByteBuf message);
+
+  abstract void sendHeartbeat();
+
+  @Nullable
+  final <T> AsyncResult<T> send(int messageId, ByteBuf message) {
+    ChannelFuture channelFuture = send(message);
+    if (messageId == -1) {
+      return null;
+    }
+
+    ChannelFutureAwareAsyncResult<T> callback = new ChannelFutureAwareAsyncResult<T>(messageId, messageCallbackMap);
+    channelFuture.addListener(callback);
+    messageCallbackMap.put(messageId, callback);
+    return callback;
+  }
+
+  void rejectAsyncResults(ExceptionHandler exceptionHandler) {
+    if (!messageCallbackMap.isEmpty()) {
+      Enumeration<AsyncResult> elements = messageCallbackMap.elements();
+      while (elements.hasMoreElements()) {
+        try {
+          elements.nextElement().setRejected();
+        }
+        catch (Throwable e) {
+          exceptionHandler.exceptionCaught(e);
+        }
+      }
+    }
+  }
+
+  private static final class ChannelFutureAwareAsyncResult<T> extends AsyncResult<T> implements ChannelFutureListener {
+    private final int messageId;
+    private final ConcurrentIntObjectMap<AsyncResult> messageCallbackMap;
+
+    public ChannelFutureAwareAsyncResult(int messageId, ConcurrentIntObjectMap<AsyncResult> messageCallbackMap) {
+      this.messageId = messageId;
+      this.messageCallbackMap = messageCallbackMap;
+    }
+
+    @Override
+    public void setRejected() {
+      super.setRejected();
+
+      if (isProcessed()) {
+        messageCallbackMap.remove(messageId);
+      }
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!isProcessed() && !future.isSuccess()) {
+        setRejected();
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/ExceptionHandler.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/ExceptionHandler.java
new file mode 100644 (file)
index 0000000..e775107
--- /dev/null
@@ -0,0 +1,8 @@
+package org.jetbrains.io.webSocket;
+
+public interface ExceptionHandler {
+  /**
+   * @param e Exception while encode message (on send)
+   */
+  void exceptionCaught(Throwable e);
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/ExceptionHandlerImpl.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/ExceptionHandlerImpl.java
new file mode 100644 (file)
index 0000000..32bd888
--- /dev/null
@@ -0,0 +1,9 @@
+package org.jetbrains.io.webSocket;
+
+public class ExceptionHandlerImpl implements ExceptionHandler {
+  @Override
+  public void exceptionCaught(Throwable e) {
+    //noinspection CallToPrintStackTrace
+    e.printStackTrace();
+  }
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/MessageChannelHandler.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/MessageChannelHandler.java
new file mode 100644 (file)
index 0000000..7b90644
--- /dev/null
@@ -0,0 +1,71 @@
+package org.jetbrains.io.webSocket;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.websocketx.*;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.io.ChannelBufferToString;
+import org.jetbrains.io.SimpleChannelInboundHandlerAdapter;
+
+import java.io.IOException;
+
+@ChannelHandler.Sharable
+final class MessageChannelHandler extends SimpleChannelInboundHandlerAdapter<WebSocketFrame> {
+  private final WebSocketServer server;
+  private final MessageServer messageServer;
+
+  MessageChannelHandler(@NotNull WebSocketServer server, @NotNull MessageServer messageServer) {
+    this.server = server;
+    this.messageServer = messageServer;
+  }
+
+  @Override
+  protected void messageReceived(ChannelHandlerContext context, WebSocketFrame message) throws Exception {
+    WebSocketClient client = (WebSocketClient)context.attr(WebSocketHandshakeHandler.CLIENT).get();
+    if (message instanceof CloseWebSocketFrame) {
+      if (client != null) {
+        try {
+          server.disconnectClient(context, client, false);
+        }
+        finally {
+          message.retain();
+          client.disconnect((CloseWebSocketFrame)message);
+        }
+      }
+    }
+    else if (message instanceof PingWebSocketFrame) {
+      context.channel().writeAndFlush(new PongWebSocketFrame(message.content()));
+    }
+    else if (message instanceof TextWebSocketFrame) {
+      String text = ChannelBufferToString.readString(message.content());
+      try {
+        messageServer.message(client, text);
+      }
+      catch (Throwable e) {
+        server.exceptionHandler.exceptionCaught(new IOException("Exception while handle message: " + text, e));
+      }
+    }
+    else if (!(message instanceof PongWebSocketFrame)) {
+      throw new UnsupportedOperationException(message.getClass().getName() + " frame types not supported");
+    }
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext context) throws Exception {
+    Client client = context.attr(WebSocketHandshakeHandler.CLIENT).get();
+    // if null, so, has already been explicitly removed
+    if (client != null) {
+      server.disconnectClient(context, client, false);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+    try {
+      server.exceptionHandler.exceptionCaught(cause);
+    }
+    finally {
+      context.channel().close();
+    }
+  }
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/MessageServer.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/MessageServer.java
new file mode 100644 (file)
index 0000000..2503543
--- /dev/null
@@ -0,0 +1,9 @@
+package org.jetbrains.io.webSocket;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+
+public interface MessageServer {
+  void message(@NotNull Client client, String message) throws IOException;
+}
\ No newline at end of file
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketClient.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketClient.java
new file mode 100644 (file)
index 0000000..25a4a88
--- /dev/null
@@ -0,0 +1,39 @@
+package org.jetbrains.io.webSocket;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
+import org.jetbrains.annotations.NotNull;
+
+import java.nio.channels.ClosedChannelException;
+
+class WebSocketClient extends Client {
+  private final WebSocketServerHandshaker handshaker;
+
+  public WebSocketClient(@NotNull Channel channel, @NotNull WebSocketServerHandshaker handshaker) {
+    super(channel);
+
+    this.handshaker = handshaker;
+  }
+
+  @Override
+  public ChannelFuture send(@NotNull ByteBuf message) {
+    if (!channel.isOpen()) {
+      return channel.newFailedFuture(new ClosedChannelException());
+    }
+    return channel.writeAndFlush(new TextWebSocketFrame(message));
+  }
+
+  @Override
+  void sendHeartbeat() {
+    channel.writeAndFlush(new PingWebSocketFrame());
+  }
+
+  public void disconnect(@NotNull CloseWebSocketFrame frame) {
+    handshaker.close(channel, frame);
+  }
+}
\ No newline at end of file
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketHandshakeHandler.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketHandshakeHandler.java
new file mode 100644 (file)
index 0000000..12ff6c7
--- /dev/null
@@ -0,0 +1,105 @@
+package org.jetbrains.io.webSocket;
+
+import com.intellij.openapi.Disposable;
+import com.intellij.openapi.diagnostic.Logger;
+import com.intellij.openapi.util.AtomicNotNullLazyValue;
+import com.intellij.openapi.util.Disposer;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
+import io.netty.util.AttributeKey;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.ide.BuiltInServerManager;
+import org.jetbrains.ide.HttpRequestHandler;
+import org.jetbrains.io.BuiltInServer;
+import org.jetbrains.io.NettyUtil;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class WebSocketHandshakeHandler extends HttpRequestHandler implements WebSocketServerListener, ExceptionHandler {
+  private static final Logger LOG = Logger.getInstance(WebSocketHandshakeHandler.class);
+
+  static final AttributeKey<Client> CLIENT = AttributeKey.valueOf("WebSocketHandler.client");
+
+  private final AtomicNotNullLazyValue<WebSocketServer> server = new AtomicNotNullLazyValue<WebSocketServer>() {
+    @NotNull
+    @Override
+    protected WebSocketServer compute() {
+      WebSocketServer result = new WebSocketServer(WebSocketHandshakeHandler.this, WebSocketHandshakeHandler.this);
+      Disposable serverDisposable = BuiltInServerManager.getInstance().getServerDisposable();
+      assert serverDisposable != null;
+      Disposer.register(serverDisposable, result);
+      serverCreated(result);
+      return result;
+    }
+  };
+
+  @Override
+  public boolean isSupported(@NotNull FullHttpRequest request) {
+    return request.method() == HttpMethod.GET &&
+           "WebSocket".equalsIgnoreCase(request.headers().get(HttpHeaders.Names.UPGRADE)) &&
+           request.uri().length() > 2;
+  }
+
+  protected void serverCreated(@NotNull WebSocketServer server) {
+  }
+
+  @Override
+  public void exceptionCaught(Throwable e) {
+    NettyUtil.log(e, LOG);
+  }
+
+  @Override
+  public boolean process(@NotNull QueryStringDecoder urlDecoder, @NotNull FullHttpRequest request, @NotNull ChannelHandlerContext context) {
+    handleWebSocketRequest(context, request, urlDecoder);
+    return true;
+  }
+
+  private void handleWebSocketRequest(final ChannelHandlerContext context,
+                                      FullHttpRequest request,
+                                      final QueryStringDecoder uriDecoder) {
+    WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://" + HttpHeaders.getHost(request) + uriDecoder.path(), null, false, NettyUtil.MAX_CONTENT_LENGTH);
+    WebSocketServerHandshaker handshaker = factory.newHandshaker(request);
+    if (handshaker == null) {
+      WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel());
+      return;
+    }
+
+    if (!context.channel().isOpen()) {
+      return;
+    }
+
+    final Client client = new WebSocketClient(context.channel(), handshaker);
+    context.attr(CLIENT).set(client);
+    handshaker.handshake(context.channel(), request).addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (future.isSuccess()) {
+          WebSocketServer webSocketServer = server.getValue();
+          webSocketServer.addClient(client);
+          MessageChannelHandler messageChannelHandler = new MessageChannelHandler(webSocketServer, getMessageServer());
+          BuiltInServer.replaceDefaultHandler(context, messageChannelHandler);
+          ChannelHandlerContext messageChannelHandlerContext = context.pipeline().context(messageChannelHandler);
+          context.pipeline().addBefore(messageChannelHandlerContext.name(), "webSocketFrameAggregator", new WebSocketFrameAggregator(NettyUtil.MAX_CONTENT_LENGTH));
+          messageChannelHandlerContext.attr(CLIENT).set(client);
+          connected(client, uriDecoder.parameters());
+        }
+      }
+    });
+  }
+
+  @NotNull
+  protected abstract MessageServer getMessageServer();
+
+  @Override
+  public void connected(@NotNull Client client, Map<String, List<String>> parameters) {
+  }
+}
\ No newline at end of file
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServer.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServer.java
new file mode 100644 (file)
index 0000000..3b03592
--- /dev/null
@@ -0,0 +1,167 @@
+package org.jetbrains.io.webSocket;
+
+import com.intellij.openapi.Disposable;
+import com.intellij.openapi.util.AsyncResult;
+import com.intellij.openapi.util.Pair;
+import com.intellij.openapi.util.SimpleTimer;
+import com.intellij.openapi.util.SimpleTimerTask;
+import gnu.trove.THashSet;
+import gnu.trove.TObjectProcedure;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+
+public class WebSocketServer implements Disposable {
+  private final SimpleTimerTask heartbeatTimer;
+
+  @Nullable
+  private final WebSocketServerListener listener;
+
+  final ExceptionHandler exceptionHandler;
+
+  private final THashSet<Client> clients = new THashSet<Client>();
+
+  public WebSocketServer() {
+    this(null, new ExceptionHandlerImpl());
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public WebSocketServer(@Nullable WebSocketServerListener listener, @NotNull ExceptionHandler exceptionHandler) {
+    this(null, exceptionHandler, listener);
+  }
+
+  public WebSocketServer(@Nullable WebSocketServerOptions options, @NotNull ExceptionHandler exceptionHandler, @Nullable WebSocketServerListener listener) {
+    this.exceptionHandler = exceptionHandler;
+    this.listener = listener;
+
+    heartbeatTimer = SimpleTimer.getInstance().setUp(new Runnable() {
+      @Override
+      public void run() {
+        synchronized (clients) {
+          if (clients.isEmpty()) {
+            return;
+          }
+
+          clients.forEach(new TObjectProcedure<Client>() {
+            @Override
+            public boolean execute(Client client) {
+              if (client.channel.isActive()) {
+                client.sendHeartbeat();
+              }
+              return true;
+            }
+          });
+        }
+      }
+    }, (options == null ? new WebSocketServerOptions() : options).heartbeatDelay);
+  }
+
+  public void addClient(@NotNull Client client) {
+    synchronized (clients) {
+      clients.add(client);
+    }
+  }
+
+  public int getClientCount() {
+    synchronized (clients) {
+      return clients.size();
+    }
+  }
+
+  public boolean hasClients() {
+    return getClientCount() > 0;
+  }
+
+  @Override
+  public void dispose() {
+    try {
+      heartbeatTimer.cancel();
+    }
+    finally {
+      synchronized (clients) {
+        clients.clear();
+      }
+    }
+  }
+
+  @SuppressWarnings("MethodMayBeStatic")
+  public void sendResponse(@NotNull Client client, @NotNull ByteBuf message) {
+    if (client.channel.isOpen()) {
+      client.send(message);
+    }
+  }
+
+  @Nullable
+  public <T> AsyncResult<T> send(Client client, int messageId, ByteBuf message) {
+    try {
+      return client.send(messageId, message);
+    }
+    catch (Throwable e) {
+      exceptionHandler.exceptionCaught(e);
+      return null;
+    }
+  }
+
+  @SuppressWarnings("MethodMayBeStatic")
+  public AsyncResult removeAsyncResult(Client client, int messageId) {
+    return client.messageCallbackMap.remove(messageId);
+  }
+
+  public <T> void send(final int messageId, final ByteBuf message, @Nullable final List<AsyncResult<Pair<Client, T>>> results) {
+    forEachClient(new TObjectProcedure<Client>() {
+      private boolean first;
+
+      @Override
+      public boolean execute(final Client client) {
+        try {
+          AsyncResult<Pair<Client, T>> result = client.send(messageId, first ? message : message.duplicate());
+          first = false;
+          if (results != null) {
+            results.add(result);
+          }
+        }
+        catch (Throwable e) {
+          exceptionHandler.exceptionCaught(e);
+        }
+        return true;
+      }
+    });
+  }
+
+  boolean disconnectClient(@NotNull ChannelHandlerContext context, @NotNull Client client, boolean closeChannel) {
+    synchronized (clients) {
+      if (!clients.remove(client)) {
+        return false;
+      }
+    }
+
+    try {
+      context.attr(WebSocketHandshakeHandler.CLIENT).set(null);
+
+      if (closeChannel) {
+        context.channel().close();
+      }
+
+      client.rejectAsyncResults(exceptionHandler);
+    }
+    finally {
+      if (listener != null) {
+        listener.disconnected(client);
+      }
+    }
+    return true;
+  }
+
+  public void forEachClient(@NotNull TObjectProcedure<Client> procedure) {
+    synchronized (clients) {
+      if (clients.isEmpty()) {
+        return;
+      }
+
+      clients.forEach(procedure);
+    }
+  }
+}
\ No newline at end of file
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerListener.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerListener.java
new file mode 100644 (file)
index 0000000..78abd61
--- /dev/null
@@ -0,0 +1,13 @@
+package org.jetbrains.io.webSocket;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.EventListener;
+import java.util.List;
+import java.util.Map;
+
+public interface WebSocketServerListener extends EventListener {
+  void connected(@NotNull Client client, Map<String, List<String>> parameters);
+
+  void disconnected(@NotNull Client client);
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerListenerAdapter.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerListenerAdapter.java
new file mode 100644 (file)
index 0000000..ee415ad
--- /dev/null
@@ -0,0 +1,16 @@
+package org.jetbrains.io.webSocket;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class WebSocketServerListenerAdapter implements WebSocketServerListener {
+  @Override
+  public void connected(@NotNull Client client, Map<String, List<String>> parameters) {
+  }
+
+  @Override
+  public void disconnected(@NotNull Client client) {
+  }
+}
diff --git a/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerOptions.java b/platform/platform-impl/src/org/jetbrains/io/webSocket/WebSocketServerOptions.java
new file mode 100644 (file)
index 0000000..7232992
--- /dev/null
@@ -0,0 +1,21 @@
+package org.jetbrains.io.webSocket;
+
+import com.intellij.util.Time;
+
+// see https://github.com/sockjs/sockjs-node
+public final class WebSocketServerOptions {
+  // client will disconnect if server doesn't response (i.e. doesn't send any message or special heartbeat packet to client) in specified time
+  public int heartbeatDelay = 25 * Time.SECOND;
+
+  public int closeTimeout = 60 * 1000;
+
+  public WebSocketServerOptions heartbeatDelay(int value) {
+    heartbeatDelay = value;
+    return this;
+  }
+
+  public WebSocketServerOptions closeTimeout(int value) {
+    closeTimeout = value;
+    return this;
+  }
+}
index 9d7e35198225f94d50f3cf55b17a736a831a6900..a050cd53f136c0efc002985d646ad1d8b5090af1 100644 (file)
@@ -3,6 +3,7 @@ package org.jetbrains.rpc;
 import io.netty.buffer.ByteBuf;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.io.ChannelBufferToString;
 import org.jetbrains.io.SimpleChannelInboundHandlerAdapter;
 
 import java.nio.CharBuffer;
index f83238707dc4e8b69634e3914b6c2a383baf06f3..464a4cc3ab6784c3f741b8b2dc4a7eab40506198 100644 (file)
@@ -9,6 +9,6 @@
     <orderEntry type="sourceFolder" forTests="false" />
     <orderEntry type="library" exported="" name="gson" level="project" />
     <orderEntry type="module" module-name="util" />
+    <orderEntry type="module" module-name="platform-impl" />
   </component>
-</module>
-
+</module>
\ No newline at end of file
index f8259f8992b983de58ab26d7a365de2b9b18a059..6200e46c48dc438dcaa61a267b6be6d965223d91 100644 (file)
@@ -11,5 +11,6 @@
     <orderEntry type="library" name="gson" level="project" />
     <orderEntry type="module" module-name="protocol-reader-runtime" exported="" />
     <orderEntry type="module" module-name="annotations" />
+    <orderEntry type="module" module-name="platform-impl" />
   </component>
 </module>
\ No newline at end of file