Очередь быстрых сообщений (FMQ)

Если вам нужна поддержка AIDL, см. также FMQ с AIDL .

Инфраструктура удаленного вызова процедур (RPC) HIDL использует механизмы Binder, что означает, что вызовы связаны с накладными расходами, требуют операций ядра и могут запускать действия планировщика. Однако в тех случаях, когда данные должны передаваться между процессами с меньшими затратами и без участия ядра, используется система быстрой очереди сообщений (FMQ).

FMQ создает очереди сообщений с нужными свойствами. Объект MQDescriptorSync или MQDescriptorUnsync можно отправить посредством вызова HIDL RPC и использовать принимающий процесс для доступа к очереди сообщений.

Очереди быстрых сообщений поддерживаются только в C++ и на устройствах под управлением Android 8.0 и выше.

Типы очередей сообщений

Android поддерживает два типа очередей (известных как варианты ):

  • Несинхронизированным очередям разрешено переполнение, и у них может быть много читателей; каждый читатель должен вовремя прочитать данные или потерять их.
  • Синхронизированным очередям не разрешено переполнение, и они могут иметь только одного читателя.

Для обоих типов очередей не допускается опустошение (чтение из пустой очереди завершится неудачно) и они могут иметь только одну запись.

Несинхронизированный

Несинхронизированная очередь имеет только одну запись, но может иметь любое количество читателей. В очереди имеется одна позиция записи; однако каждый читатель отслеживает свою независимую позицию чтения.

Запись в очередь всегда завершается успешно (не проверяется на переполнение), если она не превышает настроенную емкость очереди (запись, превышающая емкость очереди, завершается сбоем немедленно). Поскольку у каждого считывателя может быть своя позиция чтения, вместо того, чтобы ждать, пока каждый считыватель прочитает каждый фрагмент данных, данные могут выпадать из очереди всякий раз, когда новым операциям записи требуется пространство.

Читатели несут ответственность за извлечение данных до того, как они попадут в конец очереди. Чтение, которое пытается прочитать больше данных, чем доступно, либо завершается сбоем немедленно (если неблокируется), либо ожидает, пока будет доступно достаточное количество данных (если блокируется). Чтение, пытающееся прочитать больше данных, чем емкость очереди, всегда немедленно завершается неудачей.

Если считыватель не успевает за записывающим, так что объем данных, записанных и еще не прочитанных этим считывателем, превышает емкость очереди, следующее чтение не возвращает данные; вместо этого он сбрасывает позицию чтения устройства чтения до последней позиции записи, а затем возвращает ошибку. Если данные, доступные для чтения, проверяются после переполнения, но перед следующим чтением, это показывает, что данных, доступных для чтения, больше, чем емкость очереди, что указывает на то, что произошло переполнение. (Если очередь переполняется между проверкой доступных данных и попыткой их чтения, единственным признаком переполнения является сбой чтения.)

Читатели несинхронизированной очереди, скорее всего, не захотят сбрасывать указатели чтения и записи очереди. Таким образом, при создании очереди из считывателей дескриптора следует использовать аргумент false для параметра resetPointers.

Синхронизировано

Синхронизированная очередь имеет одну запись и одно чтение с одной позицией записи и одной позицией чтения. Невозможно записать больше данных, чем имеется в очереди, или прочитать больше данных, чем в настоящее время находится в очереди. В зависимости от того, вызывается блокирующая или неблокирующая функция записи или чтения, попытки превысить доступное пространство или данные либо немедленно возвращают ошибку, либо блокируются до тех пор, пока желаемая операция не будет завершена. Попытки прочитать или записать больше данных, чем емкость очереди, всегда немедленно завершаются неудачей.

Настройка FMQ

Для очереди сообщений требуется несколько объектов MessageQueue : один для записи и один или несколько для чтения. Не существует явной настройки того, какой объект используется для записи или чтения; Пользователь должен гарантировать, что ни один объект не используется как для чтения, так и для записи, что существует не более одного записывающего устройства, а для синхронизированных очередей не более одного читателя.

Создание первого объекта MessageQueue

Очередь сообщений создается и настраивается одним вызовом:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Инициализатор MessageQueue<T, flavor>(numElements) создает и инициализирует объект, который поддерживает функциональность очереди сообщений.
  • Инициализатор MessageQueue<T, flavor>(numElements, configureEventFlagWord) создает и инициализирует объект, который поддерживает функциональность очереди сообщений с блокировкой.
  • flavor может быть либо kSynchronizedReadWrite для синхронизированной очереди, либо kUnsynchronizedWrite для несинхронизированной очереди.
  • uint16_t (в этом примере) может быть любым типом, определенным в HIDL , который не использует вложенные буферы (без string или vec -типов), дескрипторы или интерфейсы.
  • kNumElementsInQueue указывает размер очереди в количестве записей; он определяет размер буфера общей памяти, который будет выделен для очереди.

Создание второго объекта MessageQueue

Вторая сторона очереди сообщений создается с использованием объекта MQDescriptor , полученного из первой стороны. Объект MQDescriptor отправляется посредством вызова HIDL или AIDL RPC процессу, который будет удерживать второй конец очереди сообщений. MQDescriptor содержит информацию об очереди, в том числе:

  • Информация для отображения буфера и указателя записи.
  • Информация для сопоставления указателя чтения (если очередь синхронизирована).
  • Информация для сопоставления слова флага события (если очередь блокируется).
  • Тип объекта ( <T, flavor> ), который включает определенный HIDL тип элементов очереди и вариант очереди (синхронизированный или несинхронизированный).

Объект MQDescriptor можно использовать для создания объекта MessageQueue :

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Параметр resetPointers указывает, следует ли сбрасывать позиции чтения и записи в 0 при создании этого объекта MessageQueue . В несинхронизированной очереди позиция чтения (которая является локальной для каждого объекта MessageQueue в несинхронизированных очередях) во время создания всегда устанавливается на 0. Обычно MQDescriptor инициализируется во время создания первого объекта очереди сообщений. Для дополнительного контроля над общей памятью вы можете настроить MQDescriptor вручную ( MQDescriptor определен в system/libhidl/base/include/hidl/MQDescriptor.h ), а затем создать каждый объект MessageQueue , как описано в этом разделе.

Блокировка очередей и флагов событий

По умолчанию очереди не поддерживают блокировку чтения/записи. Существует два типа блокировки вызовов чтения/записи:

  • Краткая форма , с тремя параметрами (указатель данных, количество элементов, таймаут). Поддерживает блокировку отдельных операций чтения/записи в одной очереди. При использовании этой формы очередь будет обрабатывать флаг события и битовые маски внутри себя, а первый объект очереди сообщений должен быть инициализирован вторым параметром true . Например:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Длинная форма с шестью параметрами (включая флаг события и битовые маски). Поддерживает использование общего объекта EventFlag между несколькими очередями и позволяет указывать используемые битовые маски уведомлений. В этом случае флаг события и битовые маски должны передаваться для каждого вызова чтения и записи.

В полной форме EventFlag может быть указан явно при каждом вызове readBlocking() и writeBlocking() . Одну из очередей можно инициализировать с помощью внутреннего флага события, который затем необходимо извлечь из объектов MessageQueue этой очереди с помощью getEventFlagWord() и использовать для создания объектов EventFlag в каждом процессе для использования с другими FMQ. Альтернативно, объекты EventFlag могут быть инициализированы с использованием любой подходящей общей памяти.

В общем, каждая очередь должна использовать только один из вариантов: неблокирующая, короткая или полная блокировка. Их смешивание не является ошибкой, но для получения желаемого результата требуется тщательное программирование.

Маркировка памяти как доступной только для чтения

По умолчанию общая память имеет разрешения на чтение и запись. Для несинхронизированных очередей ( kUnsynchronizedWrite ) разработчику может потребоваться удалить разрешения на запись для всех читателей, прежде чем он раздаст объекты MQDescriptorUnsync . Это гарантирует, что другие процессы не смогут записывать данные в очередь, что рекомендуется для защиты от ошибок или неправильного поведения процессов чтения. Если автор записи хочет, чтобы читатели могли сбросить очередь всякий раз, когда они используют MQDescriptorUnsync для создания стороны очереди для чтения, тогда память не может быть помечена как доступная только для чтения. Это поведение конструктора MessageQueue по умолчанию. Итак, если в этой очереди уже есть пользователи, их код необходимо изменить, чтобы создать очередь с помощью resetPointer=false .

  • Writer: вызовите ashmem_set_prot_region с дескриптором файла MQDescriptor и регионом, доступным только для чтения ( PROT_READ ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Читатель: создайте очередь сообщений с помощью resetPointer=false (по умолчанию — true ):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Использование очереди сообщений

Публичный API объекта MessageQueue :

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite() и availableToRead() можно использовать для определения объема данных, которые можно передать за одну операцию. В несинхронизированной очереди:

  • availableToWrite() всегда возвращает емкость очереди.
  • Каждый читатель имеет свою собственную позицию чтения и выполняет собственные вычисления для availableToRead() .
  • С точки зрения медленного читателя, очередь может переполниться; это может привести к тому, что availableToRead() вернет значение, превышающее размер очереди. Первое чтение после переполнения завершится неудачей и приведет к тому, что позиция чтения для этого читателя будет установлена ​​равной текущему указателю записи, независимо от того, было ли о переполнении сообщено через availableToRead() .

Методы read() и write() возвращают true , если все запрошенные данные могли быть (и были) переданы в/из очереди. Эти методы не блокируются; они либо завершаются успешно (и возвращают true ), либо немедленно возвращают ошибку ( false ).

Методы readBlocking() и writeBlocking() ждут, пока запрошенная операция не будет завершена, или пока не истечет время ожидания (значение timeOutNanos , равное 0, означает, что время ожидания никогда не истекает).

Операции блокировки реализуются с использованием слова флага события. По умолчанию каждая очередь создает и использует свое собственное слово-флаг для поддержки краткой формы readBlocking() и writeBlocking() . Несколько очередей могут совместно использовать одно слово, так что процесс может ожидать записи или чтения в любой из очередей. Указатель на слово флага события очереди можно получить, вызвав getEventFlagWord() , и этот указатель (или любой указатель на подходящее место в общей памяти) можно использовать для создания объекта EventFlag для перехода в длинную форму readBlocking() и writeBlocking() для другой очереди. Параметры readNotification и writeNotification указывают, какие биты флага события следует использовать для сигнализации операций чтения и записи в этой очереди. readNotification и writeNotification — это 32-битные битовые маски.

readBlocking() ожидает бит writeNotification ; если этот параметр равен 0, вызов всегда завершается неудачно. Если значение readNotification равно 0, вызов не завершится неудачей, но успешное чтение не установит никаких битов уведомления. В синхронизированной очереди это будет означать, что соответствующий вызов writeBlocking() никогда не проснется, если бит не установлен в другом месте. В несинхронизированной очереди writeBlocking() не будет ждать (его все равно следует использовать для установки бита уведомления о записи), и при чтении уместно не устанавливать какие-либо биты уведомления. Аналогично, writeblocking() завершится неудачей, если readNotification равно 0, а успешная запись устанавливает указанные биты writeNotification .

Чтобы одновременно ожидать несколько очередей, используйте метод wait() объекта EventFlag для ожидания битовой маски уведомлений. Метод wait() возвращает слово состояния с битами, вызвавшими установку пробуждения. Эта информация затем используется для проверки того, что в соответствующей очереди достаточно места или данных для желаемой операции записи/чтения, и выполнения неблокирующей write() / read() . Чтобы получить уведомление о пост-операции, используйте еще один вызов метода wake() EventFlag . Определение абстракции EventFlag можно найти в system/libfmq/include/fmq/EventFlag.h .

Операции нулевого копирования

API-интерфейсы read / write / readBlocking / writeBlocking() принимают указатель на буфер ввода/вывода в качестве аргумента и используют внутренние вызовы memcpy() для копирования данных между ним и кольцевым буфером FMQ. Для повышения производительности Android 8.0 и более поздних версий включает набор API, которые обеспечивают прямой доступ к кольцевому буферу по указателю, устраняя необходимость использования вызовов memcpy .

Используйте следующие общедоступные API для операций FMQ с нулевым копированием:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Метод beginWrite предоставляет базовые указатели в кольцевой буфер FMQ. После записи данных зафиксируйте их с помощью commitWrite() . Методы beginRead / commitRead действуют аналогично.
  • Методы beginRead / Write принимают на вход количество сообщений, которые необходимо прочитать/записать, и возвращают логическое значение, указывающее, возможно ли чтение/запись. Если чтение или запись возможны, структура memTx заполняется базовыми указателями, которые можно использовать для прямого доступа к указателям в разделяемую память кольцевого буфера.
  • Структура MemRegion содержит подробную информацию о блоке памяти, включая базовый указатель (базовый адрес блока памяти) и длину в терминах T (длина блока памяти в терминах HIDL-определенного типа очереди сообщений).
  • Структура MemTransaction содержит две структуры MemRegion : first и second , поскольку чтение или запись в кольцевой буфер может потребовать переноса на начало очереди. Это будет означать, что для чтения/записи данных в кольцевой буфер FMQ необходимы два базовых указателя.

Чтобы получить базовый адрес и длину из структуры MemRegion :

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Чтобы получить ссылки на первый и второй MemRegion внутри объекта MemTransaction :

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Пример записи в FMQ с использованием API нулевого копирования:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Следующие вспомогательные методы также являются частью MemTransaction :

  • T* getSlot(size_t idx);
    Возвращает указатель на idx слота в MemRegions , которые являются частью этого объекта MemTransaction . Если объект MemTransaction представляет области памяти для чтения/записи N элементов типа T, то допустимый диапазон idx находится между 0 и N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    Записывайте элементы nMessages типа T в области памяти, описанные объектом, начиная с индекса startIdx . Этот метод использует memcpy() и не предназначен для операции нулевого копирования. Если объект MemTransaction представляет память для чтения/записи N элементов типа T, то допустимый диапазон idx находится между 0 и N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    Вспомогательный метод для чтения элементов nMessages типа T из областей памяти, описанных объектом, начиная с startIdx . Этот метод использует memcpy() и не предназначен для операции нулевого копирования.

Отправка очереди через HIDL

На творческой стороне:

  1. Создайте объект очереди сообщений, как описано выше.
  2. Убедитесь, что объект действителен с помощью isValid() .
  3. Если вы будете ожидать в нескольких очередях, передав EventFlag в длинную форму readBlocking() / writeBlocking() , вы можете извлечь указатель флага события (используя getEventFlagWord() ) из объекта MessageQueue , который был инициализирован для создания флага, и используйте этот флаг для создания необходимого объекта EventFlag .
  4. Используйте метод MessageQueue getDesc() для получения объекта дескриптора.
  5. В файле .hal укажите методу параметр типа fmq_sync или fmq_unsync где T — подходящий тип, определенный в HIDL. Используйте это, чтобы отправить объект, возвращаемый getDesc() , принимающему процессу.

На принимающей стороне:

  1. Используйте объект дескриптора для создания объекта MessageQueue . Обязательно используйте тот же вариант очереди и тип данных, иначе шаблон не удастся скомпилировать.
  2. Если вы извлекли флаг события, извлеките его из соответствующего объекта MessageQueue в процессе получения.
  3. Используйте объект MessageQueue для передачи данных.