看见的只是表象,当你深入其中的时候,你会发现一个不一样的世界~
**Android的消息机制其实是分为Java层的Message派发和Native层的
派发以及处理来自所监控的文件句柄的事件**
推荐两个在线源码阅读工具:
1 研究背景
android的应用层使用java语言写的.
既然Java的程序入口是’public static void main(String[] args)’
那么android的呢?
我们可以先从ActivityThread.java这个类看起.
在类中,你会发现有Main函数.
大家都知道java的程序如果想一直运行.就需要一个死循环.
在这个’ActivityThread.java’中 会有一个’Looper.loop();’
点开后,你会发现.在Looper.loop()这个方法中有一个死循环
for (;;) { Message msg = queue.next(); // might block ......... msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn't corrupted. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); }
对的,你想要的消息机制探寻之路,就在这里了~
2 源码分析
2.1 ThreadLocal
每一个线程,都会有一个线程的本地变量区ThreadLocal ,将类变量以map的数据结构,放到ThreadLocal类型的对象中,使变量在每个线程中都有独立拷贝,不会出现一个线程读取变量时而被另一个线程修改的现象。
具体请研究Threadlocal.java代码.
void set(Object value) public void remove()
在消息机制中,将looper放到ThreadLocal的value中保存,避免其他线程访问.
2.2 Looper
1.当一个线程要绑定一个looper时,要申请一个ThreadLocal,将looper和当前线程绑定.
private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
2.作为水车上的驱动,在实例化的构造方法中,会实例化一个消息队列和得到当前线程.quitAllowed表示MessageQueue是否能被解除
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
3.looper执行loop开始对消息队列循环,其中的’for (;;)’,而不是while(true)是为了禁止外部使用反射对他进行修改.
public static void loop() { final Looper me = myLooper(); . . . . . . final MessageQueue queue = me.mQueue; Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); for (;;) { Message msg = queue.next(); // might block . . . . . . msg.target.dispatchMessage(msg); // 派发消息 . . . . . . final long newIdent = Binder.clearCallingIdentity(); . . . . . . msg.recycle();//摘除消息,并放到消息池中 } }
2.3 MessageQueue(Native展开分析)
在MessageQueue中有很多代码,Message mMessages记录的就是一条消息链表。另外还有几个native函数,这就说明MessageQueue会通过JNI技术调用到底层代码。mMessages域记录着消息队列中所有Java层的实质消息。MessageQueue的示意图和代码如下:
java层代码分析
public final class MessageQueue { // True if the message queue can be quit. private final boolean mQuitAllowed; @SuppressWarnings("unused") private int mPtr; // used by native code Message mMessages; // 消息队列! private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>(); private IdleHandler[] mPendingIdleHandlers; private boolean mQuitting; // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout. private boolean mBlocked; // The next barrier token. // Barriers are indicated by messages with a null target whose arg1 field carries the token. private int mNextBarrierToken; private native static int nativeInit(); private native static void nativeDestroy(int ptr); private native static void nativePollOnce(int ptr, int timeoutMillis); private native static void nativeWake(int ptr); private native static boolean nativeIsIdling(int ptr); . . . . . .
1.MessageQueue构造函数会初始化本地消息队列
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
2.enqueueMessage()就是在向MessageQueue的消息链表里插入Message。消息链表是按时间进行排序的,所以主要是在比对Message携带的when信息。消息链表的首个节点对应着最先将被处理的消息,如果Message被插到链表的头部了,就意味着队列的最近唤醒时间也应该被调整了.而且会根据情况调用nativeWake函数,以触发nativePollOnce函数,结束等待。代码如下:
boolean enqueueMessage(Message msg, long when) { .......... msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; //当链的头mMessages不存在,或者携带的执行时间不存在,或者插入的message的when小于头时,将msg作为头, if (p == null || when == 0 || when < p.when) { // 如果p为空,表明消息队列中没有消息,那么msg将是第一个消息,needWake 需要根据mBlocked的情况考虑是否触发 msg.next = p; mMessages = msg; needWake = mBlocked; } else { // 此时,新消息会插入到链表的内部,一般情况下,这不需要调整唤醒时间。 // 但还必须考虑到当表头为“同步分割栏”的情况 needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { //因为消息队列之前还有剩余消息,所以这里不用调用nativeWakeup needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { //调用nativeWake,以触发nativePollOnce函数结束等待 nativeWake(mPtr); } } return true; }
3.next()函数
Message next() { int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { . . . . . . //mPtr保存了NativeMessageQueue的指针,调用nativePollOnce进行等待 nativePollOnce(mPtr, nextPollTimeoutMillis); // 阻塞于此 . . . . . . // 获取next消息,如能得到就返回之。 final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; // 先尝试拿消息队列里当前第一个消息 if (msg != null && msg.target == null) { // 如果从队列里拿到的msg是个“同步分割栏”,那么就寻找其后第一个“异步消息” do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; // 重新设置一下消息队列的头部 } msg.next = null; if (false) Log.v("MessageQueue", "Returning message: " + msg); msg.markInUse(); return msg; // 返回得到的消息对象 } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } . . . . . . // 处理idle handlers部分 for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf("MessageQueue", "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } pendingIdleHandlerCount = 0; nextPollTimeoutMillis = 0; } }
1)如果消息队列里目前没有合适的消息可以摘取,那么不能让它所属的线程“傻转”,而应该使之阻塞;
2)队列里的消息应该按其“到时”的顺序进行排列,最先到时的消息会放在队头,也就是mMessages域所指向的消息,其后的消息依次排开;
3)阻塞的时间最好能精确一点儿,所以如果暂时没有合适的消息节点可摘时,要考虑链表首个消息节点将在什么时候到时,所以这个消息节点距离当前时刻的时间差,就是我们要阻塞的时长。
4)有时候外界希望队列能在即将进入阻塞状态之前做一些动作,这些动作可以称为idle动作,我们需要兼顾处理这些idle动作。一个典型的例子是外界希望队列在进入阻塞之前做一次垃圾收集。
JNI层C++代码分析(摘录自 <<深入理解Android:卷II>>)
1.在java层中的MessageQueued的构造中,初始化Native层的代码’mPtr = nativeInit();’.其中mPtr是一个NativeMessageQueue类型的指针.
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast<jlong>(nativeMessageQueue); }
2.nativePollOnce返回后,next函数将从mMessages中提取一个消息。也就是说,要让nativePollOnce返回,至少要添加一个消息到消息队列如果nativePollOnce在Native层等待,就表明Native层也可以投递Message.
nativePollOnce除了等待Java层来的Message,还在Native层做了大量的工作。
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, timeoutMillis); }
3.’ nativeWake(mPtr)’;所调用的’android_os_MessageQueue_nativeWake’函数则更为简单,仅仅向管道的写端写入一个字符“W”,这样管道的读端就会因为有数据可读而从等待状态中醒来
//[-->android_os_MessageQueue.cpp::android_os_MessageQueue_nativeWake] static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) { NativeMessageQueue* nativeMessageQueue = //取出NativeMessageQueue对象 reinterpret_cast<NativeMessageQueue*>(ptr); return nativeMessageQueue->wake(); //调用它的wake函数 } void NativeMessageQueue::wake() { mLooper->wake();//层层调用,现在转到mLooper的wake函数 }
// Native层Looper的wake函数代码如下[-->Looper.cpp::wake] void Looper::wake() { ssize_t nWrite; do { //向管道的写端写入一个字符 nWrite = write(mWakeWritePipeFd, "W", 1); } while (nWrite == -1 && errno == EINTR); if (nWrite != 1) { if (errno != EAGAIN) { LOGW("Could not write wake signal, errno=%d", errno); } } }
4.Looper.cpp(相应的处理监控,等待都在此源码中):
pollInner调用epoll_wait函数等待 大家有兴趣可以研究源码.如果是管道读端发生事件,则认为是控制命令,可以直接读取管道中的数据。
int Looper::pollInner(int timeoutMillis) { ...... #ifdef LOOPER_USES_EPOLL //我们只讨论使用epoll进行I/O复用的方式 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; //调用epoll_wait,等待感兴趣的事件或超时发生 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); #else ......//使用别的方式进行I/O复用 #endif //从epoll_wait返回,这时候一定发生了什么事情 mLock.lock(); if (eventCount < 0) { //返回值小于零,表示发生错误 if (errno == EINTR) { goto Done; } //设置result为ALOOPER_POLL_ERROR,并跳转到Done result = ALOOPER_POLL_ERROR; goto Done; } //eventCount为零,表示发生超时,因此直接跳转到Done if (eventCount == 0) { result = ALOOPER_POLL_TIMEOUT; goto Done; } #ifdef LOOPER_USES_EPOLL //根据epoll的用法,此时的eventCount表示发生事件的个数 for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; /* 之前通过pipe函数创建过两个fd,这里根据fd知道是管道读端有可读事件。 读者还记得对nativeWake函数的分析吗?在那里我们向管道写端写了一个“W”字符,这样 就能触发管道读端从epoll_wait函数返回了 */ if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { //awoken函数直接读取并清空管道数据,读者可自行研究该函数 awoken(); } ...... } else { /* mRequests和前面的mResponse相对应,它也是一个KeyedVector,其中存储了 fd和对应的Request结构体,该结构体封装了和监控文件句柄相关的一些上下文信息, 例如回调函数等。我们在后面的小节会再次介绍该结构体 */ ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; //将epoll返回的事件转换成上层LOOPER使用的事件 if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; //每处理一个Request,就相应构造一个Response pushResponse(events, mRequests.valueAt(requestIndex)); } ...... } } Done: ; #else ...... #endif //除了处理Request外,还处理Native的Message,注意,Native的Message和Jave层的Message无任何关系 mNextMessageUptime = LLONG_MAX; while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { { sp<MessageHandler>handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); //调用Native的handler处理Native的Message //从这里也可看出Native Message和Java层的Message没有什么关系 handler->handleMessage(message); } mLock.lock(); mSendingMessage = false; result = ALOOPER_POLL_CALLBACK; } else { mNextMessageUptime = messageEnvelope.uptime; break; } } mLock.unlock(); //处理那些带回调函数的Response for (size_t i = 0; i < mResponses.size(); i++) { const Response& response = mResponses.itemAt(i); ALooper_callbackFunc callback = response.request.callback; if (callback) {//有了回调函数,就能知道如何处理所发生的事情了 int fd = response.request.fd; int events = response.events; void* data = response.request.data; //调用回调函数处理所发生的事件 int callbackResult = callback(fd, events, data); if (callbackResult == 0) { //callback函数的返回值很重要,如果为0,表明不需要再次监视该文件句柄 removeFd(fd); } result = ALOOPER_POLL_CALLBACK; } } return result; }
MessageQueue总结
MessageQueue核心逻辑下移到Native层后,极大地拓展了消息处理的范围,总结一下有以下几点:
MessageQueue继续支持来自Java层的Message消息,也就是早期的Message加Handler的处理方式。
MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。
2.4 Message
message是消息的载体,但是其实他是一个单向链表的节点 ,内部有一个消息池.
1.首先是主要的属性
/*package*/ int flags; /*package*/ long when; /*package*/ Bundle data; /*package*/ Handler target; /*package*/ Runnable callback; // sometimes we store linked lists of these things /*package*/ Message next;
2.通过’obtain()’获取消息实例.关于message 消息池:当message pool中没有可用的message实例,就创建.当然,也可以通过Handler对象的obtainMessage()获取 一个Message实例。
/** * Return a new Message instance from the global pool. Allows us to * avoid allocating new objects in many cases. */ public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; // clear in-use flag sPoolSize--; return m; } } return new Message(); }
3.当调用removeMessages时(没有单纯的清空一个message的方法),就讲message从Message Queue中断开链接,清空所有队列中的message.
void removeMessages(Handler h, int what, Object object) { if (h == null) { return; } synchronized (this) { Message p = mMessages; // Remove all messages at front. while (p != null && p.target == h && p.what == what && (object == null || p.obj == object)) { Message n = p.next; mMessages = n; p.recycleUnchecked(); p = n; } // Remove all messages after front. while (p != null) { Message n = p.next; if (n != null) { if (n.target == h && n.what == what && (object == null || n.obj == object)) { Message nn = n.next; n.recycleUnchecked(); p.next = nn; continue; } } p = n; } } }
2.5 Handler
1.将looper和looper中的MessageQueue 的引用放到对象中,此后,handler就会对二者进行使用,而callback是要异步的回调接口
public Handler(Looper looper, Callback callback, boolean async) { mLooper = looper; mQueue = looper.mQueue; mCallback = callback; mAsynchronous = async; }
2.然后是发送消息sendMessageAtTime
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { // 将自己的handler作为消息的目标target!日后msg.target.dispatchMessage()时会使用。 msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
3 总结
Java层
打一个水车的比喻
1.Threadlocal就像是水车下的地基基座,决定这looper在哪个线程绑定运行.
2.looper就像一辆水轮车的架构,绑定所在线程,负责创建和管理MessageQueue,循环消息队列.
3.MessageQueue就像水车上的一连串的水瓢,负责运送管理每一个水瓢Massage,合理的检测massage的情况,是不是被阻塞mBlocked,有没有“异步消息”barrier?,有没有被用完?然后进行排序,梳理链表.
4.Handler就像一个人申请对水瓢中的消息进行增删改查,他会绑定一个looper中的水车,向水车中水瓢的管理者MessageQueue发起请求,我想插入新消息enqueueMessage,我想拿XX消息obtainMessage,您是不是有XX的消息handleMessage?当然,这其中,具体的增删改查的姿势~有很多种方式.
5.Massage就是一个水瓢对象,信息的载体,他有很多属性,比如我有什么what?我的执行的时间是什么when?谁对我进行操作Handler?那个消息在我下面next?当我有话要说的时候告诉谁callback?等等.而且,他自己有供别人增删改查的方法.
java层的流程图
Native层
1.MessageQueue内部通过mPtr变量保存一个Native层的NativeMessageQueue对象,mMessages保存来自Java层的Message消息。
2.NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。
3.Native层对应也有Message类和Message-Handler抽象类。在编码时,一般使用的是MessageHandler的派生类WeakMessage-Handler类。
4.MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,是通过Native的Message和MessageHandler来处理的。
整体UML类图
从处理逻辑上看,先是处理Native的Message,然后是处理Native的Request,最后才是处理Java的Message。
4 参考
1.悠然红茶的博客
2.<<深入理解Android:卷II>> 邓凡平
作者:NullPoints