ConcurrentHashMap和HashTable
HashTable实现线程安全是将整个hash表锁住的,效率可想而知。 ConcurrentHashMap 在1.7中 实现线程安全是通过锁住Segment对象的。而在1.8 中则是针对首个节点(table[hash(key)]取得的链接或红黑树的首个节点)进行加锁操作。
ConcurrentHashMap 1.7
ConcurrentHashMap将数据分别放到多个Segment中,默认16个,每一个Segment中又包含了多个HashEntry列表数组,对于一个key,需要经过三次hash操作,才能最终定位这个元素的位置,这三次hash分别为:
- 对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key);
- 将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素的放在哪个Segment;
- 将得到的h1进行第三次hash,得到hash值h3,也即h3 = hash3(h1),通过h3能够确定该元素放置在哪个HashEntry。
每一个Segment都拥有一个锁,当进行写操作时,只需要锁定一个Segment,而其它Segment中的数据是可以访问的。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
transient int count;
}
ConcurrentHashMap 1.8
ConcurrentHashMap在jdk1.8中取消segments字段,直接采用transient volatile Node[] table保存数据,采用table数组元素(链表的首个元素或者红黑色的根节点)作为锁,从而实现了对每一行数据进行加锁,减少并发冲突的概率。
transient volatile Node<K,V>[] table;
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
}
ConcurrentHashMap在jdk1.8中将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。jdk1.8中对于链表个数小于等于默认值8的链表使用单向链接,而个数超对默认值的时候将链表转换为红黑树,查询时间复杂度优化到了O(logN)【二分查找】,O(logN)即二叉树的深度。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
......
else {
V oldVal = null;
synchronized (f) {
......
}
......
}
}
addCount(1L, binCount);
return null;
}
参考文章 Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进 ConcurrentHashMap实现线程安全的原理
附:红黑树
package wjy.stu.rbtree;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 红黑树,非线程安全
* 二分查找平均只需要log(o) 时间复杂度 (o为节点总数)
* 红黑树的最坏查找时间为2log(o)
* 以下注释"^"表示"次方"
* 如果有一亿个节点,那么2^(n-1)(2的n-1次方)+ 1 = 1亿 ==> 查找次数只需要大约45次
* 1: 40
* 2: 30 50
* 3: 28 34 45 60
* 4: 16 29 31 38 40 43 54 70
*
* 15个数,深度为n=4, 2^(n-1)+1= 15 ,那么查找的次数为n次,n=log(15),以2为底节点总数的对数
*
* @author wjy
*/
public class RBTree<K, V> {
/**
* 定义节点
*
* @author wjy
*/
static final class Node<K, V> {
public static final boolean RED = false;//红色
public static final boolean BLACK = true;//黑色
private boolean color;//颜色:红、黑
// private final K key; //key不能修改
private K key;
private V value;
Node<K, V> parent;//父节点
Node<K, V> left;//左节点
Node<K, V> right;//右节点
public Node(){}
public Node(K key, V value) {
this.key = key;
this.value = value;
}
public Node(K key, V value, Node parent) {
this.key = key;
this.value = value;
this.parent = parent;
}
public void setValue(V value) {
this.value = value;
}
public V getValue() {
return value;
}
@Override
public int hashCode() {
int keyHash = (key == null ? 0 : key.hashCode());
int valueHash = (value == null ? 0 : value.hashCode());
return keyHash ^ valueHash;//(keyHash与valueHash按位异或)
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Node))
return false;
Node node = (Node) obj;
return (this.key.equals(node.key) && this.value.equals(node.value));
}
}
private Node root;//根节点
private int size;//实际存储的节点的多少
public RBTree() {
clear();
}
//key的比较器,根据key来判断大小
private Comparator<Object> comparator = new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
if (o1.getClass() != o2.getClass()) {
throw new RuntimeException("key类型不相同无法比较");
}
if (o1 instanceof Number) {
if (o1 instanceof Short) {
return (short) o1 - (short) o2;
} else if (o1 instanceof Integer) {
return (int) o1 - (int) o2;
} else if (o1 instanceof Byte) {
return (byte) o1 - (byte) o2;
} else if (o1 instanceof Float) {
return (float) o1 - (float) o2 > 0 ? 1 : ((float) o1 - (float) o2 < 0 ? -1 : 0);
} else if (o1 instanceof Double) {
return (double) o1 - (double) o2 > 0 ? 1 : ((double) o1 - (double) o2 < 0 ? -1 : 0);
}
} else if (o1 instanceof String) {
//字符串就使用字符串的比较器
return ((String) o1).compareTo((String) o2);
}
if (o1 instanceof Comparable) {
return ((Comparable) o1).compareTo(o2);
}
throw new RuntimeException("未实现Comparable接口,无法进行比较");
}
};
/**
* 清楚节点只需要将根节点引用设置为null,
* 那么就没有任何引用这颗树的地方了,系统gc会回收内存
*/
public void clear() {
this.root = null;
this.size = 0;
}
/**
* 获取节点总数,即key-value的总数
*
* @return
*/
public int getSize() {
return this.size;
}
/**
* k1大于k2返回正数
* k1小于k2返回负数
* k1==k2返回0
*
* @param k1
* @param k2
* @return
*/
final int comp(Object k1, Object k2) {
return comparator.compare(k1, k2);
}
/**
* 根据key获取值
* 只需要二分查找即可
*
* @param key
* @return
*/
public V get(K key) {
Node prt = root;
int comp;
while (prt != null) {
comp = comp(prt.key, key);//根据key的大小比较进行二分查找
if (comp == 0) {
return (V) prt.value;
} else if (comp < 0) {
//左
prt = prt.left;
} else {
//右
prt = prt.right;
}
}
return null;//找不到即没有,返回null
}
/**
* 添加一个key-value,如果存在相同的key则替换新的值并将旧的值返回
*
* @param key
* @param value
* @return 如果有返回旧的值
*/
public V put(K key, V value) {
Node<K, V> newNode = new Node<K, V>(key, value);
Node prt = root;
Node parent;
if (prt == null) {
//跟节点为空
root = newNode;
root.color = Node.BLACK;//根节点必须为黑色
size++;
return null;
}
int comp;
do {
comp = comp(key,prt.key);//根据key的大小比较进行二分查找
if (comp == 0) {
prt.value = value;//替换新的值
return (V) prt.value;//返回旧的值
} else if (comp < 0) {
parent = prt;
//左
prt = prt.left;
} else {
parent = prt;
//右
prt = prt.right;
}
} while (prt != null);//直到叶子节点
//将新的节点放到父节点的左右孩子位置
newNode.parent = parent;
if (comp > 0) {
//放在右孩子
parent.right = newNode;
} else {
//放在左孩子
parent.left = newNode;
}
size++;
//需要判断是否破坏了红黑树规则,如果是需要替换颜色以及旋转节点
adjustAferInsert(newNode);
return null;
}
/**
* 在插入新节点之后调整红黑树
* 需要判断是否破坏了红黑树规则,如果是需要替换颜色以及旋转节点
* 性质1.节点是红色或黑色。 =>这个条件肯定满足
* 性质2.根节点是黑色。 =>在变色或旋转之后可能会导致根节点是红色的(旋转后出现原来是红色节点的节点变成了根节点)
* 性质3 每个叶节点(NIL节点,空节点)是黑色的。 =>这条肯定是true,null节点。
* 性质4 每个红色节点的两个子节点都是黑色。(从每个叶子到根的所有路径上不能有两个连续的红色节点)
* 性质5. 从任一节点到其每个叶子的所有路径都包含相同数目的黑色节点。
* ==> 新加的节点是红色而它的父节点可能会是红色就会破坏4、5这个规定
*
* @param node 新插入的节点
*/
private void adjustAferInsert(Node node) {
//新添加的节点设置为红色
node.color = Node.RED;
/**
* 当前节点的叔父节点是红色就先进行变色,最后再进行旋转
*/
while (node != null
&& node != root
//如果父亲节点也是红色
&& node.parent.color == Node.RED) {
//在等式不成立的情况下node.parent.parent.left可能为null
if (node.parent == node.parent.parent.left) {
/** node.parent.parent
* node.parent node.parent.parent.right
* 即满足:
* ([node.parent]== [node.parent.parent].left)
*/
// 当前节点node的父节点的兄弟节点,可能为null
Node<K, V> nodeParentBrother = node.parent.parent.right;
if (getColorWithNode(nodeParentBrother) == Node.RED) {
//父亲节点的兄弟节点如果是红色
//将父亲节点和叔叔节点改为黑色,祖父节点改为红色
node.parent.color = Node.BLACK;
nodeParentBrother.color = Node.BLACK;
node.parent.parent.color = Node.RED;
//递归/循环修改
node = node.parent.parent;
} else {
//如果父亲的兄弟节点为黑色(null节点也是黑色)
//如果当前节点是父亲节点的右节点,以父亲节点进行左旋转
if (node == node.parent.right) {
node = node.parent;
rotateLeft(node);
}
//父亲节点设为黑色,祖父节点设为红色再进行右旋转
node.parent.color = Node.BLACK;
node.parent.parent.color = Node.RED;
rotateRight(node.parent.parent);
}
} else {
/** node.parent.parent
* node.parent.parent.left node.parent
* 即满足:
* ([node.parent]== [node.parent.parent].right)
*/
// 当前节点node的父节点的兄弟节点,可能为null
Node<K, V> nodeParentBrother = node.parent.parent.left;
if (getColorWithNode(nodeParentBrother) == Node.RED) {
//父亲节点的兄弟节点如果是红色
//将父亲节点和叔叔节点改为黑色,祖父节点改为红色
node.parent.color = Node.BLACK;
nodeParentBrother.color = Node.BLACK;
node.parent.parent.color = Node.RED;
node = node.parent.parent;
} else {
//如果当前节点是父亲节点的左节点,以父亲节点进行右旋转
if (node == node.parent.left) {
node = node.parent;
rotateRight(node);
}
//父亲节点设为黑色,祖父节点设为红色再进行左旋转
node.parent.color = Node.BLACK;
node.parent.parent.color = Node.RED;
rotateLeft(node.parent.parent);
}
}
}
//可以因为旋转或变色改变了根节点的颜色,所以需要将根节点的强制变回黑色
root.color = Node.BLACK;
}
/**
* 获取节点的颜色:红、黑
*
* @param node
* @return
*/
private boolean getColorWithNode(Node<K, V> node) {
//如果节点为null,如nil叶子节点,返回黑色
if (node == null) return Node.BLACK;
return node.color;
}
/**
* 以p节点左旋
* 40 以p节点(30)左旋后 40
* 30(p) 50 =============> 36 50
* 25 36(r) 45 30 45
* 20 25
* 20
*
* 以p节点左旋: 让p的右孩子代替p的位置,p成为自己右孩子的左孩子 (原本p的右孩子的右孩子还是不用改变的)
*
* @param p
*/
private void rotateLeft(Node<K, V> p) {
//p节点不能是空节点,节点p不为null==> p.parent不为null
//p的右节点不为null==> (r = p.right) r!=null && r.p!=nulll
if (p == null || p.right == null) return;
Node<K, V> r = p.right;
/**
* 40 以p节点(30)左旋后
* 30 50 =============> 30(p)
* 25 36 45 32(r.left)
* 32
*/
p.right = r.left;
/**
* 40 以p节点(30)左旋后
* 30 50 =============> 30(p)
* 25 36 45 32(r.left)
* 32
*/
if (r.left != null)
r.left.parent = p;
/**
* 40 以p节点(30)左旋后 40(p.p)
* 30 50 =============> 36(r)
* 25 36 45
* 32
*/
r.parent = p.parent;
/**
* 如果原来的节点p是父节点的左节点,那么将p的父节点的left指向旋转后即p的右节点
* 否则如果原来的节点p是父节点的右节点,那么将p的父节点的right指向旋转后即p的右节点
* 40 以p节点(30)左旋后 40(p.p)
* 30 50 =============> 36(r)
* 25 36 45
* 32
*/
if (p.parent != null) {
if (p.parent.left == p) {
p.parent.left = r;
} else {
p.parent.right = r;
}
} else {
/**
* 如果p节点的父节点为null,那么将旋转后的p即r设置为根节点
* 这里30的根节点是40,所以不能设置为根节点
*/
root = r;
}
/**
* 将p的右孩子设置的left位置为p
* 将p的父节点位置为其右节点(r)
* 40 以p节点(30)左旋后 40
* 30 50 =============> 36(r)
* 25 36 45 30(p)
* 32
*/
r.left = p;
p.parent = r;
}
/**
* 以p节点右旋
* 40 以p节点(30)左旋后 40|
* 30(p) 50 =============> 25 | 50
* 25(l) 36 45 20 30 | 45
* 20 nil(l.right) 36|
*
* 以p节点右旋: 让p的左孩子代替p的位置,p成为自己左孩子的右孩子。
*
* @param p
*/
private void rotateRight(Node<K, V> p) {
if (p == null || p.left == null) return;
Node<K, V> l = p.left;
/**
* 这里l的右为nil,如果不为nil则需要将l.right的父节点指向p
* p
* l.right
* 就是:
* p.left = l.right; l.right.parent = p;
*/
p.left = l.right;
if (l.right != null)
l.right.parent = p;
/**
* 将l节点的父节点指向p节点的父节点
*/
l.parent = p.parent;
if (p.parent != null) {
if (p.parent.left == p) {
/**
* p
* l
*/
p.parent.left = l;
} else {
/**
* p
* l
*/
p.parent.right = l;
}
} else {
//p节点是根节点,那么旋转后l节点就是根节点
root = l;
}
/**
* l
* p
* 就是:
* l.right = p; p.parent = l;
*/
l.right = p;
p.parent = l;
}
/**
* 打印红黑树
*/
public void showRBTree() throws InterruptedException {
//根据查找时间赋值度不超过2倍【n=log(o)】的条件,将最大深度设计为2n
int n = (int) Math.sqrt(this.size);
int sd = 2 * n;
Node<K,V> ptr = root;
//随便找个队列用了,用于二叉树的广度优先遍历
Queue<Node<K,V>> nodeQueue = new LinkedBlockingQueue<>();
((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(ptr);
int currentCeng = 1,currentPrintNodeCount=0;
//为了打印的平衡,需要将空的左节点和右节点打印出占位符"nil"
Node<K,V> nil = new Node<>();
while ((ptr=nodeQueue.poll())!=null){
if(ptr.left!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(ptr.left);
else if(ptr.key!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(nil);//占位
if(ptr.right!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(ptr.right);
else if(ptr.key!=null)((LinkedBlockingQueue<Node<K,V>>) nodeQueue).put(nil);//占位
//根据深度计算空格数量
currentPrintNodeCount++;
if(Math.pow(2,(currentCeng-1))>currentPrintNodeCount){
//还在同一层
}else{
//下一层了
currentCeng++;
System.out.println("\n");
}
int nullCharCount = (int) Math.pow(2,(sd-currentCeng));
printfNullChar(nullCharCount);
if(ptr==nil)
System.out.print(" ");
else
System.out.print(ptr.key);//也可以打印value,因为用key生成红黑树,所以打印key
printfNullChar(nullCharCount);
}
}
/**
* 打印空格
* @param count
*/
private void printfNullChar(int count){
StringBuffer sb = new StringBuffer();
while (count>=0){
sb.append(" ");//四个空格
count--;
}
System.out.print(sb.toString());
}
}