Очередь быстрых сообщений (FMQ)

Инфраструктура удаленного вызова процедур (RPC) HIDL использует механизмы связывания, что означает, что вызовы связаны с накладными расходами, требуют операций ядра и могут запускать действия планировщика. Однако в тех случаях, когда данные должны передаваться между процессами с меньшими затратами и без участия ядра, используется система быстрой очереди сообщений (FMQ).

FMQ создает очереди сообщений с нужными свойствами. Вы можете отправить объект MQDescriptorSync или MQDescriptorUnsync через вызов HIDL RPC, и этот объект будет использоваться принимающим процессом для доступа к очереди сообщений.

Типы очередей

Android поддерживает два типа очередей (известных как варианты ):

  • Несинхронизированным очередям разрешено переполнение, и у них может быть много читателей; каждый читатель должен вовремя прочитать данные или потерять их.
  • Синхронизированным очередям не разрешено переполнение, и у них может быть только один читатель.

Для обоих типов очередей не допускается опустошение (ошибка чтения из пустой очереди) и они могут иметь только одну запись.

Несинхронизированные очереди

Несинхронизированная очередь имеет только одну запись, но может иметь любое количество читателей. В очереди имеется одна позиция записи; однако каждый читатель отслеживает свою независимую позицию чтения.

Запись в очередь всегда завершается успешно (не проверяется на переполнение), если она не превышает настроенную емкость очереди (запись, превышающая емкость очереди, завершается сбоем немедленно). Поскольку у каждого читателя может быть своя позиция чтения, вместо того, чтобы ждать, пока каждый читатель прочитает каждый фрагмент данных, данные выпадают из очереди всякий раз, когда новым операциям записи требуется пространство.

Читатели несут ответственность за извлечение данных до того, как они попадут в конец очереди. Чтение, которое пытается прочитать больше данных, чем доступно, либо завершается сбоем немедленно (если неблокируется), либо ожидает, пока будет доступно достаточное количество данных (если блокируется). Чтение, пытающееся прочитать больше данных, чем емкость очереди, всегда немедленно завершается неудачей.

Если считыватель не успевает за записывающим, так что объем данных, записанных и еще не прочитанных этим считывателем, превышает емкость очереди, следующее чтение не возвращает данные; вместо этого он сбрасывает позицию чтения устройства чтения на позицию записи плюс половина емкости, а затем возвращает ошибку. Это оставляет половину буфера доступной для чтения и резервирует место для новых записей, чтобы избежать немедленного повторного переполнения очереди. Если данные, доступные для чтения, проверяются после переполнения, но перед следующим чтением, это показывает, что данных, доступных для чтения, больше, чем емкость очереди, что указывает на то, что произошло переполнение. (Если очередь переполняется между проверкой доступных данных и попыткой их чтения, единственным признаком переполнения является сбой чтения.)

Синхронизированные очереди

Синхронизированная очередь имеет одну запись и одно чтение с одной позицией записи и одной позицией чтения. Невозможно записать больше данных, чем имеется в очереди, или прочитать больше данных, чем в настоящее время находится в очереди. В зависимости от того, вызывается блокирующая или неблокирующая функция записи или чтения, попытки превысить доступное пространство или данные либо немедленно возвращают ошибку, либо блокируются до тех пор, пока желаемая операция не будет завершена. Попытки прочитать или записать больше данных, чем емкость очереди, всегда немедленно завершаются неудачей.

Настройте FMQ

Для очереди сообщений требуется несколько объектов MessageQueue : один для записи и один или несколько для чтения. Не существует явной настройки того, какой объект используется для записи или чтения; пользователь несет ответственность за то, чтобы ни один объект не использовался как для чтения, так и для записи, чтобы существовал не более одного записывающего устройства, а для синхронизированных очередей - чтобы был не более одного читателя.

Создайте первый объект MessageQueue.

Очередь сообщений создается и настраивается одним вызовом:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Инициализатор MessageQueue<T, flavor>(numElements) создает и инициализирует объект, который поддерживает функциональность очереди сообщений.
  • Инициализатор MessageQueue<T, flavor>(numElements, configureEventFlagWord) создает и инициализирует объект, который поддерживает функциональность очереди сообщений с блокировкой.
  • flavor может быть либо kSynchronizedReadWrite для синхронизированной очереди, либо kUnsynchronizedWrite для несинхронизированной очереди.
  • uint16_t (в этом примере) может быть любым типом, определенным в HIDL , который не использует вложенные буферы (без string или vec -типов), дескрипторы или интерфейсы.
  • kNumElementsInQueue указывает размер очереди в количестве записей; он определяет размер буфера общей памяти, выделенного для очереди.

Создайте второй объект MessageQueue.

Вторая сторона очереди сообщений создается с использованием объекта MQDescriptor полученного из первой стороны. Объект MQDescriptor отправляется посредством вызова HIDL или AIDL RPC процессу, который удерживает второй конец очереди сообщений. MQDescriptor содержит информацию об очереди, в том числе:

  • Информация для отображения буфера и указателя записи.
  • Информация для сопоставления указателя чтения (если очередь синхронизирована).
  • Информация для сопоставления слова флага события (если очередь блокируется).
  • Тип объекта ( <T, flavor> ), который включает определенный HIDL тип элементов очереди и вариант очереди (синхронизированный или несинхронизированный).

Вы можете использовать объект MQDescriptor для создания объекта MessageQueue :

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Параметр resetPointers указывает, следует ли сбрасывать позиции чтения и записи в 0 при создании этого объекта MessageQueue . В несинхронизированной очереди позиция чтения (которая является локальной для каждого объекта MessageQueue в несинхронизированных очередях) во время создания всегда устанавливается на 0. Обычно MQDescriptor инициализируется во время создания первого объекта очереди сообщений. Для дополнительного контроля над общей памятью вы можете настроить MQDescriptor вручную ( MQDescriptor определен в system/libhidl/base/include/hidl/MQDescriptor.h ), а затем создать каждый объект MessageQueue как описано в этом разделе.

Блокировать очереди и флаги событий

По умолчанию очереди не поддерживают блокировку чтения и записи. Существует два типа блокировки вызовов чтения и записи:

  • Краткая форма с тремя параметрами (указатель данных, количество элементов, время ожидания) поддерживает блокировку отдельных операций чтения и записи в одной очереди. При использовании этой формы очередь обрабатывает флаг события и битовые маски внутри себя, а первый объект очереди сообщений должен быть инициализирован вторым параметром true . Например:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Длинная форма с шестью параметрами (включая флаг события и битовые маски) поддерживает использование общего объекта EventFlag между несколькими очередями и позволяет указать используемые битовые маски уведомлений. В этом случае флаг события и битовые маски должны передаваться при каждом вызове чтения и записи.

В длинной форме вы можете явно указать EventFlag в каждом вызове readBlocking() и writeBlocking() . Вы можете инициализировать одну из очередей с помощью внутреннего флага события, который затем необходимо извлечь из объектов MessageQueue этой очереди с помощью getEventFlagWord() и использовать для создания объектов EventFlag в каждом процессе для использования с другими FMQ. Альтернативно вы можете инициализировать объекты EventFlag с помощью любой подходящей общей памяти.

В общем, каждая очередь должна использовать только один из вариантов: неблокирующая, короткая или полная блокировка. Их смешивание не является ошибкой, но для получения желаемого результата требуется тщательное программирование.

Отметить память как доступную только для чтения

По умолчанию общая память имеет разрешения на чтение и запись. Для несинхронизированных очередей ( kUnsynchronizedWrite ) разработчику может потребоваться удалить разрешения на запись для всех читателей, прежде чем он раздаст объекты MQDescriptorUnsync . Это гарантирует, что другие процессы не смогут записывать данные в очередь, что рекомендуется для защиты от ошибок или неправильного поведения процессов чтения. Если автор записи хочет, чтобы читатели могли сбрасывать очередь всякий раз, когда они используют MQDescriptorUnsync для создания стороны очереди для чтения, тогда память не может быть помечена как доступная только для чтения. Это поведение конструктора MessageQueue по умолчанию. Поэтому, если в этой очереди уже есть пользователи, их код необходимо изменить, чтобы создать очередь с помощью resetPointer=false .

  • Writer: вызовите ashmem_set_prot_region с дескриптором файла MQDescriptor и регионом, доступным только для чтения ( PROT_READ ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Читатель: создайте очередь сообщений с помощью resetPointer=false (по умолчанию — true ):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Используйте очередь сообщений

Публичный API объекта MessageQueue :

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Вы можете использовать availableToWrite() и availableToRead() чтобы определить, какой объем данных можно передать за одну операцию. В несинхронизированной очереди:

  • availableToWrite() всегда возвращает емкость очереди.
  • Каждый читатель имеет свою собственную позицию чтения и выполняет собственные вычисления для availableToRead() .
  • С точки зрения медленного читателя, очередь может переполниться; это может привести к тому, что availableToRead() вернет значение, превышающее размер очереди. Первое чтение после переполнения терпит неудачу и приводит к тому, что позиция чтения для этого читателя устанавливается равной текущему указателю записи, независимо от того, было ли о переполнении сообщено через availableToRead() .

Методы read() и write() возвращают true если все запрошенные данные могли быть (и были) переданы в очередь и из нее. Эти методы не блокируются; они либо завершаются успешно (и возвращают true ), либо немедленно возвращают ошибку ( false ).

Методы readBlocking() и writeBlocking() ждут, пока запрошенная операция не будет завершена, или пока не истечет время ожидания (значение timeOutNanos , равное 0, означает отсутствие тайм-аута).

Операции блокировки реализуются с использованием слова флага события. По умолчанию каждая очередь создает и использует свое собственное слово-флаг для поддержки краткой формы readBlocking() и writeBlocking() . Несколько очередей могут совместно использовать одно слово, так что процесс может ожидать записи или чтения в любой из очередей. Вызвав getEventFlagWord() , вы можете получить указатель на слово флага события очереди и использовать этот указатель (или любой указатель на подходящее место в общей памяти) для создания объекта EventFlag для передачи в длинную форму readBlocking() и writeBlocking() для другой очереди. Параметры readNotification и writeNotification указывают, какие биты флага события следует использовать для сигнализации операций чтения и записи в этой очереди. readNotification и writeNotification — это 32-битные битовые маски.

readBlocking() ожидает бит writeNotification ; если этот параметр равен 0, вызов всегда завершается неудачно. Если значение readNotification равно 0, вызов не завершится неудачей, но успешное чтение не установит никаких битов уведомления. В синхронизированной очереди это означает, что соответствующий вызов writeBlocking() никогда не активируется, если бит не установлен в другом месте. В несинхронизированной очереди writeBlocking() не ожидает (его все равно следует использовать для установки бита уведомления о записи), и при чтении уместно не устанавливать какие-либо биты уведомления. Аналогично, writeblocking() завершается неудачей, если readNotification равен 0, а успешная запись устанавливает указанные биты writeNotification .

Чтобы одновременно ожидать несколько очередей, используйте метод wait() объекта EventFlag для ожидания битовой маски уведомлений. Метод wait() возвращает слово состояния с битами, вызвавшими установку пробуждения. Эта информация затем используется для проверки того, что в соответствующей очереди достаточно места или данных для желаемой операции записи и чтения, и выполнения неблокирующих операций write() и read() . Чтобы получить уведомление о пост-операции, используйте еще один вызов метода wake() объекта EventFlag . Определение абстракции EventFlag см. system/libfmq/include/fmq/EventFlag.h .

Операции нулевого копирования

Методы read , write , readBlocking и writeBlocking() принимают указатель на буфер ввода-вывода в качестве аргумента и используют внутренние вызовы memcpy() для копирования данных между ним и кольцевым буфером FMQ. Для повышения производительности Android 8.0 и более поздних версий включает набор API, которые обеспечивают прямой доступ к кольцевому буферу по указателю, устраняя необходимость использования вызовов memcpy .

Используйте следующие общедоступные API для операций FMQ с нулевым копированием:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Метод beginWrite предоставляет базовые указатели в кольцевой буфер FMQ. После записи данных зафиксируйте их с помощью commitWrite() . Методы beginRead и commitRead действуют одинаково.
  • Методы beginRead и Write принимают на вход количество сообщений, которые необходимо прочитать и записать, и возвращают логическое значение, указывающее, возможно ли чтение или запись. Если чтение или запись возможны, структура memTx заполняется базовыми указателями, которые можно использовать для прямого доступа к указателям в общей памяти кольцевого буфера.
  • Структура MemRegion содержит подробную информацию о блоке памяти, включая базовый указатель (базовый адрес блока памяти) и длину в терминах T (длина блока памяти в терминах определенного HIDL типа очереди сообщений).
  • Структура MemTransaction содержит две структуры MemRegion : first и second поскольку чтение или запись в кольцевой буфер может потребовать переноса на начало очереди. Это означало бы, что для чтения и записи данных в кольцевой буфер FMQ необходимы два базовых указателя.

Чтобы получить базовый адрес и длину из структуры MemRegion :

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Чтобы получить ссылки на первую и вторую структуры MemRegion внутри объекта MemTransaction :

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Пример записи в FMQ с использованием API нулевого копирования:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Следующие вспомогательные методы также являются частью MemTransaction :

  • T* getSlot(size_t idx); возвращает указатель на idx слота в MemRegions , которые являются частью этого объекта MemTransaction . Если объект MemTransaction представляет области памяти для чтения и записи N элементов типа T , то допустимый диапазон idx находится между 0 и N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); записывает элементы nMessages типа T в области памяти, описанные объектом, начиная с индекса startIdx . Этот метод использует memcpy() и не предназначен для операции нулевого копирования. Если объект MemTransaction представляет память для чтения и записи N элементов типа T , то допустимый диапазон idx находится между 0 и N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); — это вспомогательный метод для чтения элементов nMessages типа T из областей памяти, описанных объектом, начиная с startIdx . Этот метод использует memcpy() и не предназначен для операции нулевого копирования.

Отправить очередь через HIDL

На творческой стороне:

  1. Создайте объект очереди сообщений, как описано выше.
  2. Убедитесь, что объект действителен с помощью isValid() .
  3. Если вы ожидаете несколько очередей, передавая EventFlag в длинную форму readBlocking() или writeBlocking() , вы можете извлечь указатель флага события (используя getEventFlagWord() ) из объекта MessageQueue , который был инициализирован для создания флага, и используйте этот флаг для создания необходимого объекта EventFlag .
  4. Используйте метод MessageQueue getDesc() чтобы получить объект дескриптора.
  5. В файле HAL дайте методу параметр типа fmq_sync или fmq_unsync где T — подходящий тип, определенный в HIDL. Используйте это, чтобы отправить объект, возвращенный getDesc() , принимающему процессу.

На принимающей стороне:

  1. Используйте объект дескриптора для создания объекта MessageQueue . Используйте тот же вариант очереди и тип данных, иначе шаблон не удастся скомпилировать.
  2. Если вы извлекли флаг события, извлеките его из соответствующего объекта MessageQueue в принимающем процессе.
  3. Используйте объект MessageQueue для передачи данных.