1. Runnable, Callable, Future, FutureTask
a. Runnable
Runnable代表一个可以被线程执行的任务,其中只有一个run()
方法1
2
3
4
5
6public interface Runnable {
/**
* 只有一个无参方法run(),当线程执行时,会调用这个run方法
*/
public abstract void run();
}
Thread类继承了Runnable接口, 其中也有Runnable的引用,它的run方法就是执行引用的Runnable实例的run方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class Thread implements Runnable {
private Runnable target;
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
boolean started = false;
try {
// 底层调用JNI方法start0()向OS申请线程资源
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();
public void run() {
// target在构造方法中设置,线程在执行具体的业务逻辑时会调用run方法
if (target != null) {
target.run();
}
}
}
b. Callable
Callable代表一个可以有返回值的被线程执行的任务,其中只有一个call()
方法,和Runnable相比,它多了一个返回值1
2
3
4
5
6
7
8
9public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
c. Future
是一个接口,表示一个异步操作的结果。FutureTask
是它的一个实现类,同时FutureTask
继承了Runnable
接口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75public interface Future<V> {
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
* 取消当前异步操作,如果已经取消成功或者执行完毕,会返回false
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
d. FutureTask
FutureTask
是Future
和Runnable
的实现类,具体实现是接口RunnableFuture
继承了Runnable
接口和Future
接口,再由FutureTask
类实现RunnableFuture
接口。
1 | public class FutureTask<V> implements RunnableFuture<V> { |
1 | public interface RunnableFuture<V> extends Runnable, Future<V> { |
2. 成员变量,静态变量,Unsafe相关和构造方法
a. 成员变量,静态变量
1 | /** |
WaitNode
是FutureTask
的内部静态类1
2
3
4
5
6
7static final class WaitNode {
// 线程引用
volatile Thread thread;
// 下一个节点
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
b. Unsafe相关
1 | // Unsafe mechanics |
c. 构造方法
FutureTask
有2个构造方法,分别传入一个Callable类型的参数和一个Runnable类型的参数和一个返回值result。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
* 传入一个Callable对象,生成一个FutureTask对象
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
// 内部引用要执行的Callable对象
this.callable = callable;
// 状态值设置为待执行状态
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* 传入一个Runnable对象和返回值,生成一个FutureTask对象
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
// Executors.callable()方法将runnable类型和返回值转为一个callable对象,底层使用了适配器模式
this.callable = Executors.callable(runnable, result);
// 状态值设置为待执行状态
this.state = NEW; // ensure visibility of callable
}
其中调用了Executors.callable()
方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 使用构造器模式,把Runnable对象转换为Callable对象
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
// 转换后的Callable对象,底层还是调用传入Runnable的run方法,返回值为传入的result对象
task.run();
return result;
}
}
这里附带说一下,AbstractExecutorService
的submit()
方法无论传入Runnable
对象还是Callable
对象也是类似的实现。底层都会将其封装成为一个FutureTask
对象。返回的类型RunnableFuture
实现了Runnable
接口和Future
接口,FutureTask
实现了此接口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// 底层调用的是FutureTask的构造方法
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
// 底层调用的是FutureTask的构造方法
return new FutureTask<T>(callable);
}
/**
* 传入的Runnable对象通过newTaskFor方法转为RunnableFuture对象,底层的callable引用返回值为null
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 将Runnable包装成RunnableFuture对象,此时转化后的Callable的call方法返回值为null!!
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* 传入的Runnable对象和result结果通过newTaskFor方法转为RunnableFuture对象
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 将Runnable对象和result结果包装成RunnableFuture对象
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* 传入的Callable对象通过newTaskFor方法转为RunnableFuture对象
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 将Callable包装成RunnableFuture对象
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
A -> B:适配器实现B接口,然后把A接口实现当做构造参数传入。
3. run()方法
1 | /** |
1 | protected void set(V v) { |
4. get()方法
1 | public V get() throws InterruptedException, ExecutionException { |
5. awaitDown()方法
1 | /** |
附线程状态转换图:
interrupt方法相关
首先,一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。
所以,Thread.stop, Thread.suspend, Thread.resume 都已经被废弃了。
而 Thread.interrupt 的作用其实也不是中断线程,而是「通知线程应该中断了」,具体到底中断还是继续运行,应该由被通知的线程自己处理。
具体来说,当对一个线程,调用 interrupt() 时,
① 如果线程处于被阻塞状态(例如处于sleep, wait, join 等状态),那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。仅此而已。
② 如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。
interrupt() 并不能真正的中断线程,需要被调用的线程自己进行配合才行。也就是说,一个线程如果有被中断的需求,那么就可以这样做。
① 在正常运行任务时,经常检查本线程的中断标志位,如果被设置了中断标志就自行停止线程。
② 在调用阻塞方法时正确处理InterruptedException异常。(例如,catch异常后就结束线程。)
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
// do more work.
}
});
thread.start();
// 一段时间以后
thread.interrupt();
具体到你的问题,Thread.interrupted()清除标志位是为了下次继续检测标志位。如果一个线程被设置中断标志后,选择结束线程那么自然不存在下次的问题,而如果一个线程被设置中断标识后,进行了一些处理后选择继续进行任务,而且这个任务也是需要被中断的,那么当然需要清除标志位了。
removeWaiter()方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
* 移除传入的WaitNode,两次调用
* 1. 线程被中断
* 2. get方法等待超时
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
// node 的thread已经置为null
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
// pred : 前一个节点
// q : 当前正处理的节点
// s : q的下一个节点
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// q.thread != null表示q不是当前要移除的节点,将pred设置为q,开始第二次自旋
if (q.thread != null)
pred = q;
// 第一个条件不满足,也就是q.thread==null,此时删掉q节点,把pred的next节点直接设置为s
else if (pred != null) {
pred.next = s;
// 如果pred.thread为null,重新retry自旋
if (pred.thread == null) // check for race
continue retry;
}
// 前两个条件均不满足,表示q.thread和pred都为null,所以把s设置为waiters头结点,重新retry自旋
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
6. cancel()方法
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
finishCompletion方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
* 这个方法有三处调用,调用的时机都是任务已经到了终态,后续不会再执行了。
* 作用是唤醒所有等待任务结束的线程,也就是清空WaitNode栈
* 1. cancel方法
* 2. set(V) 为run方法内部执行成功时调用
* 3. setEception(Throwable) 为run方法内部执行抛异常时调用
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 尝试把waiters设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 自旋
for (;;) {
Thread t = q.thread;
if (t != null) {
// help the GC
q.thread = null;
// 停止block,唤醒线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
// 如果执行到了最后,则跳出自旋
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// FutureTask中为空方法,留待扩展用
done();
callable = null; // to reduce footprint
}
参考文档
[1] Java线程状态分析/线程状态转换图
[2] Java里一个线程调用了Thread.interrupt()到底意味着什么?