/*
 * Copyright 2000-2016 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org.jetbrains.concurrency
18 import com.intellij.openapi.diagnostic.Logger
19 import com.intellij.openapi.util.Getter
20 import com.intellij.util.Consumer
21 import com.intellij.util.Function
/** Logger used by this file for the self-notification assertion and for reporting rejections nobody handles. */
private val LOG: Logger = Logger.getInstance(AsyncPromise::class.java)

/** Shared error used to reject a derived promise when its handler reports itself obsolete. */
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
val OBSOLETE_ERROR = Promise.createError("Obsolete")
/**
 * A [Promise] that is settled explicitly through [setResult] or [setError].
 *
 * While the promise is [Promise.State.PENDING], registered handlers are stored and invoked when the
 * promise is settled. Once it is settled, a newly registered handler is either invoked immediately
 * (when it matches the outcome) or dropped (when it is for the other outcome).
 *
 * Only the first call to [setResult] / [setError] has an effect; later calls are ignored.
 *
 * NOTE(review): all fields are `@Volatile`, but the "check state, then store handler / settle" sequences
 * are not atomic. Confirm the threading assumptions of callers before relying on concurrent use.
 */
open class AsyncPromise<T> : Promise<T>(), Getter<T> {
    // Handlers waiting for the outcome; both are cleared as soon as the promise is settled.
    @Volatile private var done: Consumer<in T>? = null
    @Volatile private var rejected: Consumer<in Throwable>? = null

    @Volatile private var state: Promise.State = Promise.State.PENDING

    // The fulfilled value, or the Throwable the promise was rejected with.
    @Volatile private var result: Any? = null

    override fun getState() = state

    /**
     * Registers a success handler.
     *
     * An obsolete handler is ignored. If the promise is already fulfilled the handler is invoked
     * immediately with the stored value; if it is already rejected the handler is dropped.
     *
     * @return this promise
     */
    override fun done(done: Consumer<in T>): Promise<T> {
        if (isObsolete(done)) {
            return this
        }

        when (state) {
            Promise.State.PENDING -> {
                // not settled yet: the handler is stored below
            }
            Promise.State.FULFILLED -> {
                @Suppress("UNCHECKED_CAST")
                done.consume(result as T?)
                return this
            }
            Promise.State.REJECTED -> return this
        }

        this.done = setHandler(this.done, done)
        return this
    }

    /**
     * Registers a failure handler.
     *
     * An obsolete handler is ignored. If the promise is already rejected the handler is invoked
     * immediately with the stored error; if it is already fulfilled the handler is dropped.
     *
     * @return this promise
     */
    override fun rejected(rejected: Consumer<Throwable>): Promise<T> {
        if (isObsolete(rejected)) {
            return this
        }

        when (state) {
            Promise.State.PENDING -> {
                // not settled yet: the handler is stored below
            }
            Promise.State.FULFILLED -> return this
            Promise.State.REJECTED -> {
                rejected.consume(result as Throwable?)
                return this
            }
        }

        this.rejected = setHandler(this.rejected, rejected)
        return this
    }

    /** Returns the stored value if the promise is fulfilled, `null` otherwise. */
    @Suppress("UNCHECKED_CAST")
    override fun get() = if (state == Promise.State.FULFILLED) result as T? else null

    /**
     * Derives a promise holding the value of this promise transformed by [fulfilled].
     *
     * When this promise is already fulfilled, [fulfilled] is applied right away on the calling thread and
     * an exception thrown by it propagates to the caller. When it is still pending, the transformation runs
     * at settlement time: an exception rejects the derived promise, and an obsolete [fulfilled] rejects it
     * with [OBSOLETE_ERROR]. A rejection of this promise is forwarded to the derived promise.
     */
    override fun <SUB_RESULT> then(fulfilled: Function<in T, out SUB_RESULT>): Promise<SUB_RESULT> {
        @Suppress("UNCHECKED_CAST")
        when (state) {
            Promise.State.PENDING -> {
                // not settled yet: a derived promise is created below
            }
            Promise.State.FULFILLED -> return DonePromise<SUB_RESULT>(fulfilled.`fun`(result as T?))
            Promise.State.REJECTED -> return rejectedPromise(result as Throwable)
        }

        val promise = AsyncPromise<SUB_RESULT>()
        addHandlers(Consumer({ result ->
            promise.catchError {
                if (fulfilled is Obsolescent && fulfilled.isObsolete) {
                    promise.setError(OBSOLETE_ERROR)
                }
                else {
                    promise.setResult(fulfilled.`fun`(result))
                }
            }
        }), Consumer({ promise.setError(it) }))
        return promise
    }

    /**
     * Forwards the outcome of this promise to [child]: immediately if this promise is already settled,
     * otherwise once it is settled. [child] must not be this promise (checked with a logger assertion).
     */
    override fun notify(child: AsyncPromise<in T>) {
        LOG.assertTrue(child !== this)

        when (state) {
            Promise.State.PENDING -> {
                // not settled yet: forwarding handlers are registered below
            }
            Promise.State.FULFILLED -> {
                @Suppress("UNCHECKED_CAST")
                child.setResult(result as T)
                return
            }
            Promise.State.REJECTED -> {
                // in the REJECTED state `result` always holds the Throwable stored by setError
                child.setError(result as Throwable)
                return
            }
        }

        addHandlers(Consumer({ child.catchError { child.setResult(it) } }), Consumer({ child.setError(it) }))
    }

    /**
     * Derives a promise that is settled with the outcome of the promise returned by [fulfilled].
     *
     * When this promise is already fulfilled, the promise produced by [fulfilled] is returned as is.
     * When it is still pending, a new promise is returned; it is rejected if this promise is rejected,
     * if [fulfilled] throws, or if the promise produced by [fulfilled] is rejected.
     */
    override fun <SUB_RESULT> thenAsync(fulfilled: Function<in T, Promise<SUB_RESULT>>): Promise<SUB_RESULT> {
        @Suppress("UNCHECKED_CAST")
        when (state) {
            Promise.State.PENDING -> {
                // not settled yet: a derived promise is created below
            }
            Promise.State.FULFILLED -> return fulfilled.`fun`(result as T?)
            Promise.State.REJECTED -> return rejectedPromise(result as Throwable)
        }

        val promise = AsyncPromise<SUB_RESULT>()
        // shared by both failure paths: rejection of this promise and rejection of the inner promise
        val rejectedHandler = Consumer<Throwable>({ promise.setError(it) })
        addHandlers(Consumer({
            promise.catchError {
                fulfilled.`fun`(it)
                    .done { promise.catchError { promise.setResult(it) } }
                    .rejected(rejectedHandler)
            }
        }), rejectedHandler)
        return promise
    }

    /**
     * Forwards the outcome of this promise to [fulfilled], immediately if already settled and
     * otherwise once it is settled.
     *
     * @return this promise
     */
    override fun processed(fulfilled: AsyncPromise<in T>): Promise<T> {
        when (state) {
            Promise.State.PENDING -> {
                // not settled yet: forwarding handlers are registered below
            }
            Promise.State.FULFILLED -> {
                @Suppress("UNCHECKED_CAST")
                fulfilled.setResult(result as T)
                return this
            }
            Promise.State.REJECTED -> {
                // in the REJECTED state `result` always holds the Throwable stored by setError
                fulfilled.setError(result as Throwable)
                return this
            }
        }

        addHandlers(Consumer({ result -> fulfilled.catchError { fulfilled.setResult(result) } }), Consumer({ fulfilled.setError(it) }))
        return this
    }

    /** Stores a success and a failure handler together, without the obsolescence and state checks. */
    private fun addHandlers(done: Consumer<T>, rejected: Consumer<Throwable>) {
        this.done = setHandler(this.done, done)
        this.rejected = setHandler(this.rejected, rejected)
    }

    /**
     * Fulfils the promise with [result] and invokes the stored success handler unless it is obsolete.
     * Does nothing if the promise is already settled.
     */
    fun setResult(result: T?) {
        if (state != Promise.State.PENDING) {
            return
        }

        // the value is written before the state so a reader that sees FULFILLED also sees the value
        this.result = result
        state = Promise.State.FULFILLED

        // take the handler before clearing, so it is released even if it throws
        val done = this.done
        clearHandlers()
        if (done != null && !isObsolete(done)) {
            done.consume(result)
        }
    }

    /**
     * Rejects the promise with an error created from the message [error].
     *
     * @return `true` if the promise was pending and is now rejected, `false` if it was already settled
     */
    fun setError(error: String): Boolean {
        return setError(Promise.createError(error))
    }

    /**
     * Rejects the promise with [error].
     *
     * The stored failure handler is invoked unless it is obsolete. If no failure handler was registered,
     * the error is reported through [Promise.logError] instead.
     *
     * @return `true` if the promise was pending and is now rejected, `false` if it was already settled
     */
    open fun setError(error: Throwable): Boolean {
        if (state != Promise.State.PENDING) {
            return false
        }

        // the error is written before the state so a reader that sees REJECTED also sees the error
        result = error
        state = Promise.State.REJECTED

        val rejected = this.rejected
        clearHandlers()
        if (rejected == null) {
            Promise.logError(LOG, error)
        }
        else if (!isObsolete(rejected)) {
            rejected.consume(error)
        }
        return true
    }

    /** Drops both stored handlers. */
    private fun clearHandlers() {
        done = null
        rejected = null
    }

    /**
     * Registers [processed] for both outcomes: it receives the value on success and `null` on failure.
     *
     * @return this promise
     */
    override fun processed(processed: Consumer<in T>): Promise<T> {
        done(processed)
        rejected({ error -> processed.consume(null) })
        return this
    }
}
/**
 * A one-shot [Consumer] that passes a single value on to every consumer added to it.
 *
 * On the first [consume] call the list of consumers is detached under the lock. After that,
 * [add] silently ignores new consumers and further [consume] calls do nothing.
 */
private class CompoundConsumer<T>(c1: Consumer<in T>, c2: Consumer<in T>) : Consumer<T> {
    // null once the value has been delivered
    private var consumers: MutableList<Consumer<in T>>? = null

    init {
        synchronized(this) {
            consumers = arrayListOf<Consumer<in T>>(c1, c2)
        }
    }

    /** Delivers [t] to every non-obsolete consumer, in the order they were added. */
    override fun consume(t: T) {
        // detach the list under the lock, then call the consumers outside of it
        val snapshot = synchronized(this) {
            val current = consumers
            consumers = null
            current
        } ?: return

        for (consumer in snapshot) {
            if (!isObsolete(consumer)) {
                consumer.consume(t)
            }
        }
    }

    /** Adds [consumer] to the list; ignored if the value has already been delivered. */
    fun add(consumer: Consumer<in T>) {
        synchronized(this) {
            consumers?.add(consumer)
        }
    }
}
/**
 * Combines an already stored handler with a new one and returns the handler that should be stored.
 *
 * - no stored handler: [newConsumer] itself;
 * - stored handler is a [CompoundConsumer]: [newConsumer] is added to it and the same instance is returned;
 * - otherwise: a new [CompoundConsumer] wrapping both.
 */
private fun <T> setHandler(oldConsumer: Consumer<in T>?, newConsumer: Consumer<in T>) = when (oldConsumer) {
    null -> newConsumer
    is CompoundConsumer<*> -> {
        @Suppress("UNCHECKED_CAST")
        (oldConsumer as CompoundConsumer<T>).add(newConsumer)
        oldConsumer
    }
    else -> CompoundConsumer(oldConsumer, newConsumer)
}
/** Returns `true` only when [consumer] implements [Obsolescent] and reports itself obsolete; `false` for `null`. */
internal fun isObsolete(consumer: Consumer<*>?): Boolean {
    val obsolescent = consumer as? Obsolescent ?: return false
    return obsolescent.isObsolete
}
/**
 * Runs [runnable] and returns its result. If it throws, the receiver promise is rejected
 * with the thrown [Throwable] and `null` is returned.
 */
inline fun <T> AsyncPromise<*>.catchError(runnable: () -> T): T? {
    return try {
        runnable()
    }
    catch (e: Throwable) {
        setError(e)
        null
    }
}
/** Creates a promise that is rejected with [error], by delegating to [Promise.reject]. */
fun <T> rejectedPromise(error: Throwable): Promise<T> {
    return Promise.reject<T>(error)
}