opoojkk

Android 17 MessageQueue 无锁化实现解析

lxx
目次

最近 Android 有一个比较重磅的变化 —— MessageQueue 的实现从原来的有锁结构改成了无锁结构。

MessageQueue 是 Android 中非常核心的一套机制。 应用中的各种事件都会通过 MessageQueue 来进行排队和分发。

旧的有锁实现,本质上是一个 按时间排序的单链表优先队列。

这里的“优先”并不是传统意义上的 priority,而是:

1
2
when 越小
优先级越高

也就是说 谁应该更早执行,谁就排在前面。

消息按照 when 字段排序:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
mMessages
   
   
Message (when=10)
   
Message (when=20)
   
Message (when=50)
   
Message (when=100)

因此队列始终保证:

1
when 最小的消息在队头

MessageQueue.next() 每次只需要取:

1
mMessages

即可得到下一条要执行的消息。

这里有一个非常容易被误解的地方:

MessageQueue 并不是 FIFO 队列。

很多人会认为消息是按照发送顺序执行的,但实际上 MessageQueue 是按照 when 排序的。

例如:

1
2
handler.sendMessageDelayed(msgA, 5000);
handler.sendMessage(msgB);

发送顺序是:

1
2
msgA
msgB

when 的值是:

1
2
msgA = now + 5000
msgB = now

因此队列顺序变成:

1
2
msgB
msgA

也就是说:执行顺序由时间决定,而不是发送顺序。

所以 MessageQueue 更准确的描述其实是:按执行时间排序的优先队列。

旧实现中:

无论是

都需要先获取锁。

基本流程如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
producer线程
    
    
获取锁
    
    
 when 排序插入链表
    
    
释放锁

消费消息时同样需要获取锁。

当消息较多或者多个线程同时插入消息时,就容易出现锁竞争。

同时 UI 渲染相关任务依赖 同步屏障(Sync Barrier)机制 来保证优先级。

当 Choreographer 插入同步屏障后:

1
2
同步消息被阻塞
异步消息继续执行

新的版本中,将消息处理拆成了两个阶段:

1
2
1 插入消息
2 消费消息

整体结构也发生了变化。

旧实现中生产者线程直接操作有序链表。

而新的实现结构变成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Producer threads
        
        
   Lock-free Stack
        
        
     drainStack()
        
        
   Priority Queue

也就是说:

  1. 生产者线程
  2. 只负责把消息压入无锁栈

真正的排序操作只在 Looper 线程消费消息时 才发生。

分发 #

在真正开始分发消息时,Looper 线程会先调用:

1
drainStack()

把无锁栈中的消息转移到优先队列中。

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private void drainStack(StackNode oldTop) {
    while (oldTop.isMessageNode()) {
        MessageNode oldTopMessageNode = (MessageNode) oldTop;
        if (oldTopMessageNode.removeFromStack()) {
            insertIntoPriorityQueue(oldTopMessageNode);
        }
        MessageNode inserted = oldTopMessageNode;
        oldTop = oldTopMessageNode.mNext;
        inserted.mNext = null;
    }
}

这里做的事情其实很简单:

1
2
3
4
stack
priority queue

完整执行流程如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Handler.sendMessage
   构造 MessageNode
      CAS 压栈
┌──────────────────────┐
│   Lock-free Stack     │
└──────────┬───────────┘
      drainStack()
┌─────────────────────────┐
│     Priority Queue       │
│  Sync Queue   Async Queue│
└──────────┬──────────────┘
   选择最早可执行消息
dispatchMessage()
handleMessage()

之所以要设计这一层 stack,主要是为了 降低多线程插入消息时的竞争。

如果生产者线程直接操作优先队列,那么每次插入都会涉及:

1
2
heap 调整
O(log n)

同时还需要锁保护。

而 stack 的插入是:

1
O(1)

并且只需要一次 CAS。

三种状态 #

在新的 MessageQueue 中,涉及到三种状态节点:

1
2
3
STACK_NODE_ACTIVE
STACK_NODE_PARKED
STACK_NODE_TIMEDPARK

分别表示:

1
2
3
4
5
6
7
8
ACTIVE
Looper线程正在运行

PARKED
Looper线程阻塞等待消息

TIMEDPARK
Looper线程正在等待某个时间点

例如:

如果下一条消息要在 5 秒后执行,Looper 会进行类似:

1
pollOnce(timeout)

的定时等待。

这三种状态会在 下一次插入消息时参与决策。

新的 MessageQueue 内部维护了两个优先队列:

1
2
同步消息队列
异步消息队列

当存在同步屏障时:

1
2
同步消息被阻塞
异步消息继续执行

如果没有同步屏障,则会从两个队列中选择when 最早的消息执行。

插入消息 #

说完消费流程之后,再来看消息是如何加入队列的。

在新的实现中,消息首先会加入一个 无锁栈。

这个栈实际上是一个用链表实现的 Treiber Stack。

插入逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
while (true) {
    StackNode old = (StackNode) sState.getVolatile(this);

    node.mNext = old;

    if (sState.compareAndSet(this, old, node)) {
        ...
        return true;
    }
}

CAS 的含义是:

1
compareAndSet(expected, newValue)

只有当当前值等于 expected 时才会更新成功,否则更新失败。

三种状态在插入消息时的作用,就是 决定是否需要唤醒 Looper 线程。

ACTIVE #

1
Looper线程正在运行

此时只需要压栈即可,不需要唤醒。

PARKED #

1
Looper线程阻塞等待消息

此时插入新消息必须唤醒:

1
nativeWake(mPtr)

TIMEDPARK #

1
Looper线程正在等待某个时间点

此时是否唤醒取决于:

1
2
3
新消息执行时间
是否早于
当前唤醒时间

如果更早,就需要提前唤醒。

如果当前栈顶不是状态节点,而是 MessageNode,说明栈中已经堆积了消息。

这时需要通过:

1
mBottomOfStack

找到栈底的状态节点。

结构类似:

1
2
3
4
5
6
7
MessageNode
   
MessageNode
   
MessageNode
   
StateNode

这样 sState 既表示:

1
2
栈顶节点
同时也包含栈状态

如果状态和结构分开维护,就需要:

1
2
CAS stack
CAS state

两次原子操作。

现在只需要:

1
CAS(sState)

一次即可保证一致性。

两个队列有什么不同 #

两个优先队列分别是:

插入逻辑如下:

1
2
3
4
5
if (msgNode.isAsync()) {
    mAsyncPriorityQueue.add(msgNode);
} else {
    mPriorityQueue.add(msgNode);
}

同步消息进入同步队列,异步消息进入异步队列。

这样在存在同步屏障时,可以直接从异步队列取消息,而不需要扫描整个队列。

状态什么时候恢复 Active #

状态恢复为 ACTIVE 的情况主要有:

  1. Looper线程被唤醒
  2. Looper进入 next() 开始处理消息

线程重新开始执行消息循环时,会恢复为 ACTIVE 状态。

总结 #

Android 17 对 MessageQueue 做了一次重要的重构。

旧实现结构:

1
2
3
4
5
单链表
+
全局锁
+
多线程竞争

新实现结构:

1
2
3
Lock-free Stack
+
Priority Queue

生产者线程只需要:

1
2
CAS
压栈

消费者线程再统一整理顺序。

同时通过三种状态:

1
2
3
ACTIVE
PARKED
TIMEDPARK

精细控制线程唤醒。

整体思路可以总结为:

1
2
3
4
5
无锁收集消息
+
单线程整理顺序
+
按需唤醒 Looper

这样既降低了锁竞争,也减少了不必要的线程唤醒。

标签:
Categories:

评论

评论由 giscus 提供;如果未加载,可能是网络环境阻止了 giscus。