2 * Copyright 2000-2016 JetBrains s.r.o.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.jetbrains.io;
18 import com.intellij.openapi.diagnostic.Logger;
19 import com.intellij.openapi.util.Condition;
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.bootstrap.BootstrapUtil;
22 import io.netty.channel.*;
23 import io.netty.channel.nio.NioEventLoopGroup;
24 import io.netty.channel.socket.nio.NioSocketChannel;
25 import io.netty.channel.socket.oio.OioSocketChannel;
26 import io.netty.handler.codec.http.HttpMethod;
27 import io.netty.handler.codec.http.HttpObjectAggregator;
28 import io.netty.handler.codec.http.HttpRequestDecoder;
29 import io.netty.handler.codec.http.HttpResponseEncoder;
30 import io.netty.handler.codec.http.cors.CorsConfig;
31 import io.netty.handler.codec.http.cors.CorsConfigBuilder;
32 import io.netty.handler.codec.http.cors.CorsHandler;
33 import io.netty.handler.stream.ChunkedWriteHandler;
34 import io.netty.util.concurrent.GlobalEventExecutor;
35 import org.jetbrains.annotations.NotNull;
36 import org.jetbrains.annotations.Nullable;
37 import org.jetbrains.annotations.TestOnly;
38 import org.jetbrains.concurrency.AsyncPromise;
39 import org.jetbrains.concurrency.Promise;
40 import org.jetbrains.ide.PooledThreadExecutor;
42 import java.io.IOException;
43 import java.net.BindException;
44 import java.net.ConnectException;
45 import java.net.InetSocketAddress;
46 import java.net.Socket;
47 import java.util.concurrent.TimeUnit;
49 public final class NettyUtil {
50 public static final int MAX_CONTENT_LENGTH = 100 * 1024 * 1024;
52 public static final int DEFAULT_CONNECT_ATTEMPT_COUNT = 20;
53 public static final int MIN_START_TIME = 100;
55 public static void logAndClose(@NotNull Throwable error, @NotNull Logger log, @NotNull Channel channel) {
56 // don't report about errors while connecting
59 if (error instanceof ConnectException) {
67 log.info("Channel will be closed due to error");
72 public static void log(@NotNull Throwable throwable, @NotNull Logger log) {
73 if (isAsWarning(throwable)) {
82 static Channel doConnect(@NotNull Bootstrap bootstrap,
83 @NotNull InetSocketAddress remoteAddress,
84 @Nullable AsyncPromise<?> promise,
86 @NotNull Condition<Void> stopCondition) throws Throwable {
88 if (bootstrap.config().group() instanceof NioEventLoopGroup) {
89 return connectNio(bootstrap, remoteAddress, promise, maxAttemptCount, stopCondition, attemptCount);
97 //noinspection IOResourceOpenedButNotSafelyClosed,SocketOpenedButNotSafelyClosed
98 socket = new Socket(remoteAddress.getAddress(), remoteAddress.getPort());
101 catch (IOException e) {
102 if (stopCondition.value(null) || (promise != null && promise.getState() != Promise.State.PENDING)) {
105 else if (maxAttemptCount == -1) {
106 if (sleep(promise, 300)) {
111 else if (++attemptCount < maxAttemptCount) {
112 if (sleep(promise, attemptCount * MIN_START_TIME)) {
117 if (promise != null) {
125 OioSocketChannel channel = new OioSocketChannel(socket);
126 BootstrapUtil.initAndRegister(channel, bootstrap).sync();
131 private static Channel connectNio(@NotNull Bootstrap bootstrap,
132 @NotNull InetSocketAddress remoteAddress,
133 @Nullable AsyncPromise<?> promise,
135 @NotNull Condition<Void> stopCondition,
138 ChannelFuture future = bootstrap.connect(remoteAddress).awaitUninterruptibly();
139 if (future.isSuccess()) {
140 if (!future.channel().isOpen()) {
143 return future.channel();
145 else if (stopCondition.value(null) || (promise != null && promise.getState() == Promise.State.REJECTED)) {
148 else if (maxAttemptCount == -1) {
149 if (sleep(promise, 300)) {
154 else if (++attemptCount < maxAttemptCount) {
155 if (sleep(promise, attemptCount * MIN_START_TIME)) {
160 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
161 Throwable cause = future.cause();
162 if (promise != null) {
164 promise.setError("Cannot connect: unknown error");
167 promise.setError(cause);
175 private static boolean sleep(@Nullable AsyncPromise<?> promise, int time) {
177 //noinspection BusyWait
180 catch (InterruptedException ignored) {
181 if (promise != null) {
182 promise.setError("Interrupted");
189 private static boolean isAsWarning(@NotNull Throwable throwable) {
190 String message = throwable.getMessage();
191 if (message == null) {
195 return (throwable instanceof IOException && message.equals("An existing connection was forcibly closed by the remote host")) ||
196 (throwable instanceof ChannelException && message.startsWith("Failed to bind to: ")) ||
197 throwable instanceof BindException ||
198 (message.startsWith("Connection reset") || message.equals("Operation timed out") || message.equals("Connection timed out"));
201 public static Bootstrap nioClientBootstrap() {
202 return nioClientBootstrap(new NioEventLoopGroup(1, PooledThreadExecutor.INSTANCE));
205 public static Bootstrap nioClientBootstrap(@NotNull EventLoopGroup eventLoopGroup) {
206 Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class);
207 bootstrap.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true);
211 public static void addHttpServerCodec(@NotNull ChannelPipeline pipeline) {
212 pipeline.addLast("httpRequestEncoder", new HttpResponseEncoder());
213 // https://jetbrains.zendesk.com/agent/tickets/68315
214 pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder(16 * 1024, 16 * 1024, 8192));
215 pipeline.addLast("httpObjectAggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
216 // could be added earlier if HTTPS
217 if (pipeline.get(ChunkedWriteHandler.class) == null) {
218 pipeline.addLast("chunkedWriteHandler", new ChunkedWriteHandler());
220 pipeline.addLast("corsHandler", new CorsHandlerDoNotUseOwnLogger(CorsConfigBuilder
225 .allowedRequestMethods(HttpMethod.GET, HttpMethod.POST, HttpMethod.PUT, HttpMethod.DELETE, HttpMethod.HEAD, HttpMethod.PATCH)
226 .allowedRequestHeaders("origin", "accept", "authorization", "content-type")
230 private static final class CorsHandlerDoNotUseOwnLogger extends CorsHandler {
231 public CorsHandlerDoNotUseOwnLogger(@NotNull CorsConfig config) {
236 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
237 context.fireExceptionCaught(cause);
242 public static void awaitQuiescenceOfGlobalEventExecutor(long timeout, @NotNull TimeUnit unit) {
244 @NotNull GlobalEventExecutor executor = GlobalEventExecutor.INSTANCE;
245 executor.awaitInactivity(timeout, unit);
247 catch (InterruptedException ignored) {
250 catch (IllegalStateException ignored) {
251 // thread did not start