Java多线程笔记(7)-常用并发工具类的使用

ReentrantLock

概述

ReentrantLock,可重入锁,即支持一个线程对资源的重复加锁。ReentrantLock实现的锁与synchronized非常类似,但是也是有一些区别,大体如下:

  • 在锁的实现方面:synchronzied是JVM实现的,源码使用C++编写,而ReentrantLock是JDK实现,源码使用Java编写

  • 在使用方面:ReentrantLock实现了Lock接口,需要手动解锁,synchronized执行完代码块之后会自动解锁

  • 可中断:ReentrantLock可中断,但是synchronized不行

  • 公平锁:公平锁指的是多个线程在等待同一个锁的时候,必须按照申请锁的时间顺序来依次获得锁。ReentrantLock支持设置公平锁,而synchronzied中的锁的非公平的

  • 锁超时:ReentrantLock在获取锁的时候可以设置超时时间,而synchronized会一直等待

  • 等待队列:ReentrantLock可以提供多个Condition等待队列,而synchronized只有一个等待队列

非公平锁指的是阻塞队列内公平,阻塞队列外非公平。事实上,公平的锁机制往往没有非公平的效率更高,公平锁能够减少饥饿发生的概率,等待越久的请求越能够得到优先满足

使用

lock操作

基本使用,ReentrantLock实现了Lock接口,提供了其中的相关操作。

  • 构造方法:ReentrantLock lock = new ReentrantLock()
  • 加锁:public void lock()
    • 如果锁没有被占用,则将锁中计数state设置为1
    • 如果执行锁重入,则令计数state增加1
    • 如果锁被其他线程占用,则阻塞在锁上
  • 解锁:public void unlock()
    • 如果当前线程是该锁的持有者,则保持计数递减
    • 如果计数为0,则表示锁被释放
    • 如果当前线程不是该锁的持有者,则抛出异常
  • 可打断:public void lockInterruptibly()
    • 如果没有竞争,则获取对象锁
    • 如果有竞争,则加入阻塞队列,但是可以被其他线程打断
  • 锁超时:
    • public boolean tryLock():尝试获取锁,获取到返回 true,获取不到直接放弃,不进入阻塞队列
    • public boolean tryLock(long timeout, TimeUnit unit):在给定时间内获取锁,获取不到就退出

公平锁

公平锁与非公平锁在Reentrant中,主要区别在于其中同步器的实现。公平锁对应的同步器为FairSync,而非公平锁对应的同步器为NonfairSync,Reentrant默认是非公平锁,如果给构造方法提供true的布尔值,则可以生成公平锁

1
2
3
4
5
6
7
8
9
// 指定是否是公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

// 默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}

公平锁和非公平锁的主要区别在于获取锁的逻辑。对于非公平锁来说,只需要CAS设置同步状态成功,就表示当前线程获取了锁。而对于公平锁来说,在进行获取锁之前,需要加入同步队列中,并检查当前节点是否有前驱节点,如果没有才能继续获取锁,否则需要等待前驱节点对应的线程获取并释放锁之后,才能继续获取锁

ReentrantReadWriteLock

概述

前面提到的ReentrantLock排他锁,指的是在同一个时刻只允许有一个线程进行访问。

但是我们经常会遇到的一种情况是读写锁,读写锁允许读-读并发,在同一时刻允许多个读线程访问,但是在写线程访问的时候,所有的读线程和其他写线程均被阻塞。具体来说,读写锁维护了一对锁,分别是读锁和写锁。读锁是共享锁,写锁是独占锁。通过分离读锁和写锁,提高了并发性。

ReentrantReadWriteLock是Java并发包中提供的一个读写锁的实现,它实现了读写锁接口ReadWriteLock,该接口中有两个抽象方法readLock()writeLock(),分别返回读锁和写锁。

使用

ReentrantReadWriteLock的使用分为读锁和写锁,分别在不同情况下进行使用。这里同样可以设置公平和非公平锁,只需要在构造方法中指定true即可。

1
2
3
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock r = rw.readLock();
ReentrantReadWriteLock.WriteLock w = rw.writeLock();

在使用的时候需要注意:

  • 读锁不支持条件变量
  • 重入时不支持锁升级:持有读锁的线程不能继续获取写锁,需要先释放读锁,然后再去获取写锁
  • 重入时支持锁降级:持有写锁的线程可以继续获取读锁

ReentrantReadWriteLock内部的读锁和写锁使用的是同一个Sync同步器,原理与ReentrantLock相比没有太多特殊之处。读写锁共用一个state,其中写锁使用state的低16位,而读锁使用state的高16位

除去接口方法获取读锁和写锁之外,ReentrantReadWriteLock还提供了一些便于外界监控其内部工作状态的方法,如下所示:

方法名称 描述
int getReadLockCount() 返回当前读锁被获取的次数,重入次数重复计算
int getReadHoldCount() 返回当前线程获取读锁的次数,重入次数重复计算
boolean isWriteLocked() 判断写锁是否被获取
int getWriteHoldCount() 返回当前写锁被获取的次数

StampedLock

StampedLock也是一种读写锁,它的目的是进一步优化读的性能。和它的名字呼应,在使用StampedLock的时候,使用读锁和写锁都必须配合戳来使用。

1
2
3
4
5
6
7
8
9
StampedLcok lock = new StampedLock();

// 读锁的使用
long r_stamp = lock.readLock();
lock.unlockRead(r_stamp);

// 解锁的使用
long w_stamp = lock.writeLock();
lock.unlockWrite(w_stamp)

StampedLock进一步优化了读的性能,是因为它底层使用了乐观读的思想。在读取完毕之后,StampedLock会进行一次戳校验,如果校验通过,则表示这期间没有其他线程的写操作,数据可以安全使用;如果校验没有通过,则需要重新获取读锁,保证数据的一致性

1
2
3
4
5
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}

需要注意的是,StampedLock不支持条件变量,不支持重入

Semaphore

Semaphore,信号量,联想到操作系统中的信号量。它可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理地使用公共资源。

Semaphore的基本使用如下:

构造方法:

  • public Semaphore(int permits):permits 表示许可线程的数量,会将其直接设置到内部同步器中的state上
  • public Semaphore(int permits, boolean fair):fair 表示公平性,如果设为 true,下次执行的线程会是等待最久的线程

PV操作:

  • public void acquire():表示获取许可
  • public void release():表示释放许可,acquire() 和 release() 方法之间的代码为同步代码

其他方法:

  • int availablePermis():返回此信号量中当前可用的许可数
  • int getQueueLength():返回正在等待获取许可证的线程数
  • boolean hasQueuedThreads():是否有线程正在等待获取许可
  • protected void reducePermits(int reduction):减少reduction个许可
  • protected Collection<Thread> getQueuedThreads():返回所有等待获取许可的线程集合

CountDownLatch

CountDownLatch允许一个或者多个线程等待其他线程完成操作。它可以用来进行线程同步线程,等待所有线程完成倒计时。

构造器如下:

  • public CountDownLatch(int count):初始化需要完成倒计时的步数,也可以将其看作是等待完成的count个执行步骤

常用API:

  • public void await():让当前线程等待,只有当计数器减到0的时候才会继续执行(当然也提供有超时时间的版本)
  • public void countDown():计数器进行减 1

举例:主线程等待其他线程执行结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CountDownLatch countDownLatch = new CountDownLatch(2);

new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("countdown -1");
countDownLatch.countDown();
}, "thread1").start();

new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("countdown -1");
countDownLatch.countDown();
}, "thread2").start();

// 主线程等待其他两个线程完成倒计时
log.debug("waiting...");
countDownLatch.await();
log.debug("over");

输出如下:thread1和2先后完成countDown之后,计数器变为0,主线程继续运行

1
2
3
4
20:23:11.345 [main] c.ThreadTest - waiting...
20:23:12.356 [thread1] c.ThreadTest - countdown -1
20:23:13.346 [thread2] c.ThreadTest - countdown -1
20:23:13.346 [main] c.ThreadTest - over

注意:

  • 计数器的值始终大于等于0,如果计数器为0,调用await无法阻塞当前线程
  • CountDownLatch无法重新初始化,也无法重新修改对象内部计数器的值

CountDownLatch与join的区别:

join同样能够完成等待线程结束操作,但是join是一个比较底层的API,并且是等待线程达到结束状态。而CountDownLatch是一个高级的API,更容易配合线程池等高级API进行使用。线程池中的线程进行复用,使用join不是等待不是很方便

CyclicBarrier

CyclicBarrier:循环屏障,用来进行线程协作,等待线程满足某个计数之后,触发继续执行。

常用方法:

  • public CyclicBarrier(int parties):屏障前需要的线程数目

  • public CyclicBarrier(int parties, Runnable barrierAction):用于在线程到达屏障 parties 时,执行 barrierAction

    • parties:代表多少个线程到达屏障开始触发线程任务
    • barrierAction:等够线程之后,执行的任务
  • public int await():线程调用 await 方法通知 CyclicBarrier 本线程已经到达屏障

举例:重复等待线程

如果使用CountDownLatch来完成重复等待线程,由于它不能重用,我们需要多次创建新的CountDownLatch对象,但是使用CyclicBarrier,就不需要重新创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
log.debug("task1 task2 finish...");
});

for (int i = 0; i < 3; i++) {
service.submit(() -> {
try {
Thread.sleep(1000);
log.debug("task1 run to barrier...");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});

service.submit(() -> {
try {
Thread.sleep(2000);
log.debug("task2 run to barrier...");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}

service.shutdown();

输出如下:

1
2
3
4
5
6
7
8
9
21:01:09.206 [pool-1-thread-1] c.ThreadTest - task1 run to barrier...
21:01:10.217 [pool-1-thread-2] c.ThreadTest - task2 run to barrier...
21:01:10.217 [pool-1-thread-2] c.ThreadTest - task1 task2 finish...
21:01:11.218 [pool-1-thread-2] c.ThreadTest - task1 run to barrier...
21:01:12.229 [pool-1-thread-1] c.ThreadTest - task2 run to barrier...
21:01:12.229 [pool-1-thread-1] c.ThreadTest - task1 task2 finish...
21:01:13.236 [pool-1-thread-1] c.ThreadTest - task1 run to barrier...
21:01:14.240 [pool-1-thread-2] c.ThreadTest - task2 run to barrier...
21:01:14.240 [pool-1-thread-2] c.ThreadTest - task1 task2 finish...

CountDownLatch vs CyclicBarrier:

  • CountDownLatch的计数器只能使用一次
  • CyclicBarrier的计数器可以使用reset方法重置
  • CyclicBarrier中还提供了其他有用方法
    • getNumberWaiting():获得阻塞的线程数量
    • isBroken():判断阻塞的线程是否被中断

Exchanger

Exchanger:交换器,是一个用于线程间协作的工具类,用于进行线程间的数据交换。

Exchanger会提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。线程之间通过exchange方法交换数据,线程1先执行到exchange之后,会等待第二个线程也执行到exchange,当两个线程都到达同步点之后,两个线程就可以交换数据

常用方法:

  • public Exchanger():创建一个新的交换器
  • public V exchange(V x):等待另一个线程执行到同步点,交换数据,返回交换得到的数据
  • public V exchange(V x, long timeout, TimeUnit unit):提供超时时间

示例:两个线程进行数据交换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Exchanger<Integer> exchanger = new Exchanger<>();

new Thread(() -> {
int value = 1;
log.debug("now value: {}", value);
try {
Thread.sleep(2000);
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("after exchange value: {}", value);
}, "thread1").start();

new Thread(() -> {
int value = 2;
log.debug("now value: {}", value);
try {
Thread.sleep(1000);
value = exchanger.exchange(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("after exchange value: {}", value);
}, "thread2").start();

输出如下,在两个线程都执行到exchange后进行数据交换

1
2
3
4
21:16:27.789 [thread1] c.ThreadTest - now value: 1
21:16:27.789 [thread2] c.ThreadTest - now value: 2
21:16:29.813 [thread2] c.ThreadTest - after exchange value: 1
21:16:29.813 [thread1] c.ThreadTest - after exchange value: 2

Java多线程笔记(7)-常用并发工具类的使用
http://example.com/2022/09/30/Java多线程笔记-7-常用并发工具类的使用/
作者
EverNorif
发布于
2022年9月30日
许可协议