private static final Topic<T1Listener> TOPIC1 = new Topic<>("T1", T1Listener.class);
private static final Topic<T2Listener> TOPIC2 = new Topic<>("T2", T2Listener.class);
+ private static final Topic<Runnable> RUNNABLE_TOPIC = new Topic<>("runnableTopic", Runnable.class);
private class T1Handler implements T1Listener {
private final String id;
assertEquals("events mismatch", joinExpected, joinActual);
}
+
+ public void testHasUndeliveredEvents() {
+ assertFalse(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC));
+ assertFalse(myBus.hasUndeliveredEvents(TOPIC2));
+
+ myBus.connect().subscribe(RUNNABLE_TOPIC, () -> {
+ assertTrue(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC));
+ assertFalse(myBus.hasUndeliveredEvents(TOPIC2));
+ });
+ myBus.connect().subscribe(RUNNABLE_TOPIC, () -> {
+ assertFalse(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC));
+ assertFalse(myBus.hasUndeliveredEvents(TOPIC2));
+ });
+ myBus.syncPublisher(RUNNABLE_TOPIC).run();
+ }
+
+ public void testHasUndeliveredEventsInChildBys() {
+ MessageBusImpl childBus = new MessageBusImpl(this, myBus);
+ myBus.connect().subscribe(RUNNABLE_TOPIC, () -> assertTrue(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC)));
+ childBus.connect().subscribe(RUNNABLE_TOPIC, () -> assertFalse(myBus.hasUndeliveredEvents(RUNNABLE_TOPIC)));
+ myBus.syncPublisher(RUNNABLE_TOPIC).run();
+ }
}
myDisposed = true;
}
+ @Override
+ public boolean hasUndeliveredEvents(@NotNull Topic<?> topic) {
+ if (!isDispatchingAnything()) return false;
+
+ for (MessageBusConnectionImpl connection : getTopicSubscribers(topic)) {
+ if (connection.containsMessage(topic)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isDispatchingAnything() {
+ return getRootBus().myWaitingBuses.get() != null;
+ }
+
private void checkNotDisposed() {
if (myDisposed) {
LOG.error("Already disposed: " + this);
private void postMessage(Message message) {
checkNotDisposed();
- final Topic topic = message.getTopic();
- List<MessageBusConnectionImpl> topicSubscribers = mySubscriberCache.get(topic);
- if (topicSubscribers == null) {
- topicSubscribers = new SmartList<MessageBusConnectionImpl>();
- calcSubscribers(topic, topicSubscribers);
- mySubscriberCache.put(topic, topicSubscribers);
- }
+ List<MessageBusConnectionImpl> topicSubscribers = getTopicSubscribers(message.getTopic());
if (!topicSubscribers.isEmpty()) {
for (MessageBusConnectionImpl subscriber : topicSubscribers) {
subscriber.getBus().myMessageQueue.get().offer(new DeliveryJob(subscriber, message));
}
}
+ @NotNull
+ private List<MessageBusConnectionImpl> getTopicSubscribers(Topic topic) {
+ List<MessageBusConnectionImpl> topicSubscribers = mySubscriberCache.get(topic);
+ if (topicSubscribers == null) {
+ topicSubscribers = new SmartList<MessageBusConnectionImpl>();
+ calcSubscribers(topic, topicSubscribers);
+ mySubscriberCache.put(topic, topicSubscribers);
+ }
+ return topicSubscribers;
+ }
+
private void notifyPendingJobChange(int delta) {
ThreadLocal<SortedMap<MessageBusImpl, Integer>> ref = getRootBus().myWaitingBuses;
SortedMap<MessageBusImpl, Integer> map = ref.get();