jobscheduler should not pause during event dispatch
authorAlexey Kudravtsev <cdr@intellij.com>
Mon, 12 Apr 2010 12:30:07 +0000 (16:30 +0400)
committerAlexey Kudravtsev <cdr@intellij.com>
Tue, 11 May 2010 06:05:11 +0000 (10:05 +0400)
14 files changed:
platform/lang-impl/src/com/intellij/codeInsight/daemon/impl/GeneralHighlightingPass.java
platform/lang-impl/src/com/intellij/codeInsight/daemon/impl/LocalInspectionsPass.java
platform/lang-impl/src/com/intellij/codeInsight/daemon/impl/PassExecutorService.java
platform/lang-impl/src/com/intellij/codeInsight/highlighting/BraceHighlightingHandler.java
platform/lang-impl/src/com/intellij/ui/DeferredIconImpl.java
platform/platform-impl/src/com/intellij/concurrency/JobImpl.java
platform/platform-impl/src/com/intellij/concurrency/JobSchedulerImpl.java
platform/platform-impl/src/com/intellij/concurrency/JobUtil.java [moved from platform/lang-impl/src/com/intellij/concurrency/JobUtil.java with 57% similarity]
platform/platform-impl/src/com/intellij/concurrency/PrioritizedFutureTask.java
platform/platform-impl/src/com/intellij/ide/IdeEventQueue.java
platform/platform-impl/src/com/intellij/openapi/application/impl/ApplicationImpl.java
platform/testFramework/src/com/intellij/testFramework/PlatformTestUtil.java
plugins/properties/src/com/intellij/codeInspection/duplicatePropertyInspection/DuplicatePropertyInspection.java
plugins/properties/src/com/intellij/lang/properties/UnusedPropertyInspection.java

index 9c272a2bca6567535425511dc52cdcfc31c9c2d3..cfc4c797cbbb7dd3e7bd312dbc45d7f77840be8d 100644 (file)
@@ -160,11 +160,11 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
       }
       if (elements != null) {
         result.addAll(collectHighlights(elements, progress, filtered));
-        addInjectedPsiHighlights(elements, progress);
+        if (!addInjectedPsiHighlights(elements, progress)) throw new ProcessCanceledException();
       }
 
       if (!isDumbMode()) {
-        result.addAll(highlightTodos(myFile, myDocument.getCharsSequence(), myStartOffset, myEndOffset));
+        result.addAll(highlightTodos(myFile, myDocument.getCharsSequence(), myStartOffset, myEndOffset, progress));
       }
 
       if (myUpdateAll) {
@@ -177,7 +177,8 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
     myHighlights = result;
   }
 
-  private void addInjectedPsiHighlights(@NotNull final List<PsiElement> elements, final ProgressIndicator progress) {
+  // returns false if canceled
+  private boolean addInjectedPsiHighlights(@NotNull final List<PsiElement> elements, final ProgressIndicator progress) {
     List<DocumentWindow> injected = InjectedLanguageUtil.getCachedInjectedDocuments(myFile);
     Collection<PsiElement> hosts = new THashSet<PsiElement>(elements.size() + injected.size());
 
@@ -216,10 +217,10 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
         }
       }, false);
     }
-    if (injectedFiles.isEmpty()) return;
+    if (injectedFiles.isEmpty()) return true;
     final InjectedLanguageManager injectedLanguageManager = InjectedLanguageManager.getInstance(myProject);
 
-    JobUtil.invokeConcurrentlyUnderMyProgress(injectedFiles, new Processor<PsiFile>() {
+    return JobUtil.invokeConcurrentlyUnderMyProgress(new ArrayList<PsiFile>(injectedFiles), new Processor<PsiFile>() {
       public boolean process(final PsiFile injectedPsi) {
         DocumentWindow documentWindow = (DocumentWindow)PsiDocumentManager.getInstance(myProject).getCachedDocument(injectedPsi);
         HighlightInfoHolder holder = createInfoHolder(injectedPsi);
@@ -248,7 +249,7 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
         }
 
         if (!isDumbMode()) {
-          Collection<HighlightInfo> todos = highlightTodos(injectedPsi, injectedPsi.getText(), 0, injectedPsi.getTextLength());
+          Collection<HighlightInfo> todos = highlightTodos(injectedPsi, injectedPsi.getText(), 0, injectedPsi.getTextLength(), progress);
           for (HighlightInfo info : todos) {
             addPatchedInfos(info, injectedPsi, documentWindow, injectedLanguageManager, null);
           }
@@ -326,7 +327,7 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
     return textRange;
   }
 
-  private void runHighlightVisitosForInjected(final PsiFile injectedPsi, final HighlightInfoHolder holder, final ProgressIndicator progress) {
+  private void runHighlightVisitosForInjected(@NotNull PsiFile injectedPsi, @NotNull final HighlightInfoHolder holder, @NotNull final ProgressIndicator progress) {
     HighlightVisitor[] visitors = createHighlightVisitors();
     try {
       HighlightVisitor[] filtered = filterVisitors(visitors, injectedPsi);
@@ -541,7 +542,11 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
     return new HighlightInfoHolder(file, filters);
   }
 
-  private static Collection<HighlightInfo> highlightTodos(PsiFile file, CharSequence text, int startOffset, int endOffset) {
+  private static Collection<HighlightInfo> highlightTodos(@NotNull PsiFile file,
+                                                          @NotNull CharSequence text,
+                                                          int startOffset,
+                                                          int endOffset,
+                                                          @NotNull ProgressIndicator progress) {
     PsiManager psiManager = file.getManager();
     PsiSearchHelper helper = psiManager.getSearchHelper();
     TodoItem[] todoItems = helper.findTodoItems(file, startOffset, endOffset);
@@ -549,6 +554,7 @@ public class GeneralHighlightingPass extends ProgressableTextEditorHighlightingP
 
     List<HighlightInfo> list = new ArrayList<HighlightInfo>(todoItems.length);
     for (TodoItem todoItem : todoItems) {
+      progress.checkCanceled();
       TextRange range = todoItem.getTextRange();
       String description = text.subSequence(range.getStartOffset(), range.getEndOffset()).toString();
       TextAttributes attributes = todoItem.getPattern().getAttributes().getTextAttributes();
index 001b2b27383be6e5592dfd595667abb0514c1f5e..ea400149182741672b2cab8daa26007240668c71 100644 (file)
@@ -39,6 +39,7 @@ import com.intellij.openapi.editor.colors.TextAttributesKey;
 import com.intellij.openapi.keymap.Keymap;
 import com.intellij.openapi.keymap.KeymapManager;
 import com.intellij.openapi.keymap.KeymapUtil;
+import com.intellij.openapi.progress.ProcessCanceledException;
 import com.intellij.openapi.progress.ProgressIndicator;
 import com.intellij.openapi.progress.ProgressManager;
 import com.intellij.openapi.progress.util.ProgressWrapper;
@@ -192,7 +193,7 @@ public class LocalInspectionsPass extends ProgressableTextEditorHighlightingPass
     final ProgressIndicator indicator = ProgressManager.getInstance().getProgressIndicator();
     LOG.assertTrue(indicator != null);
 
-    JobUtil.invokeConcurrentlyUnderMyProgress(tools, new Processor<LocalInspectionTool>() {
+    boolean result = JobUtil.invokeConcurrentlyUnderMyProgress(tools, new Processor<LocalInspectionTool>() {
       public boolean process(final LocalInspectionTool tool) {
         final ProgressManager progressManager = ProgressManager.getInstance();
         indicator.checkCanceled();
@@ -223,6 +224,7 @@ public class LocalInspectionsPass extends ProgressableTextEditorHighlightingPass
         return true;
       }
     }, "Inspection tools");
+    if (!result) throw new ProcessCanceledException();
 
     indicator.checkCanceled();
     inspectInjectedPsi(elements, tools);
@@ -242,12 +244,12 @@ public class LocalInspectionsPass extends ProgressableTextEditorHighlightingPass
         }
       }, false);
     }
-    JobUtil.invokeConcurrentlyUnderMyProgress(injected, new Processor<PsiFile>() {
+    if (!JobUtil.invokeConcurrentlyUnderMyProgress(new ArrayList<PsiFile>(injected), new Processor<PsiFile>() {
       public boolean process(final PsiFile injectedPsi) {
         inspectInjectedPsi(injectedPsi, myInjectedPsiInspectionResults, tools);
         return true;
       }
-    }, "Inspect injected fragments");
+    }, "Inspect injected fragments")) throw new ProcessCanceledException();
   }
 
   public Collection<HighlightInfo> getHighlights() {
index 22938eab8868d5b1f17b56a63d53ab37ace83dfd..3544c091d703f3e278c935a37d5be2c13156baf3 100644 (file)
@@ -21,7 +21,7 @@ import com.intellij.codeHighlighting.Pass;
 import com.intellij.codeHighlighting.TextEditorHighlightingPass;
 import com.intellij.concurrency.Job;
 import com.intellij.concurrency.JobImpl;
-import com.intellij.concurrency.JobScheduler;
+import com.intellij.concurrency.JobUtil;
 import com.intellij.openapi.Disposable;
 import com.intellij.openapi.application.ApplicationManager;
 import com.intellij.openapi.application.ModalityState;
@@ -39,7 +39,6 @@ import com.intellij.openapi.project.DumbAware;
 import com.intellij.openapi.project.DumbAwareRunnable;
 import com.intellij.openapi.project.DumbService;
 import com.intellij.openapi.project.Project;
-import com.intellij.openapi.util.Disposer;
 import com.intellij.openapi.util.Pair;
 import com.intellij.openapi.util.text.StringUtil;
 import com.intellij.util.ConcurrencyUtil;
@@ -66,7 +65,6 @@ public abstract class PassExecutorService implements Disposable {
 
   public PassExecutorService(Project project) {
     myProject = project;
-    Disposer.register(project, this);
   }
 
   public void dispose() {
@@ -81,7 +79,8 @@ public abstract class PassExecutorService implements Disposable {
     if (waitForTermination) {
       for (Job<Void> job : mySubmittedPasses.values()) {
         try {
-          if (!job.isDone()) ((JobImpl)job).waitForTermination();
+          JobImpl ji = (JobImpl)job;
+          if (!job.isDone()) ji.waitForTermination(ji.getTasks());
         }
         catch (Throwable throwable) {
           LOG.error(throwable);
@@ -263,9 +262,7 @@ public abstract class PassExecutorService implements Disposable {
 
   private void submit(ScheduledPass pass) {
     if (!pass.myUpdateProgress.isCanceled()) {
-      Job<Void> job = JobScheduler.getInstance().createJob(pass.myPass.toString(), pass.myJobPriority);
-      job.addTask(pass);
-      job.schedule();
+      Job<Void> job = JobUtil.submitToJobThread(pass, pass.myJobPriority);
       mySubmittedPasses.put(pass, job);
     }
   }
@@ -440,7 +437,9 @@ public abstract class PassExecutorService implements Disposable {
   public List<TextEditorHighlightingPass> getAllSubmittedPasses() {
     ArrayList<TextEditorHighlightingPass> result = new ArrayList<TextEditorHighlightingPass>(mySubmittedPasses.size());
     for (ScheduledPass scheduledPass : mySubmittedPasses.keySet()) {
-      result.add(scheduledPass.myPass);
+      if (!scheduledPass.myUpdateProgress.isCanceled()) {
+        result.add(scheduledPass.myPass);
+      }
     }
     return result;
   }
index b53d9137d53936467f474754729dd7a241a2f8c7..307946e4e48a2af76a1a5c676b0ce9e57c562e4b 100644 (file)
@@ -27,7 +27,7 @@ package com.intellij.codeInsight.highlighting;
 import com.intellij.codeInsight.CodeInsightSettings;
 import com.intellij.codeInsight.hint.EditorFragmentComponent;
 import com.intellij.concurrency.Job;
-import com.intellij.concurrency.JobScheduler;
+import com.intellij.concurrency.JobUtil;
 import com.intellij.openapi.application.ApplicationManager;
 import com.intellij.openapi.application.ModalityState;
 import com.intellij.openapi.editor.Document;
@@ -93,8 +93,7 @@ public class BraceHighlightingHandler {
     final Project project = editor.getProject();
     if (project == null) return;
     final int offset = editor.getCaretModel().getOffset();
-    Job<Object> job = JobScheduler.getInstance().createJob("Brace highlighter", Job.DEFAULT_PRIORITY);
-    job.addTask(new Runnable() {
+    JobUtil.submitToJobThread(new Runnable() {
       public void run() {
         if (isReallyDisposed(editor, project)) return;
         final PsiFile injected = ApplicationManager.getApplication().runReadAction(new Computable<PsiFile>() {
@@ -114,8 +113,7 @@ public class BraceHighlightingHandler {
           }
         }, ModalityState.stateForComponent(editor.getComponent()));
       }
-    });
-    job.schedule();
+    }, Job.DEFAULT_PRIORITY);
   }
 
   private static boolean isReallyDisposed(Editor editor, Project project) {
index 53fa5b1932d6a6c7db9333187fccb1e0d49dc5ac..7dbc4a248c92e4d16b399c43a0bf93e7bc79915b 100644 (file)
@@ -20,7 +20,7 @@
 package com.intellij.ui;
 
 import com.intellij.concurrency.Job;
-import com.intellij.concurrency.JobScheduler;
+import com.intellij.concurrency.JobUtil;
 import com.intellij.openapi.progress.ProcessCanceledException;
 import com.intellij.openapi.project.IndexNotReadyException;
 import com.intellij.util.Alarm;
@@ -90,8 +90,7 @@ public class DeferredIconImpl<T> implements DeferredIcon {
 
       myLastTarget = new WeakReference<Component>(target);
 
-      final Job<Object> job = JobScheduler.getInstance().createJob("Evaluating deferred icon", Job.DEFAULT_PRIORITY);
-      job.addTask(new Runnable() {
+      JobUtil.submitToJobThread(new Runnable() {
         public void run() {
           int oldWidth = myDelegateIcon.getIconWidth();
           myDelegateIcon = evaluate();
@@ -121,9 +120,7 @@ public class DeferredIconImpl<T> implements DeferredIcon {
             }
           });
         }
-      });
-
-      job.schedule();
+      }, Job.DEFAULT_PRIORITY);
     }
   }
 
index b056309345d51b1a2311648887b29f2fe8aaa988..52b68ad3840273a8d84aadb6f86a833480fbffe3 100644 (file)
@@ -21,33 +21,37 @@ package com.intellij.concurrency;
 
 import com.intellij.openapi.application.Application;
 import com.intellij.openapi.application.ApplicationManager;
-import com.intellij.util.containers.ContainerUtil;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class JobImpl<T> implements Job<T> {
-  private final String myTitle;
-  private final List<Callable<T>> myTasks = ContainerUtil.createEmptyCOWList(); 
-  private final long myJobIndex = JobSchedulerImpl.currentJobIndex();
+  private static volatile long ourJobsCounter = 0;
+  private final long myJobIndex = ourJobsCounter++;
   private final int myPriority;
-  private final List<PrioritizedFutureTask<T>> myFutures = ContainerUtil.createEmptyCOWList();
-  private volatile boolean myCanceled = false;
+  private final List<PrioritizedFutureTask<T>> myFutures = new ArrayList<PrioritizedFutureTask<T>>();
+  private volatile boolean canceled = false;
+  private final AtomicInteger runningTasks = new AtomicInteger();
+  private volatile boolean scheduled;
 
-  public JobImpl(String title, int priority) {
-    myTitle = title;
+  JobImpl(int priority) {
     myPriority = priority;
   }
 
   public String getTitle() {
-    return myTitle;
+    return null;
   }
 
   public void addTask(Callable<T> task) {
-    checkNotStarted();
+    checkNotScheduled();
 
-    myTasks.add(task);
+    synchronized (myFutures) {
+      PrioritizedFutureTask<T> future = createFuture(task, this, myJobIndex, myPriority);
+      myFutures.add(future);
+    }
+    runningTasks.incrementAndGet();
   }
 
   public void addTask(Runnable task, T result) {
@@ -63,124 +67,147 @@ public class JobImpl<T> implements Job<T> {
     final Application application = ApplicationManager.getApplication();
     boolean callerHasReadAccess = application != null && application.isReadAccessAllowed();
 
-    createFutures(callerHasReadAccess, false);
-
     // Don't bother scheduling if we only have one processor or only one task
-    if (JobSchedulerImpl.CORES_COUNT >= 2 && myFutures.size() >= 2) {
-      for (PrioritizedFutureTask<T> future : myFutures) {
-        JobSchedulerImpl.execute(future);
-      }
+    boolean reallySchedule;
+    PrioritizedFutureTask[] tasks = getTasks();
+    synchronized (myFutures) {
+      reallySchedule = JobSchedulerImpl.CORES_COUNT >= 2 && myFutures.size() >= 2;
     }
+    scheduled = true;
 
-    // http://gafter.blogspot.com/2006/11/thread-pool-puzzler.html
-    for (PrioritizedFutureTask<T> future : myFutures) {
-      future.run();
+    if (!reallySchedule) {
+      for (PrioritizedFutureTask future : tasks) {
+        future.run();
+      }
+      return null;
     }
 
-    return waitForTermination();
-  }
+    submitTasks(tasks, callerHasReadAccess, false);
 
-  private void createFutures(boolean callerHasReadAccess, final boolean reportExceptions) {
-    int startTaskIndex = JobSchedulerImpl.currentTaskIndex();
-    for (final Callable<T> task : myTasks) {
-      final PrioritizedFutureTask<T> future = new PrioritizedFutureTask<T>(task, myJobIndex, startTaskIndex++, myPriority, callerHasReadAccess,
-                                                                           reportExceptions);
-      myFutures.add(future);
+    while (!isDone() && JobSchedulerImpl.stealAndRunTask()) {
+      int i = 0;
     }
-  }
 
-  public List<T> waitForTermination() throws Throwable {
-    List<T> results = new ArrayList<T>(myFutures.size());
+    // in case of imbalanced tasks one huge task can stuck running and we would fall to waitForTermination instead of doing useful work
+    //// http://gafter.blogspot.com/2006/11/thread-pool-puzzler.html
+    //for (PrioritizedFutureTask task : tasks) {
+    //  task.run();
+    //}
+    //
+
+    waitForTermination(tasks);
+    return null;
+  }
 
+  public void waitForTermination(PrioritizedFutureTask[] tasks) throws Throwable {
     Throwable ex = null;
-    for (Future<T> f : myFutures) {
-      try {
-        T result = null;
+    try {
+      for (PrioritizedFutureTask f : tasks) {
+        // this loop is for workaround of mysterious bug
+        // when sometimes future hangs inside parkAndCheckForInterrupt() during unbounded get()
         while(true) {
           try {
-            result = f.get(10, TimeUnit.MILLISECONDS);
+            f.get(10, TimeUnit.MILLISECONDS);
             break;
           }
           catch (TimeoutException e) {
-            if (f.isDone() || f.isCancelled()) break;
+            if (f.isDone()) {
+              f.get(); // does awaitTermination(), and there is no chance to hang
+              break;
+            }
           }
         }
-        results.add(result);
-      }
-      catch (CancellationException ignore) {
       }
-      catch (ExecutionException e) {
-        cancel();
+    }
+    catch (CancellationException ignore) {
+      // already cancelled
+    }
+    catch (ExecutionException e) {
+      cancel();
 
-        Throwable cause = e.getCause();
-        if (cause != null) {
-          ex = cause;
-        }
+      Throwable cause = e.getCause();
+      if (cause != null) {
+        ex = cause;
       }
     }
 
-    // Future.get() exits when currently running is canceled, thus awaiter may get control before spawned tasks actually terminated,
-    // that's why additional join logic.
-    for (PrioritizedFutureTask<T> future : myFutures) {
-      future.awaitTermination();
+    if (ex != null) {
+      throw ex;
     }
-
-    if (ex != null) throw ex;
-
-    return results;
   }
 
   public void cancel() {
     checkScheduled();
-    if (myCanceled) return;
-    myCanceled = true;
+    if (canceled) return;
+    canceled = true;
 
-    for (Future<T> future : myFutures) {
+    PrioritizedFutureTask[] tasks = getTasks();
+    for (PrioritizedFutureTask future : tasks) {
       future.cancel(false);
     }
+    runningTasks.set(0);
   }
 
   public boolean isCanceled() {
     checkScheduled();
-    return myCanceled;
+    return canceled;
   }
 
   public void schedule() {
     checkCanSchedule();
+    scheduled = true;
 
-    createFutures(false, true);
+    PrioritizedFutureTask[] tasks = getTasks();
+
+    submitTasks(tasks, false, true);
+  }
 
-    for (PrioritizedFutureTask<T> future : myFutures) {
-      JobSchedulerImpl.execute(future);
+  public PrioritizedFutureTask[] getTasks() {
+    PrioritizedFutureTask[] tasks;
+    synchronized (myFutures) {
+      tasks = myFutures.toArray(new PrioritizedFutureTask[myFutures.size()]);
     }
+    return tasks;
   }
 
   public boolean isDone() {
     checkScheduled();
 
-    for (Future<T> future : myFutures) {
-      if (!future.isDone()) return false;
-    }
-
-    return true;
+    return runningTasks.get() <= 0;
   }
 
   private void checkCanSchedule() {
-    checkNotStarted();
-    if (myTasks.isEmpty()) {
-      throw new IllegalStateException("No tasks to run. You can't schedule a job which has no tasks");
+    checkNotScheduled();
+    synchronized (myFutures) {
+      if (myFutures.isEmpty()) {
+        throw new IllegalStateException("No tasks added. You can't schedule a job which has no tasks");
+      }
     }
   }
 
-  private void checkNotStarted() {
-    if (!myFutures.isEmpty()) {
+  private void checkNotScheduled() {
+    if (scheduled) {
       throw new IllegalStateException("Already running. You can't call this method for a job which is already scheduled");
     }
   }
 
   private void checkScheduled() {
-    if (myFutures.isEmpty()) {
+    if (!scheduled) {
       throw new IllegalStateException("Cannot call this method for not yet started job");
     }
   }
+
+  private static void submitTasks(PrioritizedFutureTask[] tasks, boolean callerHasReadAccess, boolean reportExceptions) {
+    for (final PrioritizedFutureTask future : tasks) {
+      JobSchedulerImpl.submitTask(future, callerHasReadAccess, reportExceptions);
+    }
+  }
+
+  void taskDone() {
+    runningTasks.decrementAndGet();
+  }
+
+  private static <T> PrioritizedFutureTask<T> createFuture(Callable<T> task, JobImpl<T> job, long jobIndex, int priority) {
+    return new PrioritizedFutureTask<T>(task, job, jobIndex, JobSchedulerImpl.currentTaskIndex(), priority);
+  }
 }
index ad798b41baefabccc65ae53351cca546ec291414..149c50b9c41e59c3ffe6f87af9505e2673c75f6e 100644 (file)
 package com.intellij.concurrency;
 
 import com.intellij.openapi.Disposable;
-import com.intellij.openapi.application.impl.ApplicationImpl;
 import org.jetbrains.annotations.NonNls;
 
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 @NonNls
 public class JobSchedulerImpl extends JobScheduler implements Disposable {
-  public static final int CORES_COUNT = /*1;//*/Runtime.getRuntime().availableProcessors();
+  public static final int CORES_COUNT = /*1;//*/ Runtime.getRuntime().availableProcessors();
 
   private static final ThreadFactory WORKERS_FACTORY = new ThreadFactory() {
-    int i;
-    public Thread newThread(final Runnable r) {
-      final Thread thread = new Thread(r, "JobScheduler pool " + i++ + "/" + CORES_COUNT);
+    private int threadSeq;
+
+    public synchronized Thread newThread(final Runnable r) {
+      @NonNls String name = "JobScheduler pool " + threadSeq + "/" + CORES_COUNT;
+      final Thread thread = new Thread(r, name);
       thread.setPriority(Thread.NORM_PRIORITY);
+      threadSeq++;
       return thread;
     }
   };
 
-  private static final Lock ourSuspensionLock = new ReentrantLock();
-
-  private static final PriorityBlockingQueue<Runnable> ourQueue = new PriorityBlockingQueue<Runnable>() {
-    public Runnable poll() {
-      final Runnable result = super.poll();
-
-      ourSuspensionLock.lock();
-      try {
-        return result;
-      }
-      finally {
-        ourSuspensionLock.unlock();
-      }
-    }
-
-    public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
-      final Runnable result = super.poll(timeout, unit);
-
-      ourSuspensionLock.lock();
-      try {
-        return result;
-      }
-      finally {
-        ourSuspensionLock.unlock();
-      }
-    }
-  };
-  private static final ThreadPoolExecutor ourExecutor = new ThreadPoolExecutor(CORES_COUNT, Integer.MAX_VALUE, 60 * 10, TimeUnit.SECONDS,
-                                                                               ourQueue, WORKERS_FACTORY) {
-    protected void beforeExecute(final Thread t, final Runnable r) {
-      PrioritizedFutureTask task = (PrioritizedFutureTask)r;
-      if (task.isParentThreadHasReadAccess()) {
-        ApplicationImpl.setExceptionalThreadWithReadAccessFlag(true);
-      }
-      task.signalStarted();
-
-      // TODO: hook up JobMonitor into thread locals
-      super.beforeExecute(t, r);
-    }
-
-    protected void afterExecute(final Runnable r, final Throwable t) {
-      super.afterExecute(r, t);
-      ApplicationImpl.setExceptionalThreadWithReadAccessFlag(false);
-      PrioritizedFutureTask task = (PrioritizedFutureTask)r;
-      task.signalDone();
-      // TODO: cleanup JobMonitor
-    }
-  };
-
-  private static volatile long ourJobsCounter = 0;
+  private static final PriorityBlockingQueue<Runnable> ourQueue = new PriorityBlockingQueue<Runnable>();
+  private static final MyExecutor ourExecutor = new MyExecutor();
 
-  public static void execute(Runnable task) {
-    ourExecutor.execute(task);
+  static int currentTaskIndex() {
+    return ourQueue.size();
   }
 
-  public static int currentTaskIndex() {
-    final PrioritizedFutureTask topTask = (PrioritizedFutureTask)ourQueue.peek();
-    return topTask == null ? 0 : topTask.getTaskIndex();
+  public <T> Job<T> createJob(String title, int priority) {
+    return new JobImpl<T>(priority);
   }
 
-  public static long currentJobIndex() {
-    return ourJobsCounter++;
+  public void dispose() {
+    ((ThreadPoolExecutor)getScheduler()).getQueue().clear();
   }
 
-  public static void suspend() {
-    ourSuspensionLock.lock();
-  }
+  static boolean stealAndRunTask() {
+    Runnable task = ourQueue.poll();
+    if (task == null) return false;
 
-  public static void resume() {
-    ourSuspensionLock.unlock();
+    task.run();
+
+    return true;
   }
 
-  public <T> Job<T> createJob(String title, int priority) {
-    return new JobImpl<T>(title, priority);
+  static void submitTask(PrioritizedFutureTask future, boolean callerHasReadAccess, boolean reportExceptions) {
+    future.beforeRun(callerHasReadAccess, reportExceptions);
+    ourExecutor.executeTask(future);
   }
 
-  public void dispose() {
-    ((ThreadPoolExecutor)getScheduler()).getQueue().clear();
+  private static class MyExecutor extends ThreadPoolExecutor {
+    private MyExecutor() {
+      super(CORES_COUNT, Integer.MAX_VALUE, 60 * 10, TimeUnit.SECONDS, ourQueue, WORKERS_FACTORY);
+    }
+
+    private void executeTask(final PrioritizedFutureTask task) {
+      super.execute(task);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      throw new IllegalStateException("Use executeTask() to submit PrioritizedFutureTasks only");
+    }
   }
 }
similarity index 57%
rename from platform/lang-impl/src/com/intellij/concurrency/JobUtil.java
rename to platform/platform-impl/src/com/intellij/concurrency/JobUtil.java
index 021c9ffe18e986990eadf0210363537d773b20b9..93ff2a50193173c212444a745ee9dfec1cb42460 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2000-2009 JetBrains s.r.o.
+ * Copyright 2000-2010 JetBrains s.r.o.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,12 +20,14 @@ import com.intellij.openapi.diagnostic.Logger;
 import com.intellij.openapi.progress.ProcessCanceledException;
 import com.intellij.openapi.progress.ProgressIndicator;
 import com.intellij.openapi.progress.ProgressManager;
+import com.intellij.openapi.progress.impl.ProgressManagerImpl;
 import com.intellij.openapi.progress.util.ProgressWrapper;
 import com.intellij.util.Processor;
 import org.jetbrains.annotations.NonNls;
 import org.jetbrains.annotations.NotNull;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
 
 /**
  * @author cdr
@@ -34,30 +36,30 @@ import java.util.Collection;
 public class JobUtil {
   private static final Logger LOG = Logger.getInstance("#com.intellij.concurrency.JobUtil");
 
-  /**
-   * @param things to process concurrently
-   * @param thingProcessor to be invoked concurrently on each element from the collection
-   * @param jobName the name of the job that invokes all the tasks
-   * @return false if tasks have been canceled
-   * @throws ProcessCanceledException if at least one task has thrown ProcessCanceledException
-   */
-  public static <T> boolean invokeConcurrentlyForAll(@NotNull Collection<T> things, @NotNull final Processor<T> thingProcessor, @NotNull @NonNls String jobName) throws ProcessCanceledException {
+  private static <T> boolean invokeConcurrentlyForAll(@NotNull final List<T> things, @NotNull final Processor<T> thingProcessor, @NotNull @NonNls String jobName) throws ProcessCanceledException {
     if (things.isEmpty()) {
       return true;
     }
     if (things.size() == 1) {
-      T t = things.iterator().next();
+      T t = things.get(0);
       return thingProcessor.process(t);
     }
 
-    final Job<String> job = JobScheduler.getInstance().createJob(jobName, Job.DEFAULT_PRIORITY);
+    final Job<String> job = new JobImpl<String>(Job.DEFAULT_PRIORITY);
 
-    for (final T thing : things) {
-      job.addTask(new Runnable(){
+    final int chunkSize = Math.max(1, things.size() / JobSchedulerImpl.CORES_COUNT / 100);
+    for (int i = 0; i < things.size(); i += chunkSize) {
+      // this job chunk is i..i+chunkSize-1
+      final int finalI = i;
+      job.addTask(new Runnable() {
         public void run() {
           try {
-            if (!thingProcessor.process(thing)) {
-              job.cancel();
+            for (int k = finalI; k < finalI + chunkSize && k < things.size(); k++) {
+              T thing = things.get(k);
+              if (!thingProcessor.process(thing)) {
+                job.cancel();
+                break;
+              }
             }
           }
           catch (ProcessCanceledException e) {
@@ -81,22 +83,51 @@ public class JobUtil {
     return !job.isCanceled();
   }
 
-  // execute in multiple threads, with checkCanceled in each delegated to our current progress
-  public static <T> boolean invokeConcurrentlyUnderMyProgress(@NotNull Collection<T> things,
+  /**
+   * Schedules concurrent execution of #thingProcessor over each element of #things and waits for completion
+   * With checkCanceled in each thread delegated to our current progress
+   * @param things to process concurrently
+   * @param thingProcessor to be invoked concurrently on each element from the collection
+   * @param jobName the name of the job that invokes all the tasks
+   * @return false if tasks have been canceled
+   *         or at least one processor returned false
+   *         or threw exception
+   *         or we were unable to start read action in at least one thread
+   * @throws ProcessCanceledException if at least one task has thrown ProcessCanceledException
+   */
+  public static <T> boolean invokeConcurrentlyUnderMyProgress(@NotNull List<T> things,
                                                               @NotNull final Processor<T> thingProcessor,
                                                               @NotNull @NonNls String jobName) throws ProcessCanceledException {
     final ProgressIndicator indicator = ProgressManager.getInstance().getProgressIndicator();
+    final ProgressWrapper wrapper = ProgressWrapper.wrap(indicator);
     return invokeConcurrentlyForAll(things, new Processor<T>() {
       public boolean process(final T t) {
         final boolean[] result = new boolean[1];
-        ProgressManager.getInstance().runProcess(new Runnable() {
+        ((ProgressManagerImpl)ProgressManager.getInstance()).executeProcessUnderProgress(new Runnable() {
           public void run() {
             result[0] = thingProcessor.process(t);
           }
-        }, ProgressWrapper.wrap(indicator));
+        }, wrapper);
         return result[0];
       }
     }, jobName);
   }
 
+  public static Job<Void> submitToJobThread(@NotNull final Runnable action, int priority) {
+    Job<Void> job = new JobImpl<Void>(priority);
+    Callable<Void> callable = new Callable<Void>() {
+      public Void call() throws Exception {
+        try {
+          action.run();
+        }
+        catch (ProcessCanceledException ignored) {
+          // since it's the only task in the job, nothing to cancel
+        }
+        return null;
+      }
+    };
+    job.addTask(callable);
+    job.schedule();
+    return job;
+  }
 }
index ad7855f2002e088640d590ca443969d61b1a43d1..c19fa37cbb7a92ad9b02a96d7484b3fc114bbb88 100644 (file)
  */
 package com.intellij.concurrency;
 
-import com.intellij.openapi.application.RuntimeInterruptedException;
+import com.intellij.openapi.application.ex.ApplicationManagerEx;
+import com.intellij.openapi.application.impl.ApplicationImpl;
 import com.intellij.openapi.diagnostic.Logger;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 
-public class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
+class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
   private static final Logger LOG = Logger.getInstance("#com.intellij.concurrency.PrioritizedFutureTask");
+  private final JobImpl<T> myJob;
   private final long myJobIndex;
   private final int myTaskIndex;
   private final int myPriority;
-  private final boolean myParentThreadHasReadAccess;
-  private final boolean myReportExceptions;
-  private final Lock myLock;
-  private volatile Condition myDoneCondition;
+  private volatile boolean myParentThreadHasReadAccess;
+  private volatile boolean myReportExceptions;
 
-  public PrioritizedFutureTask(final Callable<T> callable, long jobIndex, int taskIndex, int priority, final boolean parentThreadHasReadAccess, boolean reportExceptions) {
+  PrioritizedFutureTask(final Callable<T> callable, JobImpl<T> job, long jobIndex, int taskIndex, int priority) {
     super(callable);
+    myJob = job;
     myJobIndex = jobIndex;
     myTaskIndex = taskIndex;
     myPriority = priority;
-    myParentThreadHasReadAccess = parentThreadHasReadAccess;
-    myReportExceptions = reportExceptions;
-
-    myLock = new ReentrantLock();
-  }
-
-  public boolean isParentThreadHasReadAccess() {
-    return myParentThreadHasReadAccess;
-  }
-
-  public int compareTo(final PrioritizedFutureTask o) {
-    if (getPriority() != o.getPriority()) return getPriority() - o.getPriority();
-    if (getTaskIndex() != o.getTaskIndex()) return getTaskIndex() - o.getTaskIndex();
-    if (getJobIndex() != o.getJobIndex()) return getJobIndex() < o.getJobIndex() ? -1 : 1;
-    return 0;
-  }
-
-  public long getJobIndex() {
-    return myJobIndex;
-  }
-
-  public int getTaskIndex() {
-    return myTaskIndex;
-  }
-
-  public int getPriority() {
-    return myPriority;
   }
 
-  public void signalStarted() {
-    myLock.lock();
-    try {
-      myDoneCondition = myLock.newCondition();
-    }
-    finally {
-      myLock.unlock();
-    }
-  }
-
-  public void signalDone() {
-    myLock.lock();
-    try {
-      myDoneCondition.signalAll();
-      myDoneCondition = null;
-    }
-    finally {
-      myLock.unlock();
-    }
-  }
-
-  public void awaitTermination() {
-    myLock.lock();
-    try {
-      if (myDoneCondition == null) return;
-      myDoneCondition.await();
-    }
-    catch (InterruptedException e) {
-      throw new RuntimeInterruptedException(e);
-    }
-    finally {
-      myLock.unlock();
-    }
+  public void beforeRun(boolean parentThreadHasReadAccess, boolean reportExceptions) {
+    myParentThreadHasReadAccess = parentThreadHasReadAccess;
+    myReportExceptions = reportExceptions;
   }
 
   @Override
-  protected void done() {
-    if (myReportExceptions) {
-      // let exceptions during execution manifest themselves
-      try {
-        get();
-      }
-      catch (CancellationException e) {
-        //ignore
+  public void run() {
+    Runnable runnable = new Runnable() {
+      public void run() {
+        try {
+          if (myJob.isCanceled()) {
+            //set(null);
+            cancel(false); //todo cancel or set?
+          }
+          else {
+            PrioritizedFutureTask.super.run();
+          }
+        }
+        finally {
+          try {
+            if (myReportExceptions) {
+              // let exceptions during execution manifest themselves
+              PrioritizedFutureTask.super.get();
+            }
+          }
+          catch (CancellationException ignored) {
+          }
+          catch (InterruptedException e) {
+            LOG.error(e);
+          }
+          catch (ExecutionException e) {
+            LOG.error(e);
+          }
+          finally {
+            myJob.taskDone();
+            //myDoneCondition.up();
+          }
+        }
       }
-      catch (InterruptedException e) {
-        LOG.error(e);
+    };
+    if (myParentThreadHasReadAccess) {
+      if (ApplicationManagerEx.getApplicationEx().isUnitTestMode()) {
+        // all tests unfortunately are run from within write action, so they cannot really run read action in any thread
+        ApplicationImpl.setExceptionalThreadWithReadAccessFlag(true);
       }
-      catch (ExecutionException e) {
-        LOG.error(e);
+      // have to start "real" read action so that we cannot start write action until we are finished here
+      if (!ApplicationManagerEx.getApplicationEx().tryRunReadAction(runnable)) {
+        myJob.cancel();
       }
     }
+    else {
+      runnable.run();
+    }
+  }
+
+  public int compareTo(final PrioritizedFutureTask o) {
+    int priorityDelta = myPriority - o.myPriority;
+    if (priorityDelta != 0) return priorityDelta;
+    if (myJobIndex != o.myJobIndex) return myJobIndex < o.myJobIndex ? -1 : 1;
+    return myTaskIndex - o.myTaskIndex;
   }
 }
\ No newline at end of file
index 6b45ee24986f9289c0f382c9912a030092152381..04a8467201118993c27269ce4bbb63320e4633b8 100644 (file)
@@ -17,7 +17,6 @@ package com.intellij.ide;
 
 
 import com.intellij.Patches;
-import com.intellij.concurrency.JobSchedulerImpl;
 import com.intellij.ide.dnd.DnDManager;
 import com.intellij.ide.dnd.DnDManagerImpl;
 import com.intellij.openapi.Disposable;
@@ -363,14 +362,12 @@ public class IdeEventQueue extends EventQueue {
     AWTEvent oldEvent = myCurrentEvent;
     myCurrentEvent = e;
 
-    JobSchedulerImpl.suspend();
     try {
       _dispatchEvent(e);
     }
     finally {
       myIsInInputEvent = wasInputEvent;
       myCurrentEvent = oldEvent;
-      JobSchedulerImpl.resume();
 
       for (EventDispatcher each : myPostprocessors) {
         each.dispatch(e);
@@ -566,24 +563,24 @@ public class IdeEventQueue extends EventQueue {
                                    || queue.peekEvent(MouseEvent.MOUSE_CLICKED) != null;
 
         if (!mouseEventsAhead) {
-          Window showingWindow = mgr.getActiveWindow();
-          if (showingWindow != null) {
-            final IdeFocusManager fm = IdeFocusManager.findInstanceByComponent(showingWindow);
-            fm.doWhenFocusSettlesDown(new Runnable() {
-              public void run() {
-                if (mgr.getFocusOwner() == null) {
-                  final Application app = ApplicationManager.getApplication();
-                  if (app != null && app.isActive()) {
-                    fm.requestDefaultFocus(false);
-                  }
+        Window showingWindow = mgr.getActiveWindow();
+        if (showingWindow != null) {
+          final IdeFocusManager fm = IdeFocusManager.findInstanceByComponent(showingWindow);
+          fm.doWhenFocusSettlesDown(new Runnable() {
+            public void run() {
+              if (mgr.getFocusOwner() == null) {
+                final Application app = ApplicationManager.getApplication();
+                if (app != null && app.isActive()) {
+                  fm.requestDefaultFocus(false);
                 }
               }
-            });
-          }
+            }
+          });
         }
       }
     }
   }
+  }
 
   private void enterSuspendModeIfNeeded(AWTEvent e) {
     if (e instanceof KeyEvent) {
index 470523754f8802c75e209f552c959da9ba090af8..2c616ab1dbb23411ddcdbdd547de7724d2b44b00 100644 (file)
@@ -1048,8 +1048,9 @@ public class ApplicationImpl extends ComponentManagerImpl implements Application
   }
 
   public void saveSettings() {
-    if (myDoNotSave || isUnitTestMode() || isHeadlessEnvironment()) return;
-    _saveSettings();
+    if (!myDoNotSave && !isUnitTestMode() && !isHeadlessEnvironment()) {
+      _saveSettings();
+    }
   }
 
   public void saveAll() {
index 22a2f1e98d804b3710fef27199a2bde86e29a1c2..194268de8996de7928ba0a598de71c2589c16af7 100644 (file)
@@ -15,7 +15,7 @@
  */
 package com.intellij.testFramework;
 
-import com.intellij.ide.DataManager;
+import com.intellij.ide.*;
 import com.intellij.ide.util.treeView.AbstractTreeNode;
 import com.intellij.ide.util.treeView.AbstractTreeStructure;
 import com.intellij.idea.Bombed;
@@ -43,11 +43,15 @@ import junit.framework.AssertionFailedError;
 import org.jetbrains.annotations.NonNls;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 import javax.swing.*;
 import javax.swing.tree.DefaultMutableTreeNode;
 import javax.swing.tree.TreePath;
 import java.util.*;
+import java.awt.*;
+import java.awt.event.InvocationEvent;
+
 
 /**
  * @author yole
@@ -183,6 +187,20 @@ public class PlatformTestUtil {
     }
   }
 
+  @TestOnly
+  public static void dispatchAllInvocationEventsInIdeEventQueue() throws InterruptedException {
+    assert SwingUtilities.isEventDispatchThread() : Thread.currentThread();
+    final EventQueue eventQueue = Toolkit.getDefaultToolkit().getSystemEventQueue();
+    while (true) {
+      AWTEvent event = eventQueue.peekEvent();
+      if (event == null) break;
+        AWTEvent event1 = eventQueue.getNextEvent();
+        if (event1 instanceof InvocationEvent) {
+          IdeEventQueue.getInstance().dispatchEvent(event1);
+        }
+    }
+  }
+
   private static Date raidDate(Bombed bombed) {
     final Calendar instance = Calendar.getInstance();
     instance.set(Calendar.YEAR, bombed.year());
index 20ce5e9ce1702095e032e2f6c231540d80dedbab..e588c5261c1a73491ff8591fbe67c5f2161ebc43 100644 (file)
@@ -173,7 +173,7 @@ public class DuplicatePropertyInspection extends DescriptorProviderInspection {
     final ProgressIndicator progress = ProgressWrapper.wrap(original);
     ProgressManager.getInstance().runProcess(new Runnable() {
       public void run() {
-        JobUtil.invokeConcurrentlyUnderMyProgress(properties, new Processor<Property>() {
+        if (!JobUtil.invokeConcurrentlyUnderMyProgress(properties, new Processor<Property>() {
           public boolean process(final Property property) {
             if (original != null) {
               if (original.isCanceled()) return false;
@@ -183,7 +183,7 @@ public class DuplicatePropertyInspection extends DescriptorProviderInspection {
             processTextUsages(processedKeyToFiles, property.getUnescapedKey(), processedValueToFiles, searchHelper, scope);
             return true;
           }
-        }, "Searching properties usages");
+        }, "Searching properties usages")) throw new ProcessCanceledException();
 
         List<ProblemDescriptor> problemDescriptors = new ArrayList<ProblemDescriptor>();
         Map<String, Set<String>> keyToDifferentValues = new HashMap<String, Set<String>>();
index 1258a741049b59f7d25bff5879c6ca1b2e304bba..5d7e54f57e1a330dfc8f278864aa9a85338c7b6f 100644 (file)
@@ -24,6 +24,7 @@ import com.intellij.lang.properties.psi.PropertiesFile;
 import com.intellij.lang.properties.psi.Property;
 import com.intellij.openapi.module.Module;
 import com.intellij.openapi.module.ModuleUtil;
+import com.intellij.openapi.progress.ProcessCanceledException;
 import com.intellij.openapi.progress.ProgressIndicator;
 import com.intellij.openapi.progress.ProgressManager;
 import com.intellij.psi.PsiElement;
@@ -61,7 +62,7 @@ public class UnusedPropertyInspection extends PropertySuppressableInspectionBase
 
     final GlobalSearchScope searchScope = GlobalSearchScope.moduleWithDependentsScope(module);
     final ProgressIndicator original = ProgressManager.getInstance().getProgressIndicator();
-    JobUtil.invokeConcurrentlyUnderMyProgress(properties, new Processor<Property>() {
+    if (!JobUtil.invokeConcurrentlyUnderMyProgress(properties, new Processor<Property>() {
       public boolean process(final Property property) {
         if (original != null) {
           if (original.isCanceled()) return false;
@@ -93,7 +94,7 @@ public class UnusedPropertyInspection extends PropertySuppressableInspectionBase
 
         return true;
       }
-    }, "Searching properties usages");
+    }, "Searching properties usages")) throw new ProcessCanceledException();
 
     synchronized (descriptors) {
       return descriptors.toArray(new ProblemDescriptor[descriptors.size()]);