Saturday, September 30, 2017

Java 9 Flow API: taking and skipping

Introduction


Limiting or skipping over parts of a flow is a very common task: either we are only interested in the first N items or we don't care about the first N items. Sometimes, N is unknown but we can decide, based on the current item, when to stop relaying items or, in contrast, when to start relaying items.


Take(N)


In concept, limiting a flow to a certain size should be straightforward: count the number of items received via onNext and when the limit is reached, issue a cancel() towards the upstream and onComplete() towards the downstream.


public static <T> Flow.Publisher<T> take(Flow.Publisher<T> source, long n) {
    return new TakePublisher<>(source, n);
}


The operator's implementation requires little state:

static final class TakeSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
}


In its simplest form, there is no need to intercept the request() and cancel() calls from the downstream: these can be passed through. However, since the operator has to stop the sequence upon reaching the limit (remaining == 0), the upstream's Flow.Subscription has to be stored.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.upstream = s;
        downstream.onSubscribe(s);
    }


In onSubscribe, we only have to store the Flow.Subscription and forward it to the downstream.

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r > 0L) {
            remaining = --r;
            downstream.onNext(item);
 
            if (r == 0) {
                upstream.cancel();
                downstream.onComplete();
            }
        }
    }


While remaining is positive, we decrement it and save it into its field followed by an emission of the current item. If the remaining count reached zero, the upstream is cancelled and the downstream is completed. Any further items will be ignored (in case cancel() doesn't immediately stop the upstream).

    @Override
    public void onError(Throwable throwable) {
        if (remaining != 0L) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (remaining != 0L) {
            downstream.onComplete();
        }
    }


The onError and onComplete methods check the remaining count and, if it is non-zero, relay the respective terminal event. This covers the case when the upstream terminates before the limit is reached: the terminal event has to reach the downstream as well. However, if the limit equals the actual length of the flow, the last item in onNext will trigger an onComplete which could be followed by a terminal event from the upstream that has to be suppressed. This behavior is allowed by the Reactive-Streams specification (i.e., cancel() may not have immediate effect) and we have dealt with it via a done field in other operators. Here, the fact remaining == 0 is the done indicator.
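
To see the pieces working together, here is a minimal, self-contained sketch that assembles the subscriber above with the omitted Flow.Publisher boilerplate. The names TakeSketch, lateSource and run are made up for this illustration, and lateSource is a deliberately sloppy test source (it ignores both the request amount and cancel()) so that the suppression of late signals becomes visible. As in the code above, a limit of zero is not handled, so the sketch assumes n > 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class TakeSketch {

    // Publisher boilerplate: remembers the source and the limit and
    // wraps each incoming subscriber into a TakeSubscriber.
    static final class TakePublisher<T> implements Flow.Publisher<T> {
        final Flow.Publisher<T> source;
        final long n;

        TakePublisher(Flow.Publisher<T> source, long n) {
            this.source = source;
            this.n = n;
        }

        @Override public void subscribe(Flow.Subscriber<? super T> subscriber) {
            source.subscribe(new TakeSubscriber<T>(subscriber, n));
        }
    }

    static final class TakeSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        Flow.Subscription upstream;
        long remaining;

        TakeSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }

        @Override public void onNext(T item) {
            long r = remaining;
            if (r > 0L) {
                remaining = --r;
                downstream.onNext(item);
                if (r == 0L) {
                    upstream.cancel();
                    downstream.onComplete();
                }
            }
        }

        @Override public void onError(Throwable throwable) {
            if (remaining != 0L) {
                downstream.onError(throwable);
            }
        }

        @Override public void onComplete() {
            if (remaining != 0L) {
                downstream.onComplete();
            }
        }
    }

    // Hypothetical test source: on the first request it emits 1, 2, 3 and
    // completes, ignoring both the request amount and cancel().
    static Flow.Publisher<Integer> lateSource() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            boolean started;

            @Override public void request(long n) {
                if (!started) {
                    started = true;
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { /* deliberately ignored */ }
        });
    }

    // Subscribes a recording consumer to take(lateSource(), n) and returns what it saw.
    static List<Object> run(long n) {
        List<Object> events = new ArrayList<>();
        new TakePublisher<>(lateSource(), n).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add(item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }
}
```

With a limit of 2, the third item and the upstream's onComplete arrive after the operator has already completed the downstream and are dropped. With a limit of 3 (equal to the length of the flow), the upstream's own onComplete is the signal that gets suppressed.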

Being a pass-through for backpressure (the downstream calls request() directly on the upstream's Flow.Subscription), the operator does not limit the upstream, which may attempt to produce more items than the limit. At other times, knowing the downstream requested more than the limit, the upstream can actually be consumed in an unbounded fashion, and utilizing its fast-paths may result in improved performance.

Deciding which of the three behaviors should be default is up to the operator writer, but it is interesting to look at the remaining two modes: limiting the request amount and unbounding it.

Limiting the request amount


In this mode, if the downstream requests at least the limit, we can issue a single request with the limit amount. However, if the downstream requests less, we have to perform some intricate request accounting.

We'll need a new field, requestRemaining with a VarHandle REQUEST_REMAINING companion. We also have to intercept request() from downstream and figure out atomically what number to request from the upstream so the total requested amount doesn't go above the limit.


static final class TakeSubscriber<T>
        implements Flow.Subscriber<T>, Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    long requestRemaining;
    static final VarHandle REQUEST_REMAINING =
        VH.find(MethodHandles.lookup(), TakeSubscriber.class,
            "requestRemaining", long.class);

    TakeSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
        this.requestRemaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
    }

    // onNext, onError and onComplete are the same

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }
}


The onSubscribe now forwards this as the Flow.Subscription and cancel() just delegates to the upstream.cancel().
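
The REQUEST_REMAINING declaration above relies on a VH helper that this post does not show. The following is only a guess at what such a helper could look like, assuming it is a thin wrapper around MethodHandles.Lookup.findVarHandle that turns the checked reflective exceptions into an error. The Counter class is a made-up example used to exercise it.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical stand-in for the VH helper; the real one may differ.
final class VH {

    private VH() { }

    static VarHandle find(MethodHandles.Lookup lookup, Class<?> owner,
            String fieldName, Class<?> fieldType) {
        try {
            // The lookup comes from the owner class, so it may access its fields.
            return lookup.findVarHandle(owner, fieldName, fieldType);
        } catch (ReflectiveOperationException ex) {
            // A missing or inaccessible field is a programming error.
            throw new InternalError(ex);
        }
    }
}

// Made-up class showing the same declaration pattern as requestRemaining.
final class Counter {

    long value;
    static final VarHandle VALUE =
        VH.find(MethodHandles.lookup(), Counter.class, "value", long.class);

    boolean casValue(long expected, long next) {
        return VALUE.compareAndSet(this, expected, next);
    }

    long current() {
        return (long) VALUE.getAcquire(this);
    }
}
```

Passing MethodHandles.lookup() from inside the owning class is what gives the helper access to a non-public field.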

    @Override
    public void request(long n) {
        for (;;) {
            long r = (long)REQUEST_REMAINING.getAcquire(this);
            long newRequest;
            if (r <= n) {
                newRequest = r;
            } else {
                newRequest = n;
            }
            long u = r - newRequest;
            if (REQUEST_REMAINING.compareAndSet(this, r, u)) {
                // once the limit is used up, newRequest is 0 and must not be forwarded
                if (newRequest != 0L) {
                    upstream.request(newRequest);
                }
                break;
            }
        }
    }


First we read the remaining request amount. If it is less than or equal to the downstream's request, we'll request the remaining amount from the upstream. If the downstream requested less than the remaining amount, we'll request that amount instead. The new remaining amount is then the current one minus the new request amount just decided. After the successful CAS, we request the new amount from the upstream and quit the loop.
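
The accounting is easier to follow with concrete numbers. The sketch below isolates the request() logic into a standalone Flow.Subscription and feeds it a series of downstream requests while recording what reaches the upstream. It swaps the VarHandle for an AtomicLong to stay self-contained, assumes every n is positive, and adds a guard against forwarding a zero request. The names LimitedRequestSketch, LimitingSubscription and upstreamRequests are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

final class LimitedRequestSketch {

    static final class LimitingSubscription implements Flow.Subscription {
        final Flow.Subscription upstream;
        final AtomicLong requestRemaining;   // AtomicLong instead of a VarHandle

        LimitingSubscription(Flow.Subscription upstream, long limit) {
            this.upstream = upstream;
            this.requestRemaining = new AtomicLong(limit);
        }

        @Override public void request(long n) {   // assumes n > 0
            for (;;) {
                long r = requestRemaining.get();
                long newRequest = Math.min(r, n);
                if (newRequest == 0L) {
                    return;   // limit already fully requested (guard added by this sketch)
                }
                if (requestRemaining.compareAndSet(r, r - newRequest)) {
                    upstream.request(newRequest);
                    return;
                }
            }
        }

        @Override public void cancel() {
            upstream.cancel();
        }
    }

    // Issues the given downstream requests and returns what the upstream received.
    static List<Long> upstreamRequests(long limit, long... downstreamRequests) {
        List<Long> seen = new ArrayList<>();
        Flow.Subscription recording = new Flow.Subscription() {
            @Override public void request(long n) { seen.add(n); }
            @Override public void cancel() { }
        };
        LimitingSubscription s = new LimitingSubscription(recording, limit);
        for (long n : downstreamRequests) {
            s.request(n);
        }
        return seen;
    }
}
```

With a limit of 5, downstream requests of 2, 2, 2, 2 turn into upstream requests of 2, 2 and 1, and a single downstream request of 10 turns into one upstream request of 5.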


Unbounding the request amount


The other direction, namely, unbounding the upstream if the downstream requested at least the limit of the operator, can be achieved through the same requestRemaining field but a different request() logic:


    @Override
    public void request(long n) {
        long r = (long)REQUEST_REMAINING.getAcquire(this);
        if (r != 0L) {
             r = (long)REQUEST_REMAINING.getAndSet(this, 0L);
             if (r != 0L && r <= n) {
                 upstream.request(Long.MAX_VALUE);
                 return;
             }
        }
        upstream.request(n);
    }


If the remaining request amount is non-zero, we atomically replace it with zero. This happens exactly once, the first time request() is invoked by the downstream, and we'll check if the remaining amount (same as the limit) is less than or equal to the downstream's request amount. If so, we request an unbounded amount (Long.MAX_VALUE) from the upstream. If it was not the first request or the amount was less than the limit, we revert to the pass-through behavior.
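
A small experiment can confirm which upstream requests this logic produces. The sketch below lifts the request() method into a standalone Flow.Subscription, again using an AtomicLong in place of the VarHandle, and records what the upstream is asked for. UnboundedRequestSketch, UnboundingSubscription and upstreamRequests are names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

final class UnboundedRequestSketch {

    static final class UnboundingSubscription implements Flow.Subscription {
        final Flow.Subscription upstream;
        final AtomicLong requestRemaining;   // AtomicLong instead of a VarHandle

        UnboundingSubscription(Flow.Subscription upstream, long limit) {
            this.upstream = upstream;
            this.requestRemaining = new AtomicLong(limit);
        }

        @Override public void request(long n) {
            long r = requestRemaining.get();
            if (r != 0L) {
                // only the first caller gets the original limit back
                r = requestRemaining.getAndSet(0L);
                if (r != 0L && r <= n) {
                    upstream.request(Long.MAX_VALUE);
                    return;
                }
            }
            upstream.request(n);
        }

        @Override public void cancel() {
            upstream.cancel();
        }
    }

    // Issues the given downstream requests and returns what the upstream received.
    static List<Long> upstreamRequests(long limit, long... downstreamRequests) {
        List<Long> seen = new ArrayList<>();
        Flow.Subscription recording = new Flow.Subscription() {
            @Override public void request(long n) { seen.add(n); }
            @Override public void cancel() { }
        };
        UnboundingSubscription s = new UnboundingSubscription(recording, limit);
        for (long n : downstreamRequests) {
            s.request(n);
        }
        return seen;
    }
}
```

With a limit of 5, a first request of 10 becomes Long.MAX_VALUE, whereas a first request of 3 followed by 10 is passed through unchanged as 3 and 10.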


Take with predicate


Sometimes, we'd like to stop the flow when an item matches a certain condition. This can happen before the item is emitted (i.e., takeWhile) or after (i.e., takeUntil). When working with a predicate, we no longer know how many items we'll let pass, thus manipulating the request amount is not really an option here.


public static <T> Flow.Publisher<T> takeWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new TakeWhilePublisher<>(source, predicate);
}

// Flow.Publisher boilerplate omitted

static final class TakeWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean done;

    TakeWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe simply forwards the Flow.Subscription and the terminal onError/onComplete methods forward their respective event if the done flag is false. The flag will be set in onNext if the flow has been stopped due to the condition and is there to prevent the emission of multiple terminal events in case the flow would have ended anyway.

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        boolean b;

        try {
            b = predicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            downstream.onNext(item);
        } else {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


The first step is to protect against sources that would produce a few items after a cancel() call. Next, we don't trust the user-provided Predicate: if it crashes, we have to cancel the upstream, lock out further events and signal the Throwable to the downstream. A true result from the predicate will allow us to emit the item to the downstream. A false will stop the source, lock out further upstream events and complete the downstream.
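
The three outcomes described above (pass, stop, crash) can be tried out with a small harness. This is a sketch under stated assumptions: TakeWhileSketch, range and run are made-up names, and range is a simplistic synchronous source that emits everything on the first request and only checks for cancellation, which is enough for a consumer that requests Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class TakeWhileSketch {

    static final class TakeWhileSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> predicate;
        Flow.Subscription upstream;
        boolean done;

        TakeWhileSubscriber(Flow.Subscriber<? super T> downstream,
                Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }

        @Override public void onNext(T item) {
            if (done) {
                return;
            }
            boolean b;
            try {
                b = predicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (b) {
                downstream.onNext(item);
            } else {
                upstream.cancel();
                done = true;
                downstream.onComplete();
            }
        }

        @Override public void onError(Throwable throwable) {
            if (!done) {
                downstream.onError(throwable);
            }
        }

        @Override public void onComplete() {
            if (!done) {
                downstream.onComplete();
            }
        }
    }

    // Hypothetical source: emits 1..count on the first request, stops when cancelled.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            boolean started;
            boolean cancelled;

            @Override public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                for (int i = 1; i <= count && !cancelled; i++) {
                    subscriber.onNext(i);
                }
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }

    // Runs takeWhile over 1..10 and returns the events seen by the consumer.
    static List<Object> run(Predicate<Integer> predicate) {
        List<Object> events = new ArrayList<>();
        Flow.Subscriber<Integer> recorder = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add(item); }
            @Override public void onError(Throwable t) { events.add("error: " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        range(10).subscribe(new TakeWhileSubscriber<Integer>(recorder, predicate));
        return events;
    }
}
```

Running it with item -> item < 5 yields the items 1 to 4 followed by a completion, and a predicate that throws on the first item yields only the error.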

If we still want to receive the item for which the predicate indicates the flow should stop, as the takeUntil operator does, only the onNext logic has to change a bit:

public static <T> Flow.Publisher<T> takeUntil(
        Flow.Publisher<T> source, Predicate<? super T> stopPredicate) {

    return new TakeUntilPublisher<>(source, stopPredicate);
}

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        downstream.onNext(item);

        boolean b;

        try {
            b = stopPredicate.test(item);
        } catch (Throwable ex) {
            upstream.cancel();
            done = true;
            downstream.onError(ex);
            return;
        }

        if (b) {
            upstream.cancel();
            done = true;
            downstream.onComplete();
        }  
    }


Here, the stopPredicate of the API entry point should indicate when to stop by returning true, whereas the previous takeWhile operator indicated a stop via false. It is a matter of taste I guess. The RxJava convention is that takeWhile(item -> item < 5) will take items while each of them is less than five, never emitting five itself, whereas takeUntil(item -> item == 5) will stop after emitting five.
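
The difference in what gets emitted is easy to check with a tiny harness. The following sketch wires the takeUntil onNext logic into a complete subscriber and runs it against a simplistic synchronous source. TakeUntilSketch, range and run are made-up names, and the source emits everything on the first request, which assumes a consumer requesting Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class TakeUntilSketch {

    static final class TakeUntilSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> stopPredicate;
        Flow.Subscription upstream;
        boolean done;

        TakeUntilSubscriber(Flow.Subscriber<? super T> downstream,
                Predicate<? super T> stopPredicate) {
            this.downstream = downstream;
            this.stopPredicate = stopPredicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }

        @Override public void onNext(T item) {
            if (done) {
                return;
            }
            downstream.onNext(item);   // the item is emitted before it is tested
            boolean b;
            try {
                b = stopPredicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (b) {
                upstream.cancel();
                done = true;
                downstream.onComplete();
            }
        }

        @Override public void onError(Throwable throwable) {
            if (!done) {
                downstream.onError(throwable);
            }
        }

        @Override public void onComplete() {
            if (!done) {
                downstream.onComplete();
            }
        }
    }

    // Hypothetical source: emits 1..count on the first request, stops when cancelled.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            boolean started;
            boolean cancelled;

            @Override public void request(long n) {
                if (started) {
                    return;
                }
                started = true;
                for (int i = 1; i <= count && !cancelled; i++) {
                    subscriber.onNext(i);
                }
                if (!cancelled) {
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { cancelled = true; }
        });
    }

    // Runs takeUntil over 1..10 and returns the events seen by the consumer.
    static List<Object> run(Predicate<Integer> stopPredicate) {
        List<Object> events = new ArrayList<>();
        Flow.Subscriber<Integer> recorder = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { events.add(item); }
            @Override public void onError(Throwable t) { events.add("error: " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        range(10).subscribe(new TakeUntilSubscriber<Integer>(recorder, stopPredicate));
        return events;
    }
}
```

With item -> item == 5 the consumer sees 1 through 5 and then the completion. Note also that a crashing stopPredicate still lets the current item through first, because the emission happens before the test.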


Skip(N)


The dual of take(N), in some sense, is the operator which skips the first N items then lets the rest through. Again, counting the items is a crucial part of the operator's implementation.


public static <T> Flow.Publisher<T> skip(
        Flow.Publisher<T> source, long n) {

    return new SkipPublisher<>(source, n);
}


The operator's Flow.Subscriber uses a counting method similar to that of take(): it decrements a remaining field and, once it reaches zero, forwards all subsequent items.

static final class SkipSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    Flow.Subscription upstream;

    long remaining;

    SkipSubscriber(
            Flow.Subscriber<? super T> downstream,
            long n) {
        this.downstream = downstream;
        this.remaining = n;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        downstream.onError(throwable);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
}


The onError/onComplete methods can now emit the respective event directly without the need to check the remaining count. Whether the sequence is shorter than, equal to or longer than the provided limit, the terminal events can be emitted, as onNext is there to skip items, not to stop the flow.

In general, when an operator drops items, it has the responsibility to ask for more from the upstream, as the downstream has no way of knowing it didn't receive an item (other than timing out). Since the downstream doesn't know it has to request more, the operator has to request on its behalf. However, requesting one by one is expensive - 1 CAS + 0..2 atomic increments per invocation - which can be avoided by requesting in batches.

The skip() operator is in a particular position where we know the first N items will be dropped, thus we can simply request N on top of what the downstream requested. onNext will drop the first N, so even if the downstream hasn't requested anything yet, it won't receive more than it asked for.

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;

        long n = remaining;
        downstream.onSubscribe(s);
        s.request(n);
    }


The reason remaining is read before sending the Flow.Subscription to the downstream is that this call may result in value emission which decrements remaining and we'd end up with less than the amount to be skipped. This also saves a field storing the logically immutable skip amount.

The role of onNext is to drop items and then let the rest pass through:

    @Override
    public void onNext(T item) {
        long r = remaining;
        if (r == 0L) {
            downstream.onNext(item);
        } else {
            remaining = r - 1;
        }
    }
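
To check that the extra request of N really compensates for the dropped items, the sketch below runs the complete SkipSubscriber against a source that honors request amounts. SkipSketch, RangeSubscription and run are made-up names for this illustration. The source is a simple single-threaded emitter, and the n > 0 guard in onSubscribe is an addition of this sketch so that skip(0) does not issue a zero request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class SkipSketch {

    static final class SkipSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        Flow.Subscription upstream;
        long remaining;

        SkipSubscriber(Flow.Subscriber<? super T> downstream, long n) {
            this.downstream = downstream;
            this.remaining = n;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            long n = remaining;          // read before the downstream can trigger emissions
            downstream.onSubscribe(s);
            if (n > 0L) {                // guard added by this sketch
                s.request(n);            // N extra items that onNext will drop
            }
        }

        @Override public void onNext(T item) {
            long r = remaining;
            if (r == 0L) {
                downstream.onNext(item);
            } else {
                remaining = r - 1;
            }
        }

        @Override public void onError(Throwable throwable) { downstream.onError(throwable); }

        @Override public void onComplete() { downstream.onComplete(); }
    }

    // Hypothetical single-threaded source emitting 1..end, one item per requested unit.
    static final class RangeSubscription implements Flow.Subscription {
        final Flow.Subscriber<? super Integer> downstream;
        final int end;
        int next = 1;
        long requested;
        boolean emitting;
        boolean cancelled;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int end) {
            this.downstream = downstream;
            this.end = end;
        }

        @Override public void request(long n) {
            long r = requested + n;
            requested = r < 0L ? Long.MAX_VALUE : r;   // cap on overflow
            if (emitting) {
                return;                                // reentrant call: the outer loop continues
            }
            emitting = true;
            while (requested > 0L && next <= end && !cancelled) {
                requested--;
                downstream.onNext(next++);
            }
            if (next > end && !cancelled) {
                cancelled = true;                      // doubles as a "terminated" flag
                downstream.onComplete();
            }
            emitting = false;
        }

        @Override public void cancel() { cancelled = true; }
    }

    // Skips `skip` items of 1..count for a consumer that requests `request` items once.
    static List<Object> run(long skip, int count, long request) {
        List<Object> events = new ArrayList<>();
        Flow.Subscriber<Integer> recorder = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(request); }
            @Override public void onNext(Integer item) { events.add(item); }
            @Override public void onError(Throwable t) { events.add("error: " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        Flow.Publisher<Integer> source = s -> s.onSubscribe(new RangeSubscription(s, count));
        source.subscribe(new SkipSubscriber<Integer>(recorder, skip));
        return events;
    }
}
```

Skipping 3 of 10 items for a consumer that requested 2 delivers exactly 4 and 5: the consumer's own request is used up by the dropped items 1 and 2, and the operator's extra request of 3 covers the rest.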


Skipping while a condition holds


Similar to a conditional take, we can skip unwanted items that match a condition (predicate) and then let the rest through unconditionally. Unfortunately, this case requires per-item replenishment from the upstream while the condition holds, as we can't know in advance which upstream item will yield a false value and switch the operator into a pass-through mode.


public static <T> Flow.Publisher<T> skipWhile(
        Flow.Publisher<T> source, Predicate<? super T> predicate) {

    return new SkipWhilePublisher<>(source, predicate);
}

static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {

    final Flow.Subscriber<? super T> downstream;

    final Predicate<? super T> predicate;

    Flow.Subscription upstream;

    boolean passthrough;

    boolean done;

    SkipWhileSubscriber(
            Flow.Subscriber<? super T> downstream,
            Predicate<? super T> predicate) {
        this.downstream = downstream;
        this.predicate = predicate;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(s);
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        if (!done) {
            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            downstream.onComplete();
        }
    }
}


The onSubscribe becomes a direct pass-through for the Flow.Subscription and the onError/onComplete methods get their done checks back. This is required because the predicate may fail for the very last item and the cancellation may not stop the terminal event. There is also the need for a flag that tells the onNext to let all items through and skip the predicate altogether.




    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }

        if (!passthrough) {
            boolean b;
            try {
                b = predicate.test(item);
            } catch (Throwable ex) {
                upstream.cancel();
                done = true;
                downstream.onError(ex);
                return;
            }
            if (b) {
                upstream.request(1);
                return;
            }
            passthrough = true;           
        }

        downstream.onNext(item);
    }


First we prevent an excess onNext if there was a crash in the predicate before. Next, if the operator is not in the pass-through mode, we test with the predicate. If this turns out to be true, that indicates the item has to be dropped and a fresh item has to be requested as replenishment. Otherwise the operator enters its pass-through mode and the current and subsequent items will be emitted directly without invoking the predicate again. Implementing an "until" variant, where the predicate returning true still drops the current item, is left to the reader as an exercise.
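
The effect of the per-item replenishment can be observed with a source that strictly honors request amounts. In the sketch below, SkipWhileSketch, RangeSubscription and run are made-up names, and the source is a simple single-threaded emitter that tolerates request() being called from within onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

final class SkipWhileSketch {

    static final class SkipWhileSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> predicate;
        Flow.Subscription upstream;
        boolean passthrough;
        boolean done;

        SkipWhileSubscriber(Flow.Subscriber<? super T> downstream,
                Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }

        @Override public void onNext(T item) {
            if (done) {
                return;
            }
            if (!passthrough) {
                boolean b;
                try {
                    b = predicate.test(item);
                } catch (Throwable ex) {
                    upstream.cancel();
                    done = true;
                    downstream.onError(ex);
                    return;
                }
                if (b) {
                    upstream.request(1);   // replenish the dropped item
                    return;
                }
                passthrough = true;
            }
            downstream.onNext(item);
        }

        @Override public void onError(Throwable t) { if (!done) { downstream.onError(t); } }

        @Override public void onComplete() { if (!done) { downstream.onComplete(); } }
    }

    // Hypothetical single-threaded source emitting 1..end, one item per requested unit.
    static final class RangeSubscription implements Flow.Subscription {
        final Flow.Subscriber<? super Integer> downstream;
        final int end;
        int next = 1;
        long requested;
        boolean emitting;
        boolean cancelled;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int end) {
            this.downstream = downstream;
            this.end = end;
        }

        @Override public void request(long n) {
            long r = requested + n;
            requested = r < 0L ? Long.MAX_VALUE : r;   // cap on overflow
            if (emitting) {
                return;                                // reentrant call: the outer loop continues
            }
            emitting = true;
            while (requested > 0L && next <= end && !cancelled) {
                requested--;
                downstream.onNext(next++);
            }
            if (next > end && !cancelled) {
                cancelled = true;                      // doubles as a "terminated" flag
                downstream.onComplete();
            }
            emitting = false;
        }

        @Override public void cancel() { cancelled = true; }
    }

    // Applies skipWhile to 1..count for a consumer that requests `request` items once.
    static List<Object> run(Predicate<Integer> predicate, int count, long request) {
        List<Object> events = new ArrayList<>();
        Flow.Subscriber<Integer> recorder = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(request); }
            @Override public void onNext(Integer item) { events.add(item); }
            @Override public void onError(Throwable t) { events.add("error: " + t.getMessage()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        Flow.Publisher<Integer> source = s -> s.onSubscribe(new RangeSubscription(s, count));
        source.subscribe(new SkipWhileSubscriber<Integer>(recorder, predicate));
        return events;
    }
}
```

A consumer that requests 2 items from skipWhile(item -> item < 4) over 1..10 receives 4 and 5, even though three items had to be dropped first. Without the request(1) call, it would receive nothing, because its two requested items would have been consumed by the dropped 1 and 2.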


Conclusion

The post demonstrated a way to implement the take, takeWhile, takeUntil, skip and skipWhile operators. The logic behind them doesn't require a fully fledged queue-drain approach and, being synchronous in-flow operators, they don't really have to bother with concurrency and request management (except the two take alternatives which do have to manage requests).

The fact that the onXXX methods of a Flow.Subscriber are not allowed to be invoked concurrently greatly helps reduce the complexity and the need for being defensive all the time.

In the next post, we'll see how a multicasting Processor can be implemented and how a fan-out-fan-in operator can be implemented with its help.
