refactor: add new task executor, remove task barrier (#1879)

This commit is contained in:
Skylot
2024-01-23 20:45:19 +00:00
parent e73612b4d2
commit 75d2e540aa
15 changed files with 398 additions and 192 deletions
@@ -266,10 +266,6 @@ public class JadxWrapper {
return getDecompiler().getEnclosingNode(codeInfo, pos);
}
public List<Runnable> getSaveTasks() {
return getDecompiler().getSaveTasks();
}
public List<JavaPackage> getPackages() {
return getDecompiler().getPackages();
}
@@ -5,6 +5,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@@ -20,6 +21,7 @@ import javax.swing.SwingWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jadx.api.utils.tasks.ITaskExecutor;
import jadx.gui.settings.JadxSettings;
import jadx.gui.ui.MainWindow;
import jadx.gui.ui.panel.ProgressPanel;
@@ -104,7 +106,7 @@ public class BackgroundExecutor {
private final class TaskWorker extends SwingWorker<TaskStatus, Void> implements ITaskInfo {
private final long id;
private final IBackgroundTask task;
private ThreadPoolExecutor executor;
private ITaskExecutor taskExecutor;
private TaskStatus status = TaskStatus.WAIT;
private long jobsCount;
private long jobsComplete;
@@ -148,47 +150,43 @@ public class BackgroundExecutor {
}
private void runJobs() throws InterruptedException {
List<? extends Runnable> jobs = task.scheduleJobs();
jobsCount = jobs.size();
taskExecutor = task.scheduleTasks();
jobsCount = taskExecutor.getTasksCount();
LOG.debug("Starting background task '{}', jobs count: {}, time limit: {} ms, memory check: {}",
task.getTitle(), jobsCount, task.timeLimit(), task.checkMemoryUsage());
if (jobsCount != 1) {
progressPane.changeVisibility(this, true);
}
status = TaskStatus.STARTED;
int threadsCount = settings.getThreadsCount();
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsCount);
for (Runnable job : jobs) {
executor.execute(job);
}
executor.shutdown();
taskExecutor.setThreadsCount(settings.getThreadsCount());
taskExecutor.execute();
long startTime = System.currentTimeMillis();
status = waitTermination(executor, buildCancelCheck(startTime));
status = waitTermination(buildCancelCheck(startTime));
time = System.currentTimeMillis() - startTime;
jobsComplete = executor.getCompletedTaskCount();
jobsComplete = taskExecutor.getProgress();
}
@SuppressWarnings("BusyWait")
private TaskStatus waitTermination(ThreadPoolExecutor executor, Supplier<TaskStatus> cancelCheck) throws InterruptedException {
private TaskStatus waitTermination(Supplier<TaskStatus> cancelCheck) throws InterruptedException {
try {
int k = 0;
while (true) {
if (executor.isTerminated()) {
if (!taskExecutor.isRunning()) {
return TaskStatus.COMPLETE;
}
TaskStatus cancelStatus = cancelCheck.get();
if (cancelStatus != null) {
performCancel(executor);
performCancel();
return cancelStatus;
}
if (k < 10) {
// faster update for short tasks
Thread.sleep(200);
if (k == 5) {
updateProgress(executor);
updateProgress();
}
} else {
updateProgress(executor);
updateProgress();
Thread.sleep(1000);
}
if (jobsCount == 1 && k == 5) {
@@ -199,22 +197,22 @@ public class BackgroundExecutor {
}
} catch (InterruptedException e) {
LOG.debug("Task wait interrupted");
performCancel(executor);
performCancel();
return TaskStatus.CANCEL_BY_USER;
} catch (Exception e) {
LOG.error("Task wait aborted by exception", e);
performCancel(executor);
performCancel();
return TaskStatus.ERROR;
}
}
private void updateProgress(ThreadPoolExecutor executor) {
private void updateProgress() {
Consumer<ITaskProgress> onProgressListener = task.getProgressListener();
ITaskProgress taskProgress = task.getTaskProgress();
if (taskProgress == null) {
setProgress(calcProgress(executor.getCompletedTaskCount(), jobsCount));
setProgress(calcProgress(taskExecutor.getProgress(), jobsCount));
if (onProgressListener != null) {
onProgressListener.accept(new TaskProgress(executor.getCompletedTaskCount(), jobsCount));
onProgressListener.accept(new TaskProgress(taskExecutor.getProgress(), jobsCount));
}
} else {
setProgress(calcProgress(taskProgress));
@@ -224,12 +222,16 @@ public class BackgroundExecutor {
}
}
private void performCancel(ThreadPoolExecutor executor) throws InterruptedException {
private void performCancel() throws InterruptedException {
progressPane.changeLabel(this, task.getTitle() + " (" + NLS.str("progress.canceling") + ")… ");
progressPane.changeIndeterminate(this, true);
// force termination
ExecutorService executor = taskExecutor.getInternalExecutor();
if (executor == null) {
return;
}
taskExecutor.terminate();
task.cancel();
executor.shutdown();
int cancelTimeout = task.getCancelTimeoutMS();
if (cancelTimeout != 0) {
if (executor.awaitTermination(cancelTimeout, TimeUnit.MILLISECONDS)) {
@@ -240,8 +242,9 @@ public class BackgroundExecutor {
LOG.debug("Forcing tasks cancel");
executor.shutdownNow();
boolean complete = executor.awaitTermination(task.getShutdownTimeoutMS(), TimeUnit.MILLISECONDS);
LOG.debug("Forced task cancel status: {}",
complete ? "success" : "fail, still active: " + executor.getActiveCount());
LOG.debug("Forced task cancel status: {}", complete
? "success"
: "fail, still active: " + (taskExecutor.getTasksCount() - taskExecutor.getProgress()));
}
private Supplier<TaskStatus> buildCancelCheck(long startTime) {
@@ -261,13 +264,13 @@ public class BackgroundExecutor {
}
if (checkMemoryUsage && !UiUtils.isFreeMemoryAvailable()) {
LOG.info("Memory usage: {}", UiUtils.memoryInfo());
if (executor.getCorePoolSize() == 1) {
if (taskExecutor.getThreadsCount() == 1) {
LOG.error("Task '{}' memory limit reached, force cancel", task.getTitle());
return TaskStatus.CANCEL_BY_MEMORY;
}
LOG.warn("Low memory, reduce processing threads count to 1");
// reduce thread count and continue
executor.setCorePoolSize(1);
// reduce threads count and continue
taskExecutor.setThreadsCount(1);
System.gc();
UiUtils.sleep(1000); // wait GC
if (!UiUtils.isFreeMemoryAvailable()) {
@@ -12,6 +12,8 @@ import org.slf4j.LoggerFactory;
import jadx.api.ICodeCache;
import jadx.api.JavaClass;
import jadx.api.utils.tasks.ITaskExecutor;
import jadx.core.utils.tasks.TaskExecutor;
import jadx.gui.JadxWrapper;
import jadx.gui.ui.MainWindow;
import jadx.gui.utils.NLS;
@@ -44,6 +46,12 @@ public class DecompileTask extends CancelableBackgroundTask {
}
@Override
public ITaskExecutor scheduleTasks() {
TaskExecutor executor = new TaskExecutor();
executor.addParallelTasks(scheduleJobs());
return executor;
}
public List<Runnable> scheduleJobs() {
if (mainWindow.getCacheObject().isFullDecompilationFinished()) {
return Collections.emptyList();
@@ -60,6 +68,10 @@ public class DecompileTask extends CancelableBackgroundTask {
LOG.error("Decompile batches build error", e);
return Collections.emptyList();
}
return getJobs(batches);
}
private List<Runnable> getJobs(List<List<JavaClass>> batches) {
ICodeCache codeCache = wrapper.getArgs().getCodeCache();
List<Runnable> jobs = new ArrayList<>(batches.size());
for (List<JavaClass> batch : batches) {
@@ -1,11 +1,11 @@
package jadx.gui.jobs;
import java.io.File;
import java.util.List;
import javax.swing.JOptionPane;
import jadx.api.ICodeCache;
import jadx.api.utils.tasks.ITaskExecutor;
import jadx.gui.JadxWrapper;
import jadx.gui.cache.code.FixedCodeCache;
import jadx.gui.ui.MainWindow;
@@ -32,11 +32,11 @@ public class ExportTask extends CancelableBackgroundTask {
}
@Override
public List<Runnable> scheduleJobs() {
public ITaskExecutor scheduleTasks() {
wrapCodeCache();
wrapper.getArgs().setRootDir(saveDir);
List<Runnable> saveTasks = wrapper.getSaveTasks();
this.timeLimit = DecompileTask.calcDecompileTimeLimit(saveTasks.size());
ITaskExecutor saveTasks = wrapper.getDecompiler().getSaveTaskExecutor();
this.timeLimit = DecompileTask.calcDecompileTimeLimit(saveTasks.getTasksCount());
return saveTasks;
}
@@ -1,18 +1,16 @@
package jadx.gui.jobs;
import java.util.List;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
import jadx.api.utils.tasks.ITaskExecutor;
public interface IBackgroundTask extends Cancelable {
String getTitle();
/**
* Jobs to run in parallel
*/
List<? extends Runnable> scheduleJobs();
ITaskExecutor scheduleTasks();
/**
* Called on executor thread after the all jobs finished.
@@ -5,6 +5,9 @@ import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
import jadx.api.utils.tasks.ITaskExecutor;
import jadx.core.utils.tasks.TaskExecutor;
/**
* Simple not cancelable task with memory check
*/
@@ -29,8 +32,10 @@ public class SimpleTask implements IBackgroundTask {
}
@Override
public List<Runnable> scheduleJobs() {
return jobs;
public ITaskExecutor scheduleTasks() {
TaskExecutor executor = new TaskExecutor();
executor.addParallelTasks(jobs);
return executor;
}
@Override
@@ -14,6 +14,8 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import jadx.api.utils.tasks.ITaskExecutor;
import jadx.core.utils.tasks.TaskExecutor;
import jadx.gui.jobs.BackgroundExecutor;
import jadx.gui.jobs.CancelableBackgroundTask;
import jadx.gui.jobs.ITaskInfo;
@@ -98,8 +100,10 @@ public class SearchTask extends CancelableBackgroundTask {
}
@Override
public List<? extends Runnable> scheduleJobs() {
return jobs;
public ITaskExecutor scheduleTasks() {
TaskExecutor executor = new TaskExecutor();
executor.addParallelTasks(jobs);
return executor;
}
@Override