ConcurrentHashMap是如何实现线程安全的

原创 吴就业 88 0 2018-11-20

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/21c35857276346eba8f08208e25608dc

作者:吴就业
链接:https://wujiuye.com/article/21c35857276346eba8f08208e25608dc
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

ConcurrentHashMap和HashTable

HashTable实现线程安全是将整个hash表锁住的,效率可想而知。 ConcurrentHashMap 在1.7中 实现线程安全是通过锁住Segment对象的。而在1.8 中则是针对首个节点(table[hash(key)]取得的链接或红黑树的首个节点)进行加锁操作。

ConcurrentHashMap 1.7

ConcurrentHashMap将数据分别放到多个Segment中,默认16个,每一个Segment中又包含了多个HashEntry列表数组,对于一个key,需要经过三次hash操作,才能最终定位这个元素的位置,这三次hash分别为:

  1. 对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);
  2. 将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素的放在哪个Segment;
  3. 将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),通过h3能够确定该元素放置在哪个HashEntry。

每一个Segment都拥有一个锁,当进行写操作时,只需要锁定一个Segment,而其它Segment中的数据是可以访问的。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile HashEntry<K,V>[] table;
        transient int count;
    }

ConcurrentHashMap 1.8

ConcurrentHashMap在jdk1.8中取消segments字段,直接采用transient volatile Node[] table保存数据,采用table数组元素(链表的首个元素或者红黑色的根节点)作为锁,从而实现了对每一行数据进行加锁,减少并发冲突的概率。

transient volatile Node<K,V>[] table;

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        ......
}

ConcurrentHashMap在jdk1.8中将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。jdk1.8中对于链表个数小于等于默认值8的链表使用单向链接,而个数超对默认值的时候将链表转换为红黑树,查询时间复杂度优化到了O(logN)【二分查找】,O(logN)即二叉树的深度。

public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            ......
            else {
                V oldVal = null;
                synchronized (f) {
                    ......
                }
               ......
            }
        }
        addCount(1L, binCount);
        return null;
    }

参考文章 Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进 ConcurrentHashMap实现线程安全的原理

附:红黑树

package wjy.stu.rbtree;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 红黑树,非线程安全
 * 二分查找平均只需要log(o) 时间复杂度 (o为节点总数)
 * 红黑树的最坏查找时间为2log(o)
 * 以下注释"^"表示"次方"
 * 如果有一亿个节点,那么2^(n-1)(2的n-1次方)+ 1 = 1亿 ==> 查找次数只需要大约45次
 * 1:                      40
 * 2:            30                  50
 * 3:      28         34        45        60
 * 4:   16    29   31    38  40    43  54     70
 *
 * 15个数,深度为n=4, 2^(n-1)+1= 15 ,那么查找的次数为n次,n=log(15),以2为底节点总数的对数
 *
 * @author wjy
 */
public class RBTree<K, V> {

    /**
     * 定义节点
     *
     * @author wjy
     */
    static final class Node<K, V> {

        public static final boolean RED = false;//红色
        public static final boolean BLACK = true;//黑色
        private boolean color;//颜色:红、黑

//        private final K key; //key不能修改
        private K key;
        private V value;

        Node<K, V> parent;//父节点
        Node<K, V> left;//左节点
        Node<K, V> right;//右节点

        public Node(){}
        public Node(K key, V value) {
            this.key = key;
            this.value = value;
        }

        public Node(K key, V value, Node parent) {
            this.key = key;
            this.value = value;
            this.parent = parent;
        }

        public void setValue(V value) {
            this.value = value;
        }

        public V getValue() {
            return value;
        }

        @Override
        public int hashCode() {
            int keyHash = (key == null ? 0 : key.hashCode());
            int valueHash = (value == null ? 0 : value.hashCode());
            return keyHash ^ valueHash;//(keyHash与valueHash按位异或)
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Node))
                return false;
            Node node = (Node) obj;
            return (this.key.equals(node.key) && this.value.equals(node.value));
        }
    }


    private Node root;//根节点
    private int size;//实际存储的节点的多少

    public RBTree() {
        clear();
    }

    //key的比较器,根据key来判断大小
    private Comparator<Object> comparator = new Comparator<Object>() {

        @Override
        public int compare(Object o1, Object o2) {
            if (o1.getClass() != o2.getClass()) {
                throw new RuntimeException("key类型不相同无法比较");
            }
            if (o1 instanceof Number) {
                if (o1 instanceof Short) {
                    return (short) o1 - (short) o2;
                } else if (o1 instanceof Integer) {
                    return (int) o1 - (int) o2;
                } else if (o1 instanceof Byte) {
                    return (byte) o1 - (byte) o2;
                } else if (o1 instanceof Float) {
                    return (float) o1 - (float) o2 > 0 ? 1 : ((float) o1 - (float) o2 < 0 ? -1 : 0);
                } else if (o1 instanceof Double) {
                    return (double) o1 - (double) o2 > 0 ? 1 : ((double) o1 - (double) o2 < 0 ? -1 : 0);
                }
            } else if (o1 instanceof String) {
                //字符串就使用字符串的比较器
                return ((String) o1).compareTo((String) o2);
            }

            if (o1 instanceof Comparable) {
                return ((Comparable) o1).compareTo(o2);
            }
            throw new RuntimeException("未实现Comparable接口,无法进行比较");
        }
    };

    /**
     * 清楚节点只需要将根节点引用设置为null,
     * 那么就没有任何引用这颗树的地方了,系统gc会回收内存
     */
    public void clear() {
        this.root = null;
        this.size = 0;
    }

    /**
     * 获取节点总数,即key-value的总数
     *
     * @return
     */
    public int getSize() {
        return this.size;
    }

    /**
     * k1大于k2返回正数
     * k1小于k2返回负数
     * k1==k2返回0
     *
     * @param k1
     * @param k2
     * @return
     */
    final int comp(Object k1, Object k2) {
        return comparator.compare(k1, k2);
    }

    /**
     * 根据key获取值
     * 只需要二分查找即可
     *
     * @param key
     * @return
     */
    public V get(K key) {
        Node prt = root;
        int comp;
        while (prt != null) {
            comp = comp(prt.key, key);//根据key的大小比较进行二分查找
            if (comp == 0) {
                return (V) prt.value;
            } else if (comp < 0) {
                //左
                prt = prt.left;
            } else {
                //右
                prt = prt.right;
            }
        }
        return null;//找不到即没有,返回null
    }

    /**
     * 添加一个key-value,如果存在相同的key则替换新的值并将旧的值返回
     *
     * @param key
     * @param value
     * @return 如果有返回旧的值
     */
    public V put(K key, V value) {
        Node<K, V> newNode = new Node<K, V>(key, value);

        Node prt = root;
        Node parent;
        if (prt == null) {
            //跟节点为空
            root = newNode;
            root.color = Node.BLACK;//根节点必须为黑色
            size++;
            return null;
        }
        int comp;
        do {
            comp = comp(key,prt.key);//根据key的大小比较进行二分查找
            if (comp == 0) {
                prt.value = value;//替换新的值
                return (V) prt.value;//返回旧的值
            } else if (comp < 0) {
                parent = prt;
                //左
                prt = prt.left;
            } else {
                parent = prt;
                //右
                prt = prt.right;
            }
        } while (prt != null);//直到叶子节点

        //将新的节点放到父节点的左右孩子位置
        newNode.parent = parent;
        if (comp > 0) {
            //放在右孩子
            parent.right = newNode;
        } else {
            //放在左孩子
            parent.left = newNode;
        }
        size++;
        //需要判断是否破坏了红黑树规则,如果是需要替换颜色以及旋转节点
        adjustAferInsert(newNode);
        return null;
    }


    /**
     * 在插入新节点之后调整红黑树
     * 需要判断是否破坏了红黑树规则,如果是需要替换颜色以及旋转节点
     * 性质1.节点是红色或黑色。  =>这个条件肯定满足
     * 性质2.根节点是黑色。      =>在变色或旋转之后可能会导致根节点是红色的(旋转后出现原来是红色节点的节点变成了根节点)
     * 性质3 每个叶节点(NIL节点,空节点)是黑色的。 =>这条肯定是true,null节点。
     * 性质4 每个红色节点的两个子节点都是黑色。(从每个叶子到根的所有路径上不能有两个连续的红色节点)
     * 性质5. 从任一节点到其每个叶子的所有路径都包含相同数目的黑色节点。
     * ==> 新加的节点是红色而它的父节点可能会是红色就会破坏4、5这个规定
     *
     * @param node 新插入的节点
     */
    private void adjustAferInsert(Node node) {
        //新添加的节点设置为红色
        node.color = Node.RED;

        /**
         * 当前节点的叔父节点是红色就先进行变色,最后再进行旋转
         */
        while (node != null
                && node != root
                //如果父亲节点也是红色
                && node.parent.color == Node.RED) {
            //在等式不成立的情况下node.parent.parent.left可能为null
            if (node.parent == node.parent.parent.left) {
                /**             node.parent.parent
                 * node.parent                      node.parent.parent.right
                 * 即满足:
                 * ([node.parent]== [node.parent.parent].left)
                 */
                // 当前节点node的父节点的兄弟节点,可能为null
                Node<K, V> nodeParentBrother = node.parent.parent.right;
                if (getColorWithNode(nodeParentBrother) == Node.RED) {
                    //父亲节点的兄弟节点如果是红色

                    //将父亲节点和叔叔节点改为黑色,祖父节点改为红色
                    node.parent.color = Node.BLACK;
                    nodeParentBrother.color = Node.BLACK;
                    node.parent.parent.color = Node.RED;
                    //递归/循环修改
                    node = node.parent.parent;
                } else {
                    //如果父亲的兄弟节点为黑色(null节点也是黑色)

                    //如果当前节点是父亲节点的右节点,以父亲节点进行左旋转
                    if (node == node.parent.right) {
                        node = node.parent;
                        rotateLeft(node);
                    }
                    //父亲节点设为黑色,祖父节点设为红色再进行右旋转
                    node.parent.color = Node.BLACK;
                    node.parent.parent.color = Node.RED;
                    rotateRight(node.parent.parent);
                }
            } else {
                /**                 node.parent.parent
                 * node.parent.parent.left          node.parent
                 * 即满足:
                 * ([node.parent]== [node.parent.parent].right)
                 */
                // 当前节点node的父节点的兄弟节点,可能为null
                Node<K, V> nodeParentBrother = node.parent.parent.left;
                if (getColorWithNode(nodeParentBrother) == Node.RED) {
                    //父亲节点的兄弟节点如果是红色

                    //将父亲节点和叔叔节点改为黑色,祖父节点改为红色
                    node.parent.color = Node.BLACK;
                    nodeParentBrother.color = Node.BLACK;
                    node.parent.parent.color = Node.RED;
                    node = node.parent.parent;
                } else {
                    //如果当前节点是父亲节点的左节点,以父亲节点进行右旋转
                    if (node == node.parent.left) {
                        node = node.parent;
                        rotateRight(node);
                    }
                    //父亲节点设为黑色,祖父节点设为红色再进行左旋转
                    node.parent.color = Node.BLACK;
                    node.parent.parent.color = Node.RED;
                    rotateLeft(node.parent.parent);
                }
            }
        }

        //可以因为旋转或变色改变了根节点的颜色,所以需要将根节点的强制变回黑色
        root.color = Node.BLACK;
    }


    /**
     * 获取节点的颜色:红、黑
     *
     * @param node
     * @return
     */
    private boolean getColorWithNode(Node<K, V> node) {
        //如果节点为null,如nil叶子节点,返回黑色
        if (node == null) return Node.BLACK;
        return node.color;
    }

    /**
     * 以p节点左旋
     *                  40              以p节点(30)左旋后                40
     *          30(p)          50        =============>           36         50
     *       25     36(r)  45                                 30          45
     *  20                                               25
     *                                               20
     *
     * 以p节点左旋: 让p的右孩子代替p的位置,p成为自己右孩子的左孩子 (原本p的右孩子的右孩子还是不用改变的)
     *
     * @param p
     */
    private void rotateLeft(Node<K, V> p) {
        //p节点不能是空节点,节点p不为null==> p.parent不为null
        //p的右节点不为null==> (r = p.right) r!=null && r.p!=nulll
        if (p == null || p.right == null) return;
        Node<K, V> r = p.right;
        /**
         *               40              以p节点(30)左旋后
         *        30            50        =============>           30(p)
         *  25        36    45                                         32(r.left)
         *         32
         */
        p.right = r.left;

        /**
         *               40              以p节点(30)左旋后
         *        30            50        =============>              30(p)
         *  25        36    45                                            32(r.left)
         *         32
         */
        if (r.left != null)
            r.left.parent = p;

        /**
         *               40              以p节点(30)左旋后                40(p.p)
         *        30            50        =============>           36(r)
         *  25        36    45
         *         32
         */
        r.parent = p.parent;

        /**
         * 如果原来的节点p是父节点的左节点,那么将p的父节点的left指向旋转后即p的右节点
         * 否则如果原来的节点p是父节点的右节点,那么将p的父节点的right指向旋转后即p的右节点
         *               40              以p节点(30)左旋后                40(p.p)
         *        30            50        =============>           36(r)
         *  25        36    45
         *         32
         */
        if (p.parent != null) {
            if (p.parent.left == p) {
                p.parent.left = r;
            } else {
                p.parent.right = r;
            }
        } else {
            /**
             * 如果p节点的父节点为null,那么将旋转后的p即r设置为根节点
             * 这里30的根节点是40,所以不能设置为根节点
             */
            root = r;
        }
        /**
         * 将p的右孩子设置的left位置为p
         * 将p的父节点位置为其右节点(r)
         *               40              以p节点(30)左旋后                40
         *        30            50        =============>           36(r)
         *  25        36    45                                 30(p)
         *         32
         */
        r.left = p;
        p.parent = r;
    }

    /**
     * 以p节点右旋
     *                 40              以p节点(30)左旋后                       40|
     *         30(p)           50        =============>                25       |     50
     *    25(l)     36    45                                      20       30   |  45
     * 20     nil(l.right)                                                    36|
     *
     * 以p节点右旋: 让p的左孩子代替p的位置,p成为自己左孩子的右孩子。
     *
     * @param p
     */
    private void rotateRight(Node<K, V> p) {
        if (p == null || p.left == null) return;
        Node<K, V> l = p.left;

        /**
         * 这里l的右为nil,如果不为nil则需要将l.right的父节点指向p
         *          p
         *  l.right
         *  就是:
         *  p.left = l.right; l.right.parent = p;
         */
        p.left = l.right;
        if (l.right != null)
            l.right.parent = p;

        /**
         *  将l节点的父节点指向p节点的父节点
         */
        l.parent = p.parent;

        if (p.parent != null) {
            if (p.parent.left == p) {
                /**
                 *           p
                 *       l
                 */
                p.parent.left = l;
            } else {
                /**
                 *           p
                 *                l
                 */
                p.parent.right = l;
            }
        } else {
            //p节点是根节点,那么旋转后l节点就是根节点
            root = l;
        }

        /**
         *    l
         *        p
         *  就是:
         *  l.right = p; p.parent = l;
         */
        l.right = p;
        p.parent = l;
    }


    /**
     * 打印红黑树
     */
    public void showRBTree() throws InterruptedException {
        //根据查找时间赋值度不超过2倍【n=log(o)】的条件,将最大深度设计为2n
        int n = (int) Math.sqrt(this.size);
        int sd = 2 * n;
        Node<K,V> ptr = root;
        //随便找个队列用了,用于二叉树的广度优先遍历
        Queue<Node<K,V>> nodeQueue = new LinkedBlockingQueue<>();
        ((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(ptr);
        int currentCeng = 1,currentPrintNodeCount=0;
        //为了打印的平衡,需要将空的左节点和右节点打印出占位符"nil"
        Node<K,V> nil = new Node<>();
        while ((ptr=nodeQueue.poll())!=null){
            if(ptr.left!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(ptr.left);
            else if(ptr.key!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(nil);//占位
            if(ptr.right!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(ptr.right);
            else if(ptr.key!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(nil);//占位
            //根据深度计算空格数量
            currentPrintNodeCount++;
            if(Math.pow(2,(currentCeng-1))>currentPrintNodeCount){
                //还在同一层
            }else{
                //下一层了
                currentCeng++;
                System.out.println("\n");
            }
            int nullCharCount = (int) Math.pow(2,(sd-currentCeng));
            printfNullChar(nullCharCount);
            if(ptr==nil)
                System.out.print("  ");
            else
                System.out.print(ptr.key);//也可以打印value,因为用key生成红黑树,所以打印key
            printfNullChar(nullCharCount);
        }
    }

    /**
     * 打印空格
     * @param count
     */
    private void printfNullChar(int count){
        StringBuffer sb = new StringBuffer();
        while (count>=0){
            sb.append("    ");//四个空格
            count--;
        }
        System.out.print(sb.toString());
    }
}

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

使用Jhat排查问题实战:查看类型为List的静态字段的大小

我使用浏览器的开发者功能,找到api,确实服务器返回给浏览器的结果是空数据。然后我顺藤摸瓜找到了这个api的代码,结果发现又是使用的内存缓存。本篇介绍如何借助JHat的强大功能查看内存缓存是否是空的。

JVM方法表、栈桢、局部变量表、操作数栈的理解

看懂Java字节码首先得要了解栈桢和方法表,这两个知识点是比较重要的。另外了解这两个知识点还有助于指导Java性能调优工作。

Redis数据持久化策略

我们可以通过修改redis.conf配置文件来选择使用持久化策略,redis提供了三种持久化策略:RDB快照、AOF(Append-only file)、混合策略。

Spring源码分析ioc容器总结篇

这篇文章是对前面几篇文章做一个小节,本篇不写一行代码,单纯的文字总结。

Spring源码分析(四)bean工厂初始化阶段

继续介绍AnnotationConfigApplicationContext工作流程,本篇介绍refresh方法,即bean工厂的初始化阶段。读完本篇你会了解到bean是什么时候被全部扫描成BeanDefinition注入到工厂中的。还有一些后置处理器的职责。

Spring源码分析(二)bean的创建过程

上一篇有说到过BeanDefinition,主要关注的是其扩展接口AnnotateBeanDefinition和其子类AnnotateGenericBeanDefinition。本篇先超前介绍spring是如果通过BeanDefinition来创建一个bean的。