Инфраструктура удаленного вызова процедур (RPC) HIDL использует механизмы связывания, что означает, что вызовы связаны с накладными расходами, требуют операций ядра и могут запускать действия планировщика. Однако в тех случаях, когда данные должны передаваться между процессами с меньшими затратами и без участия ядра, используется система быстрой очереди сообщений (FMQ).
FMQ создает очереди сообщений с нужными свойствами. Вы можете отправить объект MQDescriptorSync
или MQDescriptorUnsync
через вызов HIDL RPC, и этот объект будет использоваться принимающим процессом для доступа к очереди сообщений.
Типы очередей
Android поддерживает два типа очередей (известных как варианты ):
- Несинхронизированным очередям разрешено переполнение, и у них может быть много читателей; каждый читатель должен вовремя прочитать данные или потерять их.
- Синхронизированным очередям не разрешено переполнение, и у них может быть только один читатель.
Для обоих типов очередей не допускается опустошение (ошибка чтения из пустой очереди) и они могут иметь только одну запись.
Несинхронизированные очереди
Несинхронизированная очередь имеет только одну запись, но может иметь любое количество читателей. В очереди имеется одна позиция записи; однако каждый читатель отслеживает свою независимую позицию чтения.
Запись в очередь всегда завершается успешно (не проверяется на переполнение), если она не превышает настроенную емкость очереди (запись, превышающая емкость очереди, завершается сбоем немедленно). Поскольку у каждого читателя может быть своя позиция чтения, вместо того, чтобы ждать, пока каждый читатель прочитает каждый фрагмент данных, данные выпадают из очереди всякий раз, когда новым операциям записи требуется пространство.
Читатели несут ответственность за извлечение данных до того, как они попадут в конец очереди. Чтение, которое пытается прочитать больше данных, чем доступно, либо завершается сбоем немедленно (если неблокируется), либо ожидает, пока будет доступно достаточное количество данных (если блокируется). Чтение, пытающееся прочитать больше данных, чем емкость очереди, всегда немедленно завершается неудачей.
Если считыватель не успевает за записывающим, так что объем данных, записанных и еще не прочитанных этим считывателем, превышает емкость очереди, следующее чтение не возвращает данные; вместо этого он сбрасывает позицию чтения устройства чтения на позицию записи плюс половина емкости, а затем возвращает ошибку. Это оставляет половину буфера доступной для чтения и резервирует место для новых записей, чтобы избежать немедленного повторного переполнения очереди. Если данные, доступные для чтения, проверяются после переполнения, но перед следующим чтением, это показывает, что данных, доступных для чтения, больше, чем емкость очереди, что указывает на то, что произошло переполнение. (Если очередь переполняется между проверкой доступных данных и попыткой их чтения, единственным признаком переполнения является сбой чтения.)
Синхронизированные очереди
Синхронизированная очередь имеет одну запись и одно чтение с одной позицией записи и одной позицией чтения. Невозможно записать больше данных, чем имеется в очереди, или прочитать больше данных, чем в настоящее время находится в очереди. В зависимости от того, вызывается блокирующая или неблокирующая функция записи или чтения, попытки превысить доступное пространство или данные либо немедленно возвращают ошибку, либо блокируются до тех пор, пока желаемая операция не будет завершена. Попытки прочитать или записать больше данных, чем емкость очереди, всегда немедленно завершаются неудачей.
Настройте FMQ
Для очереди сообщений требуется несколько объектов MessageQueue
: один для записи и один или несколько для чтения. Не существует явной настройки того, какой объект используется для записи или чтения; пользователь несет ответственность за то, чтобы ни один объект не использовался как для чтения, так и для записи, чтобы существовал не более одного записывающего устройства, а для синхронизированных очередей - чтобы был не более одного читателя.
Создайте первый объект MessageQueue.
Очередь сообщений создается и настраивается одним вызовом:
#include <fmq/MessageQueue.h> using android::hardware::kSynchronizedReadWrite; using android::hardware::kUnsynchronizedWrite; using android::hardware::MQDescriptorSync; using android::hardware::MQDescriptorUnsync; using android::hardware::MessageQueue; .... // For a synchronized nonblocking FMQ mFmqSynchronized = new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite> (kNumElementsInQueue); // For an unsynchronized FMQ that supports blocking mFmqUnsynchronizedBlocking = new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite> (kNumElementsInQueue, true /* enable blocking operations */);
- Инициализатор
MessageQueue<T, flavor>(numElements)
создает и инициализирует объект, который поддерживает функциональность очереди сообщений. - Инициализатор
MessageQueue<T, flavor>(numElements, configureEventFlagWord)
создает и инициализирует объект, который поддерживает функциональность очереди сообщений с блокировкой. -
flavor
может быть либоkSynchronizedReadWrite
для синхронизированной очереди, либоkUnsynchronizedWrite
для несинхронизированной очереди. -
uint16_t
(в этом примере) может быть любым типом, определенным в HIDL , который не использует вложенные буферы (безstring
илиvec
-типов), дескрипторы или интерфейсы. -
kNumElementsInQueue
указывает размер очереди в количестве записей; он определяет размер буфера общей памяти, выделенного для очереди.
Создайте второй объект MessageQueue.
Вторая сторона очереди сообщений создается с использованием объекта MQDescriptor
полученного из первой стороны. Объект MQDescriptor
отправляется посредством вызова HIDL или AIDL RPC процессу, который удерживает второй конец очереди сообщений. MQDescriptor
содержит информацию об очереди, в том числе:
- Информация для отображения буфера и указателя записи.
- Информация для сопоставления указателя чтения (если очередь синхронизирована).
- Информация для сопоставления слова флага события (если очередь блокируется).
- Тип объекта (
<T, flavor>
), который включает определенный HIDL тип элементов очереди и вариант очереди (синхронизированный или несинхронизированный).
Вы можете использовать объект MQDescriptor
для создания объекта MessageQueue
:
MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)
Параметр resetPointers
указывает, следует ли сбрасывать позиции чтения и записи в 0 при создании этого объекта MessageQueue
. В несинхронизированной очереди позиция чтения (которая является локальной для каждого объекта MessageQueue
в несинхронизированных очередях) во время создания всегда устанавливается на 0. Обычно MQDescriptor
инициализируется во время создания первого объекта очереди сообщений. Для дополнительного контроля над общей памятью вы можете настроить MQDescriptor
вручную ( MQDescriptor
определен в system/libhidl/base/include/hidl/MQDescriptor.h
), а затем создать каждый объект MessageQueue
как описано в этом разделе.
Блокировать очереди и флаги событий
По умолчанию очереди не поддерживают блокировку чтения и записи. Существует два типа блокировки вызовов чтения и записи:
- Краткая форма с тремя параметрами (указатель данных, количество элементов, время ожидания) поддерживает блокировку отдельных операций чтения и записи в одной очереди. При использовании этой формы очередь обрабатывает флаг события и битовые маски внутри себя, а первый объект очереди сообщений должен быть инициализирован вторым параметром
true
. Например:// For an unsynchronized FMQ that supports blocking mFmqUnsynchronizedBlocking = new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite> (kNumElementsInQueue, true /* enable blocking operations */);
- Длинная форма с шестью параметрами (включая флаг события и битовые маски) поддерживает использование общего объекта
EventFlag
между несколькими очередями и позволяет указать используемые битовые маски уведомлений. В этом случае флаг события и битовые маски должны передаваться при каждом вызове чтения и записи.
В длинной форме вы можете явно указать EventFlag
в каждом вызове readBlocking()
и writeBlocking()
. Вы можете инициализировать одну из очередей с помощью внутреннего флага события, который затем необходимо извлечь из объектов MessageQueue
этой очереди с помощью getEventFlagWord()
и использовать для создания объектов EventFlag
в каждом процессе для использования с другими FMQ. Альтернативно вы можете инициализировать объекты EventFlag
с помощью любой подходящей общей памяти.
В общем, каждая очередь должна использовать только один из вариантов: неблокирующая, короткая или полная блокировка. Их смешивание не является ошибкой, но для получения желаемого результата требуется тщательное программирование.
Отметить память как доступную только для чтения
По умолчанию общая память имеет разрешения на чтение и запись. Для несинхронизированных очередей ( kUnsynchronizedWrite
) разработчику может потребоваться удалить разрешения на запись для всех читателей, прежде чем он раздаст объекты MQDescriptorUnsync
. Это гарантирует, что другие процессы не смогут записывать данные в очередь, что рекомендуется для защиты от ошибок или неправильного поведения процессов чтения. Если автор записи хочет, чтобы читатели могли сбрасывать очередь всякий раз, когда они используют MQDescriptorUnsync
для создания стороны очереди для чтения, тогда память не может быть помечена как доступная только для чтения. Это поведение конструктора MessageQueue
по умолчанию. Поэтому, если в этой очереди уже есть пользователи, их код необходимо изменить, чтобы создать очередь с помощью resetPointer=false
.
- Writer: вызовите
ashmem_set_prot_region
с дескриптором файлаMQDescriptor
и регионом, доступным только для чтения (PROT_READ
):int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
- Читатель: создайте очередь сообщений с помощью
resetPointer=false
(по умолчанию —true
):mFmq = new (std::nothrow) MessageQueue(mqDesc, false);
Используйте очередь сообщений
Публичный API объекта MessageQueue
:
size_t availableToWrite() // Space available (number of elements). size_t availableToRead() // Number of elements available. size_t getQuantumSize() // Size of type T in bytes. size_t getQuantumCount() // Number of items of type T that fit in the FMQ. bool isValid() // Whether the FMQ is configured correctly. const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process. bool write(const T* data) // Write one T to FMQ; true if successful. bool write(const T* data, size_t count) // Write count T's; no partial writes. bool read(T* data); // read one T from FMQ; true if successful. bool read(T* data, size_t count); // Read count T's; no partial reads. bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0); bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0); // Allows multiple queues to share a single event flag word std::atomic<uint32_t>* getEventFlagWord(); bool writeBlocking(const T* data, size_t count, uint32_t readNotification, uint32_t writeNotification, int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts. bool readBlocking(T* data, size_t count, uint32_t readNotification, uint32_t writeNotification, int64_t timeOutNanos = 0, android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts; // APIs to allow zero copy read/write operations bool beginWrite(size_t nMessages, MemTransaction* memTx) const; bool commitWrite(size_t nMessages); bool beginRead(size_t nMessages, MemTransaction* memTx) const; bool commitRead(size_t nMessages);
Вы можете использовать availableToWrite()
и availableToRead()
чтобы определить, какой объем данных можно передать за одну операцию. В несинхронизированной очереди:
-
availableToWrite()
всегда возвращает емкость очереди. - Каждый читатель имеет свою собственную позицию чтения и выполняет собственные вычисления для
availableToRead()
. - С точки зрения медленного читателя, очередь может переполниться; это может привести к тому, что
availableToRead()
вернет значение, превышающее размер очереди. Первое чтение после переполнения терпит неудачу и приводит к тому, что позиция чтения для этого читателя устанавливается равной текущему указателю записи, независимо от того, было ли о переполнении сообщено черезavailableToRead()
.
Методы read()
и write()
возвращают true
если все запрошенные данные могли быть (и были) переданы в очередь и из нее. Эти методы не блокируются; они либо завершаются успешно (и возвращают true
), либо немедленно возвращают ошибку ( false
).
Методы readBlocking()
и writeBlocking()
ждут, пока запрошенная операция не будет завершена, или пока не истечет время ожидания (значение timeOutNanos
, равное 0, означает отсутствие тайм-аута).
Операции блокировки реализуются с использованием слова флага события. По умолчанию каждая очередь создает и использует свое собственное слово-флаг для поддержки краткой формы readBlocking()
и writeBlocking()
. Несколько очередей могут совместно использовать одно слово, так что процесс может ожидать записи или чтения в любой из очередей. Вызвав getEventFlagWord()
, вы можете получить указатель на слово флага события очереди и использовать этот указатель (или любой указатель на подходящее место в общей памяти) для создания объекта EventFlag
для передачи в длинную форму readBlocking()
и writeBlocking()
для другой очереди. Параметры readNotification
и writeNotification
указывают, какие биты флага события следует использовать для сигнализации операций чтения и записи в этой очереди. readNotification
и writeNotification
— это 32-битные битовые маски.
readBlocking()
ожидает бит writeNotification
; если этот параметр равен 0, вызов всегда завершается неудачно. Если значение readNotification
равно 0, вызов не завершится неудачей, но успешное чтение не установит никаких битов уведомления. В синхронизированной очереди это означает, что соответствующий вызов writeBlocking()
никогда не активируется, если бит не установлен в другом месте. В несинхронизированной очереди writeBlocking()
не ожидает (его все равно следует использовать для установки бита уведомления о записи), и при чтении уместно не устанавливать какие-либо биты уведомления. Аналогично, writeblocking()
завершается неудачей, если readNotification
равен 0, а успешная запись устанавливает указанные биты writeNotification
.
Чтобы одновременно ожидать несколько очередей, используйте метод wait()
объекта EventFlag
для ожидания битовой маски уведомлений. Метод wait()
возвращает слово состояния с битами, вызвавшими установку пробуждения. Эта информация затем используется для проверки того, что в соответствующей очереди достаточно места или данных для желаемой операции записи и чтения, и выполнения неблокирующих операций write()
и read()
. Чтобы получить уведомление о пост-операции, используйте еще один вызов метода wake()
объекта EventFlag
. Определение абстракции EventFlag
см. system/libfmq/include/fmq/EventFlag.h
.
Операции нулевого копирования
Методы read
, write
, readBlocking
и writeBlocking()
принимают указатель на буфер ввода-вывода в качестве аргумента и используют внутренние вызовы memcpy()
для копирования данных между ним и кольцевым буфером FMQ. Для повышения производительности Android 8.0 и более поздних версий включает набор API, которые обеспечивают прямой доступ к кольцевому буферу по указателю, устраняя необходимость использования вызовов memcpy
.
Используйте следующие общедоступные API для операций FMQ с нулевым копированием:
bool beginWrite(size_t nMessages, MemTransaction* memTx) const; bool commitWrite(size_t nMessages); bool beginRead(size_t nMessages, MemTransaction* memTx) const; bool commitRead(size_t nMessages);
- Метод
beginWrite
предоставляет базовые указатели в кольцевой буфер FMQ. После записи данных зафиксируйте их с помощьюcommitWrite()
. МетодыbeginRead
иcommitRead
действуют одинаково. - Методы
beginRead
иWrite
принимают на вход количество сообщений, которые необходимо прочитать и записать, и возвращают логическое значение, указывающее, возможно ли чтение или запись. Если чтение или запись возможны, структураmemTx
заполняется базовыми указателями, которые можно использовать для прямого доступа к указателям в общей памяти кольцевого буфера. - Структура
MemRegion
содержит подробную информацию о блоке памяти, включая базовый указатель (базовый адрес блока памяти) и длину в терминахT
(длина блока памяти в терминах определенного HIDL типа очереди сообщений). - Структура
MemTransaction
содержит две структурыMemRegion
:first
иsecond
поскольку чтение или запись в кольцевой буфер может потребовать переноса на начало очереди. Это означало бы, что для чтения и записи данных в кольцевой буфер FMQ необходимы два базовых указателя.
Чтобы получить базовый адрес и длину из структуры MemRegion
:
T* getAddress(); // gets the base address size_t getLength(); // gets the length of the memory region in terms of T size_t getLengthInBytes(); // gets the length of the memory region in bytes
Чтобы получить ссылки на первую и вторую структуры MemRegion
внутри объекта MemTransaction
:
const MemRegion& getFirstRegion(); // get a reference to the first MemRegion const MemRegion& getSecondRegion(); // get a reference to the second MemRegion
Пример записи в FMQ с использованием API нулевого копирования:
MessageQueueSync::MemTransaction tx; if (mQueue->beginRead(dataLen, &tx)) { auto first = tx.getFirstRegion(); auto second = tx.getSecondRegion(); foo(first.getAddress(), first.getLength()); // method that performs the data write foo(second.getAddress(), second.getLength()); // method that performs the data write if(commitWrite(dataLen) == false) { // report error } } else { // report error }
Следующие вспомогательные методы также являются частью MemTransaction
:
-
T* getSlot(size_t idx);
возвращает указатель наidx
слота вMemRegions
, которые являются частью этого объектаMemTransaction
. Если объектMemTransaction
представляет области памяти для чтения и записи N элементов типаT
, то допустимый диапазонidx
находится между 0 и N-1. -
bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
записывает элементыnMessages
типаT
в области памяти, описанные объектом, начиная с индексаstartIdx
. Этот метод используетmemcpy()
и не предназначен для операции нулевого копирования. Если объектMemTransaction
представляет память для чтения и записи N элементов типаT
, то допустимый диапазонidx
находится между 0 и N-1. -
bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
— это вспомогательный метод для чтения элементовnMessages
типаT
из областей памяти, описанных объектом, начиная сstartIdx
. Этот метод используетmemcpy()
и не предназначен для операции нулевого копирования.
Отправить очередь через HIDL
На творческой стороне:
- Создайте объект очереди сообщений, как описано выше.
- Убедитесь, что объект действителен с помощью
isValid()
. - Если вы ожидаете несколько очередей, передавая
EventFlag
в длинную формуreadBlocking()
илиwriteBlocking()
, вы можете извлечь указатель флага события (используяgetEventFlagWord()
) из объектаMessageQueue
, который был инициализирован для создания флага, и используйте этот флаг для создания необходимого объектаEventFlag
. - Используйте метод
MessageQueue
getDesc()
чтобы получить объект дескриптора. - В файле HAL дайте методу параметр типа
fmq_sync
или fmq_unsync
где T
— подходящий тип, определенный в HIDL. Используйте это, чтобы отправить объект, возвращенныйgetDesc()
, принимающему процессу.
На принимающей стороне:
- Используйте объект дескриптора для создания объекта
MessageQueue
. Используйте тот же вариант очереди и тип данных, иначе шаблон не удастся скомпилировать. - Если вы извлекли флаг события, извлеките его из соответствующего объекта
MessageQueue
в принимающем процессе. - Используйте объект
MessageQueue
для передачи данных.