Thursday, October 19, 2017

Android LiveData API: a quick look

Introduction


Threading and lifecycle are two of the top concerns when developing applications for the Android platform. The UI has to be interacted with on a dedicated thread (the main thread), but to keep the UI responsive to user input and rendering, blocking or CPU-intensive calculations should be kept off it.

In addition, unlike in a desktop Swing application, views can get destroyed and recreated in a way that is outside of a given application's control. This means background tasks must be stopped and listeners removed to prevent leaking references to now logically dead objects.

RxJava and RxAndroid can help with the threading concerns, and there are other libraries that tap into the various lifecycle events; in general, this means someone will call dispose() on a particular flow or clear() on a CompositeDisposable to mass-cancel several of them.
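
For example, a common RxJava 2 pattern looks like the following sketch; loadFromDisk() and textView are hypothetical placeholders for some blocking work and a view to update:

CompositeDisposable disposables = new CompositeDisposable();

// keep the blocking work off the main thread, deliver the result on it
disposables.add(
    Flowable.fromCallable(() -> loadFromDisk())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> textView.setText(data), Throwable::printStackTrace)
);

// later, typically when the view goes away:
disposables.clear();   // mass-cancels every added flow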

Having a rich set of transformative and coordinating operators along with support for normal values, errors and finite sequences may be overwhelming compared to a classical Listener-based API. Google's LiveData is one such classical Listener-style API, but unlike Swing's ActionListener for example, there are explicit requirements that interaction with the LiveData object itself happens on the main thread and that signals are dispatched to its Observers on the main thread as well.

LiveData API


Unfortunately, I wasn't able to locate a public repository for the LiveData sources and had to rely on the sources downloaded from Google's Maven repository:

compile "android.arch.lifecycle:reactivestreams:1+"

There is an interoperation library associated with LiveData that allows presenting and consuming events from any Reactive-Streams Publisher. This will transitively import the actual LiveData library. Note that LiveData is currently considered beta and may change arbitrarily before release. That said, I don't think the core structure and premise will actually change.

The main consumer type is android.arch.lifecycle.Observer with its single onChanged(T) method. Given an arbitrary LiveData instance, one can use the observe(LifecycleOwner, Observer) or observeForever(Observer) methods to attach the listener to a source. The onChanged(T) will be called on the main thread. Cancelling an observation requires a call to removeObserver(), for which one has to remember both the original LiveData object and the Observer instance itself. All these methods return void. The LifecycleOwner is listened to for destroy events, at which point the Observers registered with that specific owner are removed from the LiveData instance. This is similar to RxBinding's bindToLifecycle() approach composed onto a flow.

The LiveData abstract class doesn't feature any transformative or coordinating operators on itself, but the library features a utility class, Transformations, currently with map and switchMap operations. Both will execute the transformations and emissions on the main thread, but otherwise they match the behavior of the same RxJava operators.

In theory, one could interpret LiveData as a cold source, but its usage patterns point to a hot source, just like a BehaviorSubject in RxJava; LiveData remembers the last value and emits it first to new consumers. To signal to consumers, LiveData has protected methods setValue and postValue. setValue requires the value to be set on the main thread and postValue will schedule a task to the main thread to call setValue there. To manually invoke these methods, a MutableLiveData object is available where the two methods have been made public.
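
For illustration, a minimal usage sketch (the lifecycleOwner and textView are assumed to exist in the surrounding Activity or Fragment):

MutableLiveData<String> liveData = new MutableLiveData<>();

// observe() must be called on the main thread; onChanged is dispatched there as well
liveData.observe(lifecycleOwner, value -> textView.setText(value));

// on the main thread:
liveData.setValue("Hello");

// from a background thread; schedules setValue("World") onto the main thread:
new Thread(() -> liveData.postValue("World")).start();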


Possible shortcomings


The main shortcoming I see with LiveData is the ubiquitous "return to main thread" mentality. For example, querying data from a LiveData-enabled datasource and preprocessing it should happen on a background thread before the aggregated data is sent back to the main thread for display. Unfortunately, the datasource will notify the preprocessing Observer on the main thread for each resulting item, and that Observer then has to schedule tasks on a background thread for all those items. The main thread is involved too early in such flows, which wastes its time.

A second shortcoming is the lack of terminal event notifications in the Observer API. Of course, one can define a union class with value, error and complete subtypes, just like RxJava's Notification object, to work around this problem. Needless to say, such a wrapper costs an extra allocation and indirection.
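
A minimal sketch of such a wrapper (the names are made up for illustration; RxJava ships a similar, more complete Notification type):

final class Notification<T> {
    final T value;          // set for "next" events
    final Throwable error;  // set for "error" events; both fields null means "complete"

    private Notification(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <T> Notification<T> next(T value)       { return new Notification<>(value, null); }
    static <T> Notification<T> error(Throwable ex) { return new Notification<>(null, ex); }
    static <T> Notification<T> complete()          { return new Notification<>(null, null); }
}

An Observer<Notification<T>> can then dispatch on which field is set, at the cost of the extra allocation and indirection mentioned above.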

Lastly, addListener/removeListener pairs compose relatively poorly and require the operator to remember both the original LiveData and the Observer that was used when the link was established. The onSubscribe(Disposable)-style cancellation support worked out much nicer in RxJava 2. Of course, having a similar method on Observer would break its lambda-friendly Single Abstract Method structure.


Reactive-Streams interoperation


Consuming a LiveData as Publisher


RxJava has infrastructure support to work with such classical Listener-style sources. Specifically, the create() operators on Observable and Flowable show such an example usage in their JavaDocs. Consequently, talking to LiveData is a straightforward routine:


LiveData<String> source = new MutableLiveData<String>();
Flowable<String> f = Flowable.<String>create(emitter -> {
   Observer<String> o = v -> {
       emitter.onNext(v);
   };
   
   source.observeForever(o);
   emitter.setCancellable(() -> 
       AndroidSchedulers.mainThread()
       .scheduleDirect(() -> 
            source.removeObserver(o)
       )
   );
}, BackpressureStrategy.LATEST);


The Reactive-Streams bridge in LiveDataReactiveStreams, of course, has no access to Flowable and has to work out the Subscription management manually. Unfortunately, the toPublisher() implementation is wrong. If there was a public repository, I could contribute a correct one (without using RxJava of course).

Let's see the implementation piece by piece to demonstrate how not to write Publishers. (The file is marked as Apache 2.0 open source, should be fine to quote it here, right?)


    public static <T> Publisher<T> toPublisher(
            final LifecycleOwner lifecycle, final LiveData<T> liveData) {

        return new Publisher<T>() {
            boolean mObserving;
            boolean mCanceled;
            long mRequested;
            @Nullable
            T mLatest;

            @Override
            public void subscribe(final Subscriber<T> subscriber) {
                // continued below
            }
        };
    }


The first red flag comes from the mutable, non-thread-safe state of the anonymous Publisher. Hot sources may have mutable fields but those are protected with proper synchronization primitives (see PublishProcessor for example). However, these fields should be part of the state of the Subscription that is sent to a Subscriber, as Publishers are expected to be consumed by any number of Subscribers and interaction with one Subscriber should not affect the interactions with the others.

Of course, a Publisher can choose to only service a single Subscriber (as with UnicastProcessor), but even for that, there is no field letting subscribe() know there is already someone consuming the Publisher.

Next, the Observer instance is defined with a backpressure strategy of keeping the latest if the downstream is not ready.


    final Observer observer = new Observer() {
        @Override
        public void onChanged(@Nullable T t) {
            if (mCanceled) {
                return;
            }
            if (mRequested > 0) {
                mLatest = null;
                subscriber.onNext(t);
                if (mRequested != Long.MAX_VALUE) {
                    mRequested--;
                }
            } else {
                mLatest = t;
            }
        }
    };


Apart from the wrongly shared state of mCanceled, mRequested and mLatest, the code relies on the fact that both the onChanged invocation and the request() call happen on the main thread (see below). Having a per-Subscriber atomic mRequested instead would save a trip to the main thread for the increment.

The next segment deals with the request() call from the downstream.


    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(final long n) {
            if (n < 0 || mCanceled) {
                return;
            }
            ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
                @Override
                public void run() {
                    if (mCanceled) {
                        return;
                    }
                    // Prevent overflowage.
                    mRequested = mRequested + n >= mRequested
                               ? mRequested + n : Long.MAX_VALUE;
                    if (!mObserving) {
                         mObserving = true;
                         liveData.observe(lifecycle, observer);
                    } else if (mLatest != null) {
                         observer.onChanged(mLatest);
                         mLatest = null;
                    }
                }
            });
        }



The first (smaller) problem is that non-positive requests should be rewarded with an onError(IllegalArgumentException) properly serialized with any onNext calls. Here, it is ignored. In addition, request() may be called from any thread and such a call may not see mCanceled properly. The mObserving flag adds an odd delay before observing the actual LiveData object begins. (Since most sequences will issue a request() almost immediately, I'm not sure what benefit this gives; however, there were voices recently quite keen on such behavior who didn't realize that to control the first invocation of request(), one would have to consume a Publisher directly with a Subscriber and not have any intermediate operators between the two.)

Lastly, let's see cancel():

    @Override
    public void cancel() {
        if (mCanceled) {
            return;
        }
        ArchTaskExecutor.getInstance().executeOnMainThread(new Runnable() {
            @Override
            public void run() {
                if (mCanceled) {
                    return;
                }
                if (mObserving) {
                    liveData.removeObserver(observer);
                    mObserving = false;
                }
                mLatest = null;
                mCanceled = true;
            }
       });
   }


Since removeObserver has to be called from the main thread (which by itself could be inconvenient), a task is scheduled which will also update mCanceled to true. Since cancel() can be invoked from any thread, a non-volatile mCanceled may not be visible. In addition, since mCanceled is only set to true on the main thread, there could be a significant delay between a cancel() and its effects, still letting onChanged() calls through. Setting a volatile mCanceled to true right in the cancel() method would have the stopping effect much earlier.

(In fact, this was a problem with our early unsubscribeOn() where the Reactive-Streams TCK would complain about still receiving too many events after it had cancelled the flow. The solution was to set a volatile cancelled flag that would be read by onNext and drop events until the upstream.cancel() got executed asynchronously.)

It is relatively simple to fix the mistakes here: combine Observer and Subscription into one class and move the mXXX fields into this class.
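
A minimal sketch of that idea follows (not the actual library code; for brevity it drops items when the downstream hasn't requested instead of keeping the latest value, and omits the onError for non-positive requests):

final class LiveDataSubscription<T> extends AtomicLong
implements Observer<T>, Subscription {

    final Subscriber<? super T> downstream;
    final LiveData<T> liveData;

    volatile boolean cancelled;

    LiveDataSubscription(Subscriber<? super T> downstream, LiveData<T> liveData) {
        this.downstream = downstream;
        this.liveData = liveData;
    }

    @Override
    public void onChanged(@Nullable T value) {
        // LiveData calls this on the main thread
        if (!cancelled && get() != 0L) {
            downstream.onNext(value);
            if (get() != Long.MAX_VALUE) {
                decrementAndGet();
            }
        }
        // else: the real fix would store the value as "latest" here
    }

    @Override
    public void request(long n) {
        // atomic, capped addition; no trip to the main thread required
        for (;;) {
            long current = get();
            if (current == Long.MAX_VALUE) {
                break;
            }
            long updated = current + n;
            if (updated < 0L) {
                updated = Long.MAX_VALUE;
            }
            if (compareAndSet(current, updated)) {
                break;
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
        ArchTaskExecutor.getInstance().executeOnMainThread(() ->
                liveData.removeObserver(this));
    }
}

In toPublisher(), each subscribe() call would then create a fresh instance, call subscriber.onSubscribe() with it and attach it via liveData.observe(lifecycle, instance) on the main thread.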

Exposing a Publisher as LiveData


The unfortunate lack of terminal events in LiveData forces us to deal with only onNext calls. There is no way to tell an Observer that no more events will come (unless agreeing upon an event container type) and then get rid of them en masse (there is no clearObservers()).

With RxJava, this could be done relatively easily:


    MutableLiveData<String> mld2 = new MutableLiveData<>();
    Disposable d = Flowable.range(1, 5).map(Object::toString)
            .subscribe(mld2::postValue);


Of course, one has to manage the Disposable as usual and possibly decide when to subscribe() to a cold source with respect to the available Observers of the (Mutable)LiveData. A refCount-like behavior is possible but for that, one has to implement a custom LiveData object.

Apparently, there exist events on the LiveData class, onActive and onInactive, that will be called when the first Observer arrives and the last Observer leaves respectively. The LiveDataReactiveStreams.fromPublisher() method uses them in its implementation:

    private static class PublisherLiveData<T> extends LiveData<T> {
        private WeakReference<Subscription> mSubscriptionRef;
        private final Publisher mPublisher;
        private final Object mLock = new Object();

        PublisherLiveData(@NonNull final Publisher publisher) {
            mPublisher = publisher;
        }

        // the rest is below
}


The mSubscriptionRef holds the current active connection's Subscription and the mLock protects it so that a concurrent call to onSubscribe may not clash with the deactivation of the LiveData object. As we'll see shortly, this is not enough because the race can leave the upstream's Subscription from an old activation uncancelled and thus still posting to the LiveData object.

The onActive event should subscribe to the source and relay its onNext events:

    @Override
    protected void onActive() {
        super.onActive();

        mPublisher.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Don't worry about backpressure. If the stream is too noisy then
                // backpressure can be handled upstream.
                synchronized (mLock) {
                    s.request(Long.MAX_VALUE);
                    mSubscriptionRef = new WeakReference<>(s);
                }
            }

            @Override
            public void onNext(final T t) {
                postValue(t);
            }

            @Override
            public void onError(Throwable t) {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
                // Errors should be handled upstream, so propagate as a crash.
                throw new RuntimeException(t);
            }

            @Override
            public void onComplete() {
                synchronized (mLock) {
                    mSubscriptionRef = null;
                }
            }
        });
    }


Here, apart from the mSubscriptionRef management, a small mistake is the call to request() from within the lock itself. A bigger mistake is that onError throws, which it should never do.

The onInactive should cancel any existing Subscription if the Subscription representing that connection has been received:


    @Override
    protected void onInactive() {
        super.onInactive();
        synchronized (mLock) {
            WeakReference<Subscription> subscriptionRef = mSubscriptionRef;
            if (subscriptionRef != null) {
                Subscription subscription = subscriptionRef.get();
                if (subscription != null) {
                    subscription.cancel();
                }
                mSubscriptionRef = null;
            }
        }
    }


Unfortunately, the call to onSubscribe is not guaranteed to happen on the same thread on which Publisher.subscribe() is invoked. Even though RxJava's Flowable almost always does so, other Reactive-Streams implementations are free to delay the call to onSubscribe and issue it from any thread. (We deal with such situations in Flowable via the deferred requesting/cancellation mechanism.)

Consider the following scenario. We know onActive and onInactive execute on the main thread thus they are serialized in respect to each other. Let's assume a Publisher will signal onSubscribe in a delayed manner when subscribed to. The main thread executes onActive that calls subscribe() on the Publisher. Then the main thread executes onInactive() after. Since the Publisher hasn't called onSubscribe yet, there is no mSubscriptionRef and thus there is nothing to cancel. The onSubscribe() is then invoked by the Publisher and it starts streaming events (to nobody practically). Now the main thread executes onActive again, triggering yet another subscription to the Publisher. After onSubscribe() is called again, the previous Subscription is overwritten and there are now twice as many onNext events posted to the LiveData (these could be duplicates in case the Publisher is hot/multicasting, or arbitrary items from various stages of a cold Publisher). Since the previous Subscription is lost, it is not possible to stop the first subscription other than letting it terminate on its own pace.

One possible solution is to host the Subscriber in an AtomicReference inside the custom LiveData, plus have that Subscriber store the incoming Subscription in an AtomicReference too. Therefore, an onInactive can get rid of the previous Subscriber and at the same time, instruct it to cancel the Subscription incoming through onSubscribe without requesting from it:


    static final class PublisherLiveData<T> extends LiveData<T> {
        final Publisher<T> mPublisher;
        final AtomicReference<SubscriberLiveData> mSubscriber;

        PublisherLiveData(@NonNull final Publisher<T> publisher) {
            mPublisher = publisher;
            mSubscriber = new AtomicReference<>();
        }

        @Override
        protected void onActive() {
            super.onActive();
            SubscriberLiveData s = new SubscriberLiveData();
            mSubscriber.set(s);
            mPublisher.subscribe(s);
        }

        @Override
        protected void onInactive() {
            super.onInactive();
            SubscriberLiveData s = mSubscriber.getAndSet(null);
            if (s != null) {
                s.cancel();
            }
        }

        final class SubscriberLiveData 
        extends AtomicReference<Subscription>
        implements Subscriber<T>, Subscription {
            @Override
            public void onSubscribe(Subscription s) {
                if (compareAndSet(null, s)) {
                    s.request(Long.MAX_VALUE);
                } else {
                    s.cancel();
                }
            }

            @Override
            public void onNext(T item) {
                postValue(item);
            }

            @Override
            public void onError(Throwable ex) {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
                ex.printStackTrace();
            }

            @Override
            public void onComplete() {
                lazySet(this);
                mSubscriber.compareAndSet(this, null);
            }

            @Override
            public void cancel() {
                Subscription s = getAndSet(this);
                if (s != null && s != this) {
                    s.cancel();
                }
            }
            
            @Override
            public void request(long n) {
                // never called
            }
        }
    }

Although onActive and onInactive would update mSubscriber from the main thread, the atomics are preferable since we want to set the current Subscriber to null once the Publisher has terminated, thus playing nice with the GC. (For comparison, the refCount in RxJava is more involved because connection and disconnection may be triggered from any thread at any time.)


Conclusion


The LiveData class can be considered a classical (gen 0) reactive-push component which has practically one purpose on Android: notify its Observers of data (changes) on the main thread. If the typical thread crossing in the app is mainly from a background thread to the main thread, LiveData may be just enough to suit one's needs. It's one step above Agera after all since LiveData actually transmits data to be acted upon, not just "something happened" signals. However, if possibly failing and finite sources are present, which also have to be coordinated, transformed and kept on background thread(s) as long as possible, I believe RxJava is a better choice.

Nevertheless, there are standard interoperation bridges provided with LiveData which would allow it to work with or pose as a Reactive-Streams Publisher, but unfortunately, the implementation of this bridge has concurrency flaws that could be easily fixed by a PR, if there was an open repository for the library (because you'd want to post unit tests as well).

Saturday, September 30, 2017

Java 9 Flow API: taking and skipping

Introduction


Limiting or skipping over parts of a flow is a very common task: either we are only interested in the first N items or we don't care about the first N items. Sometimes, N is unknown but we can decide, based on the current item, when to stop relaying items or, in contrast, when to start relaying items.


Take(N)


In concept, limiting a flow to a certain size should be straightforward: count the number of items received via onNext and when the limit is reached, issue a cancel() towards the upstream and onComplete() towards the downstream.


public static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
    return new TakePublisher<>(source, n);
}


The operator's implementation requires little state:

static final class TakeSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
}


In its simplest form, there is no need for intercepting the request() and cancel() calls from the downstream: these can be passed through. However, since the operator has to stop the sequence upon reaching the limit (remaining == 0), the upstream's Flow.Subscription has to be stored.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.upstream = s;
        downstream.onSubscribe(s);
    }


In onSubscribe, we only have to store the Flow.Subscription and forward it to the downstream.

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r > 0L) {
            remaining = --r;
            downstream.onNext(item);
 
            if (r == 0) {
                upstream.cancel();
                downstream.onComplete();
            }
        }
    }


While remaining is positive, we decrement it and save it into its field followed by an emission of the current item. If the remaining count reached zero, the upstream is cancelled and the downstream is completed. Any further items will be ignored (in case cancel() doesn't immediately stop the upstream).

    @Override
    public void onError(Throwable throwable) {
        if (remaining != 0L) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (remaining != 0L) {
            downstream.onComplete();
        }
    }


The onError and onComplete check the remaining count, and if it's positive, the respective terminal event is relayed. The reason for this is that if the upstream runs out of items before the limit, the terminal event should be relayed as well. However, if the operator happens to run with a limit equal to the actual length of the flow, the last item in onNext will trigger an onComplete, which could be followed by a terminal event from the upstream that has to be suppressed. This behavior is allowed by the Reactive-Streams specification (i.e., cancel() may not have immediate effect) and we have dealt with it via a done field in other operators. Here, the fact remaining == 0 is the done indicator.

Being a pass-through for backpressure (the downstream calls request() directly on the upstream's Flow.Subscription), the upstream is not limited by this operator and may attempt to produce more items than the limit. At other times, knowing the downstream requested more than the limit, the upstream can actually be consumed in an unbounded fashion, where utilizing fast paths may result in improved performance.

Deciding which of the three behaviors should be default is up to the operator writer, but it is interesting to look at the remaining two modes: limiting the request amount and unbounding it.

Limiting the request amount


In this mode, if the downstream requests the limit or more, we can issue a single request with the limit amount. However, if the downstream requests less, we have to perform some intricate request accounting.

We'll need a new field, requestRemaining with a VarHandle REQUEST_REMAINING companion. We also have to intercept request() from downstream and figure out atomically what number to request from the upstream so the total requested amount doesn't go above the limit.


static final class TakeSubscriber<T> implements 
Flow.Subscriber<T>, Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    long requestRemaining;
    static final VarHandle REQUEST_REMAINING =
        VH.find(MethodHandles.lookup(), TakeSubscriber.class,
            "requestRemaining", long.class);

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
        this.requestRemaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
    }

    // onNext, onError and onComplete are the same

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }
}


The onSubscribe now forwards this as the Flow.Subscription and cancel() just delegates to the upstream.cancel().

    @Override
    public void request(long n) {
        for (;;) {
            long r = (long)REQUEST_REMAINING.getAcquire(this);
            long newRequest;
            if (r <= n) {
                newRequest = r;
            } else {
                newRequest = n;
            }
            long u = r - newRequest;
            if (REQUEST_REMAINING.compareAndSet(this, r, u)) {
                upstream.request(newRequest);
                break;
            }
        }
    }


First we read the remaining request amount. If it is smaller or equal to the downstream's request, we'll request the remaining amount from the upstream. If the downstream requested less than the remaining amount, we'll request that amount instead. The new remaining amount is then the current minus the new request amount decided. After the successful CAS, we request the new amount from the upstream and quit the loop.


Unbounding the request amount


The other direction, namely, unbounding the upstream if the downstream requested at least the limit of the operator, can be achieved through the same requestRemaining field but a different request() logic:


    @Override
    public void request(long n) {
        long r = (long)REQUEST_REMAINING.getAcquire(this);
        if (r != 0L) {
             r = (long)REQUEST_REMAINING.getAndSet(this, 0L);
             if (r != 0L && r <= n) {
                 upstream.request(Long.MAX_VALUE);
                 return;
             }
        }
        upstream.request(n);
    }


If the remaining request amount is non-zero, we atomically replace it with zero. This happens exactly once, for the first time request() is invoked by the downstream, and we'll check if the remaining amount (same as the limit) is less than or equal to the downstream's request amount. If so, we request from the upstream an unbounded amount (Long.MAX_VALUE). If it was not the first request or the amount was less than the limit, we revert back to the pass-through behavior.


Take with predicate


Sometimes, we'd like to stop the flow when an item matches a certain condition. This can happen before the item is emitted (i.e., takeWhile) or after (i.e., takeUntil). When working with a predicate, we no longer know how many items we'll let pass thus manipulating the request amount is not really an option here.


public static <T> Flow.Publisher<T> takeWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new TakeWhilePublisher<>(source, predicate);
}

// Flow.Publisher boilerplate omitted

static final class TakeWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean done;

    TakeWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe simply forwards the Flow.Subscription and the terminal onError/onComplete methods forward their respective event if the done flag is false. The flag will be set in onNext if the flow has been stopped due to the condition and is there to prevent the emission of multiple terminal events in case the flow would have ended anyway.

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        boolean b;

        try {
            b = predicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            downstream.onNext(item);
        } else {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


The first step is to protect against sources that would produce a few items after a cancel() call. Next, we don't trust the user-provided Predicate: if it crashes, we have to cancel the upstream, lock out further events and signal the Throwable to the downstream. A true result from the predicate allows us to emit the item to the downstream. A false will stop the source, lock out further upstream events and complete the downstream.

If we'd still want to receive the item before the predicate indicates the flow should stop, aka the takeUntil operator, only the onNext logic should be changed a bit:

public static <T> Flow.Publisher<T> takeUntil(
        Flow.Publisher<T> source, Predicate<? super T> stopPredicate) {

    return new TakeUntilPublisher<>(source, stopPredicate);
}

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        downstream.onNext(item);

        boolean b;

        try {
            b = stopPredicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


Here, the stopPredicate of the API entry point should indicate when to stop by returning true, whereas the previous takeWhile operator indicated a stop via false. It is a matter of taste I guess. The RxJava convention is that takeWhile(item -> item < 5) will take items while each of them is less than five, never emitting five itself, whereas takeUntil(item -> item == 5) will stop after emitting five.


Skip(N)


The dual of take(N), in some sense, is the operator which skips the first N items then lets the rest through. Again, counting the items is a crucial part of the operator's implementation.


public static <T> Flow.Publisher<T> skip(
        Flow.Publisher<T> source, long n) {

    return new SkipPublisher<>(source, n);
}


The operator's Flow.Subscriber uses a similar counting method to take(), decrementing a remaining field; once it reaches zero, all subsequent events are forwarded.

static final class SkipSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    SkipSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
}


The onError/onComplete methods can now emit the respective event directly without the need to check the remaining count. Whether the sequence is shorter than, equal to or longer than the provided limit, the terminal events can be emitted because onNext is there to skip items, not to stop the flow.

In general, when an operator drops items, it has the responsibility to ask for more from the upstream, as the downstream has no way of knowing it didn't receive an item (other than timing out). Thus, it doesn't know it has to request more, and the operator itself has to request on its behalf. However, requesting one by one is expensive - 1 CAS + 0..2 atomic increments per invocation - which can be avoided by requesting in batches.

The skip() operator is in a particular position where we know the first N items will be dropped, thus we can simply request N on top of what the downstream requested. onNext will drop the first N and even if the downstream hasn't requested, it won't get overflown.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;

        long n = remaining;
        downstream.onSubscribe(s);
        s.request(n);
    }


The reason remaining is read before sending the Flow.Subscription to the downstream is that this call may result in value emissions which decrement remaining, and we'd end up requesting less than the amount to be skipped. This also saves a field for storing the logically immutable skip amount.

The role of onNext is to drop items and then let the rest pass through:

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r == 0L) {
            downstream.onNext(item);
        } else {
            remaining = r - 1;
        }
    }


Skipping while a condition holds


Similar to a conditional take, we can skip unwanted items that match a condition (predicate) and then let the rest through unconditionally. Unfortunately, this case requires per-item replenishment from the upstream while the condition holds, as we can't be sure which upstream item will yield a false value and switch the operator into pass-through mode.


public static <T> Flow.Publisher<T> skipWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new SkipWhilePublisher<>(source, predicate);
}

static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean passthrough;

    boolean done;

    SkipWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe becomes a direct pass-through for the Flow.Subscription and the onError/onComplete methods get their done checks back. This is required because the predicate may fail for the very last item and the cancellation may not stop the terminal event. There is also the need for a flag that tells the onNext to let all items through and skip the predicate altogether.




    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        if (!passthrough) {
            boolean b;
            try {
                b = predicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (b) {
                upstream.request(1);
                return;
            }
            passthrough = true;           
        }

        downstream.onNext(item);
    }


First we prevent an excess onNext if there was a crash in the predicate before. Next, if the operator is not in the pass-through mode, we test with the predicate. If this turns out to be true, that indicates the item has to be dropped and a fresh item has to be requested as replenishment. Otherwise the operator enters its pass-through mode and the current and subsequent items will be emitted directly without invoking the predicate again. Implementing an "until" variant, where the predicate returning true still drops the current item, is left to the reader as an exercise.
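
As a hint for that exercise, the onNext of such a variant might look like the following sketch, where stopSkippingPredicate is a made-up field name for the predicate whose true result ends the skipping:

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        if (!passthrough) {
            boolean b;
            try {
                b = stopSkippingPredicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            // the matching item itself is still dropped; only later items pass through
            passthrough = b;
            upstream.request(1);
            return;
        }

        downstream.onNext(item);
    }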


Conclusion

The post demonstrated a way to implement the take, takeWhile, takeUntil, skip and skipWhile operators. The logic behind them doesn't require a fully fledged queue-drain approach and, being synchronous in-flow operators, they don't really have to bother with concurrency and request management (except the two take alternatives, which do have to manage requests).

The fact that the onXXX methods of a Flow.Subscriber are not allowed to be invoked concurrently greatly helps reducing the complexity and the need for being defensive all the time.

In the next post, we'll see how a multicasting Processor can be implemented and how a fan-out-fan-in operator can be implemented with its help.

Wednesday, September 27, 2017

Java 9 Flow API: arbitration and concatenation

Introduction


A very common task is to combine multiple sources, or more generally, start consuming a source once the previous source has terminated. The naive approach would be to simply call otherSource.subscribe(nextSubscriber) from onError or onComplete. Unfortunately, this doesn't work for two reasons: 1) it may end up with deep stacks due to a "tail" subscription from onError/onComplete, and 2) we should request from the new source only the remaining, unfulfilled amount that hasn't been provided by the previous source so as not to overflow the downstream.

The first issue can be solved by applying a heavyweight observeOn in general and implementing a basic trampolining loop only for certain concrete cases such as flow concatenation to be described in this post.

The second issue requires a more involved solution: not only do we have to switch between Flow.Subscriptions from different sources, we have to make sure concurrent request() invocations are not lost and are routed to the proper Flow.Subscription, along with any concurrent cancel() calls. Perhaps the difficulty is lessened by the fact that switching sources happens on a terminal event boundary only; thus we don't have to worry about the old source calling onNext while the logic switches to the new source, complicating the accounting of requested/emitted item counts. Enter SubscriptionArbiter.


Subscription arbitration


We have to deal with 4 types of potentially concurrent signals when arbitrating Flow.Subscriptions:


  1. A request(long) call from downstream that has to be routed to the current Flow.Subscription
  2. A cancel() call from downstream that has to be routed to the current Flow.Subscription and cancel any future Flow.Subscription.
  3. A setSubscription(Flow.Subscription) that is called by the current Flow.Subscriber after subscribing to any Flow.Publisher which is not guaranteed to happen on the same thread subscribe() is called (i.e., as with the standard SubmissionPublisher or our range() operator).
  4. A setProduced(long n) that is called when the previous source terminates and we want to make sure the new source will be requested the right amount; i.e., we'll have to deduct this amount from the current requested amount so setSubscription will issue the request for the remainder to the new Flow.Subscription.


Let's start with the skeleton of the SubscriptionArbiter class providing these methods:


public class SubscriptionArbiter implements Flow.Subscription {

    Flow.Subscription current;
    static final VarHandle CURRENT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "current", Flow.Subscription.class);

    Flow.Subscription next;
    static final VarHandle NEXT =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "next", Flow.Subscription.class);

    long requested;

    long downstreamRequested;
    static final VarHandle DOWNSTREAM_REQUESTED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "downstreamRequested", long.class);

    long produced;
    static final VarHandle PRODUCED =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "produced", long.class);

    int wip;
    static final VarHandle WIP =
        VH.find(MethodHandles.lookup(), SubscriptionArbiter.class,
            "wip", int.class);

    @Override
    public final void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    public final boolean isCancelled() {
        // TODO implement
        return false;
    }

    public final void setSubscription(Flow.Subscription s) {
        // TODO implement
    }

    public final void setProduced(long n) {
        // TODO implement
    }

    final void arbiterDrain() {
        // TODO implement
    }
}

We intend the class to be extended to save on allocation and object headers; however, some methods should not be overridden by any subclass as that would likely break the internal logic. The only relatively safe method to override is cancel(): the subclass will likely have its own resources that have to be released upon a cancel() call from the downstream, which receives an instance of this class via onSubscribe. The meaning of each field is as follows:


  • current holds the current Flow.Subscription. Its companion CURRENT VarHandle is there to support cancellation.
  • next temporarily holds the next Flow.Subscription to replace current instance. Direct replacement can't work due to the required request accounting.
  • requested holds the current outstanding request count. It doesn't have any VarHandle because it will be only accessed from within a drain-loop.
  • downstreamRequested accumulates the downstream's requests in case the drain loop is currently executing.
  • produced holds the number of items produced by the previous source, which has to be deducted from requested before switching to the next source happens. It is accompanied by a VarHandle to ensure proper visibility of its value from within the drain loop.
  • wip is our standard work-in-progress counter to support the queue-drain like lock-free serialization we use almost everywhere now.

The first method we implement is request() that will be called by the downstream from an arbitrary thread at any time:


    @Override
    public final void request(long n) {
        for (;;) {
            long r = (long)DOWNSTREAM_REQUESTED.getAcquire(this);
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE;
            }
            if (DOWNSTREAM_REQUESTED.compareAndSet(this, r, u)) {
                arbiterDrain();
                break;
            }
        }
    }


We perform the usual atomic addition capped at Long.MAX_VALUE and call arbiterDrain().

    @Override
    public void cancel() {
        Flow.Subscription s = (Flow.Subscription)CURRENT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
        s = (Flow.Subscription)NEXT.getAndSet(this, this);
        if (s != null && s != this) {
            s.cancel();
        }
    }

    public final boolean isCancelled() {
        return CURRENT.getAcquire(this) == this;
    }


We atomically swap out both the current and the next Flow.Subscription instances, replacing them with the cancelled indicator this. To support some eagerness in cancellation, isCancelled can be called by the subclass (i.e., concat of an array of Flow.Publishers) to quit its trampolined looping.

Next, we "queue up" the next Flow.Subscription:


    public final void setSubscription(Flow.Subscription subscription) {
        if (NEXT.compareAndSet(this, null, subscription)) {
            arbiterDrain();
        } else {
            subscription.cancel();
        }
    }


We expect there will be only one thread calling setSubscription and that call happens before the termination of the associated source, thus a simple CAS from null to subscription should be enough. In this scenario, a failed CAS can only mean the arbiter was cancelled in the meantime and we cancel the subscription accordingly. We'll still have to relay the unfulfilled request amount to this new subscription which will be done in arbiterDrain().

The setProduced will have to "queue up" the fulfilled amount in a similar fashion:


    public final void setProduced(long n) {
        PRODUCED.setRelease(this, n);
        arbiterDrain();
    }


As with setSubscription, we expect this to happen once per terminated source, before the subscription to the next source happens; thus there is no real need to atomically accumulate the item count.

Finally, let's see the heavy lifting in arbiterDrain() itself now:


    final void arbiterDrain() {
        if ((int)WIP.getAndAdd(this, 1) != 0) {
            return;
        }
        
        Flow.Subscription requestFrom = null;
        long requestAmount = 0L;

        for (;;) {

            // TODO implement

            if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
                break;
            }
        }

        if (requestFrom != null && requestFrom != this
                && requestAmount != 0L) {
            requestFrom.request(requestAmount);
        }
    }


The arbiterDrain() method, whose name was chosen to avoid clashing with a subclass' drain() method if any, starts out like most typical trampolined drain loops: the atomic increment of wip from 0 to 1 enters the loop and the decrement back to zero leaves it.

One oddity may come from the requestFrom and requestAmount local variables. Unlike a traditional stable-prefetch queue-drain, requesting from within the loop can bring back the reentrancy issue, the tail-subscription problem and may prevent other actions from happening with the arbiter until the request() call returns. Therefore, once the loop decided what the current target Flow.Subscription is, we'll issue a request to it outside the loop. It is possible by the time the drain method reaches the last if statement that the current requestFrom is outdated or the arbiter was cancelled. This is not a problem because request() and cancel() in general are expected to race and an outdated Flow.Subscription means it has already terminated and a request() call is a no-op to it.

The last part inside the loop has to "dequeue" the deferred changes and apply them to the state of the arbiter:


    for (;;) {

        // (1) ----------------------------------------------
        Flow.Subscription currentSub = (Flow.Subscription)CURRENT.getAcquire(this);
        if (currentSub != this) {

            // (2) ------------------------------------------
            long req = requested;

            long downstreamReq = (long)DOWNSTREAM_REQUESTED.getAndSet(this, 0L);

            long prod = (long)PRODUCED.getAndSet(this, 0L);

            Flow.Subscription nextSub = (Flow.Subscription)NEXT.getAcquire(this);
            if (nextSub != null && nextSub != this) {
                NEXT.compareAndSet(this, nextSub, null);
            }

            // (3) ------------------------------------------
            if (downstreamReq != 0L) {
                req += downstreamReq;
                if (req < 0L) {
                    req = Long.MAX_VALUE;
                }
            }

            // (4) ------------------------------------------
            if (prod != 0L && req != Long.MAX_VALUE) {
                req -= prod;
            }

            requested = req;

            // (5) ------------------------------------------
            if (nextSub != null && nextSub != this) {
                requestFrom = nextSub;
                requestAmount = req;
                CURRENT.compareAndSet(this, currentSub, nextSub);
            } else {
                // (6) --------------------------------------
                requestFrom = currentSub;
                requestAmount += downstreamReq;
                if (requestAmount < 0L) {
                    requestAmount = Long.MAX_VALUE;
                }
            }
        }

        if ((int)WIP.getAndAdd(this, -1) - 1 == 0) {
            break;
        }
    }



  1. First we check if the current instance holds the cancelled indicator (this). If so, we don't have to execute any of the logic as the arbiter has been cancelled by the downstream.
  2. We read out the current and queued state: the current outstanding requested amount, the request amount from the downstream if any, the produced item count by the previous source and the potential next Flow.Subscription instance. While it is safe to atomically swap in 0 for both the downstreamRequested and produced values, swapping in null unconditionally may overwrite the cancelled indicator and the setSubscription won't cancel its argument.
  3. If there was an asynchronous request() call, we add the downstreamReq amount to the current requested amount, capped at Long.MAX_VALUE (unbounded indicator).
  4. If there was a non-zero produced amount and the requested amount isn't Long.MAX_VALUE, we subtract the two. The new requested amount is then saved.
  5. If there was a new Flow.Subscription set via setSubscription, we indicate where to request from outside the loop and we indicate the whole current requested amount (now including any async downstream request and upstream produced count) should be used. The CAS will make sure the next Flow.Subscription only becomes the current one if there was no cancellation in the meantime.
  6. Otherwise, we target the current Flow.Subscription and add up the downstream's extra requests, capped at Long.MAX_VALUE. The reason is that the downstream may issue multiple requests (r1, r2) in quick succession, which makes the logic loop back again, now having r1 + r2 items outstanding from the downstream's perspective.

Now that the infrastructure is ready, let's implement a couple of operators.


Concatenating an array of Flow.Publishers


Perhaps the simplest operator we could write on top of the SubscriptionArbiter is the concat() operator. It consumes one Flow.Publisher after another in a non-overlapping fashion until all of them have completed.


@SafeVarargs
public static <T> Flow.Publisher<T> concat(Flow.Publisher<? extends T>... sources) {
    return new ConcatPublisher<>(sources);
}


The ConcatPublisher itself is straightforward: create a coordinator, send it to the downstream and trigger the consumption of the first source:


     @Override
     public void subscribe(Flow.Subscriber<? super T> s) {
         ConcatCoordinator<T> parent = new ConcatCoordinator<>(s, sources);
         s.onSubscribe(parent);
         parent.drain();
     }

The ConcatCoordinator can be implemented as follows:


static final class ConcatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {
    
    final Flow.Subscriber<? super T> downstream;

    final Flow.Publisher<? extends T>[] sources;

    int index;

    int trampoline;
    static final VarHandle TRAMPOLINE =
        VH.find(MethodHandles.lookup(), ConcatCoordinator.class,
            "trampoline", int.class);

    long consumed;
    
    ConcatCoordinator(
           Flow.Subscriber<? super T> downstream,
           Flow.Publisher<? extends T>[] sources
    ) {
        this.downstream = downstream;
        this.sources = sources;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }
}


The ConcatCoordinator extends the SubscriptionArbiter, thus it is a Flow.Subscription as well and as such will be used as the connection object towards the downstream. It also implements Flow.Subscriber because we'll use the same instance to subscribe to all of the Flow.Publishers one after the other.

One may come up with the objection that reusing the same Flow.Subscriber instance is not allowed by the Reactive-Streams specification the Flow API inherited. However, the specification actually just discourages the reuse and otherwise mandates external synchronization so that the onXXX methods are invoked in a serialized manner. We'll see that the trampolining in the operator will just ensure that property along with the arbiter itself. Of course, we could just new up a Flow.Subscriber for the next source but that Flow.Subscriber would be itself nothing more than a delegator for the coordinator instance (no need for a per-source state in it); combining the two just saves on allocation and indirection.

The fields are interpreted as follows:


  • downstream is the Flow.Subscriber that receives the signals.
  • sources is the array of Flow.Publishers that will be consumed one after the other
  • index points at the current Flow.Publisher and gets incremented once one completes.
  • trampoline is the work-in-progress indicator for the drain loop; chosen to avoid clashing with the arbiter's own wip field in this blog for better readability. In practice, since they are in different classes, one can name them both wip.
  • consumed tracks how many items the current source has produced (and the coordinator has consumed). We'll use this to update the arbiter at the completion of the current source instead of doing it for each item received, because that saves a lot of overhead and we don't really care about each individual item's reception.


The coordinator's onXXX methods are relatively trivial at this point:


   @Override
    public void onSubscribe(Flow.Subscription s) {
        setSubscription(s);
    }

    @Override
    public void onNext(T item) {
        consumed++;
        downstream.onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        drain();
    }
   


We save the Flow.Subscription into the arbiter, write through the item or throwable and call the drain method upon normal completion.

What's left is the drain() method itself:


    void drain() {
        // (1) -------------------------------------------------------
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                // (2) -----------------------------------------------
                if (isCancelled()) {
                    return;
                }

                // (3) -----------------------------------------------
                if (index == sources.length) {
                    downstream.onComplete();
                    return;
                }

                // (4) -----------------------------------------------
                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                // (5) -----------------------------------------------
                sources[index++].subscribe(this);

            // (6) ---------------------------------------------------
            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }

Again, not a really complicated method, but as usual, the difficulty may come from understanding why such short code actually provides the required behavior and safeguards:


  1. We know that drain() is only invoked from the subscribe() or onComplete() methods. This standard lock-free trampolining check ensures only one thread is busy setting up the consumption of the next (or the first) source. In addition, since only a guaranteed once-per-source onComplete() can trigger the consumption of the next, we don't have to worry about racing with onNext in this operator. (However, an in-flow concatMap is a different scenario.) This setup also defends against growing the stack depth through tail-subscription: if decrementing the trampoline counter at the end of the loop leaves it above zero, a synchronous onComplete happened while we were subscribing and the loop simply continues with the next source instead of recursing.
  2. In case the downstream cancelled the operator, we simply quit the loop.
  3. In case the index is equal to the number of sources, it means we reached the end of the concatenation and can complete the downstream via onComplete().
  4. Otherwise, we indicate to the arbiter the number of items consumed from the previous source so it can update its outstanding (current) request amount. Note that consumed is not concurrently updated because onNext and onComplete (and thus drain) on the same source can't be executed concurrently.
  5. We then take the source at the current index, move the index forward by one so it points at the source after it, and subscribe with this.
  6. Finally, if there was no synchronous or racy onComplete, we quit the loop; otherwise we resume with the subsequent sources.


One can add a few features and safeguards to this coordinator, such as delaying errors till the very end and ensuring the sources entry at index is not null. These are left as an exercise to the reader.
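
As a hint for the second exercise, the null check could take roughly the shape below inside step (5) of the drain() loop; this is just one possible sketch, not necessarily the intended solution:


    // Instead of "sources[index++].subscribe(this);", guard against null entries:
    Flow.Publisher<? extends T> next = sources[index];
    if (next == null) {
        downstream.onError(new NullPointerException(
                "sources[" + index + "] is null"));
        return;
    }
    index++;
    next.subscribe(this);
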


Repeat


How can we turn this into a repeat operator where the source is resubscribed on successful completion? Easy: drop the index and have only a single source Flow.Publisher to be worked on!


public static <T> Flow.Publisher<T> repeat(
        Flow.Publisher<T> source, long max) {
    return new RepeatPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RepeatCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the onXXX methods are the same

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                if (--max < 0L) {
                    downstream.onComplete();
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


Given that repeating indefinitely is usually not desired, we limit the number of subscriptions (the initial one plus the repeats) to the amount specified by the user. Since there is only one source Flow.Publisher, no indexing into an array is needed and we only have to decrement the counter to detect the condition for completing the downstream.
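
The elided RepeatPublisher wrapper probably looks something like the sketch below, assuming it follows the same pattern as the other operators in this post: create the coordinator, hand it to the downstream as its Flow.Subscription, then kick off the first subscription via drain(). The three-argument RepeatCoordinator constructor is an assumption here:


static final class RepeatPublisher<T> implements Flow.Publisher<T> {

    final Flow.Publisher<T> source;

    final long max;

    RepeatPublisher(Flow.Publisher<T> source, long max) {
        this.source = source;
        this.max = max;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // the coordinator doubles as the downstream's Flow.Subscription
        RepeatCoordinator<T> parent =
                new RepeatCoordinator<>(subscriber, source, max);
        subscriber.onSubscribe(parent);
        // start consuming the source for the first time
        parent.drain();
    }
}
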


Retry


How about retrying a Flow.Publisher in case it failed with an onError? Easy: have onError call drain() and onComplete call downstream.onComplete() directly:


public static <T> Flow.Publisher<T> retry(
        Flow.Publisher<T> source, long max) {
    return new RetryPublisher<>(source, max);
}

// ... subscribe() has the same pattern.

static final class RetryCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    long max;

    // ... the onSubscribe and onNext methods are the same

    @Override
    public void onError(Throwable throwable) {
        if (--max < 0L) {
            downstream.onError(throwable);
        } else {
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                source.subscribe(this);

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}


There are two slight changes in retry(). First, the termination counting moves into onError: when we run out of the retry count, the latest Flow.Publisher's error is delivered to the downstream from within onError and no further retry can happen. Second, onComplete now completes the downstream directly, which means the drain logic no longer has a terminal condition of its own to check.


On error, resume with another Flow.Publisher


Now that we've seen multi-source switchover and single-source continuation, switching to an alternative or "fallback" Flow.Publisher should be straightforward to set up: keep the main and the fallback Flow.Publisher around (the implementation below uses two fields and a boolean flag rather than a 2 element array and an index), then make sure onError triggers the switch.


public static <T> Flow.Publisher<T> onErrorResumeNext(
        Flow.Publisher<T> source, Flow.Publisher<T> fallback) {
    return new OnErrorResumeNextPublisher<>(source, fallback);
}

// ... subscribe() has the same pattern.

static final class OnErrorResumeNextCoordinator<T> extends SubscriptionArbiter
implements Flow.Subscriber<T> {

    final Flow.Publisher<T> source;

    final Flow.Publisher<T> fallback;

    boolean switched;

    // ... the onSubscribe and onNext methods are the same

    @Override
    public void onError(Throwable throwable) {
        if (switched) {
            downstream.onError(throwable);
        } else {
            switched = true;
            drain();
        }
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }

    final void drain() {
        if ((int)TRAMPOLINE.getAndAdd(this, 1) == 0) {
            do {

                if (isCancelled()) {
                    return;
                }

                long c = consumed;
                if (c != 0L) {
                    consumed = 0L;
                    setProduced(c);
                }

                if (switched) {
                    fallback.subscribe(this);
                } else {
                    source.subscribe(this);
                }

            } while ((int)TRAMPOLINE.getAndAdd(this, -1) - 1 != 0);
        }
    }
}

Here we have two states: switched == false indicates we are consuming the main source. If that fails, we set switched = true and the drain loop will subscribe to the fallback Flow.Publisher. However, if the fallback fails as well, onError sees switched == true and, instead of draining (and thus retrying the fallback Flow.Publisher) again, it just terminates with the Throwable the fallback emitted.


Conclusion


In this post, the subscription arbitration concept was presented, which allows us to switch between non-overlapping Flow.Publisher sources when one terminates (completes or fails with an error), while maintaining the link of cancellation between the individual Flow.Subscriptions as well as making sure backpressure is properly transmitted and preserved between them.

When combined with trampolining logic, such arbitration allowed us to implement a couple of standard ReactiveX operators such as concat, repeat, retry and onErrorResumeNext, while only applying small changes to the methods and algorithms in them.

Note, however, that even though the arbiter can be reused for in-flow operators such as concatMap (concatenating Flow.Publishers generated from upstream values), repeatWhen (repeating if a companion Flow.Publisher signals an item) and retryWhen, one can no longer use a single Flow.Subscriber to subscribe to both the main flow and the inner/companion flows at the same time. We will explore these types of operators in a later post.

The arbitration has its own limit: it can't support live switching between sources, i.e., when onNext may be in progress while the switch has to happen. If you are familiar with the switchMap operator, this is what can happen during its execution. We'll look into this type of operator in a subsequent post.

But for now, we'll investigate a much lighter set of operators in the next post: limiting the number of items the downstream can receive and skipping a certain number of items from the upstream, both based on counting items and on per-item predicate checks, i.e., the take() and skip() operators.