代码拉取完成,页面将自动刷新
同步操作将从 icanci/Java-Review 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
LongAdder类的作用是为了在高并发下完成统计的一个类
为什么不使用AtomicLong系列的类呢?
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder() {
}
// 自增自减核心方法
public void add(long x) {
// as 表示cells引用
// b 表示base值
// v 表示期望值
// m 表示数组长度
// a 表示当前线程命中的单元格
Cell[] as; long b, v; int m; Cell a;
// 条件1:true 表示cells已经初始化过了 当前线程应该将数据写入到对应的cell中
// fasle 表示cells未初始化 当前所有线程应该将数据写到base中
// 条件2:!true 表示当前线程CAS替换数据成功
// !false 表示发生竞争了,可能需要重试或者扩容
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 什么时候会进来
// 条件1:true 表示cells已经初始化过了 当前线程应该将数据写入到对应的cell中
// 条件2:false 表示发生竞争了,可能需要重试或者扩容
// true 表示未发生竞争 false 发生竞争
boolean uncontended = true;
// 条件1:true cells未初始化 也就是多线程写base发生竞争了
// false 已经初始化 当前线程应该找自己的cell写值
// 条件2:getProbe()获取当前线程的hash值 m 表示 cells 长度-1 cells 的长度一定是2的次方数
// true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
// false 说明当前线程对应的cell不为空,说明下一步需要将x值添加到cells中
// 条件3:true CAS失败 意味着当前线程对应的cell 有竞争
// false 表示CAS成功
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 哪些情况会调用?
// 1.true 说明cells未初始化,也就是多线程写base发生竞争了 [重试 | 初始化cells]
// 2.true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
// 3.true CAS失败 意味着当前线程对应的cell 有竞争
longAccumulate(x, null, uncontended);
}
}
// 自增1
public void increment() {
add(1L);
}
// 自减1
public void decrement() {
add(-1L);
}
// 拿到真正的值
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
// 清空cells的值
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
// 获取reset之后的值
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
// 获取值
public long longValue() {
return sum();
}
public int intValue() {
return (int)sum();
}
public float floatValue() {
return (float)sum();
}
public double doubleValue() {
return (double)sum();
}
}
abstract class Striped64 extends Number {
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
// 获取CPU的核心数
// 用来控制cells的长度
static final int NCPU = Runtime.getRuntime().availableProcessors();
// cell 数组
transient volatile Cell[] cells;
// 没有发生竞争的时候的值,数据会类加
// 或者cells扩容时候,需要将数据写入到base中
transient volatile long base;
// 初始化cells或者扩容cells都需要获取锁,0表示无锁状态 1表示其他线程已经持有锁
transient volatile int cellsBusy;
Striped64() {
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
// 通过CAS的方式获取锁
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
// 获取当前线程的hash值
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
// 重置当前线程的hash值
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
// 哪些情况会调用?
// 1.true 说明cells未初始化,也就是多线程写base发生竞争了 [重试 | 初始化cells]
// 2.true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
// 3.true CAS失败 意味着当前线程对应的cell 有竞争
// x:就是add方法的增量
// fn:是一个接口,可以实现然后拓展算法
// wasUnconteded:是否真正发生竞争 只有cells初始化之后,并且当前线程竞争修改失败 才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
// 线程的hash值
int h;
// 如果当前线程的hash值为0,那就是还没有为当前线程分配哈希值
if ((h = getProbe()) == 0) {
// 给当前线程分配hash值
ThreadLocalRandom.current(); // force initialization
// 取出当前线程的hash值,赋值给h
h = getProbe();
// 因为默认情况下,当前线程肯定是写到了 cells[0]的位置,不把它当作是一次真正的竞争
wasUncontended = true;
}
// 表示扩容意向 如果是false一定不扩容
// 如果是true则可能扩容
boolean collide = false; // True if last slot nonempty
// 自旋
for (;;) {
// as 表示cells 引用
// a 表示当前线程命中的 cell
// n 表示cells数组长度
// v 表示期望值
Cell[] as; Cell a; int n; long v;
// CASE1:表示cells已经初始化了,当前线程应该是将数据写入到对应的cell中
if ((as = cells) != null && (n = as.length) > 0) {
// 2.true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
// 3.true CAS失败 意味着当前线程对应的cell 有竞争
// CASE1.1:true 表示当前线程对应的线程下标位置的cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {
// true 表示当前锁未被占用 false 表示锁被占用
if (cellsBusy == 0) { // Try to attach new Cell
// 拿当前的 x 创建Cell
Cell r = new Cell(x); // Optimistically create
// 条件1:true 表示当前锁未被占用 false 表示锁被占用
// 条件2:casCellsBusy 为true获取锁成功
if (cellsBusy == 0 && casCellsBusy()) {
// 是否创建成功 标记
boolean created = false;
try { // Recheck under lock
// rs 当前cells引用
// m cells长度
// j 当前线程命中的下标
Cell[] rs; int m, j;
// 条件1条件2恒成立
// 条件3:rs[j = (m - 1) & h] == null
// 为了防止其他线程初始化过该位置,当前线程再次初始化位置
// 导致修改数据
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
// 设置为无锁状态
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
// 强制改为false
collide = false;
}
// CASE1.2
// 只有一种情况
// wasUnconteded:是否真正发生竞争
// 只有cells初始化之后,并且当前线程竞争修改失败 才会是false
else if (!wasUncontended) // CAS already known to fail
//
wasUncontended = true; // Continue after rehash
// CASE1.3
// 当前线程rehash过hash值,然后新命中的cell
// true 写成功 退出循环
// false 写失败
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// CASE1.4
// 条件1:N>NCPU true 扩容意向,改为false,表示不扩容了, false 说明还是可以扩容的
// 条件2:cells!=as true就是其他线程已经扩容过了,就开始重试
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// CASE1.5
// !collide = true 表示需要扩容 设置扩容意向为 true 但是不一样扩容
else if (!collide)
collide = true;
// CASE1.6:扩容的代码
// 条件1:cellBusy == 0 true 表示当前无锁状态,线程可以去竞争
// 条件2:casCellBusy true 表示当前线程获取锁成功 可以执行扩容逻辑
// false 表示当前时刻有其他线程再执行扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//
if (cells == as) { // Expand table unless stale
// 扩容机制 扩容2倍
Cell[] rs = new Cell[n << 1];
// 赋值
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 释放锁
cellsBusy = 0;
}
//
collide = false;
continue; // Retry with expanded table
}
// 重置hash值
h = advanceProbe(h);
}
// CASE2:前置条件cells还未初始化 as 为null
// 条件1:true 表示当前未加锁
// 条件2:cell == null 因为其他线程可能会在你给as赋值之后修改了
// 条件3:true 表示获取锁成功 会把 cellBusy = 1,false表示其他线程正在持有这把锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
// 防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// CASE3:
// 1.当前cellBusy 加锁状态 表示其他线程正在初始化cells
// 2.cells被其他线程初始化后,当前线程需要累加到base
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。