Monday, May 11, 2015

Operator concurrency primitives: serialized access (part 2)

Introduction

In part 1, I explained the need to serialize access in order to ensure certain RxJava methods and operations happen in a sequential manner. I described the emitter-loop approach in detail and showed how one can build serialization constructs with it. Recall that the construct excels in mostly synchronous use because the Java JIT applies biased locking and lock elision when it detects single-threaded use. However, when asynchronous execution dominates or the emission happens on another thread, the emitter-loop can become a bottleneck due to its blocking nature.

In this blog post, I'm going to describe the other, non-blocking serialization construct I call queue-drain.

Queue-drain

The queue-drain construct is relatively short compared to the emitter-loop approach:

import java.util.concurrent.atomic.AtomicInteger;

class BasicQueueDrain {
    final AtomicInteger wip = new AtomicInteger();  // (1)
    public void drain() {
        // work preparation
        if (wip.getAndIncrement() == 0) {           // (2)
            do {
                // work draining
            } while (wip.decrementAndGet() != 0);   // (3)
        }
    }
}

The basic construct works as follows:
  1. We need a numeric variable we can increment atomically. I usually call it wip (short for work-in-progress). It indicates the amount of work to be done and may even let us build the queue-drain wait-free if the underlying Java runtime has the necessary intrinsics for (2) and (3). We use AtomicInteger in RxJava because we consider an overflow unlikely with the known usage patterns; it can, of course, be replaced with AtomicLong if necessary.
  2. We atomically retrieve the current wip count and increment it by one. The thread that increments it from 0 to 1 wins the drain right and enters the loop. Any other thread just increments it further and exits the method.
  3. Whenever a work item has been drained and processed, we atomically decrement and check the wip counter; if it reached zero, the loop quits. Since only one thread performs the decrement, we are guaranteed not to lose any signals. Because (2) and (3) are both atomic and wip == 1 just before (3), the race resolves cleanly: if (2) wins, the decrement at (3) leaves wip above zero and another iteration happens; if (3) wins, it decrements wip to zero and quits the loop, after which (2) increments wip from 0 to 1 and that thread enters the drain loop itself, without any further interference.
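To make the skeleton concrete, here is a small hypothetical application of the pattern (the TaskDrain name and the choice of ConcurrentLinkedQueue are mine, for illustration only): tasks may be submitted from any thread, yet they always run one after another, on whichever thread won the 0-to-1 transition.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: serializing Runnable execution
// with the queue-drain skeleton above.
class TaskDrain {
    final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();

    public void execute(Runnable task) {
        tasks.offer(task);                          // work preparation
        if (wip.getAndIncrement() == 0) {
            do {
                // wip is a lower bound on the number of queued tasks,
                // so poll() can't return null here
                tasks.poll().run();                 // work draining
            } while (wip.decrementAndGet() != 0);
        }
    }
}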
If you remember the emitter-loop example from the previous post, you can draw some parallels between the data structures of the two.

In the emitter-loop, the emission right is won by the thread that finds emitting == false and sets it to true; in queue-drain, the thread that atomically increments wip from 0 to 1 wins. In the emitter-loop, if a thread didn't win the right to emit, it sets the missing flag to true; in queue-drain, a losing thread increments wip further (to 2 and beyond). On the outgoing edge, the emitter-loop checks whether the missing flag is set and loops again; in queue-drain, a new iteration happens if the wip value was greater than 1 before the decrement. In the emitter-loop, if the missing flag is clear, the loop quits; in queue-drain, decrementing wip to 0 quits the loop.

With this in mind, we can build the non-blocking variant of the ValueEmitterLoop from the previous post by using the queue-drain construct:

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.jctools.queues.MpscLinkedQueue;

class ValueQueueDrain<T> {
    final Queue<T> queue = new MpscLinkedQueue<>();     // (1)
    final AtomicInteger wip = new AtomicInteger();
    Consumer<T> consumer;                               // (2)

    public void drain(T value) {
        queue.offer(Objects.requireNonNull(value));     // (3)
        if (wip.getAndIncrement() == 0) {
            do {
                T v = queue.poll();                     // (4)
                consumer.accept(v);                     // (5)
            } while (wip.decrementAndGet() != 0);       // (6)
        }
    }
}

The construct is quite similar to the emitter-loop variant, but less 'verbose' due to the lack of synchronized blocks:
  1. We are still using one of JCTools' optimized queues to hold onto values until they can be consumed. If the maximum number of values is known in advance, an MpscArrayQueue can be employed. Note, however, that these queues are not wait-free with respect to offer/poll.
  2. We will drain the queued values into this consumer instance one-by-one.
  3. First, we offer a non-null value to the queue.
  4. The thread that won the drain right takes a value out of the queue. Because wip is essentially a lower bound on the number of items in the queue, poll() will never return null here.
  5. We pass on the value to the consumer.
  6. We loop again if wip indicates more values, or quit otherwise. It is possible the queue is not empty when (6) is reached, but that is not a problem since the synchronization point is the wip change: even if there is an arbitrary delay between the offer at (3) and the increment, the drain loop can safely quit, because once the delayed increment happens, that thread will continue draining the queue.
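As a quick, hypothetical usage sketch (the producer threads and the printing consumer are my illustration; it assumes same-package access to the consumer field): any number of threads may call drain() concurrently, yet the consumer always sees one value at a time.

ValueQueueDrain<Integer> serializer = new ValueQueueDrain<>();
serializer.consumer = v -> System.out.println("got " + v);  // runs serialized

Runnable producer = () -> {
    for (int i = 0; i < 1_000; i++) {
        serializer.drain(i);            // safe from any thread
    }
};
new Thread(producer).start();
new Thread(producer).start();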
If we benchmark the synchronous performance of ValueEmitterLoop and ValueQueueDrain, the latter performs way worse in throughput after the warmup period.

The reason for that is the unavoidable atomic operations, which, even in the uncontended case, take several cycles due to the mandatory write-buffer flush on modern multicore CPUs: we have two atomic increment/decrement operations in drain() per value, plus the additional atomics involved in the Mpsc queue implementation. In contrast, ValueListEmitterLoop is even faster because it exploits the so-called fast path in its logic once the synchronized blocks are optimized away.

Note that the benchmark I mentioned has a simple workload in its consumer callback and thus measures the overhead of the serialization approach itself. Depending on the workload and work distribution, your mileage may vary.
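For reference, a minimal JMH sketch of such an overhead measurement might look like the following (the class name and parameters are illustrative; the original benchmark likely differs):

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Group)
public class QueueDrainPerf {
    ValueQueueDrain<Integer> qd;

    @Setup(Level.Iteration)
    public void setup() {
        qd = new ValueQueueDrain<>();
        qd.consumer = v -> { };      // trivial workload: measures overhead only
    }

    @Benchmark
    @Group("drain")
    @GroupThreads(2)                 // two concurrent producers
    public void drainConcurrently() {
        qd.drain(1);
    }
}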

We can change our queue-drain to have such a fast path in order to exploit any lack of concurrency the drain() method may encounter. This may come from nicely interleaved concurrent use or just plain sequential access.

Note, however, that in some cases such fast-path logic can be quite complicated, duplicating much of the slow path, and can get lengthy to implement. I suggest benchmarking the plain queue-drain first, or applying the set-to-1 trick (explained later in this post), before attempting it.

In the ValueQueueDrain example, implementing a fast-path is short enough (and quite worth it):

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.jctools.queues.MpscLinkedQueue;

class ValueQueueDrainFastpath<T> {
    final Queue<T> queue = new MpscLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    Consumer<T> consumer;

    public void drain(T value) {
        Objects.requireNonNull(value);
        if (wip.compareAndSet(0, 1)) {          // (1)
            consumer.accept(value);             // (2)
            if (wip.decrementAndGet() == 0) {   // (3)
                return;
            }
        } else {
            queue.offer(value);                 // (4)
            if (wip.getAndIncrement() != 0) {   // (5)
                return;
            }
        }
        do {
            T v = queue.poll();                 // (6)
            consumer.accept(v);
        } while (wip.decrementAndGet() != 0);
    }
}

Even though ValueQueueDrainFastpath seems to have more atomics, it performs better in sequential use because it avoids the atomics in offer and poll. It works as follows:
  1. After making sure value is non-null, it tries to compare-and-set (CAS) wip from 0 to 1, attempting to win the fast path. (On a side note, adding a wip.get() == 0 && check before the CAS might improve performance a bit, but its effect is highly CPU- and usage-pattern-dependent because it creates a dependent load with respect to the CAS; my benchmarks on different CPU types were inconclusive. A sketch of this variant follows after this list.)
  2. We simply give the value directly to the consumer without even touching the queue. This is where the main performance improvement comes from.
  3. We now need to atomically decrement the wip counter; if it reached zero, no concurrent call to drain() managed to reach (5) in the meantime and we can quit. One might think a CAS from 1 to 0 would work here, but if that CAS failed, we would have to decrement wip anyway to be able to run the slow path (since we are still in 'emission' mode).
  4. If the CAS in (1) failed, we take the regular slow-path drain loop and therefore enqueue the value first.
  5. We increment wip atomically and if its previous value wasn't zero, we quit (the inverted check is needed because the drain loop is shared between the fast path and the slow path). This check is mandatory because a concurrent drain() call could enter and leave the fast path before the first thread reaches (5), leaving wip at zero; whoever then increments it from 0 to 1 is eligible to enter the drain loop.
  6. Finally, if either the slow path was taken or the fast path couldn't complete at (3) in time, we perform the usual drain loop. While a thread is in the drain loop, all other threads fail the CAS at (1) and take the slow path.
Note that this fast-path approach is a tradeoff: one trades fast handling of the sequential case for one extra CAS failure on the slow path (or, with the wip.get() == 0 check sketched below, a less likely CAS failure on the slow path but a potential dependent-load penalty on the fast path). Again, benchmark your case, don't just assume.
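For completeness, here is a minimal sketch of that check-then-CAS variant (the class name is mine; the only change from ValueQueueDrainFastpath is the condition at (1)):

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.jctools.queues.MpscLinkedQueue;

class ValueQueueDrainFastpath2<T> {
    final Queue<T> queue = new MpscLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    Consumer<T> consumer;

    public void drain(T value) {
        Objects.requireNonNull(value);
        // the only change: a plain read may skip the more expensive CAS;
        // whether this helps is CPU- and usage-pattern-dependent
        if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
            consumer.accept(value);
            if (wip.decrementAndGet() == 0) {
                return;
            }
        } else {
            queue.offer(value);
            if (wip.getAndIncrement() != 0) {
                return;
            }
        }
        do {
            T v = queue.poll();
            consumer.accept(v);
        } while (wip.decrementAndGet() != 0);
    }
}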

If you look closely, you can discover the similarity to the ValueListEmitterLoop shown in the previous post: the thread that wins the right of emission calls consumer.accept() first as the fast path, and reverts to the slow path (the loop) if there was some activity in the meantime and values got queued up (i.e., wip became greater than 1 here).
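As a reminder, the fast-path emitter-loop from the previous post had roughly the following shape (reconstructed here for comparison; see part 1 for the authoritative version):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ValueListEmitterLoop<T> {
    List<T> queue;          // guarded by this
    boolean emitting;       // guarded by this
    Consumer<T> consumer;

    public void emit(T value) {
        synchronized (this) {
            if (emitting) {
                List<T> q = queue;
                if (q == null) {
                    q = new ArrayList<>();
                    queue = q;
                }
                q.add(value);           // 'missed' work for the emitter
                return;
            }
            emitting = true;
        }
        consumer.accept(value);         // fast path, outside the lock
        for (;;) {
            List<T> q;
            synchronized (this) {
                q = queue;
                if (q == null) {        // no activity in the meantime
                    emitting = false;
                    return;
                }
                queue = null;
            }
            for (T v : q) {
                consumer.accept(v);     // slow path: drain missed values
            }
        }
    }
}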

Apart from the fast-path optimization, there is another way to reduce the number of atomic operations in the drain loop. Remember the parallels drawn between the emitter-loop and the queue-drain above? It turns out that any wip value above 1 just means more values are available, and the queue itself can 'tell' us when it has run out of values. We can therefore save one atomic decrement per value: drain the queue completely, and loop again only if some values arrived just before the single atomic decrement:

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.jctools.queues.MpscLinkedQueue;

class ValueQueueDrainOptimized<T> {
    final Queue<T> queue = new MpscLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    Consumer<T> consumer;

    public void drain(T value) {
        queue.offer(Objects.requireNonNull(value));
        if (wip.getAndIncrement() == 0) {
            do {
                wip.set(1);                              // (1)
                T v;
                while ((v = queue.poll()) != null) {     // (2)
                    consumer.accept(v);
                }
            } while (wip.decrementAndGet() != 0);        // (3)
        }
    }
}

The ValueQueueDrainOptimized example looks somewhat similar to ValueQueueDrain but works a bit differently:

  1. Once we enter the drain loop, we set wip back to 1. Since we are going to drain the queue completely, there is no need for the outer loop to run once per value only to find a potentially empty queue again. This is analogous to clearing the missed flag in the emitter-loop from the previous post.
  2. Instead of taking one value at a time, we now have an inner loop that drains the queue entirely. We don't use queue.size() or queue.isEmpty() but rely on the fact that an empty queue returns null when polled, saving further atomic operations.
  3. We decrement wip and if it reaches zero, the loop quits. In the optimistic case, if there was a burst of offers just before (1), we drained them all and, because wip was set back to 1, only a single iteration of the main drain loop happens. An offer just before (3) is not a problem either, because synchronization is ensured by the atomic increment/decrement pair: either (3) doesn't decrement to zero and we loop again, or it decrements to zero and the thread whose getAndIncrement() returned 0 enters the drain loop.
Note, however, that this optimization has its limits too. Even though it 'batches' the incoming values, the additional cache-coherence traffic due to (2) means it might not improve performance in some cases, depending on the call pattern to drain().
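A related batching variant avoids the unconditional store at (1) altogether by counting how many signals have been handled and subtracting them in a single atomic step. The sketch below is mine (the class name is hypothetical), though the technique matches what later RxJava drain loops do; as always, benchmark before adopting it:

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.jctools.queues.MpscLinkedQueue;

class ValueQueueDrainSubtract<T> {
    final Queue<T> queue = new MpscLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    Consumer<T> consumer;

    public void drain(T value) {
        queue.offer(Objects.requireNonNull(value));
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {    // drain what's there
                consumer.accept(v);
            }
            missed = wip.addAndGet(-missed);        // subtract handled signals
            if (missed == 0) {                      // nothing new arrived
                break;
            }
        }
    }
}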

After the examples and explanations, the queue-drain may seem too problematic: its optimization potential and performance are quite dependent on the usage pattern, and one might be tempted to use the emitter-loop approach instead.

However, there is an operator used by almost everyone which shines with the queue-drain pattern: observeOn. Its current implementation, although it still has room for improvement, utilizes the optimization shown in the ValueQueueDrainOptimized example, along with the fact that the offer and poll sides of its queue (now a wait-free SpscArrayQueue) are each accessed by exactly one thread (except in certain backpressure scenarios; more on this in a later post).

Conclusion

In this post, I introduced the second serialization approach, which I call queue-drain. I showed its basic form and two optimized variants, and explained that their performance depends on the pattern in which values arrive at the drain() method (and sometimes on the CPU type). Make sure you benchmark your implementation.

Its non-blocking, lock-free (and sometimes wait-free) structure makes it better suited for moderately or highly concurrent serialization scenarios, or for cases where its two parts (queueing and draining) are guaranteed to run on different threads.

In the next post, I'll talk about various thread-safe Producer implementations, some of which will utilize the emitter-loop or queue-drain approaches detailed in these posts.

5 comments:

  1. Hi. Could you please elaborate on cache coherence traffic in last example and how does it differ from previous ones? Thank you.

  2. When I benchmarked the solution, it was a bit slower and I suspect the extra lazySet causes more back-and-forth in high contention scenarios.

  3. “We now need to atomically decrement the the wip counter and if there was no concurrent call to drain() that reached (5).”
    Should it be:
    "We now need to atomically decrement the the wip counter and if there was no concurrent call to drain() that reached (5), we will quit."

  4. And "if there were some activity in the meantime and missed was set to true (i.e., wip was greater than 1 here).", should it be:
    "if there were some activity in the meantime and emitting was set to true (i.e., wip was greater than 0 here)."?

  5. And "This is equivalent when we set missing flag to false in the emitter-loop example.", I think this sentence is misunderstanding, which implementation do you refer to? besides, there is no `missing` variable in all implementations in last post.

    I guess you mean the `EmitterLoopSerializer`, and `missed` variable.
