更新時間:2022-08-26 來源:黑馬程序員 瀏覽量:
一、文章導讀:
這部分內容讓大家讀懂ConcurrentHashMap源碼的底層實現(xiàn)從而在工作中合理去使用他并且在面試中能做到游刃有余,主要內容如下:
* 核心構造方法
* 核心成員變量
* put方法過程數(shù)據(jù)的完整過程
* get方法的無鎖實現(xiàn)
二、文章講解內容
JDK8中ConcurrentHashMap的結構是:數(shù)組+鏈表+紅黑樹。
因為在hash沖突嚴重的情況下,鏈表的查詢效率是O(n),所以jdk8中改成了單個鏈表的個數(shù)大于8時,數(shù)組長度小于64就擴容,數(shù)組長度大于等于64,則鏈表會轉換為紅黑樹,這樣以空間換時間,查詢效率會變O(nlogn)。
紅黑樹在Node數(shù)組內部存儲的不是一個TreeNode對象,而是一個TreeBin對象,TreeBin內部維持著一個紅黑樹。
在JDK8中ConcurrentHashMap最經(jīng)點的實現(xiàn)是使用CAS+synchronized+volatile 來保證并發(fā)安全。
一、JDK7中ConcurrentHashMap的實現(xiàn)
在JDK1.7中ConcurrentHashMap是通過定義多個Segment來實現(xiàn)的分段加鎖,一個Segment對應一個數(shù)組,如果多線程對同一個數(shù)組中的元素進行添加那么多個線程會去競爭同一把鎖,他鎖的是一個數(shù)組,有多少個segment那么就有多少把鎖,這個力度還是比較粗的。
JDK8的實現(xiàn)是下文要重點探討的內容,同時下文全部都是JDK8的實現(xiàn)。
二、核心構造方法
/** * Creates a new, empty map with the default initial table size (16). 無參構造函數(shù),new一個默認table容量為16的ConcurrentHashMap */ public ConcurrentHashMap() { }
/** * Creates a new, empty map with an initial table size * accommodating the specified number of elements without the need * to dynamically resize. * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative 做了3件事: 1.如果入?yún)?lt;0,拋出異常。 2.如果入?yún)⒋笥谧畲笕萘浚瑒t使用最大容量(是2的30次方) 3.tableSizeFor方法得到一個大于負載因子入?yún)⑶易罱咏?的N次方的數(shù)作為容量 4.設置sizeCtl的值等于初始化容量。未對table進行初始化,table的初始化要在第一次put的時候進行。 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
特別注意:未對table進行初始化,table的初始化要在第一次put的時候進行。
三、核心成員變量
// 該字段控制table(也被稱作hash桶數(shù)組)的初始化和擴容 private transient volatile int sizeCtl; // table最大容量是2的30次方 private static final int MAXIMUM_CAPACITY = 1 << 30; // table的默認初始化容量-擴容總是2的n次方 private static final int DEFAULT_CAPACITY = 16; // table的負載因子。當前已使用容量 >= 負載因子*總容量的時候,進行resize擴容 private static final float LOAD_FACTOR = 0.75f; // 存儲數(shù)據(jù)的數(shù)組-注意添加了volatile transient volatile Node<K,V>[] table; // 當桶內鏈表長度>=8時,會將鏈表轉成紅黑樹-注意需要和MIN_TREEIFY_CAPACITY結合起來用 static final int TREEIFY_THRESHOLD = 8; // table的總容量,要大于64,桶內鏈表才轉換為樹形結構,否則當桶內鏈表長度>=8時會擴容 static final int MIN_TREEIFY_CAPACITY = 64; // 當桶內node小于6時,紅黑樹會轉成鏈表。 static final int UNTREEIFY_THRESHOLD = 6;
四、put存儲數(shù)據(jù)
先從JDK的源碼中復制一段上頭的代碼,如下代碼就完成了數(shù)據(jù)的添加,在添加的時候還完成了數(shù)組的初始化、擴容、CAS修改、分段鎖的實現(xiàn),大家大體的對該代碼有一個認識即可我們后面會詳細的畫圖分析該過程。
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // 如果key或者value為空,拋出異常 if (key == null || value == null) throw new NullPointerException(); // 得到hash值 int hash = spread(key.hashCode()); // 用來記錄所在table數(shù)組中的桶的中鏈表的個數(shù),后面會用于判斷是否鏈表過長需要轉紅黑樹 int binCount = 0; // for循環(huán),直到put成功插入數(shù)據(jù)才會跳出 for (Node<K,V>[] tab = table;;) { // f=桶頭節(jié)點 n=table的長度 i=在數(shù)組中的哪個下標 fh=頭節(jié)點的hash值 Node<K,V> f; int n, i, fh; // 如果table沒有初始化 if (tab == null || (n = tab.length) == 0) // 初始化table tab = initTable(); // 根據(jù)數(shù)組長度減1再對hash值取余得到在node數(shù)組中位于哪個下標 // 用tabAt獲取數(shù)組中該下標的元素 // 如果該元素為空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 直接將put的值包裝成Node用tabAt方法放入數(shù)組內這個下標的位置中 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果頭結點hash值為-1,則為ForwardingNode結點,說明正再擴容, else if ((fh = f.hash) == MOVED) // 調用hlepTransfer幫助擴容 tab = helpTransfer(tab, f); // 否則鎖住槽的頭節(jié)點 else { V oldVal = null; // 鎖桶的頭節(jié)點 synchronized (f) { // 雙重鎖檢測,看在加鎖之前,該桶的頭節(jié)點是不是被改過了 if (tabAt(tab, i) == f) { // 如果桶的頭節(jié)點的hash值大于0 if (fh >= 0) { binCount = 1; // 遍歷鏈表 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果遇到節(jié)點hash值相同,key相同,看是否需要更新value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果到鏈表尾部都沒有遇到相同的 if ((e = e.next) == null) { // 就生成Node掛在鏈表尾部,該Node成為一個新的鏈尾。 pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果桶的頭節(jié)點是個TreeBin else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; // 用紅黑樹的形式添加節(jié)點或者更新相同hash、key的值。 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果鏈表長度>需要樹化的閾值 if (binCount >= TREEIFY_THRESHOLD) //調用treeifyBin方法將鏈表轉換為紅黑樹,而這個方法中會判斷數(shù)組值是否大于64,如果沒有大于64則只擴容 treeifyBin(tab, i); if (oldVal != null) // 如果是修改,不是新增,則返回被修改的原值 return oldVal; break; } } } // 計數(shù)器加1,完成新增后,table擴容,就是這里面觸發(fā) addCount(1L, binCount); // 新增后,返回Null return null; }
4.1 高效的hash算法
int hash = spread(key.hashCode()); static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
該算法其實在HashMap中我們就已經(jīng)重點說過了,他既保存了key值得hash的高16位也保存了低16位,從而讓key值在不同的數(shù)組中盡可能的散列,從而避免hash沖突。
4.2 數(shù)組的初始化
for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable();
注意是在循環(huán)中判斷的table是否為空如果為空則會調用initTable完成數(shù)組的默認初始化。
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab;
以上代碼做了如下事情:
1.如果table為空,進入while準備開始初始化。
2.將sizeCtl賦值給sc。如果sizeCtl<0,線程等待,小于零時表示有其他線程在執(zhí)行初始化。
3.如果能將sizeCtl設置為-1,則開始進行初始化操作
4.用戶有指定初始化容量,就用用戶指定的,否則用默認的16.
5.生成一個長度為16的Node數(shù)組,把引用給table。
6.重新設置sizeCtl=數(shù)組長度 - (數(shù)組長度 >>>2) 這是忽略符號的移位運算符,可以理解成 n - (n / 2)。
4.3 不沖突的數(shù)據(jù)添加過程
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); }
高效的尋址算法: (n - 1) & hash這個在我們的HashMap中也是講過的了就不在贅述。
通過高效的尋址算法得到一個索引并獲取該索引的數(shù)據(jù)如果不為空則調用casTabAt方法進行CAS的原子修改,CAS在Java層面是無鎖的所以速度會很快,但是他在硬件層面是有鎖的,他會在硬件的拉鏈散列表中加鎖。
如果有多個線程同時hash到了該索引我們的CAS也能保證只有一個線程能添加成功其他的線程其實是走分段加鎖的過程。
4.4 分段加鎖策略
需要大家特別注意的是synchronized (f)鎖了一個f對象,那么這個f對象是什么呢?其實就是我們高效尋址算法的到的一個下標中存儲的對象,注意他鎖的是我們尋址到的這個對象并沒有鎖這個數(shù)組,所以我們當前的鎖一共有多少把呢?就看你的size有多少了默認是16那么就會有16把鎖可以來并發(fā)的修改,這個其實就是JDK1.8的分段鎖拉,他比1.7鎖的粒度更細那么并發(fā)的效果就會更好。
五、無鎖獲取
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
以上代碼中的內容幾乎我們都講過了,所以就不在贅述,但是值得一說的是獲取的過程中并沒有對數(shù)組或者某一個Node元素添加鎖,獲取是無鎖的所有性能高。
還有一個問題需要注意的是獲取是無鎖的那么他如果出現(xiàn)多線程修改或者寫入的時候他就有可能會出現(xiàn)可見性的問題,因為每一個線程都有自己的工作內存,那么ConcurrentHashMap是如何解決可見性的問題呢?
transient volatile Node<K,V>[] table;
從變量的申明中我們可以看到存儲數(shù)據(jù)的table其實是添加了volatile關鍵字的,所以其他線程修改了以后我們要求其他的線程把數(shù)據(jù)刷新主內存從而保證數(shù)據(jù)的可見性。
五、總結:
1. JDK1.7 使用分段鎖實現(xiàn)
2. JDK1.8 使用CAS+synchronized+volatile 的具體實現(xiàn)
3. put方法的復雜實現(xiàn)過程
4. get方法的無鎖實現(xiàn)尤其是volatile關鍵字的使用