add MessageBus#hasUndeliveredEvents (needed for IDEA-162546) dbe/171.1045
authorpeter <peter@jetbrains.com>
Thu, 17 Nov 2016 11:07:40 +0000 (12:07 +0100)
committerpeter <peter@jetbrains.com>
Thu, 17 Nov 2016 11:09:07 +0000 (12:09 +0100)
platform/platform-tests/testSrc/com/intellij/util/messages/MessageBusTest.java
platform/util/src/com/intellij/util/messages/MessageBus.java
platform/util/src/com/intellij/util/messages/impl/MessageBusConnectionImpl.java
platform/util/src/com/intellij/util/messages/impl/MessageBusImpl.java

index 5df08117ca5cc9e5a4db71489772b484fcf590dd..7750a16fdec21c9717bcf2cea1747d593f6ea986 100644 (file)
@@ -46,6 +46,7 @@ public class MessageBusTest extends TestCase {
 
   private static final Topic<T1Listener> TOPIC1 = new Topic<>("T1", T1Listener.class);
   private static final Topic<T2Listener> TOPIC2 = new Topic<>("T2", T2Listener.class);
+  private static final Topic<Runnable> RUNNABLE_TOPIC = new Topic<>("runnableTopic", Runnable.class);
 
   private class T1Handler implements T1Listener {
     private final String id;
@@ -312,4 +313,26 @@ public class MessageBusTest extends TestCase {
 
     assertEquals("events mismatch", joinExpected, joinActual);
   }
+
+  public void testHasUndeliveredEvents() {
+    assertFalse(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC));
+    assertFalse(myBus.hasUndeliveredEvents(TOPIC2));
+
+    myBus.connect().subscribe(RUNNABLE_TOPIC, () -> {
+      assertTrue(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC));
+      assertFalse(myBus.hasUndeliveredEvents(TOPIC2));
+    });
+    myBus.connect().subscribe(RUNNABLE_TOPIC, () -> {
+      assertFalse(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC));
+      assertFalse(myBus.hasUndeliveredEvents(TOPIC2));
+    });
+    myBus.syncPublisher(RUNNABLE_TOPIC).run();
+  }
+
+  public void testHasUndeliveredEventsInChildBys() {
+    MessageBusImpl childBus = new MessageBusImpl(this, myBus);
+    myBus.connect().subscribe(RUNNABLE_TOPIC, () -> assertTrue(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC)));
+    childBus.connect().subscribe(RUNNABLE_TOPIC, () -> assertFalse(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC)));
+    myBus.syncPublisher(RUNNABLE_TOPIC).run();
+  }
 }
index 2885d9623bbe931425a81fd4785018046346685b..0f640f6128c3a8d613812acda5572c9fab915efa 100644 (file)
@@ -140,4 +140,10 @@ public interface MessageBus {
    * {@link #connect(Disposable) connections}.
    */
   void dispose();
+
+  /**
+   * @return true when events in the given topic are being dispatched in the current thread,
+   * and not all listeners have received the events yet.
+   */
+  boolean hasUndeliveredEvents(@NotNull Topic<?> topic);
 }
\ No newline at end of file
index ec6049868863ac9a20bc941ed780679915e21bf9..4f2fe46b7dc909c4a4d287ab055ba4e8c1352eb4 100644 (file)
@@ -138,6 +138,15 @@ public class MessageBusConnectionImpl implements MessageBusConnection {
     myPendingMessages.get().offer(message);
   }
 
+  boolean containsMessage(@NotNull Topic topic) {
+    for (Message message : myPendingMessages.get()) {
+      if (message.getTopic() == topic) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public String toString() {
     return mySubscriptions.toString();
   }
index e14d9b4d3c8582f291bcbc8d2c337a9426195f85..bb74d5a0413b5b95bffeeebcc8e0bc66c5c4900f 100644 (file)
@@ -279,6 +279,22 @@ public class MessageBusImpl implements MessageBus {
     myDisposed = true;
   }
 
+  @Override
+  public boolean hasUndeliveredEvents(@NotNull Topic<?> topic) {
+    if (!isDispatchingAnything()) return false;
+
+    for (MessageBusConnectionImpl connection : getTopicSubscribers(topic)) {
+      if (connection.containsMessage(topic)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isDispatchingAnything() {
+    return getRootBus().myWaitingBuses.get() != null;
+  }
+
   private void checkNotDisposed() {
     if (myDisposed) {
       LOG.error("Already disposed: " + this);
@@ -306,13 +322,7 @@ public class MessageBusImpl implements MessageBus {
 
   private void postMessage(Message message) {
     checkNotDisposed();
-    final Topic topic = message.getTopic();
-    List<MessageBusConnectionImpl> topicSubscribers = mySubscriberCache.get(topic);
-    if (topicSubscribers == null) {
-      topicSubscribers = new SmartList<MessageBusConnectionImpl>();
-      calcSubscribers(topic, topicSubscribers);
-      mySubscriberCache.put(topic, topicSubscribers);
-    }
+    List<MessageBusConnectionImpl> topicSubscribers = getTopicSubscribers(message.getTopic());
     if (!topicSubscribers.isEmpty()) {
       for (MessageBusConnectionImpl subscriber : topicSubscribers) {
         subscriber.getBus().myMessageQueue.get().offer(new DeliveryJob(subscriber, message));
@@ -322,6 +332,17 @@ public class MessageBusImpl implements MessageBus {
     }
   }
 
+  @NotNull
+  private List<MessageBusConnectionImpl> getTopicSubscribers(Topic topic) {
+    List<MessageBusConnectionImpl> topicSubscribers = mySubscriberCache.get(topic);
+    if (topicSubscribers == null) {
+      topicSubscribers = new SmartList<MessageBusConnectionImpl>();
+      calcSubscribers(topic, topicSubscribers);
+      mySubscriberCache.put(topic, topicSubscribers);
+    }
+    return topicSubscribers;
+  }
+
   private void notifyPendingJobChange(int delta) {
     ThreadLocal<SortedMap<MessageBusImpl, Integer>> ref = getRootBus().myWaitingBuses;
     SortedMap<MessageBusImpl, Integer> map = ref.get();