FakeScheduledExecutor.java
/*
* Copyright (c) 2020, Stein Eldar Johnsen
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package net.morimekta.testing.concurrent;
import net.morimekta.testing.concurrent.internal.CompletableScheduledFuture;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
/**
* A scheduled executor that uses a fake clock as back-bone to the executor. To trigger scheduled tasks in the
* executions, call {@link FakeClock#tick(long)} on the fake clock. Execution will be handles in a separate
* thread using an internal executor service.
*/
public class FakeScheduledExecutor
extends AbstractExecutorService
implements ScheduledExecutorService, FakeClock.TimeListener {
public FakeScheduledExecutor(FakeClock clock) {
this(clock, 1);
}
/**
* Create a fake scheduled executor.
*
* @param clock Clock to trigger executions.
* @param maxThreads Max number of threads.
*/
public FakeScheduledExecutor(FakeClock clock, int maxThreads) {
this.scheduledTasks = new ArrayList<>();
this.executor = newFixedThreadPool(maxThreads);
this.clock = requireNonNull(clock, "clock == null");
this.listener = new FakeTimeListener();
this.clock.addListener(listener);
this.nextId = new AtomicInteger();
}
// ---- FakeClock.TimeListener
/**
* @param newNow The new current time value.
* @deprecated Use the {@link FakeClock#tick(long)} method to propagate time.
*/
@Override
@Deprecated(since = "v5.2.0", forRemoval = true)
public void newCurrentTime(Instant newNow) {
throw new UnsupportedOperationException("Not allowed to propagate time by calling this method");
}
/**
* @param now The current time when asking.
* @return The delay until the next execution should happen.
* @deprecated Should not be used.
*/
@Override
@Deprecated(since = "v5.2.0", forRemoval = true)
public Duration getDelay(Instant now) {
return listener.getDelay(now);
}
// ---- ScheduledExecutorService
@Override
public ScheduledFuture<?> schedule(Runnable runnable, long l, TimeUnit timeUnit) {
requireNonNull(runnable, "runnable == null");
return this.schedule(() -> {
runnable.run();
return null;
}, l, timeUnit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit timeUnit) {
requireNonNull(callable, "callable == null");
requireNonNull(timeUnit, "timeUnit == null");
if (delay < 0) {
throw new IllegalArgumentException("Invalid delay " + delay);
}
if (isShutdown()) {
throw new IllegalStateException("Executor is shut down");
}
var now = clock.instant();
var nextExecution = new AtomicReference<>(now.plusMillis(timeUnit.toMillis(delay)));
var future = new CompletableScheduledFuture<V>(clock, nextExecution);
var task = new ScheduledTask(0, 0, () -> {
try {
future.complete(callable.call());
} catch (Exception e) {
future.completeExceptionally(e);
}
}, future, nextExecution, nextExecution);
if (delay == 0) {
execute(task::runOnce);
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new AssertionError("Interrupted", e);
}
} else {
removeOnCancel(future, task);
synchronized (scheduledTasks) {
scheduledTasks.add(task);
}
}
return future;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long rate, TimeUnit timeUnit) {
validateArguments(runnable, initialDelay, 1, rate, timeUnit);
var now = clock.instant();
var nextExecution = new AtomicReference<>(Instant.MAX);
var nextRun = new AtomicReference<>(now.plusMillis(timeUnit.toMillis(initialDelay)));
var future = new CompletableScheduledFuture<Void>(clock, nextExecution);
var task = new ScheduledTask(0, timeUnit.toMillis(rate), runnable, future, nextExecution, nextRun);
removeOnCancel(future, task);
synchronized (scheduledTasks) {
scheduledTasks.add(task);
}
if (initialDelay == 0) {
execute(() -> task.runWithRate(now));
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new AssertionError("Interrupted", e);
}
} else {
nextExecution.set(nextRun.get());
}
return future;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
long initialDelay,
long delay,
TimeUnit timeUnit) {
validateArguments(runnable, initialDelay, delay, 1, timeUnit);
var now = clock.instant();
var nextExecution = new AtomicReference<>(Instant.MAX);
var nextRun = new AtomicReference<>(now.plusMillis(timeUnit.toMillis(initialDelay)));
var future = new CompletableScheduledFuture<Void>(clock, nextExecution);
var task = new ScheduledTask(timeUnit.toMillis(delay), 0, runnable, future, nextExecution, nextRun);
removeOnCancel(future, task);
synchronized (scheduledTasks) {
scheduledTasks.add(task);
}
if (initialDelay == 0) {
execute(task::runWithDelay);
try {
Thread.sleep(3);
} catch (InterruptedException e) {
throw new AssertionError("Interrupted", e);
}
} else {
nextExecution.set(nextRun.get());
}
return future;
}
// ---- ExecutorService
@Override
public void shutdown() {
executor.shutdown();
synchronized (scheduledTasks) {
scheduledTasks.forEach(task -> task.future.complete(null));
scheduledTasks.clear();
}
}
@Override
public List<Runnable> shutdownNow() {
var ret = executor.shutdownNow();
synchronized (scheduledTasks) {
scheduledTasks.forEach(task -> task.future.complete(null));
scheduledTasks.clear();
}
return ret;
}
@Override
public boolean isShutdown() {
return executor.isShutdown();
}
@Override
public boolean isTerminated() {
return executor.isTerminated();
}
@Override
public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
return executor.awaitTermination(l, timeUnit);
}
// ---- Executor
@Override
public void execute(Runnable runnable) {
executor.execute(runnable);
}
// ---- Private
private void validateArguments(Runnable runnable,
long initialDelay,
long delay,
long rate,
TimeUnit timeUnit) {
requireNonNull(runnable, "runnable == null");
requireNonNull(timeUnit, "timeUnit == null");
if (initialDelay < 0) {
throw new IllegalArgumentException("Invalid initial delay " + initialDelay);
}
if (delay < 1) {
throw new IllegalArgumentException("Invalid delay " + delay);
}
if (rate < 1) {
throw new IllegalArgumentException("Invalid rate " + rate);
}
if (isShutdown()) {
throw new IllegalStateException("Executor is shut down");
}
}
@SuppressWarnings("FutureReturnValueIgnored")
private void removeOnCancel(CompletableFuture<?> future, ScheduledTask task) {
future.whenCompleteAsync((value, throwable) -> {
if (future.isCancelled()) {
synchronized (scheduledTasks) {
scheduledTasks.remove(task);
}
}
}, executor);
}
private class ScheduledTask
implements Comparable<ScheduledTask> {
private final long delayMs;
private final long rateMs;
private final Runnable runnable;
private final CompletableFuture<?> future;
private final AtomicReference<Instant> nextExecution;
private final AtomicReference<Instant> nextRun;
private final int id;
private ScheduledTask(long delayMs,
long rateMs,
Runnable runnable,
CompletableFuture<?> future,
AtomicReference<Instant> nextExecution,
AtomicReference<Instant> nextRun) {
this.delayMs = delayMs;
this.rateMs = rateMs;
this.id = nextId.incrementAndGet();
this.runnable = runnable;
this.nextExecution = nextExecution;
this.nextRun = nextRun;
this.future = future;
}
public boolean isCancelled() {
return future.isCancelled();
}
public boolean shouldExecute(Instant now) {
return !isCancelled() && !now.isBefore(nextExecution.get());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ScheduledTask)) {
return false;
}
ScheduledTask that = (ScheduledTask) o;
return delayMs == that.delayMs &&
rateMs == that.rateMs &&
id == that.id;
}
@Override
public int hashCode() {
return Objects.hash(delayMs, rateMs, id);
}
@Override
public int compareTo(ScheduledTask task) {
requireNonNull(task, "delayed == null");
int c = nextRun.get().compareTo(task.nextRun.get());
if (c != 0) {
return c;
}
return Integer.compare(id, task.id);
}
private void runOnce() {
if (future.isDone()) {
return;
}
runnable.run();
synchronized (scheduledTasks) {
scheduledTasks.remove(this);
}
}
private void runWithDelay() {
if (future.isDone()) {
return;
}
try {
runnable.run();
} catch (Exception e) {
future.completeExceptionally(e);
synchronized (scheduledTasks) {
scheduledTasks.remove(this);
}
return;
}
var nextRun = clock.instant().plusMillis(delayMs);
nextExecution.set(nextRun);
this.nextRun.set(nextRun);
}
private void runWithRate(Instant now) {
if (future.isDone()) {
return;
}
// Run the task for each time it should have been run in the
// time the last 'tick' should have triggered.
Instant nextRun = this.nextRun.get();
while (!now.isBefore(nextRun)) {
try {
runnable.run();
} catch (Exception e) {
future.completeExceptionally(e);
synchronized (scheduledTasks) {
scheduledTasks.remove(this);
}
return;
}
nextRun = this.nextRun.updateAndGet(it -> it.plusMillis(rateMs));
}
nextExecution.set(nextRun);
}
}
private class FakeTimeListener implements FakeClock.TimeListener {
@Override
public void newCurrentTime(Instant now) {
if (isShutdown()) {
return;
}
AtomicBoolean scheduled = new AtomicBoolean();
synchronized (scheduledTasks) {
scheduledTasks.removeIf(ScheduledTask::isCancelled);
scheduledTasks.stream()
.sorted()
.filter(task -> task.shouldExecute(now))
.forEach(task -> {
task.nextExecution.set(Instant.MAX);
scheduled.set(true);
if (task.delayMs > 0) {
execute(task::runWithDelay);
} else if (task.rateMs > 0) {
execute(() -> task.runWithRate(now));
} else {
execute(task::runOnce);
}
});
}
if (scheduled.get()) {
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
throw new AssertionError("Interrupted", e);
}
}
}
@Override
public Duration getDelay(Instant now) {
synchronized (scheduledTasks) {
scheduledTasks.removeIf(ScheduledTask::isCancelled);
var next = scheduledTasks
.stream()
.map(it -> it.nextExecution.get())
.filter(it -> !it.equals(Instant.MAX))
.sorted()
.findFirst()
.orElse(Instant.MAX);
if (next.equals(Instant.MAX)) {
return Duration.ZERO;
}
if (!now.isBefore(next)) {
return FakeClock.MIN_DELAY;
}
return Duration.between(now, next);
}
}
}
private final FakeClock clock;
private final FakeTimeListener listener;
private final List<ScheduledTask> scheduledTasks;
private final ExecutorService executor;
private final AtomicInteger nextId;
}