Java并发的书籍断断续续的看了三本:Java并发编程之美,实战Java高并发程序设计,深入浅出Java多线程。看关于线程、进程、volatile、synchronized这些知识点的时候还是觉得非常容易理解的,但是每次看到Atomic、Future、Executor等这些J.U.C工具类就看不明白了,所以就想弄一个框架梳理一下高并发到底在做什么。本文结合Java并发实现原理:JDK源码剖析和深入浅出Java多线程来进行总结。
分为两部分:并发基础(关键字、CAS、AQS)、并发过程中使用的工具类。
并发基础
Thread类和Runnable接口
简介
- 继承
Thread
类,并重写run
方法; - 实现
Runnable
接口的run
方法;(主要)
1 | public class Demo { |
1 | public class Demo { |
Thread类的几个常用的方法:
- currentThread():静态方法,返回对当前正在执行的线程对象的引用;
- start():开始执行线程的方法,java虚拟机会调用线程内的run()方法;
- yield():yield在英语里有放弃的意思,同样,这里的yield()指的是当前线程愿意让出对当前处理器的占用。这里需要注意的是,就算当前线程调用了yield()方法,程序在调度的时候,也还有可能继续运行这个线程的;
- sleep():静态方法,使当前线程睡眠一段时间;
- join():使当前线程等待另一个线程执行完毕之后再继续执行,内部调用的是Object类的wait方法实现的;
Callable、Future优化接口
通常来说,我们使用Runnable
和Thread
来创建一个新的线程。但是它们有一个弊端,就是run
方法是没有返回值的。而有时候我们希望开启一个线程去执行一个任务,并且这个任务执行完成后有一个返回值。JDK提供了Callable
接口与Future
接口为我们解决这个问题,这也是所谓的“异步”模型。
Callable
与Runnable
类似,同样是只有一个抽象方法的函数式接口。不同的是,Callable
提供的方法是有返回值的,而且支持泛型。
Callable
一般是配合线程池工具ExecutorService
来使用的。我们会在后续章节解释线程池的使用。这里只介绍ExecutorService
可以使用submit
方法来让一个Callable
接口执行。它会返回一个Future
,我们后续的程序可以通过这个Future
的get
方法得到结果。
1 | // 自定义Callable |
Future
接口只有几个比较简单的方法:
1 | public abstract interface Future<V> { |
为了让任务有能够取消的功能,就使用Callable
来代替Runnable
。如果为了可取消性而使用 Future
但又不提供可用的结果,则可以声明 Future<?>
形式类型、并返回 null
作为底层任务的结果
Future
接口。这个接口有一个实现类叫FutureTask
。FutureTask
是实现的RunnableFuture
接口的,而RunnableFuture
接口同时继承了Runnable
接口和Future
接口:
1 | public interface RunnableFuture<V> extends Runnable, Future<V> { |
Future
只是一个接口,而它里面的cancel
,get
,isDone
等方法要自己实现起来都是非常复杂的。所以JDK提供了一个FutureTask
类来供我们使用。
1 | // 自定义Callable,与上面一样 |
这里是使用FutureTask
直接取get
取值,而上面的Demo是通过submit
方法返回的Future
去取值。
JMM与happen-before
JMM
在Java中,使用的是共享内存并发模型。
在栈中的变量(局部变量、方法定义参数、异常处理器参数)不会在线程之间共享,也就不会有内存可见性(下文会说到)的问题,也不受内存模型的影响。而在堆中的变量是共享的,本文称为共享变量。
- 所有的共享变量都存在主内存中。
- 每个线程都保存了一份该线程使用到的共享变量的副本。
- 如果线程A与线程B之间要通信的话,必须经历下面2个步骤:
- 线程A将本地内存A中更新过的共享变量刷新到主内存中去。
- 线程B到主内存中去读取线程A之前已经更新过的共享变量。
所以,线程A无法直接访问线程B的工作内存,线程间通信必须经过主内存。
注意,根据JMM的规定,线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取。
所以线程B并不是直接去主内存中读取共享变量的值,而是先在本地内存B中找到这个共享变量,发现这个共享变量已经被更新了,然后本地内存B去主内存中读取这个共享变量的新值,并拷贝到本地内存B中,最后线程B再读取本地内存B中的新值。
指令重排
指令重排一般分为以下三种:
编译器优化重排
编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
指令并行重排
现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性(即后一个执行的语句无需依赖前面执行的语句的结果),处理器可以改变语句对应的机器指令的执行顺序。
内存系统重排
由于处理器使用缓存和读写缓存冲区,这使得加载(load)和存储(store)操作看上去可能是在乱序执行,因为三级缓存的存在,导致内存与缓存的数据同步存在时间差。
第三类就是造成“内存可见性”问题的主因
happen-before
happens-before关系的定义如下:
- 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
- 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么JMM也允许这样的重排序。
总之,如果操作A happens-before操作B,那么操作A在内存上所做的操作对操作B都是可见的,不管它们在不在一个线程。
在Java中,有以下天然的happens-before关系:
- 程序顺序规则:一个线程中的每一个操作,happens-before于该线程中的任意后续操作。
- 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
- volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
- 传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。
- start规则:如果线程A执行操作ThreadB.start()启动线程B,那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作
- join规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
volatile
volatile的三重功效:64位写入的原子性、内存可见性和禁止重排序
在理论层面,可以把基本的CPU内存屏障分成四种:(1)LoadLoad:禁止读和读的重排序。(2)StoreStore:禁止写和写的重排序。(3)LoadStore:禁止读和写的重排序。(4)StoreLoad:禁止写和读的重排序。
实现volatile关键字的语义的一种参考做法:
(1)在volatile写操作的前面插入一个StoreStore屏障。保证volatile写操作不会和之前的写操作重排序。
(2)在volatile写操作的后面插入一个StoreLoad屏障。保证volatile写操作不会和之后的读操作重排序。
(3)在volatile读操作的后面插入一个LoadLoad屏障+LoadStore屏障。保证volatile读操作不会和之后的读操作、写操作重排序
在保证内存可见性这一点上,volatile有着与锁相同的内存语义,所以可以作为一个“轻量级”的锁来使用。但由于volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁可以保证整个临界区代码的执行具有原子性。所以在功能上,锁比volatile更强大;在性能上,volatile更有优势。
Synchronized
简介
我们通常使用synchronized
关键字来给一段代码或一个方法上锁。它通常有以下三种形式:
1 | // 关键字在实例方法上,锁为当前实例 |
我们这里介绍一下“临界区”的概念。所谓“临界区”,指的是某一块代码区域,它同一时刻只能由一个线程执行。在上面的例子中,如果synchronized
关键字在方法上,那临界区就是整个方法内部。而如果是使用synchronized代码块,那临界区就指的是代码块内部的区域。
同理,下面这两个方法也应该是等价的:
1 | // 关键字在静态方法上,锁为当前Class对象 |
锁
一个对象其实有四种锁状态,它们级别由低到高依次是:
- 无锁状态
- 偏向锁状态
- 轻量级锁状态
- 重量级锁状态
每个Java对象都有对象头。如果是非数组类型,则用2个字宽来存储对象头,如果是数组,则会用3个字宽来存储对象头。在32位处理器中,一个字宽是32位;在64位虚拟机中,一个字宽是64位。对象头的内容如下表:
长度 | 内容 | 说明 |
---|---|---|
32/64bit | Mark Word | 存储对象的hashCode或锁信息等 |
32/64bit | Class Metadata Address | 存储到对象类型数据的指针 |
32/64bit | Array length | 数组的长度(如果是数组) |
我们主要来看看Mark Word的格式:
锁状态 | 29 bit 或 61 bit | 1 bit 是否是偏向锁? | 2 bit 锁标志位 |
---|---|---|---|
无锁 | 0 | 01 | |
偏向锁 | 线程ID | 1 | 01 |
轻量级锁 | 指向栈中锁记录的指针 | 此时这一位不用于标识偏向锁 | 00 |
重量级锁 | 指向互斥量(重量级锁)的指针 | 此时这一位不用于标识偏向锁 | 10 |
GC标记 | 此时这一位不用于标识偏向锁 | 11 |
可以看到,当对象状态为偏向锁时,Mark Word
存储的是偏向的线程ID;当状态为轻量级锁时,Mark Word
存储的是指向线程栈中Lock Record
的指针;当状态为重量级锁时,Mark Word
为指向堆中的monitor对象的指针。
偏向锁
偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。也就是说,偏向锁在资源无竞争情况下消除了同步语句,连CAS操作都不做了,提高了程序的运行性能。
大白话就是对锁置个变量,如果发现为true,代表资源无竞争,则无需再走各种加锁/解锁流程。如果为false,代表存在其他线程竞争资源,那么就会走后面的流程。
一个线程在第一次进入同步块时,会在对象头和栈帧中的锁记录里存储锁的偏向的线程ID。当下次该线程进入这个同步块时,会去检查锁的Mark Word里面是不是放的自己的线程ID。
如果是,表明该线程已经获得了锁,以后该线程在进入和退出同步块时不需要花费CAS操作来加锁和解锁 ;如果不是,就代表有另一个线程来竞争这个偏向锁。这个时候会尝试使用CAS来替换Mark Word里面的线程ID为新线程的ID,这个时候要分两种情况:
- 成功,表示之前的线程不存在了, Mark Word里面的线程ID为新线程的ID,锁不会升级,仍然为偏向锁;
- 失败,表示之前的线程仍然存在,那么暂停之前的线程,设置偏向锁标识为0,并设置锁标志位为00,升级为轻量级锁,会按照轻量级锁的方式进行竞争锁。
偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时, 持有偏向锁的线程才会释放锁。
偏向锁升级成轻量级锁时,会暂停拥有偏向锁的线程,重置偏向锁标识,这个过程看起来容易,实则开销还是很大的,大概的过程如下:
- 在一个安全点(在这个时间点上没有字节码正在执行)停止拥有锁的线程。
- 遍历线程栈,如果存在锁记录的话,需要修复锁记录和Mark Word,使其变成无锁状态。
- 唤醒被停止的线程,将当前锁升级成轻量级锁。
轻量锁
多个线程在不同时段获取同一把锁,即不存在锁竞争的情况,也就没有线程阻塞。针对这种情况,JVM采用轻量级锁来避免线程的阻塞与唤醒。
JVM会为每个线程在当前线程的栈帧中创建用于存储锁记录的空间,我们称为Displaced Mark Word。如果一个线程获得锁的时候发现是轻量级锁,会把锁的Mark Word复制到自己的Displaced Mark Word里面。
然后线程尝试用CAS将锁的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示Mark Word已经被替换成了其他线程的锁记录,说明在与其它线程竞争锁,当前线程就尝试使用自旋来获取锁。
自旋也不是一直进行下去的,如果自旋到一定程度(和JVM、操作系统相关),依然没有获取到锁,称为自旋失败,那么这个线程会阻塞。同时这个锁就会升级成重量级锁。
在释放锁时,当前线程会使用CAS操作将Displaced Mark Word的内容复制回锁的Mark Word里面。如果没有发生竞争,那么这个复制的操作会成功。如果有其他线程因为自旋多次导致轻量级锁升级成了重量级锁,那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程。
重量级锁
重量级锁依赖于操作系统的互斥量(mutex) 实现的,而操作系统中线程间状态的转换需要相对比较长的时间,所以重量级锁效率很低,但被阻塞的线程不会消耗CPU。
锁升级流程
每一个线程在准备获取共享资源时: 第一步,检查MarkWord里面是不是放的自己的ThreadId ,如果是,表示当前线程是处于 “偏向锁” 。
第二步,如果MarkWord不是自己的ThreadId,锁升级,这时候,用CAS来执行切换,新的线程根据MarkWord里面现有的ThreadId,通知之前线程暂停,之前线程将Markword的内容置为空。
第三步,两个线程都把锁对象的HashCode复制到自己新建的用于存储锁的记录空间,接着开始通过CAS操作, 把锁对象的MarKword的内容修改为自己新建的记录空间的地址的方式竞争MarkWord。
第四步,第三步中成功执行CAS的获得资源,失败的则进入自旋 。
第五步,自旋的线程在自旋过程中,成功获得资源(即之前获的资源的线程执行完成并释放了共享资源),则整个状态依然处于 轻量级锁的状态,如果自旋失败 。
第六步,进入重量级锁的状态,这个时候,自旋的线程进行阻塞,等待之前线程执行完成并唤醒自己。
下表来自《Java并发编程的艺术》:
锁 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
偏向锁 | 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 | 适用于只有一个线程访问同步块场景。 |
轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度。 | 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 | 追求响应时间。同步块执行速度非常快。 |
重量级锁 | 线程竞争不使用自旋,不会消耗CPU。 | 线程阻塞,响应时间缓慢。 | 追求吞吐量。同步块执行时间较长。 |
乐观锁与悲观锁
乐观锁多用于“读多写少“的环境,避免频繁加锁影响性能;而悲观锁多用于”写多读少“的环境,避免频繁失败和重试影响性能。
CAS的全称是:比较并交换(Compare And Swap)。在CAS中,有这样三个值:
- V:要更新的变量(var)
- E:预期值(expected)—旧值
- N:新值(new)
比较并交换的过程如下:
判断V是否等于E,如果等于,将V的值设置为N;如果不等,说明已经有其它线程更新了V,则当前线程放弃更新,什么都不做。
CAS是一种原子操作,当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
并发高级
Atomic类
这些类大概的用途:
- 原子更新基本类型
- 原子更新数组
- 原子更新引用
- 原子更新字段(属性)
示例
这里我们以AtomicInteger
类的getAndAdd(int delta)
方法为例,来看看Java是如何实现原子操作的。
先看看这个方法的源码:
1 | public final int getAndAdd(int delta) { |
这里的U其实就是一个Unsafe
对象:
1 | private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); |
所以其实AtomicInteger
类的getAndAdd(int delta)
方法是调用Unsafe
类的方法来实现的:
1 |
|
首先,对象o
是this
,也就是一个AtomicInteger
对象。然后offset
是一个常量VALUE
。这个常量是在AtomicInteger
类中声明的:
1 | private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value"); |
同样是调用的Unsafe
的方法。从方法名字上来看,是得到了一个对象字段偏移量。
这里声明了一个v,也就是要返回的值。从getAndAddInt
来看,它返回的应该是原来的值,而新的值的v + delta
。
这里使用的是do-while循环。这种循环不多见,它的目的是保证循环体内的语句至少会被执行一遍。这样才能保证return 的值v
是我们期望的值。
简单来说,weakCompareAndSet
操作仅保留了volatile
自身变量的特性,而除去了happens-before规则带来的内存语义。也就是说,weakCompareAndSet
无法保证处理操作目标的volatile变量外的其他变量的执行顺序( 编译器和处理器为了优化程序性能而对指令序列进行重新排序 ),同时也无法保证这些变量的可见性。这在一定程度上可以提高性能。
ABA问题
到目前为止,CAS都是基于“值”来做比较的。但如果另外一个线程把变量的值从A改为B,再从B改回到A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是所谓的ABA问题。要解决ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是AtomicStamped-Reference做的事情
之前的CAS只有两个参数,这里的CAS有四个参数,后两个参数就是版本号的旧值和新值。当expectedReference!=对象当前的reference时,说明该数据肯定被其他线程修改过;当expectedReference==对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。
AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是boolean类型的,而不是整型的累加变量
AtomicXXXFieldUpdater
为什么需要AtomicXXXFieldUpdater
如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型。但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。
1 | public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, |
newUpdater(..)静态函数传入的是要修改的类(不是对象)和对应的成员变量的名字
要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装类)
数组操作
Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。
AQS
AQS是AbstractQueuedSynchronizer
的简称,即抽象队列同步器
,在Locks包里面从字面意思上理解:
- 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
- 队列:使用先进先出(FIFO)队列存储数据;
- 同步:实现了同步的功能。
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们提到的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
AQS内部使用了一个volatile的变量state来作为资源的标识。同时定义了几个获取和改变state的protected方法,子类可以覆盖这些方法来实现自己的逻辑:
1 | getState() |
这三种叫做均是原子操作,其中compareAndSetState的实现依赖于Unsafe的compareAndSwapInt()方法。
锁与条件
因为在Concurrent包中的锁都是“可重入锁”,所以一般都命名为ReentrantX,因为所有的锁。“可重入锁”是指当一个线程调用object.lock()拿到锁,进入互斥区后,再次调用object.lock(),仍然可以拿到该锁。很显然,通常的锁都要设计成可重入的,否则就会发生死锁。
synchronized用的锁和ReentrantLock,其实都是“排它锁”。也就是说,这些锁在同一时刻只允许一个线程进行访问。
ReentrantLock
ReentrantLock内部有两个非抽象类NonfairSync
和FairSync
,它们都继承了Sync。从名字上看得出,分别是”非公平同步器“和”公平同步器“的意思。通过看这两个同步器的源码可以发现,它们的实现都是”独占“的。都调用了AOS的setExclusiveOwnerThread
方法,所以ReentrantLock的锁是”独占“的,也就是说,它的锁都是”排他锁“,不能共享。
ReentrantReadWriteLock
ReentrantReadWriteLock实现了读写锁,但它有一个小弊端,就是在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”。读线程和读线程之间不互斥(可以同时拿到这把锁),读线程和写线程互斥,写线程和写线程也互斥。
Condition
Condition本身也是一个接口,其功能和wait/notify类似,必须和Lock一起使用。
StampedLock
StampedLock的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。这种操作方式决定了StampedLock在读线程非常多而写线程非常少的场景下非常适用,同时还避免了写饥饿情况的发生。
同步工具
类 | 作用 |
---|---|
Semaphore | 限制线程的数量 |
Exchanger | 两个线程交换数据 |
CountDownLatch | 线程等待直到计数器减为0时开始工作 |
CyclicBarrier | 作用跟CountDownLatch类似,但是可以重复使用 |
Phaser | 增强的CyclicBarrier |
Semaphore
Semaphore翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int
类型的数据,也可以看成是一种“资源”。
可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。
1 | // 默认情况下使用非公平 |
acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。
每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。
当初始的资源个数为1的时候,Semaphore退化为排他锁。
Exchanger
Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。
Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。Exchanger只能是两个线程交换数据吗?那三个调用同一个实例的exchange方法会发生什么呢?答案是只有前两个线程会交换数据,第三个线程会进入阻塞状态。
CountDownLatch
CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。
构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。
CyclicBarrirer
CyclicBarrirer从名字上来理解是“循环的屏障”的意思。前面提到了CountDownLatch一旦计数值count
被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()
方法重置屏障。
Phaser
Phaser 可以在运行期间动态地调整要同步的线程个数。
并发容器
BlockingQueue
BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。
阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | - | - |
- 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常
- 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。
- 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
- 超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。
ArrayBlockingQueue 是一个用数组实现的环形队列,在构造函数中,会要求传入数组的容量。
LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是2个指针分开操作的,所以用了2把锁+2个条件,同时有1个AtomicInteger的原子变量记录count数。
PriorityQueue是按照元素的优先级从小到大出队列的。正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并实现Comparable接口。如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自动扩容。
DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间,就是“未来将要执行的时间”-“当前时间”
SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(..),线程会阻塞;直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然
BlockingDeque
BlockingDeque定义了一个阻塞的双端队列接口,该接口在继承了BlockingQueue接口的同时,增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。
CopyOnWrite
CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在“读”的时候不加锁。
CopyOnWriteArrayList的核心数据结构也是一个数组。
CopyOnWriteArraySet 就是用Array 实现的一个Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList。
优点: CopyOnWriteArrayList经常被用于“读多写少”的并发场景,是因为CopyOnWriteArrayList无需任何同步措施,大大增强了读的性能。在Java中遍历线程非安全的List(如:ArrayList和 LinkedList)的时候,若中途有别的线程对List容器进行修改,那么会抛出ConcurrentModificationException异常。CopyOnWriteArrayList由于其”读写分离”,遍历和修改操作分别作用在不同的List容器,所以在使用迭代器遍历的时候,则不会抛出异常。
缺点: 第一个缺点是CopyOnWriteArrayList每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁Full GC(ZGC因为没有使用Full GC)。比如这些对象占用的内存200M左右,那么再写入100M数据进去,内存就会多占用300M。
第二个缺点是CopyOnWriteArrayList由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。
ConcurrentLinkedQueue/Deque
JDK并没有提供线程安全的List类,因为对List来说,很难去开发一个通用并且没有并发瓶颈的线程安全的List。因为即使简单的读操作,拿contains() 这样一个操作来说,很难想到搜索的时候如何避免锁住整个list。所以退一步,JDK提供了对队列和双端队列的线程安全的类:ConcurrentLinkedQueue和ConcurrentLinkedDeque。
ConcurrentHashMap
ConcurrentHashMap同HashMap一样也是基于散列表的map,但是它提供了一种与Hashtable完全不同的加锁策略,提供更高效的并发性和伸缩性。
JDK 1.7
ConcurrentHashMap在JDK 1.7中,提供了一种粒度更细的加锁机制来实现在多线程下更高的性能,这种机制叫分段锁(Lock Striping)。
提供的优点是:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能。
可以这样理解分段锁,就是将数据分段,对每一段数据分配一把锁。当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
有些方法需要跨段,比如size()、isEmpty()、containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,HashEntry则用于存储键值对数据。
一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
JDK 1.8
而在JDK 1.8中,ConcurrentHashMap主要做了两个优化:
- 同HashMap一样,链表也会在长度达到8的时候转化为红黑树,这样可以提升大量冲突时候的查询效率;
- 以某个位置的头结点(链表的头结点或红黑树的root结点)为锁,配合自旋+CAS避免不必要的锁开销,进一步提升并发性能。
ConcurrentSkipListMap/Set
在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的
JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。底层是使用ConcurrentSkipListMap实现。
线程池、Future
线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。
有两个核心的类:ThreadPoolExector和ScheduledThreadPoolExecutor,后者不仅可以执行某个任务,还可以周期性地执行任务。向线程池中提交的每个任务,都必须实现Runnable接口,通过最上面的Executor接口中的execute(Runnable command)向线程池提交任务。然后,在ExecutorService 中,定义了线程池的关闭接口shutdown(),还定义了可以有返回值的任务,也就是Callable
ThreadPoolExector
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize:在线程池中始终维护的线程个数。
maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
blockingQueue:线程池所用的队列类型。
threadFactory:线程创建工厂,可以自定义,也有一个默认的。
RejectedExecutionHandler handler
拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
线程池创建后处于RUNNING状态。
调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。
调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。
当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。
ThreadPoolExecutor中有一个控制状态的属性叫
ctl
,它是一个AtomicInteger类型的变量。线程池状态就是通过AtomicInteger类型的成员变量ctl
来获取的。获取的
ctl
值传入runStateOf
方法,与~CAPACITY
位与运算(CAPACITY
是低29位全1的int变量)。
~CAPACITY
在这里相当于掩码,用来获取ctl的高3位,表示线程池状态;而另外的低29位用于表示工作线程数
线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。
处理流程
- 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
- 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
- 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
- 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
线程复用
一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?
原来,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。
四种常见的线程池
newCachedThreadPool
1 | public static ExecutorService newCachedThreadPool() { |
CacheThreadPool
的运行流程如下:
- 提交任务进线程池。
- 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
- 尝试将任务添加到SynchronousQueue队列。
- 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
- 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。
当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源
newFixedThreadPool
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。
newSingleThreadExecutor
1 | public static ExecutorService newSingleThreadExecutor() { |
有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
Callable与Future
execute(Runnable command)接口是无返回值的,与之相对应的是一个有返回值的接口Future submit(Callable task),Callable也就是一个有返回值的Runnable
ScheduledThreadPoolExecutor
- 延迟执行任务
- 周期执行任务