fix: resolve race condition in task executor
This commit is contained in:
@@ -46,8 +46,9 @@ public class TaskExecutor implements ITaskExecutor {
|
||||
private final AtomicInteger progress = new AtomicInteger(0);
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
private final AtomicBoolean terminating = new AtomicBoolean(false);
|
||||
private int tasksCount = 0;
|
||||
private final Object executorSync = new Object();
|
||||
private @Nullable ExecutorService executor;
|
||||
private int tasksCount = 0;
|
||||
|
||||
@Override
|
||||
public void addParallelTasks(List<? extends Runnable> parallelTasks) {
|
||||
@@ -94,15 +95,35 @@ public class TaskExecutor implements ITaskExecutor {
|
||||
|
||||
@Override
|
||||
public void execute() {
|
||||
if (running.get() || executor != null) {
|
||||
throw new IllegalStateException("Already executing");
|
||||
synchronized (executorSync) {
|
||||
if (running.get() || executor != null) {
|
||||
throw new IllegalStateException("Already executing");
|
||||
}
|
||||
executor = Executors.newFixedThreadPool(1, Utils.simpleThreadFactory("task-s"));
|
||||
running.set(true);
|
||||
terminating.set(false);
|
||||
progress.set(0);
|
||||
executor.execute(this::runStages);
|
||||
}
|
||||
}
|
||||
|
||||
private void stopExecution() {
|
||||
synchronized (executorSync) {
|
||||
running.set(false);
|
||||
terminating.set(true);
|
||||
if (executor != null) {
|
||||
executor.shutdown();
|
||||
executor = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitTermination() {
|
||||
ExecutorService activeExecutor = executor;
|
||||
if (activeExecutor != null && running.get()) {
|
||||
awaitExecutorTermination(activeExecutor);
|
||||
}
|
||||
running.set(true);
|
||||
progress.set(0);
|
||||
terminating.set(false);
|
||||
executor = Executors.newFixedThreadPool(1, Utils.simpleThreadFactory("task-s"));
|
||||
executor.execute(this::runStages);
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -125,15 +146,6 @@ public class TaskExecutor implements ITaskExecutor {
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void awaitTermination() {
|
||||
if (executor == null || !running.get()) {
|
||||
// already terminated
|
||||
return;
|
||||
}
|
||||
awaitExecutorTermination(executor);
|
||||
}
|
||||
|
||||
private void runStages() {
|
||||
try {
|
||||
for (ExecStage stage : stages) {
|
||||
@@ -156,8 +168,7 @@ public class TaskExecutor implements ITaskExecutor {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
running.set(false);
|
||||
executor = null;
|
||||
stopExecution();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user