Coroutines – a better match for Kotlin than Reactive Streams?

Coroutines – a better match for Kotlin than Reactive Streams?

Piotr Kubowicz - May 12, 2020

Using Reactive Programming is tempting because it promises to handle higher traffic on the same hardware. Yet this paradigm is also demanding: it increases code complexity and may make bugs harder to notice. Let’s explore how you can use Kotlin and coroutines to reduce the cognitive load of maintaining reactive applications.

Sample code

Imagine you are developing a website that can be used by both logged-in and anonymous users, just like Wikipedia. At the same time, there are proper mechanisms in place to block misbehaving users. Our focus will be the back-end code returning a welcome message displayed on top of the main page:

class WelcomeService {
    fun welcome(usernameOrIp: String): WelcomeMessage {
        val userProfile: UserProfile? = findUserProfile(usernameOrIp)
        val block: Block? = findBlock(usernameOrIp)
        return generateWelcome(usernameOrIp, userProfile, block)
    }
}

The code follows the principles of ’traditional’ blocking style. It’s simple – the business logic allows the user to provide their personal data like the first and last name, and the user can sometimes be blocked by administrators. Nullable types express this clearly. The generateWelcome logic is not relevant to this article, but let’s examine it briefly:

fun generateWelcome(
    usernameOrIp: String,
    userProfile: UserProfile?,
    block: Block?
): WelcomeMessage =
    when {
        block != null -> WelcomeMessage(
            "You are blocked. Reason: ${block.reason}",
            WARNING
        )
        else -> WelcomeMessage(
            "Hello ${userProfile?.fullName ?: usernameOrIp}",
            INFO
        )
    }

Again, null handling in Kotlin allows expressing our intent without too much ceremony.

Reactive code is complex

Now, let’s try to move the service code to the reactive world (using Reactor library):

fun welcome(usernameOrIp: String): Mono<WelcomeMessage> {
    return userProfileRepository.findById(usernameOrIp)
        .zipWith(blockRepository.findById(usernameOrIp))
        .map { tuple ->
            generateWelcome(usernameOrIp, tuple.t1, tuple.t2)
        }
}

It uses reactive repositories, so userProfileRepository.findById() returns Mono<UserProfile>. blockRepository behaves similarly. The two reactive streams are joined using a ‘zip’ operator that emits a Tuple (pair).

One thing that becomes immediately obvious is that even such a simple piece of reactive code becomes complicated and is structured in a completely different way than the initial one. There is another problem with the snippet above. It’s incorrect.

What’s worse, the problem does not manifest itself openly. You probably won’t see any exception. If the result of welcome is returned from a Spring REST controller, sometimes the REST endpoint will respond as expected, but on other occasions it will serve HTTP 200 with an empty body.

What is happening then? The documentation of the ‘zip’ operator says that if any input is empty then the resulting reactive stream is cancelled. If either the user profile or user block is not be found, generateWelcome won’t be executed. Previously, in the blocking and imperative version of this code, the compiler warned us to handle the case when a variable is null. Using reactive streams cancels this Kotlin advantage – the programmer needs to remember about the ’empty’ case, there is no automatic hint.

A fixed version of the code above may look as follows:

fun welcome(usernameOrIp: String): Mono<WelcomeMessage> {
    return userProfileRepository.findById(usernameOrIp)
        .map { Optional.of(it) }
        .defaultIfEmpty(Optional.empty())
        .zipWith(blockRepository.findById(usernameOrIp)
            .map { Optional.of(it) }
            .defaultIfEmpty(Optional.empty()))
        .map { tuple ->
            generateWelcome(
                usernameOrIp, tuple.t1.orElse(null), tuple.t2.orElse(null)
            )
        }
}

It makes the code correct, but at what cost? Most of the contents are now just noise, it’s hard to see the business logic in it. ‘Optional’ and ’t1’ are not words from the domain of this system. This piece isn’t ‘coded in the language of the domain’ (see Kevlin Henney discussing this concept by Dan North more extensively). A person who is not fairly familiar with the system won’t understand what this method does by looking at it – it is not immediately clear what are arguments of generateWelcome, you need to look at the function definition because the call is too cryptic.

Only a minority of this reactive code serves its true purpose

You could extract some expressions to local variables:

val userProfile = tuple.t1.orElse(null)

but this would make the code even more Byzantine. Kotlin Destructuring Declaration paired with Reactor Kotlin extensions could alleviate the problem a bit by introducing meaningful names without declaring additional variables:

import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2

.map { (userProfile, block) ->
    generateWelcome(usernameOrIp, userProfile.orElse(null), block.orElse(null))
}

However, the method still remains quite complicated. In reality, we may have more than two inputs, and each input may be much more verbose than just getting an element from a repository. In this way, using a ‘zip’ operator may produce a method that is orders of magnitude more convoluted than a simplistic example here and such cosmetic changes as properly naming zip inputs or outputs won’t make it any more understandable.

Improving readability with coroutines

Coroutines allow writing concurrent Kotlin code in a different way. The basis is suspended functions, which are programmed like normal functions, without wrapping values in futures or reactive streams, but still allow for non-blocking asynchronous execution. As highlighted by Venkat Subramaniam in his Exploring Coroutines in Kotlin talk, coroutines allow writing concurrent code structured similarly to traditional imperative sequential code.

There is a possibility to bridge reactive streams and coroutines using kotlinx-coroutines-reactive library prepared by the Kotlin team. We have already seen how easy it is to miss an empty Mono. awaitFirstOrNull can translate a Mono instance into an instance of unboxed nullable type:

val userProfile: UserProfile? =
    userProfileRepository.findById(usernameOrIp).awaitFirstOrNull()

The transformation is non-blocking, so the whole welcome function has to become a suspended function:

suspend fun welcome(usernameOrIp: String): WelcomeMessage {
    val userProfile = userProfileRepository.findById(usernameOrIp).awaitFirstOrNull()
    val block = blockRepository.findById(usernameOrIp).awaitFirstOrNull()
    return generateWelcome(usernameOrIp, userProfile, block)
}

Note how similar this code is to the initial, sequential and blocking version:

class WelcomeService {
    fun welcome(usernameOrIp: String): WelcomeMessage {
        val userProfile: UserProfile? = findUserProfile(usernameOrIp)
        val block: Block? = findBlock(usernameOrIp)
        return generateWelcome(usernameOrIp, userProfile, block)
    }
}

Coroutines allow asynchronous code to leverage the power of Kotlin in handling nullability. If you change the generateWelcome signature to take non-null parameters, suspending welcome will fail to compile.

Spring WebFlux already supports receiving results from suspended functions. Here is a sample code:

@RestController
class WelcomeController(
    private val welcomeService: WelcomeService
) {
    @GetMapping("/welcome")
    suspend fun welcome(@RequestParam ip: String) =
        welcomeService.welcome(ip)
}

For detailed information on how coroutines are supported, read the Kotlin support section in Spring Framework documentation.

What if you still want to return a reactive stream? You can use suspending functions internally to be able to easily handle nullability and build a Mono instance using the mono function from the kotlinx-coroutines-reactor library:

fun welcome(usernameOrIp: String): Mono<WelcomeMessage> {
    return mono {
        val userProfile = userProfileRepository.findById(usernameOrIp).awaitFirstOrNull()
        val block = blockRepository.findById(usernameOrIp).awaitFirstOrNull()
        generateWelcome(usernameOrIp, userProfile, block)
    }
}

The kotlinx-coroutines-rx2 library offers similar conversions for RxJava 2 types.

Conclusion

Using reactive streams in Kotlin cancels one of its biggest advantages: nullability handling. Handling the absence of value becomes verbose and prone to errors.

Coroutines offer another way of building asynchronous and non-blocking applications. Suspending functions are written similarly to regular functions, so they don’t require writing code in a completely different way, unlike in reactive streams. Handling exceptions and nullability are done like in a sequential Kotlin code, so you can count on the compiler to catch your mistakes.

A mixed approach is possible: reactive streams can be converted to suspended functions, and reactive streams can be produced by calling suspended functions. Spring WebFlux supports both returning reactive streams from REST controller methods as well as controller methods that are suspended functions.

About the author

Piotr Kubowicz

Piotr Kubowicz

Software Engineer

Linkedin profile Twitter Github profile

Piotr is a polyglot developer who has been coding in Java for over ten years. He also tried many other languages, from C and Perl to Ruby.
During his time at nexocode, Piotr's primary focus has been on evolving team culture and ongoing projects by developing build automation and systems architecture to ensure delivery is smooth even as project codebases get bigger and more complex. As an active developer in the community, you can notice him speaking at various meetups and conferences.

Tempted to work
on something
as creative?

That’s all we do.

join nexocode

This article is a part of

Zero Legacy
36 articles

Zero Legacy

What goes on behind the scenes in our engineering team? How do we solve large-scale technical challenges? How do we ensure our applications run smoothly? How do we perform testing and strive for clean code?

Follow our article series to get insight into our developers' current work and learn from their experience. Expect to see technical details, architecture discussions, reviews on libraries and tools we use, best practices on software quality, and maybe even some fail stories.

check it out

Zero Legacy

Insights from nexocode team just one click away

Sign up for our newsletter and don't miss out on the updates from our team on engineering and teal culture.

Done!

Thanks for joining the newsletter

Check your inbox for the confirmation email & enjoy the read!

This site uses cookies for analytical purposes.

Accept Privacy Policy

In the interests of your safety and to implement the principle of lawful, reliable and transparent processing of your personal data when using our services, we developed this document called the Privacy Policy. This document regulates the processing and protection of Users’ personal data in connection with their use of the Website and has been prepared by Nexocode.

To ensure the protection of Users' personal data, Nexocode applies appropriate organizational and technical solutions to prevent privacy breaches. Nexocode implements measures to ensure security at the level which ensures compliance with applicable Polish and European laws such as:

  1. Regulation (EU) 2016/679 of the European Parliament and of the Council of 27 April 2016 on the protection of natural persons with regard to the processing of personal data and on the free movement of such data, and repealing Directive 95/46/EC (General Data Protection Regulation) (published in the Official Journal of the European Union L 119, p 1); Act of 10 May 2018 on personal data protection (published in the Journal of Laws of 2018, item 1000);
  2. Act of 18 July 2002 on providing services by electronic means;
  3. Telecommunications Law of 16 July 2004.

The Website is secured by the SSL protocol, which provides secure data transmission on the Internet.

1. Definitions

  1. User – a person that uses the Website, i.e. a natural person with full legal capacity, a legal person, or an organizational unit which is not a legal person to which specific provisions grant legal capacity.
  2. Nexocode – NEXOCODE sp. z o.o. with its registered office in Kraków, ul. Wadowicka 7, 30-347 Kraków, entered into the Register of Entrepreneurs of the National Court Register kept by the District Court for Kraków-Śródmieście in Kraków, 11th Commercial Department of the National Court Register, under the KRS number: 0000686992, NIP: 6762533324.
  3. Website – website run by Nexocode, at the URL: nexocode.com whose content is available to authorized persons.
  4. Cookies – small files saved by the server on the User's computer, which the server can read when when the website is accessed from the computer.
  5. SSL protocol – a special standard for transmitting data on the Internet which unlike ordinary methods of data transmission encrypts data transmission.
  6. System log – the information that the User's computer transmits to the server which may contain various data (e.g. the user’s IP number), allowing to determine the approximate location where the connection came from.
  7. IP address – individual number which is usually assigned to every computer connected to the Internet. The IP number can be permanently associated with the computer (static) or assigned to a given connection (dynamic).
  8. GDPR – Regulation 2016/679 of the European Parliament and of the Council of 27 April 2016 on the protection of individuals regarding the processing of personal data and onthe free transmission of such data, repealing Directive 95/46 / EC (General Data Protection Regulation).
  9. Personal data – information about an identified or identifiable natural person ("data subject"). An identifiable natural person is a person who can be directly or indirectly identified, in particular on the basis of identifiers such as name, identification number, location data, online identifiers or one or more specific factors determining the physical, physiological, genetic, mental, economic, cultural or social identity of a natural person.
  10. Processing – any operations performed on personal data, such as collecting, recording, storing, developing, modifying, sharing, and deleting, especially when performed in IT systems.

2. Cookies

The Website is secured by the SSL protocol, which provides secure data transmission on the Internet. The Website, in accordance with art. 173 of the Telecommunications Act of 16 July 2004 of the Republic of Poland, uses Cookies, i.e. data, in particular text files, stored on the User's end device.
Cookies are used to:

  1. improve user experience and facilitate navigation on the site;
  2. help to identify returning Users who access the website using the device on which Cookies were saved;
  3. creating statistics which help to understand how the Users use websites, which allows to improve their structure and content;
  4. adjusting the content of the Website pages to specific User’s preferences and optimizing the websites website experience to the each User's individual needs.

Cookies usually contain the name of the website from which they originate, their storage time on the end device and a unique number. On our Website, we use the following types of Cookies:

  • "Session" – cookie files stored on the User's end device until the Uses logs out, leaves the website or turns off the web browser;
  • "Persistent" – cookie files stored on the User's end device for the time specified in the Cookie file parameters or until they are deleted by the User;
  • "Performance" – cookies used specifically for gathering data on how visitors use a website to measure the performance of a website;
  • "Strictly necessary" – essential for browsing the website and using its features, such as accessing secure areas of the site;
  • "Functional" – cookies enabling remembering the settings selected by the User and personalizing the User interface;
  • "First-party" – cookies stored by the Website;
  • "Third-party" – cookies derived from a website other than the Website;
  • "Facebook cookies" – You should read Facebook cookies policy: www.facebook.com
  • "Other Google cookies" – Refer to Google cookie policy: google.com

3. How System Logs work on the Website

User's activity on the Website, including the User’s Personal Data, is recorded in System Logs. The information collected in the Logs is processed primarily for purposes related to the provision of services, i.e. for the purposes of:

  • analytics – to improve the quality of services provided by us as part of the Website and adapt its functionalities to the needs of the Users. The legal basis for processing in this case is the legitimate interest of Nexocode consisting in analyzing Users' activities and their preferences;
  • fraud detection, identification and countering threats to stability and correct operation of the Website.

4. Cookie mechanism on the Website

Our site uses basic cookies that facilitate the use of its resources. Cookies contain useful information and are stored on the User's computer – our server can read them when connecting to this computer again. Most web browsers allow cookies to be stored on the User's end device by default. Each User can change their Cookie settings in the web browser settings menu: Google ChromeOpen the menu (click the three-dot icon in the upper right corner), Settings > Advanced. In the "Privacy and security" section, click the Content Settings button. In the "Cookies and site date" section you can change the following Cookie settings:

  • Deleting cookies,
  • Blocking cookies by default,
  • Default permission for cookies,
  • Saving Cookies and website data by default and clearing them when the browser is closed,
  • Specifying exceptions for Cookies for specific websites or domains

Internet Explorer 6.0 and 7.0
From the browser menu (upper right corner): Tools > Internet Options > Privacy, click the Sites button. Use the slider to set the desired level, confirm the change with the OK button.

Mozilla Firefox
browser menu: Tools > Options > Privacy and security. Activate the “Custom” field. From there, you can check a relevant field to decide whether or not to accept cookies.

Opera
Open the browser’s settings menu: Go to the Advanced section > Site Settings > Cookies and site data. From there, adjust the setting: Allow sites to save and read cookie data

Safari
In the Safari drop-down menu, select Preferences and click the Security icon.From there, select the desired security level in the "Accept cookies" area.

Disabling Cookies in your browser does not deprive you of access to the resources of the Website. Web browsers, by default, allow storing Cookies on the User's end device. Website Users can freely adjust cookie settings. The web browser allows you to delete cookies. It is also possible to automatically block cookies. Detailed information on this subject is provided in the help or documentation of the specific web browser used by the User. The User can decide not to receive Cookies by changing browser settings. However, disabling Cookies necessary for authentication, security or remembering User preferences may impact user experience, or even make the Website unusable.

5. Additional information

External links may be placed on the Website enabling Users to directly reach other website. Also, while using the Website, cookies may also be placed on the User’s device from other entities, in particular from third parties such as Google, in order to enable the use the functionalities of the Website integrated with these third parties. Each of such providers sets out the rules for the use of cookies in their privacy policy, so for security reasons we recommend that you read the privacy policy document before using these pages. We reserve the right to change this privacy policy at any time by publishing an updated version on our Website. After making the change, the privacy policy will be published on the page with a new date. For more information on the conditions of providing services, in particular the rules of using the Website, contracting, as well as the conditions of accessing content and using the Website, please refer to the the Website’s Terms and Conditions.

Nexocode Team

Close

Want to be a part of our engineering team?

Join our teal organization and work on challenging projects.

CHECK OPEN POSITIONS