Java Concurrency - Utility

这篇博文是Java Concurrency系列博文的第二篇。在这篇博文,我们将会来共同探索Java多线程编程中常用到的机制,包括Latch、Barrier、Future和Semaphore。

PS:本文主要参考了Java Concurrentcy in Pratice一书,如有纰漏,请谅解。

java.util.concurrent.atomic

AtomicBoolean                        - 可原子更新的布尔变量
AtomicInteger                        - 可原子更新的int变量
AtomicIntegerArray                    - int数组,包含的元素都是可原子更新的
AtomicIntegerFieldUpdater<T>        - 基于反射的实用工具,可以对指定类的指定volatile int字段进行原子更新
AtomicLong                            - 可原子更新的long变量
AtomicLongArray                        - long数组,包含的元素都是可原子更新的
AtomicLongFieldUpdater<T>            - 基于反射的实用工具,可以对指定类的指定volatile long字段进行原子更新。用于原子数据结构,该结构中同一节点的几个字段都独立受原子更新控制。
AtomicMarkableReference<V>            - 将对象引用关联一个标记比特,可原子更新
AtomicReference<V>                    - 可原子更新的对象引用
AtomicReferenceArray<E>                - 对象引用数组,包含的元素都是可原子更新的
AtomicReferenceFieldUpdate<T, V>    - 基于反射的实用工具,可以对指定类的指定volatile引用字段进行原子更新
AtomicStampedReference<V>            - 将对象引用关联一个integer“标签”,可原子更新

java.util.concurrent.atomic包所提供的类,不基于锁机制,实现单一变量的线程安全。这个包中的类,在volatile变量、属性和数组元素的基础上,增加了原子条件更新操作:boolean compareAndSet(expectedValue, updateValue);。这个方法可以原子更新某个变量。如果这个变量当前的值是expectedValue,就会将一个变量设置为updateValue。如果成功更新,则会返回true。这个包中的类同样提供了一些获取和非条件设置变量方法,以及弱条件原子更新操作weakComparedAndSet

这些方法的规范,让实现可以采用当代处理器的高效机器级别原子操作指令,来实现原子操作。但是在一些平台,可能会需要使用到内部锁机制。因此这些方法严格上来说,并不都是非阻塞的 - 一个线程在执行这些操作的时候可能会被隐形地被阻塞。

AtomicBooleanAtomicIntegerAtomicLongAtomicReference的示例各自提供相应类型的变量的访问和更新操作,以及其他一些实用工具的原子操作。比如,AtomicLongAtomicInteger提供了原子增量方法。比如:

class Sequencer {
    private final AtomicLong sequenceNumber = new AtomicLong(0);
    public long next() {
        return sequenceNumber.getAndIncrement();
    }
}

原子变量获取和更新的内存效果,总体上遵循volatile的规则:

  • get跟读取volatile变量的内存效果一样;
  • set跟写入(赋值)volatile变量的内存效果一样;
  • lazySet,跟写入(赋值)volatile变量的内存效果一样,除了它允许后续的内存操作进行重排(专指那些自身没有对一般non-volatile变量的写操作的有重排约束的操作);
  • weakCompareAndSet以原子方式读取和条件写入某个变量,可能发生意外失败,并且不提供排序保证,但对于字段中的其他更改不一定确保原子性;
  • compareAndSet和其他读取-然后-写入的操作,比如getAndIncrement,跟读取和写入volatile变量的内存效果一样。
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* <p>May <a href="package-summary.html#Spurious">fail spuriously</a>
* and does not provide ordering guarantees, so is only rarely an
* appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return true if successful.
*/
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

上面是java 1.7的实现代码,从中我们可以看到compareAndSetweakCompareAndSet的实现其实是一样的。但是从注释中,我们可以看到,weakCompareAndSet可能会发生意外失败,只有在很少的情况下才适合来替代compareAndSet。虽然目前的实现是一样的,但这可能是暂时的,将来可能会不一样,所以在使用的时候,我们还是应该按照接口说明来使用。

除了包含表示单个值的类之外,此包还包含 Updater 类,该类可用于获取任意选定类的任意选定volatile字段上的compareAndSet操作。AtomicReferenceFieldUpdaterAtomicIntegerFieldUpdaterAtomicLongFieldUpdater是基于反射的实用工具,可以提供对关联字段类型的访问。它们主要用于原子数据结构中,该结构中同一节点(例如,树节点的链接)的几个 volatile 字段都独立受原子更新控制。这些类在如何以及何时使用原子更新方面具有更大的灵活性,但相应的弊端是基于映射的设置较为拙笨、使用不太方便,而且在保证方面也较差。

AtomicMarkableReference类将单个布尔值与引用关联起来。例如,可以在数据结构内部使用此位,来标记引用的对象在逻辑上已被删除。AtomicStampedReference类将整数值与引用关联起来。例如,这可用于表示与更新系列对应的版本号。

Atomic类不能完全用来替代java.lang.Integer等相关类。Atomic类并不提供诸如hashCodecompareTo等方法。因为atomic变量是可变的,不适合用来当做哈希表的键值。另外,Atomic类仅提供常用的类型。例如,没有表示 byte 的原子类。如果需要这么做,可以使用AtomicInteger来保持byte值,然后再进行适当的强制转换。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;

/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
...
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
...
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
...
}

AtomicInteger的实现代码,可以看到AtomicInteger类只有三个属性unsafevalueOffsetvalue.其中,unsafe是java提供的获得对对象内存地址访问的类,注释已经清楚的写出了,它的作用就是在更新操作时提供“比较并替换”的作用。实际上就是AtomicInteger中的一个工具类。valueOffset是用来记录value本身在内存的偏移地址的,主要是为了在更新操作的时候方便在内存中找到value的位置。value是用来存储整数的时间变量,这里被声明为volatile,就是为了保证在更新操作时,当前线程可以拿到value最新的值。

为了更好地理解AtomicInteger的内部机理,我们来看一下方法getAndIncrement的实现。这个方法中主要是调用了方法compareAndSet,而compareAndSet则是调用了unsafecompareAndSwapInt方法,也即使用unsafe的native方法,从而实现高效的硬件级别原子操作。所以,AtomicInteger的秘密就在于Unsafe

通过上面的源代码,我们知道AtomicInteger是通过Unsafe.getUnsafe();方法来初始化unsafe。在一般情况下,我们是拿不到该类的实例的,当然jdk库里面是可以随意使用的。

static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
}

上面的这个静态代码块是用来获取AtomicInteger实例中的value属性在内存中的位置valueOffset。这里使用了UnsafeobjectFieldOffset方法。这个方法是unsafenative方法,用来获取一个给定的静态属性的位置。

AtomicInteger中的多个地方都使用到了valueOffset,比如compareAndSet

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

这里,如果valueOffset位置包含的值与expect值相同,则更新valueOffset位置的值为update,并返回true,否则不更新,返回false

关于UnsafecompareAndSwapInt方法的具体实现,感兴趣的读者可以自行去这里http://www.docjar.com/html/api/sun/misc/Unsafe.java.html进行了解,这里就不再详细分析。


Latch

Latch相当于一个大门。在Latch到达终止的状态之前,大门都是关闭的,此时所有线程都会被阻塞。只有latch到达了终止状态,大门才会打开,然后线程才能通过。一旦Latch达到终止状态,Latch是不能再改变状态,会一直保持打开。

Latch常用来确保在其他一次性的活动完成之前,阻止特定的活动继续执行。比如说:

  • 在所需的资源没有准备好之前,某个计算任务不能提前执行。一个只有两个状态的Latch可以用来表示“资源是否已经准备好”。
  • 在一个任务所依赖的其他任务没有开始之前,这个任务不能提前开始。每一个任务都与一个二值Latch相关联。在开始某一项任务之前,首先等待其所依赖的其他任务的Latch。然后在任务结束之后,释放这个任务关联的Latch,以便让依赖与这个任务的其他任务可以继续执行。

CountDownLatch

CountDownLatchLatch的一个实现,允许一个或者多个线程等待一组事件发生。CountDownLatch的内部维护了一个计数器。初始化CountDownLatch时需要指定计数器的初始值,用来表示需要等待完成的事件的个数。每调用一次countDown方法,表示其中一个事件已经完成,计数器的值将减一。如果计数器没有为0,那调用wait方法的所有的线程都会被阻塞,直到计数器减为0(也即所有的事件都发生了),线程被中断,或者等待超时。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end-start;
}
}


Barrier

Latch就像是一次性筷子,只能用一次,一旦到达终止状态,就不能再改变,不能重复使用。但我们很多时候是要重复同样的动作,这个时候就需要使用Barrier。与Latch相似,Barrier同样可以阻塞线程,直到特定的事件发生。它们的区别就在于Barrier所有的线程都必须都达到某个屏障点才可以继续运行,也就是说Barrier等待的是线程,而Latch等待的是事件。就比如说,你要参加同学聚会,然后收到一封邮件,里面写着“大家明天6点在麦当劳那里等,如果你先到,那你就在那里等其他人,直到所有人都到了之后,我们再来商量去哪里玩。”,这就是Barrier

CyclicBarrier

CyclicBarrier允许一组线程互相等待,直到该组线程全部到达某个屏障点。在创建CyclicBarrier的时候,需要指定线程组中线程的数量。调用CyclicBarrier对象的await方法,表示当前线程已到达屏障点,然后等待其他线程到达。当所有线程到达公共屏障点后,CyclicBarrier对象将打开,释放线程组,然后重置CyclicBarrier对象的状态。因此CyclicBarrier是可以循环使用的。如果有线程在等待期间超时或者被中断, 该CyclicBarrier对象被视为已损坏,随后对await方法的调用都要抛出BrokenBarrierException异常。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;

public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}
});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}

private class Worker implements Runnable {
private final Board board;

public Worker(Board board) {this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();}
}
}

Exchanger

Exchanger,是另外一种Barrier,可以用来完成线程间的数据交换。

Exchanger非常适合用于两个异步线程之间交换数据。比如说,有两个线程,一个负责往缓冲填数据,一个负责从缓冲取数据,那么这两个线程就可以通过Exchanger来交换数据。每个线程都在调用exchange方法时给出某个对象,并接受其他线程返回时给出的对象。两个线程通过Exchanger来交换的信息(也就对象),对于两个线程来说都是线程安全的。


Future

FutureTask实现了Future接口,常用来获得某个计算任务的结果。

FutureTask常用的构造函数为FutureTask(Callable callable),使用Callable封装任务,而不是Runnable(对于Callable和Runnable的区别,请参看这里)。FutureTask对象具有三种状态: 等待运行,正在运行,已完成。不管是计算完成,抛出异常或者被取消,都会使得FutureTask的状态变为已完成。一旦FutureTask进入完成状态,那它就会一直保持这个状态,也就是FutureTask是不可以重用的。

通过FutureTask的get方法可以返回任务的执行结果。如果FutureTask对象处于已完成状态,那get方法将立即返回计算结果,否则get方法会阻塞,直到FutureTask转变为已完成状态。

FutureTask的常见使用场景是封装一个耗时任务,然后提前开始计算,当需要计算结果时,再调用其get方法,这样可以减少等待计算完成的时间。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public class Preloader {
private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});

private final Thread thread = new Thread(future);

public void start() { thread.start(); }

public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw launderThrowable(cause);
}
}
}


Semaphore

Semaphore用于管理许可。在创建Semaphore对象的时候,需要指定许可的最大个数。线程首先通过acquare方法来申请获得许可(如果有的话),然后在完成之后再调用release方法来释放许可。要是当前Semaphore已经没有可用的许可,那申请的线程就会被阻塞,直到有可用的许可(或者直到被中断或者超时)。许可是不与线程绑定的,一个线程申请的许可,可以在另一个线程里释放。

Semaphore通常用于实现资源池,如数据库连接池等。另外,Semaphore也可以用于将任何集合变成阻塞式的有界集合。

特别的是,当许可的数量为一时,Semaphore就可当做互斥锁来使用,可用来实现非重入的锁机制。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

public class BoundedHashSet<T> { private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
}
finally {
if (!wasAdded)
sem.release();
}
}

public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}

补充

Callable & Runnable

Callable接口和Runnable接口相似,都是接口,区别就是Callable需要实现call方法,而Runnable需要实现run方法;并且,call方法还可以返回任何对象,无论是什么对象,JVM都会当作Object来处理。但是如果使用了泛型,我们就不用每次都对Object进行转换了。

两者的不同之处:

  1. Callable可以返回一个类型V,而Runnable不可以
  2. Callable能够抛出checked exception,而Runnable不可以。
  3. Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的
  4. Callable和Runnable都可以应用于executors。而Thread类只支持Runnable.