99e93e526f46ab51892afd9499590c152e1f22d5
[idea/community.git] / platform / platform-impl / src / org / jetbrains / io / Decoder.java
1 /*
2  * Copyright 2000-2015 JetBrains s.r.o.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jetbrains.io;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import org.jetbrains.annotations.NotNull;
23 import org.jetbrains.annotations.Nullable;
24
25 import java.io.IOException;
26
27 public abstract class Decoder extends ChannelInboundHandlerAdapter {
28   // Netty MessageAggregator default value
29   protected static final int DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS = 1024;
30
31   private ByteBuf cumulation;
32
33   @Override
34   public final void channelRead(ChannelHandlerContext context, Object message) throws Exception {
35     if (message instanceof ByteBuf) {
36       ByteBuf input = (ByteBuf)message;
37       try {
38         messageReceived(context, input);
39       }
40       finally {
41         input.release();
42       }
43     }
44     else {
45       context.fireChannelRead(message);
46     }
47   }
48
49   protected abstract void messageReceived(@NotNull ChannelHandlerContext context, @NotNull ByteBuf input) throws Exception;
50
51   public interface FullMessageConsumer<T> {
52     T contentReceived(@NotNull ByteBuf input, @NotNull ChannelHandlerContext context, boolean isCumulateBuffer) throws IOException;
53   }
54
55   @Nullable
56   protected final <T> T readContent(@NotNull ByteBuf input, @NotNull ChannelHandlerContext context, int contentLength, @NotNull FullMessageConsumer<T> fullMessageConsumer) throws IOException {
57     ByteBuf buffer = getBufferIfSufficient(input, contentLength, context);
58     if (buffer == null) {
59       return null;
60     }
61
62     boolean isCumulateBuffer = buffer != input;
63     int oldReaderIndex = input.readerIndex();
64     try {
65       return fullMessageConsumer.contentReceived(buffer, context, isCumulateBuffer);
66     }
67     finally {
68       if (isCumulateBuffer) {
69         // cumulation buffer - release it
70         buffer.release();
71       }
72       else {
73         buffer.readerIndex(oldReaderIndex + contentLength);
74       }
75     }
76   }
77
78   @Nullable
79   protected final ByteBuf getBufferIfSufficient(@NotNull ByteBuf input, int requiredLength, @NotNull ChannelHandlerContext context) {
80     if (!input.isReadable()) {
81       return null;
82     }
83
84     if (cumulation == null) {
85       if (input.readableBytes() < requiredLength) {
86         cumulation = input;
87         input.retain();
88         input.touch();
89         return null;
90       }
91       else {
92         return input;
93       }
94     }
95     else {
96       int currentAccumulatedByteCount = cumulation.readableBytes();
97       if ((currentAccumulatedByteCount + input.readableBytes()) < requiredLength) {
98         CompositeByteBuf compositeByteBuf;
99         if ((cumulation instanceof CompositeByteBuf)) {
100           compositeByteBuf = (CompositeByteBuf)cumulation;
101         }
102         else {
103           compositeByteBuf = context.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS);
104           compositeByteBuf.addComponent(cumulation);
105           cumulation = compositeByteBuf;
106         }
107
108         compositeByteBuf.addComponent(input);
109         input.retain();
110         input.touch();
111         return null;
112       }
113       else {
114         CompositeByteBuf buffer;
115         if (cumulation instanceof CompositeByteBuf) {
116           buffer = (CompositeByteBuf)cumulation;
117           buffer.addComponent(input);
118         }
119         else {
120           // may be it will be used by client to cumulate something - don't set artificial restriction (2)
121           buffer = context.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS);
122           buffer.addComponents(cumulation, input);
123         }
124
125         // we don't set writerIndex on addComponent, it is clear to set it to requiredLength here
126         buffer.writerIndex(requiredLength);
127
128         input.skipBytes(requiredLength - currentAccumulatedByteCount);
129         input.retain();
130         input.touch();
131         cumulation = null;
132         return buffer;
133       }
134     }
135   }
136
137   @Override
138   public void channelInactive(ChannelHandlerContext context) throws Exception {
139     try {
140       if (cumulation != null) {
141         cumulation.release();
142         cumulation = null;
143       }
144     }
145     finally {
146       super.channelInactive(context);
147     }
148   }
149 }