fileWatchers: limit executing thread count
authorSergey Simonchik <sergey.simonchik@jetbrains.com>
Sat, 14 Mar 2015 21:32:20 +0000 (00:32 +0300)
committerSergey Simonchik <sergey.simonchik@jetbrains.com>
Sat, 14 Mar 2015 21:32:20 +0000 (00:32 +0300)
platform/platform-impl/src/com/intellij/util/MergingBackgroundExecutor.java [new file with mode: 0644]

diff --git a/platform/platform-impl/src/com/intellij/util/MergingBackgroundExecutor.java b/platform/platform-impl/src/com/intellij/util/MergingBackgroundExecutor.java
new file mode 100644 (file)
index 0000000..d8265f5
--- /dev/null
@@ -0,0 +1,93 @@
+package com.intellij.util;
+
+import com.intellij.openapi.application.ApplicationManager;
+import com.intellij.openapi.diagnostic.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Executes tasks on pooled threads. At any point, at most {@code maxThreads} threads will be active processing tasks.
+ * If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
+ *
+ * Difference to {@link java.util.concurrent.Executors#newFixedThreadPool(int)} is that this utility class
+ * allows to reuse shared thread pool and thus getting rid of extra thread creation and thread pool management.
+ *
+ * @param <T> the type of elements
+ */
+public class MergingBackgroundExecutor<T> {
+
+  private static final Logger LOG = Logger.getInstance(MergingBackgroundExecutor.class);
+
+  private final int myMaxThreads;
+  private final Consumer<T> myConsumer;
+  private final BlockingQueue<T> myQueue = new LinkedBlockingDeque<T>();
+  private final AtomicInteger myRunningThreads = new AtomicInteger(0);
+
+  public MergingBackgroundExecutor(int maxThreads, @NotNull Consumer<T> consumer) {
+    myMaxThreads = maxThreads;
+    myConsumer = consumer;
+  }
+
+  protected void executeOnPooledThread(@NotNull Runnable runnable) {
+    ApplicationManager.getApplication().executeOnPooledThread(runnable);
+  }
+
+  public void queue(@NotNull T t) {
+    if (!myQueue.offer(t)) {
+      LOG.error("Unable to enqueue an element, queue size: " + myQueue.size());
+      return;
+    }
+    if (incrementIfSmaller(myRunningThreads, myMaxThreads)) {
+      executeOnPooledThread(new Runnable() {
+        @Override
+        public void run() {
+          do {
+            try {
+              processQueue();
+            }
+            finally {
+              myRunningThreads.decrementAndGet();
+            }
+            // Defense from unlucky timing:
+            // An element could be enqueued between "processQueue()" and "myRunningThreads.decrementAndGet()".
+            // As a result, "executeOnPooledThread(Runnable)" won't be called.
+            // In this case the queue processing should be started over.
+          }
+          while (!myQueue.isEmpty() && incrementIfSmaller(myRunningThreads, myMaxThreads));
+        }
+      });
+    }
+  }
+
+  private static boolean incrementIfSmaller(@NotNull AtomicInteger i, int max) {
+    int value;
+    do {
+      value = i.get();
+      if (value >= max) {
+        return false;
+      }
+    }
+    while (!i.compareAndSet(value, value + 1));
+    return true;
+  }
+
+  private void processQueue() {
+    T t;
+    while ((t = myQueue.poll()) != null) {
+      myConsumer.consume(t);
+    }
+  }
+
+  @NotNull
+  public static MergingBackgroundExecutor<Runnable> newRunnableExecutor(int maxThreads) {
+    return new MergingBackgroundExecutor<Runnable>(maxThreads, new Consumer<Runnable>() {
+      @Override
+      public void consume(Runnable runnable) {
+        runnable.run();
+      }
+    });
+  }
+}