为什么要用ConcurrentHashMap?
1、线程不安全的HashMap
在多线程环境下,使用HashMap的put操作会引起死循环,原因是多线程会导致HashMap的Entry链表形成环形数据结构,导致Entry的next节点永远不为空,就会产生死循环获取Entry。
2、效率低下的HashTable
HashTable容器使用sychronized来保证线程安全,采取锁住整个表结构来达到同步目的,在线程竞争激烈的情况下,当一个线程访问HashTable的同步方法,其他线程也访问同步方法时,会进入阻塞或轮询状态;如线程1使用put方法时,其他线程既不能使用put方法,也不能使用get方法,效率非常低下。
3、ConcurrentHashMap的锁分段技术可提升并发访问效率
首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap的结构
- ConcurrentHashMap由Segment数组结构和HashEntry数组结构组成;
- Segment是一种可重入锁(ReentrantLock),HashEntry用于存储键值对数据;
- 一个ConcurrentHashMap包含一个由若干个Segment对象组成的数组,每个Segment对象守护整个散列映射表的若干个桶,每个桶是由若干个HashEntry对象链接起来的链表,table是一个由HashEntry对象组成的数组,table数组的每一个数组成员就是散列映射表的一个桶。
HashEntry类
1 | static final class HashEntry<K,V> { |
在ConcurrentHashMap中,在散列时如果产生“碰撞”,将采用“分离链接法”来处理“碰撞”:把“碰撞”的HashEntry对象链接成一个链表。由于HashEntry的next域为final型,所以新节点只能在链表的表头处插入。
下图是在一个空桶中依次插入 A,B,C 三个 HashEntry 对象后的结构图:
HashEntry对象的不变性
HashEntry对象的key、hash、next都声明为final类型,这意味着不能把节点添加到链表的中间和尾部,也不能再链表的中间和尾部删除节点;这个特性可以保证:在访问某个节点时,这个节点之后的链接不改变。
同时,HashEntry的value被声明为volatile类型,Java的内存模型可以保证:某个写线程对value的写入马上可以被后续的读线程看到。ConcurrentHashMap不允许用null为键和值,当读线程读到某个HashEntry的value为null时,便知道产生了冲突——发生了重排序现象,需要加锁后重新读这个value值。这些特性保证读线程不用加锁也能正确访问ConcurrentHashMap。
结构性修改操作:put、remove、clear
- clear只是把容器中所有的桶置空,每个桶之前引用的链表依然存在,正在遍历某个链表的读线程依然可以正常执行对该链表的遍历。
- put操作在插入一个新节点到链表时,会在链表头部插入新节点,此时,链表原有节点的链表并没有修改,不会影响读操作正常遍历这个链表。
- remove操作,首先根据散列码找到具体的链表,然后遍历这个链表找到要删除的节点,最后把待删除节点之后的所有节点原样保留在新链表中,把待删除节点之前的每个节点克隆到新链表中,注意克隆到新链表中的链接顺序被反转了。
删除之前的原链表:
删除节点C之后的链表:
总结:写线程对某个链表的结构性修改不会影响其他的并发读线程对这个链表的遍历访问。
Segment类
1 | static final class Segment<K,V> extends ReentrantLock implements Serializable { |
下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图:
ConcurrentHashMap类
- ConcurrentHashMap在默认并发级别会创建包含16个Segment对象的数组。
- 每个Segment的成员对象table包含若干个散列表的桶。
- 每个桶是由HashEntry链接起来的一个链表。
- 如果键能均匀散列,每个Segment大约守护整个散列表中桶总数的 1/16。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
/**
* 散列映射表的默认初始容量为 16,即初始默认为 16 个桶
* 在构造函数中没有指定这个参数时,使用本参数
*/
static final int DEFAULT_INITIAL_CAPACITY= 16;
/**
* 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与 table 数组长度的比值
* 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时,将触发再散列
* 在构造函数中没有指定这个参数时,使用本参数
*/
static final float DEFAULT_LOAD_FACTOR= 0.75f;
/**
* 散列表的默认并发级别为 16。该值表示当前更新线程的估计数
* 在构造函数中没有指定这个参数时,使用本参数
*/
static final int DEFAULT_CONCURRENCY_LEVEL= 16;
/**
* segments 的掩码值,对应的二进制每一位都是1,等于ssize-1,最大值是65535,默认值是15
* key 的散列码的高位用来选择具体的 segment
*/
final int segmentMask;
/**
* 偏移量,用于定位参与散列运算的位数,等于32-sshift,最大值为16,默认值是28
*/
final int segmentShift;
/**
* 由 Segment 对象组成的数组
*/
final Segment<K,V>[] segments;
/**
* 创建一个带有指定初始容量、加载因子和并发级别的新的空映射。
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if(!(loadFactor > 0) || initialCapacity < 0 ||
concurrencyLevel <= 0)
throw new IllegalArgumentException();
if(concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// ssize从1向左移位的次数
int sshift = 0;
// Segment数组的长度,为2的N次方
int ssize = 1;
while(ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift; // 偏移量值
segmentMask = ssize - 1; // 掩码值
this.segments = Segment.newArray(ssize); // 创建数组
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if(c * ssize < initialCapacity)
++c;
// HashEntry数组的长度
int cap = 1;
while(cap < c)
cap <<= 1;
// 依次遍历每个数组元素
for(int i = 0; i < this.segments.length; ++i){
// 初始化每个数组元素引用的 Segment 对象
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
/**
* 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) 的空散列映射表。
*/
public ConcurrentHashMap() {
// 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
}
ConcurrentHashMap的操作
put操作
1、根据key算出对应的hash值
1 | public V put(K key, V value) { |
2、根据hash值找到对应的Segment对象
1 | /** |
3、在Segment中执行具体的put操作
1 | V put(K key, int hash, V value, boolean onlyIfAbsent) { |
插入操作需要两个步骤
1、是否需要扩容
在插入元素前会先判断Segment里面的HashEntry数组是否超过容量(threshold),如果超过则进行扩容。Segment的扩容比HashMap更恰当,HashMap是在插入元素后再判断元素是否已经到达容量。
2、如何扩容
首先会创建一个容量是原来容量两倍的数组,然后将原数组里面的元素进行再散列后插入到新数组里;ConcurrentHashMap不会对整个容器进行扩容,只对某个Segment进行扩容。
get操作
先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素,代码如下:
1 | public V get(Object key){ |
get操作的高效之处在于get过程不需要加锁,除非读到的值是null才会加锁重读。原因是它的get方法里面将要使用的共享变量都定义成volatile类型,在多线程之间保持可见性,原理是根据Java内存模型的happen-before原则,对volatile字段的写入操作先于读操作。
定位Segment和HashEntry的不同: 定位Segment使用的是元素的hashcode通过再散列后得到值的高位;定位HashEntry直接使用的是再散列后的值。
1 | //定位Segment的算法 |
size操作
做法是累加每个Segment里面的全局变量count,它是volatile类型,用来统计Segment中HashEntry的个数,但是不能直接进行累加,因为累加的时候count可能会发生变化。所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计过程中,容器的count发生了变化,则采用再加锁的方式来统计所有Segment的大小。
那是如何判断在统计的时候容器是否发生了变化呢? 使用modCount变量,在put、remove、clean方法里操作元素前都会将该变量进行加一,在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生了变化。
remove操作
1 | V remove(Object key, int hash, Object value) { |
ConcurrentHashMap实现高并发的总结
读操作的高效率
在实际的应用中,散列表一般的应用场景是:除了少数插入操作和删除操作外,绝大多数都是读取操作,而且读操作在大多数时候都是成功的。正是基于这个前提,ConcurrentHashMap针对读操作做了大量的优化。通过HashEntry对象的不变性和用volatile型变量协调线程间的内存可见性,使得大多数时候,读操作不需要加锁就可以正确获得值。
比HashTable和HashMap拥有更高并发性
相比于HashTable和用同步包装器包装的HashMap
Collections.synchronizedMap(new HashMap());
ConcurrentHashMap拥有更高的并发性。在HashTable和由同步包装器包装的HashMap中,使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。
ConcurrentHashMap的高并发性主要来自于三个方面
- 用分离锁实现多个线程间的更深层次的共享访问。
- 用HashEntery对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求。
- 通过对同一个volatile变量的写/读访问,协调不同线程间读/写操作的内存可见性。
参考来源: