* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.jetbrains.concurrency;
+package org.jetbrains.concurrency
-import com.intellij.openapi.diagnostic.Logger;
-import com.intellij.openapi.util.Getter;
-import com.intellij.util.Consumer;
-import com.intellij.util.Function;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
+import com.intellij.openapi.diagnostic.Logger
+import com.intellij.openapi.util.Getter
+import com.intellij.util.Consumer
+import com.intellij.util.Function
+import java.util.*
-import java.util.ArrayList;
-import java.util.List;
+private val LOG = Logger.getInstance(AsyncPromise::class.java)
-public class AsyncPromise<T> extends Promise<T> implements Getter<T> {
- private static final Logger LOG = Logger.getInstance(AsyncPromise.class);
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+val OBSOLETE_ERROR = Promise.createError("Obsolete")
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public static final RuntimeException OBSOLETE_ERROR = Promise.createError("Obsolete");
+open class AsyncPromise<T> : Promise<T>(), Getter<T> {
+ @Volatile private var done: Consumer<in T>? = null
+ @Volatile private var rejected: Consumer<in Throwable>? = null
- private volatile Consumer<? super T> done;
- private volatile Consumer<? super Throwable> rejected;
+ @Volatile private var state: Promise.State = Promise.State.PENDING
- protected volatile State state = State.PENDING;
// result object or error message
- private volatile Object result;
+ @Volatile private var result: Any? = null
- @NotNull
- @Override
- public State getState() {
- return state;
- }
+ override fun getState() = state
- @NotNull
- @Override
- public Promise<T> done(@NotNull Consumer<? super T> done) {
+ override fun done(done: Consumer<in T>): Promise<T> {
if (isObsolete(done)) {
- return this;
+ return this
}
- switch (state) {
- case PENDING:
- break;
- case FULFILLED:
- //noinspection unchecked
- done.consume((T)result);
- return this;
- case REJECTED:
- return this;
+ when (state) {
+ Promise.State.PENDING -> {
+ }
+ Promise.State.FULFILLED -> {
+ @Suppress("UNCHECKED_CAST")
+ done.consume(result as T?)
+ return this
+ }
+ Promise.State.REJECTED -> return this
}
- this.done = setHandler(this.done, done);
- return this;
+ this.done = setHandler(this.done, done)
+ return this
}
- @NotNull
- @Override
- public Promise<T> rejected(@NotNull Consumer<Throwable> rejected) {
+ override fun rejected(rejected: Consumer<Throwable>): Promise<T> {
if (isObsolete(rejected)) {
- return this;
+ return this
}
- switch (state) {
- case PENDING:
- break;
- case FULFILLED:
- return this;
- case REJECTED:
- rejected.consume((Throwable)result);
- return this;
+ when (state) {
+ Promise.State.PENDING -> {
+ }
+ Promise.State.FULFILLED -> return this
+ Promise.State.REJECTED -> {
+ rejected.consume(result as Throwable?)
+ return this
+ }
}
- this.rejected = setHandler(this.rejected, rejected);
- return this;
- }
-
- @Override
- public T get() {
- //noinspection unchecked
- return state == State.FULFILLED ? (T)result : null;
+ this.rejected = setHandler(this.rejected, rejected)
+ return this
}
- @SuppressWarnings("SynchronizeOnThis")
- private static final class CompoundConsumer<T> implements Consumer<T> {
- private List<Consumer<? super T>> consumers = new ArrayList<>();
+ @Suppress("UNCHECKED_CAST")
+ override fun get() = if (state == Promise.State.FULFILLED) result as T? else null
- public CompoundConsumer(@NotNull Consumer<? super T> c1, @NotNull Consumer<? super T> c2) {
- synchronized (this) {
- consumers.add(c1);
- consumers.add(c2);
+ override fun <SUB_RESULT> then(fulfilled: Function<in T, out SUB_RESULT>): Promise<SUB_RESULT> {
+ @Suppress("UNCHECKED_CAST")
+ when (state) {
+ Promise.State.PENDING -> {
}
+ Promise.State.FULFILLED -> return DonePromise<SUB_RESULT>(fulfilled.`fun`(result as T?))
+ Promise.State.REJECTED -> return rejectedPromise(result as Throwable)
}
- @Override
- public void consume(T t) {
- List<Consumer<? super T>> list;
- synchronized (this) {
- list = consumers;
- consumers = null;
- }
-
- if (list != null) {
- for (Consumer<? super T> consumer : list) {
- if (!isObsolete(consumer)) {
- consumer.consume(t);
- }
- }
- }
- }
-
- public void add(@NotNull Consumer<? super T> consumer) {
- synchronized (this) {
- if (consumers != null) {
- consumers.add(consumer);
- }
- }
- }
+ val promise = AsyncPromise<SUB_RESULT>()
+ addHandlers(Consumer({ result ->
+ promise.catchError {
+ if (fulfilled is Obsolescent && fulfilled.isObsolete) {
+ promise.setError(OBSOLETE_ERROR)
+ }
+ else {
+ promise.setResult(fulfilled.`fun`(result))
+ }
+ }
+ }), Consumer({ promise.setError(it) }))
+ return promise
}
- @Override
- @NotNull
- public <SUB_RESULT> Promise<SUB_RESULT> then(@NotNull final Function<? super T, ? extends SUB_RESULT> fulfilled) {
- switch (state) {
- case PENDING:
- break;
- case FULFILLED:
- //noinspection unchecked
- return new DonePromise<SUB_RESULT>(fulfilled.fun((T)result));
- case REJECTED:
- return new RejectedPromise<SUB_RESULT>((Throwable)result);
- }
+ override fun notify(child: AsyncPromise<in T>) {
+ LOG.assertTrue(child !== this)
- final AsyncPromise<SUB_RESULT> promise = new AsyncPromise<SUB_RESULT>();
- addHandlers(result -> {
- try {
- if (fulfilled instanceof Obsolescent && ((Obsolescent)fulfilled).isObsolete()) {
- promise.setError(OBSOLETE_ERROR);
- }
- else {
- promise.setResult(fulfilled.fun(result));
- }
+ when (state) {
+ Promise.State.PENDING -> {
}
- catch (Throwable e) {
- promise.setError(e);
+ Promise.State.FULFILLED -> {
+ @Suppress("UNCHECKED_CAST")
+ child.setResult(result as T)
+ return
+ }
+ Promise.State.REJECTED -> {
+ child.setError((result as Throwable?)!!)
+ return
}
- }, promise::setError);
- return promise;
- }
-
- @Override
- public void notify(@NotNull final AsyncPromise<? super T> child) {
- LOG.assertTrue(child != this);
-
- switch (state) {
- case PENDING:
- break;
- case FULFILLED:
- //noinspection unchecked
- child.setResult((T)result);
- return;
- case REJECTED:
- child.setError((Throwable)result);
- return;
}
- addHandlers(result -> {
- try {
- child.setResult(result);
- }
- catch (Throwable e) {
- child.setError(e);
- }
- }, child::setError);
+ addHandlers(Consumer({ child.catchError { child.setResult(it) } }), Consumer({ child.setError(it) }))
}
- @Override
- @NotNull
- public <SUB_RESULT> Promise<SUB_RESULT> thenAsync(@NotNull final Function<? super T, Promise<SUB_RESULT>> fulfilled) {
- switch (state) {
- case PENDING:
- break;
- case FULFILLED:
- //noinspection unchecked
- return fulfilled.fun((T)result);
- case REJECTED:
- return Promise.reject((Throwable)result);
+ override fun <SUB_RESULT> thenAsync(fulfilled: Function<in T, Promise<SUB_RESULT>>): Promise<SUB_RESULT> {
+ @Suppress("UNCHECKED_CAST")
+ when (state) {
+ Promise.State.PENDING -> {
+ }
+ Promise.State.FULFILLED -> return fulfilled.`fun`(result as T?)
+ Promise.State.REJECTED -> return rejectedPromise(result as Throwable)
}
- final AsyncPromise<SUB_RESULT> promise = new AsyncPromise<SUB_RESULT>();
- final Consumer<Throwable> rejectedHandler = promise::setError;
- addHandlers(result -> {
- try {
- fulfilled.fun(result)
- .done(subResult -> {
- try {
- promise.setResult(subResult);
- }
- catch (Throwable e) {
- promise.setError(e);
- }
- })
- .rejected(rejectedHandler);
- }
- catch (Throwable e) {
- promise.setError(e);
- }
- }, rejectedHandler);
- return promise;
+ val promise = AsyncPromise<SUB_RESULT>()
+ val rejectedHandler = Consumer<Throwable>({ promise.setError(it) })
+ addHandlers(Consumer({
+ promise.catchError {
+ fulfilled.`fun`(it)
+ .done { promise.catchError { promise.setResult(it) } }
+ .rejected(rejectedHandler)
+ }
+ }), rejectedHandler)
+ return promise
}
- @Override
- @NotNull
- public Promise<T> processed(@NotNull AsyncPromise<? super T> fulfilled) {
- switch (state) {
- case PENDING:
- break;
- case FULFILLED:
- //noinspection unchecked
- fulfilled.setResult((T)result);
- return this;
- case REJECTED:
- fulfilled.setError((Throwable)result);
- return this;
- }
-
- addHandlers(result -> {
- try {
- fulfilled.setResult(result);
+ override fun processed(fulfilled: AsyncPromise<in T>): Promise<T> {
+ when (state) {
+ Promise.State.PENDING -> {
}
- catch (Throwable e) {
- fulfilled.setError(e);
+ Promise.State.FULFILLED -> {
+ @Suppress("UNCHECKED_CAST")
+ fulfilled.setResult(result as T)
+ return this
}
- }, fulfilled::setError);
- return this;
+ Promise.State.REJECTED -> {
+ fulfilled.setError((result as Throwable?)!!)
+ return this
+ }
+ }
+
+ addHandlers(Consumer({ result -> fulfilled.catchError { fulfilled.setResult(result) } }), Consumer({ fulfilled.setError(it) }))
+ return this
}
- private void addHandlers(@NotNull Consumer<T> done, @NotNull Consumer<Throwable> rejected) {
- this.done = setHandler(this.done, done);
- this.rejected = setHandler(this.rejected, rejected);
+ private fun addHandlers(done: Consumer<T>, rejected: Consumer<Throwable>) {
+ this.done = setHandler(this.done, done)
+ this.rejected = setHandler(this.rejected, rejected)
}
- @NotNull
- private static <T> Consumer<? super T> setHandler(@Nullable Consumer<? super T> oldConsumer, @NotNull Consumer<? super T> newConsumer) {
- if (oldConsumer == null) {
- return newConsumer;
- }
- else if (oldConsumer instanceof CompoundConsumer) {
- //noinspection unchecked
- ((CompoundConsumer<T>)oldConsumer).add(newConsumer);
- return oldConsumer;
+ fun setResult(result: T?) {
+ if (state != Promise.State.PENDING) {
+ return
}
- else {
- return new CompoundConsumer<T>(oldConsumer, newConsumer);
+
+ this.result = result
+ state = Promise.State.FULFILLED
+
+ val done = this.done
+ clearHandlers()
+ if (done != null && !isObsolete(done)) {
+ done.consume(result)
}
}
- public void setResult(T result) {
- if (state != State.PENDING) {
- return;
+ fun setError(error: String): Boolean {
+ return setError(Promise.createError(error))
+ }
+
+ open fun setError(error: Throwable): Boolean {
+ if (state != Promise.State.PENDING) {
+ return false
}
- this.result = result;
- state = State.FULFILLED;
+ result = error
+ state = Promise.State.REJECTED
- Consumer<? super T> done = this.done;
- clearHandlers();
- if (done != null && !isObsolete(done)) {
- done.consume(result);
+ val rejected = this.rejected
+ clearHandlers()
+ if (rejected == null) {
+ Promise.logError(LOG, error)
+ }
+ else if (!isObsolete(rejected)) {
+ rejected.consume(error)
}
+ return true
}
- static boolean isObsolete(@Nullable Consumer<?> consumer) {
- return consumer instanceof Obsolescent && ((Obsolescent)consumer).isObsolete();
+ private fun clearHandlers() {
+ done = null
+ rejected = null
}
- public boolean setError(@NotNull String error) {
- return setError(Promise.createError(error));
+ override fun processed(processed: Consumer<in T>): Promise<T> {
+ done(processed)
+ rejected({ error -> processed.consume(null) })
+ return this
}
+}
- public boolean setError(@NotNull Throwable error) {
- if (state != State.PENDING) {
- return false;
+private class CompoundConsumer<T>(c1: Consumer<in T>, c2: Consumer<in T>) : Consumer<T> {
+ private var consumers: MutableList<Consumer<in T>>? = ArrayList()
+
+ init {
+ synchronized(this) {
+ consumers!!.add(c1)
+ consumers!!.add(c2)
}
+ }
- result = error;
- state = State.REJECTED;
+ override fun consume(t: T) {
+ val list = synchronized(this) {
+ val list = consumers
+ consumers = null
+ list
+ } ?: return
- Consumer<? super Throwable> rejected = this.rejected;
- clearHandlers();
- if (rejected == null) {
- Promise.logError(LOG, error);
- }
- else if (!isObsolete(rejected)) {
- rejected.consume(error);
+ for (consumer in list) {
+ if (!isObsolete(consumer)) {
+ consumer.consume(t)
+ }
}
- return true;
}
- private void clearHandlers() {
- done = null;
- rejected = null;
+ fun add(consumer: Consumer<in T>) {
+ synchronized(this) {
+ if (consumers != null) {
+ consumers!!.add(consumer)
+ }
+ }
+ }
+}
+
+private fun <T> setHandler(oldConsumer: Consumer<in T>?, newConsumer: Consumer<in T>) = when (oldConsumer) {
+ null -> newConsumer
+ is CompoundConsumer<*> -> {
+ @Suppress("UNCHECKED_CAST")
+ (oldConsumer as CompoundConsumer<T>).add(newConsumer)
+ oldConsumer
}
+ else -> CompoundConsumer(oldConsumer, newConsumer)
+}
- @Override
- public Promise<T> processed(@NotNull Consumer<? super T> processed) {
- done(processed);
- rejected(error -> processed.consume(null));
- return this;
+internal fun isObsolete(consumer: Consumer<*>?) = consumer is Obsolescent && consumer.isObsolete
+
+inline fun <T> AsyncPromise<*>.catchError(runnable: () -> T): T? {
+ try {
+ return runnable()
+ }
+ catch (e: Throwable) {
+ setError(e)
+ return null
}
-}
\ No newline at end of file
+}
+
+fun <T> rejectedPromise(error: Throwable): Promise<T> = Promise.reject(error)
\ No newline at end of file