一起学并发编程 - 简易线程池实现

文章目录
  1. 1. 概述
  2. 2. 简易版实现
    1. 2.1. 代码
    2. 2.2. 测试一把
    3. 2.3. 总结
  3. 3. - 说点什么

开发中经常会遇到各种池(如:连接池,线程池),它们的作用就是为了提高性能及减少开销,在JDK1.5以后的java.util.concurrent包中内置了很多不同使用场景的线程池,为了更好的理解它们,自己手写一个线程池,加深印象。

概述

1.什么是池

它的基本思想是一种对象池,程序初始化的时候开辟一块内存空间,里面存放若干个线程对象,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省系统的资源。

2.使用线程池的好处

合理的使用线程池可以重复利用已创建的线程,这样就可以减少在创建线程和销毁线程上花费的时间和资源。并且,线程池在某些情况下还能动态调整工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。这样就相当于我们一次创建,就可以多次使用,大量的节省了系统频繁的创建和销毁线程所需要的资源。

简易版实现

简易版

包含功能:

1.创建线程池,销毁线程池,添加新任务

2.没有任务进入等待,有任务则处理掉

3.动态伸缩,扩容

4.拒绝策略

介绍了线程池的原理以及主要组件之后,就让我们来手动实现一个自己的线程池,以加深理解和深入学习。因为自己实现的简易版本所以不建议生产中使用,生产中使用java.util.concurrent会更加健壮和优雅(后续文章会介绍)

代码

以下线程池相关代码均在SimpleThreadPoolExecutor.java中,由于为了便于解读因此以代码块的形式呈现

维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.

1
2
3
private enum TaskState {
FREE, RUNNABLE, BLOCKED, TERMINATED;
}

定义拒绝策略接口,以及默认实现

1
2
3
4
5
6
7
8
9
10
11
static class DiscardException extends RuntimeException {
private static final long serialVersionUID = 8827362380544575914L;

DiscardException(String message) {
super(message);
}
}

interface DiscardPolicy {//拒绝策略接口
void discard() throws DiscardException;
}

任务线程具体实现

1.继承Thread,重写run方法。

2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权

3.如果有任务就去执行FIFO(先进先出)策略

4.定义close方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public static class WorkerTask extends Thread {
// 线程状态
private TaskState taskState;
// 线程编号
private static int threadInitNumber;
/**
* 生成线程名,参考Thread.nextThreadNum();
*
* @return
*/
private static synchronized String nextThreadName() {
return THREAD_NAME_PREFIX + (++threadInitNumber);
}

WorkerTask() {
super(THREAD_GROUP, nextThreadName());
}

@Override
public void run() {
Runnable target;
//说明该线程处于空闲状态
OUTER:
while (this.taskState != TaskState.TERMINATED) {
synchronized (TASK_QUEUE) {
while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
try {
this.taskState = TaskState.BLOCKED;//此处标记
//没有任务就wait住,让出CPU执行权
TASK_QUEUE.wait();
//如果被打断说明当前线程执行了 shutdown() 方法 线程状态为 TERMINATED 直接跳到 while 便于退出
} catch (InterruptedException e) {
break OUTER;
}
}
target = TASK_QUEUE.removeFirst();//遵循FIFO策略
}
if (target != null) {
this.taskState = TaskState.RUNNABLE;
target.run();//开始任务了
this.taskState = TaskState.FREE;
}
}
}

void close() {//优雅关闭线程
this.taskState = TaskState.TERMINATED;
this.interrupt();
}
}

简易版线程池,主要就是维护了一个任务队列线程集,为了动态扩容,自己也继承了Thread去做监听操作,对外提供submit()提交执行任务shutdown()等待所有任务工作完毕,关闭线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
public class SimpleThreadPoolExecutor extends Thread {

// 线程池大小
private int threadPoolSize;
// 最大接收任务
private int queueSize;
// 拒绝策略
private DiscardPolicy discardPolicy;
// 是否被销毁
private volatile boolean destroy = false;

private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默认最小线程数
private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活跃线程
private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大线程
private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多执行多少任务
private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//线程名前缀
private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//线程组的名称
private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//线程组
private final static List<WorkerTask> WORKER_TASKS = new ArrayList<>();// 线程容器
// 任务队列容器,也可以用Queue<Runnable> 遵循 FIFO 规则
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
// 拒绝策略
private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
throw new DiscardException("[拒绝执行] - [任务队列溢出...]");
};

private int minSize;//最小线程
private int maxSize;//最大线程
private int activeSize;//活跃线程

SimpleThreadPoolExecutor() {
this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
}

SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){
this.minSize = minSize;
this.activeSize = activeSize;
this.maxSize = maxSize;
this.queueSize = queueSize;
this.discardPolicy = discardPolicy;
initPool();
}

void submit(Runnable runnable) {
if (destroy) {
throw new IllegalStateException("线程池已销毁...");
}
synchronized (TASK_QUEUE) {
if (TASK_QUEUE.size() > queueSize) {//如果当前任务队超出队列限制,后续任务拒绝执行
discardPolicy.discard();
}
// 1.将任务添加到队列
TASK_QUEUE.addLast(runnable);
// 2.唤醒等待的线程去执行任务
TASK_QUEUE.notifyAll();
}
}

void shutdown() throws InterruptedException {
int activeCount = THREAD_GROUP.activeCount();
while (!TASK_QUEUE.isEmpty() && activeCount > 0) {
// 如果还有任务,那就休息一会
Thread.sleep(100);
}
int intVal = WORKER_TASKS.size();//如果线程池中没有线程,那就不用关了
while (intVal > 0) {
for (WorkerTask task : WORKER_TASKS) {
//当任务队列为空的时候,线程状态才会为 BLOCKED ,所以可以打断掉,相反等任务执行完在关闭
if (task.taskState == TaskState.BLOCKED) {
task.close();
intVal--;
} else {
Thread.sleep(50);
}
}
}
this.destroy = true;
//资源回收
TASK_QUEUE.clear();
WORKER_TASKS.clear();
this.interrupt();
System.out.println("线程关闭");
}

private void createWorkerTask() {
WorkerTask task = new WorkerTask();
//刚创建出来的线程应该是未使用的
task.taskState = TaskState.FREE;
WORKER_TASKS.add(task);
task.start();
}

/**
* 初始化操作
*/
private void initPool() {
for (int i = 0; i < this.minSize; i++) {
this.createWorkerTask();
}
this.threadPoolSize = minSize;
this.start();//自己启动自己
}

@Override
public void run() {
while (!destroy) {
try {
Thread.sleep(5_000L);
if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次扩容到 activeSize 大小
for (int i = threadPoolSize; i < activeSize; i++) {
createWorkerTask();
}
this.threadPoolSize = activeSize;
System.out.println("[初次扩充] - [" + toString() + "]");
} else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次扩容到最大线程
System.out.println();
for (int i = threadPoolSize; i < maxSize; i++) {
createWorkerTask();
}
this.threadPoolSize = maxSize;
System.out.println("[再次扩充] - [" + toString() + "]");
} else {
//防止线程在submit的时候,其他线程获取到锁干坏事
synchronized (WORKER_TASKS) {
int releaseSize = threadPoolSize - activeSize;
Iterator<WorkerTask> iterator = WORKER_TASKS.iterator();// List不允许在for中删除集合元素,所以这里需要使用迭代器
while (iterator.hasNext()) {
if (releaseSize <= 0) {
break;
}
WorkerTask task = iterator.next();
//不能回收正在运行的线程,只回收空闲线程
if (task.taskState == TaskState.FREE) {
task.close();
iterator.remove();
releaseSize--;
}
}
System.out.println("[资源回收] - [" + toString() + "]");
}
threadPoolSize = activeSize;
}
} catch (InterruptedException e) {
System.out.println("资源释放");
}
}
}

@Override
public String toString() {
return "SimpleThreadPoolExecutor{" +
"threadPoolSize=" + threadPoolSize +
", taskQueueSize=" + TASK_QUEUE.size() +
", minSize=" + minSize +
", maxSize=" + maxSize +
", activeSize=" + activeSize +
'}';
}
}

测试一把

创建一个测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class SimpleExecutorTest {

public static void main(String[] args) throws InterruptedException {

SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();
IntStream.range(0, 30).forEach(i ->
executor.submit(() -> {
System.out.printf("[线程] - [%s] 开始工作...\n", Thread.currentThread().getName());
try {
Thread.sleep(2_000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("[线程] - [%s] 工作完毕...\n", Thread.currentThread().getName());
})
);
//executor.shutdown();如果放开注释即会执行完所有任务关闭线程池
}
}

日志分析: 从日志中可以看到,初始化的时候是2个线程在工作,执行速度较为缓慢,当经过第一次扩容后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize的大小,第二次是扩容到maxSize,在执行任务的过程中,当线程数过多的时候就会触发回收机制…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-3] 开始工作...
...
[线程] - [MY-THREAD-NAME-6] 开始工作...
[线程] - [MY-THREAD-NAME-7] 开始工作...
[再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-10] 开始工作...
...
[线程] - [MY-THREAD-NAME-5] 开始工作...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-4] 工作完毕...
...
[线程] - [MY-THREAD-NAME-7] 工作完毕...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]

总结

通过本文,大致可以了解线程池的工作原理和实现方式,学习的过程中,就是要知其然知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们触类旁通,后面的文章中会详细介绍java.util.concurrent中的线程池

- 说点什么

全文代码:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter12

  • 个人QQ:1837307557
  • battcn开源群(适合新手):391619659

微信公众号:battcn(欢迎调戏)

分享到