fileWatchers: limit executing thread count
[idea/community.git] / platform / platform-impl / src / com / intellij / util / MergingBackgroundExecutor.java
1 package com.intellij.util;
2
3 import com.intellij.openapi.application.ApplicationManager;
4 import com.intellij.openapi.diagnostic.Logger;
5 import org.jetbrains.annotations.NotNull;
6
7 import java.util.concurrent.BlockingQueue;
8 import java.util.concurrent.LinkedBlockingDeque;
9 import java.util.concurrent.atomic.AtomicInteger;
10
11 /**
12  * Executes tasks on pooled threads. At any point, at most {@code maxThreads} threads will be active processing tasks.
13  * If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
14  *
15  * Difference to {@link java.util.concurrent.Executors#newFixedThreadPool(int)} is that this utility class
16  * allows to reuse shared thread pool and thus getting rid of extra thread creation and thread pool management.
17  *
18  * @param <T> the type of elements
19  */
20 public class MergingBackgroundExecutor<T> {
21
22   private static final Logger LOG = Logger.getInstance(MergingBackgroundExecutor.class);
23
24   private final int myMaxThreads;
25   private final Consumer<T> myConsumer;
26   private final BlockingQueue<T> myQueue = new LinkedBlockingDeque<T>();
27   private final AtomicInteger myRunningThreads = new AtomicInteger(0);
28
29   public MergingBackgroundExecutor(int maxThreads, @NotNull Consumer<T> consumer) {
30     myMaxThreads = maxThreads;
31     myConsumer = consumer;
32   }
33
34   protected void executeOnPooledThread(@NotNull Runnable runnable) {
35     ApplicationManager.getApplication().executeOnPooledThread(runnable);
36   }
37
38   public void queue(@NotNull T t) {
39     if (!myQueue.offer(t)) {
40       LOG.error("Unable to enqueue an element, queue size: " + myQueue.size());
41       return;
42     }
43     if (incrementIfSmaller(myRunningThreads, myMaxThreads)) {
44       executeOnPooledThread(new Runnable() {
45         @Override
46         public void run() {
47           do {
48             try {
49               processQueue();
50             }
51             finally {
52               myRunningThreads.decrementAndGet();
53             }
54             // Defense from unlucky timing:
55             // An element could be enqueued between "processQueue()" and "myRunningThreads.decrementAndGet()".
56             // As a result, "executeOnPooledThread(Runnable)" won't be called.
57             // In this case the queue processing should be started over.
58           }
59           while (!myQueue.isEmpty() && incrementIfSmaller(myRunningThreads, myMaxThreads));
60         }
61       });
62     }
63   }
64
65   private static boolean incrementIfSmaller(@NotNull AtomicInteger i, int max) {
66     int value;
67     do {
68       value = i.get();
69       if (value >= max) {
70         return false;
71       }
72     }
73     while (!i.compareAndSet(value, value + 1));
74     return true;
75   }
76
77   private void processQueue() {
78     T t;
79     while ((t = myQueue.poll()) != null) {
80       myConsumer.consume(t);
81     }
82   }
83
84   @NotNull
85   public static MergingBackgroundExecutor<Runnable> newRunnableExecutor(int maxThreads) {
86     return new MergingBackgroundExecutor<Runnable>(maxThreads, new Consumer<Runnable>() {
87       @Override
88       public void consume(Runnable runnable) {
89         runnable.run();
90       }
91     });
92   }
93 }