1 Star 0 Fork 165

shk / Java-Review

forked from icanci / Java-Review 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
Java-多线程-增强篇-LongAdder.md 14.35 KB
一键复制 编辑 原始数据 按行查看 历史
icanci 提交于 2020-09-07 23:09 . :fire:更新文件夹

Java - 多线程 - 增强篇 - LongAdder

LongAdder类的作用是为了在高并发下完成统计的一个类

为什么不使用AtomicLong系列的类呢?

  • 因为AtomicLong底层使用的是CAS算法,在高并发的情况下,参与争抢的线程数目越多,每个线程获取锁的几率就越小,此时就会出现出现漫长的等待。严重损耗性能

LongAdder 类的原理

1597056342681

LongAdder 类的源码分析

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    public LongAdder() {
    }

    // 自增自减核心方法
    public void add(long x) {
        // as 表示cells引用
        // b 表示base值
        // v 表示期望值
        // m 表示数组长度
        // a 表示当前线程命中的单元格
        Cell[] as; long b, v; int m; Cell a;
        // 条件1:true 表示cells已经初始化过了 当前线程应该将数据写入到对应的cell中
        //       fasle 表示cells未初始化 当前所有线程应该将数据写到base中
        // 条件2:!true 表示当前线程CAS替换数据成功 
        //       !false 表示发生竞争了,可能需要重试或者扩容
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            // 什么时候会进来
            // 条件1:true 表示cells已经初始化过了 当前线程应该将数据写入到对应的cell中
            // 条件2:false 表示发生竞争了,可能需要重试或者扩容

            // true 表示未发生竞争 false 发生竞争
            boolean uncontended = true;
            // 条件1:true cells未初始化 也就是多线程写base发生竞争了
            //       false 已经初始化  当前线程应该找自己的cell写值
            // 条件2:getProbe()获取当前线程的hash值 m 表示 cells 长度-1 cells 的长度一定是2的次方数
            //       true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
            //       false 说明当前线程对应的cell不为空,说明下一步需要将x值添加到cells中
            // 条件3:true CAS失败 意味着当前线程对应的cell 有竞争
            //        false 表示CAS成功
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                // 哪些情况会调用?
                // 1.true 说明cells未初始化,也就是多线程写base发生竞争了 [重试 | 初始化cells]
                // 2.true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
                // 3.true CAS失败 意味着当前线程对应的cell 有竞争
                longAccumulate(x, null, uncontended);
        }
    }

    // 自增1
    public void increment() {
        add(1L);
    }

    // 自减1
    public void decrement() {
        add(-1L);
    }

    // 拿到真正的值
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    // 清空cells的值
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }
    // 获取reset之后的值
    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    // 获取值
    public long longValue() {
        return sum();
    }

    public int intValue() {
        return (int)sum();
    }
    public float floatValue() {
        return (float)sum();
    }

    public double doubleValue() {
        return (double)sum();
    }
}

Striped64类的源码分析

abstract class Striped64 extends Number {
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    // 获取CPU的核心数 
    // 用来控制cells的长度
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    // cell 数组
    transient volatile Cell[] cells;

    // 没有发生竞争的时候的值,数据会类加
    // 或者cells扩容时候,需要将数据写入到base中
    transient volatile long base;

    // 初始化cells或者扩容cells都需要获取锁,0表示无锁状态 1表示其他线程已经持有锁
    transient volatile int cellsBusy;

    Striped64() {
    }

    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }
    // 通过CAS的方式获取锁
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    // 获取当前线程的hash值
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

    // 重置当前线程的hash值
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

    // 哪些情况会调用?
    // 1.true 说明cells未初始化,也就是多线程写base发生竞争了 [重试 | 初始化cells]
    // 2.true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
    // 3.true CAS失败 意味着当前线程对应的cell 有竞争

    // x:就是add方法的增量
    // fn:是一个接口,可以实现然后拓展算法
    // wasUnconteded:是否真正发生竞争 只有cells初始化之后,并且当前线程竞争修改失败 才会是false
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        // 线程的hash值
        int h;
        // 如果当前线程的hash值为0,那就是还没有为当前线程分配哈希值
        if ((h = getProbe()) == 0) {
            // 给当前线程分配hash值
            ThreadLocalRandom.current(); // force initialization
            // 取出当前线程的hash值,赋值给h
            h = getProbe();
            // 因为默认情况下,当前线程肯定是写到了 cells[0]的位置,不把它当作是一次真正的竞争
            wasUncontended = true;
        }
        // 表示扩容意向 如果是false一定不扩容
        // 如果是true则可能扩容
        boolean collide = false;                // True if last slot nonempty
        // 自旋
        for (;;) {
            // as 表示cells 引用
            // a 表示当前线程命中的 cell
            // n 表示cells数组长度
            // v 表示期望值
            Cell[] as; Cell a; int n; long v;
            // CASE1:表示cells已经初始化了,当前线程应该是将数据写入到对应的cell中
            if ((as = cells) != null && (n = as.length) > 0) {
                // 2.true 说明当前线程对应下面的cell为空,需要创建longAccumulate支持
                // 3.true CAS失败 意味着当前线程对应的cell 有竞争

                // CASE1.1:true 表示当前线程对应的线程下标位置的cell为null,需要创建new Cell
                if ((a = as[(n - 1) & h]) == null) {
                    // true 表示当前锁未被占用 false 表示锁被占用
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        // 拿当前的 x 创建Cell
                        Cell r = new Cell(x);   // Optimistically create
                        // 条件1:true 表示当前锁未被占用 false 表示锁被占用
                        // 条件2:casCellsBusy 为true获取锁成功
                        if (cellsBusy == 0 && casCellsBusy()) {
                            // 是否创建成功 标记
                            boolean created = false;
                            try {               // Recheck under lock
                                // rs 当前cells引用
                                // m cells长度
                                // j 当前线程命中的下标
                                Cell[] rs; int m, j;
                                // 条件1条件2恒成立
                                // 条件3:rs[j = (m - 1) & h] == null
                                // 为了防止其他线程初始化过该位置,当前线程再次初始化位置
                                // 导致修改数据
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                // 设置为无锁状态
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    // 强制改为false
                    collide = false;
                }
                // CASE1.2
                // 只有一种情况
                // wasUnconteded:是否真正发生竞争 
                // 只有cells初始化之后,并且当前线程竞争修改失败 才会是false
                else if (!wasUncontended)       // CAS already known to fail
                    // 
                    wasUncontended = true;      // Continue after rehash
                // CASE1.3
                // 当前线程rehash过hash值,然后新命中的cell
                // true 写成功 退出循环
                // false 写失败
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                // CASE1.4
                // 条件1:N>NCPU true 扩容意向,改为false,表示不扩容了, false 说明还是可以扩容的
                // 条件2:cells!=as true就是其他线程已经扩容过了,就开始重试
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                // CASE1.5
                // !collide = true 表示需要扩容 设置扩容意向为 true  但是不一样扩容
                else if (!collide)
                    collide = true;
                // CASE1.6:扩容的代码
                // 条件1:cellBusy == 0 true 表示当前无锁状态,线程可以去竞争
                // 条件2:casCellBusy true 表示当前线程获取锁成功 可以执行扩容逻辑
                //                   false 表示当前时刻有其他线程再执行扩容
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        // 
                        if (cells == as) {      // Expand table unless stale
                            // 扩容机制 扩容2倍
                            Cell[] rs = new Cell[n << 1];
                            // 赋值
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        // 释放锁
                        cellsBusy = 0;
                    }
                    // 
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                // 重置hash值
                h = advanceProbe(h);
            }
            // CASE2:前置条件cells还未初始化 as 为null 
            // 条件1:true 表示当前未加锁
            // 条件2:cell == null 因为其他线程可能会在你给as赋值之后修改了
            // 条件3:true 表示获取锁成功 会把 cellBusy = 1,false表示其他线程正在持有这把锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    // 防止其他线程已经初始化了,当前线程再次初始化,导致丢失数据
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // CASE3:
            // 1.当前cellBusy 加锁状态 表示其他线程正在初始化cells
            // 2.cells被其他线程初始化后,当前线程需要累加到base
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
}

总结

  • LongAdder 是一种类似分治的思想,通过上面的图可以明显看出
  • 但是如果业务场景需要的是高精度,没有任何数据偏差,就不能使用这个类
1
https://gitee.com/shokaku/Java-Review.git
git@gitee.com:shokaku/Java-Review.git
shokaku
Java-Review
Java-Review
master

搜索帮助