程序员读书俱乐部 发表于 2020-12-31 14:50:59

ConcurrentHashMap源码解析,含6大焦点方法

本文是建立在上篇《HashMap源码分析》的底子上。其中的一些重复的方法和知识点不会再赘述。有迷惑的同学可以移步到上一篇文章。依旧以jdk1.8源码为底子来讲解ConcurrentHashMap。它的大体结构与HashMap相同,table容量同样要求是2的幂次。
HashMap高效快捷,但不安全,特别是2020年,安全很重要。现在市面上有3种提供安全的map的方式,分别是

[*]hashtable:相对古老的线程安全机制。任一时间只有一个线程能写操作。现在基本被服从更高的ConcurrentHashMap替代。
[*]synchronizedMap:Collections中的内部类,可以把普通的map转成线程安全的map。原理就是在操作对象上加synchronized。
[*]ConcurrentHashMap:线程安全的HashMap。
https://p1.pstatp.com/large/dfic-imagehandler/3c00c425-2e7f-4f42-bd4b-4885ac89d433
如何做到线程安全

多线程并发带来的题目现在总体有2种解决机制。

[*]悲观机制:以为最坏的结果肯定发生,从开始就设置一定规则最大限度减少发生概率,有点以防万一、未雨绸缪的意思。比如安全套,肯定不会每次都中,可万一呢?并发的悲观实现一般采用锁,java中的synchronized关键字也被广泛应用在多线程并发的场景中。但是锁会低落步伐性能。
[*]乐观机制:以为结果总是好的,先干了再说,不行再想办法:重试,调停,版本号控制等。意外怀孕的痴男怨女们可能就太乐观了,事后只能接纳调停措施。CAS就是乐观机制。
ConcurrentHashMap中重要采用的CAS+自旋,改成功就改,改不成功继续试(自旋)。也有synchronized配合利用。
CAS & Unsafe
CAS的全称是Compare And Swap,即比较交换,它也是JUC的底子。我们只是简单介绍它的原理,细节题目需要同学们另行研究。
/*   * v-表示要更新的变量,e-表示预期值,n-新值。   * 方法的目的是给变量v修改值。   * 假如变量v的值和预期值e相等就把v的值改成n,返回true,假如不相等就返回false,有其他线程修改该值。   * 这里可能出现ABA题目(从A变成B又变回A,造成变量没变得假象),但java做了很多优化。可以忽略不计。*/boolean CAS(v,e,n) cas流程很好理解,但在多cpu多线程的情况下会不会不安全,放心安全。java的cas其实是通过Unsafe类方法调用cpu的一条cas原子指令。操作系统自己是对内存操作做了线程安全的,篇幅太少也说不清晰,这里大家可以自行研究一下JMM,JMM不是本文重点。这里只要知道结论就是CAS可以保证原子性。不过它只提供单个变量的原子性,多个变量的原子性还需要借助synchronized。
Unsafe是java里的另类,java有个特点是安全,并不答应步伐员直接操作内存,而Unsafe类可以,才有条件执行CAS方法。但是不建议大家利用Unsafe.class,因为不安全,sun公司有计划取消Unsafe。
源码解析

sizeCtl & constructor

ConcurrentHashMap和HashMap在各自的构造函数中都没有做数组初始化,初始化放在了第一次添加元素中。值得注意的是ConcurrentHashMap中有一个属性sizeCtl特别重要,理清晰它的变革,也就理解了整个Map源码的流程。下面是它的说明
       /**   * Table initialization and resizing control.When negative, the   * table is being initialized or resized: -1 for initialization,   * else -(1 + the number of active resizing threads).Otherwise,   * when table is null, holds the initial table size to use upon   * creation, or 0 for default. After initialization, holds the   * next element count value upon which to resize the table.   *
   * 控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义   *
   * 1. 当为负数时:-1代表正在初始化,-N代表有N-1个线程正在举行扩容   *
   * 2.当为0时:代表当时的table还没有被初始化   *
   * 3.当为正数时:未初始化表示的是初始化数组的初始容量,假如已经初始化,   * 记录的是扩容的阈值(达到阈值举行扩容)   */    private transient volatile int sizeCtl;再看一下ConcurrentHashMap带初始化容量的代码
   /**   * Creates a new, empty map with an initial table size   * accommodating the specified number of elements without the need   * to dynamically resize.   *   * @param initialCapacity The implementation performs internal   * sizing to accommodate this many elements.   * @throws IllegalArgumentException if the initial capacity of   * elements is negative   *   * 此时sizeCtl记录的就是数组的初始化容量   *      * 比如initialCapacity=5   * 调用tableSizeFor(5+5/2+1)==tableSizeFor(8)   */    public ConcurrentHashMap(int initialCapacity) {      if (initialCapacity < 0)            throw new IllegalArgumentException();      int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?                   MAXIMUM_CAPACITY :                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));      this.sizeCtl = cap;    }      /**   * Returns a power of two table size for the given desired capacity.   * See Hackers Delight, sec 3.2   * 返回一个大于即是c的2的幂次方数   *       * 当c=8时   * n = c-1=7   * 接下来验算最闭幕果   * 0000 0000 0000 0000 0000 0000 0000 0111   * >>> 1   * = 0000 0000 0000 0000 0000 0000 0000 0011   * | 0000 0000 0000 0000 0000 0000 0000 0111   * = 0000 0000 0000 0000 0000 0000 0000 0111   *>>> 2   * = 0000 0000 0000 0000 0000 0000 0000 0001   * | 0000 0000 0000 0000 0000 0000 0000 0111   * = 0000 0000 0000 0000 0000 0000 0000 0111   *>>> 4   * = 0000 0000 0000 0000 0000 0000 0000 0000   * | 0000 0000 0000 0000 0000 0000 0000 0111   * = 0000 0000 0000 0000 0000 0000 0000 0111   * 下面再 >>> 8 和 >>> 16后的二进制都是0   * 所以最闭幕果就是111,也就是7最后返回结果再+1,即是8   *      * 总结 右移一共1+2+4+8+16=31位,和与之对应 | 运算   * 最终把n的二进制中全部1移到低位。新的数高位都是0,低位都是1。这样格式的数在HashMap中提到过,就是2的幂次-1。   * 最后结果是这个数+1,那就是2的幂次。   */    private static final int tableSizeFor(int c) {      int n = c - 1;      n |= n >>> 1;      n |= n >>> 2;      n |= n >>> 4;      n |= n >>> 8;      n |= n >>> 16;      return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;    }当我们new ConcurrentHashMap(c) 时,初始化容量并不是c,而是一个大于即是c的2的幂次方数。我们利用发射来验证下
public static void main(String[] args) {      ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(5);      Class clazz = concurrentHashMap.getClass();      try {            Field field = clazz.getDeclaredField("sizeCtl");            //打开私有访问            field.setAccessible(true);            //获取属性            String name = field.getName();            //获取属性值            Object value = field.get(concurrentHashMap);            System.out.println("ConcurrentHashMap的初始容量为:= "+value);      } catch (Exception e) {            e.printStackTrace();      }    }--打印结果是: Map的初始容量=8put & putVal

    /**   * Maps the specified key to the specified value in this table.   * Neither the key nor the value can be null.   *   *
The value can be retrieved by calling the {@code get} method   * with a key that is equal to the original key.   *   * @param key   key with which the specified value is to be associated   * @param value value to be associated with the specified key   * @return the previous value associated with {@code key}, or   * {@code null} if there was no mapping for {@code key}   * @throws NullPointerException if the specified key or value is null   */    @Override    public V put(K key, V value) {      return putVal(key, value, false);    }    /**   * Implementation for put and putIfAbsent   *   */    final V putVal(K key, V value, boolean onlyIfAbsent) {      //假如有空值大概空键,直接抛非常      if (key == null || value == null) {            throw new NullPointerException();      }      //两次hash,减少hash冲突,可以均匀分布      int hash = spread(key.hashCode());      int binCount = 0;      //迭代当前table      for (Node[] tab = table; ; ) {            Node f;            int n, i, fh;            //1. 假如table未初始化,先初始化            if (tab == null || (n = tab.length) == 0) {                tab = initTable();            }            //假如i位置没有数据,cas插入            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                //cas和外侧else if条件形成双保险,保证数据安全                if (casTabAt(tab, i, null,                        new Node(hash, key, value, null))) {                  break;// no lock when adding to empty bin                }            }            //2. hash值是MOVED表示数组正在扩容,则协助扩容,先扩容在新加元素            else if ((fh = f.hash) == MOVED) {                tab = helpTransfer(tab, f);            } else {                //hash计算的bucket不为空,且当前没有处于扩容操作,举行元素添加                V oldVal = null;                //对当前bucket举行加锁,保证线程安全,执行元素添加操作                synchronized (f) {                  //判断是否为f,防止它变成tree                  if (tabAt(tab, i) == f) {                        //hash值>=0 表示该节点是链表结构                        if (fh >= 0) {                            binCount = 1;                            //e记录的是头节点                            for (Node e = f; ; ++binCount) {                              K ek;                              //相同的key举行put就会覆盖原先的value                              if (e.hash == hash &&                                        ((ek = e.key) == key ||                                                (ek != null && key.equals(ek)))) {                                    oldVal = e.val;                                    if (!onlyIfAbsent)                                        e.val = value;                                    break;                              }                              Node pred = e;                              if ((e = e.next) == null) {                                    //插入链表尾部                                    pred.next = new Node(hash, key,                                          value, null);                                    break;                              }                            }                        } else if (f instanceof TreeBin) {                            Node p;                            binCount = 2;                            //红黑树结构旋转插入                            if ((p = ((TreeBin) f).putTreeVal(hash, key,                                    value)) != null) {                              oldVal = p.val;                              if (!onlyIfAbsent) {                                    p.val = value;                              }                            }                        }                  }                }                if (binCount != 0) {                  //链表长度大于8时转换红黑树                  if (binCount >= TREEIFY_THRESHOLD) {                        treeifyBin(tab, i);                  }                  if (oldVal != null) {                        return oldVal;                  }                  break;                }            }      }      //统计size,而且查抄是否需要扩容      addCount(1L, binCount);         return null;    }putVal()总体是自旋+CAS的方式,流程和HashMap一样。

[*]自旋:假如table==null,调用initTable()初始化假如没有hash碰撞就CAS添加假如正在扩容就协助扩容假如存在hash碰撞,假如是单向列表就插到bucket尾部,假如是红黑树就插入数结构假如链表bucket长度大于8,转红黑树假如添加成功就调用addCount()方法统计size,查抄是否需要扩容
从源码中可以看到put新元素时,假如发生hash冲突,先锁定发生冲突的bucket,不影响其他bucket操作,达到并发安全且高效的目的。下面是`putVal
initTable,初始化table

   /**   * Initializes table, using the size recorded in sizeCtl.   * 初始化table,从新记录sizeCtl值,此时值为数组下次扩容的阈值   */    private final Node[] initTable() {      Node[] tab;      int sc;      //再次判断空的table才气进入初始化操作      while ((tab = table) == null || tab.length == 0) {            // sizeCtl 0) ? sc : DEFAULT_CAPACITY;                        Node[] nt = (Node[]) new Node;                        table = tab = nt;                        //记录下次扩容的大小,相当于n-n/4=0.75n                        sc = n - (n >>> 2);                     }                } finally {                  //此时sizeCtl的值为下次扩容的阈值                  sizeCtl = sc;                }                break;            }      }      return tab;    }helpTransfer 协助扩容

    /**   * Helps transfer if a resize is in progress.   *
   * 假如数组正在扩容,协助之,多个工作线程一起扩容   * 从旧的table的元素复制到新的table中   *   */    final Node[] helpTransfer(Node[] tab, Node f) {      Node[] nextTab;      int sc;      //假如f是ForwardingNode,说明f正在扩容,hash值已经被标为MOVED。      //ForwardingNode.nextTable就是新table不为空      if (tab != null && (f instanceof ForwardingNode) &&                (nextTab = ((ForwardingNode) f).nextTable) != null) {            //根据 length 得到一个前16位的标识符,数组容量大小。            int rs = resizeStamp(tab.length);            //多重条件判断未扩容完成,还在举行中,新老数组都没有变,且sizeCtl> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||                        sc == rs + MAX_RESIZERS || transferIndex1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {            stride = MIN_TRANSFER_STRIDE; // subdivide range      }      //假如是扩容线程,此时新数组为null      if (nextTab == null) {            // initiating            try {                //构建新数组,其容量为原来容量的2倍                @SuppressWarnings("unchecked")                Node[] nt = (Node[]) new Node[n =bound 表示当前线程分配的迁移任务还没有完成            while (advance) {                int nextIndex, nextBound;                if (--i >= bound || finishing) {                  advance = false;                //没有元素需要迁移                } else if ((nextIndex = transferIndex)stride ?                                        nextIndex - stride : 0))) {                  bound = nextBound;                  i = nextIndex - 1;                  advance = false;                }            }            //没有更多的需要迁移的bucket            if (i < 0 || i >= n || i + n >= nextn) {                int sc;                // 扩容竣事后,table指向新数组,重新计算扩容阈值,赋值给sizeCtl                if (finishing) {                  nextTable = null;                  table = nextTab;                  sizeCtl = (n >> 1);                  return;                }                // 扩容任务线程数减1                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                  //判断当前全部扩容任务是否执行完成,相等表明完成                  if ((sc - 2) != resizeStamp(n) = 0 ,表示为链表节点                        if (fh >= 0) {                            // 构造两个链表,一个是原链表,另一个是原链表的反序分列                            int runBit = fh & n;                            Node lastRun = f;                            for (Node p = f.next; p != null; p = p.next) {                              int b = p.hash & n;                              if (b != runBit) {                                    runBit = b;                                    lastRun = p;                              }                            }                            if (runBit == 0) {                              ln = lastRun;                              hn = null;                            } else {                              hn = lastRun;                              ln = null;                            }                            for (Node p = f; p != lastRun; p = p.next) {                              int ph = p.hash;                              K pk = p.key;                              V pv = p.val;                              if ((ph & n) == 0) {                                    ln = new Node(ph, pk, pv, ln);                              } else {                                    hn = new Node(ph, pk, pv, hn);                              }                            }                            // 先扩容再插入相应值                            //新table的i位置添加元素                            setTabAt(nextTab, i, ln);                            //新table的i+1位置添加元素                            setTabAt(nextTab, i + n, hn);                            // 旧table i 位置处插上ForwardingNode,表示该节点已经处置惩罚过                            setTabAt(tab, i, fwd);                            advance = true;                            // 红黑树处置惩罚逻辑,实质上是维护双向链表                        } else if (f instanceof TreeBin) {                            TreeBin t = (TreeBin) f;                            TreeNode lo = null, loTail = null;                            TreeNode hi = null, hiTail = null;                            int lc = 0, hc = 0;                            for (Node e = t.first; e != null; e = e.next) {                              int h = e.hash;                              TreeNode p = new TreeNode                                        (h, e.key, e.val, null, null);                              if ((h & n) == 0) {                                    if ((p.prev = loTail) == null) {                                        lo = p;                                    } else {                                        loTail.next = p;                                    }                                    loTail = p;                                    ++lc;                              } else {                                    if ((p.prev = hiTail) == null) {                                        hi = p;                                    } else {                                        hiTail.next = p;                                    }                                    hiTail = p;                                    ++hc;                              }                            }                            // 扩容后红黑树节点个数若

knppt 发表于 2020-12-31 20:17:41

转发了

闪亮的星2017 发表于 2020-12-31 16:07:35

转发了
页: [1]
查看完整版本: ConcurrentHashMap源码解析,含6大焦点方法