Steve's Blog

Talk is cheap, show me the code.

0%

LongAdder

当原子类竞争很激烈时,使用CAS性能将变低,JDK提供了一个高性能的支持并发的计数器,LongAdder。

1. Striped64

2. LongAdder

a. 常量变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Number of CPUS, to place bound on table size */
// 当前机器CPU的核数
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
* Table of cells. When non-null, size is a power of 2.
* cell数组,当base发生竞争是,使用cells进行增加
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
* 默认新增时使用的数字,相当于AtomicLong 的value
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
* cell数组用的锁,默认为0,当cells数组初始化或者扩容时为1
*/
transient volatile int cellsBusy;

b. add()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
     public void add(long x) {
// as : cell数组
// b : base的值
// v : slot下Cell元素修改前的值
// m : cells数组长度 - 1
// a : slot下Cell元素
Cell[] as; long b, v; int m; Cell a;
// 条件一:(as = cells) != null
// true -> cells数组不为空,进入代码块
// false -> cells数组为空,此时进入条件二
// 条件二:!casBase(b = base, b + x)
// true -> 更新base失败,进入代码块
// false -> 更新base成功,完成更新
// 有两种情况进入代码块:
// 1. cells数组不为空;
// 2. cells数组为空,且CAS更新base失败
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended表示对于cells数组内元素是否有竞争,true表示无竞争,false表示有竞争
boolean uncontended = true;
// 条件一二:as == null || (m = as.length - 1) < 0
// true -> cells数组为空,进入代码块,初始化数组
// false -> cells数组不为空,进行下一个条件判断
// 条件三:(a = as[getProbe() & m]) == null
// true -> cells数组对应的slot为空,进行初始化
// false -> cells数组对应的slot不为空,进行下一个条件判断
// 条件四:!(uncontended = a.cas(v = a.value, v + x))
// 通过CAS的方式对对slot中的cell元素中value数值进行更新
// true -> 更新失败,表示有竞争,uncontended设置为false,进入代码块
// true -> 更新成功,表示没竞争,uncontended设置为true,结束
// 有三种情况调用longAccumulate方法
// 1. cells数组为空(前置条件是:针对base的CAS操作失败了)
// 2. 当前线程对应的slot的cell元素为null
// 3. slot中cell元素不为空,但是CAS更新失败
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

涉及到了几个方法

  • casBase()方法
  • getProbe()方法
  • cell.cas()方法
  • longAccumulate()方法

a. casBase()方法

1
2
3
4
final boolean casBase(long cmp, long val) {
// 使用CAS更新base的值
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

b. getProbe()方法
获取当前线程的probe值
1
2
3
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

c. cell.cas()方法
1
2
3
4
5
final boolean cas(long cmp, long val) {
// valueOffset是Cell类的value变量在类中的地址偏移量
// 通过CAS的方式对cell元素中value数值进行更新
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

c. longAccumulate()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
* 有三种情况调用longAccumulate方法
* 1. cells数组为空(前置条件是:针对base的CAS操作失败了)
* 2. 当前线程对应的slot的cell元素为null
* 3. slot中cell元素不为空,但是CAS更新失败
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// h : 当前线程的probe值,计算slot下标用
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
// 如果线程probe还未初始化,则说明slot数组还未初始化,之前对于slot肯定无竞争
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
// as : cell数组
// a : 要处理的cell
// n : cells数组长度
// v : CAS更新前的值
Cell[] as; Cell a; int n; long v;
// CASE1 : 数组不为空时进入if代码块,有两种情况:slot下cell为空或者cell不为空且CAS更新cell失败了
if ((as = cells) != null && (n = as.length) > 0) {
// CASE1.1 当前slot为null
if ((a = as[(n - 1) & h]) == null) {
// 当前没有线程拿到cells的写锁,返回false表示数组正在初始化
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 尝试获取cells数组的写锁
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 拿到锁之后,再次判断cells数组是否有更新,有更新表示其他线程对数据进行了操作,不再创建
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 为slot赋值新的Cell元素
rs[j] = r;
created = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
// created == true表示插入新的Cell成功,直接退出自旋
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// CASE1.2 当前slot不为null,且之前有竞争,此时将wasUncontended设置为true,再自旋一次
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 当前slot不为null,且之前无竞争,尝试通过CAS更新slot下Cell的value,更新成功则退出自旋
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果cells数组的长度达到了CPU核心数,或者cells扩容了
// 设置collide为false并通过下面的语句修改线程的probe再重新尝试
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// CAS更新slot下cell失败,且cell数组未扩容,数组长度未达到CPU核心数,再自旋一次
else if (!collide)
collide = true;
// slot下cell不为空且之前碰撞了一次,尝试拿cells数组的写锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 拿到锁之后再次进行判断是否有其它线程修改数组
if (cells == as) { // Expand table unless stale
// 对cells数组进行扩容,扩容后的数组长度为原来的2倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
collide = false;
// 进入下一次自旋,再次尝试写入cells数组
continue; // Retry with expanded table
}
// 修改线程的probe再重新尝试
// 有3种情况会进入这里:
// 1. wasUncontended=false: 第一次进入longAccumulate且CAS更新slot下的cell元素失败
// 2. 数组正在初始化
// 3. n >= NCPU || cells != as:数组长度达到CPU核心数或者数组扩容了
// 4. collide = false :CAS更新slot下cell失败,且cell数组未扩容,数组长度未达到CPU核心数,再自旋一次
h = advanceProbe(h);
}
// CASE2: cells数组为null,cellsBusy为0表示cells数组未加锁,如果CAS获得cells数组的写锁则进行初始化操作
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// 拿到锁之后再次进行判断,防止其他线程已经完成初始化
if (cells == as) {
// 初始化一个长度为2的cells数组并且插入一个Cell元素
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 处理完后释放锁
cellsBusy = 0;
}
if (init)
// init == true表示已经更新完毕,直接退出自旋
break;
}
// CASE3: 数组为空且未拿到cells的写锁,再次尝试CAS写base,写成功退出自旋,否则继续自旋
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

casCellsBusy()方法

1
2
3
4
  final boolean casCellsBusy() {
// 尝试获取cells数组的写锁
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}