Wednesday, May 27, 2015

Schedulers (part 1)

Introduction

Schedulers are the key to asynchronous and concurrent computations in RxJava. Although many standard schedulers exist and you can wrap an Executor into a scheduler, it is worth understanding how schedulers can be built from scratch in order to utilize other sources of concurrency, such as the event loops of GUI frameworks.


The Scheduler API

If you are familiar with Rx.NET's IScheduler API, you may notice that RxJava's Scheduler API is a bit different. This difference comes from how each library tried to solve the recursive scheduling problem: Rx.NET chose to inject the actual scheduler into the action being scheduled.

RxJava chose to mirror the Iterable/Iterator pattern and came up with the Scheduler/Worker pair. RxJava's Scheduler doesn't do any scheduling itself; instead, it allows the creation of a Worker, which can schedule tasks directly and recursively and is safe to close over in the submitted action if necessary. In addition, the Worker itself is a Subscription and can be unsubscribed, which triggers a mass unsubscription and prevents any further tasks from being scheduled (on a best-effort basis, I might add). This comes in handy when scheduling is used in operators (for example, buffer with time) and the downstream unsubscribes the whole chain, cancelling the periodic buffer-emission task once and for all.
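To make the Scheduler/Worker split concrete without pulling in RxJava, here is a stdlib-only sketch. `SketchWorker` and `RecursiveDemo` are hypothetical names, not RxJava classes: a task closes over the worker that runs it and reschedules itself (recursive scheduling), and unsubscribing the worker drops every pending task at once, on a best-effort basis.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stdlib-only analogue of RxJava 1.x's Worker (NOT rx.Scheduler.Worker):
// each worker owns one single-threaded executor, and unsubscribe() stops everything at once.
final class SketchWorker {
    private final ScheduledExecutorService exec =
            Executors.newSingleThreadScheduledExecutor();
    private volatile boolean unsubscribed;

    /** Schedules a task; after unsubscribe() the task is silently dropped (best effort). */
    void schedule(Runnable task, long delay, TimeUnit unit) {
        if (unsubscribed) {
            return;
        }
        try {
            exec.schedule(task, delay, unit);
        } catch (RejectedExecutionException ex) {
            // lost the race with unsubscribe(): the executor is already shut down
        }
    }

    /** Mass unsubscription: pending tasks are dropped and no new ones are accepted. */
    void unsubscribe() {
        unsubscribed = true;
        exec.shutdownNow();
    }
}

final class RecursiveDemo {
    /** A task that reschedules itself on the worker it closes over, until it unsubscribes it. */
    static int countTicks(int maxTicks) {
        SketchWorker worker = new SketchWorker();
        AtomicInteger ticks = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        Runnable[] tick = new Runnable[1]; // array holder so the lambda can refer to itself
        tick[0] = () -> {
            if (ticks.incrementAndGet() == maxTicks) {
                worker.unsubscribe();      // e.g., the downstream cancelled the whole chain
                done.countDown();
            } else {
                worker.schedule(tick[0], 1, TimeUnit.MILLISECONDS); // recursive scheduling
            }
        };
        worker.schedule(tick[0], 0, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        worker.schedule(tick[0], 0, TimeUnit.MILLISECONDS); // dropped: worker is unsubscribed
        return ticks.get();
    }

    public static void main(String[] args) {
        System.out.println(countTicks(5)); // prints 5
    }
}
```

The last schedule() call shows the "no further tasks" part of the contract: once unsubscribed, the worker quietly ignores new work instead of throwing.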

The Scheduler/Worker API has to meet some requirements:

  1. All methods should be thread-safe.
  2. Workers should make sure undelayed, sequentially scheduled tasks execute in FIFO order.
  3. Workers must make best effort to cancel outstanding tasks when unsubscribed.
  4. Unsubscription of a Worker should not affect other Worker instances of the same Scheduler.

These requirements may seem harsh, but they make reasoning about a dataflow's concurrency much easier, similar to how the sequential requirement on the Observer methods does.

In addition to the requirements, there are a few nice-to-have properties associated with the API:

  1. Tasks scheduled on a Worker should try to avoid hopping threads. (Thread locality improves performance.)
  2. Delayed tasks scheduled sequentially and with the same delay amount should keep their FIFO order (with concurrent scheduling, the ordering bets are off).

With all these requirements, a conservative implementation is likely to back each individual worker with a single-threaded thread pool, and this is how the standard RxJava schedulers are implemented: the underlying ScheduledExecutorService gives all of these guarantees.
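The FIFO properties can be checked against the JDK's single-threaded ScheduledExecutorService directly. A small sketch, assuming nothing beyond `java.util.concurrent` (`FifoDemo` is a hypothetical name): tasks submitted one after another with the same delay run in submission order, because the JDK's ScheduledThreadPoolExecutor orders tasks with equal trigger times by submission sequence.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FifoDemo {
    /** Schedules n tasks with the same delay on one thread and records the order they ran in. */
    static List<Integer> runOrder(int n, long delayMillis) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            exec.schedule(() -> order.add(id), delayMillis, TimeUnit.MILLISECONDS);
        }
        exec.shutdown(); // by default, already-scheduled delayed tasks still run
        try {
            exec.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder(5, 0));  // [0, 1, 2, 3, 4]
        System.out.println(runOrder(5, 50)); // [0, 1, 2, 3, 4]
    }
}
```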



Implementing a custom Scheduler

Let's assume we need to write a custom scheduler with the following properties: (1) it should only have a single worker thread and (2) a thread-local context value needs to be 'transferred over' and made available to the executing task via the same thread-local access mechanism.

Clearly, if we only needed property (1), one could just wrap a single-threaded executor with Schedulers.from(), but property (2) requires some additional work to be performed when a task is prepared and executed.
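Why property (2) needs extra work can be shown with plain Java: a ThreadLocal value set on the submitting thread is simply not visible on the executor's thread. `ThreadLocalLossDemo` is a hypothetical name used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ThreadLocalLossDemo {
    static final ThreadLocal<Object> CTX = new ThreadLocal<>();

    /** Sets a value on the calling thread, then reads the same ThreadLocal from an executor task. */
    static Object readFromExecutor() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            CTX.set("caller-context");
            Callable<Object> read = CTX::get;          // runs on the executor's own thread
            Future<Object> seen = exec.submit(read);
            return seen.get();                         // null: the value did not travel
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            CTX.remove();
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readFromExecutor()); // null
    }
}
```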

In order to meet these requirements, we may reuse some of RxJava's own scheduler primitives, namely ScheduledAction and NewThreadWorker. (Note, however, that these are internal classes and are subject to change without warning. Here, I'm using them to reduce the clutter and concentrate on the important parts of creating the scheduler.)

As usual, we start with the class skeleton:


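import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.NewThreadWorker;     // internal API, may change
import rx.internal.util.RxThreadFactory;           // internal API, may change
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;              // used by schedule() once implemented

public final class ContextAwareScheduler extends Scheduler {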
    
    public static final ContextAwareScheduler INSTANCE = 
            new ContextAwareScheduler();                       // (1)
    
    final NewThreadWorker worker;
    
    private ContextAwareScheduler() {
        this.worker = new NewThreadWorker(
                new RxThreadFactory("ContextAwareScheduler")); // (2)
    }
    @Override
    public Worker createWorker() {
        return new ContextAwareWorker(worker);                 // (3)
    }
    
    static final class ContextAwareWorker extends Worker {

        final CompositeSubscription tracking;                  // (4)
        final NewThreadWorker worker;

        public ContextAwareWorker(NewThreadWorker worker) {
            this.worker = worker;
            this.tracking = new CompositeSubscription();
        }
        @Override
        public Subscription schedule(Action0 action) {
            // implement
        }
        @Override
        public Subscription schedule(Action0 action, 
                long delayTime, TimeUnit unit) {
            // implement
        }
        @Override
        public boolean isUnsubscribed() {
            return tracking.isUnsubscribed();                  // (5)
        }
        @Override
        public void unsubscribe() {
            tracking.unsubscribe();
        }
    }
}

Our ContextAwareScheduler skeleton may look scary, but it consists of straightforward components:

  1. Since we want a single global thread, we can't allow multiple instances of the Scheduler to exist and thus we use a static instance variable for it.
  2. The scheduler will delegate most of its work to a single underlying worker. We reuse the NewThreadWorker class and the RxThreadFactory to get a single daemon-thread backed worker instance.
  3. Instead of handing out the single worker, we need to 'split' it among its users to conform to requirement #4. Otherwise, if one user unsubscribed the shared worker, it would be unsubscribed and useless for everyone else from then on.
  4. We still need to make sure requirement #3 is met, therefore, we need to track tasks submitted to each particular worker.
  5. The tracking structure also gives the means to check for and issue unsubscriptions.
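
The "one thread, split among many workers" idea from points (2) to (4) can be sketched with the JDK alone. The classes below are hypothetical stand-ins, not RxJava's: SharedThreadScheduler owns one daemon thread, and each View plays the role of a ContextAwareWorker with its own tracking list. Unlike CompositeSubscription, this sketch simply holds the view's lock while scheduling, which trades some throughput for an obviously race-free unsubscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, stdlib-only analogue of ContextAwareScheduler's structure (not RxJava code):
// one shared thread, but each View tracks and cancels only its own tasks.
final class SharedThreadScheduler {
    private final ScheduledExecutorService shared =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "SharedThreadScheduler");
                t.setDaemon(true); // like RxThreadFactory's threads, don't block JVM exit
                return t;
            });

    View createView() {
        return new View();
    }

    final class View {
        private final List<Future<?>> tracking = new ArrayList<>(); // guarded by this
        private boolean unsubscribed;                               // guarded by this

        /** Returns false (and schedules nothing) once this view is unsubscribed. */
        synchronized boolean schedule(Runnable task, long delay, TimeUnit unit) {
            if (unsubscribed) {
                return false;
            }
            // Sketch simplification: finished tasks are never pruned from tracking.
            tracking.add(shared.schedule(task, delay, unit));
            return true;
        }

        /** Cancels this view's pending tasks; other views keep working. */
        synchronized void unsubscribe() {
            unsubscribed = true;
            tracking.forEach(f -> f.cancel(false));
            tracking.clear();
        }
    }

    static String isolationDemo() {
        SharedThreadScheduler scheduler = new SharedThreadScheduler();
        View a = scheduler.createView();
        View b = scheduler.createView();
        AtomicBoolean ranA = new AtomicBoolean();
        CountDownLatch doneB = new CountDownLatch(1);
        a.schedule(() -> ranA.set(true), 100, TimeUnit.MILLISECONDS);
        b.schedule(doneB::countDown, 200, TimeUnit.MILLISECONDS);
        a.unsubscribe(); // only A's task is cancelled
        boolean ranB;
        try {
            ranB = doneB.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            ranB = false;
        }
        boolean acceptedAfter = a.schedule(() -> ranA.set(true), 0, TimeUnit.MILLISECONDS);
        return "A ran=" + ranA.get() + ", B ran=" + ranB + ", A accepts more=" + acceptedAfter;
    }
}
```

Calling isolationDemo() should return "A ran=false, B ran=true, A accepts more=false": unsubscribing view A cancels only A's pending task, while B's later task still runs on the same shared thread.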

Next, we need the aforementioned thread-local context:

public final class ContextManager {
    static final ThreadLocal<Object> ctx = new ThreadLocal<>();
    
    private ContextManager() {
        throw new IllegalStateException();
    }
    
    public static Object get() {
        return ctx.get();
    }
    public static void set(Object context) {
        ctx.set(context);
    }
}

The ContextManager just wraps around a static ThreadLocal instance. In practice, you'd want to replace the Object with some meaningful type.
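For example, a typed variant could look like the following sketch; RequestContext and TypedContextManager are hypothetical names. The extra clear() uses ThreadLocal.remove() so a value doesn't linger on a thread after it is no longer needed.

```java
// Hypothetical context type; use whatever your application actually needs.
final class RequestContext {
    final String userId;

    RequestContext(String userId) {
        this.userId = userId;
    }
}

// Same shape as ContextManager, but typed; the class name is hypothetical.
final class TypedContextManager {
    private static final ThreadLocal<RequestContext> CTX = new ThreadLocal<>();

    private TypedContextManager() {
        throw new IllegalStateException("No instances");
    }

    static RequestContext get() {
        return CTX.get();
    }

    static void set(RequestContext context) {
        CTX.set(context);
    }

    /** Removes the current thread's value, e.g. at the end of a request. */
    static void clear() {
        CTX.remove();
    }
}
```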

Now back to the implementation of the schedule() methods:


    // ...
    @Override
    public Subscription schedule(Action0 action) {
        return schedule(action, 0, null);               // (1)
    }
    @Override
    public Subscription schedule(Action0 action, 
            long delayTime, TimeUnit unit) {

        if (isUnsubscribed()) {                         // (2)
            return Subscriptions.unsubscribed();
        }
        
        Object context = ContextManager.get();          // (3)
        Action0 a = () -> {
            ContextManager.set(context);                // (4)
            action.call();
        };
        
        return worker.scheduleActual(a, 
            delayTime, unit, tracking);                 // (5)
    }
    // ...

Just look at the simplicity!

  1. We delegate the undelayed scheduling as if the initial delay is zero. All underlying structure will interpret this zero correctly and perform the required undelayed FIFO execution.
  2. In case the current worker is unsubscribed, we don't do anything and return a constant unsubscribed subscription. Note that unsubscribing always involves a small race window where tasks (or events) may slip through. This race is resolved internally in scheduleActual; more on this below.
  3. We hold onto the current thread's context value and wrap the action into another action. 
  4. Inside, the context value is restored to that particular thread's local storage. Since the worker is single threaded and non-reentrant, the thread-local context can't be overwritten by the next scheduled task while a previous task is running.
  5. Finally, we delegate the wrapping action and the delay information to the underlying NewThreadWorker instance. By passing in the tracking composite, the action will be properly tracked and removed if the task completes, gets unsubscribed or the entire worker gets unsubscribed.
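Steps (3) and (4) boil down to "capture on the scheduling thread, restore on the executing thread". The stdlib-only sketch below (hypothetical ContextTransferDemo, with a plain ExecutorService instead of NewThreadWorker) shows the same wrapping. Unlike the article's version, it also puts back the executing thread's previous value in a finally block; that is an optional design choice, which matters more on shared pools than on a dedicated single thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextTransferDemo {
    static final ThreadLocal<Object> CTX = new ThreadLocal<>();

    /** Captures the caller's context now and restores it on whichever thread runs the task. */
    static Runnable wrap(Runnable action) {
        Object captured = CTX.get();          // like step (3): read on the scheduling thread
        return () -> {
            Object previous = CTX.get();
            CTX.set(captured);                // like step (4): restore on the executing thread
            try {
                action.run();
            } finally {
                CTX.set(previous);            // optional: leave the thread as we found it
            }
        };
    }

    /** Schedules two tasks under different contexts and returns what each task observed. */
    static List<Object> demo() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        List<Object> seen = new CopyOnWriteArrayList<>();
        try {
            CTX.set(1);
            exec.submit(wrap(() -> seen.add(CTX.get())));
            CTX.set(2);
            // single thread + FIFO: when the second task is done, the first one is too
            exec.submit(wrap(() -> seen.add(CTX.get()))).get();
            return seen;
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            CTX.remove();
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2]
    }
}
```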


As mentioned in step (2), checking isUnsubscribed() inherently races with the unsubscription of the worker, yet we shouldn't leave tasks behind after unsubscription. This is where the unsubscription guarantee of a container comes into play: if we wrap the Future returned by the underlying thread pool into a Subscription, we can safely add it to the tracking composite, which will either atomically take hold of it or unsubscribe it immediately.
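
The "take hold or unsubscribe immediately" contract is what makes this race harmless. Here is a stdlib-only sketch of such a container; FutureComposite is a hypothetical class that mimics the add/unsubscribe semantics described above for CompositeSubscription, holding Futures instead of Subscriptions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical container mimicking the add/unsubscribe contract described for
// CompositeSubscription, but holding Futures; not RxJava code.
final class FutureComposite {
    private final Set<Future<?>> futures = new HashSet<>(); // guarded by this
    private boolean unsubscribed;                           // guarded by this

    /** Takes hold of f, or cancels it right away if this composite is already unsubscribed. */
    void add(Future<?> f) {
        synchronized (this) {
            if (!unsubscribed) {
                futures.add(f);
                return;
            }
        }
        f.cancel(false); // lost the race with unsubscribe(): don't leave the task behind
    }

    /** Forgets a completed task so the composite doesn't grow without bound. */
    synchronized void remove(Future<?> f) {
        futures.remove(f);
    }

    /** Cancels everything held so far and every Future added from now on. */
    void unsubscribe() {
        List<Future<?>> toCancel;
        synchronized (this) {
            if (unsubscribed) {
                return;
            }
            unsubscribed = true;
            toCancel = new ArrayList<>(futures);
            futures.clear();
        }
        toCancel.forEach(f -> f.cancel(false)); // cancel outside the lock
    }

    synchronized int size() {
        return futures.size();
    }

    public static void main(String[] args) {
        FutureComposite tracking = new FutureComposite();
        CompletableFuture<Void> early = new CompletableFuture<>();
        tracking.add(early);
        tracking.unsubscribe();
        CompletableFuture<Void> late = new CompletableFuture<>(); // "submitted" after the race
        tracking.add(late);
        System.out.println(early.isCancelled() + " " + late.isCancelled()); // true true
    }
}
```

In a scheduler, the Future returned by the executor would be passed to add() right after submission: if unsubscribe() won the race, add() cancels the freshly submitted task instead of leaving it behind. A task that completes normally would call remove() on itself.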

Let's try it out:

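import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;

public final class ContextAwareSchedulerDemo {
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker w = ContextAwareScheduler.INSTANCE.createWorker();

        CountDownLatch cdl = new CountDownLatch(1);

        ContextManager.set(1);
        w.schedule(() -> {
            System.out.println(Thread.currentThread());  // the scheduler's single thread
            System.out.println(ContextManager.get());    // 1: captured when scheduled
        });

        ContextManager.set(2);
        w.schedule(() -> {
            System.out.println(Thread.currentThread());
            System.out.println(ContextManager.get());    // 2
            cdl.countDown();
        });

        cdl.await(); // FIFO: once the second task ran, the first one has run too

        ContextManager.set(3);

        // timer() schedules on INSTANCE from the subscribing (main) thread, so it sees 3
        Observable.timer(500, TimeUnit.MILLISECONDS, ContextAwareScheduler.INSTANCE)
        .doOnNext(v -> {
            System.out.println(Thread.currentThread());
            System.out.println(ContextManager.get());    // 3
        }).toBlocking().first();

        w.unsubscribe();
    }
}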

Conclusion

Schedulers give the opportunity to specify where, and likely when, to execute tasks related to the operation of an Observable chain. Built-in schedulers should cover most of the usual needs for parametric concurrency, but some scenarios require building and using custom schedulers. In this blog post, I've shown how one can build a custom scheduler with custom behavior, with major help from existing RxJava classes.

In the next part, we are going to go deeper and look at the ScheduledAction class to see how its concepts can be utilized when more control is needed over how a task is scheduled, for example, to work with or against thread interruptions.

3 comments:

  1. "one could just wrap a single-threaded executor with Executors.from()", should it be "one could just wrap a single-threaded executor with Schedulers.from()"?

  2. The problem with ThreadLocal is that there is no way to say "give me all thread-local variables" stored in the current thread. So if you are using multiple libs, for instance spring-mvc, spring-security, ..., all of them have their own set of thread-local variables that you need to be aware of.

    So in my eyes there is no way to create a general-purpose scheduler capable of dealing with thread-local variables.
