1 Star 0 Fork 165

ElonChung / Java-Review

forked from flatfish / Java-Review 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
Java-多线程-增强篇-FutureTask.md 14.81 KB
一键复制 编辑 原始数据 按行查看 历史
icanci 提交于 2020-09-07 23:09 . :fire:更新文件夹

Java - 多线程 - 增强篇 - FutureTask

继承体系

FutureTask是Future的实现类

FutureTask的继承体系见下图

1597026992405

Runnable
@FunctionalInterface
public interface Runnable {
    
    // 执行的内容
    public abstract void run();
}
Future
public interface Future<V> {
    
    // 取消线程执行
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 线程是否被取消
    boolean isCancelled();
    
    // 线程是否结束
    boolean isDone();
    
    // 获取返回值
    V get() throws InterruptedException, ExecutionException;
    
    // 超时获取返回值
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFutrue
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
    // 方法
}

FutureTask源码分析

public class FutureTask<V> implements RunnableFuture<V> {

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */

    // 当前task的状态
    private volatile int state;
    // 当前任务尚未执行
    private static final int NEW          = 0;
    // 当前任务正则结束,稍微完全结束,一种临界状态
    private static final int COMPLETING   = 1;
    // 当前任务正常结束
    private static final int NORMAL       = 2;
    // 当前任务执行中发生了异常,内部封装的 Callable接口的run方法向上抛出异常
    private static final int EXCEPTIONAL  = 3;
    // 当前任务被取消
    private static final int CANCELLED    = 4;
    // 当前任务在中断中
    private static final int INTERRUPTING = 5;
    // 当前任务已经中断 (中断只是通知,线程不一定执行)
    private static final int INTERRUPTED  = 6;

    // submit(runnable/callable) 都会赋值到这个地方 runnable 使用装饰者模式 伪装成Callable了
    private Callable<V> callable;
    // 正常情况下,任务执行结束,outcome保存执行的结果
    // 非正常清理,callable向上抛出异常,outcome保存异常
    private Object outcome;
    // 当前任务被线程执行期间,保存当前任务的线程对象引用
    private volatile Thread runner;
    // 因为会有很多线程去get当前任务的结果,所以这里使用了一种数据结构,Stack 栈 
    private volatile WaitNode waiters;

    // 构造方法
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        // 程序员自己实现的业务类
        this.callable = callable;
        // 设置线程的状态为NEW
        this.state = NEW;       // ensure visibility of callable
    }

    // 构造方法
    public FutureTask(Runnable runnable, V result) {
        // 使用装饰者模式将runnable转换为了 callable 接口,外部线程 通过get获取
        // 当前任务执行结果,可能为null 也可能为传入的值
        this.callable = Executors.callable(runnable, result);
        // 设置状态
        this.state = NEW;       // ensure visibility of callable
    }

    // submit(runnable/callable) -> newTaskFor(runnable)
    // -> execute(task) -> pool
    // 任务执行的入口
    public void run() {
        // 条件1:判断 state ,如果不为NEW 就说明已经被执行过了 或者被cancel了
        // 条件2:!UNSAFE.compareAndSwapObject(this, runnerOffset,  null, Thread.currentThread())
        // 条件2设置成功的时候,不执行,否则返回值fasle 当前任务被其他线程抢占了 再取反为true,就不在执行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        // 执行到这里 当前task一定是NEW状态,而且当前线程也抢占task成功
        try {
            // 拿到自己的逻辑callable或者runnable的装饰后的runnable
            Callable<V> c = callable;
            // c!=null 防止空指针异常
            // 防止外部线程cancel掉线程
            if (c != null && state == NEW) {
                // 结果引用
                V result;
                // true 表示代码执行成功,没有抛出异常
                // false 表示抛出异常 
                boolean ran;
                try {
                    // 执行call方法
                    result = c.call();
                    // 设置状态
                    ran = true;
                } catch (Throwable ex) {
                    // 出现问题了
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    // 说明正常结束了
                    // set就是设置结果到outcome
                    set(result);
            }
        } finally {
            // 当前执行的线程设置为null
            runner = null;
            // 拿到状态 
            int s = state;
            // 大于中断中的这个状态
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    // set 方法
    protected void set(V v) {
        // 使用CAS设置当前的任务状态为完成中
        // 有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前,将task取消了
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 将返回值 v 设置给 outcome
            outcome = v;
            // 使用CAS设置当前的任务为 正常结束
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state、
            // 再把阻塞的线程唤醒
            finishCompletion();
        }
    }

    // 设置outcome为异常信息
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); 
    }

    // get 方法
    // 可以有多个线程调用这个方法
    // 多个线程等待当前线程执行的结果
    public V get() throws InterruptedException, ExecutionException {
        // 获取当前任务状态
        int s = state;
        // 条件成立:未执行、正在执行 、正完成 
        // 调用get的外部线程会被阻塞到get方法上
        if (s <= COMPLETING)
            // 
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 不带超时的
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        // 引用当前线程 封装成WaitNode对象
        WaitNode q = null;
        // 表示当前的线程 waitNode 对象有没有入队
        boolean queued = false;
        // 自旋
        for (;;) {
            // 如果线程唤醒是被其他线程使用中断这种方式唤醒的 interrupted() 方法
            // 返回true后,将中断标记重置为 false
            if (Thread.interrupted()) {
                // 当前线程node出队
                removeWaiter(q);
                // get方法 抛出中断异常
                throw new InterruptedException();
            }

            // 假设当前线程是被其他线程使用 unpark(thread) 唤醒的话,会正常自选,走下面的逻辑
            int s = state;
            // 说明当前任务已经有结果了,可能是好,可能是坏
            if (s > COMPLETING) {
                // 当前已经被当前线程创建node了
                if (q != null)
                    q.thread = null;
                // 直接返回当前的状态
                return s;
            }
            // 当前线程快要完成了
            // 这里让线程再次释放CPU,进行下一次抢占
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // q == null 第一次自选,当前线程还没有创建 waitNode 对象
            else if (q == null)
                q = new WaitNode();
            // 第二次自选
            // 当前线程已经创建了对象,但是还没有入队
            else if (!queued)
                // 当前线程的node节点,指向原队列的头节点 waiters 一直指向队列的头
                // CAS 方式设置waiters引用指向当前线程node,成功的话, queued == true 
                // 否则其他线程可能先一步入队了
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 第三次自选
            // 
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 当前get线程操作的线程就被park了,线程状态就会变成WAITING状态,相当于休眠了
                // 除非有其他线程将你唤醒,或者将当前线程中断
                LockSupport.park(this);
        }
    }

    // 出队的方法
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    private V report(int s) throws ExecutionException {
        // 正常情况下 outcome保存的是callable运行结束的结果
        // 非正常情况下,保存的是 callable 产生的异常
        Object x = outcome;
        // 根据state的状态来判断
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        // 执行到这里,就是抛出callable中出现的异常
        throw new ExecutionException((Throwable)x);
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        // q 指向 waiters 链表的头节点
        for (WaitNode q; (q = waiters) != null;) {
            // CAS 置空 waiters 因为是防止外部线程使用 cancel 取消当前任务
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    // 获取当前节点封装的 thread
                    Thread t = q.thread;
                    // 条件成立
                    if (t != null) {
                        q.thread = null;
                        // 唤醒线程 t
                        LockSupport.unpark(t);
                    }
                    // 拿到当前节点的下一个阶段
                    WaitNode next = q.next;
                    // 如果当前节点的下一个节点为null 
                    // 当前就是最后一个节点
                    if (next == null)
                        break;
                    // 删除最后一个节点
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        // help GC
        callable = null;        // to reduce footprint
    }

    // 取消的方法
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 当前任务处于运行中
        // 或者处于线程池任务队列中
        // 条件成立,说明修改状态成功,可以去指向下面的操作
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                                       mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    // 执行当前FutureTask线程,有可能是null。当前任务在队列中,还没有线程获取
                    Thread t = runner;
                    // 条件成立 说明当前线程正在执行 task
                    if (t != null)
                        // 给线程一个中断信号,但是不一定立即中断 由线程决定
                        t.interrupt();
                } finally { // final state
                    // CAS设置状态 为中断完成
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 唤醒操作
            finishCompletion();
        }
        return true;
    }

}
1
https://gitee.com/elonchung/Java-Review.git
git@gitee.com:elonchung/Java-Review.git
elonchung
Java-Review
Java-Review
master

搜索帮助