AsyncTask源码详解

基础知识

Callable

Callable的接口定义如下:

1
2
3
public interface Callable {   
V call() throws Exception;
}

Callable接口声明了一个名称为call()的方法,该方法可以有返回值V,也可以抛出异常。Callable也是一个线程接口,它与Runnable的主要区别就是Callable在线程执行完成后可以有返回值而Runnable没有返回值,Runnable接口声明如下:

1
2
3
public interface Runnable {
public abstract void run();
}

那么Callable接口如何使用呢,Callable需要和ExcutorService结合使用,其中ExecutorService也是一个线程池对象继承自Executor接口,接着看看ExecutorService提供了那些方法供我们使用:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
  • submit(Callable task),传递一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。
  • submit(Runnable task, T result),传递一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。
  • submit(Runnable task),传递一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。

因此我们只要创建好我们的线程对象(实现Callable接口或者Runnable接口),然后通过上面3个方法提交给线程池去执行即可。Callable接口介绍就先到这,再来看看Future时什么鬼。

Future

Future接口是用来获取异步计算结果的,说白了就是对具体的Runnable或者Callable对象任务执行的结果进行获取(get()),取消(cancel()),判断是否完成等操作。其方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Future<V> {
//取消任务
boolean cancel(boolean mayInterruptIfRunning);

//如果任务完成前被取消,则返回true。
boolean isCancelled();

//如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。
boolean isDone();

//获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
V get() throws InterruptedException, ExecutionException;

// 获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,
//如果阻塞时间超过设定的timeout时间,该方法将返回null。
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

}

总得来说Future有以下3点作用:

  • 能够中断执行中的任务
  • 判断任务是否执行完成
  • 获取任务执行完成后额结果。

但是Future只是接口,我们根本无法将其创建为对象,于官方又给我们提供了其实现类FutureTask,这里我们要知道前面两个接口的介绍都只为此类做铺垫,毕竟AsncyTask中使用到的对象是FutureTask。

FutureTask

先来看看FutureTask的实现:

1
2
public class FutureTask implements RunnableFuture { 
}

RunnableFuture接口的实现:

1
2
3
public interface RunnableFuture extends Runnable, Future {
void run();
}

从接口实现可以看出,FutureTask除了实现了Future接口外还实现了Runnable接口,因此FutureTask既可以当做Future对象也可是Runnable对象,当然FutureTask也就可以直接提交给线程池来执行。接着我们最关心的是如何创建FutureTask对象,实际上可以通过如下两个构造方法来构建FutureTask:

1
2
3
4
public FutureTask(Callable<V> callable) {  
}
public FutureTask(Runnable runnable, V result) {
}

从构造方法看出,我们可以把一个实现了Callable或者Runnable的接口的对象封装成一个FutureTask对象,然后通过线程池去执行。

从入口开始

通常我们使用AsyncTask时,会自定义一个IAsyncTask继承自AsyncTask,然后调用其execute()方法来执行任务,如下所示:

1
new IAsyncTask("IAsyncTask-1").execute("");

入口是execute()方法,查看它的源码:

1
2
3
4
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params){
return executeOnExecutor(sDefaultExecutor, params);
}

execute()方法没有做任何操作,而是直接调用了executeOnExecutor()方法,并传入了sDefaultExecutor和参数params,这个sDefaultExecutor是什么呢?看名字我们应该猜到它是一个线程池,看代码:

1
2
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

SerialExecutor是AsyncTask四个内部类中的一个,它实现了Executor接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;

public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {// 插入一个Runnable任务
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
// 如果当前没有任务进行
if (mActive == null) {
scheduleNext();
}
}
//
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
//从任务队列mTasks中取出任务并放到THREAD_POOL_EXECUTOR线程池中执行.
//由此也可见任务默认是串行进行的。
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}

从源码可以看出,SerialExecutor维护了一个存放任务队列的容器ArrayDeque和一个正在执行的任务Runnable,其工作就是当用户调用execute方法时将任务存放到任务队列中,并交给THREAD_POOL_EXECUTOR串行的处理。

由此可见SerialExecutor并不是真正的线程执行者,它只是是保证传递进来的任务Runnable(实例是一个FutureTask)串行执行,而真正执行任务的是THREAD_POOL_EXECUTOR线程池。接下来我们看看THREAD_POOL_EXECUTOR:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// 核心线程数
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
// 线程池最大容量
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
// 保活时间
private static final int KEEP_ALIVE_SECONDS = 30;
// 阻塞队列,默认容量128
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<Runnable>(128);
// 线程工厂
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);

public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};

public static final Executor THREAD_POOL_EXECUTOR;

static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

关于线程池先看到这,我们之前讲到用户调用execute()方法实际是调用了executeOnExecutor()方法,看看它做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
// 判断当前状态
if (mStatus != Status.PENDING) {
switch (mStatus) {
// 同一任务只被执行一次
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}

mStatus = Status.RUNNING;
// 这里调用了我们自己实现的onPreExecute方法
onPreExecute();
//参数传递给了mWorker.mParams
mWorker.mParams = params;
// 执行了线程池THREAD_POOL_EXECUTOR的execute方法
exec.execute(mFuture);

return this;
}

从executeOnExecutor方法的源码分析得知,执行任务前先会去判断当前AsyncTask的状态,如果处于RUNNING和FINISHED状态就不可再执行,直接抛出异常,只有处于Status.PENDING时,AsyncTask才会去执行。然后onPreExecute()被执行的,该方法可以用于线程开始前做一些准备工作。接着会把我们传递进来的参数赋值给 mWorker.mParams ,并执行开始执行mFuture任务,那么mWorker和mFuture到底是什么?先看看mWorker即WorkerRunnable的声明源码:

1
2
3
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}

发现WorkerRunnable是个抽象类,存在一个参数数组,并且实现了Callable方法。既然WorkerRunnable是个抽象类,那么肯定会有具体的实现类来进行实例化,果然,在AsyncTask的构造方法中有如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
// 创建WorkerRunnable mWorker,本质上就是一个实现了Callable接口对象
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
// 设置标志
mTaskInvoked.set(true);
Result result = null;
try {

Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
// 执行DoInBackGround方法,取得结果
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
// 调用postResult方法更新结果
postResult(result);
}
return result;
}
};

// 把mWorker(即Callable实现类)封装成FutureTask实例
// 最终执行结果也就封装在FutureTask中
mFuture = new FutureTask<Result>(mWorker) {
// 任务执行完成后被调用
@Override
protected void done() {
try {
// 如果还没更新结果通知就执行postResultIfNotInvoked
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}

可以看到在初始化AsyncTask时,不仅创建了mWorker(本质实现了Callable接口的实例类)而且也创建了FutureTask对象,并把mWorker对象封装在FutureTask对象中,最后FutureTask对象将在executeOnExecutor方法中通过线程池去执行。

AsynTask在初始化时会创建mWorker实例对象和FutureTask实例对象,mWorker是一个实现了Callable线程接口并封装了传递参数的实例对象,然后mWorker实例会被封装成FutureTask实例中。在AsynTask创建后,我们调用execute方法去执行异步线程,其内部又直接调用了executeOnExecutor方法,并传递了线程池exec对象和执行参数,该方法内部通过线程池exec对象去执行mFuture实例,这时mWorker内部的call方法将被执行并调用doInBackground方法,最终通过postResult去通知更新结果。关于postResult方法,其源码如下:

1
2
3
4
5
6
7
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}

显然是通过Handler去执行结果更新的,在执行结果成返回后,会把result封装到一个AsyncTaskResult对象中,最后把MESSAGE_POST_RESULT标示和AsyncTaskResult存放到Message中并发送给Handler去处理,这里我们先看看AsyncTaskResult的源码:

1
2
3
4
5
6
7
8
9
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;

AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}

显然AsyncTaskResult封装了执行结果的数组以及AsyncTask本身,这个没什么好说的,接着看看AsyncTaskResult被发送到handler后如何处理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}

@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}

通过handler发送过来的不同标志去决定执行那种结果,如果标示为MESSAGE_POST_RESULT则执行AsyncTask的finish方法并传递执行结果给该方法,finish方法源码如下:

1
2
3
4
5
6
7
8
9
private void finish(Result result) {
if (isCancelled()) {// 判断任务是否被取消
onCancelled(result);
} else {
// 执行onPostExecute(result)并传递result结果
onPostExecute(result);
}
mStatus = Status.FINISHED;
}

该方法先判断任务是否被取消,如果没有被取消则去执行onPostExecute(result)方法,外部通过onPostExecute方法去更新相关信息,如UI,消息通知等。最后更改AsyncTask的状态为已完成。到此AsyncTask的全部流程执行完。

这里还有另一种标志MESSAGE_POST_PROGRESS,该标志是我们在doInBackground方法中调用publishProgress方法时发出的,该方法原型如下:

1
2
3
4
5
6
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}

总结

方法执行流程:

时序图

参考

AsyncTask

android多线程-AsyncTask之工作原理深入解析(下)


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!