0%

Java并发框架

Java并发的书籍断断续续的看了三本:Java并发编程之美,实战Java高并发程序设计,深入浅出Java多线程。看关于线程、进程、volatile、synchronized这些知识点的时候还是觉得非常容易理解的,但是每次看到Atomic、Future、Executor等这些J.U.C工具类就看不明白了,所以就想弄一个框架梳理一下高并发到底在做什么。本文结合Java并发实现原理:JDK源码剖析深入浅出Java多线程来进行总结。

分为两部分:并发基础(关键字、CAS、AQS)、并发过程中使用的工具类

并发基础

Thread类和Runnable接口

简介
  • 继承Thread类,并重写run方法;
  • 实现Runnable接口的run方法;(主要)
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo {
public static class MyThread extends Thread {
@Override
public void run() {
System.out.println("MyThread");
}
}

public static void main(String[] args) {
Thread myThread = new MyThread();
myThread.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Demo {
public static class MyThread implements Runnable {
@Override
public void run() {
System.out.println("MyThread");
}
}

public static void main(String[] args) {

new Thread(new MyThread()).start();

// Java 8 函数式编程,可以省略MyThread类
new Thread(() -> {
System.out.println("Java 8 匿名内部类");
}).start();
}
}

Thread类的几个常用的方法:

  • currentThread():静态方法,返回对当前正在执行的线程对象的引用;
  • start():开始执行线程的方法,java虚拟机会调用线程内的run()方法;
  • yield():yield在英语里有放弃的意思,同样,这里的yield()指的是当前线程愿意让出对当前处理器的占用。这里需要注意的是,就算当前线程调用了yield()方法,程序在调度的时候,也还有可能继续运行这个线程的;
  • sleep():静态方法,使当前线程睡眠一段时间;
  • join():使当前线程等待另一个线程执行完毕之后再继续执行,内部调用的是Object类的wait方法实现的;
Callable、Future优化接口

通常来说,我们使用RunnableThread来创建一个新的线程。但是它们有一个弊端,就是run方法是没有返回值的。而有时候我们希望开启一个线程去执行一个任务,并且这个任务执行完成后有一个返回值。JDK提供了Callable接口与Future接口为我们解决这个问题,这也是所谓的“异步”模型。

CallableRunnable类似,同样是只有一个抽象方法的函数式接口。不同的是,Callable提供的方法是有返回值的,而且支持泛型

Callable一般是配合线程池工具ExecutorService来使用的。我们会在后续章节解释线程池的使用。这里只介绍ExecutorService可以使用submit方法来让一个Callable接口执行。它会返回一个Future,我们后续的程序可以通过这个Futureget方法得到结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 自定义Callable
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
// 模拟计算需要一秒
Thread.sleep(1000);
return 2;
}
public static void main(String args[]) throws Exception {
// 使用
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
// 注意调用get方法会阻塞当前线程,直到得到结果。
// 所以实际编码中建议使用可以设置超时时间的重载get方法。
System.out.println(result.get());
}
}

Future接口只有几个比较简单的方法:

1
2
3
4
5
6
7
8
public abstract interface Future<V> {
public abstract boolean cancel(boolean paramBoolean);
public abstract boolean isCancelled();
public abstract boolean isDone();
public abstract V get() throws InterruptedException, ExecutionException;
public abstract V get(long paramLong, TimeUnit paramTimeUnit)
throws InterruptedException, ExecutionException, TimeoutException;
}

为了让任务有能够取消的功能,就使用Callable来代替Runnable。如果为了可取消性而使用 Future但又不提供可用的结果,则可以声明 Future<?>形式类型、并返回 null作为底层任务的结果

Future接口。这个接口有一个实现类叫FutureTaskFutureTask是实现的RunnableFuture接口的,而RunnableFuture接口同时继承了Runnable接口和Future接口:

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

Future只是一个接口,而它里面的cancelgetisDone等方法要自己实现起来都是非常复杂的。所以JDK提供了一个FutureTask类来供我们使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 自定义Callable,与上面一样
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
// 模拟计算需要一秒
Thread.sleep(1000);
return 2;
}
public static void main(String args[]) throws Exception {
// 使用
ExecutorService executor = Executors.newCachedThreadPool();
FutureTask<Integer> futureTask = new FutureTask<>(new Task());
executor.submit(futureTask);
System.out.println(futureTask.get());
}
}

这里是使用FutureTask直接取get取值,而上面的Demo是通过submit方法返回的Future去取值。

JMM与happen-before

JMM

在Java中,使用的是共享内存并发模型

在栈中的变量(局部变量、方法定义参数、异常处理器参数)不会在线程之间共享,也就不会有内存可见性(下文会说到)的问题,也不受内存模型的影响。而在堆中的变量是共享的,本文称为共享变量。

  1. 所有的共享变量都存在主内存中。
  2. 每个线程都保存了一份该线程使用到的共享变量的副本。
  3. 如果线程A与线程B之间要通信的话,必须经历下面2个步骤:
    1. 线程A将本地内存A中更新过的共享变量刷新到主内存中去。
    2. 线程B到主内存中去读取线程A之前已经更新过的共享变量。

所以,线程A无法直接访问线程B的工作内存,线程间通信必须经过主内存。

注意,根据JMM的规定,线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取

所以线程B并不是直接去主内存中读取共享变量的值,而是先在本地内存B中找到这个共享变量,发现这个共享变量已经被更新了,然后本地内存B去主内存中读取这个共享变量的新值,并拷贝到本地内存B中,最后线程B再读取本地内存B中的新值。

指令重排

指令重排一般分为以下三种:

  • 编译器优化重排

    编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。

  • 指令并行重排

    现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性(即后一个执行的语句无需依赖前面执行的语句的结果),处理器可以改变语句对应的机器指令的执行顺序。

  • 内存系统重排

    由于处理器使用缓存和读写缓存冲区,这使得加载(load)和存储(store)操作看上去可能是在乱序执行,因为三级缓存的存在,导致内存与缓存的数据同步存在时间差。

第三类就是造成“内存可见性”问题的主因

happen-before

happens-before关系的定义如下:

  1. 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
  2. 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么JMM也允许这样的重排序。

总之,如果操作A happens-before操作B,那么操作A在内存上所做的操作对操作B都是可见的,不管它们在不在一个线程。

在Java中,有以下天然的happens-before关系:

  • 程序顺序规则:一个线程中的每一个操作,happens-before于该线程中的任意后续操作。
  • 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
  • volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
  • 传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。
  • start规则:如果线程A执行操作ThreadB.start()启动线程B,那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作
  • join规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。

volatile

volatile的三重功效:64位写入的原子性、内存可见性和禁止重排序

在理论层面,可以把基本的CPU内存屏障分成四种:(1)LoadLoad:禁止读和读的重排序。(2)StoreStore:禁止写和写的重排序。(3)LoadStore:禁止读和写的重排序。(4)StoreLoad:禁止写和读的重排序。

实现volatile关键字的语义的一种参考做法:

(1)在volatile写操作的前面插入一个StoreStore屏障。保证volatile写操作不会和之前的写操作重排序。

(2)在volatile写操作的后面插入一个StoreLoad屏障。保证volatile写操作不会和之后的读操作重排序。

(3)在volatile读操作的后面插入一个LoadLoad屏障+LoadStore屏障。保证volatile读操作不会和之后的读操作、写操作重排序

在保证内存可见性这一点上,volatile有着与锁相同的内存语义,所以可以作为一个“轻量级”的锁来使用。但由于volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁可以保证整个临界区代码的执行具有原子性。所以在功能上,锁比volatile更强大;在性能上,volatile更有优势

Synchronized

简介

我们通常使用synchronized关键字来给一段代码或一个方法上锁。它通常有以下三种形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 关键字在实例方法上,锁为当前实例
public synchronized void instanceLock() {
// code
}
// 关键字在静态方法上,锁为当前Class对象
public static synchronized void classLock() {
// code
}
// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
Object o = new Object();
synchronized (o) {
// code
}
}

我们这里介绍一下“临界区”的概念。所谓“临界区”,指的是某一块代码区域,它同一时刻只能由一个线程执行。在上面的例子中,如果synchronized关键字在方法上,那临界区就是整个方法内部。而如果是使用synchronized代码块,那临界区就指的是代码块内部的区域。

同理,下面这两个方法也应该是等价的:

1
2
3
4
5
6
7
8
9
10
// 关键字在静态方法上,锁为当前Class对象
public static synchronized void classLock() {
// code
}
// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
synchronized (this.getClass()) {
// code
}
}

一个对象其实有四种锁状态,它们级别由低到高依次是:

  1. 无锁状态
  2. 偏向锁状态
  3. 轻量级锁状态
  4. 重量级锁状态

每个Java对象都有对象头。如果是非数组类型,则用2个字宽来存储对象头,如果是数组,则会用3个字宽来存储对象头。在32位处理器中,一个字宽是32位;在64位虚拟机中,一个字宽是64位。对象头的内容如下表:

长度 内容 说明
32/64bit Mark Word 存储对象的hashCode或锁信息等
32/64bit Class Metadata Address 存储到对象类型数据的指针
32/64bit Array length 数组的长度(如果是数组)

我们主要来看看Mark Word的格式:

锁状态 29 bit 或 61 bit 1 bit 是否是偏向锁? 2 bit 锁标志位
无锁 0 01
偏向锁 线程ID 1 01
轻量级锁 指向栈中锁记录的指针 此时这一位不用于标识偏向锁 00
重量级锁 指向互斥量(重量级锁)的指针 此时这一位不用于标识偏向锁 10
GC标记 此时这一位不用于标识偏向锁 11

可以看到,当对象状态为偏向锁时,Mark Word存储的是偏向的线程ID;当状态为轻量级锁时,Mark Word存储的是指向线程栈中Lock Record的指针;当状态为重量级锁时,Mark Word为指向堆中的monitor对象的指针。

偏向锁

偏向锁会偏向于第一个访问锁的线程,如果在接下来的运行过程中,该锁没有被其他的线程访问,则持有偏向锁的线程将永远不需要触发同步。也就是说,偏向锁在资源无竞争情况下消除了同步语句,连CAS操作都不做了,提高了程序的运行性能。

大白话就是对锁置个变量,如果发现为true,代表资源无竞争,则无需再走各种加锁/解锁流程。如果为false,代表存在其他线程竞争资源,那么就会走后面的流程。

一个线程在第一次进入同步块时,会在对象头和栈帧中的锁记录里存储锁的偏向的线程ID。当下次该线程进入这个同步块时,会去检查锁的Mark Word里面是不是放的自己的线程ID。

如果是,表明该线程已经获得了锁,以后该线程在进入和退出同步块时不需要花费CAS操作来加锁和解锁 ;如果不是,就代表有另一个线程来竞争这个偏向锁。这个时候会尝试使用CAS来替换Mark Word里面的线程ID为新线程的ID,这个时候要分两种情况:

  • 成功,表示之前的线程不存在了, Mark Word里面的线程ID为新线程的ID,锁不会升级,仍然为偏向锁;
  • 失败,表示之前的线程仍然存在,那么暂停之前的线程,设置偏向锁标识为0,并设置锁标志位为00,升级为轻量级锁,会按照轻量级锁的方式进行竞争锁。

偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时, 持有偏向锁的线程才会释放锁。

偏向锁升级成轻量级锁时,会暂停拥有偏向锁的线程,重置偏向锁标识,这个过程看起来容易,实则开销还是很大的,大概的过程如下:

  1. 在一个安全点(在这个时间点上没有字节码正在执行)停止拥有锁的线程。
  2. 遍历线程栈,如果存在锁记录的话,需要修复锁记录和Mark Word,使其变成无锁状态。
  3. 唤醒被停止的线程,将当前锁升级成轻量级锁。
轻量锁

多个线程在不同时段获取同一把锁,即不存在锁竞争的情况,也就没有线程阻塞。针对这种情况,JVM采用轻量级锁来避免线程的阻塞与唤醒。

JVM会为每个线程在当前线程的栈帧中创建用于存储锁记录的空间,我们称为Displaced Mark Word。如果一个线程获得锁的时候发现是轻量级锁,会把锁的Mark Word复制到自己的Displaced Mark Word里面。

然后线程尝试用CAS将锁的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示Mark Word已经被替换成了其他线程的锁记录,说明在与其它线程竞争锁,当前线程就尝试使用自旋来获取锁。

自旋也不是一直进行下去的,如果自旋到一定程度(和JVM、操作系统相关),依然没有获取到锁,称为自旋失败,那么这个线程会阻塞。同时这个锁就会升级成重量级锁

在释放锁时,当前线程会使用CAS操作将Displaced Mark Word的内容复制回锁的Mark Word里面。如果没有发生竞争,那么这个复制的操作会成功。如果有其他线程因为自旋多次导致轻量级锁升级成了重量级锁,那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程。

重量级锁

重量级锁依赖于操作系统的互斥量(mutex) 实现的,而操作系统中线程间状态的转换需要相对比较长的时间,所以重量级锁效率很低,但被阻塞的线程不会消耗CPU。

锁升级流程

每一个线程在准备获取共享资源时: 第一步,检查MarkWord里面是不是放的自己的ThreadId ,如果是,表示当前线程是处于 “偏向锁” 。

第二步,如果MarkWord不是自己的ThreadId,锁升级,这时候,用CAS来执行切换,新的线程根据MarkWord里面现有的ThreadId,通知之前线程暂停,之前线程将Markword的内容置为空。

第三步,两个线程都把锁对象的HashCode复制到自己新建的用于存储锁的记录空间,接着开始通过CAS操作, 把锁对象的MarKword的内容修改为自己新建的记录空间的地址的方式竞争MarkWord。

第四步,第三步中成功执行CAS的获得资源,失败的则进入自旋 。

第五步,自旋的线程在自旋过程中,成功获得资源(即之前获的资源的线程执行完成并释放了共享资源),则整个状态依然处于 轻量级锁的状态,如果自旋失败 。

第六步,进入重量级锁的状态,这个时候,自旋的线程进行阻塞,等待之前线程执行完成并唤醒自己。

下表来自《Java并发编程的艺术》:

优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 适用于只有一个线程访问同步块场景。
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度。 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 追求响应时间。同步块执行速度非常快。
重量级锁 线程竞争不使用自旋,不会消耗CPU。 线程阻塞,响应时间缓慢。 追求吞吐量。同步块执行时间较长。

乐观锁与悲观锁

乐观锁多用于“读多写少“的环境,避免频繁加锁影响性能;而悲观锁多用于”写多读少“的环境,避免频繁失败和重试影响性能。

CAS的全称是:比较并交换(Compare And Swap)。在CAS中,有这样三个值:

  • V:要更新的变量(var)
  • E:预期值(expected)—旧值
  • N:新值(new)

比较并交换的过程如下:

判断V是否等于E,如果等于,将V的值设置为N;如果不等,说明已经有其它线程更新了V,则当前线程放弃更新,什么都不做。

CAS是一种原子操作,当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

并发高级

Atomic类

AtomicAllClass

这些类大概的用途:

  • 原子更新基本类型
  • 原子更新数组
  • 原子更新引用
  • 原子更新字段(属性)
示例

这里我们以AtomicInteger类的getAndAdd(int delta)方法为例,来看看Java是如何实现原子操作的。

先看看这个方法的源码:

1
2
3
public final int getAndAdd(int delta) {
return U.getAndAddInt(this, VALUE, delta);
}

这里的U其实就是一个Unsafe对象:

1
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

所以其实AtomicInteger类的getAndAdd(int delta)方法是调用Unsafe类的方法来实现的:

1
2
3
4
5
6
7
8
@HotSpotIntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}

首先,对象othis,也就是一个AtomicInteger对象。然后offset是一个常量VALUE。这个常量是在AtomicInteger类中声明的:

1
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

同样是调用的Unsafe的方法。从方法名字上来看,是得到了一个对象字段偏移量。

这里声明了一个v,也就是要返回的值。从getAndAddInt来看,它返回的应该是原来的值,而新的值的v + delta

这里使用的是do-while循环。这种循环不多见,它的目的是保证循环体内的语句至少会被执行一遍。这样才能保证return 的值v是我们期望的值。

简单来说,weakCompareAndSet操作仅保留了volatile自身变量的特性,而除去了happens-before规则带来的内存语义。也就是说,weakCompareAndSet无法保证处理操作目标的volatile变量外的其他变量的执行顺序( 编译器和处理器为了优化程序性能而对指令序列进行重新排序 ),同时也无法保证这些变量的可见性。这在一定程度上可以提高性能。

ABA问题

到目前为止,CAS都是基于“值”来做比较的。但如果另外一个线程把变量的值从A改为B,再从B改回到A,那么尽管修改过两次,可是在当前线程做CAS操作的时候,却会因为值没变而认为数据没有被其他线程修改过,这就是所谓的ABA问题。要解决ABA 问题,不仅要比较“值”,还要比较“版本号”,而这正是AtomicStamped-Reference做的事情

之前的CAS只有两个参数,这里的CAS有四个参数,后两个参数就是版本号的旧值和新值。当expectedReference!=对象当前的reference时,说明该数据肯定被其他线程修改过;当expectedReference==对象当前的reference时,再进一步比较expectedStamp是否等于对象当前的版本号,以此判断数据是否被其他线程修改过。

AtomicMarkableReference与AtomicStampedReference原理类似,只是Pair里面的版本号是boolean类型的,而不是整型的累加变量

AtomicXXXFieldUpdater

为什么需要AtomicXXXFieldUpdater

如果一个类是自己编写的,则可以在编写的时候把成员变量定义为Atomic类型。但如果是一个已经有的类,在不能更改其源代码的情况下,要想实现对其成员变量的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater。

1
2
3
4
5
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}

newUpdater(..)静态函数传入的是要修改的类(不是对象)和对应的成员变量的名字

要想使用AtomicIntegerFieldUpdater修改成员变量,成员变量必须是volatile的int类型(不能是Integer包装类)

数组操作

Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三个数组元素的原子操作。注意,这里并不是说对整个数组的操作是原子的,而是针对数组中一个元素的原子操作而言。

AQS

AQSAbstractQueuedSynchronizer的简称,即抽象队列同步器,在Locks包里面从字面意思上理解:

  • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
  • 队列:使用先进先出(FIFO)队列存储数据;
  • 同步:实现了同步的功能。

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的同步器,比如我们提到的ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。

AQS内部使用了一个volatile的变量state来作为资源的标识。同时定义了几个获取和改变state的protected方法,子类可以覆盖这些方法来实现自己的逻辑:

1
2
3
getState()
setState()
compareAndSetState()

这三种叫做均是原子操作,其中compareAndSetState的实现依赖于Unsafe的compareAndSwapInt()方法。

锁与条件

因为在Concurrent包中的锁都是“可重入锁”,所以一般都命名为ReentrantX,因为所有的锁。“可重入锁”是指当一个线程调用object.lock()拿到锁,进入互斥区后,再次调用object.lock(),仍然可以拿到该锁。很显然,通常的锁都要设计成可重入的,否则就会发生死锁。

synchronized用的锁和ReentrantLock,其实都是“排它锁”。也就是说,这些锁在同一时刻只允许一个线程进行访问。

ReentrantLock

ReentrantLock内部有两个非抽象类NonfairSyncFairSync,它们都继承了Sync。从名字上看得出,分别是”非公平同步器“和”公平同步器“的意思。通过看这两个同步器的源码可以发现,它们的实现都是”独占“的。都调用了AOS的setExclusiveOwnerThread方法,所以ReentrantLock的锁是”独占“的,也就是说,它的锁都是”排他锁“,不能共享。

ReentrantReadWriteLock

ReentrantReadWriteLock实现了读写锁,但它有一个小弊端,就是在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”。读线程和读线程之间不互斥(可以同时拿到这把锁),读线程和写线程互斥,写线程和写线程也互斥。

Condition

Condition本身也是一个接口,其功能和wait/notify类似,必须和Lock一起使用。

StampedLock

StampedLock的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。这种操作方式决定了StampedLock在读线程非常多而写线程非常少的场景下非常适用,同时还避免了写饥饿情况的发生。

同步工具

作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为0时开始工作
CyclicBarrier 作用跟CountDownLatch类似,但是可以重复使用
Phaser 增强的CyclicBarrier
Semaphore

Semaphore翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int类型的数据,也可以看成是一种“资源”。

可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。

1
2
3
4
5
6
7
// 默认情况下使用非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。

每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

当初始的资源个数为1的时候,Semaphore退化为排他锁。

Exchanger

Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。

Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。Exchanger只能是两个线程交换数据吗?那三个调用同一个实例的exchange方法会发生什么呢?答案是只有前两个线程会交换数据,第三个线程会进入阻塞状态。

CountDownLatch

CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值

CyclicBarrirer

CyclicBarrirer从名字上来理解是“循环的屏障”的意思。前面提到了CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障。

Phaser

Phaser 可以在运行期间动态地调整要同步的线程个数。

并发容器

BlockingQueue

BlockingQueue是一个带阻塞功能的队列,当入队列时,若队列已满,则阻塞调用者;当出队列时,若队列为空,则阻塞调用者。

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() - -
  • 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常
  • 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。
  • 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
  • 超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

ArrayBlockingQueue 是一个用数组实现的环形队列,在构造函数中,会要求传入数组的容量。

LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为队头和队尾是2个指针分开操作的,所以用了2把锁+2个条件,同时有1个AtomicInteger的原子变量记录count数。

PriorityQueue是按照元素的优先级从小到大出队列的。正因为如此,PriorityQueue中的2个元素之间需要可以比较大小,并实现Comparable接口。如果不指定初始大小,内部会设定一个默认值11,当元素个数超过这个大小之后,会自动扩容。

DelayQueue即延迟队列,也就是一个按延迟时间从小到大出队的PriorityQueue。所谓延迟时间,就是“未来将要执行的时间”-“当前时间”

SynchronousQueue是一种特殊的BlockingQueue,它本身没有容量。先调put(..),线程会阻塞;直到另外一个线程调用了take(),两个线程才同时解锁,反之亦然

BlockingDeque

BlockingDeque定义了一个阻塞的双端队列接口,该接口在继承了BlockingQueue接口的同时,增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

CopyOnWrite

CopyOnWrite指在“写”的时候,不是直接“写”源数据,而是把数据拷贝一份进行修改,再通过悲观锁或者乐观锁的方式写回。那为什么不直接修改,而是要拷贝一份修改呢?这是为了在“读”的时候不加锁。

CopyOnWriteArrayList的核心数据结构也是一个数组。

CopyOnWriteArraySet 就是用Array 实现的一个Set,保证所有元素都不重复。其内部是封装的一个CopyOnWriteArrayList。

优点: CopyOnWriteArrayList经常被用于“读多写少”的并发场景,是因为CopyOnWriteArrayList无需任何同步措施,大大增强了读的性能。在Java中遍历线程非安全的List(如:ArrayList和 LinkedList)的时候,若中途有别的线程对List容器进行修改,那么会抛出ConcurrentModificationException异常。CopyOnWriteArrayList由于其”读写分离”,遍历和修改操作分别作用在不同的List容器,所以在使用迭代器遍历的时候,则不会抛出异常。

缺点: 第一个缺点是CopyOnWriteArrayList每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁Full GC(ZGC因为没有使用Full GC)。比如这些对象占用的内存200M左右,那么再写入100M数据进去,内存就会多占用300M。

第二个缺点是CopyOnWriteArrayList由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。

ConcurrentLinkedQueue/Deque

JDK并没有提供线程安全的List类,因为对List来说,很难去开发一个通用并且没有并发瓶颈的线程安全的List。因为即使简单的读操作,拿contains() 这样一个操作来说,很难想到搜索的时候如何避免锁住整个list。所以退一步,JDK提供了对队列和双端队列的线程安全的类:ConcurrentLinkedQueue和ConcurrentLinkedDeque。

ConcurrentHashMap

ConcurrentHashMap同HashMap一样也是基于散列表的map,但是它提供了一种与Hashtable完全不同的加锁策略,提供更高效的并发性和伸缩性。

JDK 1.7

ConcurrentHashMap在JDK 1.7中,提供了一种粒度更细的加锁机制来实现在多线程下更高的性能,这种机制叫分段锁(Lock Striping)。

提供的优点是:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能。

可以这样理解分段锁,就是将数据分段,对每一段数据分配一把锁。当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

有些方法需要跨段,比如size()、isEmpty()、containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

JDK 1.8

而在JDK 1.8中,ConcurrentHashMap主要做了两个优化:

  • 同HashMap一样,链表也会在长度达到8的时候转化为红黑树,这样可以提升大量冲突时候的查询效率;
  • 以某个位置的头结点(链表的头结点或红黑树的root结点)为锁,配合自旋+CAS避免不必要的锁开销,进一步提升并发性能。
ConcurrentSkipListMap/Set

在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的

JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。底层是使用ConcurrentSkipListMap实现。

线程池、Future

线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。

线程池

有两个核心的类:ThreadPoolExector和ScheduledThreadPoolExecutor,后者不仅可以执行某个任务,还可以周期性地执行任务。向线程池中提交的每个任务,都必须实现Runnable接口,通过最上面的Executor接口中的execute(Runnable command)向线程池提交任务。然后,在ExecutorService 中,定义了线程池的关闭接口shutdown(),还定义了可以有返回值的任务,也就是Callable

ThreadPoolExector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize:在线程池中始终维护的线程个数。

maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。

keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。

blockingQueue:线程池所用的队列类型。

threadFactory:线程创建工厂,可以自定义,也有一个默认的。

RejectedExecutionHandler handler

拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :

  1. ThreadPoolExecutor.AbortPolicy默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
  2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

线程池创建后处于RUNNING状态。

调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。

调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。线程池状态就是通过AtomicInteger类型的成员变量ctl来获取的。

获取的ctl值传入runStateOf方法,与~CAPACITY位与运算(CAPACITY是低29位全1的int变量)。

~CAPACITY在这里相当于掩码,用来获取ctl的高3位,表示线程池状态;而另外的低29位用于表示工作线程数

线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

处理流程
  1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。
线程复用

一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?

原来,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。

四种常见的线程池

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CacheThreadPool运行流程如下:

  1. 提交任务进线程池。
  2. 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
  3. 尝试将任务添加到SynchronousQueue队列。
  4. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
  5. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。

当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

1
2
3
4
5
6
7
8
9
10
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
Callable与Future

execute(Runnable command)接口是无返回值的,与之相对应的是一个有返回值的接口Future submit(Callable task),Callable也就是一个有返回值的Runnable

ScheduledThreadPoolExecutor
  • 延迟执行任务
  • 周期执行任务