这篇博文是Java Concurrency系列博文的第二篇。在这篇博文,我们将会来共同探索Java多线程编程中常用到的机制,包括Latch、Barrier、Future和Semaphore。
PS:本文主要参考了Java Concurrentcy in Pratice一书,如有纰漏,请谅解。
java.util.concurrent.atomic
AtomicBoolean - 可原子更新的布尔变量
AtomicInteger - 可原子更新的int变量
AtomicIntegerArray - int数组,包含的元素都是可原子更新的
AtomicIntegerFieldUpdater<T> - 基于反射的实用工具,可以对指定类的指定volatile int字段进行原子更新
AtomicLong - 可原子更新的long变量
AtomicLongArray - long数组,包含的元素都是可原子更新的
AtomicLongFieldUpdater<T> - 基于反射的实用工具,可以对指定类的指定volatile long字段进行原子更新。用于原子数据结构,该结构中同一节点的几个字段都独立受原子更新控制。
AtomicMarkableReference<V> - 将对象引用关联一个标记比特,可原子更新
AtomicReference<V> - 可原子更新的对象引用
AtomicReferenceArray<E> - 对象引用数组,包含的元素都是可原子更新的
AtomicReferenceFieldUpdate<T, V> - 基于反射的实用工具,可以对指定类的指定volatile引用字段进行原子更新
AtomicStampedReference<V> - 将对象引用关联一个integer“标签”,可原子更新
java.util.concurrent.atomic包所提供的类,不基于锁机制,实现单一变量的线程安全。这个包中的类,在volatile变量、属性和数组元素的基础上,增加了原子条件更新操作:boolean compareAndSet(expectedValue, updateValue);
。这个方法可以原子更新某个变量。如果这个变量当前的值是expectedValue
,就会将一个变量设置为updateValue
。如果成功更新,则会返回true
。这个包中的类同样提供了一些获取和非条件设置变量方法,以及弱条件原子更新操作weakComparedAndSet
。
这些方法的规范,让实现可以采用当代处理器的高效机器级别原子操作指令,来实现原子操作。但是在一些平台,可能会需要使用到内部锁机制。因此这些方法严格上来说,并不都是非阻塞的 - 一个线程在执行这些操作的时候可能会被隐形地被阻塞。
AtomicBoolean
,AtomicInteger
,AtomicLong
和AtomicReference
的示例各自提供相应类型的变量的访问和更新操作,以及其他一些实用工具的原子操作。比如,AtomicLong
和AtomicInteger
提供了原子增量方法。比如:
class Sequencer {
private final AtomicLong sequenceNumber = new AtomicLong(0);
public long next() {
return sequenceNumber.getAndIncrement();
}
}
原子变量获取和更新的内存效果,总体上遵循volatile
的规则:
get
跟读取volatile
变量的内存效果一样;set
跟写入(赋值)volatile
变量的内存效果一样;lazySet
,跟写入(赋值)volatile
变量的内存效果一样,除了它允许后续的内存操作进行重排(专指那些自身没有对一般non-volatile变量的写操作的有重排约束的操作);weakCompareAndSet
以原子方式读取和条件写入某个变量,可能发生意外失败,并且不提供排序保证,但对于字段中的其他更改不一定确保原子性;compareAndSet
和其他读取-然后-写入的操作,比如getAndIncrement
,跟读取和写入volatile
变量的内存效果一样。
1 | /** |
上面是java 1.7的实现代码,从中我们可以看到compareAndSet
和weakCompareAndSet
的实现其实是一样的。但是从注释中,我们可以看到,weakCompareAndSet
可能会发生意外失败,只有在很少的情况下才适合来替代compareAndSet
。虽然目前的实现是一样的,但这可能是暂时的,将来可能会不一样,所以在使用的时候,我们还是应该按照接口说明来使用。
除了包含表示单个值的类之外,此包还包含 Updater 类,该类可用于获取任意选定类的任意选定volatile
字段上的compareAndSet
操作。AtomicReferenceFieldUpdater
、AtomicIntegerFieldUpdater
和 AtomicLongFieldUpdater
是基于反射的实用工具,可以提供对关联字段类型的访问。它们主要用于原子数据结构中,该结构中同一节点(例如,树节点的链接)的几个 volatile 字段都独立受原子更新控制。这些类在如何以及何时使用原子更新方面具有更大的灵活性,但相应的弊端是基于映射的设置较为拙笨、使用不太方便,而且在保证方面也较差。
AtomicMarkableReference
类将单个布尔值与引用关联起来。例如,可以在数据结构内部使用此位,来标记引用的对象在逻辑上已被删除。AtomicStampedReference类将整数值与引用关联起来。例如,这可用于表示与更新系列对应的版本号。
Atomic类不能完全用来替代java.lang.Integer
等相关类。Atomic类并不提供诸如hashCode
和compareTo
等方法。因为atomic变量是可变的,不适合用来当做哈希表的键值。另外,Atomic类仅提供常用的类型。例如,没有表示 byte 的原子类。如果需要这么做,可以使用AtomicInteger
来保持byte值,然后再进行适当的强制转换。
1 |
|
从AtomicInteger
的实现代码,可以看到AtomicInteger
类只有三个属性unsafe
,valueOffset
和value
.其中,unsafe
是java提供的获得对对象内存地址访问的类,注释已经清楚的写出了,它的作用就是在更新操作时提供“比较并替换”的作用。实际上就是AtomicInteger
中的一个工具类。valueOffset
是用来记录value
本身在内存的偏移地址的,主要是为了在更新操作的时候方便在内存中找到value
的位置。value
是用来存储整数的时间变量,这里被声明为volatile
,就是为了保证在更新操作时,当前线程可以拿到value
最新的值。
为了更好地理解AtomicInteger
的内部机理,我们来看一下方法getAndIncrement
的实现。这个方法中主要是调用了方法compareAndSet
,而compareAndSet
则是调用了unsafe
的compareAndSwapInt
方法,也即使用unsafe的native方法,从而实现高效的硬件级别原子操作。所以,AtomicInteger
的秘密就在于Unsafe
。
通过上面的源代码,我们知道AtomicInteger
是通过Unsafe.getUnsafe();
方法来初始化unsafe
。在一般情况下,我们是拿不到该类的实例的,当然jdk库里面是可以随意使用的。
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
上面的这个静态代码块是用来获取AtomicInteger
实例中的value
属性在内存中的位置valueOffset
。这里使用了Unsafe
的objectFieldOffset
方法。这个方法是unsafe
的native
方法,用来获取一个给定的静态属性的位置。
在AtomicInteger
中的多个地方都使用到了valueOffset
,比如compareAndSet
:
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
这里,如果valueOffset
位置包含的值与expect
值相同,则更新valueOffset
位置的值为update
,并返回true
,否则不更新,返回false
。
关于Unsafe
的compareAndSwapInt
方法的具体实现,感兴趣的读者可以自行去这里http://www.docjar.com/html/api/sun/misc/Unsafe.java.html进行了解,这里就不再详细分析。
Latch
Latch
相当于一个大门。在Latch
到达终止的状态之前,大门都是关闭的,此时所有线程都会被阻塞。只有latch到达了终止状态,大门才会打开,然后线程才能通过。一旦Latch
达到终止状态,Latch
是不能再改变状态,会一直保持打开。
Latch
常用来确保在其他一次性的活动完成之前,阻止特定的活动继续执行。比如说:
- 在所需的资源没有准备好之前,某个计算任务不能提前执行。一个只有两个状态的
Latch
可以用来表示“资源是否已经准备好”。 - 在一个任务所依赖的其他任务没有开始之前,这个任务不能提前开始。每一个任务都与一个二值
Latch
相关联。在开始某一项任务之前,首先等待其所依赖的其他任务的Latch
。然后在任务结束之后,释放这个任务关联的Latch
,以便让依赖与这个任务的其他任务可以继续执行。
CountDownLatch
CountDownLatch
是Latch
的一个实现,允许一个或者多个线程等待一组事件发生。CountDownLatch
的内部维护了一个计数器。初始化CountDownLatch
时需要指定计数器的初始值,用来表示需要等待完成的事件的个数。每调用一次countDown
方法,表示其中一个事件已经完成,计数器的值将减一。如果计数器没有为0,那调用wait
方法的所有的线程都会被阻塞,直到计数器减为0(也即所有的事件都发生了),线程被中断,或者等待超时。
1 |
|
Barrier
Latch
就像是一次性筷子,只能用一次,一旦到达终止状态,就不能再改变,不能重复使用。但我们很多时候是要重复同样的动作,这个时候就需要使用Barrier
。与Latch
相似,Barrier
同样可以阻塞线程,直到特定的事件发生。它们的区别就在于Barrier
所有的线程都必须都达到某个屏障点才可以继续运行,也就是说Barrier
等待的是线程,而Latch
等待的是事件。就比如说,你要参加同学聚会,然后收到一封邮件,里面写着“大家明天6点在麦当劳那里等,如果你先到,那你就在那里等其他人,直到所有人都到了之后,我们再来商量去哪里玩。”,这就是Barrier
。
CyclicBarrier
CyclicBarrier
允许一组线程互相等待,直到该组线程全部到达某个屏障点。在创建CyclicBarrier
的时候,需要指定线程组中线程的数量。调用CyclicBarrier
对象的await
方法,表示当前线程已到达屏障点,然后等待其他线程到达。当所有线程到达公共屏障点后,CyclicBarrier
对象将打开,释放线程组,然后重置CyclicBarrier
对象的状态。因此CyclicBarrier
是可以循环使用的。如果有线程在等待期间超时或者被中断, 该CyclicBarrier
对象被视为已损坏,随后对await
方法的调用都要抛出BrokenBarrierException
异常。
1 |
|
Exchanger
Exchanger,是另外一种Barrier,可以用来完成线程间的数据交换。
Exchanger非常适合用于两个异步线程之间交换数据。比如说,有两个线程,一个负责往缓冲填数据,一个负责从缓冲取数据,那么这两个线程就可以通过Exchanger来交换数据。每个线程都在调用exchange
方法时给出某个对象,并接受其他线程返回时给出的对象。两个线程通过Exchanger来交换的信息(也就对象),对于两个线程来说都是线程安全的。
Future
FutureTask
实现了Future
接口,常用来获得某个计算任务的结果。
FutureTask常用的构造函数为FutureTask(CallableFutureTask
进入完成状态,那它就会一直保持这个状态,也就是FutureTask
是不可以重用的。
通过FutureTask的get方法可以返回任务的执行结果。如果FutureTask对象处于已完成状态,那get方法将立即返回计算结果,否则get方法会阻塞,直到FutureTask转变为已完成状态。
FutureTask的常见使用场景是封装一个耗时任务,然后提前开始计算,当需要计算结果时,再调用其get方法,这样可以减少等待计算完成的时间。
1 |
|
Semaphore
Semaphore
用于管理许可。在创建Semaphore
对象的时候,需要指定许可的最大个数。线程首先通过acquare
方法来申请获得许可(如果有的话),然后在完成之后再调用release
方法来释放许可。要是当前Semaphore
已经没有可用的许可,那申请的线程就会被阻塞,直到有可用的许可(或者直到被中断或者超时)。许可是不与线程绑定的,一个线程申请的许可,可以在另一个线程里释放。
Semaphore
通常用于实现资源池,如数据库连接池等。另外,Semaphore
也可以用于将任何集合变成阻塞式的有界集合。
特别的是,当许可的数量为一时,Semaphore
就可当做互斥锁来使用,可用来实现非重入的锁机制。
1 |
|
补充
Callable & Runnable
Callable
接口和Runnable
接口相似,都是接口,区别就是Callable
需要实现call
方法,而Runnable
需要实现run
方法;并且,call
方法还可以返回任何对象,无论是什么对象,JVM都会当作Object
来处理。但是如果使用了泛型,我们就不用每次都对Object
进行转换了。
两者的不同之处:
- Callable可以返回一个类型V,而Runnable不可以
- Callable能够抛出checked exception,而Runnable不可以。
- Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的
- Callable和Runnable都可以应用于executors。而Thread类只支持Runnable.