2 * Copyright 2000-2015 JetBrains s.r.o.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.jetbrains.io;
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import org.jetbrains.annotations.NotNull;
23 import org.jetbrains.annotations.Nullable;
25 import java.io.IOException;
27 public abstract class Decoder extends ChannelInboundHandlerAdapter {
28 // Netty MessageAggregator default value
29 protected static final int DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS = 1024;
31 private ByteBuf cumulation;
34 public final void channelRead(ChannelHandlerContext context, Object message) throws Exception {
35 if (message instanceof ByteBuf) {
36 ByteBuf input = (ByteBuf)message;
38 messageReceived(context, input);
41 // client should release buffer as soon as possible, so, input could be released already
42 if (input.refCnt() > 0) {
48 context.fireChannelRead(message);
52 protected abstract void messageReceived(@NotNull ChannelHandlerContext context, @NotNull ByteBuf input) throws Exception;
54 public interface FullMessageConsumer<T> {
55 T contentReceived(@NotNull ByteBuf input, @NotNull ChannelHandlerContext context, boolean isCumulateBuffer) throws IOException;
59 protected final <T> T readContent(@NotNull ByteBuf input, @NotNull ChannelHandlerContext context, int contentLength, @NotNull FullMessageConsumer<T> fullMessageConsumer) throws IOException {
60 ByteBuf buffer = getBufferIfSufficient(input, contentLength, context);
65 boolean isCumulateBuffer = buffer != input;
66 int oldReaderIndex = input.readerIndex();
68 return fullMessageConsumer.contentReceived(buffer, context, isCumulateBuffer);
71 if (isCumulateBuffer) {
72 // cumulation buffer - release it
76 buffer.readerIndex(oldReaderIndex + contentLength);
82 protected final ByteBuf getBufferIfSufficient(@NotNull ByteBuf input, int requiredLength, @NotNull ChannelHandlerContext context) {
83 if (!input.isReadable()) {
87 if (cumulation == null) {
88 if (input.readableBytes() < requiredLength) {
99 int currentAccumulatedByteCount = cumulation.readableBytes();
100 if ((currentAccumulatedByteCount + input.readableBytes()) < requiredLength) {
101 CompositeByteBuf compositeByteBuf;
102 if ((cumulation instanceof CompositeByteBuf)) {
103 compositeByteBuf = (CompositeByteBuf)cumulation;
106 compositeByteBuf = context.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS);
107 compositeByteBuf.addComponent(cumulation);
108 cumulation = compositeByteBuf;
111 compositeByteBuf.addComponent(input);
117 CompositeByteBuf buffer;
118 if (cumulation instanceof CompositeByteBuf) {
119 buffer = (CompositeByteBuf)cumulation;
120 buffer.addComponent(input);
123 // may be it will be used by client to cumulate something - don't set artificial restriction (2)
124 buffer = context.alloc().compositeBuffer(DEFAULT_MAX_COMPOSITE_BUFFER_COMPONENTS);
125 buffer.addComponents(cumulation, input);
128 // we don't set writerIndex on addComponent, it is clear to set it to requiredLength here
129 buffer.writerIndex(requiredLength);
131 input.skipBytes(requiredLength - currentAccumulatedByteCount);
141 public void channelInactive(ChannelHandlerContext context) throws Exception {
143 if (cumulation != null) {
144 cumulation.release();
149 super.channelInactive(context);