Java 并发容器和框架
Java 并发容器和框架
ConcurrentHashMap 的实现原理与使用
- ConcurrentHashMap 是线程安全且高效的 HashMap。
- 为什么要使用 ConcurrentHashMap:
- 在并发编程中使用 HashMap 可能导致程序死循环。
- 使用线程安全的 HashTable 效率非常低下。
锁分段技术
- 将数据分成一段一段地存储,然后给每一段数据配一把锁。当一个线程占用锁访问其中一个段数据时,其他段的数据也能被其他线程访问。
ConcurrentHashMap 的结构
- ConcurrentHashMap 里包含一个 Segment 数组。
Segment 数组结构
- Segment 是一种可重入锁(ReentrantLock)。
- 一个 Segment 里包含一个 HashEntry 数组。
- 当对 HashEntry 数组的数据进行修改时,必须首先获得与它对应的 Segment 锁。
HashEntry 数组结构
- 存储键值对数据。
ConcurrentHashMap 的初始化
- ConcurrentHashMap 初始化方法是通过 initialCapacity、loadFactor 和 concurrencyLevel(预估的并发数)等几个参数来初始化 Segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 Segment 里的 HashEntry 数组来实现的。
初始化 segments 数组
- segments 数组的长度 ssize 是通过 concurrencyLevel 计算得出。
- 必须保证 segments 数组的长度是 2 的 N 次方。
- concurrencyLevel 的最大值是 65535,这意味着 segments 数组的长度最大为 65536。
- sshift 等于 ssize 从 1 向左移位的次数。在默认情况下 concurrencyLevel 等于 16,1 需要向左移位移动 4 次,所以 sshift 等于 4。
初始化 segmentShift 和 segmentMask
- 这两个全局变量需要在定位 segment 时的散列算法里使用。
- segmentShift 用于定位参与散列运算的位数,segmentShift 等于 32 减 sshift。
- segmentMask 是散列运算的掩码,等于 ssize 减 1。
初始化每个 segment
- initialCapacity 是 ConcurrentHashMap 的初始化容量。
- loadFactor 是每个 segment 的负载因子。
- segment 的容量 threshold =(int)cap * loadFactor,默认情况下 initialCapacity 等于 16,loadFactor 等于 0.75。
- cap 就是 segment 里 HashEntry 数组的长度。
ConcurrentHashMap 的操作
get 操作
- Segment 的 get 操作非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。
- get 方法里将要使用的共享变量都定义成 volatile 类型,如用于统计当前 Segment 大小的 count 字段和用于存储值的 HashEntry 的 value。
put 操作(必须加锁)
- 判断是否需要对 Segment 里的 HashEntry 数组进行扩容:
- 是否需要扩容:
- 先判断 Segment 里的 HashEntry 数组是否超过容量(threshold)。
- 如何扩容:
- 创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap 不会对整个容器进行扩容,而只对某个 Segment 进行扩容。
- 定位添加元素的位置,然后将其放在 HashEntry 数组。
- 是否需要扩容:
size 操作
- 先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计所有 Segment 的大小。
ConcurrentLinkedQueue
- 线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。
- 阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
- 非阻塞的实现方式则可以使用循环 CAS 的方式来实现。
ConcurrentLinkedQueue 的实现
- ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,它会返回队列头部的元素。它采用了“wait-free”算法(即 CAS 算法)来实现。
单线程入队
- 将入队节点设置成当前队列尾节点的下一个节点。
- 更新 tail 节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点。
多线程入队
- 定位出尾节点。
- 使用 CAS 算法将入队节点设置成尾节点的 next 节点。
- 入队方法永远返回 true,所以不要通过返回值判断是否成功。
出队列
- 当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新 head 节点。
Java 中的阻塞队列
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
JDK 7 提供了 7 个阻塞队列:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- 默认情况下不保证线程公平的访问队列。
- 访问者的公平性是使用可重入锁实现的。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- 默认和最大长度为 Integer.MAX_VALUE。
- 按照先进先出的原则对元素进行排序。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- 元素采取自然顺序升序排列。也可以自定义类实现
compareTo()
方法来指定元素排序规则,或者初始化PriorityBlockingQueue
时,指定构造参数Comparator
来对元素进行排序。 - 不能保证同优先级元素的顺序。
- 元素采取自然顺序升序排列。也可以自定义类实现
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- 队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
- 应用场景:
- 缓存系统的设计:可以用
DelayQueue
保存缓存元素的有效期,使用一个线程循环查询DelayQueue
,一旦能从DelayQueue
中获取元素时,表示缓存有效期到了。 - 定时任务调度:使用
DelayQueue
保存当天将会执行的任务和执行时间,一旦从DelayQueue
中获取到任务就开始执行,比如TimerQueue
就是使用DelayQueue
实现的。
- 缓存系统的设计:可以用
- 如何实现 Delayed 接口:
- 在对象创建时,初始化基本数据。
- 实现
getDelay
方法,该方法返回当前元素还需要延时多长时间,单位是纳秒。 - 实现
compareTo
方法来指定元素的顺序。例如,让延时时间最长的放在队列的末尾。
- 如何实现延时阻塞队列:
- 当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。
SynchronousQueue:一个不存储元素的阻塞队列。
- 每一个
put
操作必须等待一个take
操作,否则不能继续添加元素。 - 负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。
- 每一个
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- 相对于其他阻塞队列,
LinkedTransferQueue
多了tryTransfer
和transfer
方法。transfer
方法:- 如果当前有消费者正在等待接收元素(消费者使用
take()
方法或带时间限制的poll()
方法时),transfer
方法可以把生产者传入的元素立刻transfer
(传输)给消费者。如果没有消费者在等待接收元素,transfer
方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。
- 如果当前有消费者正在等待接收元素(消费者使用
tryTransfer
方法:tryTransfer
方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。与transfer
方法的区别是tryTransfer
方法无论消费者是否接收,方法立即返回,而transfer
方法是必须等到消费者消费了才返回。
- 相对于其他阻塞队列,
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
- 双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
阻塞队列的实现原理
- 问题:如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?
- 使用通知模式实现:
- **当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过
LockSupport.park(this)
**。- 只有以下 4 种情况中的一种发生时,该方法才会返回:
- 与 park 对应的 unpark 执行或已经执行时。“已经执行”是指 unpark 先执行,然后再执行 park 的情况。
- 线程被中断时。
- 等待完
time
参数指定的毫秒数时。 - 异常现象发生时,这个异常现象没有任何原因。
- 只有以下 4 种情况中的一种发生时,该方法才会返回:
- **当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过
- 使用通知模式实现:
All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.
Comment