Friday, May 22, 2015

Operator concurrency primitives: subscription-containers (part 2)

Introduction


In this blog post, I'm going to implement two lock-free versions of the TwoSubscriptions container from the previous post. Although they are functionally equivalent, their implementations reflect two different philosophies about how to check the container's unsubscription status and how to unsubscribe it.


Using boolean isUnsubscribed in the state


A simple way of implementing a lock-free data structure is to do a so-called copy-on-write operation on every mutation, which involves an immutable state and a CAS loop. With our TwoSubscriptions, we will capture the state of two distinct Subscriptions into a composite class:

    static final class State {
        final Subscription s1;
        final Subscription s2;
        final boolean isUnsubscribed;
        public State(Subscription s1, 
                Subscription s2, 
                boolean isUnsubscribed) {
            this.s1 = s1;
            this.s2 = s2;
            this.isUnsubscribed = isUnsubscribed;
            
        }
    }
    // ...
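
The copy-on-write idea can also be looked at on its own, independent of subscriptions. The following is a minimal sketch, assuming a hypothetical MinMaxTracker class and Range snapshot that are not part of this post's container: every mutation reads the current immutable snapshot, builds a modified copy, and tries to publish it with a CAS, retrying if another thread got there first.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example class, not part of the container in this post.
final class MinMaxTracker {
    // Immutable snapshot: the fields never change after construction.
    static final class Range {
        final int min;
        final int max;
        Range(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    final AtomicReference<Range> range =
        new AtomicReference<>(new Range(Integer.MAX_VALUE, Integer.MIN_VALUE));

    void record(int value) {
        for (;;) {
            Range current = range.get();               // read the current snapshot
            Range next = new Range(                    // copy it with the change applied
                Math.min(current.min, value),
                Math.max(current.max, value));
            if (range.compareAndSet(current, next)) {  // publish, or retry after a lost race
                return;
            }
        }
    }
}
```

Readers always see a consistent pair of values because both fields are replaced together through a single reference.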

With this State inner class, whenever the state needs to change, we will create a new instance, copy over the relevant values and use a CAS loop to achieve atomicity. Now let's see our new lock-free container's class structure:

public final class TwoSubscribersLockFree1 
implements Subscription {
    static final class State { 
        // ...
    }

    static final State EMPTY = 
        new State(null, null, false);                  // (1)

    static final State UNSUBSCRIBED = 
        new State(null, null, true);                   // (2)
    
    final AtomicReference<State> state = 
        new AtomicReference<>(EMPTY);                  // (3)

    public void set(boolean first, Subscription s) {
        // implement
    }
    
    @Override
    public void unsubscribe() {
        // implement
    }

    @Override
    public boolean isUnsubscribed() {
        // implement
    }
}

First, since the initial and terminal states are essentially constants, they are declared as static final instances with the difference that UNSUBSCRIBED.isUnsubscribed == true (1) (2). Since the state needs to be changed atomically, we also need an AtomicReference to hold the State instance (3) which we initialize to the empty (constant) state.

With the given skeleton, the implementation of set() looks as follows:

    public void set(boolean first, Subscription s) {
        for (;;) {
            State current = state.get();                    // (1)
            if (current.isUnsubscribed) {                   // (2)
                s.unsubscribe();
                return;
            }
            State next;
            Subscription old;
            if (first) {
                next = new State(s, current.s2, false);     // (3)
                old = current.s1;                           // (4)
            } else {
                next = new State(current.s1, s, false);
                old = current.s2;
            }
            if (state.compareAndSet(current, next)) {       // (5)
                if (old != null) {
                    old.unsubscribe();                      // (6)
                }
                return;
            }
        }
    }

and works as follows:

  1. The current state value is read.
  2. If the current state is unsubscribed, the container has reached its terminal state; we unsubscribe the parameter and quit.
  3. Otherwise, we create a new state based on the old one, replacing the appropriate subscription with the provided one.
  4. Since the replaced subscription needs to be unsubscribed, we save its instance locally.
  5. The CAS operation atomically swaps in the new, updated state; if a concurrent modification happened to the state, it fails and we perform a new iteration.
  6. With a successful CAS, the original subscription (if any) is unsubscribed and the loop is quit.

The implementation of isUnsubscribed() is straightforward:

    // ...
    @Override
    public boolean isUnsubscribed() {
        return state.get().isUnsubscribed;
    }
    // ...

Finally, let's see how one can implement the unsubscribe() method.

    @Override
    public void unsubscribe() {
        State current = state.get();                        // (1)
        if (!current.isUnsubscribed) {                      // (2)
            current = state.getAndSet(UNSUBSCRIBED);        // (3)
            if (!current.isUnsubscribed) {                  // (4)
                List<Throwable> errors = null;              // (5)
                
                errors = unsubscribe(current.s1, errors);   // (6)
                errors = unsubscribe(current.s2, errors);
                
                Exceptions.throwIfAny(errors);              // (7)
            }
        }
    }

    private List<Throwable> unsubscribe(Subscription s,     // (8)
            List<Throwable> errors) {
        if (s != null) {
            try {
                s.unsubscribe();
            } catch (Throwable e) {
                if (errors == null) {
                    errors = new ArrayList<>();
                }
                errors.add(e);
            }
        }
        return errors;
    }
}

The method has several interesting steps:

  1. We retrieve the current state.
  2. If the current state is already unsubscribed, there is nothing to do and the method quits.
  3. Otherwise, we atomically exchange the current state with the constant terminal state.
  4. If the previous state was already unsubscribed, the method can quit. Otherwise, since getAndSet is atomic, exactly one caller will transition the container from a non-terminated state into the terminated state. There is no need for a CAS loop here, and the unsubscription, so far, can be wait-free on platforms with an intrinsified getAndSet.
  5. The possible exceptions are collected into an errors list.
  6. I've factored out the unsubscription and error collection into a method and it is called for each of the contained subscriptions.
  7. If any of the unsubscriptions threw, the exception(s) are rethrown.
  8. The convenience method of unsubscribing a subscription and updating the errors list if necessary.
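
The "exactly one caller" claim of step 4 can be checked in isolation. Below is a minimal sketch, assuming a hypothetical SingleWinnerDemo class in which two plain marker objects, ACTIVE and TERMINATED, stand in for State instances: several threads race on getAndSet, and we count how many of them observed a non-terminal previous value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; ACTIVE and TERMINATED stand in for State instances.
final class SingleWinnerDemo {
    static final Object ACTIVE = new Object();
    static final Object TERMINATED = new Object();

    /** Races threadCount threads and returns how many saw a non-terminal previous value. */
    static int countWinners(int threadCount) {
        AtomicReference<Object> state = new AtomicReference<>(ACTIVE);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await();                     // release all threads together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Same shape as steps (3) and (4): swap in the terminal value,
                // then act only if the previous value was not terminal.
                if (state.getAndSet(TERMINATED) != TERMINATED) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return winners.get();
    }
}
```

Calling SingleWinnerDemo.countWinners(8) returns 1 regardless of how the threads interleave: only the first getAndSet can return ACTIVE, every later one returns TERMINATED.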

Using the UNSUBSCRIBED state reference

If we think about it, the terminal state is distinct from the others not just by the isUnsubscribed flag, but also by having a unique constant reference. It is therefore possible to remove isUnsubscribed and compare against the UNSUBSCRIBED instance wherever necessary.

Therefore, we can simplify the State class in the new TwoSubscribersLockFree2 as follows:

public final class TwoSubscribersLockFree2 implements Subscription {
    static final class State {
        final Subscription s1;
        final Subscription s2;
        public State(Subscription s1, 
                Subscription s2) {
            this.s1 = s1;
            this.s2 = s2;
            
        }
    }

The isUnsubscribed field was removed from it and we have to change every former isUnsubscribed check:

    // ...
    static final State EMPTY = new State(null, null);         // (1) 
    static final State UNSUBSCRIBED = new State(null, null);
    
    final AtomicReference<State> state
        = new AtomicReference<>(EMPTY);

    public void set(boolean first, Subscription s) {
        for (;;) {
            State current = state.get();
            if (current == UNSUBSCRIBED) {                    // (2)
                s.unsubscribe();
                return;
            }
            State next;
            Subscription old;
            if (first) {
                next = new State(s, current.s2);
                old = current.s1;
            } else {
                next = new State(current.s1, s);
                old = current.s2;
            }
            if (state.compareAndSet(current, next)) {
                if (old != null) {
                    old.unsubscribe();
                }
                return;
            }
        }
    }
    
    @Override
    public boolean isUnsubscribed() {
        return state.get() == UNSUBSCRIBED;                    // (3)
    }
    
    @Override
    public void unsubscribe() {
        State current = state.get();
        if (current != UNSUBSCRIBED) {                         // (4)
            current = state.getAndSet(UNSUBSCRIBED);
            if (current != UNSUBSCRIBED) {                     // (5)
                List<Throwable> errors = null;
                
                errors = unsubscribe(current.s1, errors);
                errors = unsubscribe(current.s2, errors);
                
                Exceptions.throwIfAny(errors);
            }
        }
    }
    // ...

The new constants no longer need a boolean flag (1), and every former current.isUnsubscribed check is replaced by a reference comparison against UNSUBSCRIBED: current == UNSUBSCRIBED (2, 3) or its negation current != UNSUBSCRIBED (4, 5).
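
One detail is worth spelling out: EMPTY and UNSUBSCRIBED hold exactly the same field values, so the check only works because == compares references, not contents. The sketch below illustrates this with a hypothetical SentinelState class that mirrors the flag-less State (using Object fields so it compiles without RxJava); the isTerminal helper is an illustration only and not part of the container.

```java
// Hypothetical stand-in for the flag-less State of TwoSubscribersLockFree2.
final class SentinelState {
    final Object s1;
    final Object s2;

    SentinelState(Object s1, Object s2) {
        this.s1 = s1;
        this.s2 = s2;
    }

    // Two distinct instances with identical (null, null) contents.
    static final SentinelState EMPTY = new SentinelState(null, null);
    static final SentinelState UNSUBSCRIBED = new SentinelState(null, null);

    // Illustrative helper: identity comparison, not field equality.
    static boolean isTerminal(SentinelState s) {
        return s == UNSUBSCRIBED;
    }
}
```

For the same reason, a state class used this way should be compared with == throughout; comparing by field values would make EMPTY look terminal.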

Given these two approaches, which one should you choose? Benchmark and see for yourself. The first makes every State instance carry an extra boolean field, but the boolean check can be faster on certain platforms, whereas the second needs less memory per State instance, but the reference comparison can be slower.

Generally though, using the State class will increase the GC pressure, as every modification triggers the allocation of a new State instance. It is possible to avoid this by performing per-subscription CAS loops, but the approach can get cumbersome as the number of subscription fields increases.
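
To give an idea of what a per-subscription CAS loop might look like, here is a minimal sketch under several assumptions: SimpleSubscription is a stand-in for rx.Subscription so the code compiles on its own, the class names TwoSlotsPerFieldCas and FlagSubscription are hypothetical, and the error collection shown earlier is left out for brevity. Each slot gets its own AtomicReference and a shared TERMINATED sentinel marks a slot as unsubscribed.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal stand-in for rx.Subscription so the sketch compiles on its own.
interface SimpleSubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Trivial subscription that just remembers whether it was unsubscribed.
final class FlagSubscription implements SimpleSubscription {
    private volatile boolean unsubscribed;
    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

// Hypothetical class name; a sketch of the per-subscription CAS idea.
final class TwoSlotsPerFieldCas implements SimpleSubscription {
    // Sentinel marking a terminated slot; compared by reference only.
    static final SimpleSubscription TERMINATED = new SimpleSubscription() {
        @Override public void unsubscribe() { }
        @Override public boolean isUnsubscribed() { return true; }
    };

    final AtomicReference<SimpleSubscription> s1 = new AtomicReference<>();
    final AtomicReference<SimpleSubscription> s2 = new AtomicReference<>();

    public void set(boolean first, SimpleSubscription s) {
        AtomicReference<SimpleSubscription> slot = first ? s1 : s2;
        for (;;) {
            SimpleSubscription current = slot.get();
            if (current == TERMINATED) {           // container already unsubscribed
                s.unsubscribe();
                return;
            }
            if (slot.compareAndSet(current, s)) {  // no State allocation needed
                if (current != null) {
                    current.unsubscribe();
                }
                return;
            }
        }
    }

    @Override
    public boolean isUnsubscribed() {
        return s1.get() == TERMINATED;             // s1 is always terminated first
    }

    @Override
    public void unsubscribe() {
        // Terminate the slots one by one; each getAndSet has exactly one winner.
        SimpleSubscription a = s1.getAndSet(TERMINATED);
        SimpleSubscription b = s2.getAndSet(TERMINATED);
        if (a != null) {
            a.unsubscribe();                       // no-op if a is the sentinel
        }
        if (b != null) {
            b.unsubscribe();
        }
    }
}
```

Note the trade-off: the two slots are no longer terminated in a single atomic step, so a set(false, x) racing with unsubscribe() may briefly install x in the second slot. It is still unsubscribed, because the getAndSet on that slot hands it back to the unsubscribing thread. Every additional field needs the same treatment, which is where the approach becomes cumbersome.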

Conclusion

In this post, I've introduced two lock-free variants of the TwoSubscriptions container and explained their inner workings.

It is more likely that one has to manage more than two subscriptions at a time; therefore, I'm going to demonstrate an array-based container with the very same underlying approaches in the next post.
