同步操作将从 flatfish/Java-Review 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
首先强调一点:Java多线程的锁都是基于对象的,Java中的每一个对象都可以作为一个锁。同时,类锁也是对象锁,类是Class对象
synchronized 是Java提供的关键字,用来保证原子性的
synchronized的作用域如下
先看一段简单的代码
public class SynchronizedTest {
public static void main(String[] args) {
test1();
test2();
}
// 使用synchronized修饰的方法
public synchronized static void test1() {
System.out.println("SynchronizedTest.test1");
}
// 使用synchronized修饰的代码块
public static void test2() {
synchronized (SynchronizedTest.class) {
System.out.println("SynchronizedTest.test2");
}
}
}
执行之后,对其进行执行javap -v
命令反编译
// 省略啰嗦的代码
public class cn.zq.sync.SynchronizedTest
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
{
// 源码
public cn.zq.sync.SynchronizedTest();
descriptor: ()V
flags: ACC_PUBLIC
// main 方法
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
// synchronized 修饰的静态方法 test1()
public static synchronized void test1();
descriptor: ()V
// 在这里我们可以看到 flags 中有一个 ACC_SYNCHRONIZED
// 这个就是一个标记符这是 保证原子性的关键
// 当方法调用的时候,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标记符是否被设置
// 如果设置了,线程将先获取 monitor,获取成功之后才会执行方法体,方法执行之后,释放monitor
// 在方法执行期间,其他任何线程都无法在获得一个 monitor 对象,本质上没区别。
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=0, args_size=0
0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #5 // String SynchronizedTest.test1
5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
LineNumberTable:
line 17: 0
line 18: 8
// 代码块使用的 synchronized
public static void test2();
descriptor: ()V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=2, args_size=0
0: ldc #7 // class cn/zq/sync/SynchronizedTest
2: dup
3: astore_0
// 这个 monitorenter 是一个指令
// 每个对象都有一个监视器锁(monitor),当monitor被占用的时候就会处于锁定状态
// 线程执行monitorenter的时候,尝试获取monitor的锁。过程如下
// 1.任何monitor进入数为0,则线程进入并设置为1,此线程就是monitor的拥有者
// 2.如果线程已经占用,当前线程再次进入的时候,会将monitor的次数+1
// 3.如何其他的线程已经占用了monitor,则线程进阻塞状态,直到monitor的进入数为0
// 4.此时其他线程才能获取当前代码块的执行权
4: monitorenter
5: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
8: ldc #8 // String SynchronizedTest.test2
10: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
13: aload_0
// 执行monitorexit这条指令的线程必须是拥有monitor的
// 执行的之后,monitor的进入数-1.如果为0,那么线程就退出 monitor,不再是此代码块的执行者
// 此时再由其他的线程获得所有权
// 其实 wait/notify 等方法也依赖于monitor对象,
// 所以只有在同步方法或者同步代码块中才可以使用,否则会报错 java.lang.IllegalMonitorstateException 异常
14: monitorexit
15: goto 23
18: astore_1
19: aload_0
20: monitorexit
21: aload_1
22: athrow
23: return
Exception table:
from to target type
5 15 18 any
18 21 18 any
LineNumberTable:
line 21: 0
line 22: 5
line 23: 13
line 24: 23
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 18
locals = [ class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
}
SourceFile: "SynchronizedTest.java"
总结:
ReentrantLock是Lock接口的一个实现类
在ReentrantLock内部有一个抽象静态内部类Sync
其中一个是 NonfairSync(非公平锁),另外一个是 FairSync (公平锁),二者都实现了此抽象内部类Sync,ReentrantLock默认使用的是 非公平锁 ,我们看一下源码:
public class ReentrantLock implements Lock, java.io.Serializable {
// 锁的类型
private final Sync sync;
// 抽象静态类Sync继承了AbstractQueueSynchroniser [这个在下面进行解释]
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 抽象加锁方法
abstract void lock();
// 不公平的 tryLock 也就是不公平的尝试获取
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程对象
final Thread current = Thread.currentThread();
// 获取线程的状态
int c = getState();
// 根据线程的不同状态执行不同的逻辑
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 获取独占模式的线程的当前锁的状态
else if (current == getExclusiveOwnerThread()) {
// 获取新的层级大小
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置锁的状态
setState(nextc);
return true;
}
return false;
}
// 尝试释放方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 返回当前线程是不是独占的
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 返回 ConditionObject 对象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 获得独占的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获得独占线程的状态
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 判断是否是加锁的
final boolean isLocked() {
return getState() != 0;
}
// 序列化
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
// 非公平锁继承了Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁操作
final void lock() {
// 判断是不是第一次加锁 底层调用 Unsafe的compareAndSwapInt()方法
if (compareAndSetState(0, 1))
// 设置为独占锁
setExclusiveOwnerThread(Thread.currentThread());
// 如果不是第一次加锁,则调用 acquire 方法在加一层锁
else
acquire(1);
}
// 返回尝试加锁是否成功
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 加锁操作,直接设置为1
final void lock() {
acquire(1);
}
// 尝试加锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
Lock接口
public interface Lock {
// 加锁
void lock();
// 不断加锁
void lockInterruptibly() throws InterruptedException;
// 尝试加锁
boolean tryLock();
// 尝试加锁,具有超时时间
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// Condition 对象
Condition newCondition();
}
Condition接口
public interface Condition {
// 等待
void await() throws InterruptedException;
// 超时等待
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 超时纳秒等待
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 可中断等待
void awaitUninterruptibly();
// 等待死亡
boolean awaitUntil(Date deadline) throws InterruptedException;
// 指定唤醒
void signal();
// 唤醒所有
void signalAll();
}
ReentrantLock的其他方法
public class ReentrantLock implements Lock, java.io.Serializable {
// 锁的类型
private final Sync sync;
// 默认是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造,可以设置锁的类型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 加锁
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 解锁 调用release() 因为是重入锁,所以需要减少重入的层数
public void unlock() {
sync.release(1);
}
// 返回Condition对象 ,用来执行线程的唤醒等待等操作
public Condition newCondition() {
return sync.newCondition();
}
// 获取锁的层数
public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 是否加锁
public boolean isLocked() {
return sync.isLocked();
}
// 是否是公平锁
public final boolean isFair() {
return sync instanceof FairSync;
}
// 获取独占锁
protected Thread getOwner() {
return sync.getOwner();
}
// 查询是否有任何线程正在等待获取此锁
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 查询给定线程是否正在等待获取此锁
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
// 获取队列的长度
public final int getQueueLength() {
return sync.getQueueLength();
}
// 返回一个包含可能正在等待获取该锁的线程的集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
// 判断是否等待
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 获得等待队列的长度
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 获取正在等待的线程集合
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// toString()
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}
总结:
AQS:AbstractQueueSynchronizer => 抽象队列同步器
AQS定义了一套多线程访问共享资源的同步器框架,很多同步器的实现都依赖AQS。如ReentrantLock、Semaphore、CountDownLatch ...
首先看一下AQS队列的框架
它维护了一个volatile int state (代表共享资源)和一个FIFO线程等待队列(多线程争抢资源被阻塞的时候会先进进入此队列),这里的volatile是核心。在下个部分进行讲解~
state的访问方式有三种
AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程可以执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore、CountdownLatch)
不同的自定义同步器争用共享资源的方式也不同。自定义的同步器在实现的时候只需要实现共享资源的获取和释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队)AQS在顶层已经实现好了。
自定义同步器时需要实现以下方法即可
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁,然后将state+1,此后其他线程在调用tryAcquire()就会失败,直到A线程unlock()到state为0为止,其他线程才有机会获取该锁。当前在A释放锁之前,A线程是可以重复获取此锁的(state)会累加。这就是可重入,但是获取多少次,就要释放多少次。
再和CountdownLock为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程的个数一致)。这N个子线程是并行执行的,每个子线程执行完之后countDown一次。state会CAS-1。等到所有的子线程都执行完后(即state=0),会upark()主调用线程,然后主调用线程就会从await()函数返回,继续剩余动作
一般来说,自定义同步器要么是独占方法,要么是共享方式,也只需要实现tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一组即可,但是AQS也支持自定义同步器同时实现独占锁和共享锁两种方式,如:ReentrantReadWriteLock
AQS的源码
AbstractQueueSynchronizer 继承了 AbstractOwnableSynchronizer
AbstractOwnableSynchronizer类
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 独占模式当前的拥有者
private transient Thread exclusiveOwnerThread;
// 设置独占模式当前的拥有者
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 得到独占模式当前的拥有者
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueueSynchronizer类
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
// AbstractQueueSynchronizer 中的静态内部类 Node 节点
static final class Node {
// 指示节点正在以共享模式等待的标记
static final Node SHARED = new Node();
// 指示节点正在以独占模式等待的标记
static final Node EXCLUSIVE = null;
// 表示线程已经取消
static final int CANCELLED = 1;
// 表示线程之后需要释放
static final int SIGNAL = -1;
// 表示线程正在等待条件
static final int CONDITION = -2;
// 指示下一个 acquireShared 应该无条件传播
static final int PROPAGATE = -3;
// 状态标记
volatile int waitStatus;
// 队列的前一个节点
volatile Node prev;
// 队列的后一个节点
volatile Node next;
// 线程
volatile Thread thread;
// 下一个正在等待的节点
Node nextWaiter;
// 判断是否时共享的
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回上一个节点,不能为null,为null抛出空指针异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 构造
Node() { // Used to establish initial head or SHARED marker
}
// 有参构造,用来添加线程的队列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 有参构造,根据等待条件使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 状态
private volatile int state;
// 获取当前的状态
protected final int getState() {
return state;
}
//设置当前的状态
protected final void setState(int newState) {
state = newState;
}
// 比较设置当前的状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 纳秒数,使之更快的旋转
static final long spinForTimeoutThreshold = 1000L;
// 将节点插入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 加一个等待节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 设置头节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 如果存在后继节点,就唤醒
private void unparkSuccessor(Node node) {
// 获得节点的状态
int ws = node.waitStatus;
// 如果为负数,就执行比较并设置方法设置状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 唤醒后面的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// 共享模式的释放动作,并且向后继节点发出信号
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// 设置队列的头,并检查后继者能否在共享模式下等待,如果可以,就是否传播设置为>0或者propagate状态
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// 取消正在进行的尝试
private void cancelAcquire(Node node) {
// 节点为null,直接返回
if (node == null)
return;
node.thread = null;
// 跳过已经取消的前一个节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
// 还有好多方法... 其实本质就是基于 队列的判断和操作,AQS提供了独占锁和共享锁的设计
// 在AQS中,使用到了Unsafe类,所以AQS其实就是基于CAS算法的,
// AQS的一些方法就是直接调用 Unsafe 的方法 如下所示
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
// 比较并设置头
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 比较并设置尾
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 比较并设置状态
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
// 比较并设置下一个节点
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
// 除此之外 AQS 还有一个实现了Condition的类 如下
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列的第一个节点
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;
public ConditionObject() { }
// 在等待队列中添加一个新的节点
private Node addConditionWaiter() {
// 获取最后一个节点
Node t = lastWaiter;
// 如果最后一个节点被取消了,就清除它
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 删除并转移节点直到它没有取消或者不为null
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 删除所有的节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 取消节点的连接
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 将等待最长的线程,唤醒
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 唤醒所有的等待线程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 实现不间断的条件等待
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 模式意味着在退出等待时重新中断
private static final int REINTERRUPT = 1;
// 模式的含义是在退出等待时抛出InterruptedException异常
private static final int THROW_IE = -1;
// 检查中断,如果在信号通知之前被中断,则返回THROW_IE;
// 如果在信号通知之后,则返回REINTERRUPT;如果未被中断,则返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 抛出InterruptedException,重新中断当前线程,
// 或不执行任何操作,具体取决于模式。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 实现不可中断的条件等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 纳秒级别的等待
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 绝对定时等待
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 超时等待
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 判断是不是独占的
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
// 返回是否有正在等待的
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
// 获得等待队列的长度
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
// 获取所有正在等待的线程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
}
总结:
volatile是Java提供的关键字,是轻量级的同步机制 JSR133提出,Java5增强了语义
volatile关键字有三个重要的特点
提到volatile,就要提到JMM - 什么是JMM
JMM:Java Memory Model
本身就是一种抽象的概念,并不真实存在,它描述的是一组规范和规则,通过这种规则定义了程序的各个变量(包括实例字段、静态字段、和构造数组对象的元素)的访问方式
JMM关于同步的规定
happens-before 规则
重排序
什么是内存屏障?
原子性
有序性
volatile的使用
volatile的底层实现
CAS(Compare And Swap)比较并替换,是线程并发运行时用到的一种技术
CAS是原子操作,保证并发安全,而不能保证并发同步
CAS是CPU的一个指令(需要JNI调用Native方法,才能调用CPU的指令)
CAS是非阻塞的、轻量级的乐观锁
我们可以实现通过手写代码完成CAS自旋锁
CAS包括三个操作数
如果内存位置的值与期望值匹配,那么处理器会自动将该位置的值设置为新值,否则不做改变。无论是哪种情况,都会在CAS指令之前返回该位置的值。
public class Demo {
volatile static int count = 0;
public static void request() throws Exception {
TimeUnit.MILLISECONDS.sleep(5);
// 表示期望值
int expectedCount;
while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) {
}
}
public static synchronized boolean compareAndSwap(int expectedCount, int newValue) {
if (expectedCount == getCount()) {
count = newValue;
return true;
}
return false;
}
public static int getCount() {
return count;
}
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
int threadSize = 100;
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
request();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("count :" + count + " 耗时:" + (end - start));
}
}
上述是我们自己书写的CAS自旋锁,但是JDK已经提供了响应的方法
Java提供了 CAS 的支持,在 sun.misc.Unsafe
类中,如下
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
参数说明
CAS的实现原理
CAS通过调用JNI的代码实现,JNI:Java Native Interface ,允许Java调用其他语言
而CompareAndSwapXxx系列的方法就是借助“C语言”CPU底层指令实现的
以常用的 Inter x86来说,最后映射到CPU的指令为“cmpxchg”,这个是一个原子指令,CPU执行此命令的时候,实现比较并替换的操作
cmpxchg 如何保证多核心下的线程安全
系统底层进行CAS操作的时候,会判断当前操作系统是否为多核心,如果是,就给“总线”加锁,只有一个线程对总线加锁,保证只有一个线程进行操作,加锁之后会执行CAS操作,也就是说CAS的原子性是平台级别的
CAS这么强,有没有什么问题?
高并发情况下,CAS会一直重试,会损耗性能
CAS的ABA问题
CAS需要在操作值得时候检查下值有没有变化,如果没有发生变化就更新,但是如果原来一个值为A,经过一轮操作之后,变成了B,然后又是一轮操作,又变成了A,此时这个位置有没有发生改变?改变了的,因为不是一直是A,这就是ABA问题
如何解决ABA问题?
解决ABA问题就是给值增加一个修改版本号,每次值的变化,都会修改它的版本号,CAS在操作的时候都会去对比此版本号。
下面给出一个ABA的案例
public class CasAbaDemo {
public static AtomicInteger a = new AtomicInteger(1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get());
try {
int executedNum = a.get();
int newNum = executedNum + 1;
TimeUnit.SECONDS.sleep(3);
boolean isCasSuccess = a.compareAndSet(executedNum, newNum);
System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主线程");
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
a.incrementAndGet();
System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get());
a.decrementAndGet();
System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get());
} catch (Exception e) {
e.printStackTrace();
}
}, "干扰线程");
main.start();
thread.start();
}
}
Java中ABA解决办法(AtomicStampedReference)
AtomicStampedReference 主要包含一个引用对象以及一个自动更新的整数 “stamp”的pair对象来解决ABA问题
public class AtomicStampedReference<V> {
private static class Pair<T> {
// 数据引用
final T reference;
// 版本号
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
/**
* 期望引用
* @param expectedReference the expected value of the reference
* 新值引用
* @param newReference the new value for the reference
* 期望引用的版本号
* @param expectedStamp the expected value of the stamp
* 新值的版本号
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
// 期望引用与当前引用一致
expectedReference == current.reference &&
// 期望版本与当前版本一致
expectedStamp == current.stamp &&
// 数据一致
((newReference == current.reference &&
newStamp == current.stamp)
||
// 数据不一致
casPair(current, Pair.of(newReference, newStamp)));
}
}
修改之后完成ABA问题
public class CasAbaDemo02 {
public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference());
try {
Integer executedReference = a.getReference();
Integer newReference = executedReference + 1;
Integer expectStamp = a.getStamp();
Integer newStamp = expectStamp + 1;
TimeUnit.SECONDS.sleep(3);
boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp);
System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主线程");
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference());
a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1);
System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference());
} catch (Exception e) {
e.printStackTrace();
}
}, "干扰线程");
main.start();
thread.start();
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。