线程池

前言

在平时使用多线程的过程中,线程的创建,运行以及管理都是我们自己手动维护的。这样很不便于多线程的管理,并且频繁的创建,销毁线程都是需要较大的资源开销的。而且很多线程被创建出来执行了一次任务之后便不再用了,等待他的只有销毁,这都是很浪费资源的。于是便有了线程池,他能够帮助我们创建多个线程执行不同的任务,且在每个任务执行完毕后循环利用线程。这对于执行效率和空间占用等方面都是很大的提升…

线程池

线程池的优点

  • 第一 : 降低资源消耗 通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 第二 : 提高响应速度 当任务到达时,任务可以不需要等待线程创建就能立即执行。
  • 第三 :提高线程的可管理性 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的实现原理

当一个任务来到时 …

  1. 首先会判断核心线程是否已满。如果未满,不论是否有空闲线程,都重新创建新的线程执行任务然后加入核心池中(这一步需要获取全局锁) ;如果满了,则将任务提交给一个空闲线程去执行,如果此时没有空闲线程则执行步骤2
  2. 判断阻塞队列是否已满。如果未满,则将任务置入阻塞队列,等待核心线程池的调用;如果满了,则执行步骤3
  3. 判断最大线程池是否已满。如果未满,则创建新的线程执行任务并加入最大线程池中。如果满了,则执行步骤4
  4. 调用饱和策略处理该任务

为什么采用以上设计思路
之所以采用上面这种设计思路,是为了在执行的时候尽可能的避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热后(当前运行的线程数大于等于corePoolSize),再有新的任务到来就会直接到步骤2,而步骤2不需要获取全局锁,且可以暂时缓冲任务,使其不执行步骤3,也不会获取全局锁

补充概念

  • 工作线程 : 线程池创建的线程都会封装为一个工作线程Worker,工作线程会去执行相应的任务,并在任务结束后循环从阻塞队列中拿任务

线程池工作原理图:

image

综上,线程池中的线程执行任务分两种情况:

  1. 在execute()方法中创建一个线程时,会让这个线程执行当前任务
  2. 这个线程执行完当前任务后,会反复从阻塞队列获取任务执行

线程池的使用

创建线程池使用的方法

1
2
3
4
5
6
public ThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);

创建线程池的实例代码

1
2
3
4
5
//创建一个线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
3,5,2000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque<Runnable>());

解释一下各个参数的含义

  • int corePoolSize :表示核心池的大小,当有一个任务提交到核心池时,核心线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行该任务也会创建新的线程,等到需要执行的任务数大于核心池的基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
  • int maximumPoolSize :最大线程池大小。如果阻塞队列都满了,并且已创建的线程数量小于最大线程池的数量,此时线程池会在创建新的线程执行任务。注意,如果阻塞队列是一个无界队列,那么这个参数就是无效的了
  • long keepAliveTime : 线程活动保持时间。线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大该数值,提高线程的利用率
  • TimeUnit unit:线程活动保持时间的单位。可选单位有天(DAYS),小时(HOURS),分钟(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)、
  • BlockingQueue workQueue : 任务队列(阻塞队列),可选的队列有三种
    1. ArrayBlockingQueue:这是一个基于数组的有界阻塞队列,按照FIFO原则对元素进行排序。
    2. LinkedBlockingQueue:基于链表结构的阻塞队列。也是按照FIFO排序元素。吞吐量通常高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    3. SynchronousQueue :一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则会处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。
    4. PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • RejectedExecutionHandler handler :饱和策略,默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法

execute()方法
该方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

使用实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//创建一个线程池
/**
* 各参数的意义(顺序解释)
*
* 核心线程池的大小,最大线程池的大小,线程空闲时存活时间,存活时间的单位,阻塞队列
* 饱和策略不指定,默认使用AbortPolicy,饱和时直接抛出异常
*
*/
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
3,5,2000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque<Runnable>());


//使用execute()方法提交任务
// execute方法没有返回值,且需要一个Runnable实例对象作为参数
MyThread myThread = new MyThread();
for(int i=0;i<5;i++){
poolExecutor.execute(myThread);
}

submit()方法
submit()方法用于提交有返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程知道任务完成,使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后返回,这时候有可能任务没有执行完。

使用该方法需要一个Callable对象
代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class MyThread_02 implements Callable{

@Override
public Object call() throws Exception {
System.out.println("当前线程是: "+Thread.currentThread().getName());
Thread.sleep(1000);
return Thread.currentThread().getName()+"执行成功";
}
}

public class Test_02 {
public static void main(String[] args) throws Exception {

//创建线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3,5,2000,
TimeUnit.MICROSECONDS,new LinkedBlockingDeque<Runnable>());

//使用submit的方式提交任务,任务执行后会返回一个Future对象.
//使用Future对象的get()方法可以获取返回值,get()方法会阻塞线程直到任务结束
MyThread_02 myThread02 = new MyThread_02();

for(int i=0;i<5;i++){
Future future = poolExecutor.submit(myThread02);
String s = (String) future.get();
System.out.println(s);
}

System.out.println("主线程停止");
}
}

使用execute()方式实现卖票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class MyThread implements Runnable{

private int tickets = 20;
@Override
public void run() {
System.out.println("当前线程为 "+Thread.currentThread().getName()+"准备开始购票");
try {
while(tickets > 0){
Thread.sleep(1000);
synchronized (this){
if(tickets >0){
tickets--;
System.out.println(Thread.currentThread().getName()+" 购票完成"+"还剩余"+this.tickets+" 张票");
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public class Test {

public static void main(String[] args) {

//创建一个线程池
/**
* 各参数的意义(顺序解释)
*
* 核心线程池的大小,最大线程池的大小,线程空闲时存活时间,存活时间的单位,阻塞队列
* 饱和策略不指定,默认使用AbortPolicy,饱和时直接抛出异常
*
*/
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
3,5,2000,
TimeUnit.MICROSECONDS,
new LinkedBlockingDeque<Runnable>());


//使用execute()方法提交任务
// execute方法没有返回值,且需要一个Runnable实例对象作为参数
MyThread myThread = new MyThread();
for(int i=0;i<5;i++){
poolExecutor.execute(myThread);
}
}
}

关闭线程池

可以通过调用线程池的shotdowm或shutdownNow方法来关闭线程池。他的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。

showdown和showdownNow的区别

  1. showdownNow首先将线程池的状态置为stop,然后尝试停止所有的 正在执行暂停任务 的线程,并返回等待执行任务的列表
  2. 而showdoun只是将线程池的状态置为shotdown状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

至于应该调用哪一种方法来来关闭线程池,要根据具体任务而定,如果任务可以不执行完就终止,就使用showdownNow,通常使用showdown来关闭线程池

线程数的确定

性质不同的任务可以用不同规模的线程池分开处理。

CPU密集型任务应尽量配置少的线程数量,因为他的cpu利用很高

IO密集型任务应配置较多的线程,因为他并不是在一直在执行任务。

依赖数据库连接池的线程,因为线程提交sql后需要等待数据库返回结果,等待的时间越长,则cpu空闲时间就越长,那么线程数应该设置的大一点,这样才能充分利用cpu

尽量使用有界队列

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任 务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线 程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻 塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多, 有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所 有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是 出现这样问题时也会影响到其他任务。

Executor框架

Executor框架的两级调度模型

在HotSpot VM线程模型中,Java线程被一对一映射为本地操作系统线程。
Java线程启动时会创建一个本地操作系统线程;当线程停止时,这个本地操作系统线程也会被回收。操作系统会调度所有的本地操作系统线程并分配给可用的cpu

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级调度框架(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。这就是两级调度模型。

即应用程序通过Executor框架控制上层的调度,而下层的调度有操作系统内核控制,下层的调度不受应用程序的控制

任务的两级调度模型示意图
image

Executor框架的结构与成员

image

Exector框架的结构

Executor框架主要分为三部分,任务任务的执行异步计算的结果

任务

包括被执行任务需要实现的接口:Runnable接口或Callable接口

任务的执行

包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。在ExecutorService下又有两个关键类ThreadPoolExecutor(普通线程池)和ScheduledThreadPoolExecutor(定时线程池)

ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。

异步计算的结果

包括接口Future和实现Future接口的FutureTask类,代表异步计算结果

三种类型的普通线程池

Executor框架最核心的类是ThreadPoolExecutor,他是线程池的实现类。通过Executor框架的工具类Executors,可以创建3种类型的ThreadPoolExecutor

  1. 创建固定大小的线程池
  2. 创建单线程池
  3. 创建大小无限制的线程池

1.创建固定大小的线程池(FixedThreadPool)

1
public static ExecutorService newFixedThreadPool(int num)

底层实现

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int n) { 
return new ThreadPoolExecutor(
n, n, 0L,TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列容量为Integer.MAX_VALUE).使用无界队列最为阻塞队列有如下特点:

  1. 当线程池中线程数达到corePoolSize时,新到的将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
  2. 由于使用的是无界队列,则maximumPoolSize和keepLive将是两个无用参数
  3. 线程池不会调用饱和策略

该种线程池适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场合,适用于负载比较重的服务器

2.单线程池(newSingleThreadExecutor)

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newSingleThreadExecutor的核心线程池和最大线程池都设置为1,其他参数和FixedThreadPool相同。该线程池同样也是使用无界队列LinkedBlockingQueue作为线程池的工作队列。

单线程池适用于需要保证顺序的执行各个任务;并且在任意时间点,不会有多个线程是活动的 的应用场景

3.CacheThreadPool

CacheThreadPool是一个会根据需要创建新线程的线程池。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为
Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool
中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的
maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,
CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存
资源。

定时池: ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor。他主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但ScheduledThreadPoolExecutor功能给更强大,更灵活。Timer对应的是后台单个线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

该线程池中阻塞队列使用的也是一个无界队列DelayQueue,所以最大线程池的数量在这没有什么意义