同步工具类¶
CountDownLatch¶
CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件的发生。
1. 创建对象
CountDownLatch startGate = new CountDownLatch(3);
2. 在某处await
startGate.await();
3. 调用countdown()方法3次后将使await的线程不再阻塞
startGate.countdown();
FutureTask¶
表示一种抽象的可生成结果的计算。将计算结果从执行计算的线程传递到获取这个结果的线程。 Future.get将阻塞直到任务进入完成状态,然后返回结果或抛出异常。
1. 创建对象
FutureTask<XxClass> future = new FutureTask<XxClass>(new Callable<XxClass>(){
public XxClass call() {
return XxClass();
}
});
2. future.start();
3. 使用get异步阻塞获取计算结果
try{
future.get();
...
} catch(ExecutionException e) {
...
}
Semaphore¶
用于控制同时执行操作的数量,实现某种资源池,对容器施加边界。
Semaphore sem = new Semaphore(1);
sem.acquire();
...
sem.release();
栅栏(Barrier)¶
CyclicBarrier¶
CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。(即栅栏是可重复利用的)
1. 创建CyclicBarrier对象,当3个线程到达栅栏位置时执行run()方法
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable(){
public void run(){
...
}
});
2. 在某个线程的run方法中调用await()等待其他线程到达栅栏位置
public void run() {
while(xx) {
...
try {
barrier.await();
} catch(InterruptedException ex) {
...
// 如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就认为被打破了
} catch(BrokenBarrierException ex) {
...
}
}
}
Exchanger¶
另一种形式的栅栏是Exchanger, 它是一种两方栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时会非常有用。例如用来交换缓冲区。 数据的交换时机取决于应用程序的响应需求。最简单的策略是当缓冲区被填满或者空时进行交换。这种策略会使交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。另一种策略是当缓冲被填充到一定程度并保持一段时间后也进行交换。
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}