同步操作将从 icanci/Java-Review 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
首先强调一点:Java多线程的锁都是基于对象的,Java中的每一个对象都可以作为一个锁。同时,类锁也是对象锁,类是Class对象
现在的需求如下:两个线程打印,判断先发短信还是先打电话
总结
synchronized 是Java提供的关键字,用来保证原子性的
synchronized的作用域如下
synchronized的作用主要有下面3个
这里又涉及到了新的知识和一个关键字:volatile
先看一段简单的代码
package cn.icanci.lock.synchrinized;
/**
* @author: icanci
* @date: Created in 2020/10/22 10:06
* @classAction: Synchronized 底层讲解
*/
@SuppressWarnings("all")
public class SynchronizedTest {
public static void main(String[] args) {
test1();
test2();
}
/**
* 使用synchronized修饰的方法
*/
public synchronized static void test1() {
System.out.println("SynchronizedTest.test1");
}
/**
* 使用synchronized修饰的代码块
*/
public static void test2() {
synchronized (SynchronizedTest.class) {
System.out.println("SynchronizedTest.test2");
}
}
}
执行之后,对其进行执行javap -v
命令反编译
// 省略暂时无用的代码
{
public cn.icanci.lock.synchrinized.SynchronizedTest();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 9: 0
LocalVariableTable:
Start Length Slot Name Signature
0 5 0 this Lcn/icanci/lock/synchrinized/SynchronizedTest;
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
// flags:表示当前方法拥有的标记位 ACC_PUBLIC为public修饰 ACC_STATIC 表示为 static修饰
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=0, locals=1, args_size=1
0: invokestatic #2 // Method test1:()V
3: invokestatic #3 // Method test2:()V
6: return
LineNumberTable:
line 11: 0
line 12: 3
line 13: 6
LocalVariableTable:
Start Length Slot Name Signature
0 7 0 args [Ljava/lang/String;
public static synchronized void test1();
descriptor: ()V
// flags:test1() 这个方法添加了synchronized关键字,它的标志位添加了 ACC_SYNCHRONIZED
// 表示这个方法为原子方法,那么当线程进来的时候,会先检查是否有这样的一个标记
//
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=0, args_size=0
0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #5 // String SynchronizedTest.test1
5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
LineNumberTable:
line 19: 0
line 20: 8
public static void test2();
descriptor: ()V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=2, args_size=0
0: ldc #7 // class cn/icanci/lock/synchrinized/SynchronizedTest
2: dup
3: astore_0
4: monitorenter
5: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
8: ldc #8 // String SynchronizedTest.test2
10: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
13: aload_0
14: monitorexit
15: goto 23
18: astore_1
19: aload_0
20: monitorexit
21: aload_1
22: athrow
23: return
Exception table:
from to target type
5 15 18 any
18 21 18 any
LineNumberTable:
line 26: 0
line 27: 5
line 28: 13
line 29: 23
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 18
locals = [ class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
}
SourceFile: "SynchronizedTest.java"
Process finished with exit code 0
对于上面的指令monitorenter
和 monitorexit
作用,我们直接参考JVM规范中描述
monitorenter
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor's entry count is zero, then tries again to gain ownership.
这段话的大概意思为:
monitorexit
The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.
这段话的大概意思为:
执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
但是新的问题来了:为什么要有两个 monitorexit呢?
synchronized修饰代码块的结论
通过这两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。
synchronized修饰方法的结论
而针对同步方法,并没有通过指令monitorenter和monitorexit来完成(理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了ACC_SYNCHRONIZED标示符。JVM就是根据该标示符来实现方法的同步的:当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。在方法执行期间,其他任何线程都无法再获得同一个monitor对象。 其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。
通过上面的分析我们知道,Synchronized是通过对象内部的一个叫做监视器锁(monitor)来实现的。但是监视器锁本质又是依赖于底层的操作系统的Mutex Lock来实现的。而操作系统实现线程之间的切换这就需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么Synchronized效率低的原因。因此,这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”。JDK中对Synchronized做的种种优化,其核心都是为了减少这种重量级锁的使用。JDK1.6以后,为了减少获得锁和释放锁所带来的性能消耗,提高性能,引入了“轻量级锁”和“偏向锁”。
内核互斥锁的定义
/linux/include/linux/mutex.h
作用及访问规则
互斥锁主要用于实现内核中的互斥访问功能。内核互斥锁是在原子 API 之上实现的,但这对于内核用户是不可见的。
对它的访问必须遵循一些规则:同一时间只能有一个任务持有互斥锁,而且只有这个任务可以对互斥锁进行解锁。互斥锁不能进行递归锁定或解锁。一个互斥锁对象必须通过其API初始化,而不能使用memset或复制初始化。一个任务在持有互斥锁的时候是不能结束的。互斥锁所使用的内存区域是不能被释放的。使用中的互斥锁是不能被重新初始化的。并且互斥锁不能用于中断上下文。
互斥锁比当前的内核信号量选项更快,并且更加紧凑。
各字段详解
操作
struct mutex mutex;
mutex_init(&mutex);
直接定义互斥锁mutex并初始化为未锁定,即count为1,wait_lock为未上锁,等待队列wait_list为空。
获取互斥锁:
代码:linux/kernel/mutex.c
void inline fastcall __sched mutex_lock(struct mutex *lock); //获取互斥锁。
实际上是先给count做自减操作,然后使用本身的自旋锁进入临界区操作。首先取得count的值,在将count置为-1,判断如果原来count的置为1,也即互斥锁可以获得,则直接获取,跳出。否则进入循环反复测试互斥锁的状态。在循环中,也是先取得互斥锁原来的状态,在将其之为-1,判断如果可以获取(等于1),则退出循环,否则设置当前进程的状态为不可中断状态,解锁自身的自旋锁,进入睡眠状态,待被在调度唤醒时,再获得自身的自旋锁,进入新一次的查询其自身状态(该互斥锁的状态)的循环。
具体参见linux/kernel/mutex.c
int fastcall _sched mutex_lock_interruptible(struct mutex *lock);
和mutex_lock()一样,也是获取互斥锁。在获得了互斥锁或进入睡眠直到获得互斥锁之后会返回0。如果在等待获取锁的时候进入睡眠状态收到一个信号(被信号打断睡眠),则返回_EINIR。
具体参见linux/kernel/mutex.c
int fastcall __sched mutex_trylock(struct mutex *lock);
试图获取互斥锁,如果成功获取则返回1,否则返回0,不等待
具体参见linux/kernel/mutex.c
void fastcall mutex_unlock(struct mutex *lock);
释放被当前进程获取的互斥锁。该函数不能用在中断上下文中,而且不允许去释放一个没有上锁的互斥锁。
使用形式
struct mutex mutex;
mutex_init(&mutex); /*定义*/
...
mutex_lock(&mutex); /*获取互斥锁*/
... /*临界资源*/
mutex_unlock(&mutex); /*释放互斥锁*/ 释放互斥锁:
Synchronized底层优化(轻量级锁、偏向锁)
锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁(但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级)JDK 1.6中默认是开启偏向锁和轻量级锁的,
注意:HotSpot JVM是支持锁降级的 但是因为降级的效率太低了,所以在开发中不使用降级的操作
但是锁的状态时存在哪里的呢?
锁存在Java的对象头中的Mark Work。Mark Work默认不仅存放着锁标志位,还存放对象hashCode等信息。运行时,会根据锁的状态,修改Mark Work的存储内容。如果对象是数组类型,则虚拟机用3个字宽存储对象头,如果对象是非数组类型,则用2字宽存储对象头。在32位虚拟机中,一字宽等于四字节,即32bit。
该图主要是对上述内容的总结,如果对上述内容有较好的了解的话,该图应该很容易看懂
package cn.icanci.lock.sync;
/**
* @author: icanci
* @date: Created in 2020/10/22 11:15
* @classAction: 锁粗化
*/
public class StringBufferTest {
StringBuffer stringBuffer = new StringBuffer();
public void append() {
stringBuffer.append("a");
stringBuffer.append("b");
stringBuffer.append("c");
}
}
package cn.icanci.lock.sync;
/**
* @author: icanci
* @date: Created in 2020/10/22 11:16
* @classAction: 锁消除
*/
public class SynchronizedTest02 {
public static void main(String[] args) {
SynchronizedTest02 test02 = new SynchronizedTest02();
//启动预热
for (int i = 0; i < 10000; i++) {
i++;
}
long start = System.currentTimeMillis();
for (int i = 0; i < 100000000; i++) {
test02.append("abc", "def");
}
System.out.println("Time=" + (System.currentTimeMillis() - start));
}
public void append(String str1, String str2) {
StringBuffer sb = new StringBuffer();
sb.append(str1).append(str2);
}
}
JDk中采用轻量级锁和偏向锁等对Synchronized的优化,但是这两种锁也不是完全没缺点的,比如竞争比较激烈的时候,不但无法提升效率,反而会降低效率,因为多了一个锁升级的过程,这个时候就需要通过-XX:-UseBiasedLocking来禁用偏向锁。下面是这几种锁的对比:
锁 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
偏向锁 | 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距。 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗。 | 适用于只有一个线程访问同步块场景。 |
轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度。 | 如果始终得不到锁竞争的线程使用自旋会消耗CPU。 | 追求响应时间。同步块执行速度非常快。 |
重量级锁 | 线程竞争不使用自旋,不会消耗CPU。 | 线程阻塞,响应时间缓慢。 | 追求吞吐量。同步块执行速度较长。 |
ReentrantLock是Lock接口的一个实现类
在ReentrantLock内部有一个抽象静态内部类Sync
其中一个是 NonfairSync(非公平锁),另外一个是 FairSync (公平锁),二者都实现了此抽象内部类Sync,ReentrantLock默认使用的是 非公平锁 ,我们看一下源码:
public class ReentrantLock implements Lock, java.io.Serializable {
// 锁的类型
private final Sync sync;
// 抽象静态类Sync继承了AbstractQueueSynchroniser [这个在下面进行解释]
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 抽象加锁方法
abstract void lock();
// 不公平的 tryLock 也就是不公平的尝试获取
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程对象
final Thread current = Thread.currentThread();
// 获取线程的状态
int c = getState();
// 根据线程的不同状态执行不同的逻辑
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 获取独占模式的线程的当前锁的状态
else if (current == getExclusiveOwnerThread()) {
// 获取新的层级大小
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置锁的状态
setState(nextc);
return true;
}
return false;
}
// 尝试释放方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 返回当前线程是不是独占的
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 返回 ConditionObject 对象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 获得独占的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获得独占线程的状态
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 判断是否是加锁的
final boolean isLocked() {
return getState() != 0;
}
// 序列化
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
// 非公平锁继承了Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁操作
final void lock() {
// 判断是不是第一次加锁 底层调用 Unsafe的compareAndSwapInt()方法
if (compareAndSetState(0, 1))
// 设置为独占锁
setExclusiveOwnerThread(Thread.currentThread());
// 如果不是第一次加锁,则调用 acquire 方法在加一层锁
else
acquire(1);
}
// 返回尝试加锁是否成功
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 加锁操作,直接设置为1
final void lock() {
acquire(1);
}
// 尝试加锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
Lock接口
public interface Lock {
// 加锁
void lock();
// 不断加锁
void lockInterruptibly() throws InterruptedException;
// 尝试加锁
boolean tryLock();
// 尝试加锁,具有超时时间
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// Condition 对象
Condition newCondition();
}
Condition接口
public interface Condition {
// 等待
void await() throws InterruptedException;
// 超时等待
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 超时纳秒等待
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 可中断等待
void awaitUninterruptibly();
// 等待死亡
boolean awaitUntil(Date deadline) throws InterruptedException;
// 指定唤醒
void signal();
// 唤醒所有
void signalAll();
}
ReentrantLock的其他方法
public class ReentrantLock implements Lock, java.io.Serializable {
// 锁的类型
private final Sync sync;
// 默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造,可以设置锁的类型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 加锁
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 解锁 调用release() 因为是重入锁,所以需要减少重入的层数
public void unlock() {
sync.release(1);
}
// 返回Condition对象 ,用来执行线程的唤醒等待等操作
public Condition newCondition() {
return sync.newCondition();
}
// 获取锁的层数
public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 是否加锁
public boolean isLocked() {
return sync.isLocked();
}
// 是否是公平锁
public final boolean isFair() {
return sync instanceof FairSync;
}
// 获取独占锁
protected Thread getOwner() {
return sync.getOwner();
}
// 查询是否有任何线程正在等待获取此锁
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 查询给定线程是否正在等待获取此锁
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
// 获取队列的长度
public final int getQueueLength() {
return sync.getQueueLength();
}
// 返回一个包含可能正在等待获取该锁的线程的集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
// 判断是否等待
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 获得等待队列的长度
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 获取正在等待的线程集合
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// toString()
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}
总结:
AQS:AbstractQueueSynchronizer => 抽象队列同步器
AQS定义了一套多线程访问共享资源的同步器框架,很多同步器的实现都依赖AQS。如ReentrantLock、Semaphore、CountDownLatch ...
首先看一下AQS队列的框架
它维护了一个volatile int state (代表共享资源)和一个FIFO线程等待队列(多线程争抢资源被阻塞的时候会先进进入此队列),这里的volatile是核心。在下个部分进行讲解~
state的访问方式有三种
AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程可以执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore、CountdownLatch)
不同的自定义同步器争用共享资源的方式也不同。自定义的同步器在实现的时候只需要实现共享资源的获取和释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队)AQS在顶层已经实现好了。
自定义同步器时需要实现以下方法即可
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁,然后将state+1,此后其他线程在调用tryAcquire()就会失败,直到A线程unlock()到state为0为止,其他线程才有机会获取该锁。当前在A释放锁之前,A线程是可以重复获取此锁的(state)会累加。这就是可重入,但是获取多少次,就要释放多少次。
再和CountdownLock为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程的个数一致)。这N个子线程是并行执行的,每个子线程执行完之后countDown一次。state会CAS-1。等到所有的子线程都执行完后(即state=0),会upark()主调用线程,然后主调用线程就会从await()函数返回,继续剩余动作
一般来说,自定义同步器要么是独占方法,要么是共享方式,也只需要实现tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一组即可,但是AQS也支持自定义同步器同时实现独占锁和共享锁两种方式,如:ReentrantReadWriteLock
AQS的源码
AbstractQueueSynchronizer 继承了 AbstractOwnableSynchronizer
AbstractOwnableSynchronizer类
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 独占模式当前的拥有者
private transient Thread exclusiveOwnerThread;
// 设置独占模式当前的拥有者
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 得到独占模式当前的拥有者
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueueSynchronizer类
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
// AbstractQueueSynchronizer 中的静态内部类 Node 节点
static final class Node {
// 指示节点正在以共享模式等待的标记
static final Node SHARED = new Node();
// 指示节点正在以独占模式等待的标记
static final Node EXCLUSIVE = null;
// 表示线程已经取消
static final int CANCELLED = 1;
// 表示线程之后需要释放
static final int SIGNAL = -1;
// 表示线程正在等待条件
static final int CONDITION = -2;
// 指示下一个 acquireShared 应该无条件传播
static final int PROPAGATE = -3;
// 状态标记
volatile int waitStatus;
// 队列的前一个节点
volatile Node prev;
// 队列的后一个节点
volatile Node next;
// 线程
volatile Thread thread;
// 下一个正在等待的节点
Node nextWaiter;
// 判断是否时共享的
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回上一个节点,不能为null,为null抛出空指针异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 构造
Node() { // Used to establish initial head or SHARED marker
}
// 有参构造,用来添加线程的队列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 有参构造,根据等待条件使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 状态
private volatile int state;
// 获取当前的状态
protected final int getState() {
return state;
}
//设置当前的状态
protected final void setState(int newState) {
state = newState;
}
// 比较设置当前的状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 纳秒数,使之更快的旋转
static final long spinForTimeoutThreshold = 1000L;
// 将节点插入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 加一个等待节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 设置头节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 如果存在后继节点,就唤醒
private void unparkSuccessor(Node node) {
// 获得节点的状态
int ws = node.waitStatus;
// 如果为负数,就执行比较并设置方法设置状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 唤醒后面的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// 共享模式的释放动作,并且向后继节点发出信号
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// 设置队列的头,并检查后继者能否在共享模式下等待,如果可以,就是否传播设置为>0或者propagate状态
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// 取消正在进行的尝试
private void cancelAcquire(Node node) {
// 节点为null,直接返回
if (node == null)
return;
node.thread = null;
// 跳过已经取消的前一个节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
// 还有好多方法... 其实本质就是基于 队列的判断和操作,AQS提供了独占锁和共享锁的设计
// 在AQS中,使用到了Unsafe类,所以AQS其实就是基于CAS算法的,
// AQS的一些方法就是直接调用 Unsafe 的方法 如下所示
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
// 比较并设置头
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 比较并设置尾
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 比较并设置状态
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
// 比较并设置下一个节点
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
// 除此之外 AQS 还有一个实现了Condition的类 如下
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列的第一个节点
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;
public ConditionObject() { }
// 在等待队列中添加一个新的节点
private Node addConditionWaiter() {
// 获取最后一个节点
Node t = lastWaiter;
// 如果最后一个节点被取消了,就清除它
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 删除并转移节点直到它没有取消或者不为null
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 删除所有的节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 取消节点的连接
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 将等待最长的线程,唤醒
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 唤醒所有的等待线程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 实现不间断的条件等待
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 模式意味着在退出等待时重新中断
private static final int REINTERRUPT = 1;
// 模式的含义是在退出等待时抛出InterruptedException异常
private static final int THROW_IE = -1;
// 检查中断,如果在信号通知之前被中断,则返回THROW_IE;
// 如果在信号通知之后,则返回REINTERRUPT;如果未被中断,则返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 抛出InterruptedException,重新中断当前线程,
// 或不执行任何操作,具体取决于模式。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 实现不可中断的条件等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 纳秒级别的等待
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 绝对定时等待
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 超时等待
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 判断是不是独占的
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
// 返回是否有正在等待的
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
// 获得等待队列的长度
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
// 获取所有正在等待的线程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
}
总结:
volatile是Java提供的关键字,是轻量级的同步机制 JSR133提出,Java5增强了语义
volatile关键字有三个重要的特点
提到volatile,就要提到JMM - 什么是JMM
JMM:Java Memory Model
本身就是一种抽象的概念,并不真实存在,它描述的是一组规范和规则,通过这种规则定义了程序的各个变量(包括实例字段、静态字段、和构造数组对象的元素)的访问方式
JMM关于同步的规定
happens-before 规则
重排序
什么是内存屏障?
原子性
有序性
volatile的使用
volatile的底层实现
CAS(Compare And Swap)比较并替换,是线程并发运行时用到的一种技术
CAS是原子操作,保证并发安全,而不能保证并发同步
CAS是CPU的一个指令(需要JNI调用Native方法,才能调用CPU的指令)
CAS是非阻塞的、轻量级的乐观锁
我们可以实现通过手写代码完成CAS自旋锁
CAS包括三个操作数
如果内存位置的值与期望值匹配,那么处理器会自动将该位置的值设置为新值,否则不做改变。无论是哪种情况,都会在CAS指令之前返回该位置的值。
public class Demo {
volatile static int count = 0;
public static void request() throws Exception {
TimeUnit.MILLISECONDS.sleep(5);
// 表示期望值
int expectedCount;
while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) {
}
}
public static synchronized boolean compareAndSwap(int expectedCount, int newValue) {
if (expectedCount == getCount()) {
count = newValue;
return true;
}
return false;
}
public static int getCount() {
return count;
}
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
int threadSize = 100;
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
request();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("count :" + count + " 耗时:" + (end - start));
}
}
上述是我们自己书写的CAS自旋锁,但是JDK已经提供了响应的方法
Java提供了 CAS 的支持,在 sun.misc.Unsafe
类中,如下
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
参数说明
CAS的实现原理
CAS通过调用JNI的代码实现,JNI:Java Native Interface ,允许Java调用其他语言
而CompareAndSwapXxx系列的方法就是借助“C语言”CPU底层指令实现的
以常用的 Inter x86来说,最后映射到CPU的指令为“cmpxchg”,这个是一个原子指令,CPU执行此命令的时候,实现比较并替换的操作
cmpxchg 如何保证多核心下的线程安全
系统底层进行CAS操作的时候,会判断当前操作系统是否为多核心,如果是,就给“总线”加锁,只有一个线程对总线加锁,保证只有一个线程进行操作,加锁之后会执行CAS操作,也就是说CAS的原子性是平台级别的
CAS这么强,有没有什么问题?
高并发情况下,CAS会一直重试,会损耗性能
CAS的ABA问题
CAS需要在操作值得时候检查下值有没有变化,如果没有发生变化就更新,但是如果原来一个值为A,经过一轮操作之后,变成了B,然后又是一轮操作,又变成了A,此时这个位置有没有发生改变?改变了的,因为不是一直是A,这就是ABA问题
如何解决ABA问题?
解决ABA问题就是给值增加一个修改版本号,每次值的变化,都会修改它的版本号,CAS在操作的时候都会去对比此版本号。
下面给出一个ABA的案例
public class CasAbaDemo {
public static AtomicInteger a = new AtomicInteger(1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get());
try {
int executedNum = a.get();
int newNum = executedNum + 1;
TimeUnit.SECONDS.sleep(3);
boolean isCasSuccess = a.compareAndSet(executedNum, newNum);
System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主线程");
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
a.incrementAndGet();
System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get());
a.decrementAndGet();
System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get());
} catch (Exception e) {
e.printStackTrace();
}
}, "干扰线程");
main.start();
thread.start();
}
}
Java中ABA解决办法(AtomicStampedReference)
AtomicStampedReference 主要包含一个引用对象以及一个自动更新的整数 “stamp”的pair对象来解决ABA问题
public class AtomicStampedReference<V> {
private static class Pair<T> {
// 数据引用
final T reference;
// 版本号
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
/**
* 期望引用
* @param expectedReference the expected value of the reference
* 新值引用
* @param newReference the new value for the reference
* 期望引用的版本号
* @param expectedStamp the expected value of the stamp
* 新值的版本号
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
// 期望引用与当前引用一致
expectedReference == current.reference &&
// 期望版本与当前版本一致
expectedStamp == current.stamp &&
// 数据一致
((newReference == current.reference &&
newStamp == current.stamp)
||
// 数据不一致
casPair(current, Pair.of(newReference, newStamp)));
}
}
修改之后完成ABA问题
public class CasAbaDemo02 {
public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference());
try {
Integer executedReference = a.getReference();
Integer newReference = executedReference + 1;
Integer expectStamp = a.getStamp();
Integer newStamp = expectStamp + 1;
TimeUnit.SECONDS.sleep(3);
boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp);
System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主线程");
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference());
a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1);
System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference());
} catch (Exception e) {
e.printStackTrace();
}
}, "干扰线程");
main.start();
thread.start();
}
}
在分布式中如何加锁?
其实看上面的锁的实现原理我们可以知道,可以使用信号量来实现,也就是说在分布式系统中,只要在不同服务器能够拿到相同的信号量就可以初步完成分布式锁
这里可以使用分布式缓存 - Redis来实现
在分布式系统中,可以公用一个服务器的Redis,这样的话,所有的分部署锁都集中在Redis中,那么加锁操作,就基于Redis实现。
什么是分布式锁?
我们需要怎样的分布式锁?
基于数据库做分布式锁
基于乐观锁
基于表主键唯一做分布式锁
**思路:**利用主键唯一的特性,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,当方法执行完毕之后,想要释放锁的话,删除这条数据库记录即可。
上面这种简单的实现有以下几个问题:
当然,我们也可以有其他方式解决上面的问题。
基于表字段版本号做分布式锁
这个策略源于 mysql 的 mvcc 机制,使用这个策略其实本身没有什么问题,唯一的问题就是对数据表侵入较大,我们要为每个表设计一个版本号字段,然后写一条判断 sql 每次进行判断,增加了数据库操作的次数,在高并发的要求下,对数据库连接的开销也是无法忍受的。
基于悲观锁
基于数据库排他锁做分布式锁
在查询语句后面增加for update
,数据库会在查询过程中给数据库表增加排他锁 (注意: InnoDB 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给要执行的方法字段名添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排他锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,通过connection.commit()
操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。
for update
语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。但是还是无法直接解决数据库单点和可重入问题。
这里还可能存在另外一个问题,虽然我们对方法字段名使用了唯一索引,并且显示使用 for update 来使用行级锁。但是,MySQL 会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。
还有一个问题,就是我们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。
优点:简单,易于理解
缺点:会有各种各样的问题(操作数据库需要一定的开销,使用数据库的行级锁并不一定靠谱,性能不靠谱)
基于 Redis 做分布式锁
基于 REDIS 的 SETNX()、EXPIRE() 方法做分布式锁
setnx()
setnx 的含义就是 SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果 key 不存在,则设置当前 key 成功,返回 1;如果当前 key 已经存在,则设置当前 key 失败,返回 0。
expire()
expire 设置过期时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能通过 expire() 来对 key 设置。
使用步骤
1、setnx(lockkey, 1) 如果返回 0,则说明占位失败;如果返回 1,则说明占位成功
2、expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
3、执行完业务代码后,可以通过 delete 命令删除 key。
这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步 setnx 执行成功后,在 expire() 命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用 redis 的 setnx()、get() 和 getset() 方法来实现分布式锁。
基于 REDIS 的 SETNX()、GET()、GETSET()方法做分布式锁
这个方案的背景主要是在 setnx() 和 expire() 的方案上针对可能存在的死锁问题,做了一些优化。
getset()
这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对 key 设置 newValue 这个值,并且返回 key 原来的旧值。假设 key 原来是不存在的,那么多次执行这个命令,会出现下边的效果:
使用步骤
基于 REDLOCK 做分布式锁
Redlock 是 Redis 的作者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个完全独立的 Redis 节点(通常情况下 N 可以设置成 5)。
算法的步骤如下:
使用 Redlock 算法,可以保证在挂掉最多 2 个节点的时候,分布式锁服务仍然能工作,这相比之前的数据库锁和缓存锁大大提高了可用性,由于 redis 的高效性能,分布式缓存锁性能并不比数据库锁差。
但是,有一位分布式的专家写了一篇文章《How to do distributed locking》,质疑 Redlock 的正确性。
优缺点
优点: 性能高
缺点:
失效时间设置多长时间为好?如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。
基于 REDISSON 做分布式锁
redisson 是 redis 官方的分布式锁组件。GitHub 地址:https://github.com/redisson/redisson
上面的这个问题 ——> 失效时间设置多长时间为好?这个问题在 redisson 的做法是:每获得一个锁时,只设置一个很短的超时时间,同时起一个线程在每次快要到超时时间时去刷新锁的超时时间。在释放锁的同时结束这个线程。
基于 ZooKeeper 做分布式锁
ZOOKEEPER 锁相关基础知识
步骤:
有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。
性能上可能并没有缓存服务那么高,因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后将数据同步到所有的 Follower 机器上。还需要对 ZK的原理有所了解。
基于 Consul 做分布式锁
DD 写过类似文章,其实主要利用 Consul 的 Key / Value 存储 API 中的 acquire 和 release 操作来实现。
文章地址:http://blog.didispace.com/spring-cloud-consul-lock-and-semphore/
使用分布式锁的注意事项
1、注意分布式锁的开销
2、注意加锁的粒度
3、加锁的方式
总结
无论你身处一个什么样的公司,最开始的工作可能都需要从最简单的做起。不要提阿里和腾讯的业务场景 qps 如何大,因为在这样的大场景中你未必能亲自参与项目,亲自参与项目未必能是核心的设计者,是核心的设计者未必能独自设计。希望大家能根据自己公司业务场景,选择适合自己项目的方案。
参考资料
public class Test implements Runnable {
public synchronized void get() {
System.out.println(Thread.currentThread().getId());
set();
}
public synchronized void set() {
System.out.println(Thread.currentThread().getId());
}
@Override
public void run() {
get();
}
public static void main(String[] args) {
Test ss = new Test();
new Thread(ss).start();
new Thread(ss).start();
new Thread(ss).start();
}
}
// 执行结果
// 11
// 11
// 12
// 12
// 13
// 13
public class Test2 implements Runnable {
ReentrantLock lock = new ReentrantLock();
public void get() {
lock.lock();
try {
System.out.println(Thread.currentThread().getId());
set();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void set() {
lock.lock();
try {
System.out.println(Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
@Override
public void run() {
get();
}
public static void main(String[] args) {
Test ss = new Test();
new Thread(ss).start();
new Thread(ss).start();
new Thread(ss).start();
}
}
// 执行结果
// 11
// 11
// 12
// 12
// 13
// 13
自旋锁
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<>();
public void lock() {
Thread current = Thread.currentThread();
while (!owner.compareAndSet(null, current)) {
}
}
public void unlock() {
Thread current = Thread.currentThread();
owner.compareAndSet(current, null);
}
改进的自旋锁
public class SpinLock1 {
private AtomicReference<Thread> owner =new AtomicReference<>();
private int count =0;
public void lock(){
Thread current = Thread.currentThread();
if(current==owner.get()) {
count++;
return ;
}
while(!owner.compareAndSet(null, current)){
}
}
public void unlock (){
Thread current = Thread.currentThread();
if(current==owner.get()){
if(count!=0){
count--;
}else{
owner.compareAndSet(current, null);
}
}
}
}
互斥锁指的是一次最多只能有一个线程持有的锁。如Java的Lock
互斥锁也是为了保护共享资源的同步,在任何时刻,最多只能有一个保持者,也就说,在任何时刻最多只能有一个执行单元获得锁。互斥锁和自旋锁在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁。
Java - 多线程 - 锁和提升 第1篇
开篇所说的8锁
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
// Sync锁
private final Sync sync;
// Sync实现了AbstractQueueSynchronizer
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 初始化版本初始值
Sync(int permits) {
setState(permits);
}
// 获取状态
final int getPermits() {
return getState();
}
// 不公平尝试共享
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 尝试释放共享
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 减少状态的指定值
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
// 构造方法 设置初始版本号
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 构造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 增加1
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 不间断地获取
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 从此信号量获取许可,直到获得一个许可为止,将一直阻塞
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 仅在调用时可用时,才从此信号量获取许可
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 定时设置
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放
public void release() {
sync.releaseShared(1);
}
// 设置指定的个数
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 获得许可证的给定数目从这个信号,阻塞直到所有可用。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// 获得许可证的给定数目从此信号量,只有在所有可在调用的时候。
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
// 收购从此信号量许可证的给定数量,如果所有给定的等待时间内变得可用,并且当前线程未被中断
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// 释放
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// 返回现在的剩余值
public int availablePermits() {
return sync.getPermits();
}
// 返回减少之后的值
public int drainPermits() {
return sync.drainPermits();
}
// 减少指定个数的值
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
// 判断是否时公平锁
public boolean isFair() {
return sync instanceof FairSync;
}
// 判断队列是否有线程
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 返回线程的长度
public final int getQueueLength() {
return sync.getQueueLength();
}
// 返回队列线程的集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
// toString() 方法
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}
总结:
独享锁,也叫独占锁,意思是锁A只能被一个锁拥有,如synchronized,
ReentrantLock是独享锁,他是基于AQS实现的,在ReentrantLock源码中, 使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁 。 而当c等于0的时候说明当前没有线程占有锁,它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,所以AQS可以确保对state的操作是安全的。
// 它默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 创建ReentrantLock,公平锁or非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 而他会分别调用lock方法和unlock方法来释放锁
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
但是其实他不仅仅是会调用lock和unlock方法,因为我们的线程不可能一点问题没有,如果说进入到了waiting状态,在这个时候如果没有unpark()方法,就没有办法来唤醒他, 所以,也就接踵而至出现了tryLock(),tryLock(long,TimeUnit)来做一些尝试加锁或者说是超时来满足某些特定的场景的需求了
ReentrantLock会保证method-body在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的lock方法会返回。其余线程会被挂起,直到获取锁。
从这里我们就能看出,其实ReentrantLock实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。而在源码中通过AQS来获取独享锁是通过调用acquire方法,其实这个方法是阻塞的
总结
独享锁:同时只能有一个线程获得锁。
共享锁:可以有多个线程同时获得锁。
Java 集合框架 - HashMap 底层 红黑树深度解读.md
)java.util.concurrent
包下提供的类都是线程安全的类Collections
类提供的方法// 先从一段代码开始
public class MyList {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
LinkedList<String> strings = new LinkedList<>();
List<String> list1 = Collections.synchronizedList(list);
}
}
// 点进去 Collections.synchronizedList(list); 方法 会跳到这个方法
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
// 因为我们使用的是 LinkedList 所以进入 SynchronizedList构造器
SynchronizedList(List<E> list) {
super(list);
this.list = list;
}
// 此次又调用了父类构造器
// 然后进入了 Collections的SynchronizedCollection静态内部类
// 然后我们惊奇的发现,其中的所有的方法都是使用 synchronized 修饰的
public class Collections {
// Suppresses default constructor, ensuring non-instantiability.
private Collections() {
}
static class SynchronizedCollection<E> implements Collection<E>, Serializable {
private static final long serialVersionUID = 3053995032091335093L;
final Collection<E> c; // Backing Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
this.c = Objects.requireNonNull(c);
mutex = this;
}
SynchronizedCollection(Collection<E> c, Object mutex) {
this.c = Objects.requireNonNull(c);
this.mutex = Objects.requireNonNull(mutex);
}
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return c.isEmpty();}
}
public boolean contains(Object o) {
synchronized (mutex) {return c.contains(o);}
}
public Object[] toArray() {
synchronized (mutex) {return c.toArray();}
}
public <T> T[] toArray(T[] a) {
synchronized (mutex) {return c.toArray(a);}
}
public Iterator<E> iterator() {
return c.iterator(); // Must be manually synched by user!
}
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
synchronized (mutex) {return c.remove(o);}
}
public boolean containsAll(Collection<?> coll) {
synchronized (mutex) {return c.containsAll(coll);}
}
public boolean addAll(Collection<? extends E> coll) {
synchronized (mutex) {return c.addAll(coll);}
}
public boolean removeAll(Collection<?> coll) {
synchronized (mutex) {return c.removeAll(coll);}
}
public boolean retainAll(Collection<?> coll) {
synchronized (mutex) {return c.retainAll(coll);}
}
public void clear() {
synchronized (mutex) {c.clear();}
}
public String toString() {
synchronized (mutex) {return c.toString();}
}
// Override default methods in Collection
@Override
public void forEach(Consumer<? super E> consumer) {
synchronized (mutex) {c.forEach(consumer);}
}
@Override
public boolean removeIf(Predicate<? super E> filter) {
synchronized (mutex) {return c.removeIf(filter);}
}
@Override
public Spliterator<E> spliterator() {
return c.spliterator(); // Must be manually synched by user!
}
@Override
public Stream<E> stream() {
return c.stream(); // Must be manually synched by user!
}
@Override
public Stream<E> parallelStream() {
return c.parallelStream(); // Must be manually synched by user!
}
private void writeObject(ObjectOutputStream s) throws IOException {
synchronized (mutex) {s.defaultWriteObject();}
}
}
}
Java 集合框架 - HashMap 底层 红黑树深度解读.md
)ConcurrentHashMap源码解析
// 此处只谈其分段所锁的实现 因为其本质就是HashMap
// 构造方法
// 无参构造方法
public ConcurrentHashMap() {
}
// map的初始容量为initialCapacity
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 始化参数是一个map
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
// 初始值,和负载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
// 初始值,负载因子,和并发等级
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
// 核心分段 静态内部类
static class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
final float loadFactor;
Segment(float lf) { this.loadFactor = lf; }
}
// 如果指定的键已经不再与值相关联,尝试使用给定的映射函数计算其值并将其输入到该地图除非null 。
// 法调用原子方式执行,因此至多每键一旦施加的功能。 而运算正在进行
// 计算应短而简单,而且不能尝试更新此地图的任何其他映射这个地图
// 线程上的一些尝试更新操作可能被阻止。
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
if (key == null || mappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
Node<K,V> r = new ReservationNode<K,V>();
synchronized (r) {
if (casTabAt(tab, i, null, r)) {
binCount = 1;
Node<K,V> node = null;
try {
if ((val = mappingFunction.apply(key)) != null)
node = new Node<K,V>(h, key, val, null);
} finally {
setTabAt(tab, i, node);
}
}
}
if (binCount != 0)
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
boolean added = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek; V ev;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = e.val;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
if ((val = mappingFunction.apply(key)) != null) {
added = true;
pred.next = new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) {
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(h, key, null)) != null)
val = p.val;
else if ((val = mappingFunction.apply(key)) != null) {
added = true;
t.putTreeVal(h, key, val);
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (!added)
return val;
break;
}
}
}
if (val != null)
addCount(1L, binCount);
return val;
}
// 如果指定键的值存在,尝试来计算给定的密钥和它的当前映射值的新映射。
// 法调用原子方式执行。 而运算正在进行,所以在计算应短而简单,
// 能尝试更新此地图的任何其他映射这个地图被其它线程上的一些尝试更新操作可能被阻止
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
if (key == null || remappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = remappingFunction.apply(key, e.val);
if (val != null)
e.val = val;
else {
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(h, key, null)) != null) {
val = remappingFunction.apply(key, p.val);
if (val != null)
p.val = val;
else {
delta = -1;
if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (binCount != 0)
break;
}
}
if (delta != 0)
addCount((long)delta, binCount);
return val;
}
// 尝试计算用于指定键和其当前映射的值的映射(或null ,如果没有当前映射)。
// 法调用原子方式执行。 而运算正在进行,所以在计算应短而简单,
// 能尝试更新此地图的任何其他映射这个地图被其它线程上的一些尝试更新操作可能被阻止。
public V compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
if (key == null || remappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
Node<K,V> r = new ReservationNode<K,V>();
synchronized (r) {
if (casTabAt(tab, i, null, r)) {
binCount = 1;
Node<K,V> node = null;
try {
if ((val = remappingFunction.apply(key, null)) != null) {
delta = 1;
node = new Node<K,V>(h, key, val, null);
}
} finally {
setTabAt(tab, i, node);
}
}
}
if (binCount != 0)
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = remappingFunction.apply(key, e.val);
if (val != null)
e.val = val;
else {
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
if ((e = e.next) == null) {
val = remappingFunction.apply(key, null);
if (val != null) {
delta = 1;
pred.next =
new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) {
binCount = 1;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null)
p = r.findTreeNode(h, key, null);
else
p = null;
V pv = (p == null) ? null : p.val;
val = remappingFunction.apply(key, pv);
if (val != null) {
if (p != null)
p.val = val;
else {
delta = 1;
t.putTreeVal(h, key, val);
}
}
else if (p != null) {
delta = -1;
if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
break;
}
}
}
if (delta != 0)
addCount((long)delta, binCount);
return val;
}
// 如果指定键已经不再与一个(非空)值相关联,它与给定值关联。
// 替换指定重映射函数的结果,或移除如果该值为null 。 整个方法调用原子方式执行。
// 正在进行,所以在计算应短而简单,而且不能尝试更新此地图的任何其他映射这个地图
// 程上的一些尝试更新操作可能被阻止。
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
if (key == null || value == null || remappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(h, key, value, null))) {
delta = 1;
val = value;
break;
}
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = remappingFunction.apply(e.val, value);
if (val != null)
e.val = val;
else {
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
if ((e = e.next) == null) {
delta = 1;
val = value;
pred.next =
new Node<K,V>(h, key, val, null);
break;
}
}
}
else if (f instanceof TreeBin) {
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r = t.root;
TreeNode<K,V> p = (r == null) ? null :
r.findTreeNode(h, key, null);
val = (p == null) ? value :
remappingFunction.apply(p.val, value);
if (val != null) {
if (p != null)
p.val = val;
else {
delta = 1;
t.putTreeVal(h, key, val);
}
}
else if (p != null) {
delta = -1;
if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
break;
}
}
}
if (delta != 0)
addCount((long)delta, binCount);
return val;
}
// 移动或者复制每个仓库的节点
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
// 在替换所有斌链接节点在给定的索引,除非表是太小了,在这种情况下,调整大小来代替。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。