Java并发编程系列之二线程基础

上篇文章对并发的理论基础进行了回顾,主要是为什么使用多线程、多线程会引发什么问题及引发的原因,和怎么使用Java中的多线程去解决这些问题。

正所谓,知其然知其所以然,这是学习一个知识遵循的原则。

推荐读者先行查看并发编程的理论知识,以便可以丝滑入戏。

并发编程系列之一并发理论基础

本篇文章重点在于Java中怎么去使用多线程,和多线程的一些相关概念和操作,及怎么优化多线程。

在Java中每个对象都有其生命周期,线程同样不例外,也有其生命周期。

一、线程生命周期

线程的几种状态转换

image-20220407104430832

1、新建(New)

新创建了一个线程对象,但还没有调用start()方法。

2、就绪

当线程对象调用了start()方法后,该线程就进入就绪状态。处于就绪状态的线程位于线程队列中,此时它只是具备了运行的条件,能否获得CPU的使用权并开始运行,还需要等待系统的调度。

3、运行(Runnable)

如果处于就绪状态的线程获得了CPU的使用权,并开始执行run()方法中的线程执行体,则该线程处于运行状态。

一个线程启动后,它可能不会一直处于运行状态,当运行状态的线程使用完系统分配的时间后,系统就会剥夺该线程占用的CPU资源,让其他线程获得执行的机会。需要注意的是,

只有处于就绪状态的线程才可能转换到运行状态。

4、阻塞(Blocking)

等待获取一个排它锁,如果其线程释放了锁就会结束此状态。

①无限期等待(Waiting)

等待其它线程显式地唤醒,否则不会被分配 CPU 时间片。

进入方法 退出方法
没有设置 Timeout 参数的 Object.wait() 方法 Object.notify() / Object.notifyAll()
没有设置 Timeout 参数的 Thread.join() 方法 被调用的线程执行完毕
LockSupport.park() 方法 -

②限期等待(Timed Waiting)

无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。

调用 Thread.sleep() 方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。

调用 Object.wait() 方法使线程进入限期等待或者无限期等待时,常常用“挂起一个线程”进行描述。

睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。

阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,通过调用 Thread.sleep() 和 Object.wait() 等方法进入。

进入方法 退出方法
Thread.sleep() 方法 时间结束
设置了 Timeout 参数的 Object.wait() 方法 时间结束 / Object.notify() / Object.notifyAll()
设置了 Timeout 参数的 Thread.join() 方法 时间结束 / 被调用的线程执行完毕
LockSupport.parkNanos() 方法 -
LockSupport.parkUntil() 方法 -

5、死亡(Terminated)

如果线程调用stop()方法或nun()方法正常执行完毕,或者线程抛出一个未捕获的异常(Exception)错误(Error),线程就进入死亡状态。一旦进入死亡状态,线程将不再拥有运行的资格,也不能再转换到其他状态。

理解线程的五种状态,在调用多线程的方法时,能清楚的知道当前处于哪个状态。

我们举一个简单的实例来说明每个状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyThread extends Thread {

//运行状态
public void run() {
// ...
}

public static void main(String[] args) {
MyThread mt = new MyThread(); //1、新建状态
mt.start(); //就绪状态
}
}

在线程控制章节有一些方法,如sleep()\join()方法,这些方法会让线程处于阻塞状态。

了解了线程的生成周期以后,接下来我们就需要掌握在Java中怎么使用多线程。

在Java中有三种方式实现多线程。

二、创建线程的三种方式

有三种使用线程的方法:

  • 实现 Runnable 接口;
  • 实现 Callable 接口;
  • 继承 Thread 类。

实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以说任务是通过线程驱动从而执行的。

1、实现 Runnable 接口

需要实现 run() 方法。

通过 Thread 调用 start() 方法来启动线程。

1
2
3
4
5
6
public class MyRunnable implements Runnable {
public void run() {
// 需要执行多线程的业务逻辑
}
}

1
2
3
4
5
6
public static void main(String[] args) {
MyRunnable myRunnable = new MyRunnable();
Thread thread = new Thread(myRunnable);
thread.start();
}

2、 实现 Callable 接口

与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。

1
2
3
4
5
6
7
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}


1
2
3
4
5
6
7
8
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}

3、继承 Thread 类

同样也是需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口。

当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法。

1
2
3
4
5
6
7
public class MyThread extends Thread {
public void run() {
// ...
}
}


1
2
3
4
5
6
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}


4、实现接口 VS 继承 Thread

实现接口会更好一些,因为:

  • Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

三、线程控制

线程在使用过程中能对其灵活的控制,包含线程睡眠和线程让步等。

在学习线程的一些控制方法前,有一个必须要了解的前置知识,在线程中分为守护进程和非守护进程。

1、Daemon

守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。

当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。

垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的Thread,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是JVM上仅剩的线程时,垃圾回收线程会自动离开。它始终在低级别的状态中运行,用于实时监控和管理系统中的可回收资源。

main() 属于非守护线程。

非守护线程可以转换为守护进程。

使用 setDaemon() 方法将一个线程设置为守护线程。

1
2
3
4
5
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.setDaemon(true);
}

2、sleep()

Thread.sleep(millisec) 方法会休眠当前正在执行的线程,millisec 单位为毫秒。

sleep() 可能会抛出 InterruptedException,因为异常不能跨线程传播回 main() 中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

1
2
3
4
5
6
7
8
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

3、yield()

对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

1
2
3
4
public void run() {
Thread.yield();
}

4、join()

一旦这个线程执行了这个方法,只有这个线程处于死亡状态其他线程才能执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class MyThread extends Thread {
11
12 public MyThread() {
13 }
14
15 public MyThread(String name) {
16 super(name);
17 }
18
19 @Override
20 public void run() {
21 for (int i = 0; i < 10; i++) {
22 System.out.println(getName() + ":" + i);
23 }
24 }
25
26 public static void main(String[] args) {
27 // 1.创建MyThread类的对象
28 MyThread myThread1 = new MyThread("线程1");
29 MyThread myThread2 = new MyThread("线程2");
30 MyThread myThread3 = new MyThread("线程3");
31
32 // 2.启动线程
33 myThread1.start();
34 try {
35 // 等待myThread1线程死亡,只有当该线程死亡之后才能继续执行其它线程
36 myThread1.join();
37 } catch (InterruptedException e) {
38 e.printStackTrace();
39 }
40 myThread2.start();
41 myThread3.start();
42
43 }
44 }

5、wait()\notify()

wait\notify\notifyAll操作都是属于Object类提供的方法,即所有的对象都具有该方法,他们是的一对的,调用的时候不能分开呦。

wait():调用wait方法的线程,当前持有锁的该线程等待,直至该对象的另一个持锁线程调用notify/notifyAll操作。
wait(long timeOut)、wait(long timeOut,int nanos)

线程状态转换是,当wait被唤醒或超时,并不是直接进入到运行或者就绪状态,而是先进入到Block状态,抢锁成功后,才能进入到可运行状态。

wait方法在调用进入阻塞之前会释放锁,而sleep或join是不会释放锁的

notify():通知持有该对象锁的所有线程中的的随意一个线程被唤醒

notifyAll():通知持有该对象锁的所有线程被同时唤醒

我们形象的做一个比喻:

如果把多线程比喻成一个运动员,跑道就是CPU每次只能允许一个运动员进入跑道,运动员的后勤保障就是守护进程,通过setDaemon()方法,运动员就转业为了后勤人员。

执行sleep()就是提前设定一个时间,让运动员休息会。wait()方法是运动员无限期的睡着,直到教练杀出来一脚踹醒(执行notify方法)运动员才会唤醒。

yield()会把跑道让给别的运动员。

join()方法会让运动员拥有最高的跑道权限,我不跑完,谁都不能进来。

四、线程同步

Java允许并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查), 将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用, 从而保证了该变量的唯一性和准确性。

Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另一个是 JDK 实现的 ReentrantLock。

1、synchronized

①. 同步一个代码块

1
2
3
4
5
6
7
public void func() {
synchronized (this) {
// ...
}
}


它只作用于同一个对象,如果调用两个对象上的同步代码块,就不会进行同步。

对于以下代码,使用 ExecutorService 执行了两个线程,由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程进入同步语句块时,另一个线程就必须等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SynchronizedExample {

public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}

public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
}
1
2
3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9


对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉执行。

1
2
3
4
5
6
7
8
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}

1
2
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9  

②. 同步一个方法

1
2
3
4
public synchronized void func () {
// ...
}

它和同步代码块一样,作用于同一个对象。

③. 同步一个类

1
2
3
4
5
6
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

1
2
3
4
5
6
7
8
9
10
11
public class SynchronizedExample {

public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}


④. 同步一个静态方法

1
2
3
4
5
public synchronized static void fun() {
// ...
}


作用于整个类。

2、ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class LockExample {

private Lock lock = new ReentrantLock();

public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 确保释放锁,从而避免发生死锁。
}
}
}

1
2
3
4
5
6
7
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}

1
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9    

3、比较

①. 锁的实现**

synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。

②. 性能

新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

③. 等待可中断

当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。

ReentrantLock 可中断,而 synchronized 不行。

④. 公平锁

公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。

synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。

⑤. 锁绑定多个条件

一个 ReentrantLock 可以同时绑定多个 Condition 对象。

4、使用选择

除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。

这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。

并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池就应用而生。

五、线程池

线程池围绕着一个核心的类 java.uitl.concurrent.ThreadPoolExecutor,我们将它作为一个切入点揭开线程池的面纱。

1、核心线程类

 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

在ThreadPoolExecutor类中有四个构造方法。

其中三个最终都是调用了下面这个构造方法,限于篇幅就不在贴其他三个源码了,读者可以进行求证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

 下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    1
    2
    3
    4
    5
    6
    7
    TimeUnit.DAYS;               //
    TimeUnit.HOURS; //小时
    TimeUnit.MINUTES; //分钟
    TimeUnit.SECONDS; //
    TimeUnit.MILLISECONDS; //毫秒
    TimeUnit.MICROSECONDS; //微妙
    TimeUnit.NANOSECONDS; //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,观察传入的workQueue 都是默认,即最大可添加Integer.MAX_VALUE个任务,所有在使用过程中要避免使用默认线程池。这里的阻塞队列有以下几种选择:

    1
    2
    3
    4
    5
    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
  • threadFactory:线程工厂,主要用来创建线程;

  • handler:表示当拒绝处理任务时的策略,有以下四种取值:

    1
    2
    3
    4
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

以上对构造的七个参数进行了介绍,那么这些参数是怎么起作用的呢,我们接着看线程池的执行流程。

2、线程执行流程

  1. 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  2. 当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
  3. 当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
  4. 当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
  5. 当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,释放空闲线程
  6. 当设置allowCoreThreadTimeOut(true)时,该参数默认false,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

3、四种线程池及使用场景

Java通过Executors提供四种线程池,分别为

  1. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newScheduledThreadPool 创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
  4. newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newCachedThreadPool:

  • 底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;时间单位TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)

  • 通俗:当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定时间,则该线程会被销毁。

  • 适用:执行很多短期的异步任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    /**
    * 1.创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程<br>
    * 2.当任务数增加时,此线程池又可以智能的添加新线程来处理任务<br>
    * 3.此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小<br>
    */
    public static void cacheThreadPool() {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 1; i <= 10; i++) {
    final int ii = i;
    try {
    Thread.sleep(ii * 1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    cachedThreadPool.execute(()->out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii));
    }
    }
    -----output------
    线程名称:pool-1-thread-1,执行1
    线程名称:pool-1-thread-1,执行2
    线程名称:pool-1-thread-1,执行3
    线程名称:pool-1-thread-1,执行4
    线程名称:pool-1-thread-1,执行5
    线程名称:pool-1-thread-1,执行6
    线程名称:pool-1-thread-1,执行7
    线程名称:pool-1-thread-1,执行8
    线程名称:pool-1-thread-1,执行9
    线程名称:pool-1-thread-1,执行10

newFixedThreadPool:

  • 底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量n,corePoolSize和maximumPoolSize均为n;keepAliveTime为0L;时间单位TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueue() 无界阻塞队列

  • 通俗:创建可容纳固定数量线程的池子,每个线程的存活时间是无限的,当池子满了就不再添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)

  • 适用:执行长期任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
     /**
    * 1.创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小<br>
    * 2.线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程<br>
    * 3.因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字,和线程名称<br>
    */
    public static void fixTheadPoolTest() {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 10; i++) {
    final int ii = i;
    fixedThreadPool.execute(() -> {
    out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    }
    ------output-------
    线程名称:pool-1-thread-3,执行2
    线程名称:pool-1-thread-1,执行0
    线程名称:pool-1-thread-2,执行3
    线程名称:pool-1-thread-3,执行4
    线程名称:pool-1-thread-1,执行5
    线程名称:pool-1-thread-2,执行6
    线程名称:pool-1-thread-3,执行7
    线程名称:pool-1-thread-1,执行8
    线程名称:pool-1-thread-3,执行9

newSingleThreadExecutor:

  • 底层:FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例,corePoolSize为1;maximumPoolSize为1;keepAliveTime为0L;时间单位TimeUnit.MILLISECONDS;workQueue为:new LinkedBlockingQueue() 无解阻塞队列

  • 通俗:创建只有一个线程的线程池,当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)

  • 适用:按顺序执行任务的场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**  *创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
    */
    public static void singleTheadPoolTest() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
    final int ii = i;
    pool.execute(() -> out.println(Thread.currentThread().getName() + "=>" + ii));
    }
    }

    -----output-------

    线程名称:pool-1-thread-1,执行0
    线程名称:pool-1-thread-1,执行1
    线程名称:pool-1-thread-1,执行2
    线程名称:pool-1-thread-1,执行3
    线程名称:pool-1-thread-1,执行4
    线程名称:pool-1-thread-1,执行5
    线程名称:pool-1-thread-1,执行6
    线程名称:pool-1-thread-1,执行7
    线程名称:pool-1-thread-1,执行8
    线程名称:pool-1-thread-1,执行9

NewScheduledThreadPool:

  • 底层:创建ScheduledThreadPoolExecutor实例,该对象继承了ThreadPoolExecutor,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0;时间单位TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列

  • 通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构

  • 适用:执行周期性任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * 创建一个定长线程池,支持定时及周期性任务执行。延迟执行
    */
    public static void sceduleThreadPool() {
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    Runnable r1 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:3秒后执行");
    scheduledThreadPool.schedule(r1, 3, TimeUnit.SECONDS);
    Runnable r2 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:延迟2秒后每3秒执行一次");
    scheduledThreadPool.scheduleAtFixedRate(r2, 2, 3, TimeUnit.SECONDS);
    Runnable r3 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:普通任务");
    for (int i = 0; i < 5; i++) {
    scheduledThreadPool.execute(r3);
    }
    }
    ----output------
    线程名称:pool-1-thread-1,执行:普通任务
    线程名称:pool-1-thread-5,执行:普通任务
    线程名称:pool-1-thread-4,执行:普通任务
    线程名称:pool-1-thread-3,执行:普通任务
    线程名称:pool-1-thread-2,执行:普通任务
    线程名称:pool-1-thread-1,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-5,执行:3秒后执行
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次

5、使用实例

在ThreadPoolTaskExecutor的原理章节中,有一系列的方法,如果我们手动调用这些线程池方法实现方法是极其复杂的。

①、在java中的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));

for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}


class MyTask implements Runnable {
private int taskNum;

public MyTask(int num) {
this.taskNum = num;
}

@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了。

  不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

1
2
3
Executors.newCachedThreadPool();    //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池

 从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

  newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

  newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

  实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

  另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

②、在Spring中使用

以下为Java线程池在Spring中的使用,ThreadPoolTaskExecutor一个对象注入到Spring的容器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 线程池配置
*
* @author tcy
**/
@Configuration
public class ThreadPoolConfig {
// 核心线程池大小
private final int corePoolSize = 50;

// 最大可创建的线程数
private final int maxPoolSize = 200;

// 队列最大长度
private final int queueCapacity = 1000;

// 线程池维护线程所允许的空闲时间
private final int keepAliveSeconds = 300;

@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}

在方法或者类上加 @Async注解,标明该方法或类为多线程方法,Spirng内部会自动调用多线程的拒绝策略、线程初始化等方法。

安利时刻:

image-20220714114938715


Java并发编程系列之二线程基础
http://www.sky1998.cn/2022/03/17/Java/并发编程/Java并发编程系列之二线程基础/
作者
程序员田同学
发布于
2022年3月17日
许可协议