欧阳亮的博客

编程不止是一份工作,还是一种乐趣!!!

非阻塞算法CAS

在解决并发问题时,锁是最简单易解的方式,但是其代价也是最高的。在竞争的情况下系统的性能会因为加锁产生上下文切换与调度延迟而降低,而非竞争的情况下多余的加锁操作本身也会消耗掉一部分性能。

通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有守护变量的锁,都采用独占的方式来访问这些变量,如果出现多个线程同时访问锁,那第一些线线程将被挂起,当线程恢复执行时,必须等待其它线程执行完他们的时间片以后才能被调度执行,在挂起和恢复执行过程中存在着很大的开销。锁还存在着其它一些缺点,当一个线程正在等待锁时,它不能做任何事。如果一个线程在持有锁的情况下被延迟执行,那么所有需要这个锁的线程都无法执行下去。如果被阻塞的线程优先级高,而持有锁的线程优先级低,将会导致优先级反转。

悲观锁与乐观锁

悲观锁是一种独占锁,synchronized就是一种悲观锁,它假设最坏的情况,并且只有在确保其它线程不会造成干扰的情况下执行,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。

而另一个更加有效的锁就是乐观锁。所谓乐观锁就是可以在不发生干扰的情况下完成更新操作,这种方法需要借助冲突检测机制来判断在更新过程中是否存在来自其他线程的干扰,如果存在,操作将失败,并且可以重试或放弃。

CAS无锁算法

要实现无锁(lock-free)的非阻塞算法有多种实现方法,其中CAS(比较与交换,Compare and swap)是一种有名的无锁算法。CAS, CPU指令,在大多数处理器架构,包括IA32、Space中采用的都是CAS指令,CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。

如果一个线程的失败或挂起不会导致其它线程也失败或挂起,那么这种算法就称为非阻塞算法;如果算法的每个步骤中都存在某个线程能够执行下去,那么这种算法就称为无锁算法。如果在算法中仅使用CAS来协调线程之间的同步,那它既是无阻算法,又是无锁算法。

当竞争程序不高时,基于CAS的性能应该是远远高于锁的性能的,而在没有竞争的情况下这种差距会更大,如果要快速获取无竞争的锁,那至少需要一次CAS操作再加上加锁、释放操作,因此采用悲观锁的实现在完全无竞争的情况下也会比基于CAS的实现在一般的情况下执行更多的操作。但是在竞争非常激烈的情况下,CAS的性能会比锁定的方式差很多,因为CAS是通过不断地重试、回退的方式处理竞争的,在竞争激烈的情况下会消耗很多的CPU资源。

我们来看一个最简单的例子:一个线程安全的计数器:

public class BlockedCounter {
    private int n;

    public synchronized int getValue() {
        return n;
    }

    public synchronized void increase() {
        n++;
    }
}

BlockedCounter采用悲观同步锁的方式来实现,因为n++操作并不是一个原子操作,所以getValueincrease方法都需要用synchronized关键字修饰来保证线程安全。

采用CAS算法的实现不会加锁,只是在并发竞争的情况下需要检查失败并重试:

public class CasCounter {
    private AtomicInteger n;

    public int getValue() {
        return n.get();
    }

    public void increase() {
        while (true) {
            int old = n.get();
            if (n.compareAndSet(old, old+1)) {
                return;
            }
        }
    }
}

创建非阻塞算法的关键点在于,在数据的一致性的前提下找出如何将原子修改的范围缩小到单个变量上。使用CAS实现栈也是很容易的,因为栈这种数据结构,我们只需要维护一个栈顶指针即可;在出栈或入栈的时候,只涉及到栈顶指针一个变量的更新操作。下面是采用CAS实现的栈:

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = head.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    static class Node<E> {
        final E item;
        Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

以上的示例(计数器和堆栈)都是非常简单的非阻塞算法,一旦掌握了在循环中使用 CAS,就可以容易地模仿它们。对于更复杂的数据结构,非阻塞算法要比这些简单示例复杂得多,因为像链表、树或哈希表可能涉及对多个指针的更新。CAS只支持对单一指针的原子性条件更新。所以,要构建一个非阻塞的链表、树或哈希表,需要找到一种方式,不仅需要用CAS更新多个指针,同时还必须保证数据结构处于一致的状态。

为此,我们需要一些技巧。第一个技巧是即使在一个包含多个步骤的更新操作中,也要保证数据结构总是处于一致的状态。第二个技巧是当线程B到达时发现线程A正在修改数据结构,那么在数据结构中应该有足够多的信息,使得B能够帮忙A完成更新操作。如果B帮忙A完成了更新操作,那么B可以执行自己的操作,而不用等待A的操作完成;当A试图完成其操作时,会发现B已经替它完成了。

这种 “帮助邻居” 的要求,对于让数据结构免受单个线程失败的影响,是必需的。如果线程发现数据结构正处在被其他线程更新的中途,然后就等候其他线程完成更新,那么如果其他线程在操作中途失败,这个线程就可能永远等候下去。即使不出现故障,这种方式也会提供糟糕的性能,因为新到达的线程必须放弃处理器,导致上下文切换,或者等到自己的时间片过期(而这更糟)。

单向链表的结构如下图所示,它有两个指针,分别是头指针head指向哑节点,和尾指针tail指向最后一个节点;哑节点的是一个存放空值的节点,它指向第一个真实的节点,当链表为空时,哑节点的next指针为空。

链表结构

采用CAS算法实现的单向链表结构如下:

public class LinkedQueue<E> {
    private static class Node<E> {
        E item;
        AtomicReference<Node<E>> next;

        Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<Node<E>>(next);
        }
    }

    private final Node<E> dummy = new Node<E>(null, null);
    private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
    private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

    public boolean put(E e) {
        Node<E> newNode = new Node<E>(e, null);
        while (true) {
            Node<E> t = tail.get();
            Node<E> residue = t.next.get();

            if (t == tail.get()) {
                if (residue == null) {
                    if (t.next.compareAndSet(null, newNode)) {
                        tail.compareAndSet(t, newNode);
                        return true;
                    }
                } else {
                    tail.compareAndSet(t, residue); // A
                }
            }
        }
    }

    public E take() {
        while (true) {
            Node<E> h = head.get();
            Node<E> t = tail.get();
            Node<E> first = h.next.get();

            if (h == head.get()) {
                if (h == t) {
                    if (first == null) {
                        return null; // C
                    } else {
                        tail.compareAndSet(t, first); // B
                    }
                } else if (head.compareAndSet(h, first)) {
                    E e = first.item;
                    if (e != null) {
                        first.item = null;
                        return e;
                    }
                }
            }
        }
    }
}

当向链表结构增加节点的时候,涉及两个指针的更新操作:第一个操作是把链表中的最后一个节点的next指针指向新增的节点;第二个操作是更新tail指针,让它也指向新增的节点。如果第一个操作成功了而第二个操作失败了,或者在两个操作中间线程由于CPU调度而挂起,那链表就会处于一个中间状态,如下:

链表结构-增加节点时的中间状态

put方法中A处就是为了处理“线程B帮助线程A完成第二个操作”这种情况的,通过判断链表中最后一个节点的next指针是否为空可以得知当前链接是否处于中间状态。

当向链表结构删除节点(即调用take方法)时,只涉及一个指针的更新,即哑节点的next指针。但是空链表增加第一个节点的时候有可能处于下图所示的中间状态:

链表结构-增加第一个节点时的中间状态

在这种情况下(通过头指针与尾指针相同,并且dummy节点的next指针不为空判断),调用take方法的B线程也需要帮忙增加节点的A线程去完成第二个操作。put方法的B处就属于这种情况。当头指针与尾指针相同,但是dummy节点的next指针为空的话,说明链表是空的,此时take方法直接返回空即可,即C处。

ABA问题

虽然在无竞争、或竞争不激烈的情况下,性能远远好于锁的实现,但是它会导致ABA问题。

CAS的语义是“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,但是,“相等”并不真的意味着“未被修改”。 另一个线程可能会把变量的值从A改成B,又从B改回成A。 这就是ABA问题。

很多情况下,ABA问题不会影响你的业务逻辑因此可以忽略。但有时不能忽略,要解决这个问题,一般的做法是给变量关联一个递增版本号,在更新时不但需要比较变量的值,还要比较变量的版本号。在Java中AtomicStampedReference类就是为了解决ABA问题而存在的,关于它的使用也很简单,查阅一下API就能明白,在此不再过多介绍了。