client should release buffer as soon as possible
[idea/community.git] / platform / platform-impl / src / org / jetbrains / io / Decoder.java
1 /*
2  * Copyright 2000-2015 JetBrains s.r.o.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jetbrains.io;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import org.jetbrains.annotations.NotNull;
23 import org.jetbrains.annotations.Nullable;
24
25 import java.io.IOException;
26
27 public abstract class Decoder extends ChannelInboundHandlerAdapter {
28   // Netty MessageAggregator default value
29   protected static final int DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS = 1024;
30
31   private ByteBuf cumulation;
32
33   @Override
34   public final void channelRead(ChannelHandlerContext context, Object message) throws Exception {
35     if (message instanceof ByteBuf) {
36       ByteBuf input = (ByteBuf)message;
37       try {
38         messageReceived(context, input);
39       }
40       finally {
41         // client should release buffer as soon as possible, so, input could be released already
42         if (input.refCnt() > 0) {
43           input.release();
44         }
45       }
46     }
47     else {
48       context.fireChannelRead(message);
49     }
50   }
51
52   protected abstract void messageReceived(@NotNull ChannelHandlerContext context, @NotNull ByteBuf input) throws Exception;
53
54   public interface FullMessageConsumer<T> {
55     T contentReceived(@NotNull ByteBuf input, @NotNull ChannelHandlerContext context, boolean isCumulateBuffer) throws IOException;
56   }
57
58   @Nullable
59   protected final <T> T readContent(@NotNull ByteBuf input, @NotNull ChannelHandlerContext context, int contentLength, @NotNull FullMessageConsumer<T> fullMessageConsumer) throws IOException {
60     ByteBuf buffer = getBufferIfSufficient(input, contentLength, context);
61     if (buffer == null) {
62       return null;
63     }
64
65     boolean isCumulateBuffer = buffer != input;
66     int oldReaderIndex = input.readerIndex();
67     try {
68       return fullMessageConsumer.contentReceived(buffer, context, isCumulateBuffer);
69     }
70     finally {
71       if (isCumulateBuffer) {
72         // cumulation buffer - release it
73         buffer.release();
74       }
75       else {
76         buffer.readerIndex(oldReaderIndex + contentLength);
77       }
78     }
79   }
80
81   @Nullable
82   protected final ByteBuf getBufferIfSufficient(@NotNull ByteBuf input, int requiredLength, @NotNull ChannelHandlerContext context) {
83     if (!input.isReadable()) {
84       return null;
85     }
86
87     if (cumulation == null) {
88       if (input.readableBytes() < requiredLength) {
89         cumulation = input;
90         input.retain();
91         input.touch();
92         return null;
93       }
94       else {
95         return input;
96       }
97     }
98     else {
99       int currentAccumulatedByteCount = cumulation.readableBytes();
100       if ((currentAccumulatedByteCount + input.readableBytes()) < requiredLength) {
101         CompositeByteBuf compositeByteBuf;
102         if ((cumulation instanceof CompositeByteBuf)) {
103           compositeByteBuf = (CompositeByteBuf)cumulation;
104         }
105         else {
106           compositeByteBuf = context.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS);
107           compositeByteBuf.addComponent(cumulation);
108           cumulation = compositeByteBuf;
109         }
110
111         compositeByteBuf.addComponent(input);
112         input.retain();
113         input.touch();
114         return null;
115       }
116       else {
117         CompositeByteBuf buffer;
118         if (cumulation instanceof CompositeByteBuf) {
119           buffer = (CompositeByteBuf)cumulation;
120           buffer.addComponent(input);
121         }
122         else {
123           // may be it will be used by client to cumulate something - don't set artificial restriction (2)
124           buffer = context.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS);
125           buffer.addComponents(cumulation, input);
126         }
127
128         // we don't set writerIndex on addComponent, it is clear to set it to requiredLength here
129         buffer.writerIndex(requiredLength);
130
131         input.skipBytes(requiredLength - currentAccumulatedByteCount);
132         input.retain();
133         input.touch();
134         cumulation = null;
135         return buffer;
136       }
137     }
138   }
139
140   @Override
141   public void channelInactive(ChannelHandlerContext context) throws Exception {
142     try {
143       if (cumulation != null) {
144         cumulation.release();
145         cumulation = null;
146       }
147     }
148     finally {
149       super.channelInactive(context);
150     }
151   }
152 }