为什么要用线程池
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
- 方便管理线程:线程是稀缺资源,如果无条件地创建,不仅会消耗资源,还会降低线程的稳定性,使用线程池可以统一分配、调优和监考
线程池的核心参数

默认线程工厂(省略参数)创建线程池:
ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口
1 2 3 4 5 6
| public class Client { public static void main(String[] args) { ThreadPoolExecutor Pool = new ThreadPoolExecutor(8, 8, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4),new ThreadPoolExecutor.AbortPolicy()); } }
|
- corePoolSize:核心线程的数量
- maximumPoolSize:线程池能创建的最大线程个数
- keepAliveTime:空闲线程存活时间
- unit:时间单位
- workQueue:用于保存任务的阻塞队列
- threadFactory:创建线程的工程类
- hadler:饱和策略
自定义线程工厂创建线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class ThreadPool { private static ExecutorService pool; public static void main(String[] args) {
pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
new ThreadFactory() { public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); Thread th = new Thread(r,"threadPool" + r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy());
for(int i = 0;i < 10; i++) { pool.execute(new ThreadTask()); } } }
public class ThreadTask implements Runnable{ public void run() { System.out.println("ThreadName:" + hread.currentThread().getName()); } }
|
常见线程池的区别以及特点
创建方法:
1 2 3 4 5
| public class Client { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5); } }
|
tip: 到这里可以看到只有ThreadPoolExecutor类和ExecutorService类被使用,没有ThreadPool类
newCachedThreadPool:
- 特点:newCachedThreadPool创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要,它可以灵活的回收空闲的线程,当需要添加的时候可以灵活的添加
- 缺点:maximumPoolSize被设置为Inter.MAX_VALUE,可能会造成OOM
newFixedThreadPool:
- 特点:创建一个定长的线程池,可控制线程最大并发数,超出的任务会在线程中等待
- 缺点:线程数量是固定的,但是阻塞队列是LinkedBlockingQueue,是无界队列,也可能会造成OOM
newScheduledThreadPool:
- 特点:创建一个固定长度的线程,而且支持定时的以及周期性的任务执行,类似Timer
- 缺点:底层封装了PriorityQueue,同样是无界队列,可能会造成OOM
newSingleThreadExecutor:
- 特点:单线程化的线程池,它会用唯一的工作线程来执行任务。如果这个线程因为异常结束,那么会有一个新的线程来替代它。它必须保证前一项任务完成才能执行后一项。阻塞队列是LinkedBlockingQueue,因此是无界队列,会有OOM的风险
- 缺点:因为是单线程,高并发下有压力
为什么我们不用Executors默认创建线程池的方法,而直接自己手动去调用ThreadPoolExecutor去创建线程池
Executors 返回的线程池对象的弊端如下:
- newFixedThreadPool 和 newSingleThreadPool: LinkedBlockingQueue无界队列,允许的请求队列长度为 Integer.MAX_VALUE(无界队列),可能会堆积大量的请求,从而导致 OOM
- newCachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
- newScheduledThreadPool:同样使用无界队列(底层是PriorityQueue),也会堆积大量请求导致OOM
线程池的饱和策略有哪些
- ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝任务的处理
- ThreadPoolExecutor.CallerRunsPolicy:调用提交任务的线程运行任务(比如A提交线程,A运行任务)。但是会降低新任务提交速度,影响程序的整体性能
- ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃掉最早的未处理的任务
线程池原理
- 判断线程池的核心线程数是不是已满,如果不是则创建一个新的工作线程来执行任务。
- 如果核心线程数已满,则将提交的任务放在保存任务的阻塞队列中。
- 如果工作任务队列满了,则创建一个新的线程来执行任务,直到数量到达maximumPoolSize
- 最后如果达到线程池最大线程数,则采取对应的饱和策略
线程池中execute()和submit()方法有什么区别
相同点:
不同点:
- 接受参数:execute只能执行Runnable类型的任务,submit可以执行Runnable和Callable类型的任务
- 返回值:submit方法可以返回持有计算结果的Future对象,而execute没有
- 异常处理:submit可以方便处理异常
Java中Executor、Executors和ExecuteService的区别
Executor:
是一个接口,定义了execute方法
1 2 3
| public interface Executor { void execute(Runnable command); }
|
ExecuteService:
是一个接口,继承了Executor。相比Executor,定义了更多的方法,以及可以作为创建的线程池的返回类型
1
| ExecutorService executor = Executors.newFixedThreadPool(5);
|
Executors:
是一个工具类,继承了Executor,而且集成了很多创建线程池相关的方法,比如可以调用newFixedThreadPool(10)(返回类型是ExecuteService)
1 2 3 4 5 6 7 8 9 10 11
| public class Client { public static void main(String[] args) { Executors.newFixedThreadPool(10).submit(() -> { for (int i = 0; i < 10; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName()); }).start(); } }); } }
|
- Executor是最基本的接口,只定义了一个execute方法
- ExecuteService是一个高级的接口,实现了Executor并进行了扩展,比如实现了submit方法,以及可以创建固定类型的线程池。这个接口的目的是方便我们使用底层不同的线程池,类似List接口,屏蔽底层差异
- Executors是一个工具类,使用这个工具类可以方便的创建线程。让我们可以不用手动地指定线程池的各个参数,比如Executors.newFixedThreadPool(10)
线程池有哪些状态

- Running:正常状态,可以接受其他线程
- Shutdown:不接受新的任务提交,但是会继续处理等待队列中的任务
- Stop:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
- Tidying:所有的任务都销毁,workerCount(线程数量)为0,线程池在向Tidying状态转换时,会执行钩子方法terminated()
- Terminated:terminated()方法介绍后,就会变成这个
如何合理分配线程池大小——线程池应对IO密集型和CPU密集型的策略
- CPU密集型:也叫计算密集型,其处理器占用率高,也许在某段时间内保持100%占用率。线程配置数大概和CPU核数相当,这样可以使得每个线程在执行任务
- IO密集型:大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,但CPU的使用率不高。大部分线程在阻塞,故需要多配置线程数,2 * cpu核数
线程池如何实现动态修改
线程池提供了部分setter方法可以设置线程池的参数:
- 修改线程数,最大线程数,空闲线程停留时间,拒绝策略等
- 可以将线程池的配置参数放入配置中心,然后直接在配置中心修改
什么时候需要修改?
- 需要监考报警策略,获取线程池状态指标,当指标判定为异常后再报警
- 分析指标原因,评估策略,然后通过上述线程池提供的接口进行修改
既然线程池中使用了阻塞队列,那么什么是阻塞队列,阻塞队列有哪些
阻塞队列支持两个阻塞的插入和删除操作
- 支持阻塞的插入put方法:当队列满的时候,队列会阻塞插入元素的线程,直到队列不满
- 支持阻塞的移除take方法:当队列为空的时候,队列会阻塞移除元素的线程,直到队列不为空
阻塞队列:
- ArrayBlockingQueue:底层使用数组结构,创建时必须指定大小,是有界的
- LinkedBlockingQueue:底层使用链表结构,创建时默认大小是Inter.MAX_VALUE,因此是无界的。也可以指定大小成为有界
- PriorityBlockingQueue:一个支持优先级排列的队列,可重写自定义类的compareTo方法来指定排序规则
- DelayQueue:一个使用优先级队列实现的无界阻塞队列,使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,表示指定多久才能从队列中获得元素
- SynchronousQueue:一个不存储元素的队列,每一次put必须等待一个take操作,否则不能添加元素。适用于传递性场景
- LinkedTransferQueue:一个由链表结构组成的无界队列,设计了一种生产者和消费者之间传递的机制,称为”transfer“。当生产者调用transfer(e)方法时,它会阻塞直到一个消费者接收该元素
- LinkedBlockingDeque:一个由链表结构组成的双端队列
ArrayBlockingQueue和LinkedBlockingQueue的区别
- 底层实现:ArrayBlockingQueue基于数组,LinkedBlockingQueue基于链表
- 是否有界:ArrayBlockingQueue有界,LinkedBlockingQueue创建时可以指定大小,默认是Integer.MAX_VALUE,无界
- 锁是否分离:ArrayBlockingQueue中的锁不分离,生产者和消费者使用同一把锁。LinkedBlockingQueue的锁分离,生产者使用的是putLock,消费者使用的是takeLock,这样可以防止生产者和消费者之间竞争锁
- 内存占用:ArrayBlockingQueue需要提前分配内存,LinkedBlockingQueue是动态分配内存,会不断占用空间
ArrayBlockingQueue底层源码
属性
1 2 3 4 5 6 7 8
| final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; transient Itrs itrs = null;
|
添加方法
add(E e)(非阻塞方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
|
put(E e)(阻塞方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
|
offer(E e)(添加方法的具体实现)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
|
删除方法
poll()(非阻塞方法)
1 2 3 4 5 6 7 8 9
| public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
|
take()(阻塞方法)
1 2 3 4 5 6 7 8 9 10 11
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
remove(Object obj)(删除指定元素)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex;
do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
|
deque()(poll、take的具体实现)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
|
LinkedBlockingQueue底层源码
属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); }
|
添加方法
add(E e)(非阻塞方法)
1 2 3 4
| public boolean add(E e) { addLast(e); return true; }
|
put(E e)(阻塞方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } private void enqueue(Node<E> node) { last = last.next = node; }
|
offer(E e)(添加方法的具体实现,分为offerFirst和OfferLast)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(node); } finally { lock.unlock(); } }
public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); Node<E> node = new Node<E>(e); final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(node); } finally { lock.unlock(); } }
|
删除方法
poll()(非阻塞方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
take()(阻塞方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
remove(Object o)(删除指定元素)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
void fullyLock() { putLock.lock(); takeLock.lock(); }
void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
|
deque()(poll、take的具体实现)
1 2 3 4 5 6 7 8 9 10 11
| private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; }
|