4eb035b48410c6785f27c533d55ab691a0256959
[idea/community.git] / platform / built-in-server / src / org / jetbrains / io / jsonRpc / ClientManager.kt
1 package org.jetbrains.io.jsonRpc
2
3 import com.intellij.openapi.Disposable
4 import com.intellij.openapi.util.SimpleTimer
5 import gnu.trove.THashSet
6 import gnu.trove.TObjectProcedure
7 import io.netty.buffer.ByteBuf
8 import io.netty.channel.ChannelHandlerContext
9 import io.netty.util.AttributeKey
10 import org.jetbrains.concurrency.Promise
11 import org.jetbrains.io.webSocket.WebSocketServerOptions
12
13 internal val CLIENT = AttributeKey.valueOf<Client>("SocketHandler.client")
14
15 class ClientManager(private val listener: ClientListener?, val exceptionHandler: ExceptionHandler, options: WebSocketServerOptions? = null) : Disposable {
16   private val heartbeatTimer = SimpleTimer.getInstance().setUp({
17     forEachClient(TObjectProcedure {
18       if (it.channel.isActive) {
19         it.sendHeartbeat()
20       }
21       true
22     })
23   }, (options ?: WebSocketServerOptions()).heartbeatDelay.toLong())
24
25   private val clients = THashSet<Client>()
26
27   fun addClient(client: Client) {
28     synchronized (clients) {
29       clients.add(client)
30     }
31   }
32
33   val clientCount: Int
34     get() = synchronized (clients) { clients.size }
35
36   fun hasClients() = clientCount > 0
37
38   override fun dispose() {
39     try {
40       heartbeatTimer.cancel()
41     }
42     finally {
43       synchronized (clients) {
44         clients.clear()
45       }
46     }
47   }
48
49   fun <T> send(messageId: Int, message: ByteBuf, results: MutableList<Promise<Pair<Client, T>>>? = null) {
50     forEachClient(object : TObjectProcedure<Client> {
51       private var first: Boolean = false
52
53       override fun execute(client: Client): Boolean {
54         try {
55           val result = client.send<Pair<Client, T>>(messageId, if (first) message else message.duplicate())
56           first = false
57           results?.add(result!!)
58         }
59         catch (e: Throwable) {
60           exceptionHandler.exceptionCaught(e)
61         }
62         return true
63       }
64     })
65   }
66
67   fun disconnectClient(context: ChannelHandlerContext, client: Client, closeChannel: Boolean): Boolean {
68     synchronized (clients) {
69       if (!clients.remove(client)) {
70         return false
71       }
72     }
73
74     try {
75       context.attr(CLIENT).remove()
76
77       if (closeChannel) {
78         context.channel().close()
79       }
80
81       client.rejectAsyncResults(exceptionHandler)
82     }
83     finally {
84       listener?.disconnected(client)
85     }
86     return true
87   }
88
89   fun forEachClient(procedure: TObjectProcedure<Client>) {
90     synchronized (clients) {
91       clients.forEach(procedure)
92     }
93   }
94 }