Fast Message Queue (FMQ)

HIDL's remote procedure call (RPC) infrastructure uses binder mechanisms, meaning calls involve overhead, require kernel operations, and can trigger scheduler action. However, for cases where data must be transferred between processes with less overhead and no kernel involvement, the Fast Message Queue (FMQ) system is used.

FMQ creates message queues with the desired properties. You can send an MQDescriptorSync or MQDescriptorUnsync object over a HIDL RPC call and the object is used by the receiving process to access the message queue.

Queue types

Android supports two queue types (known as flavors):

  • Unsynchronized queues are allowed to overflow, and can have many readers; each reader must read data in time or lose it.
  • Synchronized queues aren't allowed to overflow, and can have only one reader.

Both queue types aren't allowed to underflow (read from an empty queue fails) and can have only one writer.

Unsynchronized queues

An unsynchronized queue has only one writer, but can have any number of readers. There is one write position for the queue; however, each reader keeps track of its own independent read position.

Writes to the queue always succeed (aren't checked for overflow) as long as they are no larger than the configured queue capacity (writes larger than the queue capacity fail immediately). As each reader might have a different read position, rather than waiting for every reader to read every piece of data, data falls off the queue whenever new writes need the space.

Readers are responsible for retrieving data before it falls off the end of the queue. A read that attempts to read more data than is available either fails immediately (if nonblocking) or waits for enough data to be available (if blocking). A read that attempts to read more data than the queue capacity always fails immediately.

If a reader fails to keep up with the writer, such that the amount of data written and not yet read by that reader exceeds the queue capacity, the next read doesn't return data; instead, it resets the reader's read position to the write position plus half of the capacity, and then returns a failure. This leaves half of the buffer available to read and reserves space for new writes to avoid immediately overflowing the queue again. If the data available to read is checked after an overflow but before the next read, it shows more data available to read than the queue capacity, indicating that an overflow has occurred. (If the queue overflows between checking available data and attempting to read that data, the only indication of overflow is that the read fails.)

Synchronized queues

A synchronized queue has one writer and one reader with a single write position and a single read position. It's impossible to write more data than the queue has space for or read more data than the queue currently holds. Depending on whether the blocking or nonblocking write or read function is called, attempts to exceed available space or data either return failure immediately or block until the desired operation can be completed. Attempts to read or write more data than the queue capacity always fail immediately.

Set up an FMQ

A message queue requires multiple MessageQueue objects: one to be written to, and one or more to be read from. There is no explicit configuration of which object is used for writing or reading; the user is responsible for ensuring that no object is used for both reading and writing, that there is at most one writer, and for synchronized queues, that there is at most one reader.

Create the first MessageQueue object

A message queue is created and configured with a single call:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • The MessageQueue<T, flavor>(numElements) initializer creates and initializes an object that supports the message queue functionality.
  • The MessageQueue<T, flavor>(numElements, configureEventFlagWord) initializer creates and initializes an object that supports the message queue functionality with blocking.
  • flavor can be either kSynchronizedReadWrite for a synchronized queue or kUnsynchronizedWrite for an unsynchronized queue.
  • uint16_t (in this example) can be any HIDL-defined type that doesn't involve nested buffers (no string or vec types), handles, or interfaces.
  • kNumElementsInQueue indicates the size of queue in number of entries; it determines the size of shared memory buffer that is allocated for the queue.

Create the second MessageQueue object

The second side of the message queue is created using an MQDescriptor object obtained from the first side. The MQDescriptor object is sent over a HIDL or AIDL RPC call to the process that holds the second end of the message queue. The MQDescriptor contains information about the queue, including:

  • Information to map the buffer and write pointer.
  • Information to map the read pointer (if the queue is synchronized).
  • Information to map the event flag word (if the queue is blocking).
  • Object type (<T, flavor>), which includes the HIDL-defined type of queue elements and the queue flavor (synchronized or unsynchronized).

You can use the MQDescriptor object to construct a MessageQueue object:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

The resetPointers parameter indicates whether to reset the read and write positions to 0 while creating this MessageQueue object. In an unsynchronized queue, the read position (which is local to each MessageQueue object in unsynchronized queues) is always set to 0 during creation. Typically, the MQDescriptor is initialized during creation of the first message queue object. For extra control over the shared memory, you can set up the MQDescriptor manually (MQDescriptor is defined in system/libhidl/base/include/hidl/MQDescriptor.h), then create every MessageQueue object as described in this section.

Block queues and event flags

By default, queues don't support blocking reads and writes. There are two kinds of blocking read and write calls:

  • Short form, with three parameters (data pointer, number of items, timeout), supports blocking on individual read and write operations on a single queue. When using this form, the queue handles the event flag and bitmasks internally, and the first message queue object must be initialized with a second parameter of true. For example:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Long form, with six parameters (includes event flag and bitmasks), supports using a shared EventFlag object between multiple queues and allows specifying the notification bit masks to be used. In this case, the event flag and bitmasks must be supplied to each read and write call.

For the long form, you can supply the EventFlag explicitly in each readBlocking() and writeBlocking() call. You can initialize one of the queues with an internal event flag, which must then be extracted from that queue's MessageQueue objects using getEventFlagWord() and used to create an EventFlag objects in each process for use with other FMQs. Alternatively, you can initialize the EventFlag objects with any suitable shared memory.

In general, each queue should use only one of nonblocking, short-form blocking, or long-form blocking. It isn't an error to mix them, but careful programming is required to get the desired result.

Mark the memory as read only

By default, shared memory has read and write permissions. For unsynchronized queues (kUnsynchronizedWrite), the writer might want to remove write permissions for all of the readers before it hands out the MQDescriptorUnsync objects. This ensures the other processes can't write to the queue, which is recommended to protect against bugs or bad behavior in the reader processes. If the writer wants the readers to be able to reset the queue whenever they use MQDescriptorUnsync to create the read side of the queue, then the memory can't be marked as read-only. This is the default behavior of the MessageQueue constructor. So if there are existing users of this queue, their code needs to be changed to construct the queue with resetPointer=false.

  • Writer: Call ashmem_set_prot_region with a MQDescriptor file descriptor and region set to read-only (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Reader: Create message queue with resetPointer=false (the default is true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Use the MessageQueue

The public API of the MessageQueue object is:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

You can use availableToWrite() and availableToRead() to determine how much data can be transferred in a single operation. In an unsynchronized queue:

  • availableToWrite() always returns the capacity of the queue.
  • Each reader has its own read position and does its own calculation for availableToRead().
  • From the point of view of a slow reader, the queue is allowed to overflow; this can result in availableToRead() returning a value larger than the size of the queue. The first read after an overflow fails and results in the read position for that reader being set equal to the current write pointer, whether or not the overflow was reported through availableToRead().

The read() and write() methods return true if all requested data could be (and was) transferred to and from the queue. These methods don't block; they either succeed (and return true), or return failure (false) immediately.

The readBlocking() and writeBlocking() methods wait until the requested operation can be completed, or until they timeout (a timeOutNanos value of 0 means never timeout).

Blocking operations are implemented using an event flag word. By default, each queue creates and uses its own flag word to support the short form of readBlocking() and writeBlocking(). Multiple queues can share a single word, so that a process can wait on writes or reads to any of the queues. By calling getEventFlagWord(), you can get a pointer to a queue's event flag word, and you can use that pointer (or any pointer to a suitable shared memory location) to create an EventFlag object to pass into the long form of readBlocking() and writeBlocking()for a different queue. The readNotification and writeNotification parameters tell which bits in the event flag should be used to signal reads and writes on that queue. readNotification and writeNotification are 32-bit bitmasks.

readBlocking() waits on the writeNotification bits; if that parameter is 0, the call always fails. If the readNotification value is 0, the call doesn't fail, but a successful read won't set any notification bits. In a synchronized queue, this means that the corresponding writeBlocking() call never wakes up unless the bit is set elsewhere. In an unsynchronized queue, writeBlocking() doesn't wait (it should still be used to set the write notification bit), and it is appropriate for reads to not set any notification bits. Similarly, writeblocking() fails if readNotification is 0, and a successful write sets the specified writeNotification bits.

To wait on multiple queues at once, use an EventFlag object's wait() method to wait on a bitmask of notifications. The wait() method returns a status word with the bits that caused the wake up set. This information is then used to verify that the corresponding queue has enough space or data for the desired write and read operation and perform a nonblocking write() and read(). To get a post operation notification, use another call to the EventFlag object's wake() method. For a definition of the EventFlag abstraction, see system/libfmq/include/fmq/EventFlag.h.

Zero copy operations

The read, write, readBlocking, and writeBlocking() methods take a pointer to an input-output buffer as an argument and use memcpy() calls internally to copy data between the same and the FMQ ring buffer. To improve performance, Android 8.0 and higher include a set of APIs that provide direct pointer access into the ring buffer, eliminating the need to use memcpy calls.

Use the following public APIs for zero copy FMQ operations:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • The beginWrite method provides base pointers into the FMQ ring buffer. After the data is written, commit it using commitWrite(). The beginRead and commitRead methods act the same way.
  • The beginRead and Write methods take as input the number of messages to be read and written and return a boolean indicating if the read or write is possible. If the read or write is possible, the memTx struct is populated with base pointers that can be used for direct pointer access into the ring buffer shared memory.
  • The MemRegion struct contains details about a block of memory, including the base pointer (base address of the memory block) and the length in terms of T (length of the memory block in terms of the HIDL-defined type of the message queue).
  • The MemTransaction struct contains two MemRegion structs, first and second as a read or write into the ring buffer might require a wraparound to the beginning of the queue. This would mean that two base pointers are needed to read and write data into the FMQ ring buffer.

To get the base address and length from a MemRegion struct:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

To get references to the first and second MemRegion structs within a MemTransaction object:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Example write to the FMQ using zero copy APIs:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

The following helper methods are also part of MemTransaction:

  • T* getSlot(size_t idx); returns a pointer to slot idx within the MemRegions that are part of this MemTransaction object. If the MemTransaction object is representing the memory regions to read and write N items of type T, then the valid range of idx is between 0 and N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); writes nMessages items of type T into the memory regions described by the object, starting from index startIdx. This method uses memcpy() and isn't to meant to be used for a zero copy operation. If the MemTransaction object represents memory to read and write N items of type T, then the valid range of idx is between 0 and N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); is a helper method to read nMessages items of type T from the memory regions described by the object starting from startIdx. This method uses memcpy() and isn't meant to be used for a zero copy operation.

Send the queue over HIDL

On the creating side:

  1. Create a message queue object as described above.
  2. Verify the object is valid with isValid().
  3. If you're waiting on multiple queues by passing EventFlag into the long form of readBlocking() or writeBlocking(), you can extract the event flag pointer (using getEventFlagWord()) from a MessageQueue object that was initialized to create the flag, and use that flag to create the necessary EventFlag object.
  4. Use the MessageQueue method getDesc() to get a descriptor object.
  5. In the HAL file, give the method a parameter of type fmq_sync or fmq_unsync where T is a suitable HIDL-defined type. Use this to send the object returned by getDesc() to the receiving process.

On the receiving side:

  1. Use the descriptor object to create a MessageQueue object. Use the same queue flavor and data type, or the template fails to compile.
  2. If you extracted an event flag, extract the flag from the corresponding MessageQueue object in the receiving process.
  3. Use the MessageQueue object to transfer data.