ConcurrentHashMap 是Java并发包中提供的一个线程安全且高效的HashMap实现
HashMap的缺点:
多线程环境下HashMap会有线程安全题目,扩容可能会造成环形链表,使cpu空转达到100%,但是HashTable可以保证线程安全
HashTable缺点:
底层使用synchronized锁保证线程安全题目,但是将整个数组锁住了,最终只有一个线程能够调用get或者是put方法,如果没有获取锁的线程就会变为阻塞状态,效率非常低。
多线程环境下建议使用ConcurrentHashMap聚集
ConcurrentHashMap1.7源码解读#
ConcurrentHashMap相当于把一个大的ConcurrentHashMap聚集拆分成16个HashTable类,每次存放先要计算存放在哪个HashTable里面,然后还要计算存放在HashTable里面的哪个HashEntry里面,相当于把锁的粒度进行拆分了,把大锁拆分成小锁。
核心构造参数分析
//初始的容量private static final int DEFAULT_CAPACITY = 16;//最大容量private static final int MAXIMUM_CAPACITY = 1 > segmentShift) & segmentMask; //判定索引下是否有Segment对象,没有帮忙创建 s=Segment[j]=null if ((s = (Segment)UNSAFE.getObject(segments, (j . e=first一开始就是null(可以理解为即一开始就遍历到了尾节点) if (node != null) //这里有可能获取到锁是通过scanAndLockForPut方法内自旋获取到的,这种环境下依据找好或者说是新建好了对应节点,node不为空 node.setNext(first); // 当然也有可能是这里直接第一次tryLock就获取到了锁,从而node没有分配对应节点,即需要给依据插入的k,v来创建一个新节点 else node = new HashEntry(hash, key, value, first); int c = count + 1; //总数+1 在这里依据获取到了锁,即是线程安全的!对应了上述对count变量的使用规范阐明。 if (c > threshold && tab.length < MAXIMUM_CAPACITY)//判定是否需要进行扩容 //扩容是直接重新new一个新的HashEntry数组,这个数组的容量是老数组的两倍, //新数组创建好后再依次将老的table中的HashEntry插入新数组中,以是这个过程是十分费时的,应尽量制止。 //扩容完毕后,还会将这个node插入到新的数组中。 rehash(node); else //数组无需扩容,那么就直接插入node到指定index位置,这个方法里用的是UNSAFE.putOrderedObject //网上查阅到的资料关于使用这个方法的缘故起因都是说因为它使用的是StoreStore屏蔽,而不是十分耗时的StoreLoad屏蔽 //给我个人感觉就是putObjectVolatile是对写入对象的写入赋予了volatile语义,但是代价是用了StoreLoad屏蔽 //而putOrderedObject则是使用了StoreStore屏蔽保证了写入顺序的禁止重排序,但是未实现volatile语义导致更新后的不可见性, //当然这里由于是加锁了,以是在释放锁前会将所有变革从线程自身的工作内存更新到主存中。 //这一块对于putOrderedObject和putObjectVolatile的区别有点混乱,不是完全理解,网上也没找到具体解答,检察了C源码也是不大确定。 //盼望有理解的人看到能指点一下,后续如果弄明白了再更新这一块。 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue;}多线程下put方法获取锁失败,就会使用scanAndLockForPut遍历获取锁然后进行数据插入
private HashEntry scanAndLockForPut(K key, int hash, V value) { //获取k所在的segment中的HashEntry的头节点(segment中放得是HashEntry数组,HashEntry又是个链表结构) HashEntry first = entryForHash(this, hash); HashEntry e = first;//缓存需要插入的数据 HashEntry node = null; int retries = -1; // negative while locating node while (!tryLock()) {//自旋尝试获取k所在segment的锁 HashEntry f; // to recheck first below if (retries < 0) { if (e == null) {//所在HashEntry链表不存在,则根据传过来的key-value创建一个HashEntry if (node == null) node = new HashEntry(hash, key, value, null); retries = 0; } else if (key.equals(e.key))//找到要放得值,则设置segment重试次数为0 retries = 0; else //重新节点往下寻找key对应的HashEntry e = e.next; } else if (++retries > MAX_SCAN_RETRIES) {//凌驾最大重试次数就将当前操作放入到Lock的队列中 阻塞了 lock(); break; //retries只有0和1 如果发现头节点有变革,就重新获取HashEntry链表的头结点 } else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node;}trylock尝试获取锁不会阻塞,lock获取锁不成功会阻塞,先缓存了当前冲突的链表,然后每次检查当前链表头节点是否有变革,如果有变革阐明被修改过,因为采用的是头插法。在发生变革的环境下,修改为最新的。
通过MAX_SCAN_RETRIES控制自旋次数最大为64,防止无穷定的重复自旋浪费资源。
底层实现原理:
由多个差别的Segment对象构成,lock锁锁住,用Unsafe类查询主内存数据,使用cas做修改
假如有16个线程均匀分布在每个Segment上,往里面添加数据,锁没有产生竞争,那么效率是没有任何影响的。但如果有三个线程同时都获取到同一个Segment中,那么这三个key都会上锁,JDK1.7里面采用乐观锁lock保证线程安全。
ConcurrentHashMap1.8源码解读#
//默认为0 -1标示其他线程已经在扩容了 -2标示当前有两个线程在扩容private transient volatile int sizeCtl;put方法#
final V putVal(K key, V value, boolean onlyIfAbsent) { //key不允许为空 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //记录链表长度 大于8的环境下转为红黑树 int binCount = 0; //死循环 相当于自旋 table是全局可见变量 for (Node[] tab = table;;) { Node f; int n, i, fh; //第一次循环如果tab为空就做初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //第二次循环tab不为空 计算key所在的index 查找该node节点 如果没有发生index冲突 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //CAS 在当前位置赋值一个node节点 if (casTabAt(tab, i, null,new Node(hash, key, value, null))) break; // no lock when adding to empty bin } //其他线程在扩容,参与一起扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //index节点发生冲突之后 else { V oldVal = null; //给当前index位的node节点加锁 synchronized (f) { //再次赋值判定 如果是当前缓存的节点 if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; //key值比较相等就修改覆盖 if (e.hash == hash &&((ek = e.key) == key ||(ek != null&& key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //不相等的话 追加在链表后面 Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } //判定是否为红黑树 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // if (binCount != 0) { //binCount>8 转换为红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //对每个线程里面保存的size进行求和 算出整个聚集的元素 addCount(1L, binCount); return null; }加锁分为了两种:
1、没有发生hash冲突的时候,如果添加的元素的位置在数组中是空的话,那么就使用CAS的方式来加入元素,这里加锁的粒度是数组中的元素
2、如果出现了hash冲突,添加的元素的位置在数组中已经有了值,使用的synchronized,那么又存在三种环境
(1)key相同,那么直接用新的元素覆盖旧的元素
(2)如果数组中的元素是链表的情势,那么将新的元素挂载在链表尾部
(3)如果数组中的元素是红黑树的情势,那么将新的元素加入到红黑树
锁住的对象就是数组中的node元素,加锁的粒度和第一种环境相同
初始化Table
private final Node[] initTable() { Node[] tab; int sc; //tab为空的时候 while ((tab = table) == null || tab.length == 0) { //sizeCtl如果小于0表示已经有其他线程在做扩容了 当前如果只有一个线程那么sizeCtl=0 if ((sc = sizeCtl) < 0) //当火线程就释放cpu执行权 如果线程被唤醒那么tab肯定不为空 Thread.yield(); //CAS修改SIZECTL为-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //如果当前tab为空的环境下 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //对node初始化长度默认16 Node[] nt = (Node[])new Node[n]; table = tab = nt; //n=16 16>>>2=4 sc=12 扩容巨细 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } //初始化完毕 return tab; }只有一个线程环境下就会把SIZECTL改为-1,然后赋值table结束循环返回数据,但是16个线程进来同时,只有一个能够进入到tab为空的方法判定,SIZECTL为0,这时候剩下15个不停在自旋等待SIZECTL修改为-1,假如第一个线程还没执行完就挂机,就会导致剩下的不停进行自旋,使CPU飙升。
口试题#
ConcurrentHashMap1.7底层实现原理?#
ReentrantLock+Segment+HashEntry
一个 ConcurrentHashMap 里包含一个 Segment 数组。一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构,当对 HashEntry 数组的数据进行修改时,必须起首获得对应的 Segment的锁。
ConcurrentHashMap1.7 put操作原理?#
put操作时需要计算两次hash索引,先计算所在Segment的下标,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后计算在该Segment中HashEntry 的下标,ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,使用尾插法。如果Segment被其他线程占据,当火线程会以自旋的方式继续尝试获取锁知道达到最大重试次数就挂起等待唤醒。
ConcurrentHashMap1.7是怎样实现扩容的?#
ConcurrentHashMap 的扩容是仅仅和每个Segment元素中HashEntry数组的长度有关,但需要扩容时,只扩容当前Segment中HashEntry数组即可。也就是说ConcurrentHashMap中Segment[]数组的长度是在初始化的时候就确定了默认16,后面扩容不会改变这个长度。
ConcurrentHashMap 的并发度是什么?#
步伐运行时能够同时更新 ConccurentHashMap且不产生锁竞争的最大线程数。在JDK1.7中,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度,默认是16,这个值可以在构造函数中设置。
如果自己设置了并发度,ConcurrentHashMap 会使用大于等于该值的最小的2的幂指数作为实际并发度,也就是比如你设置的值是17,那么实际并发度是32。
如果并发度设置的过小,会带来严肃的锁竞争题目;如果并发度设置的过大,本来位于同一个Segment内的访问会扩散到差别的Segment中,CPU cache掷中率会下降,从而引起步伐性能下降。
ConcurrentHashMap1.8底层实现原理?#
实际上就是HashMap的数据结构+synchronized锁,只需要锁住这个链表的头节点就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。
为什么ConcurrentHashMap1.8需要去除Segments分段锁?#
- 1.7中锁的粒度太大了,对每个Segment加了锁,为了减少并发冲突的概率,去除了Segments分段锁。
- JDK1.6以后 对 synchronized锁做了很多优化
- 在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存
ConcurrentHashMap1.8 put操作原理?#
根据 key 计算出 hash 值,判定是否需要进行初始化;
定位到 Node,拿到首节点 f,判定首节点 f:
如果为 null ,则通过 CAS 的方式尝试添加;
如果为 f.hash = MOVED = -1 ,阐明其他线程在扩容,参与一起扩容;
如果都不满足 ,synchronized 锁住 f 节点,判定是链表照旧红黑树,遍历插入;
为什么ConcurrentHashMap1.8使用synchronsized而不消lock?#
1.7lock锁是加上了while循环实现自旋,效率不高,JDK1.6以后 对 synchronized锁做了优化,
ConcurrentHashMap1.8如何让基于Node节点高效实现锁机制?#
hash没有发生冲突的时候使用CAS,已经发生冲突的时候使用synchronized锁。
多线程同时执行CAS效率真的非常高吗?#
不肯定,极低的概率下会发生CPU飙升的环境。
ConcurrentHashMap1.8和ConcurrentHashMap1.7的区别#
|