本文最后更新于:2022-09-30T21:21:44+08:00
多线程设计模式
两阶段终止
目的:在线程t1中优雅地终止线程t2,即不是粗暴地直接终止线程t2,而是可以让线程t2能够处理一些善后工作,然后再进行终止
错误思路:
- 使用线程对象的stop停止线程:stop
方法会真正杀死线程,如果这时线程锁住了共享资源,当它被杀死后就再也没有机会释放锁,其它线程将永远无法获取锁
System.exit(int)
:目的仅是停止一个线程,但这种做法会让整个程序都停止
流程分析:
考虑线程t2在进行while(true)的循环,每次判断打断标记是否为真,如果为真,则进行一些后事的处理,然后break退出循环;如果为假,则可以进行睡眠,防止CPU占用达到100%。
考虑线程t2被打断的时机,可能是在执行while循环普通代码即运行状态下被打断的,此时打断标记被设置为true,下一次循环中进行判断并退出。也可能是在sleep中即阻塞状态下被打断的,此时打断标记会被清除,需要重新设置打断标记,才能在下次循环中进行判断并退出。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class TwoPhaseTermination { private Thread monitor;
public void start() { monitor = new Thread(() -> { while (true) { Thread currentThread = Thread.currentThread(); boolean interrupted = currentThread.isInterrupted(); if (interrupted) { log.debug("料理后事"); break; } try { Thread.sleep(1000); log.debug("执行普通代码"); } catch (InterruptedException e) { e.printStackTrace(); currentThread.interrupt(); } } }, "monitor"); monitor.start(); }
public void stop() { monitor.interrupt(); } }
|
保护性暂停
目的:Guarded
Suspension,实现一个线程等待另一个线程的执行结果,同时提供超时参数(等待和产生关系一一对应)
分析:
一个线程的结果需要传递到另一个线程,让这两个线程关联同一个GuardedObject。线程t1需要结果,因此需要等待相关条件;线程t2产生结果,结果产生之后,通知t1。同时需要实现超时效果,则需要在每次等待前后计算当前等待了多长时间,并与提供的超时时间进行比较,决定是跳出循环还是继续等待。
join的实现就是类似。
实例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class GuardedObject { private Object response;
public Object get(long millis) { synchronized (this) { long begin = System.currentTimeMillis(); long timePassed = 0; while (response == null) { long waitTime = millis - timePassed; if (waitTime <= 0) { break; } try { this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } timePassed = System.currentTimeMillis() - begin; } } return response; }
public void complete(Object response) { this.response = response; this.notifyAll(); } }
|
生产者消费者
说明:
- 与前面保护性暂停中的GuardObject不同,这种模式不需要产生结果与消费结果一一对应
- 生产者仅负责产生数据,不关心数据该如何处理;而消费者专心处理结果数据
- 消息队列用来平衡生产和消费的线程资源
- 消息队列有容量限制,满时不会再加入数据,空时不会再消耗数据
JDK中的各种阻塞队列即采用该模式
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| class MessageQueue { private LinkedList<Message> list = new LinkedList<>(); private int capacity;
public MessageQueue(int capacity) { this.capacity = capacity; }
public Message take() { synchronized (list) { while (list.isEmpty()) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.removeFirst(); list.notifyAll(); return message; } }
public void put(Message message) { synchronized (list) { while (list.size() == capacity) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.addLast(message); list.notifyAll(); } }
}
final class Message { private int id; private Object value;
public Message(int id, Object value) { this.id = id; this.value = value; }
public int getId() { return id; }
public Object getValue() { return value; } }
|
交替输出
目的:控制多个线程的输出顺序
分析:利用标志位来判断当前哪个线程能够输出
示例代码:3个线程交替输出a,b,c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class AlternateOutput { public static void main(String[] args) { OutputHelper helper = new OutputHelper(1, 10); new Thread(() -> { helper.print("a", 1, 2); }).start(); new Thread(() -> { helper.print("b", 2, 3); }).start(); new Thread(() -> { helper.print("c", 3, 1); }).start(); }
public static class OutputHelper { private int flag; private int loopNum;
public OutputHelper(int flag, int loopNum) { this.flag = flag; this.loopNum = loopNum; }
public void print(String str, int waitFlag, int nextFlag) { for (int i = 0; i < loopNum; i++) { synchronized (this) { while (flag != waitFlag) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag; this.notifyAll(); } } } } }
|