本文最后更新于:2022-09-30T15:39:50+08:00
Lock
概述
在前面的学习中,我们知道synchronized
关键字能够帮助我们完成加锁操作,保证代码的原子性、有序性和可见性。而在Java中,除了这个关键字以外还提供了相关的锁类,来帮助我们完成相应的功能。这些能够帮助我们完成锁功能的类,都有一个公共接口Lock
,包名为java.util.concurrent.locks
。接口中有如下抽象方法:
void lock()
:进行加锁,获得锁
void lockInterruptibly() throws InterruptedException
:进行加锁,获得可以打断的锁。可中断指的是可以在获取锁的过程中可以被其他线程打断
boolean tryLock()
:尝试获取锁,返回是否成功获取锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException
:在给定时间内尝试获取锁,返回是否成功获取锁
void unlock()
:解锁操作
Condition newCondition()
:获取条件变量进行等待
基本使用
Lock本身只是一个接口,规定了锁相关类的操作。这里以其中一个具体实现ReentrantLock的使用来说明锁的基本使用,ReentrantLock与synchronzied类似,都提供可重入锁,但是仍然有一些不同之处。
加锁和解锁
Lock需要程序员手动完成锁的获取和锁的释放。如果线程一直不释放锁,那么可能会造成一些问题。因此为了让解锁操作一定执行,一般将其放在finally代码块中
1 2 3 4 5 6
| myLock.lock(); try { } finally { myLock.unlock(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| Lock myLock = new ReentrantLock();
new Thread(() -> { log.debug("start"); myLock.lock(); try { log.debug("获取到锁"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.debug("释放锁"); myLock.unlock(); } }, "thread1").start(); new Thread(() -> { log.debug("start"); myLock.lock(); try { log.debug("获取到锁"); } finally { log.debug("释放锁"); myLock.unlock(); } }, "thread2").start();
|
可打断
打断指的是我们可以打断一个阻塞在锁上的进程,通过异常捕获机制进行打断。但是普通的lock加锁操作不会抛出异常,而我们可以使用lockInterruptibly操作来进行锁的获取,打断后将其从阻塞队列中移除。
下面的代码中,主线程先获取到锁,然后启动thread1。由于主线程一直没有释放锁,因此thread1会一直阻塞在锁上。然后主线程打断thread1,通过异常捕获机制可以将thread1停止。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| Lock myLock = new ReentrantLock();
Thread thread1 = new Thread(() -> { try { log.debug("尝试获取锁"); myLock.lockInterruptibly(); } catch (InterruptedException e) { log.debug("没有获取到锁,被打断,直接返回"); return; } try { log.debug("获取到锁"); } finally { log.debug("释放锁"); myLock.unlock(); } }, "thread1");
myLock.lock(); thread1.start(); Thread.sleep(500); log.debug("打断thread1"); thread1.interrupt();
|
输出如下:
1 2 3
| 15:12:41.195 [thread1] c.ThreadTest - 尝试获取锁 15:12:41.697 [main] c.ThreadTest - 打断thread1 15:12:41.697 [thread1] c.ThreadTest - 没有获取到锁,被打断,直接返回
|
尝试获取锁
Lock接口中提供操作进行尝试获取锁,返回是否获取成功。我们可以通过if判断进行锁的获取。
普通的tryLock()
:下面的代码在tryLock的时候,主线程并没有释放锁,因此thread1无法获得锁,因此直接从else代码块退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| Lock myLock = new ReentrantLock();
Thread thread1 = new Thread(() -> { if (myLock.tryLock()) { try { log.debug("获取到锁"); } finally { log.debug("释放锁"); myLock.unlock(); } }else{ log.debug("没有获取到锁"); } }, "thread1");
myLock.lock(); log.debug("获取锁"); thread1.start(); Thread.sleep(500); log.debug("释放锁"); myLock.unlock();
|
带时限的tryLock(long time, TimeUnit unit)
:下面的代码我们设置了时限为1s。由于在thread1启动的1s之内,主线程释放了锁,那么thread1是能够最终获取到锁的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| Lock myLock = new ReentrantLock();
Thread thread1 = new Thread(() -> { try { if (myLock.tryLock(1,TimeUnit.SECONDS)) { try { log.debug("获取到锁"); } finally { log.debug("释放锁"); myLock.unlock(); } }else{ log.debug("没有获取到锁"); } } catch (InterruptedException e) { e.printStackTrace(); }
}, "thread1");
myLock.lock(); log.debug("获取锁"); thread1.start(); Thread.sleep(500); log.debug("释放锁"); myLock.unlock();
|
条件变量
在使用synchronized的过程中,我们说调用wait方法可以让线程进入WaitSet进行等待。Lock接口中提供的条件变量就可以类比WaitSet。但是在synchronized中,同一个锁的wait方法进入的都是一个WaitSet,而在Lock中可以使用多个条件变量,实现更加精确的划分。条件变量上的等待和唤醒分别对应await和signal方法
获取Condition对象:public Condition newCondition()
Condition 类 API:
void await()
:当前线程从运行状态进入等待状态,释放锁
void signal()
:唤醒一个等待在 Condition
上的线程,但是必须获得与该 Condition 相关的锁
使用流程:
- 使用 await / signal
前都需要获得锁,只有获得锁的线程才能调用这些方法
- 线程执行 await之后,会释放锁进入ConditionObject等待
- 线程被 signal唤醒之后,重新竞争Lock锁
- 线程在条件对象队列中被打断会抛出中断异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| Lock myLock = new ReentrantLock(); Condition condition1 = myLock.newCondition(); Condition condition2 = myLock.newCondition();
new Thread(() -> { try { myLock.lock(); log.debug("获取锁,在condition1上等待"); try { condition1.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("被唤醒"); } finally { myLock.unlock(); log.debug("释放锁"); } }, "thread1").start();
new Thread(() -> { try { myLock.lock(); log.debug("获取锁,在condition2上等待"); try { condition2.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("被唤醒"); } finally { myLock.unlock(); log.debug("释放锁"); } }, "thread2").start();
Thread.sleep(1000); myLock.lock(); log.debug("唤醒condition1"); condition1.signal(); myLock.unlock(); Thread.sleep(1000); myLock.lock(); log.debug("唤醒condition2"); condition2.signal(); myLock.unlock();
|
输出如下:通过多个条件变量我们可以实现更加细粒度的唤醒。
1 2 3 4 5 6 7 8
| 15:34:42.203 [thread1] c.ThreadTest - 获取锁,在condition1上等待 15:34:42.219 [thread2] c.ThreadTest - 获取锁,在condition2上等待 15:34:43.218 [main] c.ThreadTest - 唤醒condition1 15:34:43.218 [thread1] c.ThreadTest - 被唤醒 15:34:43.218 [thread1] c.ThreadTest - 释放锁 15:34:44.219 [main] c.ThreadTest - 唤醒condition2 15:34:44.219 [thread2] c.ThreadTest - 被唤醒 15:34:44.219 [thread2] c.ThreadTest - 释放锁
|
AQS
概述
我们前面也提到了Lock本身只是一个接口,规定了阻塞式锁需要提供的相关操作,但是一个可用的锁还是需要落到具体实现上。
AQS,全称AbstractQueuedSynchronizer
,抽象队列同步器,是阻塞式锁和相关同步器工具的框架,它作为抽象父类,向我们提供了一些模板方法。通过AQS我们可以较为方便地实现Lock接口中的抽象方法,Lock接口的实现类基本都是通过聚合了一个同步器的子类来完成线程访问控制的
AQS是实现锁或者其他同步组件的关键,在锁的实现中聚合同步器,利用同步器来实现锁的语义。
- 锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节
- 同步器面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待与唤醒等底层操作
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法,来管理同步状态。继承得到的同步器被组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
AQS的主要思想如下:
- 使用state属性来表示资源的状态(分为独占模式和共享模式),这是一个int型整数,子类需要定义如何维护这个状态,如何获取锁和释放锁。
- 对这个状态的操作使用AQS中提供的三个方法(getState、setState、compareAndSetState)
- 独占模式:只有一个线程能够访问资源
- 共享模式:允许多个线程访问资源
- 提供了基于FIFO的等待队列,类似于Monitor的EntryList
- 使用条件变量来实现等待唤醒机制,支持多个条件变量,类似于Monitor的WaitSet
原理简述
AQS的原理与Monitor类似,包括同步队列,等待队列等机制。
AQS利用内部的同步队列来完成同步状态的管理。
- AQS中的内部类Node即为其中的节点,用来保存获取锁失败的线程引用、等待状态以及前驱后继节点、等待状态等。
- 如果当前线程获取锁失败之后,同步器会将当前线程以及等待状态等信息构造程一个节点(Node)并将其加入同步队列,同时阻塞当前线程。当锁释放之后,会将首节点中的线程唤醒,使其再次尝试获取锁。
- 同步队列是双向链表,便于出队入队
- 条件队列(或等待队列,WaitSet)是单向链表
AQS利用state属性来完成锁状态的表示。
- state是一个32位的整数,利用volatile修饰,配合CAS来保证其修改时的原子性
- 在独占模式下,state表示线程重入的次数;在共享模式下,state表示剩余的许可数
可重写的方法
AQS使用了模板方法模式,继承AQS的自定义同步器可以重写的方法如下,默认情况下下面的方法都是直接抛出UnsupportedOperationException
。
在实现方法的过程中,需要注意这些方法的实现必须是内部线程安全的
protected boolean tryAcquire(int arg) |
独占式获取锁 该方法的实现需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) |
独占式释放锁。等待获取同步锁的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) |
共享式获取锁 返回大于等于0的值则表示获取成功,反之则表示获取失败 |
protected boolean tryReleaseShared(int arg) |
共享式释放锁 |
protected boolean isHeldExclusively() |
判断当前的锁是否被占用。一般用来判断是否被当前线程所独占 |
提供的模板方法
实现自定义同步组件的时候,会调用同步器提供的模板方法,常用的模板方法如下:
void acquire(int arg) |
独占式获取锁。 如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待。该方法会调用重写的tryAcquire(int
arg)方法 |
void acquireInterruptibly(int arg) |
与acquire(int
arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException返回 |
boolean tryAcquireNanos(int arg, long nanos) |
在acquireInterruptibly(int arg)基础上增加了超时限制 |
void acquireShared(int arg) |
共享式获取锁。与独占式获取的主要区别是在同一时刻可以有多个线程获取到锁 |
void acquireSharedInterruptibly(int arg) |
与acquireShared(int arg)相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg, long nanos) |
在acquireSharedInterruptibly(int arg)的基础上增加了超时限制 |
boolean release(int arg) |
独占式释放锁。释放锁之后会将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) |
共享式释放锁 |
Collection<Thread> getQueuedThreads() |
获取等待在同步队列上的线程集合 |
示例:自定义不可重入锁实现
利用AQS,我们可以尝试实现一个自定义的不可重入锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int arg) { if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); setState(0); return true; }
@Override protected boolean isHeldExclusively() { return getState() == 1; }
public Condition newCondition() { return new ConditionObject(); } }
private MySync sync = new MySync();
@Override public void lock() { sync.acquire(1); }
@Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
@Override public boolean tryLock() { return sync.tryAcquire(1); }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); }
@Override public void unlock() { sync.release(1); }
@Override public Condition newCondition() { return sync.newCondition(); } }
|