Если вам нужна поддержка AIDL, см. также FMQ с AIDL .
Инфраструктура удаленного вызова процедур (RPC) HIDL использует механизмы Binder, что означает, что вызовы связаны с накладными расходами, требуют операций ядра и могут запускать действия планировщика. Однако в тех случаях, когда данные должны передаваться между процессами с меньшими затратами и без участия ядра, используется система быстрой очереди сообщений (FMQ).
FMQ создает очереди сообщений с нужными свойствами. Объект MQDescriptorSync
или MQDescriptorUnsync
можно отправить посредством вызова HIDL RPC и использовать принимающий процесс для доступа к очереди сообщений.
Очереди быстрых сообщений поддерживаются только в C++ и на устройствах под управлением Android 8.0 и выше.
Типы очередей сообщений
Android поддерживает два типа очередей (известных как варианты ):
- Несинхронизированным очередям разрешено переполнение, и у них может быть много читателей; каждый читатель должен вовремя прочитать данные или потерять их.
- Синхронизированным очередям не разрешено переполнение, и у них может быть только один читатель.
Для обоих типов очередей не допускается опустошение (ошибка чтения из пустой очереди) и они могут иметь только одну запись.
Несинхронизированный
Несинхронизированная очередь имеет только одну запись, но может иметь любое количество читателей. В очереди имеется одна позиция записи; однако каждый читатель отслеживает свою независимую позицию чтения.
Запись в очередь всегда завершается успешно (не проверяется на переполнение), если она не превышает настроенную емкость очереди (запись, превышающая емкость очереди, завершается сбоем немедленно). Поскольку у каждого считывателя может быть своя позиция чтения, вместо того, чтобы ждать, пока каждый считыватель прочитает каждый фрагмент данных, данные могут выпадать из очереди всякий раз, когда новым операциям записи требуется пространство.
Читатели несут ответственность за извлечение данных до того, как они попадут в конец очереди. Чтение, которое пытается прочитать больше данных, чем доступно, либо завершается сбоем немедленно (если неблокируется), либо ожидает, пока будет доступно достаточное количество данных (если блокируется). Чтение, пытающееся прочитать больше данных, чем емкость очереди, всегда немедленно завершается неудачей.
Если считыватель не успевает за записывающим, так что объем данных, записанных и еще не прочитанных этим считывателем, превышает емкость очереди, следующее чтение не возвращает данные; вместо этого он сбрасывает позицию чтения устройства чтения до последней позиции записи, а затем возвращает ошибку. Если данные, доступные для чтения, проверяются после переполнения, но перед следующим чтением, это показывает, что данных, доступных для чтения, больше, чем емкость очереди, что указывает на то, что произошло переполнение. (Если очередь переполняется между проверкой доступных данных и попыткой их чтения, единственным признаком переполнения является сбой чтения.)
Читатели несинхронизированной очереди, скорее всего, не захотят сбрасывать указатели чтения и записи очереди. Таким образом, при создании очереди из считывателей дескриптора следует использовать аргумент false для параметра resetPointers.
Синхронизировано
Синхронизированная очередь имеет одну запись и одно чтение с одной позицией записи и одной позицией чтения. Невозможно записать больше данных, чем имеется в очереди, или прочитать больше данных, чем в настоящее время находится в очереди. В зависимости от того, вызывается блокирующая или неблокирующая функция записи или чтения, попытки превысить доступное пространство или данные либо немедленно возвращают ошибку, либо блокируются до тех пор, пока желаемая операция не будет завершена. Попытки прочитать или записать больше данных, чем емкость очереди, всегда немедленно завершаются неудачей.
Настройте FMQ
Для очереди сообщений требуется несколько объектов MessageQueue
: один для записи и один или несколько для чтения. Не существует явной настройки того, какой объект используется для записи или чтения; Пользователь должен гарантировать, что ни один объект не используется как для чтения, так и для записи, что существует не более одного записывающего устройства, а для синхронизированных очередей не более одного читателя.
Создайте первый объект MessageQueue.
Очередь сообщений создается и настраивается одним вызовом:
#include <fmq/MessageQueue.h> using android::hardware::kSynchronizedReadWrite; using android::hardware::kUnsynchronizedWrite; using android::hardware::MQDescriptorSync; using android::hardware::MQDescriptorUnsync; using android::hardware::MessageQueue; .... // For a synchronized non-blocking FMQ mFmqSynchronized = new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite> (kNumElementsInQueue); // For an unsynchronized FMQ that supports blocking mFmqUnsynchronizedBlocking = new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite> (kNumElementsInQueue, true /* enable blocking operations */);
- Инициализатор
MessageQueue<T, flavor>(numElements)
создает и инициализирует объект, который поддерживает функциональность очереди сообщений. - Инициализатор
MessageQueue<T, flavor>(numElements, configureEventFlagWord)
создает и инициализирует объект, который поддерживает функциональность очереди сообщений с блокировкой. -
flavor
может быть либоkSynchronizedReadWrite
для синхронизированной очереди, либоkUnsynchronizedWrite
для несинхронизированной очереди. -
uint16_t
(в этом примере) может быть любым типом, определенным в HIDL , который не использует вложенные буферы (безstring
илиvec
типов), дескрипторы или интерфейсы. -
kNumElementsInQueue
указывает размер очереди в количестве записей; он определяет размер буфера общей памяти, выделенного для очереди.
Создайте второй объект MessageQueue.
Вторая сторона очереди сообщений создается с использованием объекта MQDescriptor
, полученного из первой стороны. Объект MQDescriptor
отправляется посредством вызова HIDL или AIDL RPC процессу, который удерживает второй конец очереди сообщений. MQDescriptor
содержит информацию об очереди, в том числе:
- Информация для отображения буфера и указателя записи.
- Информация для сопоставления указателя чтения (если очередь синхронизирована).
- Информация для сопоставления слова флага события (если очередь блокируется).
- Тип объекта (
<T, flavor>
), который включает определенный HIDL тип элементов очереди и вариант очереди (синхронизированный или несинхронизированный).
Объект MQDescriptor
можно использовать для создания объекта MessageQueue
:
MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)
Параметр resetPointers
указывает, следует ли сбрасывать позиции чтения и записи в 0 при создании этого объекта MessageQueue
. В несинхронизированной очереди позиция чтения (которая является локальной для каждого объекта MessageQueue
в несинхронизированных очередях) во время создания всегда устанавливается на 0. Обычно MQDescriptor
инициализируется во время создания первого объекта очереди сообщений. Для дополнительного контроля над общей памятью вы можете настроить MQDescriptor
вручную ( MQDescriptor
определен в system/libhidl/base/include/hidl/MQDescriptor.h
), а затем создать каждый объект MessageQueue
, как описано в этом разделе.
Блокировать очереди и флаги событий
По умолчанию очереди не поддерживают блокировку чтения и записи. Существует два типа блокировки вызовов чтения/записи:
- Краткая форма , с тремя параметрами (указатель данных, количество элементов, таймаут). Поддерживает блокировку отдельных операций чтения/записи в одной очереди. При использовании этой формы очередь обрабатывает флаг события и битовые маски внутри себя, а первый объект очереди сообщений должен быть инициализирован вторым параметром
true
. Например:// For an unsynchronized FMQ that supports blocking mFmqUnsynchronizedBlocking = new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite> (kNumElementsInQueue, true /* enable blocking operations */);
- Длинная форма с шестью параметрами (включая флаг события и битовые маски). Поддерживает использование общего объекта
EventFlag
между несколькими очередями и позволяет указывать используемые битовые маски уведомлений. В этом случае флаг события и битовые маски должны передаваться для каждого вызова чтения и записи.
В полной форме EventFlag
может быть указан явно при каждом вызове readBlocking()
и writeBlocking()
. Одну из очередей можно инициализировать с помощью внутреннего флага события, который затем необходимо извлечь из объектов MessageQueue
этой очереди с помощью getEventFlagWord()
и использовать для создания объектов EventFlag
в каждом процессе для использования с другими FMQ. Альтернативно, объекты EventFlag
могут быть инициализированы с использованием любой подходящей общей памяти.
В общем, каждая очередь должна использовать только один из вариантов: неблокирующая, короткая или полная блокировка. Их смешивание не является ошибкой, но для получения желаемого результата требуется тщательное программирование.
Отметить память как доступную только для чтения
По умолчанию общая память имеет разрешения на чтение и запись. Для несинхронизированных очередей ( kUnsynchronizedWrite
) разработчику может потребоваться удалить разрешения на запись для всех читателей, прежде чем он раздаст объекты MQDescriptorUnsync
. Это гарантирует, что другие процессы не смогут записывать данные в очередь, что рекомендуется для защиты от ошибок или неправильного поведения процессов чтения. Если автор записи хочет, чтобы читатели могли сбросить очередь каждый раз, когда они используют MQDescriptorUnsync
для создания стороны очереди для чтения, тогда память не может быть помечена как доступная только для чтения. Это поведение конструктора MessageQueue по умолчанию. Итак, если в этой очереди уже есть пользователи, их код необходимо изменить, чтобы создать очередь с помощью resetPointer=false
.
- Writer: вызовите
ashmem_set_prot_region
с дескриптором файлаMQDescriptor
и регионом, доступным только для чтения (PROT_READ
):int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
- Читатель: создайте очередь сообщений с помощью
resetPointer=false
(по умолчанию —true
):mFmq = new (std::nothrow) MessageQueue(mqDesc, false);
Используйте очередь сообщений
Публичный API объекта MessageQueue
:
size_t availableToWrite() // Space available (number of elements). size_t availableToRead() // Number of elements available. size_t getQuantumSize() // Size of type T in bytes. size_t getQuantumCount() // Number of items of type T that fit in the FMQ. bool isValid() // Whether the FMQ is configured correctly. const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process. bool write(const T* data) // Write one T to FMQ; true if successful. bool write(const T* data, size_t count) // Write count T's; no partial writes. bool read(T* data); // read one T from FMQ; true if successful. bool read(T* data, size_t count); // Read count T's; no partial reads. bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0); bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0); // Allows multiple queues to share a single event flag word std::atomic<uint32_t>* getEventFlagWord(); bool writeBlocking(const T* data, size_t count, uint32_t readNotification, uint32_t writeNotification, int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts. bool readBlocking(T* data, size_t count, uint32_t readNotification, uint32_t writeNotification, int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts; //APIs to allow zero copy read/write operations bool beginWrite(size_t nMessages, MemTransaction* memTx) const; bool commitWrite(size_t nMessages); bool beginRead(size_t nMessages, MemTransaction* memTx) const; bool commitRead(size_t nMessages);
availableToWrite()
и availableToRead()
можно использовать для определения объема данных, которые можно передать за одну операцию. В несинхронизированной очереди:
-
availableToWrite()
всегда возвращает емкость очереди. - Каждый читатель имеет свою собственную позицию чтения и выполняет собственные вычисления для
availableToRead()
. - С точки зрения медленного читателя, очередь может переполниться; это может привести к тому, что
availableToRead()
вернет значение, превышающее размер очереди. Первое чтение после переполнения терпит неудачу и приводит к тому, что позиция чтения для этого читателя устанавливается равной текущему указателю записи, независимо от того, было ли о переполнении сообщено черезavailableToRead()
.
Методы read()
и write()
возвращают true
, если все запрошенные данные могли быть (и были) переданы в/из очереди. Эти методы не блокируются; они либо завершаются успешно (и возвращают true
), либо немедленно возвращают ошибку ( false
).
Методы readBlocking()
и writeBlocking()
ждут, пока запрошенная операция не будет завершена, или пока не истечет время ожидания (значение timeOutNanos
, равное 0, означает, что время ожидания никогда не истекает).
Операции блокировки реализуются с использованием слова флага события. По умолчанию каждая очередь создает и использует свое собственное слово-флаг для поддержки краткой формы readBlocking()
и writeBlocking()
. Несколько очередей могут совместно использовать одно слово, так что процесс может ожидать записи или чтения в любой из очередей. Указатель на слово флага события очереди можно получить, вызвав getEventFlagWord()
, и этот указатель (или любой указатель на подходящее место в общей памяти) можно использовать для создания объекта EventFlag
для перехода в длинную форму readBlocking()
и writeBlocking()
для другой очереди. Параметры readNotification
и writeNotification
указывают, какие биты флага события следует использовать для сигнализации операций чтения и записи в этой очереди. readNotification
и writeNotification
— это 32-битные битовые маски.
readBlocking()
ожидает бит writeNotification
; если этот параметр равен 0, вызов всегда завершается неудачно. Если значение readNotification
равно 0, вызов не завершится неудачей, но успешное чтение не установит никаких битов уведомления. В синхронизированной очереди это будет означать, что соответствующий вызов writeBlocking()
никогда не проснется, если бит не установлен в другом месте. В несинхронизированной очереди writeBlocking()
не ожидает ожидания (его все равно следует использовать для установки бита уведомления о записи), и при чтении уместно не устанавливать какие-либо биты уведомления. Аналогично, writeblocking()
завершается неудачей, если readNotification
равен 0, а успешная запись устанавливает указанные биты writeNotification
.
Чтобы одновременно ожидать несколько очередей, используйте метод wait()
объекта EventFlag
для ожидания битовой маски уведомлений. Метод wait()
возвращает слово состояния с битами, вызвавшими установку пробуждения. Эта информация затем используется для проверки того, что в соответствующей очереди достаточно места или данных для желаемой операции записи/чтения, и выполнения неблокирующей write()
/ read()
. Чтобы получить уведомление о пост-операции, используйте еще один вызов метода wake()
EventFlag
. Определение абстракции EventFlag
можно найти в system/libfmq/include/fmq/EventFlag.h
.
Операции нулевого копирования
API-интерфейсы read
/ write
/ readBlocking
/ writeBlocking()
принимают указатель на буфер ввода/вывода в качестве аргумента и используют внутренние вызовы memcpy()
для копирования данных между ним и кольцевым буфером FMQ. Для повышения производительности Android 8.0 и более поздних версий включает набор API, которые обеспечивают прямой доступ к кольцевому буферу по указателю, устраняя необходимость использования вызовов memcpy
.
Используйте следующие общедоступные API для операций FMQ с нулевым копированием:
bool beginWrite(size_t nMessages, MemTransaction* memTx) const; bool commitWrite(size_t nMessages); bool beginRead(size_t nMessages, MemTransaction* memTx) const; bool commitRead(size_t nMessages);
- Метод
beginWrite
предоставляет базовые указатели в кольцевой буфер FMQ. После записи данных зафиксируйте их с помощьюcommitWrite()
. МетодыbeginRead
/commitRead
действуют аналогично. - Методы
beginRead
/Write
принимают на вход количество сообщений, которые необходимо прочитать/записать, и возвращают логическое значение, указывающее, возможно ли чтение/запись. Если чтение или запись возможны, структураmemTx
заполняется базовыми указателями, которые можно использовать для прямого доступа к указателям в разделяемую память кольцевого буфера. - Структура
MemRegion
содержит подробную информацию о блоке памяти, включая базовый указатель (базовый адрес блока памяти) и длину в терминахT
(длина блока памяти в терминах HIDL-определенного типа очереди сообщений). - Структура
MemTransaction
содержит две структурыMemRegion
:first
иsecond
, поскольку чтение или запись в кольцевой буфер может потребовать переноса на начало очереди. Это будет означать, что для чтения/записи данных в кольцевой буфер FMQ необходимы два базовых указателя.
Чтобы получить базовый адрес и длину из структуры MemRegion
:
T* getAddress(); // gets the base address size_t getLength(); // gets the length of the memory region in terms of T size_t getLengthInBytes(); // gets the length of the memory region in bytes
Чтобы получить ссылки на первый и второй MemRegion
внутри объекта MemTransaction
:
const MemRegion& getFirstRegion(); // get a reference to the first MemRegion const MemRegion& getSecondRegion(); // get a reference to the second MemRegion
Пример записи в FMQ с использованием API нулевого копирования:
MessageQueueSync::MemTransaction tx; if (mQueue->beginRead(dataLen, &tx)) { auto first = tx.getFirstRegion(); auto second = tx.getSecondRegion(); foo(first.getAddress(), first.getLength()); // method that performs the data write foo(second.getAddress(), second.getLength()); // method that performs the data write if(commitWrite(dataLen) == false) { // report error } } else { // report error }
Следующие вспомогательные методы также являются частью MemTransaction
:
-
T* getSlot(size_t idx);
Возвращает указатель наidx
слота вMemRegions
, которые являются частью этого объектаMemTransaction
. Если объектMemTransaction
представляет области памяти для чтения/записи N элементов типа T, то допустимый диапазонidx
находится между 0 и N-1. -
bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
Записывайте элементыnMessages
типа T в области памяти, описанные объектом, начиная с индексаstartIdx
. Этот метод используетmemcpy()
и не предназначен для операции нулевого копирования. Если объектMemTransaction
представляет память для чтения/записи N элементов типа T, то допустимый диапазонidx
находится между 0 и N-1. -
bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
Вспомогательный метод для чтения элементовnMessages
типа T из областей памяти, описанных объектом, начиная сstartIdx
. Этот метод используетmemcpy()
и не предназначен для операции нулевого копирования.
Отправить очередь через HIDL
На творческой стороне:
- Создайте объект очереди сообщений, как описано выше.
- Убедитесь, что объект действителен с помощью
isValid()
. - Если вы ожидаете в нескольких очередях, передав
EventFlag
в длинную формуreadBlocking()
/writeBlocking()
, вы можете извлечь указатель флага события (используяgetEventFlagWord()
) из объектаMessageQueue
, который был инициализирован для создания флага, и используйте этот флаг для создания необходимого объектаEventFlag
. - Используйте метод
MessageQueue
getDesc()
для получения объекта дескриптора. - В файле
.hal
укажите методу параметр типаfmq_sync
или fmq_unsync
где T
— подходящий тип, определенный в HIDL. Используйте это, чтобы отправить объект, возвращенныйgetDesc()
принимающему процессу.
На принимающей стороне:
- Используйте объект дескриптора для создания объекта
MessageQueue
. Обязательно используйте тот же вариант очереди и тип данных, иначе шаблон не удастся скомпилировать. - Если вы извлекли флаг события, извлеките его из соответствующего объекта
MessageQueue
в принимающем процессе. - Используйте объект
MessageQueue
для передачи данных.