ProgressManager.java
package net.morimekta.terminal.progress;
import net.morimekta.strings.chr.Char;
import net.morimekta.terminal.LineBuffer;
import net.morimekta.terminal.Terminal;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Math.max;
import static java.util.Comparator.comparing;
import static net.morimekta.strings.chr.Control.cursorUp;
/**
 * Show progress on a number of tasks. The tasks can be dynamically created and finished. E.g. if a number of large files
 * need to be downloaded they can be given a task each, and only a certain number of files will be downloaded at the
 * same time. Example:
 *
 * <pre>{@code
 * try (ProgressManager progress = new ProgressManager(term, Progress.Spinner.CLOCK)) {
 *     Future<String> first = progress.addTask("First Task", task -> {
 *         // All the work
 *         task.onNext(new Progress(10000, 10000));
 *         return "OK";
 *     });
 *     Future<String> second = progress.addTask("Second Task", task -> {
 *         // All the work
 *         task.onNext(new Progress(10000, 10000));
 *         return "OK";
 *     });
 *
 *     progress.waitAbortable();
 *
 *     term.println("First: " + first.get());
 *     term.println("Second: " + second.get());
 * } finally {
 *     term.println();
 * }
 * }</pre>
 */
public class ProgressManager
implements AutoCloseable {
/**
 * A task body that does its work synchronously and returns the result.
 * <p>
 * When passed to {@code addTask}, the returned value completes the task's future, and a thrown
 * exception completes it exceptionally (unless the task was already cancelled).
 *
 * @param <T> The result type of the task.
 */
@FunctionalInterface
public interface ProgressHandler<T> {
/**
 * Do the work of the task.
 *
 * @param progress Subscriber to report progress updates to.
 * @return The result of the task.
 * @throws Exception If the task fails.
 */
T handle(Flow.Subscriber<Progress> progress) throws Exception;
}
/**
 * A task body that is handed the future it is responsible for completing, instead of
 * returning the result directly.
 *
 * @param <T> The result type of the task.
 */
@FunctionalInterface
public interface ProgressAsyncHandler<T> {
/**
 * Do, or kick off, the work of the task.
 *
 * @param result   The future to complete with the task result or failure.
 * @param progress Subscriber to report progress updates to.
 */
void handle(CompletableFuture<T> result, Flow.Subscriber<Progress> progress);
}
/**
 * Create a progress manager printing to the given terminal, using the default
 * limit ({@code DEFAULT_MAX_TASKS}) on the number of concurrently running tasks.
 *
 * @param terminal The terminal to use.
 * @param spinner  The spinner to use.
 */
public ProgressManager(Terminal terminal, Spinner spinner) {
    this(terminal, spinner, DEFAULT_MAX_TASKS);
}
/**
 * Create a progress manager printing to the given terminal.
 * <p>
 * Tasks and the display updater run on a new scheduled thread pool of
 * {@code maxTasks + 1} threads created with {@code THREAD_FACTORY}, and timing
 * uses the system UTC clock.
 *
 * @param terminal The terminal to use.
 * @param spinner  The spinner to use.
 * @param maxTasks Maximum number of concurrent tasks in progress.
 */
public ProgressManager(Terminal terminal, Spinner spinner, int maxTasks) {
    this(terminal, spinner, maxTasks,
         Executors.newScheduledThreadPool(maxTasks + 1, THREAD_FACTORY),
         Clock.systemUTC());
}
/**
 * Create a progress manager with an explicit executor and clock.
 * <p>
 * The display update is scheduled on the executor as the last step of construction:
 * first after 10 ms, and then every 100 ms.
 *
 * @param terminal The terminal to print to.
 * @param spinner  The spinner type.
 * @param maxTasks Max number of concurrent tasks.
 * @param executor The executor to run the updater and the tasks in.
 * @param clock    The clock to use for timing.
 */
protected ProgressManager(Terminal terminal,
                          Spinner spinner,
                          int maxTasks,
                          ScheduledExecutorService executor,
                          Clock clock) {
    // Plain configuration.
    this.maxTasks = maxTasks;
    this.spinner = spinner;
    this.clock = clock;

    // Output.
    this.terminal = terminal;
    this.buffer = new LineBuffer(terminal);

    // Task bookkeeping.
    this.queuedTasks = new ConcurrentLinkedQueue<>();
    this.startedTasks = new ArrayList<>();
    this.isWaiting = new AtomicBoolean(false);

    // Start the periodic display update last, when all other state is in place.
    this.executor = executor;
    this.updater = executor.scheduleAtFixedRate(this::doUpdate, 10L, 100L, TimeUnit.MILLISECONDS);
}
/**
 * Close the progress and all tasks associated with it.
 * <p>
 * Shuts down the executor, cancels every started task that is not done (interrupting it)
 * and every queued task that is not done (without interrupting), and then waits up to
 * 100 ms for the executor to terminate. Calling this on an already shut down manager
 * does nothing.
 * <p>
 * If the calling thread is interrupted while closing, the remaining close steps are still
 * carried out and the interrupt status is restored before returning.
 */
@Override
public void close() {
    synchronized (startedTasks) {
        if (executor.isShutdown()) {
            return;
        }
        // Remember an interruption instead of dropping it, so the remaining
        // cleanup still runs and the caller still gets to see the interrupt.
        boolean interrupted = false;
        try {
            // ... stop updater thread. Do not interrupt.
            executor.shutdown();
            try {
                updater.get();
            } catch (InterruptedException e) {
                interrupted = true;
            } catch (IllegalStateException | ExecutionException ignored) {
                // Ignore these closing thread errors. This includes the
                // CancellationException (an IllegalStateException) thrown
                // when the updater has been cancelled.
            }
            // And stop all the tasks, do interrupt.
            for (InternalTask<?> task : startedTasks) {
                synchronized (task) {
                    if (!task.isDone()) {
                        task.cancel(true);
                    }
                }
            }
            // Queued tasks never started running, so there is nothing to interrupt.
            for (InternalTask<?> task : queuedTasks) {
                synchronized (task) {
                    if (!task.isDone()) {
                        task.cancel(false);
                    }
                }
            }
            try {
                executor.awaitTermination(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Wait for all scheduled tasks to finish, allowing the user to abort all tasks
 * with {@code <ctrl>-C}.
 * <p>
 * When the wait ends, for any reason, the manager is closed if there still are
 * unfinished tasks, the progress lines are rendered one last time and the terminal
 * is finished.
 *
 * @throws IOException          If interrupted by user.
 * @throws InterruptedException If interrupted by system or other threads.
 */
public void waitAbortable() throws IOException, InterruptedException {
    // From now on the updater cancels itself once every task is done.
    isWaiting.set(true);
    try {
        terminal.waitAbortable(updater);
    } finally {
        boolean unfinished = !isCompleted();
        if (unfinished) {
            close();
        }
        updateLines();
        terminal.finish();
    }
}
/**
 * Add a task that completes its own result future while progress is shown. The task is
 * put on the queue, and started right away if fewer than the maximum number of tasks
 * are running; otherwise it starts when a running task finishes.
 *
 * @param title   The progress title of the task.
 * @param handler The handler to do the task behind the progress being shown.
 * @param <T>     The return type for the task.
 * @return The future returning the task result.
 * @throws IllegalStateException If the progress manager is already closed.
 */
public <T> Future<T> addTask(String title, ProgressAsyncHandler<T> handler) {
    if (!executor.isShutdown()) {
        InternalTask<T> queued = new InternalTask<>(title, handler);
        queuedTasks.add(queued);
        startTasks();
        return queued;
    }
    throw new IllegalStateException("Adding task to closed progress manager");
}
/**
 * Add a task that returns its result directly while progress is shown. The value returned
 * by the handler completes the task; an exception thrown by the handler fails the task,
 * unless the task was cancelled in the meantime. Queueing works as for the asynchronous
 * variant of this method.
 *
 * @param title   The progress title of the task.
 * @param handler The handler to do the task behind the progress being shown.
 * @param <T>     The return type for the task.
 * @return The future returning the task result.
 */
public <T> Future<T> addTask(String title, ProgressHandler<T> handler) {
    return addTask(title, (ProgressAsyncHandler<T>) (result, progress) -> {
        try {
            T value = handler.handle(progress);
            result.complete(value);
        } catch (Exception e) {
            if (result.isCancelled()) {
                // A cancelled task keeps its cancelled state.
                return;
            }
            result.completeExceptionally(e);
        }
    });
}
/**
 * @return The lines currently held by the line buffer used for the progress display.
 */
protected List<String> lines() {
    List<String> current = buffer.lines();
    return current;
}
// ------ private ------
/** Max number of concurrent tasks when no explicit limit is given to the constructor. */
private static final int DEFAULT_MAX_TASKS = 5;
/** The terminal the progress is printed to. */
private final Terminal terminal;
/** Runs both the periodic display updater and the task handlers. */
private final ExecutorService executor;
/** Renders the progress part of each task line. */
private final Spinner spinner;
/** Time source for all task timestamps. */
private final Clock clock;
/** Handle for the periodically scheduled {@code doUpdate}. */
private final Future<?> updater;
/** Tasks that have been taken off the queue. Also used as the lock guarding task start-up and rendering. */
private final ArrayList<InternalTask<?>> startedTasks;
/** Tasks that have been added, but not yet been taken off the queue to be started. */
private final Queue<InternalTask<?>> queuedTasks;
/** Holds the progress lines that are updated in place. */
private final LineBuffer buffer;
/** Set when {@code waitAbortable} is called; lets the updater cancel itself when all tasks are done. */
private final AtomicBoolean isWaiting;
/** Max number of tasks that are started but not yet done at the same time. */
private final int maxTasks;
/**
 * Move tasks from the queue to the started list, and start them, for as long as fewer
 * than {@code maxTasks} started tasks are unfinished. Does nothing when the executor
 * has been shut down.
 * <p>
 * A task that is already done when taken off the queue (e.g. cancelled through its future
 * while waiting) is moved to the started list so it is still rendered, but it is not
 * started and does not use up a slot.
 */
private void startTasks() {
    synchronized (startedTasks) {
        if (executor.isShutdown()) {
            return;
        }
        // Free slots = max minus the started tasks that are still running.
        int toAdd = maxTasks;
        for (InternalTask<?> task : startedTasks) {
            if (!task.isDone()) {
                --toAdd;
            }
        }
        while (toAdd > 0) {
            InternalTask<?> task = queuedTasks.poll();
            if (task == null) {
                break;
            }
            startedTasks.add(task);
            // Hold the task lock across the check and the start, so the task
            // cannot be cancelled in between; start() throws on a cancelled task.
            synchronized (task) {
                if (task.isDone()) {
                    continue;
                }
                task.start();
            }
            --toAdd;
        }
    }
}
/**
 * @return The number of columns of the terminal, as reported by its TTY.
 */
private int getTerminalWidth() {
    var size = terminal.getTTY().getTerminalSize();
    return size.cols;
}
/**
 * Check if there is nothing left to do.
 *
 * @return True if no task is waiting in the queue and every started task is done.
 */
private boolean isCompleted() {
    synchronized (startedTasks) {
        if (queuedTasks.isEmpty()) {
            for (InternalTask<?> started : startedTasks) {
                if (!started.isDone()) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }
}
/**
 * One round of the periodic update: re-render the progress lines, and if someone is
 * waiting for completion and everything is done, cancel the updater itself.
 */
private void doUpdate() {
    updateLines();
    if (!isWaiting.get()) {
        return;
    }
    if (isCompleted()) {
        updater.cancel(false);
    }
}
/**
 * Print the leading run of started tasks that have reached full progress as regular
 * terminal lines, and remove them from the started list. Stops at the first task that
 * has not reached full progress.
 */
private void printAndRemoveInitialCompletedTasks() {
    while (!startedTasks.isEmpty()) {
        InternalTask<?> head = startedTasks.get(0);
        if (!(head.fraction >= 1.0)) {
            break;
        }
        terminal.println(renderTask(head));
        startedTasks.remove(0);
    }
}
/**
 * Render all started tasks, plus a summary line for the queued ones, and write the
 * result to the line buffer.
 * <p>
 * When there are more started tasks than lines to update (the smaller of the terminal
 * row count and twice {@code maxTasks}), completed tasks are first printed as regular
 * lines and dropped from the started list.
 */
private void updateLines() {
    final List<String> rendered = new ArrayList<>();
    synchronized (startedTasks) {
        // Make sure we read terminal size before each actual update.
        terminal.getTTY().clearCachedTerminalSize();
        final int rows = terminal.getTTY().getTerminalSize().rows;
        final int lineLimit = Math.min(rows, maxTasks * 2);
        if (startedTasks.size() > lineLimit) {
            // More started tasks than lines available: clear the buffer (should clear
            // all lines and move the cursor to the beginning) ...
            buffer.clear();
            terminal.print(Char.CR);
            // ... then flush the completed tasks at the head of the list, which keeps
            // the order in which they are shown ...
            printAndRemoveInitialCompletedTasks();
            if (startedTasks.size() > lineLimit) {
                // ... and if that was not enough, SORT completed BEFORE not-completed
                // and flush once more.
                startedTasks.sort(comparing(t -> t.fraction < 1.0));
                printAndRemoveInitialCompletedTasks();
            }
            // And 1 back.
            terminal.print(cursorUp(1));
        }
        for (InternalTask<?> started : startedTasks) {
            rendered.add(renderTask(started));
        }
        if (!queuedTasks.isEmpty()) {
            rendered.add(" -- And " + queuedTasks.size() + " more...");
        }
    }
    if (rendered.isEmpty()) {
        return;
    }
    // Grow the buffer to fit before overwriting from the first line.
    while (buffer.count() < rendered.size()) {
        buffer.add("");
    }
    buffer.update(0, rendered);
}
/**
 * Render the progress line for a single task: the title, followed by the spinner
 * output matching the state of the task (cancelled, failed, in progress or complete).
 *
 * @param task The task to render.
 * @return The rendered line.
 */
private String renderTask(InternalTask<?> task) {
    final long now = clock.millis();
    synchronized (task) {
        // What is left of the line after the title and the ": " separator.
        final int width = getTerminalWidth() - 2 - task.title.length();
        final String prefix = task.title + ": ";
        if (task.isCancelled()) {
            return prefix + spinner.atStopped(task.fraction, "Cancelled", width);
        }
        if (task.isCompletedExceptionally()) {
            return prefix + spinner.atStopped(task.fraction, "Failed", width);
        }
        if (task.fraction < 1.0) {
            // Only show remaining time when an estimate has been calculated.
            Duration remaining = null;
            if (task.expected_done_ts > 0) {
                long remainingMs = max(0L, task.expected_done_ts - now);
                remaining = Duration.of(remainingMs, ChronoUnit.MILLIS);
            }
            return prefix + spinner.atProgress(task.fraction, task.spinner_pos, remaining, width);
        }
        var elapsed = Duration.of(task.updated_ts - task.started_ts, ChronoUnit.MILLIS);
        return prefix + spinner.atComplete(elapsed, width);
    }
}
public class InternalTask<T>
extends CompletableFuture<T>
implements Flow.Subscriber<Progress>, Runnable {
/** Clock time (millis) when the task was created. */
final long created_ts;
/** Title shown in front of the progress. */
final String title;
/** The executor future of the running handler, set when the task is started through {@code start()}. */
final AtomicReference<Future<?>> future;
/** The handler doing the actual work. */
final ProgressAsyncHandler<T> handler;
/** Spinner position; advanced at most once per 100 ms by progress updates. */
volatile int spinner_pos;
/** Clock time (millis) when the spinner position was last advanced. */
volatile long spinner_update_ts;
/** Clock time (millis) when the task was started, 0 while not started. */
volatile long started_ts;
/** Clock time (millis) of the last progress update or stop. */
volatile long updated_ts;
/** Clock time (millis) when the expected done time was last recalculated. */
volatile long updated_time_ts;
/** Clock time (millis) when the task is expected to be done, 0 when there is no estimate. */
volatile long expected_done_ts;
/** Last reported progress ratio; set to 1.0 when a ratio of 1.0 or above is reported. */
volatile double fraction;
/**
 * Create a task that is not yet started. The creation time is read from the clock
 * of the enclosing progress manager.
 *
 * @param title   What progresses.
 * @param handler The async handler for the task.
 */
InternalTask(String title, ProgressAsyncHandler<T> handler) {
    // What to do, and what to call it.
    this.handler = handler;
    this.title = title;
    this.future = new AtomicReference<>();

    // Timing and progress state of a task that has not started.
    this.created_ts = clock.millis();
    this.started_ts = 0;
    this.spinner_update_ts = 0L;
    this.spinner_pos = 0;
    this.fraction = 0.0;
}
/**
 * Start the task by submitting its handler to the executor of the progress manager.
 * An exception thrown by the handler fails the task, unless the task is already done.
 *
 * @throws IllegalStateException If the task is cancelled, or has already been started.
 */
void start() {
    synchronized (this) {
        if (isCancelled()) {
            throw new IllegalStateException("Starting cancelled task");
        }
        if (started_ts > 0) {
            throw new IllegalStateException("Already Started");
        }
        final long startedAt = clock.millis();
        started_ts = startedAt;
        spinner_update_ts = startedAt;

        final Runnable job = () -> {
            try {
                handler.handle(this, this);
            } catch (Exception e) {
                if (isDone()) {
                    return;
                }
                completeExceptionally(e);
            }
        };
        future.set(executor.submit(job));
    }
}
/**
 * Run the task handler in the calling thread. Does nothing if the task is already done.
 * An exception thrown by the handler fails the task.
 *
 * @throws IllegalStateException If the task has already been started.
 */
@Override
public void run() {
    synchronized (this) {
        if (isDone()) {
            return;
        }
        if (started_ts > 0) {
            throw new IllegalStateException("Already Started");
        }
        final long startedAt = clock.millis();
        started_ts = startedAt;
        spinner_update_ts = startedAt;
    }
    // The handler itself runs outside of the task lock.
    try {
        handler.handle(this, this);
    } catch (Exception e) {
        completeExceptionally(e);
    }
}
/**
 * Complete the task with the given result, after first moving the shown progress to
 * 100%. Whether or not the task was completed by this call, queued tasks are given
 * a chance to start afterwards.
 *
 * @param t The task result.
 * @return True if this call completed the task, false if it was already done.
 */
@Override
public boolean complete(T t) {
    try {
        synchronized (this) {
            if (!isDone()) {
                onNext(new Progress(1, 1));
                return super.complete(t);
            }
            return false;
        }
    } finally {
        // Called after the task lock is released.
        startTasks();
    }
}
/**
 * Fail the task with the given exception, freezing its timing state. Whether or not
 * the task was completed by this call, queued tasks are given a chance to start
 * afterwards.
 *
 * @param throwable The failure.
 * @return True if this call completed the task, false if it was already done.
 */
@Override
public boolean completeExceptionally(Throwable throwable) {
    try {
        synchronized (this) {
            if (!isDone()) {
                stopInternal();
                return super.completeExceptionally(throwable);
            }
            return false;
        }
    } finally {
        // Called after the task lock is released.
        startTasks();
    }
}
/**
 * Cancel the task. If the handler has been submitted to the executor and is not done,
 * that executor future is cancelled first. Whether or not the task was cancelled by
 * this call, queued tasks are given a chance to start afterwards.
 *
 * @param interruptable Passed on as the may-interrupt flag of the cancellation.
 * @return True if this call cancelled the task, false if it was already done.
 */
@Override
public boolean cancel(boolean interruptable) {
    try {
        Future<?> running = future.get();
        if (running != null && !running.isDone()) {
            running.cancel(interruptable);
        }
        synchronized (this) {
            if (!super.isDone()) {
                stopInternal();
                return super.cancel(interruptable);
            }
            return false;
        }
    } finally {
        // Called after the task lock is released.
        startTasks();
    }
}
/**
 * Request {@code Long.MAX_VALUE} items from the subscription, so progress updates
 * are not held back by demand.
 *
 * @param subscription The subscription to request progress updates from.
 */
@Override
public void onSubscribe(Flow.Subscription subscription) {
    final long demand = Long.MAX_VALUE;
    subscription.request(demand);
}
/**
 * Update the progress to reflect the current progress value.
 * <p>
 * Updates on a task that is done are ignored. While the ratio is below 1.0, and the task
 * has been running for more than 3 seconds, the expected done time is estimated by
 * extrapolating the time spent so far; the estimate is refreshed at most every
 * 2 seconds. No estimate is made while the ratio is not positive, as there is nothing
 * to extrapolate from.
 *
 * @param current The new current progress value.
 * @throws IllegalStateException If the task is cancelled.
 */
@Override
public void onNext(Progress current) {
    synchronized (this) {
        if (isCancelled()) {
            throw new IllegalStateException("Task is cancelled");
        }
        if (this.isDone()) {
            return;
        }
        long now = clock.millis();
        // Advance the spinner at most once per 100 ms.
        if (now >= (spinner_update_ts + 100)) {
            ++spinner_pos;
            spinner_update_ts = now;
        }
        double ratio = current.getRatio();
        if (ratio < 1.0) {
            fraction = ratio;
            long duration_ms = now - started_ts;
            // Recalculate total time only if we have 3 seconds of progress, and
            // then once per 2 seconds.
            boolean estimate_due = expected_done_ts == 0L || updated_time_ts < (now - 2000L);
            // A ratio of 0 would make the division below yield infinity, which
            // saturates the long cast and overflows the timestamp addition.
            if (duration_ms > 3000 && ratio > 0.0 && estimate_due) {
                long assumed_total = (long) (((double) duration_ms) / ratio);
                long remaining_ms = max(0L, assumed_total - duration_ms);
                // With an extremely small ratio the estimate may still not fit
                // in a timestamp; fall back to "no estimate" (0) in that case.
                boolean overflows = now > 0L && remaining_ms > Long.MAX_VALUE - now;
                expected_done_ts = overflows ? 0L : now + remaining_ms;
                updated_time_ts = now;
            }
        } else {
            fraction = 1.0;
            expected_done_ts = now;
        }
        updated_ts = now;
    }
}
/**
 * Fail the task with the error signalled by the progress publisher.
 *
 * @param throwable The signalled error.
 */
@Override
public void onError(Throwable throwable) {
    this.completeExceptionally(throwable);
}
/**
 * Move the shown progress to 100% and freeze the timing state. Note that this does
 * not complete the task future itself.
 */
@Override
public void onComplete() {
    final Progress full = new Progress(1, 1);
    onNext(full);
    stopInternal();
}
/**
 * Freeze the timing state of the task: all "last updated" timestamps are set to the
 * current time, and the expected done time is cleared.
 */
private void stopInternal() {
    final long stoppedAt = clock.millis();
    updated_ts = stoppedAt;
    updated_time_ts = stoppedAt;
    spinner_update_ts = stoppedAt;
    expected_done_ts = 0L;
}
}
/**
 * Creates the daemon threads, named "progress-1", "progress-2" and so on, used by
 * the thread pools this class creates itself.
 */
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
    /** The number to use in the name of the next thread. */
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable runnable) {
        final Thread worker = new Thread(runnable);
        final String name = "progress-" + counter.getAndIncrement();
        worker.setName(name);
        worker.setDaemon(true);
        return worker;
    }
};
}