Kolejka wiadomości (FMQ)

Infrastruktura wywołania procedury zdalnej (RPC) HIDL korzysta z mechanizmów bindera, co oznacza, że wywołania generują obciążenie, wymagają operacji jądra i mogą wywoływać działanie harmonogramu. W przypadku jednak, gdy dane muszą być przesyłane między procesami z mniejszym obciążeniem i bez udziału jądra, używany jest system Fast Message Queue (FMQ).

FMQ tworzy kolejki wiadomości z odpowiednimi właściwościami. Możesz wysłać obiekt MQDescriptorSync lub MQDescriptorUnsync za pomocą wywołania RPC HIDL, a obiekt zostanie wykorzystany przez proces odbierający do uzyskania dostępu do kolejki wiadomości.

Typy kolejek

Android obsługuje 2 typy kolejek (zwane też wersjami):

  • Niezsynchronizowane kolejki mogą przepełniać się i mieć wielu czytelników. Każdy czytelnik musi odczytać dane na czas lub je utracić.
  • Synchronizowane kolejki nie mogą przepełniać i mogą mieć tylko jednego czytelnika.

Oba typy kolejek nie mogą być niedopełnione (odczyt z pustej kolejki) i mogą mieć tylko 1 zapisujący.

Niezsynchronizowane kolejki

Niezsynchronizowana kolejka ma tylko jednego zapisującego, ale może mieć dowolną liczbę czytników. Kolejka ma 1 pozycję zapisu, ale każdy czytelnik śledzi własną, niezależną pozycję do odczytu.

Zapisy do kolejki zawsze się udają (nie są sprawdzane pod kątem przepełnienia), o ile nie są większe niż skonfigurowana pojemność kolejki (zapisy większe niż pojemność kolejki od razu się nie udają). Każdy czytnik może mieć inną pozycję odczytu, więc zamiast czekać, aż każdy czytnik odczyta wszystkie dane, dane są usuwane z kolejki, gdy nowe zapisy potrzebują miejsca.

Czytelnicy są odpowiedzialni za pobranie danych, zanim wypadną z końca kolejki. Odczyt, który próbuje odczytać więcej danych niż jest dostępnych, albo natychmiast się nie powiedzie (jeśli nie jest blokujący) albo czeka na dostępność wystarczającej ilości danych (jeśli jest blokujący). Odczyt, który próbuje odczytać więcej danych niż pojemność kolejki, zawsze kończy się błędem.

Jeśli czytnik nie nadąża za zapisem, tak że ilość danych zapisanych i jeszcze nie odczytanych przez tego czytnika jest większa niż pojemność kolejki, następne odczytanie nie zwróci danych. Zamiast tego zeruje pozycję odczytu czytnika na ostatnią pozycję zapisu, a następnie zwraca błąd. Jeśli dane dostępne do odczytania są zaznaczone po przekroczeniu limitu, ale przed kolejnym odczytem, pokazują więcej danych do odczytu niż pojemność kolejki, co oznacza, że doszło do przepełnienia. (Jeśli kolejka przepełni się między sprawdzeniem dostępnych danych a próbą odczytu tych danych, jedynym sygnałem przepełnienia jest niepowodzenie odczytu).

Synchronizowane kolejki

Zsynchronizowana kolejka ma 1 zapisujący i 1 czytnik z 1 pozycją zapisu i 1 pozycją odczytu. Nie można zapisać więcej danych, niż pozwala na to kolejka, ani odczytać więcej danych, niż obecnie zawiera kolejka. W zależności od tego, czy wywoływana jest funkcja zapisu lub odczytu blokującego czy nieblokującego, próby przekroczenia dostępnej przestrzeni lub danych albo zwracają błąd natychmiast, albo blokują się, dopóki nie można wykonać żądanej operacji. Próby odczytu lub zapisu większej ilości danych niż pojemność kolejki zawsze kończą się niepowodzeniem.

Konfigurowanie FMQ

Kolejka wiadomości wymaga kilku obiektów MessageQueue: jednego do zapisu i co najmniej jednego do odczytu. Nie ma wyraźnej konfiguracji, która określa, który obiekt ma być używany do zapisu lub odczytu. Użytkownik musi zadbać o to, aby żaden obiekt nie był używany do odczytu i zapisu, aby było maksymalnie 1 nagrywacz i aby w przypadku kolejek synchronizowanych było maksymalnie 1 czytnik.

Tworzenie pierwszego obiektu MessageQueue

Kolejka wiadomości jest tworzona i konfigurowana za pomocą pojedynczego wywołania:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Konstruktor MessageQueue<T, flavor>(numElements) tworzy i inicjalizuje obiekt, który obsługuje kolejkę wiadomości.
  • Konstruktor MessageQueue<T, flavor>(numElements, configureEventFlagWord) tworzy i inicjalizuje obiekt, który obsługuje kolejkę wiadomości z blokowaniem.
  • flavor może być równe kSynchronizedReadWrite w przypadku kolejki zsynchronizowanej lub kUnsynchronizedWrite w przypadku kolejki niezsynchronizowanej.
  • uint16_t (w tym przykładzie) może być dowolnym typem zdefiniowanym przez HIDL, który nie zawiera zagnieżdżonych buforów (bez typów string lub vec), uchwytów ani interfejsów.
  • kNumElementsInQueue wskazuje rozmiar kolejki pod względem liczby wpisów; określa rozmiar bufora pamięci współdzielonej przydzielonej do kolejki.

Utwórz drugi obiekt MessageQueue

Druga strona kolejki wiadomości jest tworzona za pomocą obiektu MQDescriptor uzyskanego od pierwszej strony. Obiekt MQDescriptor jest wysyłany za pomocą wywołania HIDL lub AIDL RPC do procesu, który obsługuje drugi koniec kolejki komunikatów. MQDescriptor zawiera informacje o kolejce, w tym:

  • Informacje do mapowania bufora i wskaźnika zapisu.
  • Informacje do mapowania wskaźnika odczytu (jeśli kolejka jest zsynchronizowana).
  • informacje potrzebne do zmapowania słowa flagi zdarzenia (jeśli kolejka blokuje).
  • Typ obiektu (<T, flavor>), który obejmuje zdefiniowany przez HIDL typ elementów kolejki oraz typ kolejki (zsynchronizowany lub niezsynchronizowany).

Obiekt MQDescriptor możesz użyć do utworzenia obiektu MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Parametr resetPointers wskazuje, czy podczas tworzenia obiektu MessageQueue pozycje odczytu i zapisu mają być resetowane do 0. W niezsynchronizowanej kolejce pozycja odczytu (która jest lokalna dla każdego obiektu MessageQueue w niezsynchronizowanych kolejkach) jest zawsze ustawiana na 0 podczas tworzenia. Zwykle MQDescriptor jest inicjowany podczas tworzenia obiektu kolejki pierwszej wiadomości. Aby uzyskać większą kontrolę nad współdzieloną pamięcią, możesz ręcznie skonfigurować MQDescriptor (definiowany w system/libhidl/base/include/hidl/MQDescriptor.h), a potem utworzyć każdy obiekt MessageQueue zgodnie z opisem w tej sekcji.MQDescriptor

Blokowanie kolejek i flag zdarzeń

Domyślnie kolejki nie obsługują blokowania operacji odczytu i zapisu. Istnieją 2 rodzaje blokowania wywołań odczytu i zapisu:

  • Wersja krótka z 3 parametrami (wskaźnikiem danych, liczbą elementów, limitem czasu) obsługuje blokowanie poszczególnych operacji odczytu i zapisu w jednej kolejce. Gdy używasz tego formularza, kolejka obsługuje flagę zdarzenia i maski bitowe wewnętrznie, a obiekt kolejki pierwszej wiadomości musi zostać zainicjowany za pomocą drugiego parametru true. Na przykład:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Długa wersja z 6 parametrami (w tym flaga zdarzenia i maski bitowe) obsługuje używanie udostępnionego obiektu EventFlag w wielu kolejkach oraz umożliwia określenie używanych masek bitowych powiadomień. W takim przypadku do każdego wywołania odczytu i zapisu musisz podać flagę zdarzenia i maski bitowe.

W przypadku długiej formy możesz podać wartość EventFlag w każdym wywołaniu funkcji readBlocking()writeBlocking(). Możesz zainicjować jedną z kolejek przy użyciu wewnętrznej flagi zdarzenia, która musi zostać wyodrębniona z obiektów MessageQueue tej kolejki za pomocą polecenia getEventFlagWord() i użyta do utworzenia obiektów EventFlag w każdym procesie do użycia z innymi FMQ. Możesz też zainicjować obiekty EventFlag za pomocą dowolnej odpowiedniej pamięci współdzielonej.

Ogólnie w każdej kolejce powinna być stosowana tylko jedna opcja: nieblokujące, krótkie lub długie. Mieszanie ich nie jest błędem, ale aby uzyskać pożądany efekt, konieczne jest ostrożne programowanie.

Oznaczanie pamięci jako tylko do odczytu

Domyślnie pamięć współdzielona ma uprawnienia do odczytu i zapisu. W przypadku niezsynchronizowanych kolejek (kUnsynchronizedWrite) zapisujący może chcieć odebrać wszystkim czytelnikom uprawnienia do zapisu przed przekazaniem obiektów MQDescriptorUnsync. Dzięki temu inne procesy nie będą mogły zapisywać w kolejce, co zalecamy w celu ochrony przed błędami i niewłaściwym działaniem procesów czytnika. Jeśli autor chce, aby czytelnicy mogli zresetować kolejkę, gdy użyją MQDescriptorUnsync do utworzenia strony odczytu kolejki, pamięć nie może być oznaczona jako dostępna tylko do odczytu. Jest to domyślne działanie konstruktora MessageQueue. Jeśli więc istnieją obecni użytkownicy tej kolejki, musisz zmienić ich kod, aby utworzyć kolejkę za pomocą funkcji resetPointer=false.

  • Pisanie: wywołaj funkcję ashmem_set_prot_region z deskryptorem pliku MQDescriptor i regionem ustawionym jako tylko do odczytu (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Czytelnik: utwórz kolejkę wiadomości z wartością resetPointer=false (domyślnie true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Korzystanie z MessageQueue

Publiczny interfejs API obiektu MessageQueue to:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Za pomocą availableToWrite() i availableToRead() możesz określić, ile danych można przenieść w pojedynczej operacji. W niezsynchronizowanej kolejce:

  • Funkcja availableToWrite() zawsze zwraca pojemność kolejki.
  • Każdy czytelnik ma swoją pozycję czytania i sam wykonuje obliczenia dotyczące availableToRead().
  • Z punktu widzenia wolnego czytelnika kolejka może się przepełnić, co może spowodować, że funkcja availableToRead() zwróci wartość większą niż rozmiar kolejki. Pierwszy odczyt po przepełnieniu powoduje niepowodzenie i ustawienie pozycji odczytu dla tego czytnika na równe bieżącemu wskaźnikowi zapisu, niezależnie od tego, czy przepełnienie zostało zgłoszone za pomocą funkcji availableToRead().

Metody read()write() zwracają true, jeśli wszystkie żądane dane mogły zostać przeniesione do kolejki i z niej. Te metody nie blokują; albo działają prawidłowo (i zwracają wartość true), albo od razu zwracają błąd (false).

Metody readBlocking()writeBlocking() czekają, aż żądana operacja zostanie wykonana lub do momentu przekroczenia limitu czasu (wartość 0 w parametrze timeOutNanos oznacza, że nie ma limitu czasu).

Operacje blokowania są implementowane za pomocą słowa flagi zdarzenia. Domyślnie każda kolejka tworzy własne słowo flagi i obsługuje krótkie formy readBlocking() oraz writeBlocking(). Jedno słowo może być używane w wielu kolejkach, więc proces może czekać na zapis lub odczyt w dowolnej z kolejek. Wywołanie funkcji getEventFlagWord() zwraca wskaźnik do słowa flagi zdarzenia kolejki. Możesz użyć tego wskaźnika (lub dowolnego wskaźnika do odpowiedniej lokalizacji w wspólnej pamięci) do utworzenia obiektu EventFlag, który zostanie przekazany do funkcji readBlocking() w długiej formie i do funkcji writeBlocking() w innej kolejce. Parametry readNotificationwriteNotification określają, które bity w flagach zdarzenia mają być używane do sygnalizowania odczytów i zapisów w tej kolejce. readNotificationwriteNotification to 32-bitowe maski bitowe.

readBlocking() czeka na bity writeNotification. Jeśli ten parametr ma wartość 0, wywołanie zawsze kończy się niepowodzeniem. Jeśli wartość readNotification wynosi 0, wywołanie nie zakończy się niepowodzeniem, ale pomyślne odczytanie nie spowoduje ustawienia żadnych bitów powiadomienia. W przypadku zsynchronizowanej kolejki oznacza to, że odpowiednie wywołanie funkcji writeBlocking() nie jest aktywowane, dopóki bit nie zostanie ustawiony w innym miejscu. W niesynchronizowanej kolejce writeBlocking() nie czeka (nadal powinien być używany do ustawienia bitu powiadomienia o zapisie), a w przypadku odczytu nie powinien ustawiać żadnych bitów powiadomienia. Podobnie writeblocking() zakończy się niepowodzeniem, jeśli readNotification ma wartość 0, a pomyślne zapisanie ustawia określone bity writeNotification.

Aby czekać na wiele kolejek jednocześnie, użyj metody EventFlag obiektu wait(), aby czekać na bitową maskę powiadomień. Metoda wait() zwraca słowo określające stan z bitami, które spowodowały wybudzenie. Następnie te informacje są używane do sprawdzania, czy odpowiednia kolejka ma wystarczająco dużo miejsca lub danych na potrzeby operacji zapisu i odczytu, a także do wykonywania nieblokujących operacji write()read(). Aby uzyskać powiadomienie po operacji, użyj kolejnego wywołania metody EventFlag obiektu wake(). Definicję pojęcia EventFlag abstrahowania znajdziesz w artykule system/libfmq/include/fmq/EventFlag.h.

Operacje bez kopii

Metody read, write, readBlockingwriteBlocking() przyjmują jako argument wskaźnik do bufora wejścia-wyjścia i korzystają wewnętrznie z wywołań memcpy(), aby kopiować dane między tym samym buforem a buforem pierścieniowym FMQ. Aby zwiększyć wydajność, Android 8.0 i nowsze wersje zawierają zestaw interfejsów API, które zapewniają bezpośredni dostęp do wskaźnika do bufora pierścieniowego, eliminując konieczność korzystania z wywołań memcpy.

Użyj następujących publicznych interfejsów API do operacji FMQ bez kopiowania:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Metoda beginWrite udostępnia wskaźniki bazowe w buforze pierścieni FMQ. Po zapisaniu danych zatwierdź je za pomocą polecenia commitWrite(). Metody beginRead i commitRead działają tak samo.
  • Metody beginReadWrite przyjmują jako dane wejściowe liczbę wiadomości do odczytu i zapisu oraz zwracają wartość logiczną wskazującą, czy odczyt lub zapis jest możliwy. Jeśli odczyt lub zapis jest możliwy, struktura memTx jest wypełniana wskaźnikami podstawowymi, których można używać do bezpośredniego dostępu do współdzielonej pamięci za pomocą wskaźnika w buforze pierścieniowym.
  • Struktura MemRegion zawiera szczegółowe informacje o bloku pamięci, w tym wskaźnik podstawowy (adres podstawowy bloku pamięci) i długość zgodnie z elementami T (długość bloku pamięci w kontekście zdefiniowanego przez HIDL typu kolejki komunikatów).
  • Struktura MemTransaction zawiera 2 struktury MemRegion, firstsecond, które w ramach operacji odczytu lub zapisu do pierścieniowego bufora mogą wymagać przewinięcia do początku kolejki. Oznacza to, że do odczytu i zapisu danych do pierścieniowego bufora FMQ potrzebne są 2 wskaźniki podstawowe.

Aby uzyskać adres podstawowy i jego długość z struktury MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Aby uzyskać odwołania do pierwszego i drugiego elementu MemRegion w obiekcie MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Przykład zapisu do FMQ przy użyciu interfejsów zero copy:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

W MemTransaction są też dostępne te metody pomocnicze:

  • T* getSlot(size_t idx); zwraca wskaźnik do boksu idx w MemRegions, które są częścią tego obiektu MemTransaction. Jeśli obiekt MemTransaction reprezentuje regiony pamięci, w których mają być odczytywane i zapisywane elementy typu T, to prawidłowy zakres zmiennej idx wynosi od 0 do N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); zapisuje nMessages elementy typu T w regionach pamięci opisanych przez obiekt, zaczynając od indeksu startIdx. Ta metoda korzysta z wartości memcpy() i nie powinna być używana do wykonywania operacji „zero kopiowania”. Jeśli obiekt MemTransaction reprezentuje pamięć do odczytu i zapisu N elementów typu T, to prawidłowy zakres wartości idx jest zdefiniowany jako od 0 do N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); to metoda pomocnicza do odczytywania elementów nMessages typu T z regionów pamięci opisanych przez obiekt, począwszy od startIdx. Ta metoda używa memcpy() i nie jest przeznaczona do operacji bez kopii.

Wysyłanie kolejki przez HIDL

Po stronie tworzenia:

  1. Utwórz obiekt kolejki wiadomości w sposób opisany powyżej.
  2. Sprawdź, czy obiekt jest prawidłowy w isValid().
  3. Jeśli oczekujesz w kilku kolejkach, przekazując EventFlag do długiej formy readBlocking() lub writeBlocking(), możesz wyodrębnić wskaźnik flagi zdarzenia (za pomocą getEventFlagWord()) z obiektu MessageQueue, który został zainicjowany w celu utworzenia flagi, i użyć tej flagi do utworzenia niezbędnego obiektu EventFlag.
  4. Aby uzyskać obiekt opisu, użyj metody MessageQueue getDesc().
  5. W pliku HAL nadaj metodzie parametr typu fmq_sync lub fmq_unsync, gdzie T to odpowiedni typ zdefiniowany w HIDL. Użyj tej opcji, aby wysłać obiekt zwrócony przez getDesc() do procesu odbierania.

Po stronie odbiorcy:

  1. Użyj obiektu opisu, aby utworzyć obiekt MessageQueue. Użyj tego samego typu kolejki i typu danych, w przeciwnym razie nie uda się skompilować szablonu.
  2. Jeśli wyodrębniasz flagę zdarzenia, wyodrębnij ją z odpowiedniego obiektu MessageQueue w procesie odbierania.
  3. Do przeniesienia danych użyj obiektu MessageQueue.