Sometimes your code deals with situations where things happen slowly. Maybe you schedule a background task that runs after some time. Or you run a special action when a request for data takes far too long. Either way, it is a tricky case that needs to be tested well. But what can we do if we don’t want a test that waits a lot? Project Reactor, a reactive programming library for the JVM, handles concurrency in a high-level and declarative fashion. Its test utility, StepVerifier, supports Virtual Time: you ‘mock’ the clock and advance time in your tests faster than the system clock runs.
A similar and tricky situation is when you need to test that a defined action is not executed for a given time. A brute force solution is to wait until time passes and then check nothing prohibited happened. Again, with Virtual Time, we can write a test that executes immediately.
The good news is: your code does not need to be prepared to run under Virtual Time. Just write it as usual, and there will be no problem testing it using this technique.
You might also appreciate that this technique can cause a multi-threaded logic to behave as if it was single-threaded. With fewer moving parts, it’s easier to assert the state of the code under test is what it is supposed to be.
Waiting for Asynchronous Action to Happen
Let’s meet our sample Kotlin code.
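fun upload(path: String, data: String): Mono<Void> =
    storage.save(path, data)
        .onErrorResume(StorageException::class) {
            // Saving failed: clean up later, but let the caller complete normally
            scheduleDelete(path)
            Mono.empty()
        }

private fun scheduleDelete(path: String) {
    Mono.delay(Duration.ofMillis(10_000))
        .then(storage.delete(path))
        // Mono<Void> never emits a value, so log on completion, not in an onNext consumer
        .doOnSuccess { logger.info("Cleaned up $path") }
        .subscribe()
}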
In production code, the Storage class calls some external API. We don’t want network calls during test execution, so we replace this class with a test implementation that is easy to control:
class FakeStorage(private val logic: StorageLogic) : Storage {
    override fun save(path: String, data: String) =
        Mono.fromRunnable<Void> { logic.save(path, data) }

    override fun delete(path: String) =
        Mono.fromRunnable<Void> { logic.delete(path) }
}

interface StorageLogic {
    fun save(path: String, data: String)
    fun delete(path: String)
}
FakeStorage serves as a bridge between reactive streams and regular mocks. It seems it’s easy to write a unit test:
whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
fileService.upload("/path", "content").block()
verify(storageLogic).delete("/path")
Unfortunately, this test won’t pass. The call to block() waits only until the reactive stream returned by upload(), the one executing save, finishes. That is not enough: the reactive stream executing delete is still waiting out its delay. The ‘delete’ stream runs as a background task and is not returned to the outside world, so the test has no way to attach to it and wait until it finishes.
We can deal with it by checking the expected result in a loop:
whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
fileService.upload("/path", "content").block()
verify(storageLogic, timeout(10_000)).delete("/path")
Here, passing timeout() makes Mockito.verify() repeat pausing and checking the condition until it succeeds. There are special libraries that wait for a specified condition to become true, for example, Awaitility, and they work similarly. It may seem we have good tools to solve the problem.
When Sleeping in Tests Is Not an Option
So, we have test code that passes, but it’s not a good test. A single test method takes 10 seconds to execute. If the code under test has a delay measured in minutes, this approach stops being practical. We cannot wait that long.
Also, waiting introduces problems with test stability. I wrote that the last test passes, but that’s not quite true. With a 10-second timeout, the deletion runs at roughly the same moment the verification gives up. It’s a race condition, and on my machine the verification times out just before the deletion happens. I have to add 10 milliseconds to the wait time to make the test green. But when something occupies the CPU or the garbage collector pauses the application, scheduled tasks run later than they are supposed to, and it’s hard to predict how long the additional delay will be. So I have to add a second or two or risk the test failing randomly.
Assuring Something Does Not Happen for Some Time
Here is an incorrect implementation that still passes the test above:
storage.save(path, data)
    .onErrorResume(StorageException::class) {
        storage.delete(path)
    }
Here, no delay is applied.
We can modify the test to detect such a bug:
whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
fileService.upload("/path", "content").block()
verify(storageLogic, after(9_000).never()).delete("/path")
verify(storageLogic, timeout(2_000)).delete("/path")
But as before, this is nearly putting Thread.sleep(9_000) into the test. Generally, if the code under test performs an action asynchronously, and we want to make sure the action is not performed, we have to put something like a sleep instruction into the test. If we need to be faster, we have to change the way we test.
What Is Virtual Time
This problem is not unique to writing tests. In Discrete Event Simulation, a real-life process or system is emulated on a computer. Imagine we want to prove that changing the weekly work schedule will improve how a factory works. We don’t want the computer to simulate a week minute by minute and get an answer after 7 days. Instead, the simulation manages a list of events (like “employee A starts a shift”), each with time assigned, and uses an internal variable as a clock. In each simulation step, this variable (called Virtual Time) is incremented, compared with the time assigned to remaining events, and events that just moved from being “future” to being “past” are “executed.”
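The event loop described above can be sketched in a few lines of Kotlin. This is only an illustration of the idea, not how any particular simulation library works: the names Event and VirtualClock are hypothetical, and time is counted in plain minutes.

```kotlin
import java.util.PriorityQueue

// Hypothetical types for illustration only; they are not part of any library.
data class Event(val time: Long, val name: String)

class VirtualClock {
    // Current virtual time, in minutes since the simulation started
    var now: Long = 0
        private set

    // Future events, ordered by the time they are due
    private val pending = PriorityQueue<Event>(compareBy<Event> { it.time })

    // Names of events that have already been "executed", in execution order
    val executed = mutableListOf<String>()

    fun schedule(delay: Long, name: String) {
        pending.add(Event(now + delay, name))
    }

    // Moves the clock forward and "executes" every event that became due, in time order
    fun advanceBy(amount: Long) {
        val target = now + amount
        while (true) {
            val next = pending.peek()
            if (next == null || next.time > target) break
            pending.poll()
            now = next.time
            executed.add(next.name)
        }
        now = target
    }
}

fun main() {
    val clock = VirtualClock()
    clock.schedule(delay = 480, name = "employee A starts a shift")
    clock.schedule(delay = 960, name = "employee A ends a shift")

    clock.advanceBy(500)     // returns immediately, no real waiting
    println(clock.executed)  // only the shift start is in the "past" now

    clock.advanceBy(10_000)  // fast-forward by almost a week
    println(clock.executed)  // both events have been executed
}
```

Nothing here sleeps: advancing the clock by a week costs as much as advancing it by a minute, because only the events that became due are processed.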
The same principle can be applied to reactive applications. Normally, reactive operators and subscriptions run on multiple threads and follow the system clock. Under Virtual Time, one thread executes everything and a variable acts as the clock. This also simplifies tests: we don’t need to deal with asynchronous execution; everything runs on one thread, so the code behaves as if it were executed synchronously.
Virtual Time in Project Reactor
Project Reactor comes with a utility class that allows inspecting how reactive streams behave on a low level. Here I use it to take a look into an infinite sequence:
val infinite: Flux<Int> = Flux.generate(
    { 100 },
    { state, sink -> sink.next(state); state + 1 }
)
StepVerifier.create(infinite)
    .expectNext(100, 101, 102, 103)
    .thenCancel()
    .verify()
One of its features will be of particular interest in this article: virtual time.
StepVerifier.withVirtualTime { fileService.upload("/path", "content") }
    .thenAwait(Duration.ofMillis(10_000))
    .verifyComplete()
Note the different way of constructing StepVerifier: I removed the call to create(). For code passed to withVirtualTime(), all calls to Schedulers.parallel(), Schedulers.boundedElastic() and so on return a test scheduler implementation. As a result, thenAwait() won’t cause the test to pause. The execution takes around 4 milliseconds on my computer. You can think of what happens as similar to:
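// Install a virtual-time scheduler: Schedulers.parallel() and friends now return it
val virtualTime = VirtualTimeScheduler.getOrSet()
try {
    StepVerifier.create(fileService.upload("/path", "content"))
        // Move the virtual clock forward instead of sleeping
        .then { virtualTime.advanceTimeBy(Duration.ofMillis(10_000)) }
        .verifyComplete()
} finally {
    // Restore the regular, multi-threaded schedulers
    VirtualTimeScheduler.reset()
}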
Schedulers are replaced only for the time of executing StepVerifier checks, then set back to normal (multi-threaded) ones.
Test That Does Not Sleep
Let’s see the complete test code:
whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
StepVerifier.withVirtualTime { fileService.upload("/path", "content") }
    .thenAwait(Duration.ofMillis(10_000))
    .verifyComplete()
verify(storageLogic).delete("/path")
It looks similar to the original version relying on Mockito.timeout(), but it works differently. Previously, a background thread executing delayed deletion ran concurrently to test execution, so the test had to wait. We could see different threads in logs:
10:39:17.443 [Test worker ] INFO Scheduling /path cleanup after error
10:39:27.444 [parallel-3 ] INFO Removing /path
There are no waits with StepVerifier virtual time, and nothing happens in the background. Calling withVirtualTime() installs a special scheduler that does not use a thread pool. Advancing the virtual clock by 10 seconds immediately executes the delayed deletion. After verifyComplete() finishes, deleting is done, and we can verify it with a simple call. Let’s see the logs:
10:39:17.427 [Test worker ] INFO Scheduling /path cleanup after error
10:39:17.430 [Test worker ] INFO Removing /path
Code Compatible With Virtual Time
StepVerifier virtual time works with more than Mono.delay(). It also properly handles delaySubscription(), delayElement() and similar methods of Flux, for example delayElements(). Delayed tasks created with Reactor’s Scheduler.schedule() work properly as well:
Schedulers.boundedElastic().schedule(
    { storage.deleteSync(path) },
    deleteDelay.toMillis(), MILLISECONDS
)
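As a minimal sketch of the Flux case, the snippet below runs delayElements() under virtual time. The three letters and the one-hour delay are arbitrary values chosen for illustration; the snippet assumes reactor-core and reactor-test are on the classpath.

```kotlin
import reactor.core.publisher.Flux
import reactor.test.StepVerifier
import java.time.Duration

fun main() {
    StepVerifier
        // The Flux must be created inside the lambda, so that delayElements()
        // picks up the virtual-time scheduler instead of the regular one
        .withVirtualTime { Flux.just("a", "b", "c").delayElements(Duration.ofHours(1)) }
        // Each element is delayed by one hour, so three virtual hours cover all of them
        .thenAwait(Duration.ofHours(3))
        .expectNext("a", "b", "c")
        .verifyComplete()
}
```

If the Flux were assembled outside the lambda, its delay would already be bound to a real scheduler, and the verification would have to wait for the real clock.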
Advanced StepVerifier Usage
So far, we have seen expectNext() that checks that elements are emitted and thenAwait() that advances Virtual Time. There are more methods, for example expectNextMatches() allowing to pass a predicate performing the check, or expectError() allowing to verify an error appears in the sequence. What about verifying that nothing happens? This code will compile but won’t work:
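StepVerifier.withVirtualTime { fileService.upload("/path", "content") }
    .expectNoEvent(Duration.ofMillis(10_000))
expectNoEvent() only watches the signals of the sequence under verification (onNext, onComplete, onError). In our case, deleting happens in the background, outside that sequence, so we cannot detect it in the test this way.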
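For contrast, here is a minimal sketch of a case where expectNoEvent() does help: the delay is part of the verified sequence itself. It follows the pattern shown in the Reactor reference documentation; the one-day duration is arbitrary, and reactor-core and reactor-test are assumed to be on the classpath.

```kotlin
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.time.Duration

fun main() {
    StepVerifier
        .withVirtualTime { Mono.delay(Duration.ofDays(1)) }
        // The subscription signal counts as an event, so consume it first
        .expectSubscription()
        // Advances the virtual clock and fails if the sequence signals anything meanwhile
        .expectNoEvent(Duration.ofDays(1))
        // Mono.delay() emits 0 once the delay has passed
        .expectNext(0L)
        .verifyComplete()
}
```

This works because both the silence and the later element belong to the stream StepVerifier subscribes to, unlike the background deletion.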
Correct solution: use StepVerifier.then() to execute some custom checks during verification, for example, assuring deletion mock is not called.
StepVerifier.withVirtualTime { fileService.uploadNoDelay("/path", "content") }
    .then { verify(storageLogic, never()).delete(any()) }
    .thenAwait(Duration.ofMillis(10_000))
    .verifyComplete()
verify(storageLogic).delete("/path")
The test above fails if delete is started without any delay.
Conclusion
With Virtual Time, time flow is detached from how the system clock runs, and instead is controlled by a ‘simulation’ scenario. You can freeze execution and verify the system state. Then, no background actions will execute and interfere with assertions. Or you can ‘fast-forward’ by an arbitrary amount of time and execute actions immediately, without waiting.
Project Reactor has built-in support for Virtual Time. You can use StepVerifier, a well-documented test utility prepared by library authors.
Project Reactor handles Virtual Time in a transparent way. There is no need to modify your production code to make it testable using this method, or program in a special way. StepVerifier will change the way the typical usage of schedulers works, injecting its own scheduler. This special implementation works on Virtual Time instead of thread pools.
Switching to Virtual Time is evident in code structure and enforces a proper cleanup. There is nothing like Spring Boot auto-configuration or similar magic. You have to add a bit of code in each place explicitly. After StepVerifier work is done, schedulers are restored to regular operation.
The behavior of code running in Virtual Time is easier to understand. This is because all actions are executed in one thread. Race conditions and other concurrency problems won’t happen. It’s easier to inspect system state and write assertions. There is no need to apply specialized libraries for checking asynchronous state, like Awaitility.
Think of this technique as one of the tools you have at your disposal. Don’t try to apply it everywhere. Use it when the code you want to test has complicated logic related to time handling.
Links
https://projectreactor.io/docs/core/release/reference/#testing – official documentation of StepVerifier
https://github.com/pkubowicz/virtual-time – a sample project allowing to execute examples from this text