Kolejka wiadomości (FMQ)

Infrastruktura wywołania procedury zdalnej (RPC) HIDL korzysta z mechanizmów bindera, co oznacza, że wywołania generują koszty dodatkowe, wymagają operacji jądra i mogą wywoływać działanie harmonogramu. W przypadku jednak, gdy dane muszą być przesyłane między procesami z mniejszym obciążeniem i bez udziału jądra, używany jest system kolejki szybkich wiadomości (FMQ).

FMQ tworzy kolejki wiadomości z wybranymi właściwościami. Za pomocą wywołania RPC HIDL możesz wysłać obiekt MQDescriptorSync lub MQDescriptorUnsync. Obiekt ten jest używany przez proces odbierający do uzyskiwania dostępu do kolejki wiadomości.

Typy kolejek

Android obsługuje 2 typy kolejek (zwane też wersjami):

  • Niezsynchronizowane kolejki mogą przepełniać się i mieć wielu czytelników. Każdy z nich musi odczytać dane na czas lub je utracić.
  • Synchronizowane kolejki nie mogą przepełniać i mogą mieć tylko jednego czytnika.

Oba typy kolejek nie mogą mieć wartości poniżej zera (odczyt z pustej kolejki kończy się niepowodzeniem) i mogą mieć tylko 1 nagrywarkę.

Niezsynchronizowane kolejki

Niezsynchronizowana kolejka ma tylko 1 program zapisu, ale może mieć dowolną liczbę czytelników. W kolejce jest 1 miejsce do zapisu, ale każdy czytnik śledzi swoje własne miejsce odczytu.

Zapisy do kolejki zawsze się udają (nie są sprawdzane pod kątem przepełnienia), o ile nie są większe niż skonfigurowana pojemność kolejki (zapisy większe niż pojemność kolejki od razu się nie udają). Każdy czytnik może mieć inną pozycję odczytu, więc zamiast czekać, aż każdy czytnik odczyta wszystkie dane, dane są usuwane z kolejki, gdy nowe zapisy potrzebują miejsca.

Czytelnicy są odpowiedzialni za pobranie danych, zanim wypadną z końca kolejki. Odczyt, który próbuje odczytać więcej danych niż jest dostępnych, albo natychmiast się nie powiedzie (jeśli nie jest blokujący) albo zaczeka na dostępność wystarczającej ilości danych (jeśli jest blokujący). Odczyt, który próbuje odczytać więcej danych niż pojemność kolejki, zawsze kończy się natychmiastowym niepowodzeniem.

Jeśli czytnik nie nadąża za zapisem, tak że ilość danych zapisanych i jeszcze nie odczytanych przez tego czytnika przekracza pojemność kolejki, następne odczytanie nie zwróci danych. Zamiast tego zeruje pozycję odczytu na pozycji zapisu plus połowa pojemności, a potem zwraca błąd. Pozostawia to połowę bufora do odczytu i rezerwuje miejsce na nowe zapisy, aby uniknąć natychmiastowego przepełnienia kolejki. Jeśli dane dostępne do odczytu są sprawdzane po przepełnieniu, ale przed kolejnym odczytem, pokazują więcej danych do odczytu niż pojemność kolejki, co oznacza, że nastąpiło przepełnienie. (Jeśli kolejka przepełni się między sprawdzeniem dostępnych danych a próbą odczytu tych danych, jedynym sygnałem przepełnienia jest niepowodzenie odczytu).

Synchronizowane kolejki

Synchronizowana kolejka ma 1 element zapisujący i 1 element odczytujący z 1 pozycją zapisu i 1 pozycją odczytu. Nie można zapisać więcej danych, niż pozwala na to kolejka, ani odczytać więcej danych, niż obecnie zawiera. W zależności od tego, czy wywoływana jest funkcja zapisu lub odczytu blokującego czy nieblokującego, próby przekroczenia dostępnej przestrzeni lub danych albo od razu kończą się niepowodzeniem albo są blokowane, dopóki nie można wykonać żądanej operacji. Próby odczytu lub zapisu większej ilości danych niż pojemność kolejki zawsze kończą się niepowodzeniem.

Konfigurowanie FMQ

Kolejka wiadomości wymaga kilku obiektów MessageQueue: jednego do zapisu i co najmniej jednego do odczytu. Nie ma wyraźnej konfiguracji obiektu używanego do zapisu lub odczytu. Użytkownik musi zadbać o to, aby żaden obiekt nie był używany do odczytu i zapisu, aby był co najwyżej 1 użytkownik zapisujący oraz aby w przypadku kolejek synchronizowanych był co najwyżej 1 czytelnik.

Tworzenie pierwszego obiektu MessageQueue

Kolejka wiadomości jest tworzona i konfigurowana za pomocą pojedynczego wywołania:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Konstruktor MessageQueue<T, flavor>(numElements) tworzy i inicjalizuje obiekt, który obsługuje funkcję kolejki wiadomości.
  • Konstruktor MessageQueue<T, flavor>(numElements, configureEventFlagWord) tworzy i inicjalizuje obiekt, który obsługuje kolejkę wiadomości z blokowaniem.
  • flavor może być równe kSynchronizedReadWrite w przypadku kolejki zsynchronizowanej lub kUnsynchronizedWrite w przypadku kolejki niezsynchronizowanej.
  • uint16_t (w tym przykładzie) może być dowolnym typem zdefiniowanym w HIDL, który nie obejmuje zagłębionych buforów (nie ma typów string ani vec), uchwytów ani interfejsów.
  • kNumElementsInQueue wskazuje rozmiar kolejki pod względem liczby wpisów; określa rozmiar wspólnego bufora pamięci przydzielonego do kolejki.

Utwórz drugi obiekt MessageQueue.

Druga strona kolejki wiadomości jest tworzona za pomocą obiektu MQDescriptor uzyskanego od pierwszej strony. Obiekt MQDescriptor jest wysyłany za pomocą wywołania HIDL lub AIDL RPC do procesu, który obsługuje drugi koniec kolejki wiadomości. MQDescriptor zawiera informacje o kolejce, w tym:

  • informacje potrzebne do zmapowania bufora i wskaźnika zapisu;
  • Informacje do mapowania wskaźnika odczytu (jeśli kolejka jest zsynchronizowana).
  • informacje potrzebne do zmapowania słowa flagi zdarzenia (jeśli kolejka blokuje).
  • Typ obiektu (<T, flavor>), który obejmuje zdefiniowany przez HIDL typ elementów kolejki oraz typ kolejki (zsynchronizowany lub niezsynchronizowany).

Obiekt MQDescriptor możesz użyć do utworzenia obiektu MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Parametr resetPointers wskazuje, czy podczas tworzenia obiektu MessageQueue pozycje odczytu i zapisu mają być resetowane do 0. W niezsynchronizowanej kolejce pozycja odczytu (która jest lokalna dla każdego obiektu MessageQueue w niezsynchronizowanych kolejkach) jest zawsze ustawiana na 0 podczas tworzenia. Zwykle MQDescriptor jest inicjowany podczas tworzenia obiektu kolejki pierwszej wiadomości. Aby uzyskać większą kontrolę nad współdzieloną pamięcią, możesz ręcznie skonfigurować MQDescriptor (definiowany w system/libhidl/base/include/hidl/MQDescriptor.h), a potem utworzyć każdy obiekt MessageQueue zgodnie z opisem w tej sekcji.MQDescriptor

kolejki blokowania i flagi zdarzeń.

Domyślnie kolejki nie obsługują blokowania operacji odczytu i zapisu. Istnieją 2 rodzaje blokowania wywołań odczytu i zapisu:

  • Forma krótka z 3 parametrami (wskaźnik danych, liczba elementów, limit czasu) obsługuje blokowanie poszczególnych operacji odczytu i zapisu w pojedynczej kolejce. Gdy używasz tego formularza, kolejka obsługuje flagę zdarzenia i maski bitowe wewnętrznie, a obiekt kolejki pierwszej wiadomości musi zostać zainicjowany za pomocą drugiego parametru true. Przykład:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Długa wersja z 6 parametrami (w tym flagą zdarzenia i maskami bitowymi) obsługuje używanie udostępnionego obiektu EventFlag w różnych kolejkach oraz umożliwia określenie używanych masek bitowych powiadomień. W takim przypadku do każdego wywołania odczytu i zapisu musisz podać flagę zdarzenia i maski bitowe.

W przypadku długiej formy możesz podać wartość EventFlag w każdym wywołaniu funkcji readBlocking()writeBlocking(). Możesz zainicjować jedną z kolejek za pomocą wewnętrznego flagi zdarzenia, który musi zostać wyodrębniony z obiektów MessageQueue tej kolejki za pomocą funkcji getEventFlagWord() i użyty do utworzenia obiektów EventFlag w każdym procesie na potrzeby innych kolejek FMQ. Możesz też zainicjować obiekty EventFlag za pomocą dowolnej odpowiedniej pamięci współdzielonej.

Ogólnie każda kolejka powinna używać tylko jednej z metod: nieblokującej, blokującej krótkie treści lub blokującej długie treści. Mieszanie ich nie jest błędem, ale aby uzyskać pożądany efekt, konieczne jest ostrożne programowanie.

Oznaczanie pamięci jako tylko do odczytu

Domyślnie pamięć współdzielona ma uprawnienia do odczytu i zapisu. W przypadku niesynchronizowanych kolejek (kUnsynchronizedWrite) autor może chcieć usunąć uprawnienia do zapisu dla wszystkich czytelników, zanim przekaże obiekty MQDescriptorUnsync. Dzięki temu inne procesy nie mogą zapisywać danych w kole, co jest zalecane w celu ochrony przed błędami lub nieprawidłowym działaniem procesów czytnika. Jeśli autor chce, aby czytelnicy mogli zresetować kolejkę, gdy użyją MQDescriptorUnsync do utworzenia strony odczytu kolejki, pamięć nie może być oznaczona jako dostępna tylko do odczytu. Jest to domyślne działanie konstruktora MessageQueue. Jeśli kolejka ma już użytkowników, należy zmienić ich kod, aby zawierał parametr resetPointer=false.

  • Zapisz: wywołaj funkcję ashmem_set_prot_region z deskryptorem pliku MQDescriptor i regionem ustawionym jako tylko do odczytu (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Czytnik: utwórz kolejkę wiadomości z resetPointer=false (domyślnie true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Korzystanie z MessageQueue

Publiczny interfejs API obiektu MessageQueue to:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Możesz użyć funkcji availableToWrite() i availableToRead(), aby określić, ile danych można przenieść w ramach jednej operacji. W niezsynchronizowanej kolejce:

  • availableToWrite() zawsze zwraca pojemność kolejki.
  • Każdy czytelnik ma swoją pozycję czytania i sam wykonuje obliczenia dotyczące availableToRead().
  • Z punktu widzenia wolnego czytelnika kolejka może przepełnić się; może to spowodować, że availableToRead() zwróci wartość większą niż rozmiar kolejki. Pierwszy odczyt po przepełnieniu powoduje niepowodzenie i ustawienie pozycji odczytu dla tego czytnika na równą bieżącemu wskaźnikowi zapisu, niezależnie od tego, czy przepełnienie zostało zgłoszone za pomocą availableToRead().

Metody read()write() zwracają true, jeśli wszystkie żądane dane mogły zostać przeniesione do kolejki i z niej. Te metody nie blokują; albo działają prawidłowo (i zwracają wartość true), albo od razu zwracają błąd (false).

Metody readBlocking()writeBlocking() czekają, aż żądana operacja zostanie wykonana lub do momentu przekroczenia limitu czasu (wartość 0 w parametrze timeOutNanos oznacza, że nie ma limitu czasu).

Operacje blokowania są implementowane za pomocą słowa flagi zdarzenia. Domyślnie każda kolejka tworzy i używa własnego słowa flagi, aby obsługiwać krótkie formy readBlocking() i writeBlocking(). Wiele kolejek może używać tego samego słowa, dzięki czemu proces może czekać na zapis lub odczyt w dowolnej z nich. Wywołanie funkcji getEventFlagWord() zwraca wskaźnik do słowa flagi zdarzenia kolejki. Możesz użyć tego wskaźnika (lub dowolnego wskaźnika do odpowiedniej lokalizacji w wspólnej pamięci) do utworzenia obiektu EventFlag, który zostanie przekazany do funkcji readBlocking() w długiej formie i do funkcji writeBlocking() w innej kolejce. Parametry readNotificationwriteNotification określają, które bity w flagach zdarzeń mają być używane do sygnalizowania odczytów i zapisów w tej kolejce. readNotificationwriteNotification to 32-bitowe maski bitowe.

readBlocking() czeka na bity writeNotification; jeśli ten parametr ma wartość 0, wywołanie zawsze kończy się niepowodzeniem. Jeśli wartość readNotification wynosi 0, wywołanie nie zakończy się niepowodzeniem, ale pomyślne odczytanie nie spowoduje ustawienia żadnych bitów powiadomienia. W kolejce synchronicznej oznacza to, że odpowiednia funkcja writeBlocking() nigdy się nie budzi, chyba że bit jest ustawiony gdzie indziej. W niesynchronizowanej kolejce writeBlocking() nie czeka (nadal powinien być używany do ustawienia bitu powiadomienia o zapisie), a w przypadku odczytu nie powinien ustawiać żadnych bitów powiadomienia. Podobnie writeblocking() zakończy się niepowodzeniem, jeśli readNotification ma wartość 0, a pomyślne zapisanie ustawia określone bity writeNotification.

Aby czekać na wiele kolejek jednocześnie, użyj metody EventFlag obiektu wait(), aby czekać na bitową maskę powiadomień. Metoda wait() zwraca słowo określające stan z bitami, które spowodowały wybudzenie. Następnie te informacje są używane do sprawdzania, czy odpowiednia kolejka ma wystarczająco dużo miejsca lub danych na potrzeby operacji zapisu i odczytu, a także do wykonywania nieblokujących operacji write()read(). Aby uzyskać powiadomienie po operacji, użyj kolejnego wywołania metody EventFlag obiektu wake(). Definicję pojęcia EventFlag abstrahowania znajdziesz w artykule system/libfmq/include/fmq/EventFlag.h.

Operacje bez kopii

Metody read, write, readBlockingwriteBlocking() przyjmują jako argument wskaźnik do bufora wejścia-wyjścia i korzystają wewnętrznie z wywołań memcpy(), aby kopiować dane między tym samym buforem a buforem pierścieniowym FMQ. Aby zwiększyć wydajność, Android 8.0 i nowsze wersje zawierają zestaw interfejsów API, które zapewniają bezpośredni dostęp do wskaźnika do bufora pierścieniowego, eliminując konieczność korzystania z wywołań memcpy.

Do operacji FMQ bez kopiowania używaj tych publicznych interfejsów API:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Metoda beginWrite udostępnia podstawowe wskaźniki do pętli buforowej FMQ. Po zapisaniu danych zatwierdź je za pomocą polecenia commitWrite(). Metody beginReadcommitRead działają w ten sam sposób.
  • Metody beginReadWrite przyjmują jako dane wejściowe liczbę wiadomości do odczytu i zapisu oraz zwracają wartość logiczną wskazującą, czy odczyt lub zapis jest możliwy. Jeśli odczyt lub zapis jest możliwy, struktura memTx jest wypełniana wskaźnikami podstawowymi, których można używać do bezpośredniego dostępu do współdzielonej pamięci za pomocą wskaźnika w buforze pierścieniowym.
  • Struktura MemRegion zawiera informacje o bloku pamięci, w tym wskaźnik bazowy (adres bazowy bloku pamięci) i długość w jednostce T (długość bloku pamięci w jednostce zdefiniowanej przez HIDL typu kolejki wiadomości).
  • Struktura MemTransaction zawiera 2 struktury MemRegion, firstsecond, które podczas odczytu lub zapisu do pierścieniowego bufora mogą wymagać zawijania na początek kolejki. Oznacza to, że do odczytu i zapisu danych do pierścieniowego bufora FMQ potrzebne są 2 wskaźniki podstawowe.

Aby uzyskać adres bazowy i długość z struktury MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Aby uzyskać odwołania do pierwszego i drugiego elementu MemRegion w obiekcie MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Przykład zapisu do FMQ przy użyciu interfejsów API zero copy:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Te pomocnicze metody są też dostępne w obiekcie MemTransaction:

  • T* getSlot(size_t idx); zwraca wskaźnik do slotu idx w MemRegions, który jest częścią obiektu MemTransaction. Jeśli obiekt MemTransaction reprezentuje regiony pamięci, w których mają być odczytywane i zapisywane elementy typu T, to prawidłowy zakres zmiennej idx wynosi od 0 do N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); zapisuje nMessages elementy typu T w regionach pamięci opisanych przez obiekt, zaczynając od indeksu startIdx. Ta metoda używa memcpy() i nie jest przeznaczona do operacji bez kopii. Jeśli obiekt MemTransaction reprezentuje pamięć do odczytu i zapisu N elementów typu T, to prawidłowy zakres wartości idx jest zawarty między 0 a N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); to metoda pomocnicza do odczytywania elementów nMessages typu T z regionów pamięci opisanych przez obiekt, począwszy od startIdx. Ta metoda używa memcpy() i nie jest przeznaczona do operacji zero copy.

Wysyłanie kolejki przez HIDL

W przypadku tworzenia:

  1. Utwórz obiekt kolejki wiadomości w sposób opisany powyżej.
  2. Sprawdź, czy obiekt jest prawidłowy, za pomocą funkcji isValid().
  3. Jeśli oczekujesz w kilku kolejkach, przekazując EventFlag do długiej formy readBlocking() lub writeBlocking(), możesz wyodrębnić wskaźnik flagi zdarzenia (za pomocą getEventFlagWord()) z obiektu MessageQueue, który został zainicjowany w celu utworzenia flagi, i użyć tej flagi do utworzenia niezbędnego obiektu EventFlag.
  4. Aby uzyskać obiekt opisu, użyj metody MessageQueue getDesc().
  5. W pliku HAL nadaj metodzie parametr typu fmq_sync lub fmq_unsync, gdzie T to odpowiedni typ zdefiniowany w HIDL. Użyj tego, aby wysłać obiekt zwrócony przez getDesc() do procesu odbioru.

Po stronie odbiorcy:

  1. Użyj obiektu opisu, aby utworzyć obiekt MessageQueue. Użyj tego samego typu kolejki i typu danych, w przeciwnym razie nie uda się skompilować szablonu.
  2. Jeśli wyodrębniasz flagę zdarzenia, wyodrębnij ją z odpowiedniego obiektu MessageQueue w procesie odbierania.
  3. Do przenoszenia danych użyj obiektu MessageQueue.