replaced <code></code> with more concise {@code}
[idea/community.git] / platform / platform-api / src / com / intellij / util / concurrency / QueueProcessor.java
1 /*
2  * Copyright 2000-2017 JetBrains s.r.o.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.intellij.util.concurrency;
17
18 import com.intellij.openapi.application.Application;
19 import com.intellij.openapi.application.ApplicationManager;
20 import com.intellij.openapi.application.ModalityState;
21 import com.intellij.openapi.diagnostic.Logger;
22 import com.intellij.openapi.progress.ProcessCanceledException;
23 import com.intellij.openapi.util.Condition;
24 import com.intellij.openapi.util.Conditions;
25 import com.intellij.util.Consumer;
26 import com.intellij.util.PairConsumer;
27 import org.jetbrains.annotations.Debugger;
28 import org.jetbrains.annotations.NotNull;
29
30 import java.util.ArrayDeque;
31 import java.util.Deque;
32 import java.util.Map;
33
34 import static com.intellij.util.containers.ContainerUtil.newIdentityTroveMap;
35
36 /**
37  * <p>QueueProcessor processes elements which are being added to a queue via {@link #add(Object)} and {@link #addFirst(Object)} methods.</p>
38  * <p>Elements are processed one by one in a special single thread.
39  * The processor itself is passed in the constructor and is called from that thread.
40  * By default processing starts when the first element is added to the queue, though there is an 'autostart' option which holds
41  * the processor until {@link #start()} is called.</p>
42  *
43  * @param <T> type of queue elements.
44  */
45 public class QueueProcessor<T> {
46   private static final Logger LOG = Logger.getInstance("#com.intellij.util.concurrency.QueueProcessor");
47   public enum ThreadToUse {
48     AWT,
49     POOLED
50   }
51
52   private final PairConsumer<T, Runnable> myProcessor;
53   private final Deque<T> myQueue = new ArrayDeque<>();
54
55   private boolean isProcessing;
56   private boolean myStarted;
57
58   private final ThreadToUse myThreadToUse;
59   private final Condition<?> myDeathCondition;
60   private final Map<Object, ModalityState> myModalityState = newIdentityTroveMap();
61
62   /**
63    * Constructs a QueueProcessor, which will autostart as soon as the first element is added to it.
64    */
65   public QueueProcessor(@NotNull Consumer<T> processor) {
66     this(processor, Conditions.alwaysFalse());
67   }
68
69   /**
70    * Constructs a QueueProcessor, which will autostart as soon as the first element is added to it.
71    */
72   public QueueProcessor(@NotNull Consumer<T> processor, @NotNull Condition<?> deathCondition) {
73     this(processor, deathCondition, true);
74   }
75
76   public QueueProcessor(@NotNull Consumer<T> processor, @NotNull Condition<?> deathCondition, boolean autostart) {
77     this(wrappingProcessor(processor), autostart, ThreadToUse.POOLED, deathCondition);
78   }
79
80   @NotNull
81   public static QueueProcessor<Runnable> createRunnableQueueProcessor() {
82     return new QueueProcessor<>(new RunnableConsumer());
83   }
84
85   @NotNull
86   public static QueueProcessor<Runnable> createRunnableQueueProcessor(ThreadToUse threadToUse) {
87     return new QueueProcessor<>(wrappingProcessor(new RunnableConsumer()), true, threadToUse, Conditions.FALSE);
88   }
89
90   @NotNull
91   private static <T> PairConsumer<T, Runnable> wrappingProcessor(@NotNull final Consumer<T> processor) {
92     return (item, runnable) -> {
93       runSafely(() -> processor.consume(item));
94       runnable.run();
95     };
96   }
97
98   /**
99    * Constructs a QueueProcessor with the given processor and autostart setting.
100    * By default QueueProcessor starts processing when it receives the first element. Pass {@code false} to alternate its behavior.
101    *
102    * @param processor processor of queue elements.
103    * @param autostart if {@code true} (which is by default), the queue will be processed immediately when it receives the first element.
104    *                  If {@code false}, then it will wait for the {@link #start()} command.
105    *                  After QueueProcessor has started once, autostart setting doesn't matter anymore: all other elements will be processed immediately.
106    */
107
108   public QueueProcessor(@NotNull PairConsumer<T, Runnable> processor,
109                         boolean autostart,
110                         @NotNull ThreadToUse threadToUse,
111                         @NotNull Condition<?> deathCondition) {
112     myProcessor = processor;
113     myStarted = autostart;
114     myThreadToUse = threadToUse;
115     myDeathCondition = deathCondition;
116   }
117
118   /**
119    * Starts queue processing if it hasn't started yet.
120    * Effective only if the QueueProcessor was created with no-autostart option: otherwise processing will start as soon as the first element
121    * is added to the queue.
122    * If there are several elements in the queue, processing starts from the first one.
123    */
124   public void start() {
125     synchronized (myQueue) {
126       if (myStarted) return;
127       myStarted = true;
128       if (!myQueue.isEmpty()) {
129         startProcessing();
130       }
131     }
132   }
133   
134   private void finishProcessing(boolean continueProcessing) {
135     synchronized (myQueue) {
136       isProcessing = false;
137       if (myQueue.isEmpty()) {
138         myQueue.notifyAll();
139       }
140       else if (continueProcessing){
141         startProcessing();
142       }
143     }
144   }
145
146   public void add(@NotNull T t, ModalityState state) {
147     synchronized (myQueue) {
148       myModalityState.put(t, state);
149     }
150     doAdd(t, false);
151   }
152
153   public void add(@NotNull T element) {
154     doAdd(element, false);
155   }
156
157   public void addFirst(@NotNull T element) {
158     doAdd(element, true);
159   }
160
161   private void doAdd(@NotNull T element, boolean atHead) {
162     synchronized (myQueue) {
163       if (atHead) {
164         myQueue.addFirst(element);
165       }
166       else {
167         myQueue.add(element);
168       }
169       startProcessing();
170     }
171   }
172
173   public void clear() {
174     synchronized (myQueue) {
175       myQueue.clear();
176     }
177   }
178
179   public void waitFor() {
180     synchronized (myQueue) {
181       while (isProcessing) {
182         try {
183           myQueue.wait();
184         }
185         catch (InterruptedException e) {
186           //ok
187         }
188       }
189     }
190   }
191   
192   boolean waitFor(long timeoutMS) {
193     synchronized (myQueue) {
194       long start = System.currentTimeMillis();
195       
196       while (isProcessing) {
197         long rest = timeoutMS - (System.currentTimeMillis() - start);
198         
199         if (rest <= 0) return !isProcessing;
200         
201         try {
202           myQueue.wait(rest);
203         }
204         catch (InterruptedException e) {
205           //ok
206         }
207       }
208       
209       return true;
210     }
211   }
212
213   private boolean startProcessing() {
214     LOG.assertTrue(Thread.holdsLock(myQueue));
215
216     if (isProcessing || !myStarted) {
217       return false;
218     }
219     isProcessing = true;
220     final T item = myQueue.removeFirst();
221     final Runnable runnable = () -> {
222       if (myDeathCondition.value(null)) {
223         finishProcessing(false);
224         return;
225       }
226       runSafely(() -> myProcessor.consume(item, () -> finishProcessing(true)));
227     };
228     final Application application = ApplicationManager.getApplication();
229     if (myThreadToUse == ThreadToUse.AWT) {
230       final ModalityState state = myModalityState.remove(item);
231       if (state != null) {
232         application.invokeLater(runnable, state);
233       }
234       else {
235         application.invokeLater(runnable);
236       }
237     }
238     else {
239       application.executeOnPooledThread(runnable);
240     }
241     return true;
242   }
243
244   public static void runSafely(@Debugger.Insert(group = "com.intellij.util.Alarm._addRequest") @NotNull Runnable run) {
245     try {
246       run.run();
247     }
248     catch (ProcessCanceledException e) {
249       throw e;
250     }
251     catch (Throwable e) {
252       try {
253         LOG.error(e);
254       }
255       catch (Throwable e2) {
256         //noinspection CallToPrintStackTrace
257         e2.printStackTrace();
258       }
259     }
260   }
261
262   public boolean isEmpty() {
263     synchronized (myQueue) {
264       return myQueue.isEmpty() && !isProcessing;
265     }
266   }
267
268   /**
269    * Removes several last tasks in the queue, leaving only {@code remaining} amount of them, counted from the head of the queue.
270    */
271   public void dismissLastTasks(int remaining) {
272     synchronized (myQueue) {
273       while (myQueue.size() > remaining) {
274         myQueue.pollLast();
275       }
276     }
277   }
278
279   public boolean hasPendingItemsToProcess() {
280     synchronized (myQueue) {
281       return !myQueue.isEmpty();
282     }
283   }
284
285   public static final class RunnableConsumer implements Consumer<Runnable> {
286     @Override
287     public void consume(Runnable runnable) {
288       runnable.run();
289     }
290   }
291 }