Fork me on GitHub
Blog


  • 首页

  • 标签

  • 分类

  • 归档

  • 关于

【Java多线程】JUC集合 03. ConcurrentSkipListMap

发表于 2018-08-15 | 分类于 Java多线程 , JUC集合

ConcurrentSkipListMap

1. 前言

  • ConcurrentSkipListMap是线程安全的有序的Map,适用于高并发的场景。
  • 不允许空键、值
  • ConcurrentSkipListMap和TreeMap的区别:
    • 都是有序的哈希表
    • ConcurrentSkipListMap是线程安全的,而TreeMap是非线程安全的
    • ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。
  • 跳表(Skip List):它是平衡树的一种替代的数据结构,但是和红黑树不相同的是,跳表对于树的平衡的实现是基于一种随机化的算法的,这样也就是说跳表的插入和删除的工作是比较简单的。

2. 源码分析

2.1 数据结构

UML类图:

跳表:

2.2 内部类

  • Index
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down; //down域
volatile Index<K,V> right; //right域

/**
* Creates index node with given values.
*/
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
}
  • HeadIndex
1
2
3
4
5
6
7
static final class HeadIndex<K,V> extends Index<K,V> {
final int level; //层级
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
  • Node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next; //下一个节点,单链表结构

/**
* Creates a new regular node.
*/
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}

/**
* Creates a new marker node. A marker is distinguished by
* having its value field point to itself. Marker nodes also
* have null keys, a fact that is exploited in a few places,
* but this doesn't distinguish markers from the base-level
* header node (head.node), which also has a null key.
*/
Node(Node<K,V> next) { //用于建立标记节点,值为本身,在删除时使用
this.key = null;
this.value = this;
this.next = next;
}
//删除结点,在结点后面添加一个marker结点或者将结点和其后的marker结点从其前驱中断开。
void helpDelete(Node<K,V> b, Node<K,V> f) {
/*
* Rechecking links and then doing only one of the
* help-out stages per call tends to minimize CAS
* interference among helping threads.
*/
if (f == next && this == b.next) { // f为当前结点的后继并且b为当前结点的前驱
if (f == null || f.value != f) // not already marked 没有被标记
// 当前结点后添加一个marker结点,并且当前结点的后继为marker,marker结点的后继为f
casNext(f, new Node<K,V>(f));
else // f不为空并且f的值为本身
// 设置b的next域为f的next域
b.casNext(this, f.next);
}
}
}

2.3 核心函数

2.3.1 put(K key, V value)

1
2
3
4
5
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}

put()通过doPut()方法添加键值对:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {//b为先驱结点,n为b的后继结点
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) > 0) {//key大于结点n的key
b = n; //向后移动
n = f;
continue;
}
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}

z = new Node<K,V>(key, value, n);//新建结点
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;//跳出外层循环
}
}
// 随机生成种子
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0) //判断从右到左有多少个连续的1
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)//生成对应的Index结点
idx = new Index<K,V>(z, idx, null);//从下至上依次赋值,并且赋值了Index结点的down域
}
else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)// 为每一层生成一个头结点
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
// idx赋值为之前层级的头结点,并将level赋值为之前的层级
idx = idxs[level = oldLevel];
break;
}
}
}
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))//删除r的index结点
break;
r = q.right;
continue;
}
if (c > 0) {
q = r; // 向右寻找
r = r.right;
continue;
}
}

if (j == insertionLevel) {
if (!q.link(r, t)) //idx 插入q和r之间
break; // restart
if (t.node.value == null) {//idx结点值为空,需要删除
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}

if (--j >= insertionLevel && j < level)//下一层
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}

流程如下:

  • 根据给定的key从跳表的左上方往右或者往下查找到Node链表的前驱Node结点,这个查找过程会删除一些已经标记为删除的结点
  • 找到前驱结点后,开始往后插入查找插入的位置(因为找到前驱结点后,可能有另外一个线程在此前驱结点后插入了一个结点,所以先前查找的前驱现在可能不是要插入的结点的前驱,所以需要往后查找)。
  • 随机生成一个种子,判断是否需要增加层级,并且在各层级中插入对应的Index结点。

2.3.2 remove()

1
2
3
public V remove(Object key) {
return doRemove(key, null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//b为key的前继结点, n为“b的后继节点”(即若key存在于“跳表中”,n就是key对应的节点)
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
// c = 0 的情况
if (value != null && !value.equals(v))
break outer;
if (!n.casValue(v, null)) //当前结点n的value设置为null
break;
// 在n结点后添加一个marker结点,并且将b的next域更新为f
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {// 添加节点并且更新均成功
// 清除跳表中每一层n结点对应的Index结点
findPredecessor(key, cmp); // clean index
if (head.right == null)
tryReduceLevel();//减少层级
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}

流程如下:

  • 根据key值找到前驱结点,查找的过程会删除一个标记为删除的结点
  • 从前驱结点往后查找该结点
  • 在该结点后面添加一个marker结点,若添加成功,则将该结点的前驱的后继设置为该结点之前的后继
  • 头结点的next域是否为空,若为空,则减少层级。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 减少跳表层级
private void tryReduceLevel() {
// 保存头结点
HeadIndex<K,V> h = head;
HeadIndex<K,V> d;
HeadIndex<K,V> e;
if (h.level > 3 &&
(d = (HeadIndex<K,V>)h.down) != null &&
(e = (HeadIndex<K,V>)d.down) != null &&
e.right == null &&
d.right == null &&
h.right == null &&
casHead(h, d) && // try to set
h.right != null) // recheck
casHead(d, h); // try to backout
}

说明:如果最高的前三个HeadIndex不为空,并且其right域都为null,那么就将level减少1层,并将head设置为之前head的下一层,设置完成后,还有检测之前的head的right域是否为null,如果为null,则减少层级成功,否则再次将head设置为h。

2.3.4 get()

1
2
3
public V get(Object key) {
return doGet(key);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//b - key的前驱结点 n - 当前结点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) == 0) {//key为当前结点
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n; //向后查找
n = f;
}
}
return null;
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3498556.html

https://www.cnblogs.com/leesf456/p/5512817.html

【Java多线程】JUC集合 02. ConcurrentHashMap

发表于 2018-08-13 | 分类于 Java多线程 , JUC集合

[TOC]

1. 前言

ConcurrentHashMap是线程安全的哈希表

HashMap、HashTable、ConcurrentHashMap对比:

  • HashMap是非线程安全的哈希表
  • HashTable是线程安全的哈希表,通过synchronized来保证线程安全(synchronized方法),效率较低
  • ConcurrentHashMap的哈希表,通过“分段锁”来保证线程安全

2. JDK1.7

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
//默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

//默认负载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//并行级别,segment数
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

//每个segment最小容量,2^n
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// segment最大数
static final int MAX_SEGMENTS = 1 << 16;

final Segment<K,V>[] segments;

...
}

2.2 内部类

2.2.1 Segment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static final class Segment<K,V> extends ReentrantLock implements Serializable {


transient volatile HashEntry<K,V>[] table;

transient int count;

transient int threshold;//超过该值扩容

final float loadFactor;

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

2.2.2 HashEntry

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next; //next属性为volatile,并发性

HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}

2.3 核心函数

2.3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//并行级别ssize, 保持为2^n
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//默认值sshift = 4, 移位数segmentShift = 28; 掩码segmentMask = 15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//cap是segment中HashEntry数组的长度
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0] 初始化segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),//默认cap=2, 负载因子0.75,阈值为1.5
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

2.3.2 put(K key, V value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;//找到key对应的segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);//初始化segment[j],以segment[0]为原型,CAS更新
return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock()获取锁,成功返回true,失败返回false。
//获取锁失败的话,则通过scanAndLockForPut()获取锁,并返回要插入的"key-value"对应的HashEntry链表。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;//segment数组
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);//first 是tab在该位置处的链表表头
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {//直接替换
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)//在scanAndLockForPut()获取锁时,已经新建了key-value对应的HashEntry节点,则将HashEntry添加到Segment中
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);//扩容
else
setEntryAt(tab, index, node);//将新的节点设置成原链表的表头
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

插入流程如下:

  1. 首先找到key对应的segment,没有则进行初始化
  2. 插入key到对应segment的HashEntry数组,在插入前,会先获取segment的互斥锁,插入后再释放锁
  3. 首先根据hash获取key在HashEntry数组的位置,即链表首节点
  4. 遍历链表,若key已经存在,则根据onlyIfAbsent判断是否更新值,再返回;否则新建HashEntry节点,再插入到segment中
  5. 在插入HashEntry节点会进行扩容判断,需要扩容进行rehash(),否则setEntryAt插入hash对应链表表头
  • scanAndLockForPut获取锁如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//重试次数超过MAX_SCAN_RETRIES(多核64),则进入阻塞队列等待锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&//重试偶数次时候,如果表头改变了,则重置e,first,retries,重新走scanAndLockForPut方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
  • 扩容rehash()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;//扩容两倍
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
int sizeMask = newCapacity - 1;

// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是链表的第一个元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算应该放置在新数组中的位置,
// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 该位置处只有一个元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是链表表头
HashEntry<K,V> lastRun = e;
// idx 是当前链表的头结点 e 的新位置
int lastIdx = idx;

// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
newTable[lastIdx] = lastRun;
// 下面的操作是处理 lastRun 之前的节点,
// 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

2.3.3 get(Object key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);//1. 计算hash值
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//2. 获取key对应的segment,segment中对应的HashEntry链表
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {//3. 遍历链表,找到对应节点
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

2.3.3 并发性

添加节点的操作 put 和删除节点的操作 remove 都要获取segment 上的独占锁,所以它们是线程安全的。

get 过程中是没有加锁的,我们需要考虑的问题是 get 的时候在同一个 segment 中发生了 put 或 remove 操作。

  1. put 操作的线程安全性。

    1. 初始化segment,使用了 CAS 来初始化 Segment 中的数组。
    2. 添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    3. 扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。
  2. remove 操作的线程安全性。

    如果 remove 破坏的节点 get 操作已经过去了,那么这里不存在任何问题。

    如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。

3. JDK1.8

3.1 数据结构

jdk1.8的ConcurrentHashMap不再使用Segment代理Map操作这种设计,整体结构变为HashMap结构,但是依旧保留分段锁的思想。之前版本是每个Segment都持有一把锁,1.8版本改为锁住hash桶的第一个节点 tabAt(table, i)。它可能是Node链表的头结点、保留节点ReservationNode、或者是TreeBin节点(TreeBin节点持有红黑树的根节点)

3.2 内部类

jdk1.8 的节点变为4种:Node链表节点,ReservationNode保留节点,TreeBin节点,红黑树节点TreeNode

  • Node 基本节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//此类的子类具有负数hash值,并且不存储实际的数据,如果不使用子类直接使用这个类,那么key和val不会为null
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}
  • TreeNode 红黑树节点
1
2
3
4
5
6
7
8
9
10
11
12
13
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}
  • TreeBin

TreeBin的hash值固定为-2,它是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 红黑树节点TreeNode实际上还保存有链表的指针,因此也可以用链表的方式进行遍历读取操作
// 自身维护一个简单的读写锁,不用考虑写-写竞争的情况
// 不是全部的写操作都要加写锁,只有部分的put/remove需要加写锁
// 很多方法的实现和jdk1.8的ConcurrentHashMap.TreeNode里面的方法基本一样,可以互相参考
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root; // 红黑树结构的跟节点
volatile TreeNode<K,V> first; // 链表结构的头节点
volatile Thread waiter; // 最近的一个设置 WAITER 标识位的线程
volatile int lockState; // 整体的锁状态标识位

// values for lockState
// 二进制001,红黑树的 写锁状态
static final int WRITER = 1; // set while holding write lock
// 二进制010,红黑树的 等待获取写锁的状态
static final int WAITER = 2; // set when waiting for write lock
// 二进制100,红黑树的 读锁状态,读锁可以叠加,也就是红黑树方式可以并发读,每有一个这样的读线程,lockState都加上一个READER的值
static final int READER = 4; // increment value for setting read lock

// 重要的一点,红黑树的 读锁状态 和 写锁状态 是互斥的,但是从ConcurrentHashMap角度来说,读写操作实际上可以是不互斥的
// 红黑树的 读、写锁状态 是互斥的,指的是以红黑树方式进行的读操作和写操作(只有部分的put/remove需要加写锁)是互斥的
// 但是当有线程持有红黑树的 写锁 时,读线程不会以红黑树方式进行读取操作,而是使用简单的链表方式进行读取,此时读操作和写操作可以并发执行
// 当有线程持有红黑树的 读锁 时,写线程可能会阻塞,不过因为红黑树的查找很快,写线程阻塞的时间很短
// 另外一点,ConcurrentHashMap的put/remove/replace方法本身就会锁住TreeBin节点,这里不会出现写-写竞争的情况,因此这里的读写锁可以实现得很简单
}
  • ForwardingNode 转发节点

ForwardingNode是一种临时节点,在扩容进行中才会出现,hash值固定为-1,并且它不存储实际的数据数据。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。

1
2
3
4
5
6
7
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
  • ReservationNode 保留节点

hash值固定为-3,就是个占位符,不会保存实际的数据,正常情况是不会出现的,在jdk1.8新的函数式有关的两个方法computeIfAbsent和compute中才会出现,帮助完成对hash桶的加锁操作

1
2
3
4
5
6
7
8
9
10
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null, null);
}

// 空节点代表这个hash桶当前为null,所以肯定找不到“相等”的节点
Node<K,V> find(int h, Object k) {
return null;
}
}

3.3 源码分析

3.3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//初始化容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

3.3.2 put(K key, V value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;//用于记录对应链表的长度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();//初始化数组
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//数组该位置为空,CAS插入该节点;CAS失败,表示有并发操作,则进入下一个循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//数组正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);//帮助数据迁移
else {
V oldVal = null;
synchronized (f) {//获取该位置首节点的监视器锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {//说明是链表,红黑树首节点TreeBin的hash为-2
binCount = 1;//记录链表长度
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//链表末端,将新节点插入链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树操作
Node<K,V> p;
binCount = 2;
//红黑树方法插入节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);//链表长度超过树化阈值8,将链表转换为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
//CAS 式更新baseCount,并判断是否需要扩容
addCount(1L, binCount);
return null;
}
  • 初始化数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)//其他线程在初始化
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;//table 是volatile属性
sc = n - (n >>> 2);// sc = 0.75n
}
} finally {
sizeCtl = sc;//设置阈值
}
break;
}
}
return tab;
}
  • 链表树化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//小于最小树化容量64
tryPresize(n << 1);//扩容操作
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {//首节点加锁
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//遍历链表,建立红黑树
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;//红黑树TreeBin节点
else
tl.next = p;
tl = p;
}
//将红黑树设置到数组相应位置
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
  • 扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private final void tryPresize(int size) {
//c 为 1.5*size + 1,向上取最近的2^n
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {//初始化
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//nextTable不为null,CAS设置SIZECTL加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);//将原来的 tab 数组的元素迁移到新的 nextTable 数组中
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);//外围调用,第一个线程nextTab 为null
}
}
}
  • 数据迁移transfer

此方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。

transfer思路:原数组长度为 n,所以我们有 n 个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他没做完的任务,帮助迁移就可以了,而 Doug Lea 使用了一个 stride,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。

第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//stride步长,每个线程处理的任务,最小16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//翻倍扩容
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//用于控制迁移的位置,初始为n
}
int nextn = nextTab.length;
//fwd hash值为MOVED=-1, 原数组i处位置完成迁移,会将i处设置为fwd,标志该位置已经完成迁移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;//true为做完一个位置的迁移
boolean finishing = false; // to ensure sweep before committing nextTab
//从后往前,i为位置索引,bound为边界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//简单理解结局:i 指向了 transferIndex-1,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//说明原数组的所有位置都有相应的线程去处理了
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//bound为这次迁移的边界
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//所有迁移操作已经完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
//每有一个线程参与迁移就会将 sizeCtl 加 1,这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;//所有迁移都做完了,进入上面finishing分支
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode 空节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)//ForwardingNode 空节点,该位置已经迁移过了
advance = true; // already processed
else {
synchronized (f) {//对数组该位置首节点加锁
if (tabAt(tab, i) == f) {
//ln对应i处的链表首节点,hn对应i+n处链表首节点
Node<K,V> ln, hn;
if (fh >= 0) {//链表节点
int runBit = fh & n;
Node<K,V> lastRun = f;
//找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
//lastRun 之前的节点需要进行克隆,然后分到两个链表中
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//将原数组该位置处设置为 fwd,代表该位置已经处理完毕
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {//红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
//lo对应i处的树首节点,hi对应i+n处树首节点
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
//lc,hc对应该位置处节点个数
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;//节点添加到尾部
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//节点数少于UNTREEIFY_THRESHOLD=6,将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

3.3.3 get(Object key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//计算hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//找到再数组中的位置
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//表示红黑树或者正在扩容
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//遍历链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

4. 参考

http://www.jasongj.com/java/concurrenthashmap/

https://javadoop.com/post/hashmap#toc5

https://blog.csdn.net/u011392897/article/details/60479937

【Java多线程】JUC集合 01. CopyOnWriteArrayList

发表于 2018-08-13 | 分类于 Java多线程 , JUC集合

CopyOnWriteArrayList

1. 前言

  • 相当于线程安全的ArrayList
  • 适用于:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突
  • 可变操作(add()、set() 和 remove() 等)需要复制整个基础数组,所以开销很大
  • 迭代器支持hasNext(), next()等不可变操作,但不支持可变 remove()等操作
  • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

/** The lock protecting all mutators */
final transient ReentrantLock lock = new ReentrantLock();

/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array; //只能通过 getArray/setArray 访问
}

UML类图如下:

  • CopyOnWriteArrayList包含了成员lock。每一个CopyOnWriteArrayList都和一个互斥锁lock绑定,通过lock,实现了对CopyOnWriteArrayList的互斥访问。 在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”
  • CopyOnWriteArrayList包含了成员array数组,这说明CopyOnWriteArrayList本质上通过数组实现的。通过 “volatile数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”

2.2 核心方法

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}

public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}

public CopyOnWriteArrayList(E[] toCopyIn) {
setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
}

构造函数通过setArray赋值:

1
2
3
4
5
6
7
8
9
10
final Object[] getArray() {
return array;
}

/**
* Sets the array.
*/
final void setArray(Object[] a) {
array = a;
}
  • add(int index, E element)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//在指定位置插入元素
public void add(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();//获取锁
try {
Object[] elements = getArray();
int len = elements.length;
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+
", Size: "+len);
Object[] newElements;//新建数组
int numMoved = len - index;
if (numMoved == 0)
newElements = Arrays.copyOf(elements, len + 1);
else {
newElements = new Object[len + 1];
System.arraycopy(elements, 0, newElements, 0, index);//复制index前的元素
System.arraycopy(elements, index, newElements, index + 1,//复制index后面的元素
numMoved);
}
newElements[index] = element;
setArray(newElements);//将新建的数组赋值给“volatile数组”
} finally {
lock.unlock();//释放锁
}
}
  • get
1
2
3
4
5
6
7
8
9
//返回第index个元素
public E get(int index) {
return get(getArray(), index);
}

@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
  • remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E remove(int index) {
final ReentrantLock lock = this.lock;
// 获取“锁”
lock.lock();
try {
// 获取原始”volatile数组“中的数据和数据长度。
Object[] elements = getArray();
int len = elements.length;
// 获取elements数组中的第index个数据。
E oldValue = get(elements, index);
int numMoved = len - index - 1;
// 如果被删除的是最后一个元素,则直接通过Arrays.copyOf()进行处理,而不需要新建数组。
// 否则,新建数组,然后将”volatile数组中被删除元素之外的其它元素“拷贝到新数组中;最后,将新数组赋值给”volatile数组“。
if (numMoved == 0)
setArray(Arrays.copyOf(elements, len - 1));
else {
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
// 释放“锁”
lock.unlock();
}
}
  • 迭代遍历
1
2
3
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}

迭代通过COWIterator对象实现,COWIterator不支持remove(),set(),add()操作,非fail-fas机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
private final Object[] snapshot; //获取快照
/** Index of element to be returned by subsequent call to next. */
private int cursor; //迭代标尺

private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}

public boolean hasNext() {
return cursor < snapshot.length;
}

public boolean hasPrevious() {
return cursor > 0;
}

@SuppressWarnings("unchecked")
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}

@SuppressWarnings("unchecked")
public E previous() {
if (! hasPrevious())
throw new NoSuchElementException();
return (E) snapshot[--cursor];
}

public int nextIndex() {
return cursor;
}

public int previousIndex() {
return cursor-1;
}

public void remove() {
throw new UnsupportedOperationException();
}

public void set(E e) {
throw new UnsupportedOperationException();
}

public void add(E e) {
throw new UnsupportedOperationException();
}

@Override
public void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
Object[] elements = snapshot;
final int size = elements.length;
for (int i = cursor; i < size; i++) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
action.accept(e);
}
cursor = size;
}
}
  • addIfAbsent
1
2
3
4
5
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3498483.html

【IO】IO字符流

发表于 2018-08-07 | 分类于 IO

IO字符流

1. 框架

2. CharArrayReader/CharArrayWriter

  • CharArrayReader 是字符数组输入流。它和ByteArrayInputStream类似,操作的数据是以字符为单位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CharArrayReader extends Reader {
// 字符数组缓冲
protected char buf[];
// 下一个被获取的字符的位置
protected int pos;
// 被标记的位置
protected int markedPos = 0;
// 字符缓冲的长度
protected int count;
// 构造函数
public CharArrayReader(char buf[]) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
}
}
  • CharArrayReader 是字符数组输出流,用于写入数据符。操作的数据是以字符为单位
  • CharArrayReader 默认数组缓冲区大小为32
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 写入字符数组c到CharArrayWriter中。off是“字符数组b中的起始写入位置”,len是写入的长度
public void write(char c[], int off, int len) {
if ((off < 0) || (off > c.length) || (len < 0) ||
((off + len) > c.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
synchronized (lock) {
int newcount = count + len;
if (newcount > buf.length) {
buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));//扩容操作
}
System.arraycopy(c, off, buf, count, len);
count = newcount;
}
}

3. PipedReader/PipedWriter

  • PipedWriter 是字符管道输出流,它继承于Writer。 PipedReader 是字符管道输入流,它继承于Writer。
  • 同“PipedInputStream和PipedOutputStream”一样,可以通过管道进行线程间的通讯
  • PipedWriter 和PipedReader 需要成对使用,使用connect连接
  • PipedWriter 中write方法,实际调用的PipedReader 的receive方法,将数据写入管道输入流的缓冲字符数组,通过in(下一个写入字符位置)和out(下一个读取字符位置)判断数据是否读完,线程等待状态

4. InputStreamReader/OutputStreamWriter

  • InputStreamReader和OutputStreamWriter 是字节流通向字符流的桥梁:它使用指定的 charset 读写字节并将其解码为字符。
  • InputStreamReader 的作用是将“字节输入流”转换成“字符输入流”。它继承于Reader。
  • OutputStreamWriter 的作用是将“字节输出流”转换成“字符输出流”。它继承于Writer。

5. FileReader/FileWriter

  • FileReader 是用于读取字符流的类,它继承于InputStreamReader。要读取原始字节流,请考虑使用 FileInputStream。
  • FileWriter 是用于写入字符流的类,它继承于OutputStreamWriter。要写入原始字节流,请考虑使用 FileOutputStream。
  • FileReader是基于InputStreamReader实现的,构造函数传入FileInputStream
  • FileWriter是基于OutputStreamWriter实现的,构造函数传入FileOutputStream

6. BufferedReader/BufferedWriter

6.1 BufferedReader

  • BufferedReader 是缓冲字符输入流。它继承于Reader。
  • BufferedReader 的作用是为其他字符输入流添加一些缓冲功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//从BufferedReader中读取一个字符,该字符以int的方式返回
public int read() throws IOException {
synchronized (lock) {
ensureOpen();
for (;;) {
if (nextChar >= nChars) {// 若“缓冲区的数据已经被读完”,
fill();
if (nextChar >= nChars)
return -1;
}
if (skipLF) {// 若要“忽略换行符”,
skipLF = false;
if (cb[nextChar] == '\n') {
nextChar++;
continue;
}
}
return cb[nextChar++];
}
}
}

// 填充缓冲区函数。有以下两种情况被调用:
// (01) 缓冲区没有数据时,通过fill()可以向缓冲区填充数据。
// (02) 缓冲区数据被读完,需更新时,通过fill()可以更新缓冲区的数据。
private void fill() throws IOException {
// dst表示“cb中填充数据的起始位置”。
int dst;
if (markedChar <= UNMARKED) {// 没有标记的情况
dst = 0;
} else {
// delta表示“当前标记的长度”,它等于“下一个被读取字符的位置”减去“标记的位置”的差值;
int delta = nextChar - markedChar;
if (delta >= readAheadLimit) {
// 若“当前标记的长度”超过了“标记上限(readAheadLimit)”,
// 则丢弃标记!
markedChar = INVALIDATED;
readAheadLimit = 0;
dst = 0;
} else {
if (readAheadLimit <= cb.length) {
// 若“当前标记的长度”没有超过了“标记上限(readAheadLimit)”,
// 并且“标记上限(readAheadLimit)”小于/等于“缓冲的长度”;
// 则先将“下一个要被读取的位置,距离我们标记的置符的距离”间的字符保存到cb中。
System.arraycopy(cb, markedChar, cb, 0, delta);
markedChar = 0;
dst = delta;
} else {
// 若“当前标记的长度”没有超过了“标记上限(readAheadLimit)”,
// 并且“标记上限(readAheadLimit)”大于“缓冲的长度”;
// 则重新设置缓冲区大小,并将“下一个要被读取的位置,距离我们标记的置符的距离”间的字符保存到cb中。
char ncb[] = new char[readAheadLimit]; //当我们不停的更新缓冲区的时候,被标记的位置会被不停的放大。而内存的容量是有效的,我们不可能不限制长度的存储标记。所以用readAheadLimit来限制标记长度!
System.arraycopy(cb, markedChar, ncb, 0, delta);
cb = ncb;
markedChar = 0;
dst = delta;
}
// 更新nextChar和nChars
nextChar = nChars = delta;
}
}

int n;
do {
// 从“in”中读取数据,并存储到字符数组cb中;
// 从cb的dst位置开始存储,读取的字符个数是cb.length - dst
// n是实际读取的字符个数;若n==0(即一个也没读到),则继续读取!
n = in.read(cb, dst, cb.length - dst);
} while (n == 0);

// 如果从“in”中读到了数据,则设置nChars(cb中字符的数目)=dst+n,
// 并且nextChar(下一个被读取的字符的位置)=dst。
if (n > 0) {
nChars = dst + n;
nextChar = dst;
}
}

6.2 BufferedWriter

  • BufferedWriter 是缓冲字符输出流。它继承于Writer。
  • BufferedWriter 的作用是为其他字符输出流添加一些缓冲功能
  • BufferedWriter通过字符数组来缓冲数据,当缓冲区满或者用户调用flush()函数时,它就会将缓冲区的数据写入到输出流中。

7. PrintWriter

  • PrintWriter 是字符类型的打印输出流,它继承于Writer。
  • PrintStream 用于向文本输出流打印对象的格式化表示形式。它实现在 PrintStream 中的所有 print 方法。它不包含用于写入原始字节的方法,对于这些字节,程序应该使用未编码的字节流进行写入。

8. 参考

http://www.cnblogs.com/skywang12345/p/io_01.html

【IO】IO字节流

发表于 2018-08-06 | 分类于 IO

IO 字节流

1. 框架

2. ByteArrayInputStream/ByteArrayOutputStream

2.1 ByteArrayInputStream

  • ByteArrayInputStream 是字节数组输入流,它继承于InputStream。
  • ByteArrayInputStream 包含一个内部缓冲区,该缓冲区包含从流中读取的字节,本质就是通过字节数组来实现的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ByteArrayInputStream extends InputStream {
// 保存字节输入流数据的字节数组
protected byte buf[];
// 下一个会被读取的字节的索引
protected int pos;
// 标记的索引
protected int mark = 0;
// 字节流的长度
protected int count;

// 构造函数:创建一个内容为buf的字节流
public ByteArrayInputStream(byte buf[]) {
this.buf = buf;
// 初始化“下一个要被读取的字节索引号为0”
this.pos = 0;
// 初始化“字节流的长度为buf的长度”
this.count = buf.length;
}

// 构造函数:创建一个内容为buf的字节流,并且是从offset开始读取数据,读取的长度为length
public ByteArrayInputStream(byte buf[], int offset, int length) {
this.buf = buf;
this.pos = offset;
this.count = Math.min(offset + length, buf.length);
// 初始化“标记的字节流读取位置”
this.mark = offset;
}
}

2.2 ByteArrayOutputStream

  • ByteArrayOutputStream 是字节数组输出流。它继承于OutputStream
  • ByteArrayOutputStream 中的数据被写入一个 byte 数组。缓冲区会随着数据的不断写入而自动增长。可使用 toByteArray() 和 toString() 获取数据
  • 通过ByteArrayOutputStream()创建的“字节数组输出流”对应的字节数组大小是32。

3. PipedInputStream/PipedOutputStream

  • PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。 它们的作用是让多线程可以通过管道进行线程间的通讯
  • PipedOutputStream是基于PipedInputStream实现,内部持有PipedInputStream对象
  • PipedOutputStream和PipedInputStream必须配套使用,使用connect连接
  • PipedOutputStream写入数据,实际调用的是PipedInputStream的receive()方法,PipedInputStream内部缓存区默认大小为1024个字节。当一次性buffer写入1024个byte后,会先notifyAll(),再wait(1000),目的就是把刚才写入的内容被读出,然后再继续写入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//PipedOutputStream写入数据
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) { //PipedInputStream输入流是否存在
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
sink.receive(b, off, len); //调用的PipedInputStream的方法
}

//PipedInputStream接收字节数组,写入缓冲区buffer
//in - 下一个写入字节的位置 out - 下一个读取字节的位置
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive(); //判断连接是否关闭
writeSide = Thread.currentThread();
int bytesToTransfer = len;//写入数据总长度
while (bytesToTransfer > 0) {
if (in == out) //若“写入管道”的数据正好全部被读取完,则等待。
awaitSpace();
int nextTransferAmount = 0;//此次写入数据长度
if (out < in) {// 如果“管道中被读取的数据,少于写入管道的数据”;
nextTransferAmount = buffer.length - in;
} else if (in < out) { // 如果“管道中被读取的数据,大于写入管道的数据”
if (in == -1) {//初始化
in = out = 0;
nextTransferAmount = buffer.length - in;
} else { //控制in不超过out,否则覆盖写入的数据
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount); //将数据写入到缓冲中
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {
in = 0;
}
}
}
  • PipedInputStream读取数据,每次read()前,都会判断缓存区是否有数据(依据in变量判断),如果没有的话就先让出当前的锁(即让写的进程先运行),然后再去读
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 从管道(的缓冲)中读取数据,并将其存入到数组b中
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

/* possibly wait on the first character */
int c = read();
if (c < 0) {
return -1;
}
b[off] = (byte) c;
int rlen = 1;//已读取一个字节
while ((in >= 0) && (len > 1)) {

int available;

if (in > out) {
available = Math.min((buffer.length - out), (in - out));
} else {
available = buffer.length - out;//读取out后面所有字节
}

// A byte is read beforehand outside the loop
if (available > (len - 1)) {
available = len - 1;
}
System.arraycopy(buffer, out, b, off + rlen, available);//复制缓冲区的数据到指定byte[]
out += available;
rlen += available;
len -= available;

if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}
}
return rlen;
}
//从管道(的缓冲)中读取一个字节,并将其转换成int类型
public synchronized int read() throws IOException {
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()
&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}

readSide = Thread.currentThread();
int trials = 2;
while (in < 0) { //缓冲区是否有数据
if (closedByWriter) {
/* closed by writer, return EOF */
return -1;
}
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
throw new IOException("Pipe broken");
}
/* might be a writer waiting */
notifyAll();
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF;
if (out >= buffer.length) {
out = 0;
}
if (in == out) {
/* now empty */
in = -1;
}

return ret;
}

4. ObjectInputStream/ObjectOutputStream

  • ObjectInputStream 和 ObjectOutputStream 的作用是,对基本数据和对象进行序列化操作支持
  • 只能从流中读取支持java.io.Serializable或java.io.Externalizable接口的对象

5. FileInputStream/FileOutputStream

  • FileInputStream 是文件输入流,可以从某个文件中获得输入字节
  • FileOutputStream 是文件输出流,将数据写入 File 或 FileDescriptor 的输出流

6. FileDescriptor

  • FileDescriptor 是“文件描述符”, 可以被用来表示开放文件、开放套接字等
  • 不能直接通过FileDescriptor对该文件进行操作;若需要通过FileDescriptor对该文件进行操作,则需要新创建FileDescriptor对应的FileOutputStream,再对文件进行操作
  • in、out、err,标准输入输出的句柄,一般不直接使用;Java封装好了相应接口,可以使用System.in, System.out, System.err
1
2
3
4
5
public static final FileDescriptor in = standardStream(0);//标准输入(键盘)的描述符

public static final FileDescriptor out = standardStream(1);//标准输出(屏幕)的描述符

public static final FileDescriptor err = standardStream(2);//标准错误输出(屏幕)的描述符

源码如下:

1
2
3
4
5
6
7
private static native long set(int d);

private static FileDescriptor standardStream(int fd) {
FileDescriptor desc = new FileDescriptor();
desc.handle = set(fd);
return desc;
}

可以看出in/out/err就是一个FileDescriptor对象,只是其handle不同(long类型),通过set(fd)来设置其handle。“fd=0”代表了“标准输入”,“fd=1”代表了“标准输出”,“fd=2”代表了“标准错误输出”

7. FilterInputStream/FilterOutputStream

  • FilterInputStream/FilterOutputStream 的作用是用来“封装其它的输入输出流,并为它们提供额外的功能”。常用的子类有BufferedInputStream/BufferedOutputStream和DataInputStream/DataOutputStream,PrintStream。
  • BufferedInputStream/BufferedOutputStream的作用就是为输入/输出流提供缓冲功能,为输入流提供mark()和reset()功能。
  • DataInputStream/DataOutputStream 是用来装饰其它输入输出流,它“允许应用程序以与机器无关方式从底层输入流中读取基本 Java 数据类型”。应用程序可以使用DataOutputStream(数据输出流)写入由DataInputStream(数据输入流)读取的数据。
  • PrintStream 是用来装饰其它输出流。它能为其他输出流添加了功能,使它们能够方便地打印各种数据值表示形式。

8. BufferedInputStream/BufferedOutputStream

8.1 BufferedInputStream

  • BufferedInputStream 是缓冲输入流。它继承于FilterInputStream
  • 为另一个输入流添加一些功能,例如,提供“缓冲功能”以及支持“mark()标记”和“reset()重置方法”
  • 。例如,在新建某输入流对应的BufferedInputStream后,当我们通过read()读取输入流的数据时

8.1.1 原理

本质上是通过一个内部缓冲区数组实现的。创建BufferedInputStream时,我们会通过它的构造函数指定某个输入流为参数。BufferedInputStream会将该输入流数据分批读取,每次读取一部分到缓冲中;操作完缓冲中的这部分数据之后,再从输入流中读取下一部分的数据到缓冲区中。 为什么需要缓冲呢?原因很简单,效率问题!缓冲中的数据实际上是保存在内存中,而原始数据可能是保存在硬盘或NandFlash等存储介质中;而我们知道,从内存中读取数据的速度比从硬盘读取数据的速度至少快10倍以上。

8.1.2 源码分析

  • read()
1
2
3
4
5
6
7
8
9
//读取下一个字节
public synchronized int read() throws IOException {
if (pos >= count) { //判断是否读完buffer中的数据
fill();
if (pos >= count)
return -1;
}
return getBufIfOpen()[pos++] & 0xff;
}
  • fill()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//从输入流中读取数据,并填充到buffer中
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0) //没有标志,直接读取数据到buffer中
pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) //buffer没有多余空间
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz); //复制markpos - buffer.length的数据到 0 - sz中
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
} else { /* grow buffer */
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz]; //扩容操作。随着读取次数的增多,buffer会越来越大;这会导致我们占据的内存越来越大。因此需要一个marklimit;当buffer>=marklimit时,就不再保存markpos的值了。
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos); //从输入流读取buffer.length - pos的数据填充到buffer中,起始位置pos
if (n > 0)
count = n + pos;
}

8.2 BufferedOutputStream

  • BufferedOutputStream 是缓冲输出流。它继承于FilterOutputStream。
  • BufferedOutputStream 的作用是为另一个输出流提供“缓冲功能”。
  • BufferedOutputStream 关闭流前会进行flush(),将数据刷到输出流;当写入数据超过缓冲区大小时,会将全部数据写入输出流,而不经过缓冲区

9. DataInputStream/DataOutputStream

9.1 DataInputStream

  • DataInputStream 是用来装饰其它输入流,它“允许应用程序以与机器无关方式从底层输入流中读取基本 Java 数据类型”。应用程序可以使用DataOutputStream(数据输出流)写入由DataInputStream(数据输入流)读取的数据。

  • readUTF()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public final static String readUTF(DataInput in) throws IOException {
// 从“数据输入流”中读取“无符号的short类型”的值:
// 注意:UTF-8输入流的前2个字节是数据的长度
int utflen = in.readUnsignedShort();
byte[] bytearr = null;
char[] chararr = null;

if (in instanceof DataInputStream) {
DataInputStream dis = (DataInputStream)in;
if (dis.bytearr.length < utflen){
dis.bytearr = new byte[utflen*2];
dis.chararr = new char[utflen*2];
}
chararr = dis.chararr;
bytearr = dis.bytearr;
} else {
bytearr = new byte[utflen];
chararr = new char[utflen];
}

int c, char2, char3;
int count = 0;
int chararr_count=0;

// 从“数据输入流”中读取数据并存储到字节数组bytearr中;从bytearr的位置0开始存储,存储长度为utflen。
// 注意,这里是存储到字节数组!而且读取的是全部的数据。
in.readFully(bytearr, 0, utflen);

// 将“字节数组bytearr”中的数据 拷贝到 “字符数组chararr”中
// 注意:这里相当于“预处理的输入流中单字节的符号”,因为UTF-8是1-4个字节可变的。
while (count < utflen) {
// 将每个字节转换成int值
c = (int) bytearr[count] & 0xff;
// UTF-8的每个字节的值都不会超过127;所以,超过127,则退出。
if (c > 127) break;
count++;
// 将c保存到“字符数组chararr”中
chararr[chararr_count++]=(char)c;
}

// 处理完输入流中单字节的符号之后,接下来我们继续处理。
while (count < utflen) {
// 下面语句执行了2步操作。
// (01) 将字节由 “byte类型” 转换成 “int类型”。
// 例如, “11001010” 转换成int之后,是 “00000000 00000000 00000000 11001010”
// (02) 将 “int类型” 的数据左移4位
// 例如, “00000000 00000000 00000000 11001010” 左移4位之后,变成 “00000000 00000000 00000000 00001100”
c = (int) bytearr[count] & 0xff;
switch (c >> 4) {
// 若 UTF-8 是单字节,即 bytearr[count] 对应是 “0xxxxxxx” 形式;
// 则 bytearr[count] 对应的int类型的c的取值范围是 0-7。
case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
/* 0xxxxxxx*/
count++;
chararr[chararr_count++]=(char)c;
break;

// 若 UTF-8 是双字节,即 bytearr[count] 对应是 “110xxxxx 10xxxxxx” 形式中的第一个,即“110xxxxx”
// 则 bytearr[count] 对应的int类型的c的取值范围是 12-13。
case 12: case 13:
/* 110x xxxx 10xx xxxx*/
count += 2;
if (count > utflen)
throw new UTFDataFormatException(
"malformed input: partial character at end");
char2 = (int) bytearr[count-1];
if ((char2 & 0xC0) != 0x80) //判断第二个char是否为 /* 10xx xxxx*/
throw new UTFDataFormatException(
"malformed input around byte " + count);
chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
(char2 & 0x3F));
break;

// 若 UTF-8 是三字节,即 bytearr[count] 对应是 “1110xxxx 10xxxxxx 10xxxxxx” 形式中的第一个,即“1110xxxx”
// 则 bytearr[count] 对应的int类型的c的取值是14 。
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
if (count > utflen)
throw new UTFDataFormatException(
"malformed input: partial character at end");
char2 = (int) bytearr[count-2];
char3 = (int) bytearr[count-1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
throw new UTFDataFormatException(
"malformed input around byte " + (count-1));
chararr[chararr_count++]=(char)(((c & 0x0F) << 12) |
((char2 & 0x3F) << 6) |
((char3 & 0x3F) << 0));
break;

// 若 UTF-8 是四字节,即 bytearr[count] 对应是 “11110xxx 10xxxxxx 10xxxxxx 10xxxxxx” 形式中的第一个,即“11110xxx”
// 则 bytearr[count] 对应的int类型的c的取值是15
default:
/* 10xx xxxx, 1111 xxxx */
throw new UTFDataFormatException(
"malformed input around byte " + count);
}
}
// The number of chars produced may be less than utflen
return new String(chararr, 0, chararr_count);
}

9.2 DataOutputStream

  • DataOutputStream 是用来装饰其它输出流,将DataOutputStream和DataInputStream输入流配合使用,“允许应用程序以与机器无关方式从底层输入流中读写基本 Java 数据类型”。
  • writeUTF()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 将String数据以UTF-8类型的形式写入到“输出流out”中
static int writeUTF(String str, DataOutput out) throws IOException {
//获取String的长度
int strlen = str.length();
int utflen = 0;
int c, count = 0;

// 由于UTF-8是1~4个字节不等;
// 这里,根据UTF-8首字节的范围,判断UTF-8是几个字节的。
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
}

if (utflen > 65535)
throw new UTFDataFormatException(
"encoded string too long: " + utflen + " bytes");

// 新建“字节数组bytearr”
byte[] bytearr = null;
if (out instanceof DataOutputStream) {
DataOutputStream dos = (DataOutputStream)out;
if(dos.bytearr == null || (dos.bytearr.length < (utflen+2)))
dos.bytearr = new byte[(utflen*2) + 2];
bytearr = dos.bytearr;
} else {
bytearr = new byte[utflen+2];
}

// “字节数组”的前2个字节保存的是“UTF-8数据的长度”
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);

// 对UTF-8中的单字节数据进行预处理
int i=0;
for (i=0; i<strlen; i++) {
c = str.charAt(i);
if (!((c >= 0x0001) && (c <= 0x007F))) break;
bytearr[count++] = (byte) c;
}

// 对预处理后的数据,接着进行处理
for (;i < strlen; i++){
c = str.charAt(i);
// UTF-8数据是1个字节的情况
if ((c >= 0x0001) && (c <= 0x007F)) {
bytearr[count++] = (byte) c;

} else if (c > 0x07FF) {
// UTF-8数据是3个字节的情况
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
} else {
// UTF-8数据是2个字节的情况
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
// 将字节数组写入到“数据输出流”中
out.write(bytearr, 0, utflen+2);
return utflen + 2;
}

10. PrintStream

  • PrintStream 是打印输出流,它继承于FilterOutputStream。
  • PrintStream 是用来装饰其它输出流。它能为其他输出流添加了功能,使它们能够方便地打印各种数据值表示形式。
  • 与其他输出流不同,PrintStream 永远不会抛出 IOException;它产生的IOException会被自身的函数所捕获并设置错误标记, 用户可以通过 checkError() 返回错误标记,从而查看PrintStream内部是否产生了IOException。
  • PrintStream 提供了自动flush 和 字符集设置功能。自动flush,就是往PrintStream写入的数据会立刻调用flush()函数。
  • print()方法实际上调用的是write()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void print(int i) {
write(String.valueOf(i)); //转为string
}

private void write(String s) {
try {
synchronized (this) {
ensureOpen();
textOut.write(s);
textOut.flushBuffer();
charOut.flushBuffer();
if (autoFlush && (s.indexOf('\n') >= 0))
out.flush();
}
}
catch (InterruptedIOException x) {
Thread.currentThread().interrupt();
}
catch (IOException x) {
trouble = true; //不会抛出IOException, checkError()可查看是否发生异常
}
}
  • System中的in,out,err
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final static InputStream in = null;
public final static PrintStream out = null;
public final static PrintStream err = null;

private static native void setIn0(InputStream in);
private static native void setOut0(PrintStream out);
private static native void setErr0(PrintStream err);

private static void initializeSystemClass() {

...

FileInputStream fdIn = new FileInputStream(FileDescriptor.in);
FileOutputStream fdOut = new FileOutputStream(FileDescriptor.out);
FileOutputStream fdErr = new FileOutputStream(FileDescriptor.err);
setIn0(new BufferedInputStream(fdIn));
setOut0(newPrintStream(fdOut, props.getProperty("sun.stdout.encoding")));
setErr0(newPrintStream(fdErr, props.getProperty("sun.stderr.encoding")));

...
}

private static PrintStream newPrintStream(FileOutputStream fos, String enc) {
if (enc != null) {
try {
return new PrintStream(new BufferedOutputStream(fos, 128), true, enc);
} catch (UnsupportedEncodingException uee) {}
}
return new PrintStream(new BufferedOutputStream(fos, 128), true);
}

以out为例,获取过程:

  1. 首先获取标准输出(屏幕)的标识符out(FileDescriptor对象)
  2. 创建“标准输出(屏幕)”对应的“文件输出流”
  3. 创建“文件输出流”对应的“缓冲输出流”。目的是为“文件输出流”添加“缓冲”功能。
  4. 创建“缓冲输出流”对应的“打印输出流”。目的是为“缓冲输出流”提供方便的打印接口,如print(), println(), printf();使其能方便快捷的进行打印输出
  5. 执行setOut0(ps) ,将ps设置为out静态成员变量

11 参考

http://www.cnblogs.com/skywang12345/p/io_01.html

【Java多线程】JUC锁 10. Semaphore

发表于 2018-08-04 | 分类于 Java多线程 , JUC锁

Semaphore

1. 前言

  • Semaphore是一个计数信号量,它的本质是一个共享锁。
  • 线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
  • Semaphore包含Sync对象,有公平信号量和非公平信号量之分

2. 源码解析

2.1 数据结构

同ReentrantLock一样,Semaphore包含Sync对象,有公平信号量和非公平信号量之分,默认非公平信号量

2.2 内部类

  • FairSync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断“当前线程”是不是CLH队列中的第一个线程线程,
// 若是的话,则返回-1。
if (hasQueuedPredecessors())
return -1;
// 可以获得的信号量的许可数
int available = getState();
// 获得acquires个信号量许可之后,剩余的信号量许可数
int remaining = available - acquires;
// 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
  • NonfairSync
1
2
3
4
5
6
7
8
9
10
11
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

nonfairTryAcquireShared 在Sync中定义:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

公平信号量和非公平信号量的获取信号量机制不同:对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。

而释放信号量的机制是相同的。

2.3 核心方法

  • Semaphore()构造方法
1
2
3
4
5
6
public Semaphore(int permits) {
sync = new NonfairSync(permits); //默认非公平锁
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

是通过Sync来设置锁计数

1
2
3
Sync(int permits) {
setState(permits);
}
  • acquire()信号量获取
1
2
3
4
5
6
7
8
9
10
11
// 从信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

// 从信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

实际上是调用的AQS中的acquireSharedInterruptibly()。

1
2
3
4
5
6
7
8
9
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程是中断状态,则抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
// 否则,尝试获取“共享锁”;获取成功则直接返回,获取失败,则通过doAcquireSharedInterruptibly()获取。
if (tryAcquireShared(arg) < 0) //公平信号量和非公平信号量区别
doAcquireSharedInterruptibly(arg);
}

doAcquireSharedInterruptibly()使当前线程等待,直到获取共享锁或被中断才返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建”当前线程“的Node节点,且Node中记录的锁是”共享锁“类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • release()信号量释放
1
2
3
4
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

releaseShared()方法在AQS中定义

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //尝试释放共享锁
doReleaseShared(); //释放共享锁
return true;
}
return false;
}

tryReleaseShared()在Sync中定义,公平信号量和非公平信号量机制相同

1
2
3
4
5
6
7
8
9
10
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState(); //当前可获取的信号量数
int next = current + releases; //释放releases个信号后,信号量数
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //cas改变信号量state
return true;
}
}

doReleaseShared()释放共享锁。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“头节点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3534050.html

【Java多线程】JUC锁 09. CyclicBarrier

发表于 2018-08-04 | 分类于 Java多线程 , JUC锁

CyclicBarrier

1. 前言

  • 同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点
  • 计数器可重置后使用
  • CyclicBarrier 和 CountDownLatch的区别:
CountDownLatch CyclicBarrier
减计数方式 加计数方式
计算为0时释放所有等待的线程 计数达到指定值时释放所有等待线程
计数为0时,无法重置 计数达到指定值时,计数置为0重新开始
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞
不可重复利用 可重复利用

2. 源码解析

2.1 数据结构

CyclicBarrier 包含 ReentrantLock对象Lock和Condition对象trip,由独占锁ReentrantLock实现。

2.2 成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties; //必须同时到达barrier的线程数
/* The command to run when tripped */
private final Runnable barrierCommand; //到达屏障所执行的操作
/** The current generation */
private Generation generation = new Generation(); //内部类,同一批线程属于同一代(到达屏障后重置)
//处于等待的parties数目
private int count;

private static class Generation {
boolean broken = false;
}

2.3 核心方法

  • CyclicBarrier()构造方法
1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties) {
this(parties, null);
}
//有屏障操作的构造方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
  • await()
1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

await()在dowait()中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//timed - 是否设置超时时间
//让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取“独占锁(lock)”
lock.lock();
try {
// 保存“当前的generation”
final Generation g = generation;

// 若“当前generation已损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();

// 如果当前线程被中断,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 将“count计数器”-1
int index = --count;
// 如果index=0,则意味着“有parties个线程到达barrier”。
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果barrierCommand不为null,则执行该动作。
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 唤醒所有等待线程,并更新generation。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// 当前线程一直阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生, 当前线程才继续执行。
for (;;) {
try {
// 如果不是“超时等待”,则调用awati()进行等待;否则,调用awaitNanos()进行等待。
if (!timed)
trip.await(); //放入AQS等待队列
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果等待过程中,线程被中断,则执行下面的函数。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

// 如果“当前generation已经损坏”,则抛出异常。
if (g.broken)
throw new BrokenBarrierException();

// 如果“generation已经换代”,则返回index。
if (g != generation)
return index;

// 如果是“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier,唤醒CyclicBarrier中所有等待线程。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放“独占锁(lock)”
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//更新换代,唤醒所有等待的线程
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

//中断Barrier
private void breakBarrier() {
generation.broken = true; //将该Generation中断
count = parties; //重置count
trip.signalAll(); //唤醒所有等待的线程
}

dowait() 流程:

  • 如果当前线程被中断,即Thread.interrupted()为true;则通过breakBarrier()终止CyclicBarrier
  • 将count -1,判断是否有parties个线程到达Barrier;如果没到达,线程进行等待,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生, 当前线程才继续执行
  • 如果所有线程都达到Barrier,有设置barrierCommand,则执行barrierCommand,然后唤醒所有等待的线程,更新generation

3. 参考

http://www.cnblogs.com/skywang12345/p/3533995.html

【Java多线程】JUC锁 08. CountDownLatch

发表于 2018-08-03 | 分类于 Java多线程 , JUC锁

CountDownLatch

1. 前言

  • 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
  • 计数器无法重置

2. 源码解析

2.1 数据结构

CountDownLatch包含Sync对象,Sync继承于AQS,底层使用AQS共享锁实现

2.2 核心方法

  • CountDownLatch()构造方法
1
2
3
4
5
6
7
8
9
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

//Sync中实现,设置锁状态为count, "锁计数器"
Sync(int count) {
setState(count);
}
  • await()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//AQS获取共享锁
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

//Sync中实现,返回锁计数是否为0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//让当前线程一直等待,直至获取共享锁或中断才返回
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
  • countDown()释放锁
1
2
3
4
5
6
7
8
9
10
11
12
public void countDown() {
sync.releaseShared(1);
}

//AQS中实现
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3533887.html

【Java多线程】Volatile关键字

发表于 2018-08-01 | 分类于 Java多线程 , 其他

volatile

1. 前言

  • 最轻量级的同步机制
  • 具有对所有线程具有可见性,但不具有原子性
  • 禁止指令重排序优化

适用场景:

  • 运算结果不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值
  • 变量不需要与其他的状态变量共同参与不变约束

2. 原理

处理器为了提高运行速度,不直接与内存进行通讯,而是通过先将内存中的数据读到内部缓存再进行操作,操作完后写入内存时间不确定

对volitale变量写操作时候,JVM会发送一条lock前缀的指令,lock前缀指令会引起处理器缓存回写到内存,将这个变量缓存行的数据重新写回到内存

为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议(MESI协议等),每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。

缓存一致性机制会阻止同时修改被两个以上处理器缓存的内存区域数据。

【Java多线程】JUC锁 07. StampedLock

发表于 2018-07-31 | 分类于 Java多线程 , JUC锁

StampedLock

1. 前言

  • 对读写锁的改进,读多写少的情况下,可能造成写线程遭遇饥饿问题
  • StampedLock控制锁有三种模式(写,读,乐观读),一个StampedLock状态是由版本和模式两个部分组成
  • StampedLockd的内部实现是基于CLH锁的,CLH锁原理:锁维护着一个等待线程队列,所有申请锁且失败的线程都记录在队列。一个节点代表一个线程,保存着一个标记位locked,用以判断当前线程是否已经释放锁。当一个线程试图获取锁时,从队列尾节点作为前序节点,循环判断所有的前序节点是否已经成功释放锁。
  • 锁不可重入,不支持Condition条件

2. 源码分析

2.1 数据结构

2.2 原理

内部类 WNode:

1
2
3
4
5
6
7
8
9
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // 读模式使用该节点形成栈
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}

StampedLockd中内部类WNote就是等待链表队列,每一个WNode标识一个等待线程,whead为CLH队列头,wtail为CLH队列尾,state为锁的状态。long型即64位,倒数第八位标识写锁状态,如果为1,标识写锁占用!下面围绕这个state来讲述锁操作。

常量标识:

WBIT=1000 0000(即-128)写锁第8位为1

RBIT =0111 1111(即127) 读锁前7位累加

SBIT =1000 0000(后7位表示当前正在读取的线程数量,清0)

  • 写锁writeLock

writeLock是一个独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这跟ReentrantReadWriteLock 的写锁很相似,不过要注意的是StampedLock的写锁是不可重入锁,当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个stamp 票据变量来表示该锁的版本

  • 悲观锁readLock

readLock是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁;如果已经有线程持有写锁,其他线程请求获取该锁会被阻塞,这类似ReentrantReadWriteLock 的读锁(不同在于这里的读锁是不可重入锁)

这里说的悲观是指在具体操作数据前,悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量来表示该锁的版本

  • 乐观读锁 tryOptimisticRead

在操作数据前并没有通过 CAS 设置锁的状态,仅仅是通过位运算测试;如果当前没有线程持有写锁,则简单的返回一个非 0 的 stamp 版本信息,获取该 stamp 后在具体操作数据前还需要调用 validate 验证下该 stamp 是否已经不可用,也就是看当调用 tryOptimisticRead 返回 stamp 后,到当前时间是否有其它线程持有了写锁,如果是那么 validate 会返回 0,否者就可以使用该 stamp 版本的锁对数据进行操作。

由于 tryOptimisticRead 并没有使用 CAS 设置锁状态,所以不需要显示的释放该锁。

该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及 CAS 操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其它写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的

3. 参考

https://www.cnblogs.com/huangjuncong/p/9191760.html?utm_source=debugrun&utm_medium=referral

123…5
JumpsZ

JumpsZ

42 日志
10 分类
8 标签
© 2018 JumpsZ
All rights reserved
|
本站访客数: