本文将带您了解关于c#–MicrosoftMessageQueue–优先级标志还是单独的队列?的新内容,另外,我们还将为您提供关于activemq控制面板里的NumberOfPendingMessag
本文将带您了解关于c# – Microsoft Message Queue – 优先级标志还是单独的队列?的新内容,另外,我们还将为您提供关于activemq 控制面板里的 Number Of Pending Messages、 Messages Enqueued、Messages Dequeued含、Android Handler,Message,MessageQueue,Loper源码解析详解、Android message handling mechanism (Handler, Looper, MessageQueue and Message)、Android 中 Message,MessageQueue,Looper,Handler 详解 + 实例的实用信息。
本文目录一览:- c# – Microsoft Message Queue – 优先级标志还是单独的队列?
- activemq 控制面板里的 Number Of Pending Messages、 Messages Enqueued、Messages Dequeued含
- Android Handler,Message,MessageQueue,Loper源码解析详解
- Android message handling mechanism (Handler, Looper, MessageQueue and Message)
- Android 中 Message,MessageQueue,Looper,Handler 详解 + 实例
c# – Microsoft Message Queue – 优先级标志还是单独的队列?
我刚刚被告知有些消息需要优先于其他消息.
消息可能会出现波动,并且有可能在一次命中(例如一千次左右)中将大量消息放入队列中,因此在最终消息处理之前可能会有延迟.
我最初的想法是拥有第二个优先级消息队列,每个接收者进程也会在另一个线程中监视该队列.此队列中的消息数量要少得多,因此延迟时间会减少.
然后我偶然发现了Message.Priority属性.
所以:
我应该使用此优先级标志而不是实现另一个队列吗?它会成功有效地跳过这些消息吗?如果是这样,那么可能存在哪些条件和副作用?
或者我应该坚持我的原始计划并实施另一个优先级消息队列?
解决方法
它会成功有效地跳过这些消息吗?是.
如果是这样,那么可能存在哪些条件和副作用?没有,MSMQ旨在以这种方式工作.
activemq 控制面板里的 Number Of Pending Messages、 Messages Enqueued、Messages Dequeued含
activemq 控制面板里的 Number Of Pending Messages、 Messages Enqueued、Messages Dequeued含 博客分类: java
Number Of Consumers 消费者 这个是消费者端的消费者数量
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量
这个要分两种情况理解
在queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
在 topics里 它因为多消费者从而导致数量会比入队列数高。
简单的理解上面的意思就是
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1.
在来一条消息时,等待消费的消息是1,进入队列的消息就是2.
没有消费者时 Pending Messages 和 入队列数量一样
有消费者消费的时候 Pedding会减少 出队列会增加
到最后 就是 入队列和出队列的数量一样多
以此类推,进入队列的消息和出队列的消息是池子,等待消费的消息是水流。
Android Handler,Message,MessageQueue,Loper源码解析详解
本文主要是对Handler和消息循环的实现原理进行源码分析,如果不熟悉Handler可以参见博文《 Android中Handler的使用》,里面对Android为何以引入Handler机制以及如何使用Handler做了讲解。
概括来说,Handler是Android中引入的一种让开发者参与处理线程中消息循环的机制。我们在使用Handler的时候与Message打交道最多,Message是Hanlder机制向开发人员暴露出来的相关类,可以通过Message类完成大部分操作Handler的功能。但作为程序员,我不能只知道怎么用Handler,还要知道其内部如何实现的。Handler的内部实现主要涉及到如下几个类: Thread、MessageQueue和Looper。这几类之间的关系可以用如下的图来简单说明:
Thread是最基础的,Looper和MessageQueue都构建在Thread之上,Handler又构建在Looper和MessageQueue之上,我们通过Handler间接地与下面这几个相对底层一点的类打交道。
MessageQueue
MessageQueue源码链接
最基础最底层的是Thread,每个线程内部都维护了一个消息队列——MessageQueue。消息队列MessageQueue,顾名思义,就是存放消息的队列(好像是废话…)。那队列中存储的消息是什么呢?假设我们在UI界面上单击了某个按钮,而此时程序又恰好收到了某个广播事件,那我们如何处理这两件事呢? 因为一个线程在某一时刻只能处理一件事情,不能同时处理多件事情,所以我们不能同时处理按钮的单击事件和广播事件,我们只能挨个对其进行处理,只要挨个处理就要有处理的先后顺序。 为此Android把UI界面上单击按钮的事件封装成了一个Message,将其放入到MessageQueue里面去,即将单击按钮事件的Message入栈到消息队列中,然后再将广播事件的封装成以Message,也将其入栈到消息队列中。也就是说一个Message对象表示的是线程需要处理的一件事情,消息队列就是一堆需要处理的Message的池。线程Thread会依次取出消息队列中的消息,依次对其进行处理。MessageQueue中有两个比较重要的方法,一个是enqueueMessage方法,一个是next方法。enqueueMessage方法用于将一个Message放入到消息队列MessageQueue中,next方法是从消息队列MessageQueue中阻塞式地取出一个Message。在Android中,消息队列负责管理着顶级程序对象(Activity、BroadcastReceiver等)以及由其创建的所有窗口。需要注意的是,消息队列不是Android平台特有的,其他的平台框架也会用到消息队列,比如微软的MFC框架等。
Looper
Looper源码链接
消息队列MessageQueue只是存储Message的地方,真正让消息队列循环起来的是Looper,这就好比消息队列MessageQueue是个水车,那么Looper就是让水车转动起来的河水,如果没有河水,那么水车就是个静止的摆设,没有任何用处,Looper让MessageQueue动了起来,有了活力。
Looper是用来使线程中的消息循环起来的。默认情况下当我们创建一个新的线程的时候,这个线程里面是没有消息队列MessageQueue的。为了能够让线程能够绑定一个消息队列,我们需要借助于Looper:首先我们要调用Looper的prepare方法,然后调用Looper的Loop方法。典型的代码如下所示:
class LooperThread extends Thread { public Handler mHandler; public void run() { Looper.prepare(); mHandler = new Handler() { public void handleMessage(Message msg) { // process incoming messages here } }; Looper.loop(); } }
需要注意的是Looper.prepare()和Looper.loop()都是在新线程的run方法内调用的,这两个方法都是静态方法。我们通过查看Looper的源码可以发现,Looper的构造函数是private的,也就是在该类的外部不能用new Looper()的形式得到一个Looper对象。根据我们上面的描述,我们知道线程Thread和Looper是一对一绑定的,也就是一个线程中最多只有一个Looper对象,这也就能解释Looper的构造函数为什么是private的了,我们只能通过工厂方法Looper.myLooper()这个静态方法获取当前线程所绑定的Looper。
Looper通过如下代码保存了对当前线程的引用:
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
所以在Looper对象中通过sThreadLocal就可以找到其绑定的线程。ThreadLocal中有个set方法和get方法,可以通过set方法向ThreadLocal中存入一个对象,然后可以通过get方法取出存入的对象。ThreadLocal在new的时候使用了泛型,从上面的代码中我们可以看到此处的泛型类型是Looper,也就是我们通过ThreadLocal的set和get方法只能写入和读取Looper对象类型,如果我们调用其ThreadLocal的set方法传入一个Looper,将该Looper绑定给了该线程,相应的get就能获得该线程所绑定的Looper对象。
我们再来看一下Looper.prepare(),该方法是让Looper做好准备,只有Looper准备好了之后才能调用Looper.loop()方法,Looper.prepare()的代码如下:
private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
上面的代码首先通过sThreadLocal.get()拿到线程sThreadLocal所绑定的Looper对象,由于初始情况下sThreadLocal并没有绑定Looper,所以第一次调用prepare方法时,sThreadLocal.get()返回null,不会抛出异常。重点是下面的代码sThreadLocal.set(new Looper(quitAllowed)),首先通过私有的构造函数创建了一个Looper对象的实例,然后通过sThreadLocal的set方法将该Looper绑定到sThreadLocal中。
这样就完成了线程sThreadLocal与Looper的双向绑定:
a. 在Looper内通过sThreadLocal可以获取Looper所绑定的线程;
b.线程sThreadLocal通过sThreadLocal.get()方法可以获取该线程所绑定的Looper对象。
上面的代码执行了Looper的构造函数,我们看一下其代码:
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
我们可以看到在其构造函数中实例化一个消息队列MessageQueue,并将其赋值给其成员字段mQueue,这样Looper也就与MessageQueue通过成员字段mQueue进行了关联。
在执行完了Looper.prepare()之后,我们就可以在外部通过调用Looper.myLooper()获取当前线程绑定的Looper对象。
myLooper的代码如下所示:
public static Looper myLooper() { return sThreadLocal.get(); }
需要注意的是,在一个线程中,只能调用一次Looper.prepare(),因为在第一次调用了Looper.prepare()之后,当前线程就已经绑定了Looper,在该线程内第二次调用Looper.prepare()方法的时候,sThreadLocal.get()会返回第一次调用prepare的时候绑定的Looper,不是null,这样就会走的下面的代码throw new RuntimeException(“Only one Looper may be created per thread”),从而抛出异常,告诉开发者一个线程只能绑定一个Looper对象。
在调用了Looper.prepare()方法之后,当前线程和Looper就进行了双向的绑定,这时候我们就可以调用Looper.loop()方法让消息队列循环起来了。
需要注意的是Looper.loop()应该在该Looper所绑定的线程中执行。
Looper.loop()的代码如下:
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn''t called on this thread."); } //注意下面这行 final MessageQueue queue = me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); //注意下面这行 for (;;) { //注意下面这行 Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable, in case a UI event sets the logger Printer logging = me.mLogging; if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } //注意下面这行 msg.target.dispatchMessage(msg); if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn''t corrupted. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } }
上面有几行代码是关键代码:
1. final MessageQueue queue = me.mQueue;
变量me是通过静态方法myLooper()获得的当前线程所绑定的Looper,me.mQueue是当前线程所关联的消息队列。
2. for (;;)
我们发现for循环没有设置循环终止的条件,所以这个for循环是个死循环。
3. Message msg = queue.next(); // might block
我们通过消息队列MessageQueue的next方法从消息队列中取出一条消息,如果此时消息队列中有Message,那么next方法会立即返回该Message,如果此时消息队列中没有Message,那么next方法就会阻塞式地等待获取Message。
4. msg.target.dispatchMessage(msg);
msg的target属性是Handler,该代码的意思是让Message所关联的Handler通过dispatchMessage方法让Handler处理该Message,关于Handler的dispatchMessage方法将会在下面详细介绍。
Handler
Handler源码链接
Handler是暴露给开发者最顶层的一个类,其构建在Thread、Looper与MessageQueue之上。
Handler具有多个构造函数,签名分别如下所示:
1. publicHandler()
2. publicHandler(Callbackcallback)
3. publicHandler(Looperlooper)
4. publicHandler(Looperlooper, Callbackcallback)
第1个和第2个构造函数都没有传递Looper,这两个构造函数都将通过调用Looper.myLooper()获取当前线程绑定的Looper对象,然后将该Looper对象保存到名为mLooper的成员字段中。
第3个和第4个构造函数传递了Looper对象,这两个构造函数会将该Looper保存到名为mLooper的成员字段中。
第2个和第4个构造函数还传递了Callback对象,Callback是Handler中的内部接口,需要实现其内部的handleMessage方法,Callback代码如下:
public interface Callback { public boolean handleMessage(Message msg); }
Handler.Callback是用来处理Message的一种手段,如果没有传递该参数,那么就应该重写Handler的handleMessage方法,也就是说为了使得Handler能够处理Message,我们有两种办法:
1. 向Hanlder的构造函数传入一个Handler.Callback对象,并实现Handler.Callback的handleMessage方法
2. 无需向Hanlder的构造函数传入Handler.Callback对象,但是需要重写Handler本身的handleMessage方法
也就是说无论哪种方式,我们都得通过某种方式实现handleMessage方法,这点与Java中对Thread的设计有异曲同工之处。
在Java中,如果我们想使用多线程,有两种办法:
1. 向Thread的构造函数传入一个Runnable对象,并实现Runnable的run方法
2. 无需向Thread的构造函数传入Runnable对象,但是要重写Thread本身的run方法
所以只要用过多线程Thread,应该就对Hanlder这种需要实现handleMessage的两种方式了然于心了。
我们知道通过sendMessageXXX系列方法可以向消息队列中添加消息,我们通过源码可以看出这些方法的调用顺序,
sendMessage调用了sendMessageDelayed,sendMessageDelayed又调用了sendMessageAtTime。
Handler中还有一系列的sendEmptyMessageXXX方法,而这些sendEmptyMessageXXX方法在其内部又分别调用了其对应的sendMessageXXX方法。
通过以下调用关系图我们可以看的更清楚些:
由此可见所有的sendMessageXXX方法和sendEmptyMessageXXX最终都调用了sendMessageAtTime方法。
我们再来看看postXXX方法,会发现postXXX方法在其内部又调用了对应的sendMessageXXX方法,我们可以查看下sendMessage的源码:
public final boolean post(Runnable r) { return sendMessageDelayed(getPostMessage(r), 0); }
可以看到内部调用了getPostMessage方法,该方法传入一个Runnable对象,得到一个Message对象,getPostMessage的源码如下:
private static Message getPostMessage(Runnable r) { Message m = Message.obtain(); m.callback = r; return m; }
通过上面的代码我们可以看到在getPostMessage方法中,我们创建了一个Message对象,并将传入的Runnable对象赋值给Message的callback成员字段,然后返回该Message,然后在post方法中该携带有Runnable信息的Message传入到sendMessageDelayed方法中。由此我们可以看到所有的postXXX方法内部都需要借助sendMessageXXX方法来实现,所以postXXX与sendMessageXXX并不是对立关系,而是postXXX依赖sendMessageXXX,所以postXXX方法可以通过sendMessageXXX方法向消息队列中传入消息,只不过通过postXXX方法向消息队列中传入的消息都携带有Runnable对象(Message.callback)。
我们可以通过如下关系图看清楚postXXX系列方法与sendMessageXXX方法之间的调用关系:
通过分别分析sendEmptyMessageXXX、postXXX方法与sendMessageXXX方法之间的关系,我们可以看到在Handler中所有可以直接或间接向消息队列发送Message的方法最终都调用了sendMessageAtTime方法,该方法的源码如下:
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } //注意下面这行代码 return enqueueMessage(queue, msg, uptimeMillis); }
该方法内部调用了enqueueMessage方法,该方法的源码如下:
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { //注意下面这行代码 msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } //注意下面这行代码 return queue.enqueueMessage(msg, uptimeMillis); }
在该方法中有两件事需要注意:
1. msg.target = this
该代码将Message的target绑定为当前的Handler
2. queue.enqueueMessage
变量queue表示的是Handler所绑定的消息队列MessageQueue,通过调用queue.enqueueMessage(msg, uptimeMillis)我们将Message放入到消息队列中。
所以我们通过下图可以看到完整的方法调用顺序:
我们在分析Looper.loop()的源码时发现,Looper一直在不断的从消息队列中通过MessageQueue的next方法获取Message,然后通过代码msg.target.dispatchMessage(msg)让该msg所绑定的Handler(Message.target)执行dispatchMessage方法以实现对Message的处理。
Handler的dispatchMessage的源码如下:
public void dispatchMessage(Message msg) { //注意下面这行代码 if (msg.callback != null) { handleCallback(msg); } else { //注意下面这行代码 if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } //注意下面这行代码 handleMessage(msg); } }
我们来分析下这段代码:
1.首先会判断msg.callback存不存在,msg.callback是Runnable类型,如果msg.callback存在,那么说明该Message是通过执行Handler的postXXX系列方法将Message放入到消息队列中的,这种情况下会执行handleCallback(msg), handleCallback源码如下:
private static void handleCallback(Message message) { message.callback.run(); }
这样我们我们就清楚地看到我们执行了msg.callback的run方法,也就是执行了postXXX所传递的Runnable对象的run方法。
2.如果我们不是通过postXXX系列方法将Message放入到消息队列中的,那么msg.callback就是null,代码继续往下执行,接着我们会判断Handler的成员字段mCallback存不存在。mCallback是Hanlder.Callback类型的,我们在上面提到过,在Handler的构造函数中我们可以传递Hanlder.Callback类型的对象,该对象需要实现handleMessage方法,如果我们在构造函数中传递了该Callback对象,那么我们就会让Callback的handleMessage方法来处理Message。
3.如果我们在构造函数中没有传入Callback类型的对象,那么mCallback就为null,那么我们会调用Handler自身的hanldeMessage方法,该方法默认是个空方法,我们需要自己是重写实现该方法。
综上,我们可以看到Handler提供了三种途径处理Message,而且处理有前后优先级之分:首先尝试让postXXX中传递的Runnable执行,其次尝试让Handler构造函数中传入的Callback的handleMessage方法处理,最后才是让Handler自身的handleMessage方法处理Message。
一图胜千言
我们在本文讨论了Thread、MessageQueue、Looper以及Hanlder的之间的关系,我们可以通过如下一张传送带的图来更形象的理解他们之间的关系。
在现实生活的生产生活中,存在着各种各样的传送带,传送带上面洒满了各种货物,传送带在发动机滚轮的带动下一直在向前滚动,不断有新的货物放置在传送带的一端,货物在传送带的带动下送到另一端进行收集处理。
我们可以把传送带上的货物看做是一个个的Message,而承载这些货物的传送带就是装载Message的消息队列MessageQueue。传送带是靠发送机滚轮带动起来转动的,我们可以把发送机滚轮看做是Looper,而发动机的转动是需要电源的,我们可以把电源看做是线程Thread,所有的消息循环的一切操作都是基于某个线程的。一切准备就绪,我们只需要按下电源开关发动机就会转动起来,这个开关就是Looper的loop方法,当我们按下开关的时候,我们就相当于执行了Looper的loop方法,此时Looper就会驱动着消息队列循环起来。
那Hanlder在传送带模型中相当于什么呢?我们可以将Handler看做是放入货物以及取走货物的管道:货物从一端顺着管道划入传送带,货物又从另一端顺着管道划出传送带。我们在传送带的一端放入货物的操作就相当于我们调用了Handler的sendMessageXXX、sendEmptyMessageXXX或postXXX方法,这就把Message对象放入到了消息队列MessageQueue中了。当货物从传送带的另一端顺着管道划出时,我们就相当于调用了Hanlder的dispatchMessage方法,在该方法中我们完成对Message的处理。
啰啰嗦嗦说了很多,希望本文对于大家理解Android中的Handler和消息循环机制有所帮助。
到此这篇关于Android Handler,Message,MessageQueue,Loper源码解析详解的文章就介绍到这了,更多相关Android Handler,Message,MessageQueue,Loper内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
- Android 通过Messager与Service实现进程间双向通信案例详解
- Android Handle原理(Looper,Handler和Message)三者关系案例详解
- Android handle-message的发送与处理案例详解
- 深入Android Handler,MessageQueue与Looper关系
- Android中Handler与Message的简单实例
- Android开发使用Message对象分发必备知识点详解
Android message handling mechanism (Handler, Looper, MessageQueue and Message)
http://www.programering.com/a/MjM2QDMwATc.html
Android is a message driven, message driven several elements:
-
- The message says: Message
- Message queue: MessageQueue
- The news cycle, remove the message processing for circulation: Looper
- Message processing, message loop out messages from the message queue should be carried out after the processing of messages: Handler
Usually we use most often is Message and Handler, if you use HandlerThread or similar HandlerThread things may come into contact with Looper, while MessageQueue is used inside Looper, for the standard SDK, we are unable to instantiate and use (constructor is package visibility).
We usually come into contact with Looper, Message, Handler are realized by JAVA, Android for Linux based systems, the bottom with a C, C++, and NDK exist, how could the message driven model only exists in the JAVA layer, in fact, there are corresponding to the Java layer such as Looper, MessageQueue in the Native layer.
Initialization message queue
First to see if a thread to achieve the message loop should do, taking HandlerThread as an example:
public void run() {
mTid = Process.myTid();
Looper.prepare(); synchronized (this) {
mLooper = Looper.myLooper();
notifyAll();
}
Process.setThreadPriority(mPriority);
onLooperPrepared();
Looper.loop();
mTid = -1;
}
Is the red mark of the two, first call prepare to initialize the MessageQueue and Looper, and then call the loop enter the message loop. First look at the Looper.prepare.
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
Overloaded functions, quitAllowed default to true, from the name can be seen is whether the message loop can exit, the default is to exit, the Main thread (UI thread) initialization message loop is called prepareMainLooper, pass is false. The use of ThreadLocal, each thread can initialize a Looper.
Look at the Looper in the initialization did:
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mRun = true;
mThread = Thread.currentThread();
}
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
nativeInit();
}
In the Looper initialization, a MessageQueue object created saved in the mQueue member. The MessageQueue constructor is package visibility, so we cannot be used directly, while the MessageQueue initialization is called the nativeInit, which is a Native method:
static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return;
}
nativeMessageQueue->incStrong(env);
android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
NativeMessageQueue* nativeMessageQueue) {
env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
reinterpret_cast<jint>(nativeMessageQueue));
}
In nativeInit, the new object of a Native layer MessageQueue, and the address stored in the Java layer of the MessageQueue member of the mPtr, there are a lot of implementation of Android, a class has been implemented in Java layer and Native layer, the GetFieldID and SetIntField JNI saved Native layer class an address to the Java layer for an instance of the class mPtr members, such as Parcel.
Then the realization of NativeMessageQueue:
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
Gets a Native layer of the Looper object in the constructor of the NativeMessageQueue, Native layer Looper also use thread local storage, note that new Looper introduced parameter false.
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd <0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
Native layer in Looper using epoll. Initializes a pipeline, mWakeWritePipeFd and mWakeReadPipeFd were preserved to write and read end of pipe end, and monitor the read end of the EPOLLIN event. Note the initializer list of values, the value of mAllowNonCallbacks for false.
What is mAllowNonCallback? The use of epoll to monitor mWakeReadPipeFd events? In fact, Native Looper can not only monitor the descriptors, Looper also provides the addFd method:
int addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data);
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
FD said to monitor descriptor. Ident said to monitor event identification, Value must be > or ALOOPER_POLL_BACK; =0(-2), Event said to monitor events, Callback is a callback function when the event occurs, This is the role of mAllowNonCallbacks, When the mAllowNonCallbacks is true allows callback to NULL, Ident in pollOnce as a result of return, Otherwise, do not allow the callback is empty, When the callback is not NULL, The value of ident will be ignored. Or just look at the code easy to understand:
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback.get(), data);
#endif
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident <0) {
ALOGE("Invalid attempt to set NULL callback with ident <0.");
return -1;
}
} else {
ident = ALOOPER_POLL_CALLBACK;
}
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = epollEvents;
eventItem.data.fd = fd;
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex <0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
if (epollResult <0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
if (epollResult <0) {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
If the callback is empty will check to see whether to allow mAllowNonCallbacks callback is empty, if the callback is empty will determine whether the ident > =0. If the callback is not empty the ident value of ALOOPER_POLL_CALLBACK, no matter in what value is.
Then the parameters of incoming values encapsulated into a Request structure, and the descriptor for the key to save to a KeyedVector mRequests, and then through the epoll_ctl to add or replace (if the descriptor before calling the addFD add monitoring) on this descriptor wiretapping.
The class diagram:
Send a message
Through the Looper.prepare initialized the message queue can be called after the Looper.loop enter the message loop, then we can send message to the message queue, message loop will remove messages are processed, before seeing the message processing, look at the news is how to be added to the message queue.
In the Java layer, the Message class represents a message object, to send a message must first obtain a message object, the constructor of the Message class is public, but does not recommend direct new Message, Message stored in a cache of the message pool, we can get a message from the buffer pool to use obtain, Message after using the system calls the recycle recovery, if they new a lot of Message, after each use system into the buffer pool, will occupy a lot of memory, as shown below:
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
sPoolSize--;
return m;
}
}
return new Message();
}
public void recycle() {
clearForRecycle();
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
The internal Message achieve a list by next members, such as the sPool to cache a list of Messages.
The message object get how to send it, as we all know is by Handler post, sendMessage and other methods, but these methods are ultimately sendMessageAtTime with a method call:
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
SendMessageAtTime access to the message queue and then call the enqueueMessage method, the mQueue is obtained from the message queue associated with the Handler Looper.
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
EnqueueMessage message target is set to the current handler, and then call the MessageQueue enqueueMessage, before calling queue.enqueueMessage judge mAsynchronous, from the name is asynchronous message means, to understand the role of Asynchronous, need to understand a concept Barrier.
Barrier and Asynchronous Message
Barrier is what meaning, from the name is an interceptor, the interceptor behind the news is temporarily unable to perform, until this interceptor is removed, the MessageQueue has a function called enqueueSyncBarier can add a Barrier.
int enqueueSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don''t need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
In enqueueSyncBarrier, Obtain a Message, And set msg.arg1=token, Token is only one of each call to enqueueSyncBarrier since the increase of int value, Objective is to return a token only each call to enqueueSyncBarrier, The Message also need to set the execution time, And then inserted into the message queue, Special is that the Message is not set to target, Msg.target null.
Enter the message loop will keep the news from the MessageQueue in the implementation, call the next function MessageQueue, in which there was a:
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
If the head of the queue for the message target null says it is a Barrier, because there are only two ways to add mMessages news, one is enqueueMessage, another is enqueueBarrier, and enqueueMessage if mst.target null is a direct throw an exception, will see the back.
Asynchronous message call is actually the case, we can through the enqueueBarrier to the message queue insert a Barrier, then the implementation of synchronization message queue in the Barrier time will be the Barrier intercept cannot execute until we call removeBarrier, remove the Barrier, and asynchronous message has no effect, the default is the synchronous message the message, unless we call the Message setAsynchronous, this method is hidden. Only in the Handler initialization parameter to the Handler specified by the message sent is asynchronous, so that it will be in the Handler enqueueMessage called Message setAsynchronous setup messages are asynchronous, from the above Handler.enqueueMessage code can be seen in the.
The so-called asynchronous message, there is only one, is in setting the Barrier still can be not affected by Barrier is normal, if not set Barrier, asynchronous message is no different from the synchronous message, can pass the removeSyncBarrier to remove the Barrier:
void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
final boolean needWake;
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycle();
}
if (needWake) {
nativeWake(mPtr);
}
}
The return value of parameter token is enqueueSyncBarrier, if not non-existent exception will be thrown calls the specified token.
enqueueMessage
Then look at how MessageQueue enqueueMessage.
final boolean enqueueMessage(Message msg, long when) {
if (msg.isInUse()) {
throw new AndroidRuntimeException(msg + " This message is already in use.");
}
if (msg.target == null) {
throw new AndroidRuntimeException("Message must have a target.");
}
boolean needWake;
synchronized (this) {
if (mQuiting) {
RuntimeException e = new RuntimeException(
msg.target + " sending message to a Handler on a dead thread");
Log.w("MessageQueue", e.getMessage(), e);
return false;
}
msg.when = when;
Message p = mMessages;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don''t have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
}
if (needWake) {
nativeWake(mPtr);
}
return true;
}
Note the code red, when the msg.target null is directly thrown exception.
The first judgment in enqueueMessage, if the current message queue is empty, the execution time of when or add a new message is 0, or add a new message than the execution time of the message queue head message execution time is early, the message is added to the message queue head (message queue according to the time sequence), or to find a suitable location to the current message is added to the message queue.
Native send a message
Message model not only the Java layer, Native layer can also be used, the front also saw the message queue initialization also initializes the Looper and the NativeMessageQueue Native layer, so the Native layer should also can send a message. Unlike the Java layer, Native layer is the message Looper, also sending method all end is called sendMessageAtTime:
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i <messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
} // release lock
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
Native Message only one int type what field is used to distinguish between different message, SendMessageAtTime Message is specified, Message to the execution time of when, And processing the message Handler: MessageHandler, Then use the MessageEnvelope package of time, MessageHandler and Message, Native messages are saved to the mMessageEnvelopes, MMessageEnvelopes is a Vector<MessageEnvelope>. The Native message is also according to the time sequence, and the Java layer information is stored separately in the two cohort.
The message loop
Message queue initialization is good, also know how to send a message, the following is how to process the message, see the Handler.loop function:
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn''t called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
// This must be in a local variable, in case a UI event sets the logger
Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
msg.target.dispatchMessage(msg);
if (logging != null) {
logging.println("<<<<<Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn''t corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycle();
}
}
When loop took a Message from the MessageQueue, call msg.target.dispatchMessage (MSG), handler and message associated target is to send message, so that the familiar dispatchMessage calls, Message was treated by recycle. When queue.next returns null will exit the news cycle, then look at MessageQueue.next is how to remove the message, and returns null when.
final Message next() {
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(mPtr, nextPollTimeoutMillis); synchronized (this) {
if (mQuiting) {
return null;
}
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount <0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i <pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
MessageQueue.next will be the first to call nativePollOnce, then if mQuiting true returns null, Looper will exit the message loop.
The next head of the news from the message queue, If the head message is Barrier (target==null) on the next traversal to find the first asynchronous message, The next test access to the message (message queue message or the first asynchronous message), If NULL said no message to perform, Set nextPollTimeoutMillis = -1; otherwise detect this news to execution time., If the execution time of the markInUse and the message from the message queue to remove, And then returns from next to loop; otherwise, set nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE), That is the nearest to perform message also need how long, Both the current message queue no news can perform (set Barrier and no asynchronous message or the message queue is empty) or the head of the queue of the message not to execution time, Will execute the code behind the, To see if there is no set IdleHandler, If there is to run IdleHandler, When the IdleHandler is executing will set nextPollTimeoutMillis = 0.
First look at the nativePollOnce, native method, called JNI, and finally transferred to the Native Looper:: pollOnce, and from Java layer transfer in nextPollTimeMillis, namely the execution time of Java layer in the message queue to recent news how long execution time.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
Don''t see a bunch of code first, look at the pollInner:
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis <0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount <0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i <eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i <mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK;
}
}
return result;
}
Java messages are stored in the Java layer of the MessageQueue member of the mMessages, Native messages are stored in the Native Looper mMessageEnvelopes, it can be said that there are two message queue, and is arranged by time. TimeOutMillis said Java layer next to perform the news how long execution, mNextMessageUpdate said Native layer next to perform the news how long, if timeOutMillis is 0, epoll_wait does not set the TimeOut returned directly; if -1 Java layer without message directly with the Native time out; minimum or pollInner take the two one as the timeOut call epoll_wait. When epoll_wait returns may have the following conditions:
-
-
Error return.
-
Time Out
-
The normal return, there are events generated descriptors.
-
If the former is directly goto DONE two.
Otherwise, FD has happened, If the mWakeReadPipeFd EPOLLIN event awoken is called, If not mWakeReadPipeFd, It is through the addFD add FD, In addFD, to FD and events., callback,The data package into the Request object, Taking FD as the key stored in the KeyedVector mRequests, So here in FD to obtain association in addFD Request, And along with the events by adding pushResonse mResonse queue(Vector), Resonse is only for events and Request package. If an error of epoll_wait or timeout, there is no event descriptors, not the implementation of this code, so the direct goto DONE.
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
Enter DONE next, remove Native message headers from mMessageEnvelopes, if arrived at execution time handleMessage is called to handle it internally stored MessageeHandler and removed from the Native message queue, set result to ALOOPER_POLL_CALLBACK, otherwise the calculation of mNextMessageUptime said Native message queue a message to execution time. If not to the head message execution time may be Java layer message queuing message execution time is less than Native layer message queue message execution time, arrived at the execution time of epoll_wait TimeOut Java message, or add by addFd descriptor has occurred in epoll_wait returns, or epoll_wait error return. The Native message is not Barrier and Asynchronous.
Last, Through the mResponses (just by pushResponse put into it), If response.request.ident = = ALOOPER_POLL_CALLBACK, We call the registration of callback handleEvent (FD, events, data) processing, And then removed from the mResonses queue, After the traversal., MResponses retention and are ident> =0 and callback NULL. In the NativeMessageQueue initialization Looper into mAllowNonCallbacks false, so the treatment after mResponses must be empty.
Then return to pollOnce. PollOnce is a for cycle, The pollInner handles all response.request.ident==ALOOPER_POLL_CALLBACK Response, In second into the for cycle if the mResponses is not empty to find ident> 0 Response, The ident as the value returned by the function call pollOnce''s own processing, Here we are called in NativeMessageQueue Loope pollOnce, No return processing, But mAllowNonCallbacks false is not likely to enter the cycle. The return value of pollInner may not be 0, or can only be negative, so pollOnce for cycle will only be executed two times, then returned in second times.
Native Looper can be used alone, also has a prepare function, then the mAllowNonCallbakcs value may be true, pollOnce in mResponses makes sense.
Wake and awoken
In the Native Looper constructor, The pipe opened a pipeline, Combined use of mWakeReadPipeFd and mWakeWritePipeFd were preserved the read and write end of pipe end, Then use the epoll_ctl (mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem) monitored the read end of EPOLLIN events, In the pollInner by epoll_wait (mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis) read event, When writing to mWakeWritePipeFd, And at what time to read mWakeReadPipeFd.?
In Looper.cpp we can be found below two functions:
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
The wake function to write a &ldquo to mWakeWritePipeFd; W” character, awoken read from mWakeReadPipeFd, to mWakeWritePipeFd to write data to pollInner in the epoll_wait you can listen to the event returns. The pollInner can also see if the mWakeReadPipeFd EPOLLIN event is called the awoken consumed character written back processing.
When calling wake? Just find a place of call on the line, Look at the Looper.cpp, In the sendMessageAtTime when sending Native Message, The calculation should be inserted according to the execution time for mMessageEnvelopes to send Message, If it is in the head is inserted into the, We call the wake wake up epoll_wait, Because in the pollInner according to the execution time and the Native layer message queue head message execution time Java layer message queue header calculated a timeout, If the new message is inserted in the head, The execution time of at least the two messages in a before, So you should wake up epoll_wait, Epoll_wait returns, Check the Native message queue, To see whether the head message is just inserted message to the execution time., To execute, Otherwise you may need to set the new timeout. Also in the Java layer in the MessageQueue, there is a function nativeWake also can be called by JNI wake, call the nativeWake timing and call in Native wake time similar, insert a message in the message queue, there is also a case, the message queue head is a Barrier, and insert the message was the first message.
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don''t have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();//If the head is Barrier and the new messages are asynchronous message “ &rdquo may need to wake up;
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) { // The message queue is asynchronous message and the execution time in the news before, so there is no need to wake up.
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
In the head is inserted into the news does not have to call nativeWake, Because before may be performing IdleHandler, If the implementation of the IdleHandler, In the IdleHandler implementation of the nextPollTimeoutMillis is set to 0, Next time you start for cycle with the 0 call to nativePollOnce, Don''t need wake, Only when no message can perform (the message queue is empty or not to the execution time) and not set IdleHandler mBlocked to true.
If the Java layer of the message queue is Barrier Block and the insertion is an asynchronous message may need to wake up Looper, because the asynchronous message can be implemented in Barrier, but the asynchronous message must be the first time the implementation of asynchronous message.
Exit the Looper also need wake, removeSyncBarrier may also require.Android 中 Message,MessageQueue,Looper,Handler 详解 + 实例
这篇文章很是经典,所以拿过来分享给大家,望大家 提提人气.
一、几个关键概念
1、主线程给自己发送 Message
package test.message; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.widget.Button; import android.widget.TextView; public class MainActivity extends Activity { private Button btnTest; private TextView textView; private Handler handler; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main); btnTest = (Button)this.findViewById(R.id.btn_01); textView = (TextView)this.findViewById(R.id.view_01); btnTest.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View arg0) { Looper looper = Looper.getMainLooper(); //主线程的Looper对象 //这里以主线程的Looper对象创建了handler, //所以,这个handler发送的Message会被传递给主线程的MessageQueue。 handler = new MyHandler(looper); handler.removeMessages(0); //构建Message对象 //第一个参数:是自己指定的message代号,方便在handler选择性地接收 //第二三个参数没有什么意义 //第四个参数需要封装的对象 Message msg = handler.obtainMessage(1,1,1,"主线程发消息了"); handler.sendMessage(msg); //发送消息 } }); } class MyHandler extends Handler{ public MyHandler(Looper looper){ super(looper); } public void handleMessage(Message msg){ super.handleMessage(msg); textView.setText("我是主线程的Handler,收到了消息:"+(String)msg.obj); } } }
2、其他线程给主线程发送 Message
package test.message; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.widget.Button; import android.widget.TextView; public class MainActivity extends Activity { private Button btnTest; private TextView textView; private Handler handler; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main); btnTest = (Button)this.findViewById(R.id.btn_01); textView = (TextView)this.findViewById(R.id.view_01); btnTest.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View arg0) { //可以看出这里启动了一个线程来操作消息的封装和发送的工作 //这样原来主线程的发送就变成了其他线程的发送,简单吧?呵呵 new MyThread().start(); } }); } class MyHandler extends Handler{ public MyHandler(Looper looper){ super(looper); } public void handleMessage(Message msg){ super.handleMessage(msg); textView.setText("我是主线程的Handler,收到了消息:"+(String)msg.obj); } } //加了一个线程类 class MyThread extends Thread{ public void run(){ Looper looper = Looper.getMainLooper(); //主线程的Looper对象 //这里以主线程的Looper对象创建了handler, //所以,这个handler发送的Message会被传递给主线程的MessageQueue。 handler = new MyHandler(looper); //构建Message对象 //第一个参数:是自己指定的message代号,方便在handler选择性地接收 //第二三个参数没有什么意义 //第四个参数需要封装的对象 Message msg = handler.obtainMessage(1,1,1,"其他线程发消息了"); handler.sendMessage(msg); //发送消息 } } }
3、主线程给其他线程发送 Message
package test.message; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.widget.Button; import android.widget.TextView; public class MainActivity extends Activity { private Button btnTest; private TextView textView; private Handler handler; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main); btnTest = (Button)this.findViewById(R.id.btn_01); textView = (TextView)this.findViewById(R.id.view_01); //启动线程 new MyThread().start(); btnTest.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View arg0) { //这里handler的实例化在线程中 //线程启动的时候就已经实例化了 Message msg = handler.obtainMessage(1,1,1,"主线程发送的消息"); handler.sendMessage(msg); } }); } class MyHandler extends Handler{ public MyHandler(Looper looper){ super(looper); } public void handleMessage(Message msg){ super.handleMessage(msg); textView.setText("我是主线程的Handler,收到了消息:"+(String)msg.obj); } } class MyThread extends Thread{ public void run(){ Looper.prepare(); //创建该线程的Looper对象,用于接收消息 //注意了:这里的handler是定义在主线程中的哦,呵呵, //前面看到直接使用了handler对象,是不是在找,在什么地方实例化的呢? //现在看到了吧???呵呵,开始的时候实例化不了,因为该线程的Looper对象 //还不存在呢。现在可以实例化了 //这里Looper.myLooper()获得的就是该线程的Looper对象了 handler = new ThreadHandler(Looper.myLooper()); //这个方法,有疑惑吗? //其实就是一个循环,循环从MessageQueue中取消息。 //不经常去看看,你怎么知道你有新消息呢??? Looper.loop(); } //定义线程类中的消息处理类 class ThreadHandler extends Handler{ public ThreadHandler(Looper looper){ super(looper); } public void handleMessage(Message msg){ //这里对该线程中的MessageQueue中的Message进行处理 //这里我们再返回给主线程一个消息 handler = new MyHandler(Looper.getMainLooper()); Message msg2 = handler.obtainMessage(1,1,1,"子线程收到:"+(String)msg.obj); handler.sendMessage(msg2); } } } }
4、其他线程给自己发送 Message
package test.message; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.widget.Button; import android.widget.TextView; public class MainActivity extends Activity { private Button btnTest; private TextView textView; private Handler handler; @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.main); btnTest = (Button)this.findViewById(R.id.btn_01); textView = (TextView)this.findViewById(R.id.view_01); btnTest.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View arg0) { //启动线程 new MyThread().start(); } }); } class MyHandler extends Handler{ public MyHandler(Looper looper){ super(looper); } public void handleMessage(Message msg){ super.handleMessage(msg); textView.setText((String)msg.obj); } } class MyThread extends Thread{ public void run(){ Looper.prepare(); //创建该线程的Looper对象 //这里Looper.myLooper()获得的就是该线程的Looper对象了 handler = new ThreadHandler(Looper.myLooper()); Message msg = handler.obtainMessage(1,1,1,"我自己"); handler.sendMessage(msg); Looper.loop(); } //定义线程类中的消息处理类 class ThreadHandler extends Handler{ public ThreadHandler(Looper looper){ super(looper); } public void handleMessage(Message msg){ //这里对该线程中的MessageQueue中的Message进行处理 //这里我们再返回给主线程一个消息 //加入判断看看是不是该线程自己发的信息 if(msg.what == 1 && msg.obj.equals("我自己")){ handler = new MyHandler(Looper.getMainLooper()); Message msg2 = handler.obtainMessage(1,1,1,"禀告主线程:我收到了自己发给自己的Message"); handler.sendMessage(msg2); } } } } }
原文转自 :http://tanghaibo001.blog.163.com/blog/static/9068612020111287218197/
关于c# – Microsoft Message Queue – 优先级标志还是单独的队列?的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于activemq 控制面板里的 Number Of Pending Messages、 Messages Enqueued、Messages Dequeued含、Android Handler,Message,MessageQueue,Loper源码解析详解、Android message handling mechanism (Handler, Looper, MessageQueue and Message)、Android 中 Message,MessageQueue,Looper,Handler 详解 + 实例等相关知识的信息别忘了在本站进行查找喔。
本文标签: