Fast Reactor Tests With Virtual Time

Fast Reactor Tests With Virtual Time

Piotr Kubowicz - December 27, 2021

Sometimes your code deals with a situation when things happen slowly. Maybe you schedule a background task that runs after some time. Or run a special action when asking for data that takes far too long. Either way, it is a tricky case that needs to be tested well. But what to do if we don’t want a test that waits a lot? Project Reactor, a reactive programming library for JVM, handles concurrency in a high-level and declarative fashion. Its test utility, StepVerifier, allows using Virtual Time: ‘mock’ the clock and advance time in your tests faster than the system clock runs.

A similar and tricky situation is when you need to test that a defined action is not executed for a given time. A brute force solution is to wait until time passes and then check nothing prohibited happened. Again, with Virtual Time, we can write a test that executes immediately.

The good news is: your code does not need to be prepared to run under Virtual Time. Just write it as usual, and there will be no problem testing it using this technique. You might also appreciate that this technique can cause a multi-threaded logic to behave as if it was single-threaded. With fewer moving parts, it’s easier to assert the state of the code under test is what it is supposed to be.

Waiting for Asynchronous Action to Happen

Let’s meet our sample Kotlin code.

fun upload(path: String, data: String): Mono<Void> =
    storage.save(path, data)
        .onErrorResume(StorageException::class) {
            scheduleDelete(path)
            Mono.empty()
        }

private fun scheduleDelete(path: String) {
    Mono.delay(Duration.ofMillis(10_000))
        .then(storage.delete(path))
        .subscribe({ logger.info("Cleaned up $path") })
}

In production code, the Storage class calls some external API. We don’t want network calls during test execution, so we replace this class with a test implementation that is easy to control:

class FakeStorage(private val logic: StorageLogic) : Storage {
    override fun save(path: String, data: String) =
        Mono.fromRunnable<Void> { logic.save(path, data) }

    override fun delete(path: String) =
        Mono.fromRunnable<Void> { logic.delete(path) }
}

interface StorageLogic {
    fun save(path: String, data: String)
    fun delete(path: String)
}

FakeStorage serves as a bridge between reactive streams and regular mocks. It seems it’s easy to write a unit test:

whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
fileService.upload("/path", "content").block()

verify(storageLogic).delete("/path")

Unfortunately, this test won’t pass. It waits until the reactive stream executing save returned by the called method finishes. However, it is not enough: the reactive stream executing delete is still in the middle of execution. The ‘delete’ stream runs as a background task, is not returned to the outside world, so there is no way in the test to attach to it and wait until it finishes. We can deal with it by checking the expected result in a loop:

whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
fileService.upload("/path", "content").block()

verify(storageLogic, timeout(10_000)).delete("/path")

Here, passing timeout() makes Mockito.verify() repeat pausing and checking the condition until it succeeds. There are special libraries that wait for a specified condition to become true, for example, Awaitility, and they work similarly. It may seem we have good tools to solve the problem.

When Sleeping in Tests Is Not an Option

So, we have a test code that passes, but it’s not a good test. A single method takes 10 seconds to execute. If the code under test has a delay measured in minutes, this approach stops being practical. We cannot wait for so long.

Also, waiting introduces problems with test stability. I wrote that the last test passes, but it’s not true. With 10 second wait time, deleting is executed at roughly the same time when the verification starts. It’s a race condition, and validation is run too early on my machine. I have to add 10 milliseconds to wait time to make the test green. But with something occupying the CPU or a garbage collector pause, scheduled tasks won’t be run when they are supposed to, but later, and it’s hard to predict how long the additional delay will be. So I have to add a second or two or risk the test failing randomly.

Assuring Something Does Not Happen for Some Time

There is an incorrect code that passes the test above:

storage.save(path, data)
    .onErrorResume(StorageException::class) {
        storage.delete(path)
    }

Here, no delay is applied. We can modify the test to detect such a bug:

whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
fileService.upload("/path", "content").block()

verify(storageLogic, after(9_000).never()).delete("/path")
verify(storageLogic, timeout(2_000)).delete("/path")

But as before, this is nearly putting Thread.sleep(9_000) into the test. Generally, if the code under test performs an action asynchronously, and we want to make sure the action is not performed, we have to put something like a sleep instruction into the test. If we need to be faster, we have to change the way we test.

What Is Virtual Time

This problem is not unique to writing tests. In Discrete Event Simulation, a real-life process or system is emulated on a computer. Imagine we want to prove that changing the weekly work schedule will improve how a factory works. We don’t want the computer to simulate a week minute by minute and get an answer after 7 days. Instead, the simulation manages a list of events (like “employee A starts a shift”), each with time assigned, and uses an internal variable as a clock. In each simulation step, this variable (called Virtual Time) is incremented, compared with the time assigned to remaining events, and events that just moved from being “future” to being “past” are “executed.”

The same principle can be applied to reactive applications. We stop running reactive operators and subscriptions on multiple threads and system clocks, which usually happens. Instead, there will be one thread to execute everything and a variable acting as a clock. It also simplifies tests: we don’t need to deal with asynchronous execution; everything is run on one thread, so it behaves like an asynchronous execution.

Virtual Time in Project Reactor

Project Reactor comes with a utility class that allows inspecting how reactive streams behave on a low level. Here I use it to take a look into an infinite sequence:

val infinite: Flux<Int> = Flux.generate(
    { 100 },
    { state, sink -> sink.next(state); state + 1 }
)

StepVerifier.create(infinite)
    .expectNext(100, 101, 102, 103)
    .thenCancel()
    .verify()

One of its features will be of particular interest in this article: virtual time.

StepVerifier.withVirtualTime { fileService.upload("/path", "content") }
    .thenAwait(Duration.ofMillis(10_000))
    .verifyComplete()

Note the different ways of constructing StepVerifier: I removed the call to create(). For code passed to withVirtualTime(), all calls to Schedulers.parallel(), Schedulers.boundedElastic() and so on return a test scheduler implementation. As a result, thenAwait() won’t cause the test to pause. The execution takes around 4 milliseconds on my computer. You can think what happens is similar to:

val virtualTime = VirtualTimeScheduler()
try {
    Schedulers.setFactory(virtualTime)
    val stepVerifier = StepVerifier.create(fileService.upload("/path", "content"))
    virtualTime.advanceTimeBy(Duration.ofMillis(10_000))
    stepVerifier.verifyComplete()
} finally {
    Schedulers.resetFactory()
}

Schedulers are replaced only for the time of executing StepVerifier checks, then set back to normal (multi-threaded) ones.

Test That Does Not Sleep

Let’s see the complete test code:

whenever(storageLogic.save(any(), any())).thenThrow(StorageException())
StepVerifier.withVirtualTime { fileService.upload("/path", "content") }
    .thenAwait(Duration.ofMillis(10_000))
    .verifyComplete()

verify(storageLogic).delete("/shirt")

It looks similar to the original version relying on Mockito.timeout(), but it works differently. Previously, a background thread executing delayed deletion ran concurrently to test execution, so the test had to wait. We could see different threads in logs:

10:39:17.443 [Test worker ] INFO  Scheduling /path cleanup after error
10:39:27.444 [parallel-3  ] INFO  Removing /path

There are no waits with StepVerifier virtual time, and nothing happens in the background. Calling withVirtualTime() installs a special scheduler that does not use a thread pool. Advancing the virtual clock by 10 seconds immediately executes the delayed deletion. After verifyComplete() finishes, deleting is done, and we can verify it with a simple call. Let’s see the logs:

10:39:17.427 [Test worker ] INFO  Scheduling /path cleanup after error
10:39:17.430 [Test worker ] INFO  Removing /path

Code Compatible With Virtual Time StepVerifier virtual time works not only with Mono.delay(). It also properly handles delaySubscription(), delayElement() and similar methods of Flux, for example delayElements(). Also, delayed tasks created with Reactor’s Scheduler.schedule() will work properly:

Schedulers.boundedElastic().schedule(
    { storage.deleteSync(path) },
    deleteDelay.toMillis(), MILLISECONDS
)

Advanced StepVerifier Usage

So far, we have seen expectNext() that checks that elements are emitted and thenAwait() that advances Virtual Time. There are more methods, for example expectNextMatches() allowing to pass a predicate performing the check, or expectError() allowing to verify an error appears in the sequence. What about verifying that nothing happens? This code will compile but won’t work:

StepVerifier.withVirtualTime { fileService.upload("/path", "content") }
    .expectNoEvent(Duration.ofMillis(10_000))

In our case, deleting happens in the background, and we cannot detect it in the test this way. Correct solution: use StepVerifier.then() to execute some custom checks during verification, for example, assuring deletion mock is not called.

StepVerifier.withVirtualTime { fileService.uploadNoDelay("/path", "content") }
    .then { verify(storageLogic, never()).delete(any()) }
    .thenAwait(Duration.ofMillis(10_000))
    .verifyComplete()

verify(storageLogic).delete("/path")

The test above fails if delete is started without any delay.

Conclusion

With Virtual Time, time flow is detached from how the system clock runs, and instead is controlled by a ‘simulation’ scenario. You can freeze execution and verify the system state. Then, no background actions will execute and interfere with assertions. Or you can ‘fast-forward’ by an arbitrary amount of time and execute actions immediately, without waiting.

Project Reactor has built-in support for Virtual Time. You can use StepVerifier, a well-documented test utility prepared by library authors.

Project Reactor handles Virtual Time in a transparent way. There is no need to modify your production code to make it testable using this method, or program in a special way. StepVerifier will change the way the typical usage of schedulers works, injecting its own scheduler. This special implementation works on Virtual Time instead of thread pools. Switching to Virtual Time is evident in code structure and enforces a proper cleanup. There is nothing like Spring Boot auto-configuration or similar magic. You have to add a bit of code in each place explicitly. After StepVerifier work is done, schedulers are restored to regular operation.

The behavior of code running in Virtual Time is easier to understand. This is because all actions are executed in one thread. Race conditions and other concurrency problems won’t happen. It’s easier to inspect system state and write assertions. There is no need to apply specialized libraries for checking asynchronous state, like Awaitility.

Think of this technique as one of the tools you have at your disposal. Don’t try to apply it everywhere. Use it when the code you want to test has complicated logic related to time handling.

https://projectreactor.io/docs/core/release/reference/#testing – official documentation of StepVerifier

https://github.com/pkubowicz/virtual-time – a sample project allowing to execute examples from this text

About the author

Piotr Kubowicz

Piotr Kubowicz

Software Engineer

Linkedin profile Twitter Github profile

Piotr is a polyglot developer who has been coding in Java for over ten years. He also tried many other languages, from C and Perl to Ruby.
During his time at nexocode, Piotr's primary focus has been on evolving team culture and ongoing projects by developing build automation and systems architecture to ensure delivery is smooth even as project codebases get bigger and more complex. As an active developer in the community, you can notice him speaking at various meetups and conferences.

Would you like to discuss
AI opportunities in your
business?

Let us know and Mateusz
will arrange a call with our experts.

Thanks for the message!

We'll do our best to get back to you
as soon as possible.

More articles

Find us on

Need help with implementing AI in your business?

Let's talk blue circle

This site uses cookies for analytical purposes.

Accept Privacy Policy

In the interests of your safety and to implement the principle of lawful, reliable and transparent processing of your personal data when using our services, we developed this document called the Privacy Policy. This document regulates the processing and protection of Users’ personal data in connection with their use of the Website and has been prepared by Nexocode.

To ensure the protection of Users' personal data, Nexocode applies appropriate organizational and technical solutions to prevent privacy breaches. Nexocode implements measures to ensure security at the level which ensures compliance with applicable Polish and European laws such as:

  1. Regulation (EU) 2016/679 of the European Parliament and of the Council of 27 April 2016 on the protection of natural persons with regard to the processing of personal data and on the free movement of such data, and repealing Directive 95/46/EC (General Data Protection Regulation) (published in the Official Journal of the European Union L 119, p 1); Act of 10 May 2018 on personal data protection (published in the Journal of Laws of 2018, item 1000);
  2. Act of 18 July 2002 on providing services by electronic means;
  3. Telecommunications Law of 16 July 2004.

The Website is secured by the SSL protocol, which provides secure data transmission on the Internet.

1. Definitions

  1. User – a person that uses the Website, i.e. a natural person with full legal capacity, a legal person, or an organizational unit which is not a legal person to which specific provisions grant legal capacity.
  2. Nexocode – NEXOCODE sp. z o.o. with its registered office in Kraków, ul. Generała Henryka Kamieńskiego 51, 30-644 Kraków, entered into the Register of Entrepreneurs of the National Court Register kept by the District Court for Kraków-Śródmieście in Kraków, 11th Commercial Department of the National Court Register, under the KRS number: 0000686992, NIP: 6762533324.
  3. Website – website run by Nexocode, at the URL: nexocode.com whose content is available to authorized persons.
  4. Cookies – small files saved by the server on the User's computer, which the server can read when when the website is accessed from the computer.
  5. SSL protocol – a special standard for transmitting data on the Internet which unlike ordinary methods of data transmission encrypts data transmission.
  6. System log – the information that the User's computer transmits to the server which may contain various data (e.g. the user’s IP number), allowing to determine the approximate location where the connection came from.
  7. IP address – individual number which is usually assigned to every computer connected to the Internet. The IP number can be permanently associated with the computer (static) or assigned to a given connection (dynamic).
  8. GDPR – Regulation 2016/679 of the European Parliament and of the Council of 27 April 2016 on the protection of individuals regarding the processing of personal data and onthe free transmission of such data, repealing Directive 95/46 / EC (General Data Protection Regulation).
  9. Personal data – information about an identified or identifiable natural person ("data subject"). An identifiable natural person is a person who can be directly or indirectly identified, in particular on the basis of identifiers such as name, identification number, location data, online identifiers or one or more specific factors determining the physical, physiological, genetic, mental, economic, cultural or social identity of a natural person.
  10. Processing – any operations performed on personal data, such as collecting, recording, storing, developing, modifying, sharing, and deleting, especially when performed in IT systems.

2. Cookies

The Website is secured by the SSL protocol, which provides secure data transmission on the Internet. The Website, in accordance with art. 173 of the Telecommunications Act of 16 July 2004 of the Republic of Poland, uses Cookies, i.e. data, in particular text files, stored on the User's end device.
Cookies are used to:

  1. improve user experience and facilitate navigation on the site;
  2. help to identify returning Users who access the website using the device on which Cookies were saved;
  3. creating statistics which help to understand how the Users use websites, which allows to improve their structure and content;
  4. adjusting the content of the Website pages to specific User’s preferences and optimizing the websites website experience to the each User's individual needs.

Cookies usually contain the name of the website from which they originate, their storage time on the end device and a unique number. On our Website, we use the following types of Cookies:

  • "Session" – cookie files stored on the User's end device until the Uses logs out, leaves the website or turns off the web browser;
  • "Persistent" – cookie files stored on the User's end device for the time specified in the Cookie file parameters or until they are deleted by the User;
  • "Performance" – cookies used specifically for gathering data on how visitors use a website to measure the performance of a website;
  • "Strictly necessary" – essential for browsing the website and using its features, such as accessing secure areas of the site;
  • "Functional" – cookies enabling remembering the settings selected by the User and personalizing the User interface;
  • "First-party" – cookies stored by the Website;
  • "Third-party" – cookies derived from a website other than the Website;
  • "Facebook cookies" – You should read Facebook cookies policy: https://www.facebook.com/policy/cookies
  • "Other Google cookies" – Refer to Google cookie policy: www.google.com/policies/technologies/types/

3. How System Logs work on the Website

User's activity on the Website, including the User’s Personal Data, is recorded in System Logs. The information collected in the Logs is processed primarily for purposes related to the provision of services, i.e. for the purposes of:

  • analytics – to improve the quality of services provided by us as part of the Website and adapt its functionalities to the needs of the Users. The legal basis for processing in this case is the legitimate interest of Nexocode consisting in analyzing Users' activities and their preferences;
  • fraud detection, identification and countering threats to stability and correct operation of the Website.

4. Cookie mechanism on the Website

Our site uses basic cookies that facilitate the use of its resources. Cookies contain useful information and are stored on the User's computer – our server can read them when connecting to this computer again. Most web browsers allow cookies to be stored on the User's end device by default. Each User can change their Cookie settings in the web browser settings menu: Google ChromeOpen the menu (click the three-dot icon in the upper right corner), Settings > Advanced. In the "Privacy and security" section, click the Content Settings button. In the "Cookies and site date" section you can change the following Cookie settings:

  • Deleting cookies,
  • Blocking cookies by default,
  • Default permission for cookies,
  • Saving Cookies and website data by default and clearing them when the browser is closed,
  • Specifying exceptions for Cookies for specific websites or domains

Internet Explorer 6.0 and 7.0
From the browser menu (upper right corner): Tools > Internet Options > Privacy, click the Sites button. Use the slider to set the desired level, confirm the change with the OK button.

Mozilla Firefox
browser menu: Tools > Options > Privacy and security. Activate the “Custom” field. From there, you can check a relevant field to decide whether or not to accept cookies.

Opera
Open the browser’s settings menu: Go to the Advanced section > Site Settings > Cookies and site data. From there, adjust the setting: Allow sites to save and read cookie data

Safari
In the Safari drop-down menu, select Preferences and click the Security icon.From there, select the desired security level in the "Accept cookies" area.

Disabling Cookies in your browser does not deprive you of access to the resources of the Website. Web browsers, by default, allow storing Cookies on the User's end device. Website Users can freely adjust cookie settings. The web browser allows you to delete cookies. It is also possible to automatically block cookies. Detailed information on this subject is provided in the help or documentation of the specific web browser used by the User. The User can decide not to receive Cookies by changing browser settings. However, disabling Cookies necessary for authentication, security or remembering User preferences may impact user experience, or even make the Website unusable.

5. Additional information

External links may be placed on the Website enabling Users to directly reach other website. Also, while using the Website, cookies may also be placed on the User’s device from other entities, in particular from third parties such as Google, in order to enable the use the functionalities of the Website integrated with these third parties. Each of such providers sets out the rules for the use of cookies in their privacy policy, so for security reasons we recommend that you read the privacy policy document before using these pages. We reserve the right to change this privacy policy at any time by publishing an updated version on our Website. After making the change, the privacy policy will be published on the page with a new date. For more information on the conditions of providing services, in particular the rules of using the Website, contracting, as well as the conditions of accessing content and using the Website, please refer to the the Website’s Terms and Conditions.

Nexocode Team