快速消息队列 (FMQ)

HIDL 的远程过程调用 (RPC) 基础架构使用 binder 机制,这意味着调用涉及开销,需要内核操作,并且可能会触发调度程序操作。不过,对于必须在开销较小且无内核参与的进程之间传输数据的情况,则使用快速消息队列 (FMQ) 系统。

FMQ 会创建具有所需属性的消息队列。您可以通过 HIDL RPC 调用发送 MQDescriptorSyncMQDescriptorUnsync 对象,接收进程会使用该对象来访问消息队列。

队列类型

Android 支持两种队列类型(称为“风格”):

  • 未同步队列:可以溢出,并且可以有多个读取器;每个读取器都必须及时读取数据,否则数据将会丢失。
  • 已同步队列:不能溢出,并且只能有一个读取器。

这两种队列都不能下溢(从空队列进行读取将会失败),且都只能有 1 个写入器。

非同步队列

未同步队列只有一个写入器,但可以有任意多个读取器。此类队列有一个写入位置;不过,每个读取器都会跟踪各自的独立读取位置。

对此类队列执行写入操作一定会成功(不会检查是否出现溢出情况),但前提是写入的内容不超出配置的队列容量(如果写入的内容超出队列容量,则操作会立即失败)。由于各个读取器的读取位置可能不同,因此每当新的写入操作需要空间时,系统都允许数据离开队列,而不会等待每个读取器读取每条数据。

读取器负责在数据离开队列末尾之前对其进行检索。如果读取操作尝试读取的数据超出可用数据量,则该操作要么立即失败(如果非阻塞),要么等到有足够多的可用数据时(如果阻塞)。如果读取操作尝试读取的数据超出队列容量,则读取一定会立即失败。

如果某个读取器的读取速度无法跟上写入器的写入速度,则写入的数据量和该读取器尚未读取的数据量加在一起会超出队列容量,这会导致下一次读取不会返回数据;相反,该读取操作会将读取器的读取位置重置为等于最新的写入位置,然后返回失败。如果在发生溢出后但在下一次读取之前,系统查看可供读取的数据,则会显示可供读取的数据超出了队列容量,这表示发生了溢出。(如果队列溢出发生在查看可用数据和尝试读取这些数据之间,则溢出的唯一表征就是读取操作失败。)

同步队列

已同步队列有一个写入器和一个读取器,其中写入器有一个写入位置,读取器有一个读取位置。写入的数据量不可能超出队列可提供的空间;读取的数据量不可能超出队列当前存在的数据量。如果尝试写入的数据量超出可用空间或尝试读取的数据量超出现有数据量,该操作要么立即返回失败,要么阻塞到可以完成所需操作为止,具体取决于调用的是阻塞还是非阻塞写入或读取函数。如果尝试读取或尝试写入的数据量超出队列容量,读取或写入操作一定会立即失败。

设置 FMQ

一个消息队列需要多个 MessageQueue 对象:一个对象用作数据写入目标,一个或多个对象用作数据读取来源。没有关于哪个对象用于写入数据,哪个对象用于读取数据的显式配置;用户需负责确保没有对象既用于读取数据又用于写入数据,并且最多只有 1 个写入器,而且对于同步队列,最多只有 1 个读取器。

创建第一个 MessageQueue 对象

通过单个调用创建并配置消息队列:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements) 初始化程序负责创建并初始化支持消息队列功能的对象。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) 初始化程序负责创建并初始化支持消息队列功能和阻塞的对象。
  • flavor 可以是 kSynchronizedReadWrite(对于已同步队列)或 kUnsynchronizedWrite(对于未同步队列)。
  • uint16_t(在本示例中)可以是任意不涉及嵌套式缓冲区(无 stringvec 类型)、句柄或接口的 HIDL 定义的类型
  • kNumElementsInQueue 表示队列的大小(以条目数表示);它用于确定将为队列分配的共享内存缓冲区的大小。

创建第二个 MessageQueue 对象

消息队列的第二侧使用从第一侧获取的 MQDescriptor 对象来创建。MQDescriptor 对象会通过 HIDL 或 AIDL RPC 调用被发送到持有消息队列第二端的进程。MQDescriptor 包含有关队列的信息,其中包括:

  • 用于映射缓冲区和写入指针的信息。
  • 用于映射读取指针的信息(如果队列已同步)。
  • 用于映射事件标记字词的信息(如果队列是阻塞队列)。
  • 对象类型 (<T, flavor>),其中包含 HIDL 定义的类型的队列元素和队列变种(已同步或未同步)。

您可以使用 MQDescriptor 对象来构造 MessageQueue 对象:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers 参数指示在创建此 MessageQueue 对象时是否将读取和写入位置重置为 0。在未同步队列中,读取位置(这是未同步队列中每个 MessageQueue 对象的本地位置)在对象创建过程中始终设为 0。通常,系统会在创建第一个消息队列对象过程中初始化 MQDescriptor。如需对共享内存进行额外的控制,您可以手动设置 MQDescriptorMQDescriptorsystem/libhidl/base/include/hidl/MQDescriptor.h 中进行定义),然后按照本部分的说明创建每个 MessageQueue 对象。

阻塞队列和事件标记

默认情况下,队列不支持阻塞读取和写入。有两种类型的阻塞读取和写入调用:

  • 短格式:有三个参数(数据指针、项数、超时),支持阻塞单个队列中的各项读写操作。在使用这种格式时,队列将在内部处理事件标志和位掩码,并且在初始化第一个消息队列对象时,必须采用第二个参数 true。例如:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • 长格式:有六个参数(包括事件标记和位掩码),支持在多个队列之间使用共享 EventFlag 对象,并允许指定要使用的通知位掩码。在这种情况下,必须为每个读取和写入调用提供事件标记和位掩码。

对于长格式,您可以在每个 readBlocking()writeBlocking() 调用中显式提供 EventFlag。您可以使用内部事件标记初始化其中一个队列,然后必须使用 getEventFlagWord() 从该队列的 MessageQueue 对象中提取该标记,并在每个进程中使用该标记创建 EventFlag 对象,以便与其他 FMQ 一起使用。或者,您也可以使用任何合适的共享内存来初始化 EventFlag 对象。

一般来说,每个队列都应只使用以下三者之一:非阻塞、短格式阻塞或长格式阻塞。混合使用也不算是错误;但要获得理想结果,则需要谨慎地进行编程。

将内存标记为只读

默认情况下,共享内存具有读取和写入权限。对于未同步队列 (kUnsynchronizedWrite),写入者可能想先移除所有读取者的写入权限,然后再分发 MQDescriptorUnsync 对象。这样可以确保其他进程无法向队列写入数据,建议采用此方法来防范读取者进程中的 bug 或不良行为。 如果写入者希望读取者在每次使用 MQDescriptorUnsync 创建队列的读取端时都能够重置队列,则不能将内存标记为只读。这是 MessageQueue 构造函数的默认行为。因此,如果此队列中已有用户,则需要更改其代码以使用 resetPointer=false 构造队列。

  • 写入器:使用 MQDescriptor 文件描述符调用 ashmem_set_prot_region,并将区域设置为只读 (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • 读取器:使用 resetPointer=false 创建消息队列(默认值为 true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

使用 MessageQueue

MessageQueue 对象的公共 API 是:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

您可以使用 availableToWrite()availableToRead() 确定在一次操作中可传输的数据量。在未同步队列中:

  • availableToWrite() 始终返回队列容量。
  • 每个读取器都有自己的读取位置,并会针对 availableToRead() 进行自己的计算。
  • 如果是读取速度缓慢的读取器,队列可以溢出,这可能会导致 availableToRead() 返回的值大于队列的大小。发生溢出后进行的第一次读取操作会失败,并且会导致相应读取器的读取位置被设为等于当前写入指针,无论是否通过 availableToRead() 报告了溢出都是如此。

如果所有请求的数据都可以(并已)传输到队列/从队列传出,read()write() 方法会返回 true。这两个方法不会阻塞;它们要么成功(并返回 true),要么立即返回失败 (false)。

readBlocking()writeBlocking() 方法会等到可以完成请求的操作,或等到超时(timeOutNanos 值为 0 表示永不超时)。

阻塞操作使用事件标记字词来实现。默认情况下,每个队列都会创建并使用自己的标记字词支持短格式的 readBlocking()writeBlocking()。多个队列可以共用一个字词,这样一来,进程就可以等待对任何队列执行写入或读取操作。通过调用 getEventFlagWord(),您可以获取指向队列的事件标记字词的指针,并可以使用该指针(或任何指向合适的共享内存位置的指针)来创建 EventFlag 对象,以传入其他队列的长格式的 readBlocking()writeBlocking()readNotificationwriteNotification 参数用于指示事件标记中的哪些位应该用于针对相应队列发出读取和写入信号。 readNotificationwriteNotification 是 32 位的位掩码。

readBlocking() 会等待 writeNotification 位;如果该参数为 0,调用一定会失败。如果 readNotification 值为 0,调用不会失败,但成功的读取操作将不会设置任何通知位。在已同步队列中,这意味着相应的 writeBlocking() 调用一定不会唤醒,除非已在其他位置对相应的位进行设置。在未同步队列中,writeBlocking() 不会等待(它应仍用于设置写入通知位),而对于读取操作来说,可以不设置任何通知位。同样,如果 readNotification 为 0,writeblocking() 会失败,并且成功的写入操作会设置指定的 writeNotification 位。

如需一次等待多个队列,请使用 EventFlag 对象的 wait() 方法等待通知的位掩码。wait() 方法会返回一个状态字词以及导致设置了唤醒的位。此信息随后用于验证相应的队列是否有足够的空间或数据来完成所需的写入和读取操作,并执行非阻塞 write()read()。如需获取后台操作通知,请再次调用 EventFlag 对象的 wake() 方法。如需了解 EventFlag 抽象化的定义,请参阅 system/libfmq/include/fmq/EventFlag.h

零复制操作

readwritereadBlockingwriteBlocking() 方法会将指向输入/输出缓冲区的指针作为参数,并在内部使用 memcpy() 调用,以便在相应缓冲区和 FMQ 环形缓冲区之间复制数据。为了提高性能,Android 8.0 及更高版本引入了一组 API,这些 API 可提供对环形缓冲区的直接指针访问,从而无需使用 memcpy 调用。

使用以下公共 API 执行零复制 FMQ 操作:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite 方法负责提供用于访问 FMQ 环形缓冲区的基址指针。写入数据之后,使用 commitWrite() 进行提交。beginReadcommitRead 方法的运作方式与之相同。
  • beginReadWrite 方法会将要读取和写入的消息条数作为输入,并返回一个布尔值来指示是否可以执行读取或写入操作。如果可以执行读取或写入操作,则会将基址指针填充到 memTx 结构体中,这些指针可用于对环形缓冲区共享内存进行直接指针访问。
  • MemRegion 结构体包含有关内存块的详细信息,其中包括基址指针(内存块的基址)和以 T 表示的长度(以 HIDL 定义的消息队列类型表示的内存块长度)。
  • MemTransaction 结构体包含两个 MemRegion 结构体(firstsecond),因为对环形缓冲区执行读取或写入操作时可能需要绕回到队列开头。这意味着,对 FMQ 环形缓冲区执行数据读取和写入操作需要两个基址指针。

如需从 MemRegion 结构体获取基址和长度,请调用:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

如需获取对 MemTransaction 对象内的第一个和第二个 MemRegion 结构体的引用,请调用:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

使用零复制 API 写入 FMQ 的示例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

以下辅助方法也是 MemTransaction 的一部分:

  • T* getSlot(size_t idx); 会返回一个指针,该指针指向属于此 MemTransaction 对象一部分的 MemRegions 内的槽位 idx。如果 MemTransaction 对象表示要读取和写入 N 个类型为 T 的项的内存区域,则 idx 的有效范围在 0 到 N-1 之间。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);nMessages 个类型为 T 的项写入该对象描述的内存区域,从索引 startIdx 开始。此方法使用 memcpy(),不应该用于零复制操作。如果 MemTransaction 对象表示要读取和写入 N 个类型为 T 的项的内存区域,则 idx 的有效范围在 0 到 N-1 之间。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); 是一种辅助方法,用于从该对象描述的内存区域读取 nMessages 个类型为 T 的项,从 startIdx 开始。此方法使用 memcpy(),不应该用于零复制操作。

通过 HIDL 发送队列

在创建侧执行的操作:

  1. 按照上述方法创建消息队列对象。
  2. 使用 isValid() 验证对象是否有效。
  3. 如果您要通过将 EventFlag 传入长格式的 readBlocking()writeBlocking() 来等待多个队列,可以从经过初始化的 MessageQueue 对象提取事件标记指针(使用 getEventFlagWord())来创建相应的标记,再使用该标记创建必要的 EventFlag 对象。
  4. 使用 MessageQueue 方法 getDesc() 获取描述符对象。
  5. 在 HAL 文件中,为该方法提供一个类型为 fmq_syncfmq_unsync 的参数,其中 T 是 HIDL 定义的一种合适类型。使用此方法将 getDesc() 返回的对象发送到接收进程。

在接收侧执行的操作:

  1. 使用描述符对象创建 MessageQueue 对象。 请使用相同的队列变种和数据类型,否则将无法编译模板。
  2. 如果您已提取事件标记,则在接收进程中从相应的 MessageQueue 对象提取该标记。
  3. 使用 MessageQueue 对象传输数据。