开发中经常会遇到各种池(如:连接池,线程池),它们的作用就是为了提高性能及减少开销,在JDK1.5
以后的java.util.concurrent
包中内置了很多不同使用场景的线程池
,为了更好的理解它们,自己手写一个线程池,加深印象。
概述
1.什么是池
它的基本思想是一种对象池
,程序初始化的时候开辟一块内存空间,里面存放若干个线程对象
,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象
所带来的性能开销,节省系统的资源。
2.使用线程池的好处
合理的使用线程池
可以重复利用已创建的线程,这样就可以减少在创建线程和销毁线程上花费的时间和资源。并且,线程池在某些情况下还能动态调整
工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。这样就相当于我们一次创建,就可以多次使用,大量的节省了系统频繁的创建和销毁线程所需要的资源。
简易版实现

包含功能:
1.创建线程池,销毁线程池,添加新任务
2.没有任务进入等待,有任务则处理掉
3.动态伸缩,扩容
4.拒绝策略
介绍了线程池的原理以及主要组件之后,就让我们来手动实现一个自己的线程池,以加深理解和深入学习。因为自己实现的简易版本
所以不建议生产中使用,生产中使用java.util.concurrent
会更加健壮和优雅(后续文章会介绍)
代码
以下线程池相关代码均在SimpleThreadPoolExecutor.java
中,由于为了便于解读因此以代码块的形式呈现
维护一个内部枚举类,用来标记当前任务线程
状态,在Thread
中其实也有.
1 2 3
| private enum TaskState { FREE, RUNNABLE, BLOCKED, TERMINATED; }
|
定义拒绝策略接口,以及默认实现
1 2 3 4 5 6 7 8 9 10 11
| static class DiscardException extends RuntimeException { private static final long serialVersionUID = 8827362380544575914L;
DiscardException(String message) { super(message); } }
interface DiscardPolicy { void discard() throws DiscardException; }
|
任务线程具体实现
1.继承Thread
,重写run方法。
2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()
如果当前线程处于空闲状态且没有任何任务了就将它wait
住,让出CPU执行权
3.如果有任务就去执行FIFO(先进先出)策略
4.定义close
方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public static class WorkerTask extends Thread { private TaskState taskState; private static int threadInitNumber;
private static synchronized String nextThreadName() { return THREAD_NAME_PREFIX + (++threadInitNumber); }
WorkerTask() { super(THREAD_GROUP, nextThreadName()); }
@Override public void run() { Runnable target; OUTER: while (this.taskState != TaskState.TERMINATED) { synchronized (TASK_QUEUE) { while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) { try { this.taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { break OUTER; } } target = TASK_QUEUE.removeFirst(); } if (target != null) { this.taskState = TaskState.RUNNABLE; target.run(); this.taskState = TaskState.FREE; } } }
void close() { this.taskState = TaskState.TERMINATED; this.interrupt(); } }
|
简易版线程池,主要就是维护了一个任务队列
和线程集
,为了动态扩容,自己也继承了Thread
去做监听操作,对外提供submit()提交执行任务
、shutdown()等待所有任务工作完毕,关闭线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
| public class SimpleThreadPoolExecutor extends Thread {
private int threadPoolSize; private int queueSize; private DiscardPolicy discardPolicy; private volatile boolean destroy = false;
private final static int DEFAULT_MIN_THREAD_SIZE = 2; private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5; private final static int DEFAULT_MAX_THREAD_SIZE = 10; private final static int DEFAULT_WORKER_QUEUE_SIZE = 100; private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-"; private final static String THREAD_POOL_NAME = "SIMPLE-POOL"; private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME); private final static List<WorkerTask> WORKER_TASKS = new ArrayList<>(); private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> { throw new DiscardException("[拒绝执行] - [任务队列溢出...]"); };
private int minSize; private int maxSize; private int activeSize;
SimpleThreadPoolExecutor() { this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY); }
SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){ this.minSize = minSize; this.activeSize = activeSize; this.maxSize = maxSize; this.queueSize = queueSize; this.discardPolicy = discardPolicy; initPool(); }
void submit(Runnable runnable) { if (destroy) { throw new IllegalStateException("线程池已销毁..."); } synchronized (TASK_QUEUE) { if (TASK_QUEUE.size() > queueSize) { discardPolicy.discard(); } TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); } }
void shutdown() throws InterruptedException { int activeCount = THREAD_GROUP.activeCount(); while (!TASK_QUEUE.isEmpty() && activeCount > 0) { Thread.sleep(100); } int intVal = WORKER_TASKS.size(); while (intVal > 0) { for (WorkerTask task : WORKER_TASKS) { if (task.taskState == TaskState.BLOCKED) { task.close(); intVal--; } else { Thread.sleep(50); } } } this.destroy = true; TASK_QUEUE.clear(); WORKER_TASKS.clear(); this.interrupt(); System.out.println("线程关闭"); }
private void createWorkerTask() { WorkerTask task = new WorkerTask(); task.taskState = TaskState.FREE; WORKER_TASKS.add(task); task.start(); }
private void initPool() { for (int i = 0; i < this.minSize; i++) { this.createWorkerTask(); } this.threadPoolSize = minSize; this.start(); }
@Override public void run() { while (!destroy) { try { Thread.sleep(5_000L); if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { for (int i = threadPoolSize; i < activeSize; i++) { createWorkerTask(); } this.threadPoolSize = activeSize; System.out.println("[初次扩充] - [" + toString() + "]"); } else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) { System.out.println(); for (int i = threadPoolSize; i < maxSize; i++) { createWorkerTask(); } this.threadPoolSize = maxSize; System.out.println("[再次扩充] - [" + toString() + "]"); } else { synchronized (WORKER_TASKS) { int releaseSize = threadPoolSize - activeSize; Iterator<WorkerTask> iterator = WORKER_TASKS.iterator(); while (iterator.hasNext()) { if (releaseSize <= 0) { break; } WorkerTask task = iterator.next(); if (task.taskState == TaskState.FREE) { task.close(); iterator.remove(); releaseSize--; } } System.out.println("[资源回收] - [" + toString() + "]"); } threadPoolSize = activeSize; } } catch (InterruptedException e) { System.out.println("资源释放"); } } }
@Override public String toString() { return "SimpleThreadPoolExecutor{" + "threadPoolSize=" + threadPoolSize + ", taskQueueSize=" + TASK_QUEUE.size() + ", minSize=" + minSize + ", maxSize=" + maxSize + ", activeSize=" + activeSize + '}'; } }
|
测试一把
创建一个测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class SimpleExecutorTest {
public static void main(String[] args) throws InterruptedException {
SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor(); IntStream.range(0, 30).forEach(i -> executor.submit(() -> { System.out.printf("[线程] - [%s] 开始工作...\n", Thread.currentThread().getName()); try { Thread.sleep(2_000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("[线程] - [%s] 工作完毕...\n", Thread.currentThread().getName()); }) ); } }
|
日志分析: 从日志中可以看到,初始化的时候是2
个线程在工作,执行速度较为缓慢,当经过第一次扩容后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize
的大小,第二次是扩容到maxSize
,在执行任务的过程中,当线程数过多的时候就会触发回收机制…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| [线程] - [MY-THREAD-NAME-1] 开始工作... [线程] - [MY-THREAD-NAME-2] 开始工作... [线程] - [MY-THREAD-NAME-1] 工作完毕... [线程] - [MY-THREAD-NAME-1] 开始工作... [线程] - [MY-THREAD-NAME-2] 工作完毕... [线程] - [MY-THREAD-NAME-2] 开始工作... [线程] - [MY-THREAD-NAME-1] 工作完毕... [线程] - [MY-THREAD-NAME-1] 开始工作... [线程] - [MY-THREAD-NAME-2] 工作完毕... [线程] - [MY-THREAD-NAME-2] 开始工作... [初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}] [线程] - [MY-THREAD-NAME-3] 开始工作... ... [线程] - [MY-THREAD-NAME-6] 开始工作... [线程] - [MY-THREAD-NAME-7] 开始工作... [再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}] [线程] - [MY-THREAD-NAME-10] 开始工作... ... [线程] - [MY-THREAD-NAME-5] 开始工作... [资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}] [线程] - [MY-THREAD-NAME-4] 工作完毕... ... [线程] - [MY-THREAD-NAME-7] 工作完毕... [资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}] [资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
|
总结
通过本文,大致可以了解线程池的工作原理和实现方式,学习的过程中,就是要知其然知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们触类旁通
,后面的文章中会详细介绍java.util.concurrent
中的线程池
。
- 说点什么
全文代码:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter12
- 个人QQ:1837307557
- battcn开源群(适合新手):391619659
微信公众号:battcn
(欢迎调戏)