为什么要用线程池

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
  • 方便管理线程:线程是稀缺资源,如果无条件地创建,不仅会消耗资源,还会降低线程的稳定性,使用线程池可以统一分配、调优和监考

线程池的核心参数

alt text

默认线程工厂(省略参数)创建线程池:

ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口

1
2
3
4
5
6
public class Client {
public static void main(String[] args) {
ThreadPoolExecutor Pool = new ThreadPoolExecutor(8, 8, 2, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4),new ThreadPoolExecutor.AbortPolicy());
}
}
  • corePoolSize:核心线程的数量
  • maximumPoolSize:线程池能创建的最大线程个数
  • keepAliveTime:空闲线程存活时间
  • unit:时间单位
  • workQueue:用于保存任务的阻塞队列
  • threadFactory:创建线程的工程类
  • hadler:饱和策略

自定义线程工厂创建线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ThreadPool {
private static ExecutorService pool;
public static void main(String[] args)
{

pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),

/*
ThreadFactory接口
ThreadFactory接口很简单,源码如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
*/

//自定义线程工厂
new ThreadFactory() {
public Thread newThread(Runnable r) {
System.out.println("线程"+r.hashCode()+"创建");
//线程命名
Thread th = new Thread(r,"threadPool" + r.hashCode());
return th;
}
},
new ThreadPoolExecutor.CallerRunsPolicy());

for(int i = 0;i < 10; i++) {
pool.execute(new ThreadTask());
}
}
}

public class ThreadTask implements Runnable{
public void run() {
//输出执行线程的名称
System.out.println("ThreadName:" + hread.currentThread().getName());
}
}

/*
线程118352462创建
线程1550089733创建
线程865113938创建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
线程1442407170创建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170
/*

常见线程池的区别以及特点

创建方法:

1
2
3
4
5
public class Client {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
}
}

tip: 到这里可以看到只有ThreadPoolExecutor类和ExecutorService类被使用,没有ThreadPool类

newCachedThreadPool:

  • 特点:newCachedThreadPool创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要,它可以灵活的回收空闲的线程,当需要添加的时候可以灵活的添加
  • 缺点:maximumPoolSize被设置为Inter.MAX_VALUE,可能会造成OOM

newFixedThreadPool:

  • 特点:创建一个定长的线程池,可控制线程最大并发数,超出的任务会在线程中等待
  • 缺点:线程数量是固定的,但是阻塞队列是LinkedBlockingQueue,是无界队列,也可能会造成OOM

newScheduledThreadPool:

  • 特点:创建一个固定长度的线程,而且支持定时的以及周期性的任务执行,类似Timer
  • 缺点:底层封装了PriorityQueue,同样是无界队列,可能会造成OOM

newSingleThreadExecutor:

  • 特点:单线程化的线程池,它会用唯一的工作线程来执行任务。如果这个线程因为异常结束,那么会有一个新的线程来替代它。它必须保证前一项任务完成才能执行后一项。阻塞队列是LinkedBlockingQueue,因此是无界队列,会有OOM的风险
  • 缺点:因为是单线程,高并发下有压力

为什么我们不用Executors默认创建线程池的方法,而直接自己手动去调用ThreadPoolExecutor去创建线程池

Executors 返回的线程池对象的弊端如下:

  • newFixedThreadPool 和 newSingleThreadPool: LinkedBlockingQueue无界队列,允许的请求队列长度为 Integer.MAX_VALUE(无界队列),可能会堆积大量的请求,从而导致 OOM
  • newCachedThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM
  • newScheduledThreadPool:同样使用无界队列(底层是PriorityQueue),也会堆积大量请求导致OOM

线程池的饱和策略有哪些

  • ThreadPoolExecutor.AbortPolicy:抛出RejectedExecutionException来拒绝任务的处理
  • ThreadPoolExecutor.CallerRunsPolicy:调用提交任务的线程运行任务(比如A提交线程,A运行任务)。但是会降低新任务提交速度,影响程序的整体性能
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃掉最早的未处理的任务

线程池原理

  1. 判断线程池的核心线程数是不是已满,如果不是则创建一个新的工作线程来执行任务。
  2. 如果核心线程数已满,则将提交的任务放在保存任务的阻塞队列中。
  3. 如果工作任务队列满了,则创建一个新的线程来执行任务,直到数量到达maximumPoolSize
  4. 最后如果达到线程池最大线程数,则采取对应的饱和策略

线程池中execute()和submit()方法有什么区别

相同点:

  • 都可以提交任务到线程池中

不同点:

  • 接受参数:execute只能执行Runnable类型的任务,submit可以执行Runnable和Callable类型的任务
  • 返回值:submit方法可以返回持有计算结果的Future对象,而execute没有
  • 异常处理:submit可以方便处理异常

Java中Executor、Executors和ExecuteService的区别

Executor:

是一个接口,定义了execute方法

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecuteService:

是一个接口,继承了Executor。相比Executor,定义了更多的方法,以及可以作为创建的线程池的返回类型

1
ExecutorService executor = Executors.newFixedThreadPool(5);

Executors:

是一个工具类,继承了Executor,而且集成了很多创建线程池相关的方法,比如可以调用newFixedThreadPool(10)(返回类型是ExecuteService)

1
2
3
4
5
6
7
8
9
10
11
public class Client {
public static void main(String[] args) {
Executors.newFixedThreadPool(10).submit(() -> {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName());
}).start();
}
});
}
}
  • Executor是最基本的接口,只定义了一个execute方法
  • ExecuteService是一个高级的接口,实现了Executor并进行了扩展,比如实现了submit方法,以及可以创建固定类型的线程池。这个接口的目的是方便我们使用底层不同的线程池,类似List接口,屏蔽底层差异
  • Executors是一个工具类,使用这个工具类可以方便的创建线程。让我们可以不用手动地指定线程池的各个参数,比如Executors.newFixedThreadPool(10)

线程池有哪些状态

alt text

  • Running:正常状态,可以接受其他线程
  • Shutdown:不接受新的任务提交,但是会继续处理等待队列中的任务
  • Stop:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • Tidying:所有的任务都销毁,workerCount(线程数量)为0,线程池在向Tidying状态转换时,会执行钩子方法terminated()
  • Terminated:terminated()方法介绍后,就会变成这个

如何合理分配线程池大小——线程池应对IO密集型和CPU密集型的策略

  • CPU密集型:也叫计算密集型,其处理器占用率高,也许在某段时间内保持100%占用率。线程配置数大概和CPU核数相当,这样可以使得每个线程在执行任务
  • IO密集型:大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,但CPU的使用率不高。大部分线程在阻塞,故需要多配置线程数,2 * cpu核数

线程池如何实现动态修改

线程池提供了部分setter方法可以设置线程池的参数:

  • 修改线程数,最大线程数,空闲线程停留时间,拒绝策略等
  • 可以将线程池的配置参数放入配置中心,然后直接在配置中心修改

什么时候需要修改?

  1. 需要监考报警策略,获取线程池状态指标,当指标判定为异常后再报警
  2. 分析指标原因,评估策略,然后通过上述线程池提供的接口进行修改

既然线程池中使用了阻塞队列,那么什么是阻塞队列,阻塞队列有哪些

阻塞队列支持两个阻塞的插入和删除操作

  • 支持阻塞的插入put方法:当队列满的时候,队列会阻塞插入元素的线程,直到队列不满
  • 支持阻塞的移除take方法:当队列为空的时候,队列会阻塞移除元素的线程,直到队列不为空

阻塞队列:

  • ArrayBlockingQueue:底层使用数组结构,创建时必须指定大小,是有界的
  • LinkedBlockingQueue:底层使用链表结构,创建时默认大小是Inter.MAX_VALUE,因此是无界的。也可以指定大小成为有界
  • PriorityBlockingQueue:一个支持优先级排列的队列,可重写自定义类的compareTo方法来指定排序规则
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列,使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,表示指定多久才能从队列中获得元素
  • SynchronousQueue:一个不存储元素的队列,每一次put必须等待一个take操作,否则不能添加元素。适用于传递性场景
  • LinkedTransferQueue:一个由链表结构组成的无界队列,设计了一种生产者和消费者之间传递的机制,称为”transfer“。当生产者调用transfer(e)方法时,它会阻塞直到一个消费者接收该元素
  • LinkedBlockingDeque:一个由链表结构组成的双端队列

ArrayBlockingQueue和LinkedBlockingQueue的区别

  • 底层实现:ArrayBlockingQueue基于数组,LinkedBlockingQueue基于链表
  • 是否有界:ArrayBlockingQueue有界,LinkedBlockingQueue创建时可以指定大小,默认是Integer.MAX_VALUE,无界
  • 锁是否分离:ArrayBlockingQueue中的锁不分离,生产者和消费者使用同一把锁。LinkedBlockingQueue的锁分离,生产者使用的是putLock,消费者使用的是takeLock,这样可以防止生产者和消费者之间竞争锁
  • 内存占用:ArrayBlockingQueue需要提前分配内存,LinkedBlockingQueue是动态分配内存,会不断占用空间

ArrayBlockingQueue底层源码

属性

1
2
3
4
5
6
7
8
final Object[] items;  //队列的底层为数组,是个循环数组
int takeIndex; //从队列中取元素的索引,用于take、poll、remove
int putIndex; //向队列中存放元素的索引,用于put、offer、add
int count; //队列中的元素数
final ReentrantLock lock; //队列中的锁机制,可重入锁
private final Condition notEmpty; //notEmpty条件对象,由lock创建
private final Condition notFull; //notFull条件对象,由lock创建
transient Itrs itrs = null; //迭代器对象

添加方法

add(E e)(非阻塞方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*调用了offer(e)方法,成功,返回true,失败,抛出IllegalStateException异常*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

/**
* 在当前put位置插入元素、前进和信号
* Call only when holding lock. 只有在持有锁资源时才调用该方法
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items; //将队列数组初始化
items[putIndex] = x; //将元素添加到数组里
if (++putIndex == items.length) //如果将要插入的元素索引等于数组的长度,将存放元素的索引重新置为0
putIndex = 0;
count++;
notEmpty.signal(); //使用条件对象notEmpty通知,唤醒当前等待的线程
}

/**
* Throws NullPointerException if argument is null.
*如果参数为null,则抛出NullPointerException的异常
* @param v the element
*/
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}

put(E e)(阻塞方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 将指定的元素插入到此队列的末尾,然后等待
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e); //判断元素是否为null
final ReentrantLock lock = this.lock; //初始化重入锁
lock.lockInterruptibly(); //加锁,以保证在调用put方法时只有一个线程
try {
while (count == items.length) //当队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里面
notFull.await(); //线程阻塞并被挂起,同时释放锁资源
enqueue(e); //调用enqueue方法
} finally {
lock.unlock(); //释放锁,让其他线程可以调用put方法
}
}

offer(E e)(添加方法的具体实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e); //检查队列中的元素是否为空。在这里不允许为空
final ReentrantLock lock = this.lock; //引入重入锁
lock.lock(); //加锁,保证调用offer时只有一个线程
try {
if (count == items.length) //如果当前元素的个数等于队列数组的长度,说明队列是满的,添加失败
return false;
else {//否则队列不满,调用enqueue(e)方法添加元素,返回true
enqueue(e);
return true;
}
} finally {//最后,释放锁,让其他线程可以调用offer方法
lock.unlock();
}
}

删除方法

poll()(非阻塞方法)

1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock; //引入重用锁
lock.lock(); //加锁,以保证当前只有一个线程
try {//如果队列为空,则返回null;否则,调用dequeue方法
return (count == 0) ? null : dequeue();
} finally {
lock.unlock(); //释放锁资源,让其他线程可以调用poll方法
}
}

take()(阻塞方法)

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//加锁,以保证在调用take()方法时只有一个线程
try {
while (count == 0) //当队列中元素个数为1,即队列为空时
notEmpty.await(); //阻塞当前线程,并加入到条件对象notEmpty的等待队列里
return dequeue(); //调用dequeue()方法
} finally {
lock.unlock(); //释放锁,让其他线程可以调用take()方法
}
}

remove(Object obj)(删除指定元素)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* 从队列中删除指定的元素。如果该元素存在,则将该元素从队列中删除,返回true;如果不存在,则返回false
*/
public boolean remove(Object o) {
if (o == null) return false;//如果指定删除的元素为null,则返回false
final Object[] items = this.items; //阻塞队列数组
final ReentrantLock lock = this.lock; //重入锁
lock.lock(); //加锁,以此保证在调用该remove方法时只有一个线程
try {
if (count > 0) {//如果队列不为空
final int putIndex = this.putIndex; //往队列中即将要存储的元素的下标
int i = takeIndex; //从队列即将要取出元素的下标
//循环遍历阻塞队列中的元素,如果在队列中找到了要删除的元素,则将该元素删除,返回true;否则,返回false。
do {
if (o.equals(items[i])) { //
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);//结束条件为当前元素索引==最后将要存入队列中的元素的下标
}
return false;
} finally {
lock.unlock();//释放锁资源,让其他线程可以调用remove(e)方法
}
}

deque()(poll、take的具体实现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Extracts element at current take position, advances, and signals.提取元素当前的位置、进展和信号
* Call only when holding lock.在持有锁时才调用
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;//阻塞队列数组
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//用变量x记录当前要取出的元素
items[takeIndex] = null;//将该元素置为null
if (++takeIndex == items.length)//判断是否是最后一个元素
takeIndex = 0; //如果是,将取元素索引置为0,从头开始取
count--;//元素个数-1
if (itrs != null) //迭代遍历队列,
itrs.elementDequeued();
notFull.signal();// 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
return x;
}

LinkedBlockingQueue底层源码

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static class Node<E> {
E item; //元素
Node<E> next;//next指针

Node(E x) { //有参构造函数
item = x;
}

private final int capacity; //容量,默认为 Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger(); //队列中元素的数量
transient Node<E> head; //头节点
private transient Node<E> last; //尾节点
private final ReentrantLock takeLock = new ReentrantLock(); //拿锁
private final Condition notEmpty = takeLock.newCondition(); //拿锁的条件,队列不为空
private final ReentrantLock putLock = new ReentrantLock(); //放锁
private final Condition notFull = putLock.newCondition(); //放锁的条件
}

添加方法

add(E e)(非阻塞方法)

1
2
3
4
public boolean add(E e) {
addLast(e);
return true;
}

put(E e)(阻塞方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//判断添加的元素是否为null,如果为Null,抛出NullPointerException异常
int c = -1;
Node<E> node = new Node<E>(e); //构造新的结点
final ReentrantLock putLock = this.putLock; //放锁
final AtomicInteger count = this.count; //元素的个数
putLock.lockInterruptibly(); //放锁加锁,保证在调用put方法的时候只有1个线程
try {
while (count.get() == capacity) {//如果队列为满
notFull.await();//阻塞并挂起当前线程
}
enqueue(node);//将元素添加到链表的尾部
c = count.getAndIncrement(); //元素个数+1
if (c + 1 < capacity) //如果队列的容量还没有满
notFull.signal(); //在notFull对象上唤醒正在等待的1个线程,表示队列中还有元素可以消费
} finally {
putLock.unlock(); //释放放锁,让其他线程可以调用该put方法
}
if (c == 0)//由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据
signalNotEmpty();//在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费
}

//enqueue(Node<E> node)(上面方法用到)
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

offer(E e)(添加方法的具体实现,分为offerFirst和OfferLast)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offerFirst(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkFirst(node);
} finally {
lock.unlock();
}
}

public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
return linkLast(node);
} finally {
lock.unlock();
}
}

删除方法

poll()(非阻塞方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E poll() {
final AtomicInteger count = this.count; //队列中元素的个数
if (count.get() == 0) //判断该队列是否为空
return null; //如果为空,返回null
E x = null; //定义要返回的元素的变量名,初始化为Null
int c = -1;
final ReentrantLock takeLock = this.takeLock;//拿锁
takeLock.lock();//拿锁加锁,以保证在调用poll()线程的时候只有1个线程
try {
if (count.get() > 0) {//判断队列是否为空。如果不为空
x = dequeue();//删除头节点
c = count.getAndDecrement();//元素个数-1
if (c > 1)//如果队列中还有元素
notEmpty.signal();//在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
}
} finally {
takeLock.unlock();//释放拿锁资源,让其他线程可以调用该poll()方法
}
if (c == capacity)//由于存在放锁和拿锁,这里可能放锁一直在添加数据,count会变化。这里的if条件表示如果队列中还可以再插入数据
signalNotFull();//在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
return x;//返回删除的元素
}

take()(阻塞方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count; //队列中元素的个数
final ReentrantLock takeLock = this.takeLock; //拿锁
takeLock.lockInterruptibly(); //拿锁加锁,以保证在调用take()方法的时候只有一个线程
try {
while (count.get() == 0) { //如果队列为空
notEmpty.await(); //则将当前线程阻塞并挂起
}
x = dequeue(); //否则,删除头节点
c = count.getAndDecrement(); //元素个数-1
if (c > 1) //判断队列中是否还有元素
notEmpty.signal(); //如果有,在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费
} finally {
takeLock.unlock(); //释放拿锁,以保证其他线程可以调用take()方法
}
if (c == capacity) //表示如果队列中还可以再插入数据
signalNotFull(); //在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据
return x; //返回删除的那个元素
}

remove(Object o)(删除指定元素)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean remove(Object o) {
if (o == null) return false; //如果要删除的元素为null,返回false
fullyLock(); //remove操作要移动的位置不固定,2个锁都需要加锁
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {//判断在队列中是否能找到要删除的对象
unlink(p, trail);//修改节点的链接信息,同时调用notFull的signal方法 ,唤醒等待的线程
return true;
}
}
return false;//如果没有找到,返回false
} finally {
fullyUnlock();//2个锁解锁
}
}

//remove()方法中的加锁方法
void fullyLock() {
putLock.lock();
takeLock.lock();
}

//remove()方法中的解锁方法
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

deque()(poll、take的具体实现)

1
2
3
4
5
6
7
8
9
10
11
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}