convert RejectErrorReporter, AsyncPromise to kotlin
[idea/community.git] / platform / platform-api / src / org / jetbrains / concurrency / AsyncPromise.java
1 /*
2  * Copyright 2000-2016 JetBrains s.r.o.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.jetbrains.concurrency
17
18 import com.intellij.openapi.diagnostic.Logger
19 import com.intellij.openapi.util.Getter
20 import com.intellij.util.Consumer
21 import com.intellij.util.Function
22 import java.util.*
23
24 private val LOG = Logger.getInstance(AsyncPromise::class.java)
25
26 @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
27 val OBSOLETE_ERROR = Promise.createError("Obsolete")
28
29 open class AsyncPromise<T> : Promise<T>(), Getter<T> {
30   @Volatile private var done: Consumer<in T>? = null
31   @Volatile private var rejected: Consumer<in Throwable>? = null
32
33   @Volatile private var state: Promise.State = Promise.State.PENDING
34
35   // result object or error message
36   @Volatile private var result: Any? = null
37
38   override fun getState() = state
39
40   override fun done(done: Consumer<in T>): Promise<T> {
41     if (isObsolete(done)) {
42       return this
43     }
44
45     when (state) {
46       Promise.State.PENDING -> {
47       }
48       Promise.State.FULFILLED -> {
49         @Suppress("UNCHECKED_CAST")
50         done.consume(result as T?)
51         return this
52       }
53       Promise.State.REJECTED -> return this
54     }
55
56     this.done = setHandler(this.done, done)
57     return this
58   }
59
60   override fun rejected(rejected: Consumer<Throwable>): Promise<T> {
61     if (isObsolete(rejected)) {
62       return this
63     }
64
65     when (state) {
66       Promise.State.PENDING -> {
67       }
68       Promise.State.FULFILLED -> return this
69       Promise.State.REJECTED -> {
70         rejected.consume(result as Throwable?)
71         return this
72       }
73     }
74
75     this.rejected = setHandler(this.rejected, rejected)
76     return this
77   }
78
79   @Suppress("UNCHECKED_CAST")
80   override fun get() = if (state == Promise.State.FULFILLED) result as T? else null
81
82   override fun <SUB_RESULT> then(fulfilled: Function<in T, out SUB_RESULT>): Promise<SUB_RESULT> {
83     @Suppress("UNCHECKED_CAST")
84     when (state) {
85       Promise.State.PENDING -> {
86       }
87       Promise.State.FULFILLED -> return DonePromise<SUB_RESULT>(fulfilled.`fun`(result as T?))
88       Promise.State.REJECTED -> return rejectedPromise(result as Throwable)
89     }
90
91     val promise = AsyncPromise<SUB_RESULT>()
92     addHandlers(Consumer({ result ->
93                            promise.catchError {
94                              if (fulfilled is Obsolescent && fulfilled.isObsolete) {
95                                promise.setError(OBSOLETE_ERROR)
96                              }
97                              else {
98                                promise.setResult(fulfilled.`fun`(result))
99                              }
100                            }
101                          }), Consumer({ promise.setError(it) }))
102     return promise
103   }
104
105   override fun notify(child: AsyncPromise<in T>) {
106     LOG.assertTrue(child !== this)
107
108     when (state) {
109       Promise.State.PENDING -> {
110       }
111       Promise.State.FULFILLED -> {
112         @Suppress("UNCHECKED_CAST")
113         child.setResult(result as T)
114         return
115       }
116       Promise.State.REJECTED -> {
117         child.setError((result as Throwable?)!!)
118         return
119       }
120     }
121
122     addHandlers(Consumer({ child.catchError { child.setResult(it) } }), Consumer({ child.setError(it) }))
123   }
124
125   override fun <SUB_RESULT> thenAsync(fulfilled: Function<in T, Promise<SUB_RESULT>>): Promise<SUB_RESULT> {
126     @Suppress("UNCHECKED_CAST")
127     when (state) {
128       Promise.State.PENDING -> {
129       }
130       Promise.State.FULFILLED -> return fulfilled.`fun`(result as T?)
131       Promise.State.REJECTED -> return rejectedPromise(result as Throwable)
132     }
133
134     val promise = AsyncPromise<SUB_RESULT>()
135     val rejectedHandler = Consumer<Throwable>({ promise.setError(it) })
136     addHandlers(Consumer({
137                            promise.catchError {
138                              fulfilled.`fun`(it)
139                                  .done { promise.catchError { promise.setResult(it) } }
140                                  .rejected(rejectedHandler)
141                            }
142                          }), rejectedHandler)
143     return promise
144   }
145
146   override fun processed(fulfilled: AsyncPromise<in T>): Promise<T> {
147     when (state) {
148       Promise.State.PENDING -> {
149       }
150       Promise.State.FULFILLED -> {
151         @Suppress("UNCHECKED_CAST")
152         fulfilled.setResult(result as T)
153         return this
154       }
155       Promise.State.REJECTED -> {
156         fulfilled.setError((result as Throwable?)!!)
157         return this
158       }
159     }
160
161     addHandlers(Consumer({ result -> fulfilled.catchError { fulfilled.setResult(result) } }), Consumer({ fulfilled.setError(it) }))
162     return this
163   }
164
165   private fun addHandlers(done: Consumer<T>, rejected: Consumer<Throwable>) {
166     this.done = setHandler(this.done, done)
167     this.rejected = setHandler(this.rejected, rejected)
168   }
169
170   fun setResult(result: T?) {
171     if (state != Promise.State.PENDING) {
172       return
173     }
174
175     this.result = result
176     state = Promise.State.FULFILLED
177
178     val done = this.done
179     clearHandlers()
180     if (done != null && !isObsolete(done)) {
181       done.consume(result)
182     }
183   }
184
185   fun setError(error: String): Boolean {
186     return setError(Promise.createError(error))
187   }
188
189   open fun setError(error: Throwable): Boolean {
190     if (state != Promise.State.PENDING) {
191       return false
192     }
193
194     result = error
195     state = Promise.State.REJECTED
196
197     val rejected = this.rejected
198     clearHandlers()
199     if (rejected == null) {
200       Promise.logError(LOG, error)
201     }
202     else if (!isObsolete(rejected)) {
203       rejected.consume(error)
204     }
205     return true
206   }
207
208   private fun clearHandlers() {
209     done = null
210     rejected = null
211   }
212
213   override fun processed(processed: Consumer<in T>): Promise<T> {
214     done(processed)
215     rejected({ error -> processed.consume(null) })
216     return this
217   }
218 }
219
220 private class CompoundConsumer<T>(c1: Consumer<in T>, c2: Consumer<in T>) : Consumer<T> {
221   private var consumers: MutableList<Consumer<in T>>? = ArrayList()
222
223   init {
224     synchronized(this) {
225       consumers!!.add(c1)
226       consumers!!.add(c2)
227     }
228   }
229
230   override fun consume(t: T) {
231     val list = synchronized(this) {
232       val list = consumers
233       consumers = null
234       list
235     } ?: return
236
237     for (consumer in list) {
238       if (!isObsolete(consumer)) {
239         consumer.consume(t)
240       }
241     }
242   }
243
244   fun add(consumer: Consumer<in T>) {
245     synchronized(this) {
246       if (consumers != null) {
247         consumers!!.add(consumer)
248       }
249     }
250   }
251 }
252
253 private fun <T> setHandler(oldConsumer: Consumer<in T>?, newConsumer: Consumer<in T>) = when (oldConsumer) {
254   null -> newConsumer
255   is CompoundConsumer<*> -> {
256     @Suppress("UNCHECKED_CAST")
257     (oldConsumer as CompoundConsumer<T>).add(newConsumer)
258     oldConsumer
259   }
260   else -> CompoundConsumer(oldConsumer, newConsumer)
261 }
262
263 internal fun isObsolete(consumer: Consumer<*>?) = consumer is Obsolescent && consumer.isObsolete
264
265 inline fun <T> AsyncPromise<*>.catchError(runnable: () -> T): T? {
266   try {
267     return runnable()
268   }
269   catch (e: Throwable) {
270     setError(e)
271     return null
272   }
273 }
274
275 fun <T> rejectedPromise(error: Throwable): Promise<T> = Promise.reject(error)