JUC并发编程

xlc5202022年8月14日
大约 270 分钟约 81139 字

一、线程基础

1、Java多线程相关概念

1、进程

是程序的⼀次执⾏,是系统进⾏资源分配和调度的独⽴单位,每⼀个进程都有它⾃⼰的内存空间和系统资源

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。

进程具有的特征:

  • 动态性:进程是程序的一次执行过程,是临时的,有生命期的,是动态产生,动态消亡的
  • 并发性:任何进程都可以同其他进行一起并发执行
  • 独立性:进程是系统进行资源分配和调度的一个独立单位
  • 结构性:进程由程序,数据和进程控制块三部分组成

我们经常使用windows系统,经常会看见.exe后缀的文件,双击这个.exe文件的时候,这个文件中的指令就会被系统加载,那么我们就能得到一个关于这个.exe程序的进程。进程是**“活”**的,或者说是正在被执行的。

2、线程

在同⼀个进程内⼜可以执⾏多个任务,⽽这每⼀个任务我们就可以看做是⼀个线程 ⼀个进程会有1个或多个线程的

线程是轻量级的进程,是程序执行的最小单元,使用多线程而不是多进程去进行并发程序的设计,是因为线程间的切换和调度的成本远远小于进程。

3、进程与线程的一个简单解释

进程(process)和线程(thread)是操作系统的基本概念,但是它们比较抽象,不容易掌握。

1.计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。

f65f6640-fde3-4f6f-be2a-aa05b6a3c1b9

2.假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。

aa874eba-0c27-4924-be97-9c853c009ca9

3.进程就好比工厂的车间,它代表CPU所能处理的单个任务。任一时刻,CPU总是运行一个进程,其他进程处于非运行状态。

f03b160d-4e18-46a1-9158-913a4afdb2b2

4.一个车间里,可以有很多工人。他们协同完成一个任务。

9985e48b-92bd-434e-8a4e-6f85c95a8dbd

5.线程就好比车间里的工人。一个进程可以包括多个线程。

3dc76caa-b3d9-4555-b8c4-c805bb97e03e

6.车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是共享的,每个线程都可以使用这些共享内存。

b3ef804e-346b-4280-bb8d-809c9bd42853

7.可是,每间房间的大小不同,有些房间最多只能容纳一个人,比如厕所。里面有人的时候,其他人就不能进去了。这代表一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。

bea2bf20-2b08-484e-80b7-3e210d0e20df

8.一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫”互斥锁”(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。

876a4f82-7931-4674-ac6b-a6d04a88a5b1

9.还有些房间,可以同时容纳n个人,比如厨房。也就是说,如果人数大于n,多出来的人只能在外面等着。这好比某些内存区域,只能供给固定数目的线程使用。

ae960b3a-c0e8-4c3d-bbcb-bfc5ebbecb79

10.这时的解决方法,就是在门口挂n把钥匙。进去的人就取一把钥匙,出来时再把钥匙挂回原处。后到的人发现钥匙架空了,就知道必须在门口排队等着了。这种做法叫做”信号量”(Semaphore),用来保证多个线程不会互相冲突。

11.操作系统的设计,因此可以归结为三点: (1)以多进程形式,允许多个任务同时运行; (2)以多线程形式,允许单个任务分成不同的部分运行; (3)提供协调机制,一方面防止进程之间和线程之间产生冲突,另一方面允许进程之间和线程之间共享资源。

4、管程

Monitor(监视器),也就是我们平时所说的锁

// Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。
// JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象,
Object o = new Object();
new Thread(() -> {
    synchronized (o)
    {
    }
},"t1").start();
// Monitor对象会和Java对象一同创建并销毁,它底层是由C++语言来实现的。

image-20210904000040589

5、线程状态?

// Thread.State
public enum State {
    NEW,(新建)
    RUNNABLE,(准备就绪)
    BLOCKED,(阻塞)
    WAITING,(不见不散)
    TIMED_WAITING,(过时不候)
    TERMINATED;(终结)
}

线程几个状态的介绍:

  • New:表示刚刚创建的线程,这种线程还没有开始执行
  • RUNNABLE:运行状态,线程的start()方法调用后,线程会处于这种状态
  • BLOCKED:阻塞状态。当线程在执行的过程中遇到了synchronized同步块,但这个同步块被其他线程已获取还未释放时,当前线程将进入阻塞状态,会暂停执行,直到获取到锁。当线程获取到锁之后,又会进入到运行状态(RUNNABLE)
  • WAITING:等待状态。和TIME_WAITING都表示等待状态,区别是WAITING会进入一个无时间限制的等,而TIME_WAITING会进入一个有限的时间等待,那么等待的线程究竟在等什么呢?一般来说,WAITING的线程正式在等待一些特殊的事件,比如,通过wait()方法等待的线程在等待notify()方法,而通过join()方法等待的线程则会等待目标线程的终止。一旦等到期望的事件,线程就会再次进入RUNNABLE运行状态。
  • TERMINATED:表示结束状态,线程执行完毕之后进入结束状态。

注意:从NEW状态出发后,线程不能在回到NEW状态,同理,处理TERMINATED状态的线程也不能在回到RUNNABLE状态

6、wait/sleep的区别?

功能都是当前线程暂停,有什么区别?

wait放开手去睡,放开手里的锁

sleep握紧手去睡,醒了手里还有锁

2、线程的基本操作

1、新建线程

新建线程很简单。只需要使用new关键字创建一个线程对象,然后调用它的start()启动线程即可。

Thread thread1 = new Thread1();
t1.start();

那么线程start()之后,会干什么呢?线程有个run()方法,start()会创建一个新的线程并让这个线程执行run()方法。

这里需要注意,下面代码也能通过编译,也能正常执行。但是,却不能新建一个线程,而是在当前线程中调用run()方法,将run方法只是作为一个普通的方法调用。

Thread thread = new Thread1();
thread1.run();

所以,希望大家注意,调用start方法和直接调用run方法的区别。

start方法是启动一个线程,run方法只会在垫钱线程中串行的执行run方法中的代码。

默认情况下, 线程的run方法什么都没有,启动一个线程之后马上就结束了,所以如果你需要线程做点什么,需要把您的代码写到run方法中,所以必须重写run方法。

Thread thread1 = new Thread() {            @Override            public void run() {                System.out.println("hello,我是一个线程!");            }        };thread1.start();

上面是使用匿名内部类实现的,重写了Thread的run方法,并且打印了一条信息。**我们可以通过继承Thread类,然后重写run方法,来自定义一个线程。**但考虑java是单继承的,从扩展性上来说,我们实现一个接口来自定义一个线程更好一些,java中刚好提供了Runnable接口来自定义一个线程。

@FunctionalInterfacepublic interface Runnable {    public abstract void run();}

Thread类有一个非常重要的构造方法:

public Thread(Runnable target)

我们在看一下Thread的run方法:

public void run() {        if (target != null) {            target.run();        }    }

当我们启动线程的start方法之后,线程会执行run方法,run方法中会调用Thread构造方法传入的target的run方法。

实现Runnable接口是比较常见的做法,也是推荐的做法。

2、终止线程

一般来说线程执行完毕就会结束,无需手动关闭。但是如果我们想关闭一个正在运行的线程,有什么方法呢?可以看一下Thread类中提供了一个stop()方法,调用这个方法,就可以立即将一个线程终止,非常方便。

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;

@Slf4j
public class Demo01 {
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread() {
            @Override
            public void run() {
                log.info("start");
                boolean flag = true;
                while (flag) {
                    ;
                }
                log.info("end");
            }
        };
        thread1.setName("thread1");
        thread1.start();
        //当前线程休眠1秒
        TimeUnit.SECONDS.sleep(1);
        //关闭线程thread1
        thread1.stop();
        //输出线程thread1的状态
        log.info("{}", thread1.getState());
        //当前线程休眠1秒
        TimeUnit.SECONDS.sleep(1);
        //输出线程thread1的状态
        log.info("{}", thread1.getState());
    }
}

运行代码,输出:

18:02:15.312 [thread1] INFO com.itsoku.chat01.Demo01 - start
18:02:16.311 [main] INFO com.itsoku.chat01.Demo01 - RUNNABLE
18:02:17.313 [main] INFO com.itsoku.chat01.Demo01 - TERMINATED

代码中有个死循环,调用stop方法之后,线程thread1的状态变为TERMINATED(结束状态),线程停止了。

我们使用idea或者eclipse的时候,会发现这个方法是一个废弃的方法,也就是说,在将来,jdk可能就会移除该方法。

stop方法为何会被废弃而不推荐使用?stop方法过于暴力,强制把正在执行的方法停止了。

大家是否遇到过这样的场景:电力系统需要维修,此时咱们正在写代码,维修人员直接将电源关闭了,代码还没保存的,是不是很崩溃,这种方式就像直接调用线程的stop方法类似。线程正在运行过程中,被强制结束了,可能会导致一些意想不到的后果。可以给大家发送一个通知,告诉大家保存一下手头的工作,将电脑关闭。

3、线程中断

在java中,线程中断是一种重要的线程写作机制,从表面上理解,中断就是让目标线程停止执行的意思,实际上并非完全如此。在上面中,我们已经详细讨论了stop方法停止线程的坏处,jdk中提供了更好的中断线程的方法。严格的说,线程中断并不会使线程立即退出,而是给线程发送一个通知,告知目标线程,有人希望你退出了!至于目标线程接收到通知之后如何处理,则完全由目标线程自己决定,这点很重要,如果中断后,线程立即无条件退出,我们又会到stop方法的老问题。

Thread提供了3个与线程中断有关的方法,这3个方法容易混淆,大家注意下:

public void interrupt() //中断线程
public boolean isInterrupted() //判断线程是否被中断
public static boolean interrupted()  //判断线程是否被中断,并清除当前中断状态

interrupt()方法是一个实例方法,它通知目标线程中断,也就是设置中断标志位为true,中断标志位表示当前线程已经被中断了。isInterrupted()方法也是一个实例方法,它判断当前线程是否被中断(通过检查中断标志位)。最后一个方法interrupted()是一个静态方法,返回boolean类型,也是用来判断当前线程是否被中断,但是同时会清除当前线程的中断标志位的状态。

while (true) {
            if (this.isInterrupted()) {
                System.out.println("我要退出了!");
                break;
            }
        }
    }
};
thread1.setName("thread1");
thread1.start();
TimeUnit.SECONDS.sleep(1);
thread1.interrupt();

上面代码中有个死循环,interrupt()方法被调用之后,线程的中断标志将被置为true,循环体中通过检查线程的中断标志是否为ture(this.isInterrupted())来判断线程是否需要退出了。

再看一种中断的方法:

static volatile boolean isStop = false;
public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new Thread() {
        @Override
        public void run() {
            while (true) {
                if (isStop) {
                    System.out.println("我要退出了!");
                    break;
                }
            }
        }
    };
    thread1.setName("thread1");
    thread1.start();
    TimeUnit.SECONDS.sleep(1);
    isStop = true;
}

代码中通过一个变量isStop来控制线程是否停止。

通过变量控制和线程自带的interrupt方法来中断线程有什么区别呢?

如果一个线程调用了sleep方法,一直处于休眠状态,通过变量控制,还可以中断线程么?大家可以思考一下。

此时只能使用线程提供的interrupt方法来中断线程了。

public static void main(String[] args) throws InterruptedException {
    Thread thread1 = new Thread() {
        @Override
        public void run() {
            while (true) {
                //休眠100秒
                try {
                    TimeUnit.SECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("我要退出了!");
                break;
            }
        }
    };
    thread1.setName("thread1");
    thread1.start();
    TimeUnit.SECONDS.sleep(1);
    thread1.interrupt();
}

调用interrupt()方法之后,线程的sleep方法将会抛出InterruptedException异常。

Thread thread1 = new Thread() {
    @Override
    public void run() {
        while (true) {
            //休眠100秒
            try {
                TimeUnit.SECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (this.isInterrupted()) {
                System.out.println("我要退出了!");
                break;
            }
        }
    }
};

运行上面的代码,发现程序无法终止。为什么?

代码需要改为:

Thread thread1 = new Thread() {
    @Override
    public void run() {
        while (true) {
            //休眠100秒
            try {
                TimeUnit.SECONDS.sleep(100);
            } catch (InterruptedException e) {
                this.interrupt();
                e.printStackTrace();
            }
            if (this.isInterrupted()) {
                System.out.println("我要退出了!");
                break;
            }
        }
    }
};

上面代码可以终止。

注意:sleep方法由于中断而抛出异常之后,线程的中断标志会被清除(置为false),所以在异常中需要执行this.interrupt()方法,将中断标志位置为true

4、等待(wait)和通知(notify)

为了支持多线程之间的协作,JDK提供了两个非常重要的方法:等待wait()方法和通知notify()方法。这2个方法并不是在Thread类中的,而是在Object类中定义的。这意味着所有的对象都可以调用者两个方法。

public final void wait() throws InterruptedException;
public final native void notify();

当在一个对象实例上调用wait()方法后,当前线程就会在这个对象上等待。这是什么意思?比如在线程A中,调用了obj.wait()方法,那么线程A就会停止继续执行,转为等待状态。等待到什么时候结束呢?线程A会一直等到其他线程调用obj.notify()方法为止,这时,obj对象成为了多个线程之间的有效通信手段。

那么wait()方法和notify()方法是如何工作的呢?如图2.5展示了两者的工作过程。如果一个线程调用了object.wait()方法,那么它就会进出object对象的等待队列。这个队列中,可能会有多个线程,因为系统可能运行多个线程同时等待某一个对象。当object.notify()方法被调用时,它就会从这个队列中随机选择一个线程,并将其唤醒。这里希望大家注意一下,这个选择是不公平的,并不是先等待线程就会优先被选择,这个选择完全是随机的。

f950f73b-52a6-4ecd-a8cb-5422bcd3a44e

除notify()方法外,Object独享还有一个nofiyAll()方法,它和notify()方法的功能类似,不同的是,它会唤醒在这个等待队列中所有等待的线程,而不是随机选择一个。

这里强调一点,Object.wait()方法并不能随便调用。它必须包含在对应的synchronize语句汇总,无论是wait()方法或者notify()方法都需要首先获取目标独享的一个监视器。图2.6显示了wait()方法和nofiy()方法的工作流程细节。其中T1和T2表示两个线程。T1在正确执行wait()方法钱,必须获得object对象的监视器。而wait()方法在执行后,会释放这个监视器。这样做的目的是使其他等待在object对象上的线程不至于因为T1的休眠而全部无法正常执行。

线程T2在notify()方法调用前,也必须获得object对象的监视器。所幸,此时T1已经释放了这个监视器,因此,T2可以顺利获得object对象的监视器。接着,T2执行了notify()方法尝试唤醒一个等待线程,这里假设唤醒了T1。T1在被唤醒后,要做的第一件事并不是执行后续代码,而是要尝试重新获得object对象的监视器,而这个监视器也正是T1在wait()方法执行前所持有的那个。如果暂时无法获得,则T1还必须等待这个监视器。当监视器顺利获得后,T1才可以在真正意义上继续执行。

给大家上个例子:

public class Demo06 {
    static Object object = new Object();
    public static class T1 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println(System.currentTimeMillis() + ":T1 start!");
                try {
                    System.out.println(System.currentTimeMillis() + ":T1 wait for object");
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(System.currentTimeMillis() + ":T1 end!");
            }
        }
    }
    public static class T2 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println(System.currentTimeMillis() + ":T2 start,notify one thread! ");
                object.notify();
                System.out.println(System.currentTimeMillis() + ":T2 end!");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new T1().start();
        new T2().start();
    }
}

运行结果:

1562934497212:T1 start!
1562934497212:T1 wait for object
1562934497212:T2 start,notify one thread!
1562934497212:T2 end!
1562934499213:T1 end!

注意下打印结果,T2调用notify方法之后,T1并不能立即继续执行,而是要等待T2释放objec投递锁之后,T1重新成功获取锁后,才能继续执行。因此最后2行日志相差了2秒(因为T2调用notify方法后休眠了2秒)。

注意:Object.wait()方法和Thread.sleep()方法都可以让现场等待若干时间。除wait()方法可以被唤醒外,另外一个主要的区别就是wait()方法会释放目标对象的锁,而Thread.sleep()方法不会释放锁。

再给大家讲解一下wait(),notify(),notifyAll(),加深一下理解:

可以这么理解,obj对象上有2个队列,如图1,q1:等待队列,q2:准备获取锁的队列;两个队列都为空。

5f1e0099-c802-4e79-803a-a5117e6666ff

obj.wait()过程:

synchronize(obj){
    obj.wait();
}

假如有3个线程,t1、t2、t3同时执行上面代码,t1、t2、t3会进入q2队列,如图2,进入q2的队列的这些线程才有资格去争抢obj的锁,假设t1争抢到了,那么t2、t3机型在q2中等待着获取锁,t1进入代码块执行wait()方法,此时t1会进入q1队列,然后系统会通知q2队列中的t2、t3去争抢obj的锁,抢到之后过程如t1的过程。最后t1、t2、t3都进入了q1队列,如图3。

6c5eec72-3303-40a5-b70d-6e24ba12d2de

45fd3950-b371-4d46-a5df-2ce1c8303707

上面过程之后,又来了线程t4执行了notify()方法,如下:**

synchronize(obj){
    obj.notify();
}

t4会获取到obj的锁,然后执行notify()方法,系统会从q1队列中随机取一个线程,将其加入到q2队列,假如t2运气比较好,被随机到了,然后t2进入了q2队列,如图4,进入q2的队列的锁才有资格争抢obj的锁,t4线程执行完毕之后,会释放obj的锁,此时队列q2中的t2会获取到obj的锁,然后继续执行,执行完毕之后,q1中包含t1、t3,q2队列为空,如图5

fbf3b798-65f7-4b90-a614-66854fcce5fa

b3868bc9-3da0-474b-a48e-20f90c1335ee

接着又来了个t5队列,执行了notifyAll()方法,如下:

synchronize(obj){
    obj.notifyAll();
}

2.调用obj.wait()方法,当前线程会加入队列queue1,然后会释放obj对象的锁

t5会获取到obj的锁,然后执行notifyAll()方法,系统会将队列q1中的线程都移到q2中,如图6,t5线程执行完毕之后,会释放obj的锁,此时队列q2中的t1、t3会争抢obj的锁,争抢到的继续执行,未增强到的带锁释放之后,系统会通知q2中的线程继续争抢索,然后继续执行,最后两个队列中都为空了。

9e648015-c445-4e84-bd8c-a53a380cbd7f

5、挂起(suspend)和继续执行(resume)线程

Thread类中还有2个方法,即线程挂起(suspend)继续执行(resume),这2个操作是一对相反的操作,被挂起的线程,必须要等到resume()方法操作后,才能继续执行。系统中已经标注着2个方法过时了,不推荐使用。

系统不推荐使用suspend()方法去挂起线程是因为suspend()方法导致线程暂停的同时,并不会释放任何锁资源。此时,其他任何线程想要访问被它占用的锁时,都会被牵连,导致无法正常运行(如图2.7所示)。直到在对应的线程上进行了resume()方法操作,被挂起的线程才能继续,从而其他所有阻塞在相关锁上的线程也可以继续执行。但是,如果resume()方法操作意外地在suspend()方法前就被执行了,那么被挂起的线程可能很难有机会被继续执行了。并且,更严重的是:它所占用的锁不会被释放,因此可能会导致整个系统工作不正常。而且,对于被挂起的线程,从它线程的状态上看,居然还是Runnable状态,这也会影响我们队系统当前状态的判断。

上个例子:

public class Demo07 {
    static Object object = new Object();
    public static class T1 extends Thread {
        public T1(String name) {
            super(name);
        }
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("in " + this.getName());
                Thread.currentThread().suspend();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1("t1");
        t1.start();
        Thread.sleep(100);
        T1 t2 = new T1("t2");
        t2.start();
        t1.resume();
        t2.resume();
        t1.join();
        t2.join();
    }
}

运行代码输出:

in t1
in t2

我们会发现程序不会结束,线程t2被挂起了,导致程序无法结束,使用jstack命令查看线程堆栈信息可以看到:

"t2" #13 prio=5 os_prio=0 tid=0x000000002796c000 nid=0xa3c runnable [0x000000002867f000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Thread.suspend0(Native Method)
        at java.lang.Thread.suspend(Thread.java:1029)
        at com.itsoku.chat01.Demo07$T1.run(Demo07.java:20)
        - locked <0x0000000717372fc0> (a java.lang.Object)

发现t2线程在suspend0处被挂起了,t2的状态竟然还是RUNNABLE状态,线程明明被挂起了,状态还是运行中容易导致我们队当前系统进行误判,代码中已经调用resume()方法了,但是由于时间先后顺序的缘故,resume并没有生效,这导致了t2永远滴被挂起了,并且永远占用了object的锁,这对于系统来说可能是致命的。

6、等待线程结束(join)和谦让(yeild)

很多时候,一个线程的输入可能非常依赖于另外一个或者多个线程的输出,此时,这个线程就需要等待依赖的线程执行完毕,才能继续执行。jdk提供了join()操作来实现这个功能。如下所示,显示了2个join()方法:

public final void join() throws InterruptedException;
public final synchronized void join(long millis) throws InterruptedException;

第1个方法表示无限等待,它会一直只是当前线程。知道目标线程执行完毕。

第2个方法有个参数,用于指定等待时间,如果超过了给定的时间目标线程还在执行,当前线程也会停止等待,而继续往下执行。

比如:线程T1需要等待T2、T3完成之后才能继续执行,那么在T1线程中需要分别调用T2和T3的join()方法。

上个示例:

public class Demo08 {
    static int num = 0;
    public static class T1 extends Thread {
        public T1(String name) {
            super(name);
        }
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ",start " + this.getName());
            for (int i = 0; i < 10; i++) {
                num++;
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(System.currentTimeMillis() + ",end " + this.getName());
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1("t1");
        t1.start();
        t1.join();
        System.out.println(System.currentTimeMillis() + ",num = " + num);
    }
}

执行结果:

1562939889129,start t1
1562939891134,end t1
1562939891134,num = 10

num的结果为10,1、3行的时间戳相差2秒左右,说明主线程等待t1完成之后才继续执行的。

看一下jdk1.8中Thread.join()方法的实现:

public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;
    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }
    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

从join的代码中可以看出,在被等待的线程上使用了synchronize,调用了它的wait()方法,线程最后执行完毕之后,系统会自动调用它的notifyAll()方法,唤醒所有在此线程上等待的其他线程。

注意:被等待的线程执行完毕之后,系统自动会调用该线程的notifyAll()方法。所以一般情况下,我们不要去在线程对象上使用wait()、notify()、notifyAll()方法。

另外一个方法是Thread.yield(),他的定义如下:

public static native void yield();

yield是谦让的意思,这是一个静态方法,一旦执行,它会让当前线程出让CPU,但需要注意的是,出让CPU并不是说不让当前线程执行了,当前线程在出让CPU后,还会进行CPU资源的争夺,但是能否再抢到CPU的执行权就不一定了。因此,对Thread.yield()方法的调用好像就是在说:我已经完成了一些主要的工作,我可以休息一下了,可以让CPU给其他线程一些工作机会了。

如果觉得一个线程不太重要,或者优先级比较低,而又担心此线程会过多的占用CPU资源,那么可以在适当的时候调用一下Thread.yield()方法,给与其他线程更多的机会。

7、总结

  1. 创建线程的2中方式:继承Thread类;实现Runnable接口
  2. 启动线程:调用线程的start()方法
  3. 终止线程:调用线程的stop()方法,方法已过时,建议不要使用
  4. 线程中断相关的方法:调用线程实例interrupt()方法将中断标志置为true;使用**线程实例方法isInterrupted()获取中断标志;调用Thread的静态方法interrupted()**获取线程是否被中断,此方法调用之后会清除中断标志(将中断标志置为false了)
  5. wait、notify、notifyAll方法,这块比较难理解,可以回过头去再理理
  6. 线程挂起使用线程实例方法suspend(),恢复线程使用线程实例方法resume(),这2个方法都过时了,不建议使用
  7. 等待线程结束:调用线程实例方法join()
  8. 出让cpu资源:调用线程静态方法yeild()

2、为什么多线程极其重要???

  1. 硬件方面 - 摩尔定律失效

摩尔定律: 它是由英特尔创始人之一Gordon Moore(戈登·摩尔)提出来的。其内容为: 当价格不变时,集成电路上可容纳的元器件的数目约每隔18-24个月便会增加一倍,性能也将提升一倍。 换言之,每一美元所能买到的电脑性能,将每隔18-24个月翻一倍以上。这一定律揭示了信息技术进步的速度。

可是从2003年开始CPU主频已经不再翻倍,而是采用多核而不是更快的主频。

摩尔定律失效。

在主频不再提高且核数在不断增加的情况下,要想让程序更快就要用到并行或并发编程。

  1. 软件方面

高并发系统,异步+回调等生产需求

3、从start一个线程说起

// Java线程理解以及openjdk中的实现
private native void start0();
// Java语言本身底层就是C++语言

OpenJDK源码网址:http://openjdk.java.net/open in new window

openjdk8\hotspot\src\share\vm\runtime

更加底层的C++源码解读

openjdk8\jdk\src\share\native\java\lang   thread.c
java线程是通过start的方法启动执行的,主要内容在native方法start0中,Openjdk的写JNI一般是一一对应的,Thread.java对应的就是Thread.c start0其实就是JVM_StartThread。此时查看源代码可以看到在jvm.h中找到了声明,jvm.cpp中有实现。    

image-20210903235656449

openjdk8\hotspot\src\share\vm\prims  jvm.cpp

image-20210903235812379

image-20210903235817486

openjdk8\hotspot\src\share\vm\runtime  thread.cpp

image-20210903235840971

4、用户线程和守护线程

Java线程分为用户线程和守护线程,线程的daemon属性为true表示是守护线程,false表示是用户线程

守护线程

是一种特殊的线程,在后台默默地完成一些系统性的服务,比如垃圾回收线程

用户线程

是系统的工作线程,它会完成这个程序需要完成的业务操作

public class DaemonDemo {

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 开始运行," + (Thread.currentThread().isDaemon() ? "守护线程" : "用户线程"));
            while (true) {

            }
        }, "t1");

        //线程的daemon属性为true表示是守护线程,false表示是用户线程
        t1.setDaemon(true);
        t1.start();

        //3秒钟后主线程再运行
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------main线程运行完毕");
    }

}

重点

当程序中所有用户线程执行完毕之后,不管守护线程是否结束,系统都会自动退出

如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了。所以当系统只剩下守护进程的时候,java虚拟机会自动退出

设置守护线程,需要在start()方法之前进行

5、获得多线程的方法几种?

  • 传统的是

    • 继承thread类
    • 实现runnable接口,
  • java5以后

    • 实现callable接口
    • java的线程池获得

6、Callable接口

1、与runnable对比

// 创建新类MyThread实现runnable接口
class MyThread implements Runnable{
 @Override
 public void run() {
 
 }
}
// 新类MyThread2实现callable接口
class MyThread2 implements Callable<Integer>{
 @Override
 public Integer call() throws Exception {
  return 200;
 } 
}
// 面试题:callable接口与runnable接口的区别?
 
// 答:(1)是否有返回值
//     (2)是否抛异常
//    (3)落地方法不一样,一个是run,一个是call

2、怎么用

直接替换runnable是否可行?

image

不可行,因为:thread类的构造方法根本没有Callable

image

认识不同的人找中间人

image

public static void main(String[] args) throws ExecutionException, InterruptedException {
  FutureTask futureTask = new FutureTask(new MyThread2());
  new Thread(futureTask,"AA").start();
}

运行成功后如何获得返回值?

image

public static void main(String[] args) throws ExecutionException, InterruptedException {
  FutureTask futureTask = new FutureTask(new MyThread2());
  new Thread(futureTask,"AA").start();
  System.out.println(futureTask.get());
}

二、线程池

1、什么是线程池

大家用jdbc操作过数据库应该知道,操作数据库需要和数据库建立连接,拿到连接之后才能操作数据库,用完之后销毁。数据库连接的创建和销毁其实是比较耗时的,真正和业务相关的操作耗时是比较短的。每个数据库操作之前都需要创建连接,为了提升系统性能,后来出现了数据库连接池,系统启动的时候,先创建很多连接放在池子里面,使用的时候,直接从连接池中获取一个,使用完毕之后返回到池子里面,继续给其他需要者使用,这其中就省去创建连接的时间,从而提升了系统整体的性能。

线程池和数据库连接池的原理也差不多,创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,如果系统能够提前为我们创建好线程,我们需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程中中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。

简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。

2、为什么用线程池

线程池的优势:

​ 线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用;控制最大并发数;管理线程。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。

第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

3、线程池的使用

1、Executors.newFixedThreadPool(int)

​ newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的是LinkedBlockingQueue执行长期任务性能好,创建一个线程池,一池有N个固定的线程,有固定线程数的线程

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

2、Executors.newSingleThreadExecutor()

​ newSingleThreadExecutor 创建的线程池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue一个任务一个任务的执行,一池一线程

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

3、Executors.newCachedThreadPool()

​ newCachedThreadPool创建的线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们。可扩容,遇强则强

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 线程池
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //固定数的线程池,一池五线程

//       ExecutorService threadPool =  Executors.newFixedThreadPool(5); //一个银行网点,5个受理业务的窗口
//       ExecutorService threadPool =  Executors.newSingleThreadExecutor(); //一个银行网点,1个受理业务的窗口
       ExecutorService threadPool =  Executors.newCachedThreadPool(); //一个银行网点,可扩展受理业务的窗口

        //10个顾客请求
        try {
            for (int i = 1; i <=10; i++) {
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }
}

4、ThreadPoolExecutor底层原理

fdae3766-9607-424f-8f77-ba9a14583e8e

举个例子,加深理解:

咱们作为开发者,上面都有开发主管,主管下面带领几个小弟干活,CTO给主管授权说,你可以招聘5个小弟干活,新来任务,如果小弟还不到吴哥,立即去招聘一个来干这个新来的任务,当5个小弟都招来了,再来任务之后,将任务记录到一个表格中,表格中最多记录100个,小弟们会主动去表格中获取任务执行,如果5个小弟都在干活,并且表格中也记录满了,那你可以将小弟扩充到20个,如果20个小弟都在干活,并且存放任务的表也满了,产品经理再来任务后,是直接拒绝,还是让产品自己干,这个由你自己决定,小弟们都尽心尽力在干活,任务都被处理完了,突然公司业绩下滑,几个员工没事干,打酱油,为了节约成本,CTO主管把小弟控制到5人,其他15个人直接被干掉了。所以作为小弟们,别让自己闲着,多干活。

**原理:**先找几个人干活,大家都忙于干活,任务太多可以排期,排期的任务太多了,再招一些人来干活,最后干活的和排期都达到上层领导要求的上限了,那需要采取一些其他策略进行处理了。对于长时间不干活的人,考虑将其开掉,节约资源和成本。

image

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize:核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前把核心线程都创造好,并启动

  2. maximumPoolSize:线程池允许创建的最大线程数,此值必须大于等于1。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会加入队列,这个参数就没有什么效果了

  3. keepAliveTime:多余的空闲线程的存活时间,当前池中线程数量超过corePoolSize时,当空闲时间,达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止,如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率

  4. unit:keepAliveTIme的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit,这个枚举也经常使用

  5. workQueue:任务队列,被提交但尚未被执行的任务,用于缓存待处理任务的阻塞队列

  6. threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

  7. handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略

image

image

调用线程池的execute方法处理任务,执行execute方法的过程:

  1. 判断线程池中运行的线程数是否小于corepoolsize,是:则创建新的线程来处理任务,否:执行下一步
  2. 试图将任务添加到workQueue指定的队列中,如果无法添加到队列,进入下一步
  3. 判断线程池中运行的线程数是否小于maximumPoolSize,是:则新增线程处理当前传入的任务,否:将任务传递给handler对象rejectedExecution方法处理
1、在创建了线程池后,开始等待请求。
2、当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
  2.1如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
  2.2如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
  2.3如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
  2.4如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
    如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
    所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。

5、拒绝策略?生产中如设置合理参数

1、线程池的拒绝策略

​ 等待队列已经排满了,再也塞不下新任务了,同时,线程池中的max线程也达到了,无法继续为新任务服务。这个是时候我们就需要拒绝策略机制合理的处理这个问题。

2、JDK内置的拒绝策略

AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行

CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。

DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。

DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

以上内置拒绝策略均实现了RejectedExecutionHandle接口

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo5 {
    static class Task implements Runnable {
        String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "处理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,
                1,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                (r, executors) -> {
                    //自定义饱和策略
                    //记录一下无法处理的任务
                    System.out.println("无法处理的任务:" + r.toString());
                });
        for (int i = 0; i < 5; i++) {
            executor.execute(new Task("任务-" + i));
        }
        executor.shutdown();
    }
}
无法处理的任务:Task{name='任务-2'}
无法处理的任务:Task{name='任务-3'}
pool-1-thread-1处理任务-0
无法处理的任务:Task{name='任务-4'}
pool-1-thread-1处理任务-1

输出结果中可以看到有3个任务进入了饱和策略中,记录了任务的日志,对于无法处理多任务,我们最好能够记录一下,让开发人员能够知道。任务进入了饱和策略,说明线程池的配置可能不是太合理,或者机器的性能有限,需要做一些优化调整。

3、生产中合理的设置参数

要想合理的配置线程池,需要先分析任务的特性,可以从以下几个角度分析:

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务
  • 任务的优先级:高、中、低
  • 任务的执行时间:长、中、短
  • 任务的依赖性:是否依赖其他的系统资源,如数据库连接。

性质不同任务可以用不同规模的线程池分开处理。CPU密集型任务应该尽可能小的线程,如配置cpu数量+1个线程的线程池。由于IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量*2。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。可以通过Runtime.getRuntime().availableProcessors()方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。

使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无解队列,任务太多的时候可能导致系统OOM,直接让系统宕机。

线程池汇总线程大小对系统的性能有一定的影响,我们的目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有消息的使用机器的性能。咋Java Concurrency inPractice书中给出了估算线程池大小的公式:

Ncpu = CUP的数量
Ucpu = 目标CPU的使用率,0<=Ucpu<=1
W/C = 等待时间与计算时间的比例
为保存处理器达到期望的使用率,最有的线程池的大小等于:
Nthreads = Ncpu × Ucpu × (1+W/C)
  1. CPU密集型

    // 查看CPU核数
    System. out .println(Runtime. getRuntime ().availableProcessors());
    

    image-20220329145519183

  2. IO密集型

    1. 由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数 * 2
    2. image-20220329145652545

看公司业务是CPU密集型还是IO密集型的,这两种不一样,来决定线程池线程数的最佳合理配置数。

6、超级大坑 在工作中单一的/固定数的/可变的三种创建线程池的方法哪个用的多?

答案是一个都不用,我们工作中只能使用自定义的

image

7、自定义线程池

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * 线程池
 * Arrays
 * Collections
 * Executors
 */
public class MyThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                //new ThreadPoolExecutor.AbortPolicy()
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //new ThreadPoolExecutor.DiscardOldestPolicy()
                new ThreadPoolExecutor.DiscardPolicy()
        );
        //10个顾客请求
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }

    private static void threadPool() {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //固定数的线程池,一池五线程

//       ExecutorService threadPool =  Executors.newFixedThreadPool(5); //一个银行网点,5个受理业务的窗口
//       ExecutorService threadPool =  Executors.newSingleThreadExecutor(); //一个银行网点,1个受理业务的窗口
        ExecutorService threadPool = Executors.newCachedThreadPool(); //一个银行网点,可扩展受理业务的窗口

        //10个顾客请求
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

8、线程池中的2个关闭方法

线程池提供了2个关闭方法:shutdownshutdownNow,当调用者两个方法之后,线程池会遍历内部的工作线程,然后调用每个工作线程的interrrupt方法给线程发送中断信号,内部如果无法响应中断信号的可能永远无法终止,所以如果内部有无线循环的,最好在循环内部检测一下线程的中断信号,合理的退出。调用者两个方法中任意一个,线程池的isShutdown方法就会返回true,当所有的任务线程都关闭之后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

调用shutdown方法之后,线程池将不再接口新任务,内部会将所有已提交的任务处理完毕,处理完毕之后,工作线程自动退出。

而调用shutdownNow方法后,线程池会将还未处理的(在队里等待处理的任务)任务移除,将正在处理中的处理完毕之后,工作线程自动退出。

至于调用哪个方法来关闭线程,应该由提交到线程池的任务特性决定,多数情况下调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

9、BlockingQueue阻塞队列

1、栈与队列

栈:先进后出,后进先出

队列:先进先出

2、阻塞队列

阻塞:必须要阻塞/不得不阻塞

image

线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素

当队列是空的,从队列中获取元素的操作将会被阻塞

当队列是满的,从队列中添加元素的操作将会被阻塞

试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素

试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增

image

3、种类分析

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序

LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。

PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

DelayQueue:使用优先级队列实现的延迟无界阻塞队列。

SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用这个队列

LinkedTransferQueue:由链表组成的无界阻塞队列。

LinkedBlockingDeque:由链表组成的双向阻塞队列。

import java.util.concurrent.*;

public class Demo2 {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 50; i++) {
            int j = i;
            String taskName = "任务" + j;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "处理" + taskName);
                //模拟任务内部处理耗时
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }
}

代码中使用Executors.newCachedThreadPool()创建线程池,看一下的源码:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

从输出中可以看出,系统创建了50个线程处理任务,代码中使用了SynchronousQueue同步队列,这种队列比较特殊,放入元素必须要有另外一个线程去获取这个元素,否则放入元素会失败或者一直阻塞在那里直到有线程取走,示例中任务处理休眠了指定的时间,导致已创建的工作线程都忙于处理任务,所以新来任务之后,将任务丢入同步队列会失败,丢入队列失败之后,会尝试新建线程处理任务。使用上面的方式创建线程池需要注意,如果需要处理的任务比较耗时,会导致新来的任务都会创建新的线程进行处理,可能会导致创建非常多的线程,最终耗尽系统资源,触发OOM。

PriorityBlockingQueue优先级队列的线程池

import java.util.concurrent.*;

public class Demo3 {
    static class Task implements Runnable, Comparable<Task> {
        private int i;
        private String name;
        public Task(int i, String name) {
            this.i = i;
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "处理" + this.name);
        }
        @Override
        public int compareTo(Task o) {
            return Integer.compare(o.i, this.i);
        }
    }
    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue());
        for (int i = 0; i < 10; i++) {
            String taskName = "任务" + i;
            executor.execute(new Task(i, taskName));
        }
        for (int i = 100; i >= 90; i--) {
            String taskName = "任务" + i;
            executor.execute(new Task(i, taskName));
        }
        executor.shutdown();
    }
}

输出中,除了第一个任务,其他任务按照优先级高低按顺序处理。原因在于:创建线程池的时候使用了优先级队列,进入队列中的任务会进行排序,任务的先后顺序由Task中的i变量决定。向PriorityBlockingQueue加入元素的时候,内部会调用代码中Task的compareTo方法决定元素的先后顺序。

4、BlockingQueue核心方法

image

image


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * 阻塞队列
 */
public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {

//        List list = new ArrayList();

        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        //第一组
//        System.out.println(blockingQueue.add("a"));
//        System.out.println(blockingQueue.add("b"));
//        System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.element());

        //System.out.println(blockingQueue.add("x"));
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//        System.out.println(blockingQueue.remove());
//    第二组
//        System.out.println(blockingQueue.offer("a"));
//        System.out.println(blockingQueue.offer("b"));
//        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("x"));
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());
//    第三组        
//         blockingQueue.put("a");
//         blockingQueue.put("b");
//         blockingQueue.put("c");
//         //blockingQueue.put("x");
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
//        System.out.println(blockingQueue.take());
        
//    第四组        
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("a",3L, TimeUnit.SECONDS));

    }
}

10、扩展线程池

虽然jdk提供了ThreadPoolExecutor这个高性能线程池,但是如果我们自己想在这个线程池上面做一些扩展,比如,监控每个任务执行的开始时间,结束时间,或者一些其他自定义的功能,我们应该怎么办?

这个jdk已经帮我们想到了,ThreadPoolExecutor内部提供了几个方法beforeExecuteafterExecuteterminated,可以由开发人员自己去这些方法。看一下线程池内部的源码:

try {
    beforeExecute(wt, task);//任务执行之前调用的方法
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x;
        throw x;
    } catch (Error x) {
        thrown = x;
        throw x;
    } catch (Throwable x) {
        thrown = x;
        throw new Error(x);
    } finally {
        afterExecute(task, thrown);//任务执行完毕之后调用的方法
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

beforeExecute:任务执行之前调用的方法,有2个参数,第1个参数是执行任务的线程,第2个参数是任务

protected void beforeExecute(Thread t, Runnable r) { }

afterExecute:任务执行完成之后调用的方法,2个参数,第1个参数表示任务,第2个参数表示任务执行时的异常信息,如果无异常,第二个参数为null

protected void afterExecute(Runnable r, Throwable t) { }

terminated:线程池最终关闭之后调用的方法。所有的工作线程都退出了,最终线程池会退出,退出时调用该方法

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Demo6 {
    static class Task implements Runnable {
        String name;
        public Task(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "处理" + this.name);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString() {
            return "Task{" +
                    "name='" + name + '\'' +
                    '}';
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
                10,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                (r, executors) -> {
                    //自定义饱和策略
                    //记录一下无法处理的任务
                    System.out.println("无法处理的任务:" + r.toString());
                }) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(System.currentTimeMillis() + "," + t.getName() + ",开始执行任务:" + r.toString());
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",任务:" + r.toString() + ",执行完毕!");
            }
            @Override
            protected void terminated() {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",关闭线程池!");
            }
        };
        for (int i = 0; i < 10; i++) {
            executor.execute(new Task("任务-" + i));
        }
        TimeUnit.SECONDS.sleep(1);
        executor.shutdown();
    }
}
1564324574847,pool-1-thread-1,开始执行任务:Task{name='任务-0'}
1564324574850,pool-1-thread-3,开始执行任务:Task{name='任务-2'}
pool-1-thread-3处理任务-2
1564324574849,pool-1-thread-2,开始执行任务:Task{name='任务-1'}
pool-1-thread-2处理任务-1
1564324574848,pool-1-thread-5,开始执行任务:Task{name='任务-4'}
pool-1-thread-5处理任务-4
1564324574848,pool-1-thread-4,开始执行任务:Task{name='任务-3'}
pool-1-thread-4处理任务-3
1564324574850,pool-1-thread-7,开始执行任务:Task{name='任务-6'}
pool-1-thread-7处理任务-6
1564324574850,pool-1-thread-6,开始执行任务:Task{name='任务-5'}
1564324574851,pool-1-thread-8,开始执行任务:Task{name='任务-7'}
pool-1-thread-8处理任务-7
pool-1-thread-1处理任务-0
pool-1-thread-6处理任务-5
1564324574851,pool-1-thread-10,开始执行任务:Task{name='任务-9'}
pool-1-thread-10处理任务-9
1564324574852,pool-1-thread-9,开始执行任务:Task{name='任务-8'}
pool-1-thread-9处理任务-8
1564324576851,pool-1-thread-2,任务:Task{name='任务-1'},执行完毕!
1564324576851,pool-1-thread-3,任务:Task{name='任务-2'},执行完毕!
1564324576852,pool-1-thread-1,任务:Task{name='任务-0'},执行完毕!
1564324576852,pool-1-thread-4,任务:Task{name='任务-3'},执行完毕!
1564324576852,pool-1-thread-8,任务:Task{name='任务-7'},执行完毕!
1564324576852,pool-1-thread-7,任务:Task{name='任务-6'},执行完毕!
1564324576852,pool-1-thread-5,任务:Task{name='任务-4'},执行完毕!
1564324576853,pool-1-thread-6,任务:Task{name='任务-5'},执行完毕!
1564324576853,pool-1-thread-10,任务:Task{name='任务-9'},执行完毕!
1564324576853,pool-1-thread-9,任务:Task{name='任务-8'},执行完毕!
1564324576853,pool-1-thread-9,关闭线程池!

从输出结果中可以看到,每个需要执行的任务打印了3行日志,执行前由线程池的beforeExecute打印,执行时会调用任务的run方法,任务执行完毕之后,会调用线程池的afterExecute方法,从每个任务的首尾2条日志中可以看到每个任务耗时2秒左右。线程池最终关闭之后调用了terminated方法。

三、CompletableFuture

1、Future和Callable接口

Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

image-20210904000352470

Callable接口中定义了需要有返回的任务需要实现的方法

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

2、从之前的FutureTask开始

Future接口相关架构

image-20210904000529226

code1

public class CompletableFutureDemo{
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException{
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return ThreadLocalRandom.current().nextInt(100);
        });

        Thread t1 = new Thread(futureTask,"t1");
        t1.start();

        //3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞)
        //System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());

        //3秒钟后才出来结果,我只想等待1秒钟,过时不候
        System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get(1L,TimeUnit.SECONDS));

        System.out.println(Thread.currentThread().getName()+"\t"+" run... here");

    }
}
  • get()阻塞 一旦调用get()方法,不管是否计算完成都会导致阻塞

code2

public class CompletableFutureDemo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return ""+ ThreadLocalRandom.current().nextInt(100);
        });

        new Thread(futureTask,"t1").start();

        System.out.println(Thread.currentThread().getName()+"\t"+"线程完成任务");

        /**
         * 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
         */
        while (true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }
        }
    }
}

isDone()轮询

轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果.

如果想要异步获取结果,通常都会以轮询的方式去获取结果 尽量不要阻塞

不见不散 -- 过时不候 -- 轮询

3、对Future的改进

1、类CompletableFuture

image-20210904001054865

image-20210904001102892

image-20210904001143180

2、接口CompletionStage

image-20210904001214909

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

4、核心的四个静态方法

1、runAsync 无 返回值

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)  

2、supplyAsync 有 返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

上述Executor executor参数说明

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

3、Code 无 返回值

public class CompletableFutureDemo3{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"\t"+"-----come in");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("-----task is over");
        });
        System.out.println(future.get());
    }
}

image-20210904001511947

4、Code 有 返回值

public class CompletableFutureDemo3{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextInt(100);
        });

        System.out.println(completableFuture.get());
    }
}

5、Code 减少阻塞和轮询

从Java8开始引入了CompletableFuture,它是Future的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

public class CompletableFutureDemo3{
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("-----计算结束耗时1秒钟,result: "+result);
            if(result > 6){
                int age = 10/0;
            }
            return result;
        }).whenComplete((v,e) ->{
            if(e == null){
                System.out.println("-----result: "+v);
            }
        }).exceptionally(e -> {
            System.out.println("-----exception: "+e.getCause()+"\t"+e.getMessage());
            return -44;
        });

        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

6、CompletableFuture的优点

异步任务结束时,会自动回调某个对象的方法;

异步任务出错时,会自动回调某个对象的方法;

主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

5、join和get对比

get会抛出异常,join不需要

6、案例精讲-从电商网站的比价需求说开去

切记,功能→性能

​ 经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。同理, 对于分布式微服务的调用,按照实际业务,如果是无关联step by step的业务,可以尝试是否可以多箭齐发,同时调用。我们去比同一个商品在各个平台上的价格,要求获得一个清单列表, 1 step by step,查完京东查淘宝,查完淘宝查天猫......

2 all 一口气同时查询。。。。。

import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class T1{
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("tmall"),
            new NetMall("pdd"),
            new NetMall("mi")
    );

    public static List<String> findPriceSync(List<NetMall> list,String productName){
        return list.stream().map(mall -> String.format(productName+" %s price is %.2f",mall.getNetMallName(),mall.getPriceByName(productName))).collect(Collectors.toList());
    }

    public static List<String> findPriceASync(List<NetMall> list,String productName){
        return list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
    }


    public static void main(String[] args){
        long startTime = System.currentTimeMillis();
        List<String> list1 = findPriceSync(list, "thinking in java");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = findPriceASync(list, "thinking in java");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
    }
}

class NetMall{
    @Getter
    private String netMallName;

    public NetMall(String netMallName){
        this.netMallName = netMallName;
    }

    public double getPriceByName(String productName){
        return calcPrice(productName);
    }

    private double calcPrice(String productName){
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return ThreadLocalRandom.current().nextDouble() + productName.charAt(0);
    }
}

7、CompletableFuture常用方法

1、获得结果和触发计算

获取结果

// 不见不散
public T get()
    
// 过时不候
public T get(long timeout, TimeUnit unit)
    
// 没有计算完成的情况下,给我一个替代结果 
// 立即获取结果不阻塞 计算完,返回计算完成后的结果  没算完,返回设定的valueIfAbsent值
public T getNow(T valueIfAbsent)
  
public class CompletableFutureDemo2{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 533;
        });

        //去掉注释上面计算没有完成,返回444
        //开启注释上满计算完成,返回计算结果
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(completableFuture.getNow(444));
    }
}

public T join()
public class CompletableFutureDemo2{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        System.out.println(CompletableFuture.supplyAsync(() -> "abc").thenApply(r -> r + "123").join());
    }
}   

主动触发计算

// 是否打断get方法立即返回括号值
public boolean complete(T value)
  
public class CompletableFutureDemo4{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 533;
        });

        //注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        //当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
        System.out.println(completableFuture.complete(444)+"\t"+completableFuture.get());
    }
}   

2、对计算结果进行处理

thenApply
// 计算结果存在依赖关系,这两个线程串行化
// 由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停。
public class CompletableFutureDemo4{
	public static void main(String[] args) throws ExecutionException, InterruptedException{
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).thenApply(f -> {
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
            //int age = 10/0; // 异常情况:那步出错就停在那步。
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

    	System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
	}   
}  
handle
// 有异常也可以往下一步走,根据带的异常参数可以进一步处理
public class CompletableFutureDemo4{

    public static void main(String[] args) throws ExecutionException, InterruptedException{
        //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).handle((f,e) -> {
            int age = 10/0;
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}   

image-20210904003033912

image-20210904003036925

3、对计算结果进行消费

接收任务的处理结果,并消费处理,无返回结果

//thenAccept
public static void main(String[] args) throws ExecutionException, InterruptedException{
    CompletableFuture.supplyAsync(() -> {
        return 1;
    }).thenApply(f -> {
        return f + 2;
    }).thenApply(f -> {
        return f + 3;
    }).thenApply(f -> {
        return f + 4;
    }).thenAccept(r -> System.out.println(r));
}  

Code之任务之间的顺序执行

thenRun
thenRun(Runnable runnable)
// 任务 A 执行完执行 B,并且 B 不需要 A 的结果
    
thenAccept
thenAccept(Consumer action)
// 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
  
thenApply
thenApply(Function fn)
// 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
 
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());

4、对计算速度进行选用

谁快用谁

applyToEither

public class CompletableFutureDemo5{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return f + 1;
        });

        System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
    }
}

5、对计算结果进行合并

两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理

先完成的先等着,等待其它分支任务

thenCombine

code标准版,好理解先拆分

public class CompletableFutureDemo2{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return x + y;
        });
        
        System.out.println(thenCombineResult.get());
    }
}

code表达式

public class CompletableFutureDemo6{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x,y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }),(a,b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----主线程结束,END");
        System.out.println(thenCombineResult.get());


        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

8、分支合并框架

Fork:把一个复杂任务进行分拆,大事化小

Join:把分拆任务的结果进行合并

image

1、相关类

1、ForkJoinPool

image

2、ForkJoinTask

image

3、RecursiveTask

image

// 递归任务:继承后可以实现递归(自己调自己)调用的任务
 class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
       return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();
   }
 }

2、示例

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class MyTask extends RecursiveTask<Integer>{
    private static final Integer ADJUST_VALUE = 10;
    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if((end - begin)<=ADJUST_VALUE){
           for(int i =begin;i <= end;i++){
                result = result + i;
           }
        }else{
            int middle = (begin + end)/2;
            MyTask task01 = new MyTask(begin,middle);
            MyTask task02 = new MyTask(middle+1,end);
            task01.fork();
            task02.fork();
            result =  task01.join() + task02.join();
        }


        return result;
    }
}


/**
 * 分支合并例子
 * ForkJoinPool
 * ForkJoinTask
 * RecursiveTask
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        MyTask myTask = new MyTask(0,100);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);

        System.out.println(forkJoinTask.get());

        forkJoinPool.shutdown();
    }
}

四、Java“锁”事

1、Lock

image

// Lock implementations provide more extensive locking operations than can be obtained using synchronized methods and statements. They allow more flexible structuring, may have quite different properties, and may support multiple associated Condition objects.

// 锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象

2、synchronized与Lock的区别

  1. 首先synchronized是java内置关键字,在jvm层面,Lock是个java类;
  2. synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;
  3. synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
  4. 用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了;
  5. synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
  6. Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。

3、synchronized

  1. 修饰实例方法,作用于当前实例,进入同步代码前需要先获取实例的锁
  2. 修饰静态方法,作用于类的Class对象,进入修饰的静态方法前需要先获取类的Class对象的锁
  3. 修饰代码块,需要指定加锁对象(记做lockobj),在进入同步代码块前需要先获取lockobj的锁

1、synchronized作用于实例对象

所谓实例对象锁就是用synchronized修饰实例对象的实例方法,注意是实例方法,不是静态方法,如:

public class Demo2 {
    int num = 0;
    public synchronized void add() {
        num++;
    }
    public static class T extends Thread {
        private Demo2 demo2;
        public T(Demo2 demo2) {
            this.demo2 = demo2;
        }
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                this.demo2.add();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Demo2 demo2 = new Demo2();
        T t1 = new T(demo2);
        T t2 = new T(demo2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(demo2.num);
    }
}

main()方法中创建了一个对象demo2和2个线程t1、t2,t1、t2中调用demo2的add()方法10000次,add()方法中执行了num++,num++实际上是分3步,获取num,然后将num+1,然后将结果赋值给num,如果t2在t1读取num和num+1之间获取了num的值,那么t1和t2会读取到同样的值,然后执行num++,两次操作之后num是相同的值,最终和期望的结果不一致,造成了线程安全失败,因此我们对add方法加了synchronized来保证线程安全。

注意:m1()方法是实例方法,两个线程操作m1()时,需要先获取demo2的锁,没有获取到锁的,将等待,直到其他线程释放锁为止。

synchronize作用于实例方法需要注意:

  1. 实例方法上加synchronized,线程安全的前提是,多个线程操作的是同一个实例,如果多个线程作用于不同的实例,那么线程安全是无法保证的
  2. 同一个实例的多个实例方法上有synchronized,这些方法都是互斥的,同一时间只允许一个线程操作同一个实例的其中的一个synchronized方法

2、synchronized作用于静态方法

当synchronized作用于静态方法时,锁的对象就是当前类的Class对象。如:

public class Demo3 {
    static int num = 0;
    public static synchronized void m1() {
        for (int i = 0; i < 10000; i++) {
            num++;
        }
    }
    public static class T1 extends Thread {
        @Override
        public void run() {
            Demo3.m1();
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        T1 t2 = new T1();
        T1 t3 = new T1();
        t1.start();
        t2.start();
        t3.start();
        //等待3个线程结束打印num
        t1.join();
        t2.join();
        t3.join();
        System.out.println(Demo3.num);
        /**
         * 打印结果:
         * 30000
         */
    }
}

上面代码打印30000,和期望结果一致。m1()方法是静态方法,有synchronized修饰,锁用于与Demo3.class对象,和下面的写法类似:

public static void m1() {
    synchronized (Demo4.class) {
        for (int i = 0; i < 10000; i++) {
            num++;
        }
    }
}

3、synchronized同步代码块

除了使用关键字修饰实例方法和静态方法外,还可以使用同步代码块,在某些情况下,我们编写的方法体可能比较大,同时存在一些比较耗时的操作,而需要同步的代码又只有一小部分,如果直接对整个方法进行同步操作,可能会得不偿失,此时我们可以使用同步代码块的方式对需要同步的代码进行包裹,这样就无需对整个方法进行同步操作了,同步代码块的使用示例如下:

public class Demo5 implements Runnable {
    static Demo5 instance = new Demo5();
    static int i = 0;
    @Override
    public void run() {
        //省略其他耗时操作....
        //使用同步代码块对变量i进行同步操作,锁对象为instance
        synchronized (instance) {
            for (int j = 0; j < 10000; j++) {
                i++;
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}

从代码看出,将synchronized作用于一个给定的实例对象instance,即当前实例对象就是锁对象,每次当线程进入synchronized包裹的代码块时就会要求当前线程持有instance实例对象锁,如果当前有其他线程正持有该对象锁,那么新到的线程就必须等待,这样也就保证了每次只有一个线程执行i++;操作。当然除了instance作为对象外,我们还可以使用this对象(代表当前实例)或者当前类的class对象作为锁,如下代码:

//this,当前实例对象锁
synchronized(this){
    for(int j=0;j<1000000;j++){
        i++;
    }
}
//class对象锁
synchronized(Demo5.class){
    for(int j=0;j<1000000;j++){
        i++;
    }
}

分析代码是否互斥的方法,先找出synchronized作用的对象是谁,如果多个线程操作的方法中synchronized作用的锁对象一样,那么这些线程同时异步执行这些方法就是互斥的。如下代码:

public class Demo6 {
    //作用于当前类的实例对象
    public synchronized void m1() {
    }
    //作用于当前类的实例对象
    public synchronized void m2() {
    }
    //作用于当前类的实例对象
    public void m3() {
        synchronized (this) {
        }
    }
    //作用于当前类Class对象
    public static synchronized void m4() {
    }
    //作用于当前类Class对象
    public static void m5() {
        synchronized (Demo6.class) {
        }
    }
    public static class T extends Thread{
        Demo6 demo6;
        public T(Demo6 demo6) {
            this.demo6 = demo6;
        }
        @Override
        public void run() {
            super.run();
        }
    }
    public static void main(String[] args) {
        Demo6 d1 = new Demo6();
        Thread t1 = new Thread(() -> {
            d1.m1();
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            d1.m2();
        });
        t2.start();
        Thread t3 = new Thread(() -> {
            d1.m2();
        });
        t3.start();
        Demo6 d2 = new Demo6();
        Thread t4 = new Thread(() -> {
            d2.m2();
        });
        t4.start();
        Thread t5 = new Thread(() -> {
            Demo6.m4();
        });
        t5.start();
        Thread t6 = new Thread(() -> {
            Demo6.m5();
        });
        t6.start();
    }
}

分析上面代码:

  1. 线程t1、t2、t3中调用的方法都需要获取d1的锁,所以他们是互斥的
  2. t1/t2/t3这3个线程和t4不互斥,他们可以同时运行,因为前面三个线程依赖于d1的锁,t4依赖于d2的锁
  3. t5、t6都作用于当前类的Class对象锁,所以这两个线程是互斥的,和其他几个线程不互斥

4、ReentrantLock

ReentrantLock是Lock的默认实现,在聊ReentranLock之前,我们需要先弄清楚一些概念:

  1. 可重入锁:可重入锁是指同一个线程可以多次获得同一把锁;ReentrantLock和关键字Synchronized都是可重入锁
  2. 可中断锁:可中断锁时子线程在获取锁的过程中,是否可以相应线程中断操作。synchronized是不可中断的,ReentrantLock是可中断的
  3. 公平锁和非公平锁:公平锁是指多个线程尝试获取同一把锁的时候,获取锁的顺序按照线程到达的先后顺序获取,而不是随机插队的方式获取。synchronized是非公平锁,而ReentrantLock是两种都可以实现,不过默认是非公平锁

1、synchronized的局限性

synchronized是java内置的关键字,它提供了一种独占的加锁方式。synchronized的获取和释放锁由jvm实现,用户不需要显示的释放锁,非常方便,然而synchronized也有一定的局限性,例如:

  1. 当线程尝试获取锁的时候,如果获取不到锁会一直阻塞,这个阻塞的过程,用户无法控制
  2. 如果获取锁的线程进入休眠或者阻塞,除非当前线程异常,否则其他线程尝试获取锁必须一直等待

JDK1.5之后发布,加入了Doug Lea实现的java.util.concurrent包。包内提供了Lock类,用来提供更多扩展的加锁功能。Lock弥补了synchronized的局限,提供了更加细粒度的加锁功能。

2、ReentrantLock基本使用

我们使用3个线程来对一个共享变量++操作,先使用synchronized实现,然后使用ReentrantLock实现。

synchronized方式

public class Demo2 {
    private static int num = 0;
    private static synchronized void add() {
        num++;
    }
    public static class T extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Demo2.add();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T();
        T t2 = new T();
        T t3 = new T();
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
        System.out.println(Demo2.num);
    }
}

ReentrantLock方式

import java.util.concurrent.locks.ReentrantLock;

public class Demo3 {
    private static int num = 0;
    private static ReentrantLock lock = new ReentrantLock();
    private static void add() {
        lock.lock();
        try {
            num++;
        } finally {
            lock.unlock();
        }
    }
    public static class T extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Demo3.add();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T();
        T t2 = new T();
        T t3 = new T();
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
        System.out.println(Demo3.num);
    }
}

ReentrantLock的使用过程:

  1. 创建锁:ReentrantLock lock = new ReentrantLock();
  2. 获取锁:lock.lock()
  3. 释放锁:lock.unlock();

对比上面的代码,与关键字synchronized相比,ReentrantLock锁有明显的操作过程,开发人员必须手动的指定何时加锁,何时释放锁,正是因为这样手动控制,ReentrantLock对逻辑控制的灵活度要远远胜于关键字synchronized,上面代码需要注意**lock.unlock()**一定要放在finally中,否则,若程序出现了异常,锁没有释放,那么其他线程就再也没有机会获取这个锁了。

3、ReentrantLock获取锁的过程是可中断的

对于synchronized关键字,如果一个线程在等待获取锁,最终只有2种结果:

  1. 要么获取到锁然后继续后面的操作
  2. 要么一直等待,直到其他线程释放锁为止

而ReentrantLock提供了另外一种可能,就是在等待获取锁的过程中(发起获取锁请求到还未获取到锁这段时间内)是可以被中断的,也就是说在等待锁的过程中,程序可以根据需要取消获取锁的请求。有些使用这个操作是非常有必要的。比如:你和好朋友越好一起去打球,如果你等了半小时朋友还没到,突然你接到一个电话,朋友由于突发状况,不能来了,那么你一定达到回府。中断操作正是提供了一套类似的机制,如果一个线程正在等待获取锁,那么它依然可以收到一个通知,被告知无需等待,可以停止工作了。

示例代码:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Demo6 {
    private static ReentrantLock lock1 = new ReentrantLock(false);
    private static ReentrantLock lock2 = new ReentrantLock(false);
    public static class T extends Thread {
        int lock;
        public T(String name, int lock) {
            super(name);
            this.lock = lock;
        }
        @Override
        public void run() {
            try {
                if (this.lock == 1) {
                    lock1.lockInterruptibly();
                    TimeUnit.SECONDS.sleep(1);
                    lock2.lockInterruptibly();
                } else {
                    lock2.lockInterruptibly();
                    TimeUnit.SECONDS.sleep(1);
                    lock1.lockInterruptibly();
                }
            } catch (InterruptedException e) {
                System.out.println("中断标志:" + this.isInterrupted());
                e.printStackTrace();
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
                if (lock2.isHeldByCurrentThread()) {
                    lock2.unlock();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1", 1);
        T t2 = new T("t2", 2);
        t1.start();
        t2.start();
    }
}

先运行一下上面代码,发现程序无法结束,使用jstack查看线程堆栈信息,发现2个线程死锁了。

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x0000000717380c20, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x0000000717380c50, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2

lock1被线程t1占用,lock2被线程t2占用,线程t1在等待获取lock2,线程t2在等待获取lock1,都在相互等待获取对方持有的锁,最终产生了死锁,如果是在synchronized关键字情况下发生了死锁现象,程序是无法结束的。

我们对上面代码改造一下,线程t2一直无法获取到lock1,那么等待5秒之后,我们中断获取锁的操作。主要修改一下main方法,如下:

T t1 = new T("t1", 1);
T t2 = new T("t2", 2);
t1.start();
t2.start();
TimeUnit.SECONDS.sleep(5);
t2.interrupt();

新增了2行代码TimeUnit.SECONDS.sleep(5);t2.interrupt();,程序可以结束了,运行结果:

java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at com.itsoku.chat06.Demo6$T.run(Demo6.java:31)
中断标志:false

从上面信息中可以看出,代码的31行触发了异常,中断标志输出:false

ec24264d-651f-4eb6-aa60-bb98a3098f78

t2在31行一直获取不到lock1的锁,主线程中等待了5秒之后,t2线程调用了interrupt()方法,将线程的中断标志置为true,此时31行会触发InterruptedException异常,然后线程t2可以继续向下执行,释放了lock2的锁,然后线程t1可以正常获取锁,程序得以继续进行。线程发送中断信号触发InterruptedException异常之后,中断标志将被清空。

关于获取锁的过程中被中断,注意几点:

  1. ReentrankLock中必须使用实例方法lockInterruptibly()获取锁时,在线程调用interrupt()方法之后,才会引发InterruptedException异常
  2. 线程调用interrupt()之后,线程的中断标志会被置为true
  3. 触发InterruptedException异常之后,线程的中断标志会被清空,即置为false
  4. 所以当线程调用interrupt()引发InterruptedException异常,中断标志的变化是:false->true->false

4、ReentrantLock锁申请等待限时

申请锁等待限时是什么意思?一般情况下,获取锁的时间我们是不知道的,synchronized关键字获取锁的过程中,只能等待其他线程把锁释放之后才能够有机会获取到锁。所以获取锁的时间有长有短。如果获取锁的时间能够设置超时时间,那就非常好了。

ReentrantLock刚好提供了这样功能,给我们提供了获取锁限时等待的方法tryLock(),可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。

tryLock无参方法

看一下源码中tryLock方法:

public boolean tryLock()

返回boolean类型的值,此方法会立即返回,结果表示获取锁是否成功,示例:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Demo8 {
    private static ReentrantLock lock1 = new ReentrantLock(false);
    public static class T extends Thread {
        public T(String name) {
            super(name);
        }
        @Override
        public void run() {
            try {
                System.out.println(System.currentTimeMillis() + ":" + this.getName() + "开始获取锁!");
                //获取锁超时时间设置为3秒,3秒内是否能否获取锁都会返回
                if (lock1.tryLock()) {
                    System.out.println(System.currentTimeMillis() + ":" + this.getName() + "获取到了锁!");
                    //获取到锁之后,休眠5秒
                    TimeUnit.SECONDS.sleep(5);
                } else {
                    System.out.println(System.currentTimeMillis() + ":" + this.getName() + "未能获取到锁!");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1");
        T t2 = new T("t2");
        t1.start();
        t2.start();
    }
}

代码中获取锁成功之后,休眠5秒,会导致另外一个线程获取锁失败,运行代码,输出:

1563356291081:t2开始获取锁!
1563356291081:t2获取到了锁!
1563356291081:t1开始获取锁!
1563356291081:t1未能获取到锁!

tryLock有参方法

可以明确设置获取锁的超时时间,该方法签名:

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException

该方法在指定的时间内不管是否可以获取锁,都会返回结果,返回true,表示获取锁成功,返回false表示获取失败。此方法有2个参数,第一个参数是时间类型,是一个枚举,可以表示时、分、秒、毫秒等待,使用比较方便,第1个参数表示在时间类型上的时间长短。此方法在执行的过程中,如果调用了线程的中断interrupt()方法,会触发InterruptedException异常。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Demo7 {
    private static ReentrantLock lock1 = new ReentrantLock(false);
    public static class T extends Thread {
        public T(String name) {
            super(name);
        }
        @Override
        public void run() {
            try {
                System.out.println(System.currentTimeMillis() + ":" + this.getName() + "开始获取锁!");
                //获取锁超时时间设置为3秒,3秒内是否能否获取锁都会返回
                if (lock1.tryLock(3, TimeUnit.SECONDS)) {
                    System.out.println(System.currentTimeMillis() + ":" + this.getName() + "获取到了锁!");
                    //获取到锁之后,休眠5秒
                    TimeUnit.SECONDS.sleep(5);
                } else {
                    System.out.println(System.currentTimeMillis() + ":" + this.getName() + "未能获取到锁!");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (lock1.isHeldByCurrentThread()) {
                    lock1.unlock();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1");
        T t2 = new T("t2");
        t1.start();
        t2.start();
    }
}

程序中调用了ReentrantLock的实例方法tryLock(3, TimeUnit.SECONDS),表示获取锁的超时时间是3秒,3秒后不管是否能否获取锁,该方法都会有返回值,获取到锁之后,内部休眠了5秒,会导致另外一个线程获取锁失败。

运行程序,输出:

1563355512901:t2开始获取锁!
1563355512901:t1开始获取锁!
1563355512902:t2获取到了锁!
1563355515904:t1未能获取到锁!

输出结果中分析,t2获取到锁了,然后休眠了5秒,t1获取锁失败,t1打印了2条信息,时间相差3秒左右。

关于tryLock()方法和tryLock(long timeout, TimeUnit unit)方法,说明一下:

  1. 都会返回boolean值,结果表示获取锁是否成功
  2. tryLock()方法,不管是否获取成功,都会立即返回;而有参的tryLock方法会尝试在指定的时间内去获取锁,中间会阻塞的现象,在指定的时间之后会不管是否能够获取锁都会返回结果
  3. tryLock()方法不会响应线程的中断方法;而有参的tryLock方法会响应线程的中断方法,而触发InterruptedException异常,这个从2个方法的声明上可以可以看出来

5、ReentrantLock其他常用的方法

  1. isHeldByCurrentThread:实例方法,判断当前线程是否持有ReentrantLock的锁,上面代码中有使用过。

获取锁的4种方法对比

获取锁的方法是否立即响应(不会阻塞)是否响应中断
lock()××
lockInterruptibly()×
tryLock()×
tryLock(long timeout, TimeUnit unit)×

6、总结

  1. ReentrantLock可以实现公平锁和非公平锁
  2. ReentrantLock默认实现的是非公平锁
  3. ReentrantLock的获取锁和释放锁必须成对出现,锁了几次,也要释放几次
  4. 释放锁的操作必须放在finally中执行
  5. lockInterruptibly()实例方法可以相应线程的中断方法,调用线程的interrupt()方法时,lockInterruptibly()方法会触发InterruptedException异常
  6. 关于InterruptedException异常说一下,看到方法声明上带有 throws InterruptedException,表示该方法可以相应线程中断,调用线程的interrupt()方法时,这些方法会触发InterruptedException异常,触发InterruptedException时,线程的中断中断状态会被清除。所以如果程序由于调用interrupt()方法而触发InterruptedException异常,线程的标志由默认的false变为ture,然后又变为false
  7. 实例方法tryLock()会尝试获取锁,会立即返回,返回值表示是否获取成功
  8. 实例方法tryLock(long timeout, TimeUnit unit)会在指定的时间内尝试获取锁,指定的时间内是否能够获取锁,都会返回,返回值表示是否获取锁成功,该方法会响应线程的中断

5、悲观锁

认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。

synchronized关键字和Lock的实现类都是悲观锁

适合写操作多的场景,先加锁可以保证写操作时数据正确。

显式的锁定之后再操作同步资源

//=============悲观锁的调用方式
public synchronized void m1()
{
    //加锁后的业务逻辑......
}

// 保证多个线程使用的是同一个lock对象的前提下
ReentrantLock lock = new ReentrantLock();
public void m2() {
    lock.lock();
    try {
        // 操作同步资源
    }finally {
        lock.unlock();
    }
}

6、乐观锁

//=============乐观锁的调用方式
// 保证多个线程使用的是同一个AtomicInteger
private AtomicInteger atomicInteger = new AtomicInteger();
atomicInteger.incrementAndGet();

​ 乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。

如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作

乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的。

适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。

乐观锁则直接去操作同步资源,是一种无锁算法,得之我幸不得我命,再抢

乐观锁一般有两种实现方式:

  1. 采用版本号机制
  2. CAS(Compare-and-Swap,即比较并替换)算法实现

7、八锁案例

1、JDK源码(notify方法)

image-20210907200227293

2、8种锁的案例实际体现在3个地方

  1. 作用于实例方法,当前实例加锁,进入同步代码前要获得当前实例的锁;
  2. 作用于代码块,对括号里配置的对象加锁。
  3. 作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁;
1、标准访问有ab两个线程,请问先打印邮件还是短信
class Phone //资源类
{
    public synchronized void sendEmail()
    {
        System.out.println("-------sendEmail");
    }

    public synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone.sendSMS();
        },"b").start();

    }
}
-------sendEmail
-------sendSMS
2、sendEmail方法暂停3秒钟,请问先打印邮件还是短信
class Phone //资源类
{
    public synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone.sendSMS();
        },"b").start();

    }
}
-------sendEmail
-------sendSMS
1-2结论
一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,
其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一的一个线程去访问这些synchronized方法
锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
3、新增一个普通的hello方法,请问先打印邮件还是hello
class Phone //资源类
{
    public synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }

    public void hello()
    {
        System.out.println("-------hello");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone.hello();
        },"b").start();

    }
}
-------hello
-------sendEmail
4、有两部手机,请问先打印邮件还是短信
class Phone //资源类
{
    public synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }

    public void hello()
    {
        System.out.println("-------hello");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1
        Phone phone2 = new Phone();//资源类2

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone2.sendSMS();
        },"b").start();
    }
}
-------sendSMS
-------sendEmail
3-4结论
加个普通方法后发现和同步锁无关,hello
换成两个对象后,不是同一把锁了,情况立刻变化。
5、两个静态同步方法,同1部手机,请问先打印邮件还是短信
class Phone //资源类
{
    public static synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public static synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }

    public void hello()
    {
        System.out.println("-------hello");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone.sendSMS();
        },"b").start();
    }
}
-------sendEmail
-------sendSMS
6、两个静态同步方法, 2部手机,请问先打印邮件还是短信
class Phone //资源类
{
    public static synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public static synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }

    public void hello()
    {
        System.out.println("-------hello");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1
        Phone phone2 = new Phone();//资源类2

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone2.sendSMS();
        },"b").start();
    }
}
-------sendEmail
-------sendSMS
5-6结论
都换成静态同步方法后,情况又变化
三种 synchronized 锁的内容有一些差别:
对于普通同步方法,锁的是当前实例对象,通常指this,具体的一部部手机,所有的普通同步方法用的都是同一把锁——实例对象本身,
对于静态同步方法,锁的是当前类的Class对象,如Phone.class唯一的一个模板
对于同步方法块,锁的是 synchronized 括号内的对象
7、1个静态同步方法,1个普通同步方法,同1部手机,请问先打印邮件还是短信
class Phone //资源类
{
    public static synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }

    public void hello()
    {
        System.out.println("-------hello");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone.sendSMS();
        },"b").start();
    }
}
-------sendSMS
-------sendEmail
8、1个静态同步方法,1个普通同步方法,2部手机,请问先打印邮件还是短信
class Phone //资源类
{
    public static synchronized void sendEmail()
    {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("-------sendEmail");
    }

    public synchronized void sendSMS()
    {
        System.out.println("-------sendSMS");
    }

    public void hello()
    {
        System.out.println("-------hello");
    }
}

public class Lock8Demo
{
    public static void main(String[] args)//一切程序的入口,主线程
    {
        Phone phone = new Phone();//资源类1
        Phone phone2 = new Phone();//资源类2

        new Thread(() -> {
            phone.sendEmail();
        },"a").start();

        //暂停毫秒
        try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            phone2.sendSMS();
        },"b").start();
    }
}
-------sendSMS
-------sendEmail
7-8结论
当一个线程试图访问同步代码时它首先必须得到锁,退出或抛出异常时必须释放锁。

所有的普通同步方法用的都是同一把锁——实例对象本身,就是new出来的具体实例对象本身,本类this
也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁。

所有的静态同步方法用的也是同一把锁——类对象本身,就是我们说过的唯一模板Class
具体实例对象this和唯一模板Class,这两把锁是两个不同的对象,所以静态同步方法与普通同步方法之间是不会有竞态条件的
但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。

8、公平锁和非公平锁

在大多数情况下,锁的申请都是非公平的,也就是说,线程1首先请求锁A,接着线程2也请求了锁A。那么当锁A可用时,是线程1可获得锁还是线程2可获得锁呢?这是不一定的,系统只是会从这个锁的等待队列中随机挑选一个,因此不能保证其公平性。这就好比买票不排队,大家都围在售票窗口前,售票员忙的焦头烂额,也顾及不上谁先谁后,随便找个人出票就完事了,最终导致的结果是,有些人可能一直买不到票。而公平锁,则不是这样,它会按照到达的先后顺序获得资源。公平锁的一大特点是:它不会产生饥饿现象,只要你排队,最终还是可以等到资源的;synchronized关键字默认是有jvm内部实现控制的,是非公平锁。而ReentrantLock运行开发者自己设置锁的公平性。

看一下jdk中ReentrantLock的源码,2个构造方法:

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认构造方法创建的是非公平锁。

第2个构造方法,有个fair参数,当fair为true的时候创建的是公平锁,公平锁看起来很不错,不过要实现公平锁,系统内部肯定需要维护一个有序队列,因此公平锁的实现成本比较高,性能相对于非公平锁来说相对低一些。因此,在默认情况下,锁是非公平的,如果没有特别要求,则不建议使用公平锁。

公平锁和非公平锁在程序调度上是很不一样,来一个公平锁示例看一下:

import java.util.concurrent.locks.ReentrantLock;

public class Demo5 {
    private static int num = 0;
    private static ReentrantLock fairLock = new ReentrantLock(true);
    public static class T extends Thread {
        public T(String name) {
            super(name);
        }
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                fairLock.lock();
                try {
                    System.out.println(this.getName() + "获得锁!");
                } finally {
                    fairLock.unlock();
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1");
        T t2 = new T("t2");
        T t3 = new T("t3");
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
    }
}

看一下输出的结果,锁是按照先后顺序获得的。

修改一下上面代码,改为非公平锁试试,如下:

ReentrantLock fairLock = new ReentrantLock(false);

从ReentrantLock卖票编码演示公平和非公平现象

import java.util.concurrent.locks.ReentrantLock;

class Ticket
{
    private int number = 30;
    ReentrantLock lock = new ReentrantLock();

    public void sale()
    {
        lock.lock();
        try
        {
            if(number > 0)
            {
                System.out.println(Thread.currentThread().getName()+"卖出第:\t"+(number--)+"\t 还剩下:"+number);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

public class SaleTicketDemo
{
    public static void main(String[] args)
    {
        Ticket ticket = new Ticket();

        new Thread(() -> { for (int i = 0; i <35; i++)  ticket.sale(); },"a").start();
        new Thread(() -> { for (int i = 0; i <35; i++)  ticket.sale(); },"b").start();
        new Thread(() -> { for (int i = 0; i <35; i++)  ticket.sale(); },"c").start();
    }
}

生活中,排队讲求先来后到视为公平。程序中的公平性也是符合请求锁的绝对时间的,其实就是 FIFO,否则视为不公平

1、源码解读

​ 按序排队公平锁,就是判断同步队列是否还有先驱节点的存在(我前面还有人吗?),如果没有先驱节点才能获取锁;先占先得非公平锁,是不管这个事的,只要能抢获到同步状态就可以

image-20210916224629198

2、为什么会有公平锁/非公平锁的设计为什么默认非公平?

  1. 恢复挂起的线程到真正锁的获取还是有时间差的,从开发人员来看这个时间微乎其微,但是从CPU的角度来看,这个时间差存在的还是很明显的。所以非公平锁能更充分的利用CPU 的时间片,尽量减少 CPU 空闲状态时间。
  2. 使用多线程很重要的考量点是线程切换的开销,当采用非公平锁时,当1个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的概率就变得非常大,所以就减少了线程的开销。

3、使⽤公平锁会有什么问题

公平锁保证了排队的公平性,非公平锁霸气的忽视这个规则,所以就有可能导致排队的长时间在排队,也没有机会获取到锁,这就是传说中的 “锁饥饿”

4、什么时候用公平?什么时候用非公平?

如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了;否则那就用公平锁,大家公平使用。

9、可重入锁(又名递归锁)

是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。

如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚。所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

1、“可重入锁”这四个字分开来解释:

可:可以。
重:再次。
入:进入。
锁:同步锁。

进入什么:进入同步域(即同步代码块/方法或显式锁锁定的代码)
一句话:一个线程中的多个流程可以获取同一把锁,持有这把同步锁可以再次进入。
自己可以获取自己的内部锁

2、可重入锁种类

1、隐式锁(即synchronized关键字使用的锁)默认是可重入锁
指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。
简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的

与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。

同步块

public class ReEntryLockDemo{
    public static void main(String[] args){
        final Object objectLockA = new Object();

        new Thread(() -> {
            synchronized (objectLockA){
                System.out.println("-----外层调用");
                synchronized (objectLockA){
                    System.out.println("-----中层调用");
                    synchronized (objectLockA){
                        System.out.println("-----内层调用");
                    }
                }
            }
        },"a").start();
    }
}

同步方法

public class ReEntryLockDemo{
    public synchronized void m1(){
        System.out.println("-----m1");
        m2();
    }
    public synchronized void m2(){
        System.out.println("-----m2");
        m3();
    }
    public synchronized void m3(){
        System.out.println("-----m3");
    }

    public static void main(String[] args){
        ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();

        reEntryLockDemo.m1();
    }
}
2、显式锁(即Lock)也有ReentrantLock这样的可重入锁。
public class Demo4 {
    private static int num = 0;
    private static ReentrantLock lock = new ReentrantLock();
    private static void add() {
        lock.lock();
        lock.lock();
        try {
            num++;
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }
    public static class T extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                Demo4.add();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        T t1 = new T();
        T t2 = new T();
        T t3 = new T();
        t1.start();
        t2.start();
        t3.start();
        t1.join();
        t2.join();
        t3.join();
        System.out.println(Demo4.num);
    }
}

上面代码中add()方法中,当一个线程进入的时候,会执行2次获取锁的操作,运行程序可以正常结束,并输出和期望值一样的30000,假如ReentrantLock是不可重入的锁,那么同一个线程第2次获取锁的时候由于前面的锁还未释放而导致死锁,程序是无法正常结束的。ReentrantLock命名也挺好的Re entrant Lock,和其名字一样,可重入锁。

代码中还有几点需要注意:

  1. lock()方法和unlock()方法需要成对出现,锁了几次,也要释放几次,否则后面的线程无法获取锁了;可以将add中的unlock删除一个事实,上面代码运行将无法结束
  2. unlock()方法放在finally中执行,保证不管程序是否有异常,锁必定会释放
/**
 * @create 2020-05-14 11:59
 * 在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的
 */
public class ReEntryLockDemo{
    static Lock lock = new ReentrantLock();

    public static void main(String[] args){
        new Thread(() -> {
            lock.lock();
            try
            {
                System.out.println("----外层调用lock");
                lock.lock();
                try
                {
                    System.out.println("----内层调用lock");
                }finally {
                    // 这里故意注释,实现加锁次数和释放次数不一样
                    // 由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。
                    lock.unlock(); // 正常情况,加锁几次就要解锁几次
                }
            }finally {
                lock.unlock();
            }
        },"a").start();

        new Thread(() -> {
            lock.lock();
            try
            {
                System.out.println("b thread----外层调用lock");
            }finally {
                lock.unlock();
            }
        },"b").start();

    }
}

3、Synchronized的重入的实现机理

​ 每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。

​ 当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。

​ 在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。

​ 当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。

10、死锁

​ 死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。

图像

1、产生死锁主要原因

  1. 系统资源不足
  2. 进程运行推进的顺序不合适
  3. 资源分配不当
public class DeadLockDemo{
    public static void main(String[] args){
        final Object objectLockA = new Object();
        final Object objectLockB = new Object();

        new Thread(() -> {
            synchronized (objectLockA){
                System.out.println(Thread.currentThread().getName()+"\t"+"自己持有A,希望获得B");
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                synchronized (objectLockB)
                {
                    System.out.println(Thread.currentThread().getName()+"\t"+"A-------已经获得B");
                }
            }
        },"A").start();

        new Thread(() -> {
            synchronized (objectLockB){
                System.out.println(Thread.currentThread().getName()+"\t"+"自己持有B,希望获得A");
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                synchronized (objectLockA){
                    System.out.println(Thread.currentThread().getName()+"\t"+"B-------已经获得A");
                }
            }
        },"B").start();

    }
}

2、如何排查死锁

  1. 纯命令
jps -l
jstack 进程编号
  1. 图形化
jconsole

五、线程间通信

1、面试题:两个线程打印

两个线程,一个线程打印1-52,另一个打印字母A-Z打印顺序为12A34B...5152Z

1、synchronized实现

package com.xue.thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
 
 
class ShareDataOne//资源类{
  private int number = 0;//初始值为零的一个变量
 
  public synchronized void increment() throws InterruptedException {
     //1判断
     if(number !=0 ) {
       this.wait();
     }
     //2干活
     ++number;
     System.out.println(Thread.currentThread().getName()+"\t"+number);
     //3通知
     this.notifyAll();
  }
  
  public synchronized void decrement() throws InterruptedException {
     // 1判断
     if (number == 0) {
       this.wait();
     }
     // 2干活
     --number;
     System.out.println(Thread.currentThread().getName() + "\t" + number);
     // 3通知
     this.notifyAll();
  }
}
 
/**
 * 
 * @Description:
 *现在两个线程,
 * 可以操作初始值为零的一个变量,
 * 实现一个线程对该变量加1,一个线程对该变量减1,
 * 交替,来10轮。 
 * @author xialei
 *
 *  * 笔记:Java里面如何进行工程级别的多线程编写
 * 1 多线程变成模板(套路)-----上
 *     1.1  线程    操作    资源类  
 *     1.2  高内聚  低耦合
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 
 */
public class NotifyWaitDemoOne{
  public static void main(String[] args){
     ShareDataOne sd = new ShareDataOne();
     new Thread(() -> {
       for (int i = 1; i < 10; i++) {
          try {
            sd.increment();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "A").start();
     new Thread(() -> {
       for (int i = 1; i < 10; i++) {
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "B").start();
  }
}
/*
 * * 
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 * 3 防止虚假唤醒用while
 * 
 * 
 * */

2、换成4个线程

​ 换成4个线程会导致错误,虚假唤醒

​ 原因:在java多线程判断时,不能用if,程序出事出在了判断上面,

突然有一添加的线程进到if了,突然中断了交出控制权,

没有进行验证,而是直接走下去了,加了两次,甚至多次

3、4个线程解决方案

解决虚假唤醒:查看API,java.lang.Object

image

中断和虚假唤醒是可能产生的,所以要用loop循环,if只判断一次,while是只要唤醒就要拉回来再判断一次。if换成while

4、java8新版实现

image

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 
 
   final Object[] items = new Object[100];
   int putptr, takeptr, count;
 
   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }
package com.xue.thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
import org.omg.IOP.Codec;
 
 
class ShareData//资源类
{
  private int number = 0;//初始值为零的一个变量
 
  private Lock lock = new ReentrantLock();
  private Condition condition  = lock.newCondition(); 
   
  public  void increment() throws InterruptedException 
  {
     
      lock.lock();
         try {
          //判断
          while(number!=0) {
            condition.await();
          }
          //干活
          ++number;
          System.out.println(Thread.currentThread().getName()+" \t "+number);
          //通知
          condition.signalAll();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
     
  }
  
  
  public  void decrement() throws InterruptedException 
  {
      
      lock.lock();
         try {
          //判断
          while(number!=1) {
            condition.await();
          }
          //干活
          --number;
          System.out.println(Thread.currentThread().getName()+" \t "+number);
          //通知
          condition.signalAll();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
     
  }
  
  /*public synchronized void increment() throws InterruptedException 
  {
     //判断
     while(number!=0) {
       this.wait();
     }
     //干活
     ++number;
     System.out.println(Thread.currentThread().getName()+" \t "+number);
     //通知
     this.notifyAll();;
  }
  
  public synchronized void decrement() throws InterruptedException 
  {
     //判断
     while(number!=1) {
       this.wait();
     }
     //干活
     --number;
     System.out.println(Thread.currentThread().getName()+" \t "+number);
     //通知
     this.notifyAll();
  }*/
}
 
/**
 * 
 * @Description:
 *现在两个线程,
 * 可以操作初始值为零的一个变量,
 * 实现一个线程对该变量加1,一个线程对该变量减1,
 * 交替,来10轮。 
 *
 *  * 笔记:Java里面如何进行工程级别的多线程编写
 * 1 多线程变成模板(套路)-----上
 *     1.1  线程    操作    资源类  
 *     1.2  高内聚  低耦合
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 
 */
public class NotifyWaitDemo
{
  public static void main(String[] args)
  {
     ShareData sd = new ShareData();
     new Thread(() -> {
 
       for (int i = 1; i <= 10; i++) {
          try {
            sd.increment();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "A").start();
     
     new Thread(() -> {
 
       for (int i = 1; i <= 10; i++) {
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "B").start();
     new Thread(() -> {
 
       for (int i = 1; i <= 10; i++) {
          try {
            sd.increment();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "C").start();
     new Thread(() -> {
 
       for (int i = 1; i <= 10; i++) {
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "D").start();
     
  }
}

/*
 * * 
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 * 3 防止虚假唤醒用while
 * 
 * 
 * */

2、线程间定制化调用通信

1、有顺序通知,需要有标识位

2、有一个锁Lock,3把钥匙Condition

3、判断标志位

4、输出线程名+第几次+第几轮

5、修改标志位,通知下一个

package com.xue.thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
 
class ShareResource
{
  private int number = 1;//1:A 2:B 3:C 
  private Lock lock = new ReentrantLock();
  private Condition c1 = lock.newCondition();
  private Condition c2 = lock.newCondition();
  private Condition c3 = lock.newCondition();
 
  public void print5(int totalLoopNumber)
  {
     lock.lock();
     try 
     {
       //1 判断
       while(number != 1)
       {
          //A 就要停止
          c1.await();
       }
       //2 干活
       for (int i = 1; i <=5; i++) 
       {
          System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
       }
       //3 通知
       number = 2;
       c2.signal();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
  }
  public void print10(int totalLoopNumber)
  {
     lock.lock();
     try 
     {
       //1 判断
       while(number != 2)
       {
          //A 就要停止
          c2.await();
       }
       //2 干活
       for (int i = 1; i <=10; i++) 
       {
          System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
       }
       //3 通知
       number = 3;
       c3.signal();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
  }  
  
  public void print15(int totalLoopNumber)
  {
     lock.lock();
     try 
     {
       //1 判断
       while(number != 3)
       {
          //A 就要停止
          c3.await();
       }
       //2 干活
       for (int i = 1; i <=15; i++) 
       {
          System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
       }
       //3 通知
       number = 1;
       c1.signal();
     } catch (Exception e) {
       e.printStackTrace();
     } finally {
       lock.unlock();
     }
  }  
}
 
 
/**
 * 
 * @Description: 
 * 多线程之间按顺序调用,实现A->B->C
 * 三个线程启动,要求如下:
 * 
 * AA打印5次,BB打印10次,CC打印15次
 * 接着
 * AA打印5次,BB打印10次,CC打印15次
 * ......来10轮  
 *
 */
public class ThreadOrderAccess
{
  public static void main(String[] args)
  {
     ShareResource sr = new ShareResource();
     
     new Thread(() -> {
       for (int i = 1; i <=10; i++) 
       {
          sr.print5(i);
       }
     }, "AA").start();
     new Thread(() -> {
       for (int i = 1; i <=10; i++) 
       {
          sr.print10(i);
       }
     }, "BB").start();
     new Thread(() -> {
       for (int i = 1; i <=10; i++) 
       {
          sr.print15(i);
       }
     }, "CC").start();   
  }
}

六、LockSupport与线程中断

1、线程中断机制

1、如何停止、中断一个运行中的线程??

fdsfsdf

2、什么是中断?

首先 一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。所以,Thread.stop, Thread.suspend, Thread.resume 都已经被废弃了。

其次 在Java中没有办法立即停止一条线程,然而停止线程却显得尤为重要,如取消一个耗时操作。因此,Java提供了一种用于停止线程的机制——中断。

​ 中断只是一种协作机制,Java没有给中断增加任何语法,中断的过程完全需要程序员自己实现。若要中断一个线程,你需要手动调用该线程的interrupt方法,该方法也仅仅是将线程对象的中断标识设成true;接着你需要自己写代码不断地检测当前线程的标识位,如果为true,表示别的线程要求这条线程中断, 此时究竟该做什么需要你自己写代码实现。

​ 每个线程对象中都有一个标识,用于表示线程是否被中断;该标识位为true表示中断,为false表示未中断;通过调用线程对象的interrupt方法将该线程的标识位设为true;可以在别的线程中调用,也可以在自己的线程中调用

3、中断的相关API方法
public void interrupt()实例方法,
实例方法interrupt()仅仅是设置线程的中断状态为true,不会停止线程
public static boolean interrupted()静态方法,Thread.interrupted();
判断线程是否被中断,并清除当前中断状态
这个方法做了两件事:
1 返回当前线程的中断状态
2 将当前线程的中断状态设为false

这个方法有点不好理解,因为连续调用两次的结果可能不一样。
public boolean isInterrupted()实例方法,
判断当前线程是否被中断(通过检查中断标志位)

2、如何使用中断标识停止线程?

在需要中断的线程中不断监听中断状态,一旦发生中断,就执行相应的中断处理业务逻辑。

1、通过一个volatile变量实现

public class InterruptDemo{
    
	public volatile static boolean exit = false;
    	public static class T extends Thread {
        @Override
        public void run() {
            while (true) {
                //循环处理业务
                if (exit) {
                    break;
                }
            }
        }
    }
    public static void setExit() {
        exit = true;
    }
    public static void main(String[] args) throws InterruptedException {
        T t = new T();
        t.start();
        TimeUnit.SECONDS.sleep(3);
        setExit();
    }
}

代码中启动了一个线程,线程的run方法中有个死循环,内部通过exit变量的值来控制是否退出。TimeUnit.SECONDS.sleep(3);让主线程休眠3秒,此处为什么使用TimeUnit?TimeUnit使用更方便一些,能够很清晰的控制休眠时间,底层还是转换为Thread.sleep实现的。程序有个重点:volatile关键字,exit变量必须通过这个修饰,如果把这个去掉,程序无法正常退出。volatile控制了变量在多线程中的可见性。

2、通过AtomicBoolean

public class StopThreadDemo
{
    private final static AtomicBoolean atomicBoolean = new AtomicBoolean(true);

    public static void main(String[] args)
    {
        Thread t1 = new Thread(() -> {
            while(atomicBoolean.get())
            {
                try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("-----hello");
            }
        }, "t1");
        t1.start();

        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

        atomicBoolean.set(false);
    }
}

3、通过Thread类自带的中断api方法实现

  1. 实例方法interrupt(),没有返回值

image-20210916231409508

public void interrupt()实例方法,
调用interrupt()方法仅仅是在当前线程中打了一个停止的标记,并不是真正立刻停止线程。

vcxvdfgsdgdg

image-20210916231506817

  1. 实例方法isInterrupted,返回布尔值

image-20210916231603313

public boolean isInterrupted()实例方法,
获取中断标志位的当前值是什么,
判断当前线程是否被中断(通过检查中断标志位),默认是false

image-20210916231626044

public class InterruptDemo
{
    public static void main(String[] args)
    {
        Thread t1 = new Thread(() -> {
            while(true)
            {
                if(Thread.currentThread().isInterrupted())
                {
                    System.out.println("-----t1 线程被中断了,break,程序结束");
                    break;
                }
                System.out.println("-----hello");
            }
        }, "t1");
        t1.start();

        System.out.println("**************"+t1.isInterrupted());
        //暂停5毫秒
        try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        t1.interrupt();
        System.out.println("**************"+t1.isInterrupted());
    }
}

运行上面的程序,程序可以正常结束。线程内部有个中断标志,当调用线程的interrupt()实例方法之后,线程的中断标志会被置为true,可以通过线程的实例方法isInterrupted()获取线程的中断标志。

4、当前线程的中断标识为true,是不是就立刻停止?

具体来说,当对一个线程,调用 interrupt() 时:

① 如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。 被设置中断标志的线程将继续正常运行,不受影响。所以, interrupt() 并不能真正的中断线程,需要被调用的线程自己进行配合才行。

② 如果线程处于被阻塞状态(例如处于sleep, wait, join 等状态),在别的线程中调用当前线程对象的interrupt方法, 那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。

public class InterruptDemo2 {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 300; i++) {
                System.out.println("-------" + i);
            }
            System.out.println("after t1.interrupt()--第2次---: " + Thread.currentThread().isInterrupted());
        }, "t1");
        t1.start();

        System.out.println("before t1.interrupt()----: " + t1.isInterrupted());
        //实例方法interrupt()仅仅是设置线程的中断状态位设置为true,不会停止线程
        t1.interrupt();
        //活动状态,t1线程还在执行中
        try {
            TimeUnit.MILLISECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after t1.interrupt()--第1次---: " + t1.isInterrupted());
        //非活动状态,t1线程不在执行中,已经结束执行了。
        try {
            TimeUnit.MILLISECONDS.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after t1.interrupt()--第3次---: " + t1.isInterrupted());
    }
}

image-20210916231745805

image-20210916231758523

中断只是一种协同机制,修改中断标识位仅此而已,不是立刻stop打断

5、静态方法Thread.interrupted()

/**
 * 作用是测试当前线程是否被中断(检查中断标志),返回一个boolean并清除中断状态,
 * 第二次再调用时中断状态已经被清除,将返回一个false。
 */
public class InterruptDemo
{

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
        System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
        System.out.println("111111");
        Thread.currentThread().interrupt();
        System.out.println("222222");
        System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
        System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
    }
}
public static boolean interrupted()静态方法,Thread.interrupted();
判断线程是否被中断,并清除当前中断状态,类似i++
这个方法做了两件事:
1 返回当前线程的中断状态
2 将当前线程的中断状态设为false

这个方法有点不好理解,因为连续调用两次的结果可能不一样。

image-20210916231923048

都会返回中断状态,两者对比

image-20210916231944854

6、总结

线程中断相关的方法:

interrupt()方法是一个实例方法 它通知目标线程中断,也就是设置目标线程的中断标志位为true,中断标志位表示当前线程已经被中断了。

isInterrupted()方法也是一个实例方法 它判断当前线程是否被中断(通过检查中断标志位)并获取中断标志

Thread类的静态方法interrupted() 返回当前线程的中断状态(boolean类型)且将当前线程的中断状态设为false,此方法调用之后会清除当前线程的中断标志位的状态(将中断标志置为false了),返回当前值并清零置false

3、LockSupport是什么

LockSupport位于java.util.concurrent简称juc)包中,算是juc中一个基础类,juc中很多地方都会使用LockSupport,非常重要,希望大家一定要掌握。

关于线程等待/唤醒的方法,前面的文章中我们已经讲过2种了:

  1. 方式1:使用Object中的wait()方法让线程等待,使用Object中的notify()方法唤醒线程
  2. 方式2:使用juc包中Condition的await()方法让线程等待,使用signal()方法唤醒线程

image-20210916232319808

image-20210916232333615

image-20210916232340860

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。

下面这句话,后面详细说 LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程

4、线程等待唤醒机制

1、3种让线程等待和唤醒的方法

  1. 使用Object中的wait()方法让线程等待,使用Object中的notify()方法唤醒线程
  2. 使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程
  3. LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程

2、Object类中的wait和notify方法实现线程等待和唤醒

/**
 *
 * 要求:t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作
 *
 * 1 正常程序演示
 *
 * 以下异常情况:
 * 2 wait方法和notify方法,两个都去掉同步代码块后看运行效果
 *   2.1 异常情况
 *   Exception in thread "t1" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method)
 *   Exception in thread "t2" java.lang.IllegalMonitorStateException at java.lang.Object.notify(Native Method)
 *   2.2 结论
 *   Object类中的wait、notify、notifyAll用于线程等待和唤醒的方法,都必须在synchronized内部执行(必须用到关键字synchronized)。
 *
 * 3 将notify放在wait方法前面
 *   3.1 程序一直无法结束
 *   3.2 结论
 *   先wait后notify、notifyall方法,等待中的线程才会被唤醒,否则无法唤醒
 */
public class LockSupportDemo
{

    public static void main(String[] args)//main方法,主线程一切程序入口
    {
        Object objectLock = new Object(); //同一把锁,类似资源类

        new Thread(() -> {
            synchronized (objectLock) {
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒了");
        },"t1").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            synchronized (objectLock) {
                objectLock.notify();
            }

            //objectLock.notify();

            /*synchronized (objectLock) {
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }*/
        },"t2").start();
    }
}
1、正常
public class LockSupportDemo
{
    public static void main(String[] args)//main方法,主线程一切程序入口
    {
        Object objectLock = new Object(); //同一把锁,类似资源类

        new Thread(() -> {
            synchronized (objectLock) {
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒了");
        },"t1").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            synchronized (objectLock) {
                objectLock.notify();
            }
        },"t2").start();
    }
}
2、异常1
/**
 * 要求:t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作
 * 以下异常情况:
 * 2 wait方法和notify方法,两个都去掉同步代码块后看运行效果
 *   2.1 异常情况
 *   Exception in thread "t1" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method)
 *   Exception in thread "t2" java.lang.IllegalMonitorStateException at java.lang.Object.notify(Native Method)
 *   2.2 结论
 *   Object类中的wait、notify、notifyAll用于线程等待和唤醒的方法,都必须在synchronized内部执行(必须用到关键字synchronized)。
 */
public class LockSupportDemo
{

    public static void main(String[] args)//main方法,主线程一切程序入口
    {
        Object objectLock = new Object(); //同一把锁,类似资源类

        new Thread(() -> {
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒了");
        },"t1").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            objectLock.notify();
        },"t2").start();
    }
}

wait方法和notify方法,两个都去掉同步代码块

image-20210916232855724

3、异常2
/**
 *
 * 要求:t1线程等待3秒钟,3秒钟后t2线程唤醒t1线程继续工作
 *
 * 3 将notify放在wait方法前先执行,t1先notify了,3秒钟后t2线程再执行wait方法
 *   3.1 程序一直无法结束
 *   3.2 结论
 *   先wait后notify、notifyall方法,等待中的线程才会被唤醒,否则无法唤醒
 */
public class LockSupportDemo
{

    public static void main(String[] args)//main方法,主线程一切程序入口
    {
        Object objectLock = new Object(); //同一把锁,类似资源类

        new Thread(() -> {
            synchronized (objectLock) {
                objectLock.notify();
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"通知了");
        },"t1").start();

        //t1先notify了,3秒钟后t2线程再执行wait方法
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            synchronized (objectLock) {
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒了");
        },"t2").start();
    }
}

将notify放在wait方法前面

程序无法执行,无法唤醒

4、总结

wait和notify方法必须要在同步块或者方法里面,且成对出现使用

先wait后notify才OK

3、Condition接口中的await后signal方法实现线程的等待和唤醒

1、正常
public class LockSupportDemo2
{
    public static void main(String[] args)
    {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try
            {
                System.out.println(Thread.currentThread().getName()+"\t"+"start");
                condition.await();
                System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        },"t1").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            lock.lock();
            try
            {
                condition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"通知了");
        },"t2").start();

    }
}
2、异常1
/**
 * 异常:
 * condition.await();和condition.signal();都触发了IllegalMonitorStateException异常
 *
 * 原因:调用condition中线程等待和唤醒的方法的前提是,要在lock和unlock方法中,要有锁才能调用
 */
public class LockSupportDemo2
{
    public static void main(String[] args)
    {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            try
            {
                System.out.println(Thread.currentThread().getName()+"\t"+"start");
                condition.await();
                System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            try
            {
                condition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"\t"+"通知了");
        },"t2").start();

    }
}

去掉lock/unlock

image-20210916233230684

condition.await();和 condition.signal();都触发了 IllegalMonitorStateException异常。

结论: lock、unlock对里面才能正确调用调用condition中线程等待和唤醒的方法

3、异常2
/**
 * 异常:
 * 程序无法运行
 *
 * 原因:先await()后signal才OK,否则线程无法被唤醒
 */
public class LockSupportDemo2
{
    public static void main(String[] args)
    {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try
            {
                condition.signal();
                System.out.println(Thread.currentThread().getName()+"\t"+"signal");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        },"t1").start();

        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            lock.lock();
            try
            {
                System.out.println(Thread.currentThread().getName()+"\t"+"等待被唤醒");
                condition.await();
                System.out.println(Thread.currentThread().getName()+"\t"+"被唤醒");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        },"t2").start();

    }
}

先signal后await

4、总结

Condtion中的线程等待和唤醒方法之前,需要先获取锁

一定要先await后signal,不要反了

4、Object和Condition使用的限制条件

线程先要获得并持有锁,必须在锁块(synchronized或lock)中

必须要先等待后唤醒,线程才能够被唤醒

5、LockSupport类中的park等待和unpark唤醒

通过park()和unpark(thread)方法来实现阻塞和唤醒线程的操作

image-20210916233452889

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。

​ LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能, 每个线程都有一个许可(permit), permit只有两个值1和零,默认是零。 可以把许可看成是一种(0,1)信号量(Semaphore),但与 Semaphore 不同的是,许可的累加上限是1。

1、主要方法

image-20210916233517944

阻塞

park() /park(Object blocker)

image-20210916233615025

阻塞当前线程/阻塞传入的具体线程

唤醒

unpark(Thread thread)

image-20210916233726972

唤醒处于阻塞状态的指定线程

2、代码

正常+无锁块要求

public class LockSupportDemo3
{
    public static void main(String[] args)
    {
        //正常使用+不需要锁块
Thread t1 = new Thread(() -> {
    System.out.println(Thread.currentThread().getName()+" "+"1111111111111");
    LockSupport.park();
    System.out.println(Thread.currentThread().getName()+" "+"2222222222222------end被唤醒");
},"t1");
t1.start();

//暂停几秒钟线程
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }

LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName()+"   -----LockSupport.unparrk() invoked over");

    }
}

之前错误的先唤醒后等待,LockSupport照样支持

public class T1
{
    public static void main(String[] args)
    {
        Thread t1 = new Thread(() -> {
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+System.currentTimeMillis());
            LockSupport.park();
            System.out.println(Thread.currentThread().getName()+"\t"+System.currentTimeMillis()+"---被叫醒");
        },"t1");
        t1.start();

        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

        LockSupport.unpark(t1);
        System.out.println(Thread.currentThread().getName()+"\t"+System.currentTimeMillis()+"---unpark over");
    }
}

image-20210916233832563

七、集合不安全

1、线程不安全错误

java.util.ConcurrentModificationException
ArrayList在迭代的时候如果同时对其进行修改就会
抛出java.util.ConcurrentModificationException异常 并发修改异常

2、List不安全

List<String> list = new ArrayList<>();
for (int i = 0; i <30 ; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(list);
            },String.valueOf(i)).start();
        }
 
// 看ArrayList的源码
public boolean add(E e) {
    ensureCapacityInternal(size + 1);  // Increments modCount!!
    elementData[size++] = e;
    return true;
}
// 没有synchronized线程不安全

1、 解决方案

1、Vector
List list = new Vector<>();

image

// 看Vector的源码
public synchronized boolean add(E e) {
    modCount++;
    ensureCapacityHelper(elementCount + 1);
    elementData[elementCount++] = e;
    return true;
}
// 有synchronized线程安全
2、Collections
List list = Collections.synchronizedList(new ArrayList<>());
// Collections提供了方法synchronizedList保证list是同步线程安全的
// 那HashMap,HashSet是线程安全的吗?也不是,所以有同样的线程安全方法

image

3、写时复制(JUC)
List<String> list = new CopyOnWriteArrayList<>();

image

4、CopyOnWrite理论
/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return {@code true} (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

​ CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。添加元素后,再将原容器的引用指向新的容器setArray(newElements)。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

3、Set不安全

Set<String> set = new HashSet<>();//线程不安全
 
Set<String> set = new CopyOnWriteArraySet<>();//线程安全
HashSet底层数据结构是什么?
HashMap  ?HashSet的add是放一个值,而HashMap是放KV键值对
 
public HashSet() {
    map = new HashMap<>();
}
 
private static final Object PRESENT = new Object();
 
public boolean add(E e) {
    return map.put(e, PRESENT)==null;
}

4、Map不安全

Map<String,String> map = new HashMap<>();//线程不安全

Map<String,String> map = new ConcurrentHashMap<>();//线程安全
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * 请举例说明集合类是不安全的
 */
public class NotSafeDemo {
    public static void main(String[] args) {

        Map<String,String> map = new ConcurrentHashMap<>();
        for (int i = 0; i <30 ; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,8));
                System.out.println(map);
            },String.valueOf(i)).start();
        }


    }

    private static void setNoSafe() {
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 0; i <30 ; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0,8));
                System.out.println(set);
            },String.valueOf(i)).start();
        }
    }

    private static void listNoSafe() {
        //        List<String> list = Arrays.asList("a","b","c");
        //        list.forEach(System.out::println);
        //写时复制
        List<String> list = new CopyOnWriteArrayList<>();
        // new CopyOnWriteArrayList<>();
        //Collections.synchronizedList(new ArrayList<>());
        //new Vector<>();//new ArrayList<>();

        for (int i = 0; i <30 ; i++) {
                    new Thread(()->{
                        list.add(UUID.randomUUID().toString().substring(0,8));
                        System.out.println(list);
                    },String.valueOf(i)).start();
                }
    }


}

    /**
     * 写时复制
     CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,
     而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[] newElements里添加元素。
     添加元素后,再将原容器的引用指向新的容器setArray(newElements)。
     这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。
     所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

     *
     *
     *
     *

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
     */

八、JUC强大的辅助类

1、CountDownLatch减少计数

CountDownLatch称之为闭锁,它可以使一个或一批线程在闭锁上等待,等到其他线程执行完相应操作后,闭锁打开,这些等待的线程才可以继续执行。确切的说,闭锁在内部维护了一个倒计数器。通过该计数器的值来决定闭锁的状态,从而决定是否允许等待的线程继续执行。

常用方法:

public CountDownLatch(int count):构造方法,count表示计数器的值,不能小于0,否者会报异常。

public void await() throws InterruptedException:调用await()会让当前线程等待,直到计数器为0的时候,方法才会返回,此方法会响应线程中断操作。

public boolean await(long timeout, TimeUnit unit) throws InterruptedException:限时等待,在超时之前,计数器变为了0,方法返回true,否者直到超时,返回false,此方法会响应线程中断操作。

public void countDown():让计数器减1

CountDownLatch使用步骤:

  1. 创建CountDownLatch对象
  2. 调用其实例方法await(),让当前线程等待
  3. 调用countDown()方法,让计数器减1
  4. 当计数器变为0的时候,await()方法会返回
package com.xue.thread;

import java.util.concurrent.CountDownLatch;
 
 
/**
 * 
 * @Description:
 *  *让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
 * 
 * CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
 * 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
 * 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
 * 
 * 解释:6个同学陆续离开教室后值班同学才可以关门。
 * 
 * main主线程必须要等前面6个线程完成全部工作后,自己才能开干 
 */
public class CountDownLatchDemo
{
   public static void main(String[] args) throws InterruptedException
   {
         CountDownLatch countDownLatch = new CountDownLatch(6);
       
       for (int i = 1; i <=6; i++) //6个上自习的同学,各自离开教室的时间不一致
       {
          new Thread(() -> {
              System.out.println(Thread.currentThread().getName()+"\t 号同学离开教室");
              countDownLatch.countDown();
          }, String.valueOf(i)).start();
       }
       countDownLatch.await();
       System.out.println(Thread.currentThread().getName()+"\t****** 班长关门走人,main线程是班长");
          
   }
}
  • CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

示例1

假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要统计解析总耗时。分析一下:解析每个sheet耗时可能不一样,总耗时就是最长耗时的那个操作。

我们能够想到的最简单的做法是使用join,代码如下:

import java.util.concurrent.TimeUnit;

public class Demo1 {
    public static class T extends Thread {
        //休眠时间(秒)
        int sleepSeconds;
        public T(String name, int sleepSeconds) {
            super(name);
            this.sleepSeconds = sleepSeconds;
        }
        @Override
        public void run() {
            Thread ct = Thread.currentThread();
            long startTime = System.currentTimeMillis();
            System.out.println(startTime + "," + ct.getName() + ",开始处理!");
            try {
                //模拟耗时操作,休眠sleepSeconds秒
                TimeUnit.SECONDS.sleep(this.sleepSeconds);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long endTime = System.currentTimeMillis();
            System.out.println(endTime + "," + ct.getName() + ",处理完毕,耗时:" + (endTime - startTime));
        }
    }
    public static void main(String[] args) throws InterruptedException {
        long starTime = System.currentTimeMillis();
        T t1 = new T("解析sheet1线程", 2);
        t1.start();
        T t2 = new T("解析sheet2线程", 5);
        t2.start();
        t1.join();
        t2.join();
        long endTime = System.currentTimeMillis();
        System.out.println("总耗时:" + (endTime - starTime));
    }
}