线程池中Callable异常处理分析

前言

  分析前几天遇到的一个老代码留下的坑。线程池中运行Callable线程时抛出的异常捕获不到,简化的逻辑如图,环境是jdk8:

运行结果:

解决方案

  1. 线程池返回Future<>,调用其get()
  2. 在Callable中 try-catch可能抛错的异常

    运行结果:

    源码分析

      不难发现线程池提交时创建的类为FutureTask
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
      public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
    }

FutureTask.run()之前,先简单结束一下其关的属性。

  • state:线程的状态。主要有如下几种:
    • NEW: 新建
    • COMPLETING: 运行在
    • NORMAL: 正常完成
    • EXCEPTIONAL: 异常
    • CANCELLED: 取消
    • INTERRUPTING: 被中断的中间状态
    • INTERRUPTED: 被中断的最终状态
  • outcome: get()返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

注意这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
             try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}

protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

这里线程在运行时抛出异常时,FutureTask把异常信息赋值给outcome,并将state设为EXCEPTIONAL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

在调用get()时,如果运行时抛出异常,此时会抛出异常。

总结

  这种坑还是代码规范的问题。Callable返回结果并没有被使用可以用Runnable代替;try-catch代码的习惯。