The RxJava Threading Mistake You Are Probably Making
December 1, 2021
TL;DR: If you think you completely understand threading in RxJava, take a closer look at the behavior of Processor classes…
In addition to having a steep learning curve, RxJava has enough secrets to continue to confound developers long after they believe they have mastered it. What follows is one such curiosity related to threading.
The Setup
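Consider an RxJava call chain that waits for some data using waitForData, passes the result to makeNetworkRequestWithData, and then applies subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) near its end. Here waitForData and makeNetworkRequestWithData are both functions returning the reactive type Single<T>.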
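A minimal sketch of such a chain follows, assuming RxJava 3 on a plain JVM. The bodies of waitForData and makeNetworkRequestWithData are hypothetical (both run synchronously, following the project conventions listed below), String stands in for the data type, and fakeMainScheduler is an assumed stand-in for AndroidSchedulers.mainThread(), which only exists on Android. In this conforming case the request does run on an I/O thread; the thread-name prefix in the comments is the one RxJava uses for its default Schedulers.io() threads.

```kotlin
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

// Assumed stand-in for AndroidSchedulers.mainThread() (RxAndroid), which only exists on Android.
val fakeMainScheduler: Scheduler = Schedulers.from(
    Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main").apply { isDaemon = true } }
)

// Hypothetical: synchronous by default, following the project's convention.
fun waitForData(): Single<String> = Single.fromCallable { "data" }

// Hypothetical network call; records the thread it actually runs on.
fun makeNetworkRequestWithData(data: String, networkThread: AtomicReference<String>): Single<String> =
    Single.fromCallable {
        networkThread.set(Thread.currentThread().name)
        "response for $data"
    }

// The chain from the setup: threading is supplied only near the end.
// Returns (thread the request ran on, thread the result was delivered on).
fun runChain(): Pair<String, String> {
    val networkThread = AtomicReference<String>()
    val deliveredOn = waitForData()
        .flatMap { data -> makeNetworkRequestWithData(data, networkThread) }
        .subscribeOn(Schedulers.io())
        .observeOn(fakeMainScheduler)
        .map { Thread.currentThread().name } // runs after observeOn, on the "main" stand-in
        .blockingGet()                       // blocking subscribe, only for the sketch
    return networkThread.get() to deliveredOn
}

fun main() {
    val (networkThread, deliveredOn) = runChain()
    println("network request ran on $networkThread") // an RxCachedThreadScheduler-N thread
    println("result delivered on $deliveredOn")      // fake-main
}
```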
Now assume the following:
- The project containing this call chain configures network requests (like makeNetworkRequestWithData) so that they run synchronously by default but may be placed on a background thread using subscribeOn. For years this was the default behavior of the RxJava adapters for Retrofit, the most commonly used networking library for Android.
- In this project, all functions returning RxJava types are intentionally configured in a similar way: by default, calls run synchronously, and alternative threading may similarly be supplied by callers using the subscribeOn operator.
- Background threading is intended to be supplied by all final callers (the ones ultimately calling subscribe(...)) by adding a subscribeOn call near the end of the chain.
Given these assumptions, the intention of this chain is to run all code above the subscribeOn(Schedulers.io()) call on a thread meant for I/O-bound work and all code below the observeOn(AndroidSchedulers.mainThread()) call on the main thread of the app process. It is worth noting that while this is not a universal setup, it has been a very common one.
The Problem
This brings us to the question of interest: which Scheduler is actually used when making the network request in this chain?
The answer would seem to be clear: as intended, this should be determined by the value that has been supplied to the only subscribeOn call in the chain, Schedulers.io().
In many cases that would be true. There are, however, two key exceptions, and both depend on the details of the first call in this chain, waitForData.
The Violation
First, consider a broken assumption. Perhaps this call does not conform to the patterns of the project and intentionally applies its own Scheduler so that it runs and emits on a thread of its choosing. For example, perhaps it wraps a calculateData function in a Single and calls subscribeOn(Schedulers.computation()) on it.
As far as this call is concerned, its own subscribeOn(Schedulers.computation()) is all that matters, and calculateData will run on a thread determined by Schedulers.computation(). The subscribeOn closest to the source wins: the outer subscribeOn(Schedulers.io()) starts the subscription on an I/O thread, but the inner one immediately moves it to a computation thread. The result is then emitted on that computation thread, which forces the network call in question, makeNetworkRequestWithData, to run there as well (and not on a thread based on Schedulers.io(), as the code intends).
The difference here is subtle: a thread meant for heavy computation (from a pool limited to roughly one thread per CPU core) simply sits idle while waiting for a network response. While this issue might go unnoticed, it can still be “fixed” by treating this setup as a mistake, a violation of the app’s threading patterns. The subscribeOn call in waitForData could be removed to bring it into conformity with the rest of the app, and the remaining subscribeOn call in the main chain would then work as expected.
The next exception, however, is not so easily addressed.
The Surprise
What if waitForData was not based on a calculation or a network request, but on some manual emission from something like a PublishProcessor? Suppose, for example, that waitForData returns a Single<Data> that emits as soon as a value is pushed to a dataProcessor. Since it makes no calls to subscribeOn or observeOn, this function appears to satisfy the project’s assumptions and should therefore obey the subscribeOn(Schedulers.io()) call of the main chain and ultimately emit on an I/O thread.
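A processor-backed waitForData might look something like the sketch below, assuming RxJava 3. The DataSource class and its pushData method are hypothetical scaffolding, String stands in for Data, and firstOrError is one plausible way to turn the processor into a Single. Note that a PublishProcessor does not replay values, so the Single must be subscribed to before a value is pushed.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.processors.PublishProcessor

// Hypothetical scaffolding around a dataProcessor.
class DataSource {
    private val dataProcessor = PublishProcessor.create<String>()

    // Emits the first value pushed after subscription. No subscribeOn or observeOn,
    // so it looks like it follows the project's threading convention.
    fun waitForData(): Single<String> = dataProcessor.firstOrError()

    // Pushes a value to any current subscribers, on the calling thread.
    fun pushData(data: String) = dataProcessor.onNext(data)
}

fun main() {
    val source = DataSource()
    // Subscribe first: a PublishProcessor does not replay earlier values.
    val observer = source.waitForData().test()
    source.pushData("data")
    observer.assertValue("data")
    println("received ${observer.values().single()}")
}
```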
There is just one problem: a PublishProcessor (or any other kind of Processor) effectively ignores subscribeOn. The operator still controls the thread on which the subscription is made, but it has no influence on the thread on which values pushed into the processor after subscription are emitted. In fact, from the perspective of a downstream caller there is no good way at all to control the thread a PublishProcessor emits on. As a result, all code downstream of a PublishProcessor runs on the thread it emits on until the downstream threading is explicitly changed with something like a call to observeOn.
So what thread does a PublishProcessor actually emit on? The answer is simple: whatever thread values are pushed to it on, and therefore whatever thread processor.onNext(...) is called on. If you look carefully, this is spelled out in the documentation:
Scheduler:
PublishProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on the thread the respective onXXX methods were invoked.
This could be a background thread or even the main thread (which can easily happen if the data you are waiting on depends on an OS-level event). In the latter case, the call chain we’ve been discussing would result in a NetworkOnMainThreadException, an error that would seem impossible. Once absorbed, this fact can seem obvious, but at first it appears to violate our intuition for how operators like subscribeOn are supposed to work.
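The sketch below, assuming RxJava 3 with hypothetical helper names and String in place of Data, runs the main chain with a processor-backed waitForData and pushes the value from an ordinary thread named "event-thread", standing in for something like the main thread. Despite the subscribeOn(Schedulers.io()) call, the network request runs on the pushing thread. The busy-wait on hasSubscribers is only there because the subscription itself happens asynchronously on an I/O thread, and a PublishProcessor drops values pushed before anyone is subscribed.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.processors.PublishProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

// Hypothetical network call; returns the name of the thread it actually ran on.
fun makeNetworkRequestWithData(data: String): Single<String> =
    Single.fromCallable { Thread.currentThread().name }

// Runs the main chain with a processor-backed waitForData and pushes the
// value from a thread named pushThreadName. Returns where the request ran.
fun networkThreadWhenPushedFrom(pushThreadName: String): String {
    val dataProcessor = PublishProcessor.create<String>()
    val networkThread = AtomicReference<String>()
    val done = CountDownLatch(1)

    val disposable = dataProcessor.firstOrError()          // plays the role of waitForData()
        .flatMap { data -> makeNetworkRequestWithData(data) }
        .subscribeOn(Schedulers.io())                      // intended to put the request on I/O
        .subscribe(
            { thread -> networkThread.set(thread); done.countDown() },
            { done.countDown() }
        )

    // The subscription reaches the processor asynchronously; wait so the value isn't dropped.
    while (!dataProcessor.hasSubscribers()) Thread.sleep(1)

    Thread({ dataProcessor.onNext("data") }, pushThreadName).start()
    done.await(5, TimeUnit.SECONDS)
    disposable.dispose()
    return networkThread.get()
}

fun main() {
    // The request runs on "event-thread", not on an RxCachedThreadScheduler-N thread.
    println(networkThreadWhenPushedFrom("event-thread"))
}
```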
The Resolution
Unfortunately, there is no perfect solution to the above problem under the given app constraints. To prevent a NetworkOnMainThreadException, the final subscribeOn call (which is effectively being ignored) could be replaced with careful use of observeOn or subscribeOn elsewhere. For example, observeOn could be used immediately after the waitForData call, so that everything downstream of it, including the network request, moves to a background Scheduler. Alternatively, subscribeOn could be applied directly to the network request.
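Both workarounds can be sketched as follows, assuming RxJava 3, hypothetical helper names, String in place of Data, and a value pushed from a non-Rx thread. The final observeOn(AndroidSchedulers.mainThread()) is left out so the sketch runs on a plain JVM, and Schedulers.io() is assumed as the target Scheduler for the observeOn variant.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.processors.PublishProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical network call; returns the name of the thread it actually ran on.
fun makeNetworkRequestWithData(data: String): Single<String> =
    Single.fromCallable { Thread.currentThread().name }

// Workaround 1: observeOn right after waitForData moves everything downstream to I/O.
fun chainWithObserveOn(waitForData: Single<String>): Single<String> =
    waitForData
        .observeOn(Schedulers.io())
        .flatMap { data -> makeNetworkRequestWithData(data) }

// Workaround 2: subscribeOn applied directly to the network request.
fun chainWithInnerSubscribeOn(waitForData: Single<String>): Single<String> =
    waitForData
        .flatMap { data -> makeNetworkRequestWithData(data).subscribeOn(Schedulers.io()) }

// Illustrative helper: builds the chain on a processor-backed waitForData,
// pushes a value from "event-thread", and reports where the request ran.
fun networkThreadFor(buildChain: (Single<String>) -> Single<String>): String {
    val dataProcessor = PublishProcessor.create<String>()
    // No subscribeOn at the end, so test() subscribes to the processor synchronously.
    val observer = buildChain(dataProcessor.firstOrError()).test()
    Thread({ dataProcessor.onNext("data") }, "event-thread").start()
    observer.awaitDone(5, TimeUnit.SECONDS)
    return observer.values().single()
}

fun main() {
    println(networkThreadFor(::chainWithObserveOn))        // an RxCachedThreadScheduler-N thread
    println(networkThreadFor(::chainWithInnerSubscribeOn)) // an RxCachedThreadScheduler-N thread
}
```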
Either way, though, we must break our initial assumptions. Special cases could be made when working with Processor classes, but that is not a very satisfying approach.
The root issue here is the assumption that a single subscribeOn call at the end of a chain was ever the appropriate way to supply threading for the whole chain. Not only does the behavior of Processor classes make this impossible to apply consistently, but the assumption also ignores the fact that different parts of a chain may be best run on different kinds of Scheduler and that each function knows best which Scheduler it requires.
It would seem, then, that a safer, more consistent pattern is to let each function returning an RxJava type apply its own threading and to expect the final consumer to be concerned only with changing the downstream threading. Many RxJava users have always known this, but the difference could feel like a matter of choice or opinion. The behavior of Processor classes, however, makes the answer clear. In fact, this is exactly what Retrofit now does with its RxJava 3 adapter: by default, network requests run asynchronously on a background thread unless the adapter is created with createSynchronous().
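A sketch of this function-owned pattern, assuming RxJava 3 and String in place of Data: the hypothetical makeNetworkRequestWithData applies its own subscribeOn(Schedulers.io()), and the consumer only chooses where to receive results with observeOn. fakeMainScheduler is an assumed stand-in for AndroidSchedulers.mainThread(), and runConsumer is an illustrative helper. Even though the value is pushed from an arbitrary thread, the request lands on an I/O thread.

```kotlin
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.processors.PublishProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Assumed stand-in for AndroidSchedulers.mainThread() so the sketch runs on a plain JVM.
val fakeMainScheduler: Scheduler = Schedulers.from(
    Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main").apply { isDaemon = true } }
)

// Hypothetical network call that owns its threading: it always subscribes on
// Schedulers.io(), whatever thread its upstream happens to emit on.
fun makeNetworkRequestWithData(data: String): Single<String> =
    Single.fromCallable { "response for $data from ${Thread.currentThread().name}" }
        .subscribeOn(Schedulers.io())

// The final consumer only decides where it wants to receive the result.
fun consume(waitForData: Single<String>): Single<Pair<String, String>> =
    waitForData
        .flatMap { data -> makeNetworkRequestWithData(data) }
        .observeOn(fakeMainScheduler)
        .map { response -> response to Thread.currentThread().name }

// Illustrative helper: pushes a value from an arbitrary thread and returns
// (network response, thread the result was received on).
fun runConsumer(pushThreadName: String): Pair<String, String> {
    val dataProcessor = PublishProcessor.create<String>()
    val observer = consume(dataProcessor.firstOrError()).test() // subscribes synchronously
    Thread({ dataProcessor.onNext("data") }, pushThreadName).start()
    observer.awaitDone(5, TimeUnit.SECONDS)
    return observer.values().single()
}

fun main() {
    val (response, receivedOn) = runConsumer("event-thread")
    println(response)   // "response for data from RxCachedThreadScheduler-N"
    println(receivedOn) // "fake-main"
}
```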
The Future
It is likely that many RxJava projects have encountered unexpected behavior due to this subtle fact about Processor classes. The confusion stems in large part from the freedom in how the subscribeOn and observeOn operators can be used and how they actually work: they can be placed anywhere in a call chain, or not at all; rather than one strict pattern for their usage, several appear to be allowed; and subscribeOn applies threading to all upstream operators, except when it doesn’t.
While freedom can be very powerful, in programming, opinionated frameworks and enforced behavior can sometimes lead to fewer mistakes and more coherent code. Fortunately in this regard, the Android community is moving away from RxJava and toward Kotlin coroutines. There are two key differences that help avoid surprise threading issues like the one we have been discussing in RxJava:
- Consumers of suspending functions or Flow instances are required to supply a CoroutineScope in order to make the calls and receive results. Among other things, these scopes determine the Dispatcher that defines the threading behavior for any code downstream of the call in question, which is the equivalent of enforcing that observeOn is always called at the beginning of an RxJava call chain, right after the initial call.
- There is a very clear pattern in the coroutines community (promoted by the developers of coroutines themselves) that all suspending functions and Flow instances should apply their own threading and should therefore be safe to call from any other coroutine.
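For comparison, here is a minimal sketch of the second convention using kotlinx.coroutines. The suspending makeNetworkRequestWithData is hypothetical; withContext(Dispatchers.IO) runs its work on an I/O thread and then resumes the caller in its own context, so the caller's dispatcher (here, the one provided by runBlocking) governs the code after the call. With default settings, Dispatchers.IO threads are named DefaultDispatcher-worker-N.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical suspending network call that applies its own threading:
// the block runs on Dispatchers.IO, then the caller resumes in its own context.
suspend fun makeNetworkRequestWithData(data: String): String =
    withContext(Dispatchers.IO) {
        "response for $data from ${Thread.currentThread().name}"
    }

// Illustrative helper: returns (network response, thread the caller resumed on).
fun callFromBlockingScope(): Pair<String, String> = runBlocking {
    val response = makeNetworkRequestWithData("data")
    response to Thread.currentThread().name
}

fun main() {
    val (response, callerThread) = callFromBlockingScope()
    println(response)     // "...from DefaultDispatcher-worker-N" with default thread naming
    println(callerThread) // runBlocking's thread, here the JVM main thread
}
```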
I go into detail on each of these points in a previous article on coroutine threading behavior.
Much like Kotlin has far overtaken Java, coroutines are undoubtedly the future of asynchronous work on Android. We must acknowledge, though, that just as many legacy projects still use Java, many projects will continue to use RxJava. For these, it is important to be aware of the subtle threading issues discussed in this article and to be mindful of adopting patterns that better match the threading behavior of Android’s future.
Brian works at Livefront, where threading is half the battle.