2017. szeptember 11., hétfő

Interoperation between RxJava and Kotlin Coroutines

Introduction


Writing imperative-looking code with Kotlin Coroutines is certainly an attractive property of it, but I'd think things can get quite convoluted pretty fast once, for example, Selectors are involved.

I haven't gotten there to look at what Selectors are, I only read that they can help you implement a flatMap like stream combiner. We are not goind to do that now, RxJava can do it for us after all.

However, the reasonable question arises: if I have a coroutine generator, a coroutine transformation or simply want to receive items from a Flowable, how can I make RxJava work with these coroutines?

Easily with the combined magic of Kotlin Coroutines and RxJava coroutines!


Suspendable Emitter


A generator is a source-like construct that emits items followed by a terminal signal. It should be familiar from RxJava as the Flowable.generate() operator. It gives you a FlowableEmitter and the usual onNext, onError and onComplete calls on it.

One limitation is that you can call onNext only once per invocation of your (Bi)Consumer lambda that receives the emitter. The reason is that we can't block a second call to onNext and we don't want to buffer it either; therefore, RxJava cooperates with the developer.

Compiler supported suspension and state machine built by it, however, allow us to prevent a second call from getting through by suspending it until there is a demand from the downstream, which then resumes the coroutine where it left off. Therefore, we can lift the single onNext requirement for our Coroutine-based generator.

So let's define the SuspendEmitter interface


interface SuspendEmitter<in T> : CoroutineScope {

    suspend fun onNext(t: T)

    suspend fun onError(t: Throwable)

    suspend fun onComplete()
}


By extending the CoroutineScope, we provide useful infrastructure (i.e., coroutineContext, isActive) to the block that will target our SuspendEmitter. One can argue that why use onError and onComplete since a coroutine can throw and simply end. The reason is that this way, a coroutine can terminate the sequence from a transformation we'll see later, just like our recent mapFilter operator allows it.


The flow-producer

Given our context providing interface for a generator coroutine, let's define the generator method the user will call:


fun <T> produceFlow(generator: suspend SuspendEmitter.() -> Unit) : Flowable<T> {
    return Produce(generator)
}


(For those unfamiliar with the Kotlin syntax, the SuspendEmitter.() -> Unit is practically a one parameter lambda of signature (param: SuspendEmitter) -> Unit where, when the lambda is implemented, accessing methods of param do not need to be qualified by it, thus you can write onNext(1) instead of param.onNext(1).)

We have to implement a Flowable that interacts with a suspendable generator function in some fashion. When implementing source-like operators, one usually has to write a Subscription instance and call Subscriber.onSubscribe() with it.

class Produce<T>(private val generator: suspend SuspendEmitter<T>.() -> Unit) : 
        Flowable<T>() {
    override fun subscribeActual(s: Subscriber<in T>) {
        launch(Unconfined) {
            val parent = ProduceSubscription(s)
            parent.setJob(coroutineContext[Job])
            s.onSubscribe(parent)
            generator(parent)
        }
    }
}


Since the generator is a suspendable coroutine, we need a context where it can run. The Unconfined context gives us a trampolined execution environment where resumptions of suspended coroutines are not confined to any particular thread, as if you'd run with the trampoline() Scheduler in RxJava.

We create our Subscription, attach the Job of the coroutine context itself to bridge the cancellation from a downstream Subscription.cancel(), signal the custom Subscription to the downstream and then execute the provided producer block by supplying it the parent which also implements SuspendEmitter.

So far, nothing is too hairy or convoluted, however, the interaction between regular trampolined coroutines of RxJava and the Kotlin Coroutine infrastructure is more involved.

Non-blocking await/notify

We will need a way to get the generator coroutine suspended if there are no downstream requests and we have to resume that coroutine when the downstream does request an amount. This resembles the wait-notify pair of a typical BlockingQueue implementation where a blocking emission due to a full queue gets unblocked by a notification by a concurrent take()/poll() invocation. Since we don't want to block and the coroutine infrastructure supports programmatic resuming of a coroutine, we'll use this feature in two helper methods establishing a non-blocking wait-notify exchange:


typealias Cont = Continuation<Unit>

fun notify(ref: AtomicReference<Cont?>) {
    while (true) {
        val cont = ref.get()
        val next : Cont?
        if (cont != null && cont != TOKEN) {
            if (ref.compareAndSet(cont, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(cont, TOKEN)) {
                break;
            }
        }
    }
}


We will use a valueless Continuation<Unit>, Cont for short, and atomics to place an indicator or an actual continuation object in an AtomicReference. The notify() atomically performs the following logic: if there is a real continuation in the reference, we clear it and then call resume on it to trigger the resumption. Otherwise, we set it to the shared TOKEN object indicating that when the other side, await, wanted to get continued, it can do so immediately.

fun await(ref: AtomicReference<Cont?>, cont: Cont) {
    while (true) {
        val a = ref.get()
        if (a == TOKEN) {
            if (ref.compareAndSet(a, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(a, cont)) {
                break;
            }
        }

    }
}


The await() method uses the same reference and the continuation instance provided by a suspendCoroutine in its code block.The method atomically checks if there is a TOKEN and if so, it calls resume on the continuation parameter after clearing the TOKEN from the reference. Otherwise, it stores the continuation in the reference and quits.

val TOKEN: Cont = object: Cont {
    override val context: CoroutineContext
        get() = throw UnsupportedOperationException()

    override fun resume(value: Unit) {
        throw UnsupportedOperationException()
    }

    override fun resumeWithException(exception: Throwable) {
        throw UnsupportedOperationException()
    }

}


Finally, the TOKEN is just an empty implementation of a Continuation - we should never call its methods as the object reference itself serves only a purpose of indicator for an immediate resumption.



The ProduceSubscription  

Now we can implement the ProduceSubscription class. First, let's see the skeleton with the relevant fields:

open class ProduceSubscription<T>(
        private val actual: Subscriber<in T>,
        private val ctx : CoroutineContext
) : Subscription, SuspendEmitter<T> {

    companion object {
        val CANCELLED = Object()
    }

    @Suppress("DEPRECATION")
    override val context: CoroutineContext
        get() = ctx!!

    override val isActive: Boolean
        get() = job.get() != CANCELLED

    private val job = AtomicReference<Any>()

    private val requested = AtomicLong()

    private val resume = AtomicReference<Cont?>()

    private var done: Boolean = false

    override suspend fun onNext(t: T) {
        // TODO implement
    }

    override suspend fun onError(t: Throwable) {
        // TODO implement
    }

    override suspend fun onComplete() {
        // TODO implement
    }

    override fun cancel() {
        // TODO implement
    }

    override fun request(n: Long) {
        // TODO implement
    }

    fun setJob(j: Job?) {
        // TODO implement
    }
}

We see the methods of both Subscription and SuspendEmitter along with a couple of fields/properties:


  • It takes the downstream's Subscriber and the CoroutineContext it will provide to the produce callback in the operator.
  • We will use the companion object's CANCELLED value to indicate the the parent job we get from the coroutineContext is cancelled exactly once.
  • It considers being active when the job object is not the CANCELLED indicator
  • Of which Job is then stored in the job AtomicReference.
  • We have to track the requested amount from downstream via an AtomicLong.
  • The resume AtomicReference stores the continuation to be used with the non-blocking await-notify shown in the previous section.
  • Finally, we have the done flag indicating the generator coroutine called onError or onComplete at most once.
Perhaps the main difficulty lies in the implementation of the onNext method as it is the primary interaction point between a coroutine that has to be suspended if there are no requests:


    override suspend fun onNext(t: T) {
        if (job.get() == CANCELLED) {
            suspendCoroutine<Unit> { }
        }
        val r = requested.get()
        if (r == 0L) {
            suspendCoroutine<Unit> { cont -> await(resume, cont)  }
        }

        actual.onNext(t)

        if (job.get() == CANCELLED) {
            suspendCoroutine<Unit> { }
        }
        if (resume.get() == TOKEN) {
            resume.compareAndSet(TOKEN, null)
        }
        if (r != Long.MAX_VALUE) {
            requested.decrementAndGet()
        }
    }


First we check if the downstream has cancelled the generator in which case we should get out of the coroutine entirely. I'm not sure if there is a more appropriate way for doing this other than suspending indefinely.

Next, we check the request amount and if it is zero, we suspend the current coroutine by using our non-blocking await mechanism. Once notified, or there was at least one requested item, the code should continue with the emission of the item. This could trigger an in-sequence cancellation and we suspend the coroutine indefinitely again.

Since the downstream can immediately request some amount due to the s.onSubscribe(parent) call in the operator, before the generator can even run and call onNext, we may have a TOKEN in the resume field, that would otherwise incorrectly indicate the next call to await it can resume immediately, violating the backpressure we expect. I know this sounds convoluted, but I learned it the hard way...

Finally, we decrement the request amount if not unbounded.

The onError and onComplete look pretty much alike:


    override suspend fun onError(t: Throwable) {
        if (!done) {
            done = true
            actual.onError(t)
            cancel()
        }
        suspendCoroutine<Unit> { }
    }

    override suspend fun onComplete() {
        if (!done) {
            done = true
            actual.onComplete()
            cancel()
        }
        suspendCoroutine<Unit> { }
    }


We set the done flag to true, emit the relevant event to the downstream and then cancel the job/Subscription we are running with. I defensively suspend the coroutine afterwards.

Next we see how cancel() and setJob() works:

    override fun cancel() {
        val o = job.getAndSet(CANCELLED)
        if (o != CANCELLED) {
            (o as Job).cancel()
        }
    }

    fun setJob(j: Job?) {
        while (true) {
            val o = job.get()
            if (o == CANCELLED) {
                j?.cancel()
                break
            }
            if (job.compareAndSet(o, j)) {
                break
            }
        }
    }


They are pretty much implemented along RxJava's typical deferred cancellation mechanism. cancel() atomically swaps in the CANCELLED indicator and calls cancel on the Job it contained. setJob() atomically set the Job instance or cancels it if cancel() swapped in the CANCELLED indicator just before that.

Lastly, the request() implementation that is responsible for accounting downstream requests and resuming the suspended generator if inside onNext().

    override fun request(n: Long) {
        if (BackpressureHelper.add(requested, n) == 0L) {
            notify(resume)
        }
    }


In the RxJava world, a transition from 0 to n triggers the emission loop in a range() operator for example. Here, we notify a possibly suspended coroutine that will resume from the await() method we implemented.

Testing it is simple with RxJava:


val f = produceFlow {
    for (i in 0 until 10) {
         println("Generating $i")
         onNext(i)
    }
    onComplete()
}

f.test(0)
.assertEmpty()
.requestMore(5)
.assertValues(0, 1, 2, 3, 4)
.requestMore(5)
.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


Outstanding!

The flow-transformer

Now that we have a way to emit items, we would like to emit an item in response to an upstream value, like the map() operator but with a suspendable coroutine function. RxJava's map is confined to return one item in exchange for one upstream item.

With coroutines and the ProduceSubscription described in the previous section, we could emit any number of items without overflowing a Subscriber!

Let's define our API and a skeleton implementation for it first:


fun <T, R> Flowable<T>.transform(
        transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R> {
    return Transform(this, transformer)
}

class Transform<T, R>(
        private val source: Flowable<T>, 
        private val transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R>() {
    override fun subscribeActual(s: Subscriber<in R>) {
        // TODO implement
    }
}


We define a transform extension method on Flowable with a suspendable transformer that takes our SuspendEmitter, the upstream's value and returns nothing.

This time, we have an upstream we have to subscribe to via a regular FlowableSubscriber from RxJava, call the coroutine in some way and make sure we keep calling the upstream for more values as we have to deal with the backpressure of the coroutine itself transitively.

The first step into this direction is the handling of the upstream's own Subscription we get through Subscriber.onSubscribe. We have to attach that to the Subscription we show to the downstream Subscriber. Since we will use the ProduceSubscription anyway, we extend it and override its cancel() for this purpose:


class ProduceWithResource<T>(
        actual: Subscriber<in T>,
        ctx : CoroutineContext
) : ProduceSubscription<T>(actual, ctx) {
    private val resource = AtomicReference<Subscription>()

    fun setResource(s: Subscription) {
        SubscriptionHelper.replace(resource, s)
    }

    override fun cancel() {
        SubscriptionHelper.cancel(resource)
        super.cancel()
    }
}


We simply use the deferred cancellation helper for Subscriptions.

Now let's see how we can prepare the context for running the coroutine inside the transform operator's subscribeActual() method:

    val ctx = newCoroutineContext(Unconfined)
    val parent = ProduceWithResource(s, ctx)
    s.onSubscribe(parent)
    source.subscribe(object: FlowableSubscriber<T> {

        var upstream : Subscription? = null

        val wip = AtomicInteger()
        var error: Throwable? = null

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }
    })


First we create an unconfinded context where each invocation of the transformer coroutine will execute and suspend in. We create the producer that can hold an additional Subscription and send it to the downstream Subscriber. Finally, we subscribe to the upstream with a FlowableSubscriber.

In this custom FlowableSubscriber, we will have request from upstream, thus we save the Subscription we'll get from it. The wip and error fields will be used to achieve something similar to a half-serialization. I'll explain it once the methods are implemented.

Handling onSubscribe() is straightforward and typical for an RxJava operator:


    override fun onSubscribe(s: Subscription) {
        upstream = s
        parent.setResource(s)
        s.request(1)
    }


We store the upstream's subscription locally and in the ProducerWithResource to link up the cancellation across the operator. Then we request one item; this is partly due to simplifying the interaction between a suspended coroutine and the upstream producer. Using larger prefetch would require the use of some intermediate queue - possible, but left for the reader as an exercise. (Finally, we found a use for request(1)!)

Next, onNext():

    override fun onNext(t: T) {
        launch(ctx) {
           parent.setJob(coroutineContext[Job])

           wip.getAndIncrement()

           transformer(parent, t)

           if (wip.decrementAndGet() == 0) {
               upstream!!.request(1)
           } else {
               val ex = error;
               if (ex == null) {
                   s.onComplete()
               } else {
                   s.onError(ex)
               }
               parent.cancel()
           }
       }
    }

First, the Job of the actual coroutineContext has to be stored so a downstream cancellation can can call its Job.cancel() method. We have to do this because we will go in and out of the launch() when the upstream sends an item.

Next, the wip counter is incremented, which may seem odd. The reason for this is that if the transformer coroutine gets suspended, the execution returns to the caller of onNext(), a regular RxJava producer of some sorts. If this producer has reached its end, it will call onError or onComplete as these can be issued without request. As we'll see a bit later, forwarding these signals cuts out any pending emission from the suspended coroutine, therefore, we use the pattern of a half-serializer to save this terminal indication.

The transformer is executed with the parent ProducerWithResource instance that handles the suspendable onNext emissions towards the downstream.

Once the transformer's job has been done, the execution (resumes) with the atomic decrement of the wip counter. If it successfully decrements to 0, there was no terminal event signalled from the upstream while the transformer was suspended, thus we can request the next item from the upstream RxJava source.

The onError and onComplete are much simpler fortunately:


    override fun onError(t: Throwable) {
        error = t
        if (wip.getAndIncrement() == 0) {
            s.onError(t)
            parent.cancel()
        }
    }

    override fun onComplete() {
        if (wip.getAndIncrement() == 0) {
            s.onComplete()
            parent.cancel()
        }
    }


We store the Throwable (in onError only), then atomically increment the wip counter. If there was no ongoing coroutine, we are safe to emit the terminal event and cleanup/cancel the contextual Job we may still be referencing. If the original wip value was 1, the increment bumps it to 2 and the decrement in onNext() will detect the terminal condition and act accordingly.

Let's test it (by reusing the generator for fun)!

    f.transform({
        if (it % 2 == 0) {
            onNext(it)
        }
    })
    .test()
    .assertResult(0, 2, 4, 6, 8)

    f.transform({
        onNext(it)
        onNext(it + 1)
    })
    .test()
    .assertResult(0, 1, 1, 2, 2, 3, 3, 4, 4,
            5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10)

    f.transform({
        launch(CommonPool) {
            onNext(it + 1)
        }
    })
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


We can filter or amplify a source, synchronously or asynchronously if necessary with a single operator! Excellent!

The receiver

The last operation we'd do is, given a Flowable flow, we'd like to return to the coroutine world and consume the flow. For that, a ReceiverChannel seems to be appropriate output type as it can be for-each looped nicely.

Let's define the extension method toReceiver() with a skeleton as well:


suspend fun <T> Flowable<T>.toReceiver(capacityHint: Int = 128) : ReceiveChannel<T> {
    val queue = Channel<T>(capacityHint)

    val upstream = AtomicReference<Subscription>()
    val error = AtomicReference<Throwable>()
    val wip = AtomicInteger()

    subscribe(object: FlowableSubscriber<T> {

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

    })

    return // TODO implement
}


First, a Channel of type T and the given capacity is created. It is followed by the AtomicReference that will hold the source Flowable's Subscription, which will have to be linked up with the consumer to propagate cancellation. Next, since the upstream may signal terminal events while the channel is suspended in a send() we'll use - similar to the ProducerWithResource.onNext() situation, we will use the same AtomicInteger-based technique. The error AtomicReference will serve as the intermediary when handing over the terminal event to the channel.

Let's see the FlowableSubscriber implementation first:

        override fun onSubscribe(s: Subscription) {
            if (SubscriptionHelper.setOnce(upstream, s)) {
                s.request(1)
            }
        }

        override fun onNext(t: T) {
            launch (Unconfined) {
                wip.getAndIncrement()

                queue.send(t);

                if (wip.decrementAndGet() == 0) {
                    upstream.get().request(1)
                } else {
                    queue.cancel(error.get());
                }
            }
        }

        override fun onComplete() {
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.cancel();
                }
            }
        }

        override fun onError(t: Throwable) {
            error.lazySet(t)
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.cancel(t);
                }
            }
        }


The FlowableSubscriber implementation, practically, performs the same bookeeping as the transformer() operator did, with the exception that the closing of the channel has to happen in a launch-provided context.

However, this is only the producer half of the channel, we still need the consumer part, more specifically, the consumer-reemitter. Luckily, the build in produce() operator of the Coroutines library help with it. Why not return the channel directly? Because we need a way to detect if the channel is closed from the consumer's end and Channel doesn't allow us to register a completion handler for it. However, the Job inside the coroutineContext of produce() does:

    return produce(Unconfined) {
        coroutineContext[Job]?.invokeOnCompletion { 
            SubscriptionHelper.cancel(upstream) 
        }

        for (v in queue) send(v)
    }


Let's test this last operator:

runBlocking {
    for (i in f.toReceiver()) {
         println(i)
    }
    println("Done")

    for (i in f.subscribeOn(Schedulers.single()).toReceiver()) {
         println("Async $i")
    }
    println("Async Done")
}


Well done!

Conclusion

In this blog post, I demonstrated how one can write three operators, produceFlow, transform and toReceiver, that can interoperate with RxJava's own, backpressure enabled Flowable type reasonably well.

This should prove that both technologies, at the end, can be combined by the developer as seen fit for the target domain or business requirements.

This was somewhat a heated week for me so for now, until something interesting comes up in this topic, me writing about Kotlin Coroutines will be ... suspended.

Nincsenek megjegyzések:

Megjegyzés küldése