ProgressManager.java

package net.morimekta.terminal.progress;

import net.morimekta.strings.chr.Char;
import net.morimekta.terminal.LineBuffer;
import net.morimekta.terminal.Terminal;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.lang.Math.max;
import static java.util.Comparator.comparing;
import static net.morimekta.strings.chr.Control.cursorUp;

/**
 * Show progress on a number of tasks. The tasks can be dynamically created and finished. E.g. if a number of large files
 * needs to be downloaded they can be given a task each, and only a certain number of files will be downloaded at the
 * same time. Example:
 *
 * <pre>{@code
 * try (ProgressManager progress = new ProgressManager(term, Progress.Spinner.CLOCK)) {
 *     Future<String> first = progress.addTask("First Task", task -> {
 *         // All the work
 *         task.onNext(new Progress(10000, 10000));
 *         return "OK";
 *     });
 *     Future<String> second = progress.addTask("Second Task", task -> {
 *         // All the work
 *         task.onNext(new Progress(10000, 10000));
 *         return "OK";
 *     });
 *
 *     progress.waitAbortable();
 *
 *     term.println("First: " + first.get());
 *     term.println("Second: " + second.get());
 * } finally {
 *     term.println();
 * }
 * }</pre>
 */
public class ProgressManager
        implements AutoCloseable {
    /**
     * Task handler that computes its result directly and returns it.
     *
     * @param <T> The result type of the task.
     */
    @FunctionalInterface
    public interface ProgressHandler<T> {
        /**
         * Run the task.
         *
         * @param progress Subscriber that the task publishes its progress updates to.
         * @return The result of the task.
         * @throws Exception If the task fails.
         */
        T handle(Flow.Subscriber<Progress> progress) throws Exception;
    }

    /**
     * Task handler that is handed the future representing the task result instead
     * of returning a value.
     *
     * @param <T> The result type of the task.
     */
    @FunctionalInterface
    public interface ProgressAsyncHandler<T> {
        /**
         * Run the task.
         *
         * @param result   The future representing the task result. The synchronous
         *                 {@code addTask} adapter completes it with the handler's value
         *                 or its exception.
         * @param progress Subscriber that the task publishes its progress updates to.
         */
        void handle(CompletableFuture<T> result, Flow.Subscriber<Progress> progress);
    }

    /**
     * Create a progress manager printing to the given terminal, using the
     * default limit on concurrently running tasks.
     *
     * @param terminal The terminal to print progress to.
     * @param spinner  The spinner used to render each task line.
     */
    public ProgressManager(Terminal terminal, Spinner spinner) {
        this(terminal, spinner, DEFAULT_MAX_TASKS);
    }

    /**
     * Create a progress manager printing to the given terminal. A scheduled
     * thread pool with {@code maxTasks + 1} daemon threads is created for it,
     * and the system UTC clock is used for timing.
     *
     * @param terminal The terminal to print progress to.
     * @param spinner  The spinner used to render each task line.
     * @param maxTasks Maximum number of tasks in progress at the same time.
     */
    public ProgressManager(Terminal terminal, Spinner spinner, int maxTasks) {
        this(terminal,
             spinner,
             maxTasks,
             Executors.newScheduledThreadPool(maxTasks + 1, THREAD_FACTORY),
             Clock.systemUTC());
    }

    /**
     * Create a progress manager with an explicit executor and clock. The
     * display update is scheduled on the executor as the last step, first
     * running after 10ms and then every 100ms.
     *
     * @param terminal The terminal to print to.
     * @param spinner  The spinner type.
     * @param maxTasks Max number of concurrently running tasks.
     * @param executor The executor running both the tasks and the display update.
     * @param clock    The clock to use for timing.
     */
    protected ProgressManager(Terminal terminal,
                              Spinner spinner,
                              int maxTasks,
                              ScheduledExecutorService executor,
                              Clock clock) {
        // Collaborators handed in by the caller.
        this.terminal = terminal;
        this.spinner = spinner;
        this.maxTasks = maxTasks;
        this.executor = executor;
        this.clock = clock;

        // Internal bookkeeping.
        this.startedTasks = new ArrayList<>();
        this.queuedTasks = new ConcurrentLinkedQueue<>();
        this.buffer = new LineBuffer(terminal);
        this.isWaiting = new AtomicBoolean(false);

        // Must be last: the update reads the state initialized above.
        this.updater = executor.scheduleAtFixedRate(this::doUpdate, 10L, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Close the progress manager and cancel every task that is not yet done.
     * <p>
     * The executor is shut down, the updater future is awaited, started tasks
     * are cancelled with interruption and queued tasks without, and finally the
     * executor is given up to 100ms to terminate. Calling this when the executor
     * is already shut down does nothing.
     * <p>
     * If the calling thread is interrupted during either wait, the remaining
     * cleanup still runs and the thread's interrupt status is restored before
     * returning.
     */
    @Override
    public void close() {
        boolean interrupted = false;
        synchronized (startedTasks) {
            if (executor.isShutdown()) {
                return;
            }
            // ... stop updater thread. Do not interrupt.
            executor.shutdown();
            try {
                updater.get();
            } catch (InterruptedException e) {
                // Remember the interrupt, but finish cancelling the tasks first.
                interrupted = true;
            } catch (IllegalStateException | ExecutionException ignored) {
                // Expected when the updater was cancelled (CancellationException is
                // an IllegalStateException) or failed; nothing to do while closing.
            }
            // And stop all the tasks, do interrupt.
            for (InternalTask<?> task : startedTasks) {
                synchronized (task) {
                    if (!task.isDone()) {
                        task.cancel(true);
                    }
                }
            }
            // Queued tasks never started running, so there is nothing to interrupt.
            for (InternalTask<?> task : queuedTasks) {
                synchronized (task) {
                    if (!task.isDone()) {
                        task.cancel(false);
                    }
                }
            }

            try {
                executor.awaitTermination(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            // Hand the interrupt back to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Block until all scheduled tasks have finished, letting the user abort
     * them all with &lt;ctrl&gt;-C. When the wait ends, for any reason, the
     * manager is closed if tasks remain, the lines are refreshed one last time
     * and the terminal is finished.
     *
     * @throws IOException          If interrupted by user.
     * @throws InterruptedException If interrupted by system or other threads.
     */
    public void waitAbortable() throws IOException, InterruptedException {
        // From now on the updater cancels itself once everything is done.
        isWaiting.set(true);
        try {
            terminal.waitAbortable(updater);
        } finally {
            boolean allDone = isCompleted();
            if (!allDone) {
                close();
            }
            updateLines();
            terminal.finish();
        }
    }

    /**
     * Queue an asynchronous task and show its progress. The task is started
     * right away if fewer than the maximum number of tasks are running,
     * otherwise it waits in the queue until a slot frees up.
     *
     * @param title   The progress title of the task.
     * @param handler The handler doing the work and completing the result future.
     * @param <T>     The return type for the task.
     * @return The future returning the task result.
     * @throws IllegalStateException If the progress manager is already closed.
     */
    public <T> Future<T> addTask(String title,
                                 ProgressAsyncHandler<T> handler) {
        if (executor.isShutdown()) {
            throw new IllegalStateException("Adding task to closed progress manager");
        }

        InternalTask<T> queued = new InternalTask<>(title, handler);
        queuedTasks.offer(queued);
        // Start it immediately if there is a free slot.
        startTasks();
        return queued;
    }

    /**
     * Queue a synchronous task and show its progress. The handler is wrapped so
     * that its return value completes the task, and any exception it throws
     * fails the task unless the task was already cancelled.
     *
     * @param title   The progress title of the task.
     * @param handler The handler doing the work and returning the result.
     * @param <T>     The return type for the task.
     * @return The future returning the task result.
     */
    public <T> Future<T> addTask(String title,
                                 ProgressHandler<T> handler) {
        ProgressAsyncHandler<T> adapter = (future, subscriber) -> {
            try {
                T value = handler.handle(subscriber);
                future.complete(value);
            } catch (Exception failure) {
                if (future.isCancelled()) {
                    // A cancelled task keeps its cancelled state.
                    return;
                }
                future.completeExceptionally(failure);
            }
        };
        return addTask(title, adapter);
    }

    /**
     * @return The lines currently held by the line buffer.
     */
    protected List<String> lines() {
        return buffer.lines();
    }

    // ------ private ------

    // Task limit used by the constructor that takes no explicit maxTasks.
    private static final int DEFAULT_MAX_TASKS = 5;

    // Terminal the progress lines are printed to.
    private final Terminal                   terminal;
    // Runs the tasks and the periodic display update.
    private final ExecutorService            executor;
    // Renders the progress part of each task line.
    private final Spinner                    spinner;
    // Source of all timestamps.
    private final Clock                      clock;
    // Future of the periodic display update scheduled in the constructor.
    private final Future<?>                  updater;
    // Tasks that have left the queue; also the lock guarding task scheduling and rendering.
    private final ArrayList<InternalTask<?>> startedTasks;
    // Tasks waiting for a free slot.
    private final Queue<InternalTask<?>>     queuedTasks;
    // Line buffer holding the currently displayed task lines.
    private final LineBuffer                 buffer;
    // Set once waitAbortable() is called; lets the updater cancel itself when all is done.
    private final AtomicBoolean              isWaiting;
    // Maximum number of not-done tasks in the started list.
    private final int                        maxTasks;

    /**
     * Move queued tasks to the started list and start them, as long as fewer
     * than {@code maxTasks} started tasks are still not done. Does nothing once
     * the executor is shut down.
     * <p>
     * A task that was already finished while waiting in the queue (typically
     * cancelled through its future) is moved to the started list so it still
     * gets rendered, but it is not started and does not use up a slot. Starting
     * such a task would throw {@link IllegalStateException} into whichever
     * unrelated caller happened to trigger the scheduling.
     */
    private void startTasks() {
        synchronized (startedTasks) {
            if (executor.isShutdown()) {
                return;
            }

            // Free slots = limit minus the started tasks that are still running.
            int available = maxTasks;
            for (InternalTask<?> task : startedTasks) {
                if (!task.isDone()) {
                    --available;
                }
            }

            while (available > 0) {
                InternalTask<?> task = queuedTasks.poll();
                if (task == null) {
                    break;
                }
                startedTasks.add(task);
                // Hold the task lock so it cannot be cancelled between the check and the start.
                synchronized (task) {
                    if (!task.isDone()) {
                        task.start();
                        --available;
                    }
                }
            }
        }
    }

    /**
     * @return The number of columns of the terminal, as reported by its TTY.
     */
    private int getTerminalWidth() {
        var size = terminal.getTTY().getTerminalSize();
        return size.cols;
    }

    /**
     * @return True if no task is queued and every started task is done.
     */
    private boolean isCompleted() {
        synchronized (startedTasks) {
            return queuedTasks.isEmpty()
                   && startedTasks.stream().allMatch(task -> task.isDone());
        }
    }

    /**
     * Periodic update: refresh the displayed lines, and once someone is waiting
     * and everything is completed, cancel the updater itself.
     */
    private void doUpdate() {
        updateLines();
        boolean finished = isWaiting.get() && isCompleted();
        if (finished) {
            updater.cancel(false);
        }
    }

    /**
     * Print and drop the leading run of started tasks whose progress fraction
     * has reached 1.0, stopping at the first task that has not.
     */
    private void printAndRemoveInitialCompletedTasks() {
        while (!startedTasks.isEmpty()) {
            InternalTask<?> head = startedTasks.get(0);
            if (!(head.fraction >= 1.0)) {
                // First unfinished task: everything after it stays in place.
                break;
            }
            terminal.println(renderTask(head));
            startedTasks.remove(0);
        }
    }

    /**
     * Re-render all started tasks into the line buffer. When there are more
     * started tasks than updatable rows, completed tasks are first printed
     * permanently and removed from the started list.
     */
    private void updateLines() {
        final List<String> rendered = new ArrayList<>();
        synchronized (startedTasks) {
            // Drop the cached size so the terminal size is read fresh for this update.
            terminal.getTTY().clearCachedTerminalSize();

            final int rowLimit = Math.min(terminal.getTTY().getTerminalSize().rows, maxTasks * 2);

            if (startedTasks.size() > rowLimit) {
                // More started tasks than rows we are willing to update: clear
                // the buffer and return the cursor to the start of the line.
                buffer.clear();
                terminal.print(Char.CR);

                // Flush the completed tasks at the head of the list, which
                // keeps the displayed order of the remaining tasks intact.
                printAndRemoveInitialCompletedTasks();

                if (startedTasks.size() > rowLimit) {
                    // Still too many: sort completed tasks in front of the
                    // unfinished ones and flush once more.
                    startedTasks.sort((a, b) -> Boolean.compare(a.fraction < 1.0, b.fraction < 1.0));
                    printAndRemoveInitialCompletedTasks();
                }

                // And 1 back.
                terminal.print(cursorUp(1));
            }

            for (InternalTask<?> task : startedTasks) {
                rendered.add(renderTask(task));
            }
            if (!queuedTasks.isEmpty()) {
                rendered.add(" -- And " + queuedTasks.size() + " more...");
            }
        }

        if (rendered.isEmpty()) {
            return;
        }
        // Grow the buffer until it has one line per rendered line.
        while (buffer.count() < rendered.size()) {
            buffer.add("");
        }
        buffer.update(0, rendered);
    }

    /**
     * Render a single task as "title: progress", where the progress part comes
     * from the spinner and depends on whether the task is cancelled, failed,
     * in progress or complete.
     *
     * @param task The task to render.
     * @return The rendered line.
     */
    private String renderTask(InternalTask<?> task) {
        final long nowMs = clock.millis();

        synchronized (task) {
            // Width left for the spinner after the title and the ": " separator.
            final int width = getTerminalWidth() - 2 - task.title.length();
            final String prefix = task.title + ": ";

            if (task.isCancelled()) {
                return prefix + spinner.atStopped(task.fraction, "Cancelled", width);
            }
            if (task.isCompletedExceptionally()) {
                return prefix + spinner.atStopped(task.fraction, "Failed", width);
            }
            if (task.fraction < 1.0) {
                // Remaining time is only known once an expected end has been estimated.
                Duration remaining = task.expected_done_ts > 0
                                     ? Duration.ofMillis(Math.max(0L, task.expected_done_ts - nowMs))
                                     : null;
                return prefix + spinner.atProgress(task.fraction, task.spinner_pos, remaining, width);
            }

            Duration elapsed = Duration.ofMillis(task.updated_ts - task.started_ts);
            return prefix + spinner.atComplete(elapsed, width);
        }
    }

    public class InternalTask<T>
            extends CompletableFuture<T>
            implements Flow.Subscriber<Progress>, Runnable {
        // Clock time (ms) when the task object was created.
        final long                       created_ts;
        // Title shown in front of the progress on the task's line.
        final String                     title;
        // Executor future of the running handler; holds null until start() submits it.
        final AtomicReference<Future<?>> future;
        // The handler doing the actual work.
        final ProgressAsyncHandler<T>    handler;

        // Spinner position, advanced by onNext() when at least 100ms have passed since the last advance.
        volatile int    spinner_pos;
        // Clock time (ms) the spinner position was last touched (start, advance or stop).
        volatile long   spinner_update_ts;
        // Clock time (ms) the task was started; 0 while not started.
        volatile long   started_ts;
        // Clock time (ms) of the last progress update or stop.
        volatile long   updated_ts;
        // Clock time (ms) the expected end time was last recalculated (or the task was stopped).
        volatile long   updated_time_ts;
        // Estimated clock time (ms) when the task is done; values that are not positive are rendered as unknown.
        volatile long   expected_done_ts;
        // Progress ratio as last reported; set to exactly 1.0 once a ratio not below 1.0 is seen.
        volatile double fraction;

        /**
         * Create a task that is not yet started. The creation time is taken
         * from the manager's clock.
         *
         * @param title   What progresses.
         * @param handler The async handler for the task.
         */
        InternalTask(String title,
                     ProgressAsyncHandler<T> handler) {
            this.created_ts = clock.millis();
            this.title = title;
            this.handler = handler;
            this.future = new AtomicReference<>();

            // Explicit "not started, no progress" state.
            this.started_ts = 0;
            this.spinner_update_ts = 0L;
            this.spinner_pos = 0;
            this.fraction = 0.0;
        }

        /**
         * Mark the task as started and submit its handler to the executor. The
         * executor future is stored so that cancel() can reach the running work.
         *
         * @throws IllegalStateException If the task is cancelled or was already started.
         */
        synchronized void start() {
            if (isCancelled()) {
                throw new IllegalStateException("Starting cancelled task");
            }
            if (started_ts > 0) {
                throw new IllegalStateException("Already Started");
            }
            final long startedAt = clock.millis();
            started_ts = startedAt;
            spinner_update_ts = startedAt;

            Runnable work = () -> {
                try {
                    handler.handle(this, this);
                } catch (Exception e) {
                    // Only report the failure if nothing completed the task first.
                    if (!isDone()) {
                        completeExceptionally(e);
                    }
                }
            };
            future.set(executor.submit(work));
        }

        /**
         * Run the handler on the calling thread. Does nothing if the task is
         * already done.
         *
         * @throws IllegalStateException If the task was already started.
         */
        @Override
        public void run() {
            synchronized (this) {
                if (isDone()) {
                    return;
                }
                if (started_ts > 0) {
                    throw new IllegalStateException("Already Started");
                }
                final long startedAt = clock.millis();
                started_ts = startedAt;
                spinner_update_ts = startedAt;
            }
            // The handler runs outside the lock so progress updates are not blocked.
            try {
                handler.handle(this, this);
            } catch (Exception failure) {
                completeExceptionally(failure);
            }
        }

        /**
         * Complete the task with a value. Progress is first pushed to 1-of-1,
         * and afterwards queued tasks get a chance to start.
         *
         * @param t The task result.
         * @return True if this call completed the task, false if it was already done.
         */
        @Override
        public boolean complete(T t) {
            boolean completed;
            try {
                synchronized (this) {
                    if (isDone()) {
                        completed = false;
                    } else {
                        onNext(new Progress(1, 1));
                        completed = super.complete(t);
                    }
                }
            } finally {
                // Runs whether or not this call completed the task.
                startTasks();
            }
            return completed;
        }

        /**
         * Fail the task with the given exception. The timing fields are reset
         * first, and afterwards queued tasks get a chance to start.
         *
         * @param throwable The failure.
         * @return True if this call completed the task, false if it was already done.
         */
        @Override
        public boolean completeExceptionally(Throwable throwable) {
            boolean completed;
            try {
                synchronized (this) {
                    if (isDone()) {
                        completed = false;
                    } else {
                        stopInternal();
                        completed = super.completeExceptionally(throwable);
                    }
                }
            } finally {
                // Runs whether or not this call completed the task.
                startTasks();
            }
            return completed;
        }

        /**
         * Cancel the task. If the handler was submitted to the executor and is
         * not done, its executor future is cancelled first. Afterwards queued
         * tasks get a chance to start.
         *
         * @param interruptable Passed on to the executor future and to the
         *                      underlying future's cancel.
         * @return False if the task was already done, otherwise the result of
         *         cancelling the underlying future.
         */
        @Override
        public boolean cancel(boolean interruptable) {
            boolean cancelled;
            try {
                Future<?> running = future.get();
                if (running != null && !running.isDone()) {
                    running.cancel(interruptable);
                }

                synchronized (this) {
                    if (super.isDone()) {
                        cancelled = false;
                    } else {
                        stopInternal();
                        cancelled = super.cancel(interruptable);
                    }
                }
            } finally {
                // Runs whether or not this call cancelled the task.
                startTasks();
            }
            return cancelled;
        }

        /**
         * Accept the subscription and request an effectively unbounded number
         * of progress updates.
         *
         * @param subscription The subscription to request updates from.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        /**
         * Update the progress to reflect the current progress value.
         * <p>
         * Advances the spinner at most once per 100ms, stores the new ratio, and
         * re-estimates the expected completion time once at least 3 seconds of
         * work have passed (at most every 2 seconds). Updates on a task that is
         * already done are ignored.
         * <p>
         * If the estimate cannot be represented, e.g. because the ratio is still
         * 0 or extremely small, the expected completion time is set to 0, which
         * means "unknown", rather than an overflowed timestamp.
         *
         * @param current The new current progress value.
         * @throws IllegalStateException If the task is cancelled.
         */
        @Override
        public void onNext(Progress current) {
            synchronized (this) {
                if (isCancelled()) {
                    throw new IllegalStateException("Task is cancelled");
                }
                if (this.isDone()) {
                    return;
                }

                long now = clock.millis();
                if (now >= (spinner_update_ts + 100)) {
                    ++spinner_pos;
                    spinner_update_ts = now;
                }

                double ratio = current.getRatio();
                if (ratio < 1.0) {
                    fraction = ratio;

                    long duration_ms = now - started_ts;
                    if (duration_ms > 3000) {
                        // Progress has actually gone forward, recalculate total time only if
                        // we have 3 second of progress.
                        if (expected_done_ts == 0L || updated_time_ts < (now - 2000L)) {
                            // Update total / expected time once per 2 seconds.
                            // A ratio of 0 makes the division infinite, which the cast
                            // saturates to Long.MAX_VALUE.
                            long assumed_total = (long) (((double) duration_ms) / ratio);
                            long remaining_ms = max(0L, assumed_total - duration_ms);
                            long done_ts = now + remaining_ms;
                            // remaining_ms is never negative, so a sum below 'now' means the
                            // addition overflowed: no usable estimate.
                            expected_done_ts = done_ts < now ? 0L : done_ts;
                            updated_time_ts = now;
                        }
                    }
                } else {
                    fraction = 1.0;
                    expected_done_ts = now;
                }
                updated_ts = now;
            }
        }

        /**
         * Fail the task with the error reported by the publisher.
         *
         * @param throwable The reported error.
         */
        @Override
        public void onError(Throwable throwable) {
            completeExceptionally(throwable);
        }

        /**
         * Mark the progress as fully done by pushing a 1-of-1 progress through
         * {@link #onNext(Progress)} and then resetting the timing fields. The
         * future itself is not completed here.
         * <p>
         * NOTE(review): since onNext throws {@link IllegalStateException} on a
         * cancelled task, so does this method.
         */
        @Override
        public void onComplete() {
            onNext(new Progress(1, 1));
            stopInternal();
        }

        /**
         * Stamp all update timestamps with the current time and clear the
         * expected completion time.
         */
        private void stopInternal() {
            final long stoppedAt = clock.millis();
            updated_ts = stoppedAt;
            updated_time_ts = stoppedAt;
            spinner_update_ts = stoppedAt;
            // 0 means there is no expected completion time.
            expected_done_ts = 0L;
        }
    }

    /**
     * Factory for the daemon threads of the default executor, named
     * "progress-1", "progress-2" and so on.
     */
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
        private final AtomicInteger nextId = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable runnable) {
            String name = "progress-" + nextId.getAndIncrement();
            Thread worker = new Thread(runnable, name);
            // Daemon threads do not keep the JVM alive.
            worker.setDaemon(true);
            return worker;
        }
    };
}