本文是建立在上篇《HashMap源码分析》的底子上。其中的一些重复的方法和知识点不会再赘述。有迷惑的同学可以移步到上一篇文章。依旧以jdk1.8源码为底子来讲解ConcurrentHashMap。它的大体结构与HashMap相同,table容量同样要求是2的幂次。
HashMap高效快捷,但不安全,特别是2020年,安全很重要。现在市面上有3种提供安全的map的方式,分别是
- hashtable:相对古老的线程安全机制。任一时间只有一个线程能写操作。现在基本被服从更高的ConcurrentHashMap替代。
- synchronizedMap:Collections中的内部类,可以把普通的map转成线程安全的map。原理就是在操作对象上加synchronized。
- ConcurrentHashMap:线程安全的HashMap。
如何做到线程安全
多线程并发带来的题目现在总体有2种解决机制。
- 悲观机制:以为最坏的结果肯定发生,从开始就设置一定规则最大限度减少发生概率,有点以防万一、未雨绸缪的意思。比如安全套,肯定不会每次都中,可万一呢?并发的悲观实现一般采用锁,java中的synchronized关键字也被广泛应用在多线程并发的场景中。但是锁会低落步伐性能。
- 乐观机制:以为结果总是好的,先干了再说,不行再想办法:重试,调停,版本号控制等。意外怀孕的痴男怨女们可能就太乐观了,事后只能接纳调停措施。CAS就是乐观机制。
ConcurrentHashMap中重要采用的CAS+自旋,改成功就改,改不成功继续试(自旋)。也有synchronized配合利用。
CAS & Unsafe
CAS的全称是Compare And Swap,即比较交换,它也是JUC的底子。我们只是简单介绍它的原理,细节题目需要同学们另行研究。
/* * v-表示要更新的变量,e-表示预期值,n-新值。 * 方法的目的是给变量v修改值。 * 假如变量v的值和预期值e相等就把v的值改成n,返回true,假如不相等就返回false,有其他线程修改该值。 * 这里可能出现ABA题目(从A变成B又变回A,造成变量没变得假象),但java做了很多优化。可以忽略不计。 */ boolean CAS(v,e,n) cas流程很好理解,但在多cpu多线程的情况下会不会不安全,放心安全。java的cas其实是通过Unsafe类方法调用cpu的一条cas原子指令。操作系统自己是对内存操作做了线程安全的,篇幅太少也说不清晰,这里大家可以自行研究一下JMM,JMM不是本文重点。这里只要知道结论就是CAS可以保证原子性。不过它只提供单个变量的原子性,多个变量的原子性还需要借助synchronized。
Unsafe是java里的另类,java有个特点是安全,并不答应步伐员直接操作内存,而Unsafe类可以,才有条件执行CAS方法。但是不建议大家利用Unsafe.class,因为不安全,sun公司有计划取消Unsafe。
源码解析
sizeCtl & constructor
ConcurrentHashMap和HashMap在各自的构造函数中都没有做数组初始化,初始化放在了第一次添加元素中。值得注意的是ConcurrentHashMap中有一个属性sizeCtl特别重要,理清晰它的变革,也就理解了整个Map源码的流程。下面是它的说明
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. *
* 控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 *
* 1. 当为负数时:-1代表正在初始化,-N代表有N-1个线程正在举行扩容 *
* 2.当为0时:代表当时的table还没有被初始化 *
* 3.当为正数时:未初始化表示的是初始化数组的初始容量,假如已经初始化, * 记录的是扩容的阈值(达到阈值举行扩容) */ private transient volatile int sizeCtl;再看一下ConcurrentHashMap带初始化容量的代码
/** * Creates a new, empty map with an initial table size * accommodating the specified number of elements without the need * to dynamically resize. * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative * * 此时sizeCtl记录的就是数组的初始化容量 * * 比如initialCapacity=5 * 调用tableSizeFor(5+5/2+1)==tableSizeFor(8) */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } /** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 * 返回一个大于即是c的2的幂次方数 * * 当c=8时 * n = c-1=7 * 接下来验算最闭幕果 * 0000 0000 0000 0000 0000 0000 0000 0111 * >>> 1 * = 0000 0000 0000 0000 0000 0000 0000 0011 * | 0000 0000 0000 0000 0000 0000 0000 0111 * = 0000 0000 0000 0000 0000 0000 0000 0111 * >>> 2 * = 0000 0000 0000 0000 0000 0000 0000 0001 * | 0000 0000 0000 0000 0000 0000 0000 0111 * = 0000 0000 0000 0000 0000 0000 0000 0111 * >>> 4 * = 0000 0000 0000 0000 0000 0000 0000 0000 * | 0000 0000 0000 0000 0000 0000 0000 0111 * = 0000 0000 0000 0000 0000 0000 0000 0111 * 下面再 >>> 8 和 >>> 16后的二进制都是0 * 所以最闭幕果就是111,也就是7最后返回结果再+1,即是8 * * 总结 右移一共1+2+4+8+16=31位,和与之对应 | 运算 * 最终把n的二进制中全部1移到低位。新的数高位都是0,低位都是1。这样格式的数在HashMap中提到过,就是2的幂次-1。 * 最后结果是这个数+1,那就是2的幂次。 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }当我们new ConcurrentHashMap(c) 时,初始化容量并不是c,而是一个大于即是c的2的幂次方数。我们利用发射来验证下
public static void main(String[] args) { ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(5); Class clazz = concurrentHashMap.getClass(); try { Field field = clazz.getDeclaredField("sizeCtl"); //打开私有访问 field.setAccessible(true); //获取属性 String name = field.getName(); //获取属性值 Object value = field.get(concurrentHashMap); System.out.println("ConcurrentHashMap的初始容量为:= "+value); } catch (Exception e) { e.printStackTrace(); } }--打印结果是: Map的初始容量=8put & putVal
/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * *
The value can be retrieved by calling the {@code get} method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with {@code key}, or * {@code null} if there was no mapping for {@code key} * @throws NullPointerException if the specified key or value is null */ @Override public V put(K key, V value) { return putVal(key, value, false); } /** * Implementation for put and putIfAbsent * */ final V putVal(K key, V value, boolean onlyIfAbsent) { //假如有空值大概空键,直接抛非常 if (key == null || value == null) { throw new NullPointerException(); } //两次hash,减少hash冲突,可以均匀分布 int hash = spread(key.hashCode()); int binCount = 0; //迭代当前table for (Node[] tab = table; ; ) { Node f; int n, i, fh; //1. 假如table未初始化,先初始化 if (tab == null || (n = tab.length) == 0) { tab = initTable(); } //假如i位置没有数据,cas插入 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //cas和外侧else if条件形成双保险,保证数据安全 if (casTabAt(tab, i, null, new Node(hash, key, value, null))) { break; // no lock when adding to empty bin } } //2. hash值是MOVED表示数组正在扩容,则协助扩容,先扩容在新加元素 else if ((fh = f.hash) == MOVED) { tab = helpTransfer(tab, f); } else { //hash计算的bucket不为空,且当前没有处于扩容操作,举行元素添加 V oldVal = null; //对当前bucket举行加锁,保证线程安全,执行元素添加操作 synchronized (f) { //判断是否为f,防止它变成tree if (tabAt(tab, i) == f) { //hash值>=0 表示该节点是链表结构 if (fh >= 0) { binCount = 1; //e记录的是头节点 for (Node e = f; ; ++binCount) { K ek; //相同的key举行put就会覆盖原先的value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { //插入链表尾部 pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node p; binCount = 2; //红黑树结构旋转插入 if ((p = ((TreeBin) f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) { p.val = value; } } } } } if (binCount != 0) { //链表长度大于8时转换红黑树 if (binCount >= TREEIFY_THRESHOLD) { treeifyBin(tab, i); } if (oldVal != null) { return oldVal; } break; } } } //统计size,而且查抄是否需要扩容 addCount(1L, binCount); return null; }putVal()总体是自旋+CAS的方式,流程和HashMap一样。
- 自旋:假如table==null,调用initTable()初始化假如没有hash碰撞就CAS添加假如正在扩容就协助扩容假如存在hash碰撞,假如是单向列表就插到bucket尾部,假如是红黑树就插入数结构假如链表bucket长度大于8,转红黑树假如添加成功就调用addCount()方法统计size,查抄是否需要扩容
从源码中可以看到put新元素时,假如发生hash冲突,先锁定发生冲突的bucket,不影响其他bucket操作,达到并发安全且高效的目的。下面是`putVal
initTable,初始化table
/** * Initializes table, using the size recorded in sizeCtl. * 初始化table,从新记录sizeCtl值,此时值为数组下次扩容的阈值 */ private final Node[] initTable() { Node[] tab; int sc; //再次判断空的table才气进入初始化操作 while ((tab = table) == null || tab.length == 0) { // sizeCtl 0) ? sc : DEFAULT_CAPACITY; Node[] nt = (Node[]) new Node[n]; table = tab = nt; //记录下次扩容的大小,相当于n-n/4=0.75n sc = n - (n >>> 2); } } finally { //此时sizeCtl的值为下次扩容的阈值 sizeCtl = sc; } break; } } return tab; }helpTransfer 协助扩容
/** * Helps transfer if a resize is in progress. *
* 假如数组正在扩容,协助之,多个工作线程一起扩容 * 从旧的table的元素复制到新的table中 * */ final Node[] helpTransfer(Node[] tab, Node f) { Node[] nextTab; int sc; //假如f是ForwardingNode,说明f正在扩容,hash值已经被标为MOVED。 //ForwardingNode.nextTable就是新table不为空 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode) f).nextTable) != null) { //根据 length 得到一个前16位的标识符,数组容量大小。 int rs = resizeStamp(tab.length); //多重条件判断未扩容完成,还在举行中,新老数组都没有变,且sizeCtl> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) { stride = MIN_TRANSFER_STRIDE; // subdivide range } //假如是扩容线程,此时新数组为null if (nextTab == null) { // initiating try { //构建新数组,其容量为原来容量的2倍 @SuppressWarnings("unchecked") Node[] nt = (Node[]) new Node[n =bound 表示当前线程分配的迁移任务还没有完成 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) { advance = false; //没有元素需要迁移 } else if ((nextIndex = transferIndex) stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //没有更多的需要迁移的bucket if (i < 0 || i >= n || i + n >= nextn) { int sc; // 扩容竣事后,table指向新数组,重新计算扩容阈值,赋值给sizeCtl if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n >> 1); return; } // 扩容任务线程数减1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //判断当前全部扩容任务是否执行完成,相等表明完成 if ((sc - 2) != resizeStamp(n) = 0 ,表示为链表节点 if (fh >= 0) { // 构造两个链表,一个是原链表,另一个是原链表的反序分列 int runBit = fh & n; Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) { ln = new Node(ph, pk, pv, ln); } else { hn = new Node(ph, pk, pv, hn); } } // 先扩容再插入相应值 //新table的i位置添加元素 setTabAt(nextTab, i, ln); //新table的i+1位置添加元素 setTabAt(nextTab, i + n, hn); // 旧table i 位置处插上ForwardingNode,表示该节点已经处置惩罚过 setTabAt(tab, i, fwd); advance = true; // 红黑树处置惩罚逻辑,实质上是维护双向链表 } else if (f instanceof TreeBin) { TreeBin t = (TreeBin) f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) { lo = p; } else { loTail.next = p; } loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) { hi = p; } else { hiTail.next = p; } hiTail = p; ++hc; } } // 扩容后红黑树节点个数若 |