Monday, September 25, 2017

Java 9 Flow API: timing out events

Introduction


One of the main properties of reactive programming is that events may arrive over time instead of being immediately available to a consumer. In traditional Future-based programming, one could wait for the result in a blocking manner via Future.get(long, TimeUnit). Other data sources, such as a network InputStream, either have their own built-in timeout facility or one has to use external means to close the stream after a certain period of time in order to unblock its reader. Java 8 Streams also have no direct timeout support.
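
For reference, a minimal sketch of the Future-based blocking approach mentioned above could look like this (the executor, the sleeping task and the one-second limit are only placeholders, and the usual java.util.concurrent imports are assumed):


ExecutorService exec = Executors.newSingleThreadExecutor();

Future<String> result = exec.submit(() -> {
    Thread.sleep(2000);      // simulate a slow computation
    return "value";
});

try {
    // block for at most one second, then give up
    System.out.println(result.get(1, TimeUnit.SECONDS));
} catch (TimeoutException ex) {
    result.cancel(true);     // interrupt the still-running task
} catch (InterruptedException | ExecutionException ex) {
    ex.printStackTrace();
} finally {
    exec.shutdown();
}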

In the reactive mindset, one can think of timing out events (items) as requesting an element and racing its arrival against the clock. If the item arrives in time, we ignore the clock. If the clock fires first, we stop the sender of the items and somehow notify the consumer of the situation. Perhaps the simplest way is to signal onError with a TimeoutException. Since there could be multiple items in a flow, we have to repeat this race for each potential item until the flow terminates.


The timeout operator


Since there is "time" in timeout, we'll need a source of time that can be started and stopped at will. The first tool that comes to mind is the java.util.Timer class; however, even its Javadoc suggests using a ScheduledExecutorService instead. If one has to deal with a lot of timed operations besides timing out flows, having control over such signals via a (set of) ScheduledExecutorServices is desirable. Therefore, let's define our timeout API with it:


public static <T> Flow.Publisher<T> timeout(
        Flow.Publisher<T> source,
        long timeout, TimeUnit unit,
        ScheduledExecutorService timer) {

    return new TimeoutPublisher<>(source, timeout, unit, timer);
}


(Note that if one uses, for example, Executors.newSingleThreadScheduledExecutor(), it has to be shut down at some point, otherwise its non-daemon thread would, by default, prevent the JVM from quitting.)
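
For illustration, one possible way to use the timeout() method defined above could look like the following (SubmissionPublisher and the logging Flow.Subscriber are merely placeholder endpoints, not part of the operator):


ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

SubmissionPublisher<Integer> source = new SubmissionPublisher<>();

Flow.Publisher<Integer> timed = timeout(source, 1, TimeUnit.SECONDS, timer);

timed.subscribe(new Flow.Subscriber<Integer>() {
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(Integer item) { System.out.println(item); }
    @Override public void onError(Throwable throwable) { throwable.printStackTrace(); }
    @Override public void onComplete() { System.out.println("Done"); }
});

source.submit(1);
// if no further item arrives within a second, the subscriber receives
// onError(java.util.concurrent.TimeoutException)

// later, once no more timed flows are needed:
timer.shutdown();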

One primary responsibility of this type of operator is to make sure the downstream's onXXX methods are called sequentially and in a non-overlapping fashion. However, a timeout may happen at the same time the main source signals an event, which could lead to onNext and onError being called concurrently - violating the Flow protocol.

The natural non-blocking solution would be to use the serialization primitive described some time ago on this blog, but we can get away with something leaner due to the special nature of this operator.

Thinking about the possible states of such an operator, there are two state transitions to consider: receiving the next upstream signal and emitting it, and receiving the timeout signal and emitting an error for it. In concurrency land, this means using a state variable and compare-and-set to transition from state to state in an atomic fashion.

There is a small problem though: how does the timeout task know that the item it was waiting for didn't arrive in time? Relying on the accuracy of Future.cancel() for this might not be the best option. Luckily, there is a way to get both proper mutual exclusion and serialization: by using a long field, index, to track the index of the current upstream item as well as to indicate a terminal state via Long.MAX_VALUE - an index unlikely to ever be reached.

The idea is as follows: a regular onNext signal from the upstream increments this long field unless it is Long.MAX_VALUE, in which case the event is ignored. An upstream onError or onComplete will atomically swap in Long.MAX_VALUE and, if the previous value was not already Long.MAX_VALUE, emit the respective signal to the downstream. At the same time, a thread executing the Runnable.run() on the timeout side will try to swap the last event's index for Long.MAX_VALUE too. If the current index hasn't changed during the wait (no onXXX signal from the main source), the atomic swap will ensure that any subsequent onXXX call ignores its event, thus only one thread at a time will emit a signal to the downstream. This may sound complicated when written in plain text, but the code should look much clearer in a couple of paragraphs.
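
Stripped of everything else, the heart of this racing can be sketched with an AtomicLong (the operator below uses a plain field plus a VarHandle instead, but the compare-and-set logic is the same; the class and method names here are made up purely for illustration and java.util.concurrent.atomic.AtomicLong is assumed to be imported):


final class IndexRace {

    final AtomicLong index = new AtomicLong();

    /** Called when the upstream delivers the next item; true if it may be emitted. */
    boolean tryEmitItem() {
        long idx = index.get();
        // a concurrent timeout, terminal event or cancellation wins: drop the item
        return idx != Long.MAX_VALUE && index.compareAndSet(idx, idx + 1);
        // on success: emit the item and arm a fresh timeout task for idx + 1
    }

    /** Called by the timeout task that was armed for the item with index idx. */
    boolean tryTimeout(long idx) {
        // succeeds only if no item has arrived since the task was scheduled
        return index.compareAndSet(idx, Long.MAX_VALUE);
        // on success: cancel the upstream and signal a TimeoutException
    }
}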

When writing the parent Flow.Publisher implementation of TimeoutPublisher, one has to consider a peculiar case. When should the timeout for the very first item start? Remember, we have to call onSubscribe on the downstream to provide it with the ability to request and cancel the flow. If the timeout were started only after subscribing to the upstream, a synchronous and blocking source responding to the downstream's request at that point might never give back control, rendering the timeout operator inoperable. If the timeout were started before the call to downstream.onSubscribe(), we'd risk emitting the TimeoutException before or while downstream.onSubscribe() is executing. Since we also have to intercept a cancel() call from the downstream to stop an ongoing timeout, we have to call downstream.onSubscribe() from within the Flow.Publisher.subscribe() method, then start the timeout for the very first item and only then subscribe to the upstream:


    @Override
    public void subscribe(Subscriber<? super T> s) {
        TimeoutSubscriber<T> parent = new TimeoutSubscriber<>(
            s, timeout, unit, timer);

        s.onSubscribe(parent);

        parent.startTimeout(0L);

        source.subscribe(parent);
    }


We create the in-sequence Flow.Subscriber first (which implements Flow.Subscription), send it to the downstream, start the timeout for the first item (index is zero at this point) and we subscribe to the upstream source.

The next step is to write up the skeleton of the TimeoutSubscriber:


static final class TimeoutSubscriber<T> implements
Flow.Subscriber<T>, Flow.Subscription {
   
    final Flow.Subscriber<? super T> downstream;

    final long timeout;

    final TimeUnit unit;

    final ScheduledExecutorService timer;

    Flow.Subscription upstream;
    static final VarHandle UPSTREAM =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "upstream", Flow.Subscription.class);

    long requested;
    static final VarHandle REQUESTED =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "requested", long.class);

    long index;
    static final VarHandle INDEX =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "index", long.class);

    Future<?> task;
    static final VarHandle TASK =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "task", Future.class);

    static final Future<Object> CANCELLED_TASK = 
        new FutureTask<>(() -> { }, null);
    
    TimeoutSubscriber(
            Flow.Subscriber<? super T> downstream,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService timer) {
        this.downstream = downstream;
        this.timeout = timeout;
        this.unit = unit;
        this.timer = timer;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
   
    @Override
    public void request(long n) {
        // TODO implement
    }
    
    @Override
    public void cancel() {
        // TODO implement
    }

    void run(long idx) {
        // TODO implement
    }

    void startTimeout(long idx) {
        // TODO implement
    }
}

The fields can be described as follows:


  • downstream is the Flow.Subscriber we'd like to signal events to.
  • timeout and unit have to be stored as a new timeout task will have to be started after each in-time event.
  • timer is the ScheduledExecutorService to which we will submit tasks delayed by the timeout and unit.
  • upstream will hold the source's Flow.Subscription. Its VarHandle, UPSTREAM, will allow us to defer any cancellation coming from the downstream until there is an instance available from the upstream. In addition, we have to defer requests coming from the downstream for the same reason: since we call downstream.onSubscribe before subscribing to the upstream, the downstream may issue cancel and/or request against a not-yet-available Flow.Subscription.
  • requested will hold onto any downstream requests until there is an upstream Flow.Subscription available to the operator. This is part of the so-called deferred requesting pattern.
  • index holds the index of the current upstream item and also indicates, via Long.MAX_VALUE, whether the operator has reached its terminal state naturally, through a timeout or through cancellation.
  • task holds the current Future for the timeout task submitted to the timer ScheduledExecutorService, or the CANCELLED_TASK indicator if the operator has been cancelled concurrently and no further tasks should be scheduled. Since only one such task runs at a time, the actual parallelism level of the ScheduledExecutorService doesn't matter. The value has to be changed atomically, hence the TASK VarHandle after it.

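The skeleton above relies on a small VH.find helper that is not shown here; one possible equivalent (an assumption, not necessarily the exact helper used elsewhere on this blog) simply wraps MethodHandles.Lookup.findVarHandle and rethrows its checked exceptions:


final class VH {

    static VarHandle find(MethodHandles.Lookup lookup, Class<?> owner,
            String fieldName, Class<?> fieldType) {
        try {
            return lookup.findVarHandle(owner, fieldName, fieldType);
        } catch (NoSuchFieldException | IllegalAccessException ex) {
            // a missing or inaccessible field is a programming error
            throw new InternalError(ex);
        }
    }
}
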
The implementation of each method, although relatively short, is a bit more involved as each relies on direct, inlined atomic state changes. The onSubscribe method has to provide the deferred cancellation and deferred requesting behavior:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (UPSTREAM.compareAndSet(this, null, s)) {
            long r = (long)REQUESTED.getAndSet(this, 0L);
            if (r != 0L) {
                s.request(r);
            }
        } else {
            s.cancel();
        }
    }


We atomically try to set the upstream's Flow.Subscription and, if successful, we atomically take the accumulated request amount from the downstream. If there was any, we request it from the upstream. Otherwise, a non-null upstream field just means the operator has been cancelled, so we cancel the incoming Flow.Subscription as well.

Before we look into the other onXXX methods, let's see the counterparts of onSubscribe, which show the other side of the deferred requesting and cancellation logic:

    @Override
    public void request(long n) {
        Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAcquire(this);
        // a value of this indicates the operator has been cancelled (see cancel())
        if (current != null && current != this) {
            current.request(n);
        } else {
            for (;;) {
                long r = (long)REQUESTED.getAcquire(this);
                long u = r + n;
                if (u < 0L) {
                    u = Long.MAX_VALUE;
                }
                if (REQUESTED.compareAndSet(this, r, u)) {
                    break;
                }
            }

            current = (Flow.Subscription)UPSTREAM.getAcquire(this);
            if (current != null && current != this) {
                long r = (long)REQUESTED.getAndSet(this, 0L);
                if (r != 0L) {
                    current.request(r);
                }
            }
        }
    }

When the downstream calls request(), we can be in one of two states: the upstream Flow.Subscription is available, in which case we simply forward the request amount to it; or the upstream Flow.Subscription is not yet available and we have to accumulate downstream requests temporarily (which could happen on any thread at any time). This happens via our standard bounded atomic add operation. Once this succeeds, the upstream may have just called onSubscribe with a valid Flow.Subscription (a value of this indicates cancel() has been invoked, see further below). In that case, we atomically take all the accumulated requests, swapping in a zero in exchange, and if that value was non-zero, we issue the request to this fresh upstream. Any concurrent interleaving will find requested either zero or non-zero and issue any excess request amount accordingly.


    @Override
    public void cancel() {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAndSet(this, this);
            if (current != null && current != this) {
                current.cancel();
            }

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }
        }
    }


First, we atomically swap in the terminal index Long.MAX_VALUE, locking out both the onXXX methods and the timeout task. Then, we atomically swap in the cancelled Flow.Subscription indicator (this) and cancel any available upstream. The same thing has to happen to the timeout task.

Now let's see the onNext implementation:

    @Override
    public void onNext(T item) {

        long idx = (long)INDEX.getAcquire(this);
        if (idx == Long.MAX_VALUE) {
            return;
        }
        if (!INDEX.compareAndSet(this, idx, idx + 1)) {
            return;
        }

        Future<?> future = (Future<?>)TASK.getAcquire(this);
        if (future == CANCELLED_TASK) {
            return;
        }
        if (future != null) {
            future.cancel(false);
        }

        downstream.onNext(item);

        startTimeout(idx + 1);
    }


First we get the current index and if already at Long.MAX_VALUE, we quit. Next, we try to atomically update the index to the next value and if that fails (due to cancellation or a racy timeout), we quit as well. Once the index has been updated successfully, we cancel the ongoing timeout task, emit the current item and start a new task with the subsequent index value.

The onError and onComplete methods are quite similar: both cancel the timeout task and then signal the appropriate terminal event:


    @Override
    public void onError(Throwable throwable) {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }

            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }

            downstream.onComplete();
        }
    }


They are pretty similar to how cancel() is implemented: we atomically swap in the terminal index Long.MAX_VALUE and cancel the timeout task. Note that cancelling the upstream is not necessary at this point as it is considered cancelled in accordance with the Flow (Reactive Streams) specification.

    void run(long idx) {
        if (INDEX.compareAndSet(this, idx, Long.MAX_VALUE)) {

            Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAndSet(this, this);
            if (current != null && current != this) {
                current.cancel();
            }

            downstream.onError(new TimeoutException());
        }
    }


Given the last known item index, we atomically try to swap in the terminal index indicator and, if successful, we cancel the upstream (directly or in a deferred manner), followed by signalling the TimeoutException. Since the executing task ends either way, there is no need or reason to cancel the Future tracking the timeout task itself. If the index has changed in the meantime (either due to the arrival of an onNext item, a terminal event or cancellation), the timeout task will do nothing.

Finally, let's see how the timeout tasks are scheduled:


    void startTimeout(long idx) {
        Future<?> old = (Future<?>)TASK.getAcquire(this);
        if (old != CANCELLED_TASK) { 
            Future<?> future = timer.schedule(() -> run(idx), timeout, unit);
            if (!TASK.compareAndSet(this, old, future)) {
                future.cancel(false);
            }
        }
    }


First, we get the previous (old) task instance so that we can conditionally swap in the new task only if no cancellation happened in between. Next, we schedule the execution of the run() method with the current item index (provided by the subscribe() or the onNext() method). If the index doesn't change until run() executes, it will trigger the timeout logic. Finally, we atomically try to replace the old Future with the new one and, if that fails (due to cancellation), we cancel the new Future task too.


Conclusion


As demonstrated in this post, writing a basic timeout operator - one that just signals a TimeoutException - can be done in relatively few lines. The complication comes from understanding why the atomic index changes are actually correct in various race scenarios and how they also ensure proper event serialization towards the downstream.

The same pattern can be used for writing a set of other operators that use a one-shot signal to "interrupt" a main sequence, for example takeUntil(Flow.Publisher), where the Future tracking is replaced by tracking the Flow.Subscription of the other Flow.Publisher.

There is, however, a natural need for doing something other than signalling a TimeoutException: switching to another Flow.Publisher, for example. One might think: let's just subscribe the TimeoutSubscriber to that fallback Flow.Publisher! Unfortunately, that doesn't work, partly because we probably wouldn't want to time out the fallback Flow.Publisher's consumption as well, and partly because we have to make sure the switch-over preserves the number of outstanding downstream requests.

The deferred requesting logic in the operator shown in this post is not suitable for such transitions; they require a peculiar Flow.Subscription management logic I call Subscription Arbitration. It enables a whole suite of other operators that work across sources, such as the typical RxJava operators concat(Map), repeat(When), retry(When), timeout with fallback and onErrorResumeNext. In the next blog post, we'll see how the arbitration logic and most of these operators can be implemented.
