Antrean Pesan Cepat (FMQ)

Infrastruktur remote procedure call (RPC) HIDL menggunakan mekanisme binder, yang berarti panggilan melibatkan overhead, memerlukan operasi kernel, dan dapat memicu tindakan penjadwal. Namun, untuk kasus saat data harus ditransfer antar proses dengan overhead yang lebih sedikit dan tanpa keterlibatan kernel, sistem Fast Message Queue (FMQ) akan digunakan.

FMQ membuat antrean pesan dengan properti yang diinginkan. Anda dapat mengirim objek MQDescriptorSync atau MQDescriptorUnsync melalui panggilan HIDL RPC dan objek ini digunakan oleh proses penerimaan untuk mengakses antrean pesan.

Jenis antrean

Android mendukung dua jenis antrean (dikenal sebagai ragam):

  • Antrean yang tidak disinkronkan diizinkan untuk meluap, dan dapat memiliki banyak pembaca; setiap pembaca harus membaca data tepat waktu atau kehilangannya.
  • Antrean Sinkron tidak boleh meluap, dan hanya dapat memiliki satu pembaca.

Kedua jenis antrean tidak diizinkan untuk underflow (membaca dari antrean kosong gagal) dan hanya dapat memiliki satu penulis.

Antrean yang tidak disinkronkan

Antrean yang tidak disinkronkan hanya memiliki satu penulis, tetapi dapat memiliki sejumlah pembaca. Ada satu posisi tulis untuk antrean; namun, setiap pembaca terus melacak posisi baca independennya sendiri.

Operasi tulis ke antrean selalu berhasil (tidak diperiksa untuk mengetahui apakah terjadi overflow) selama tidak lebih besar dari kapasitas antrean yang dikonfigurasi (operasi tulis yang lebih besar dari kapasitas antrean akan langsung gagal). Karena setiap pembaca mungkin memiliki posisi baca yang berbeda, data akan keluar dari antrean setiap kali penulisan baru memerlukan ruang, bukan menunggu setiap pembaca membaca setiap bagian data.

Pembaca bertanggung jawab untuk mengambil data sebelum data tersebut keluar dari akhir antrean. Operasi baca yang mencoba membaca lebih banyak data daripada yang tersedia akan langsung gagal (jika tidak memblokir) atau menunggu data yang cukup tersedia (jika memblokir). Operasi baca yang mencoba membaca lebih banyak data daripada kapasitas antrean selalu gagal dengan segera.

Jika pembaca gagal mengimbangi penulis, sehingga jumlah data yang ditulis dan belum dibaca oleh pembaca tersebut lebih besar dari kapasitas antrean, pembacaan berikutnya tidak akan menampilkan data; sebagai gantinya, pembacaan akan mereset posisi baca pembaca agar sama dengan posisi tulis terbaru, lalu menampilkan kegagalan. Jika data yang tersedia untuk dibaca diperiksa setelah overflow, tetapi sebelum pembacaan berikutnya, data tersebut akan menampilkan lebih banyak data yang tersedia untuk dibaca daripada kapasitas antrean, yang menunjukkan overflow telah terjadi. (Jika antrean meluber di antara pemeriksaan data yang tersedia dan mencoba membaca data tersebut, satu-satunya indikasi tambahan adalah bahwa pembacaan gagal.)

Antrean yang disinkronkan

Antrean tersinkron memiliki satu penulis dan satu pembaca dengan satu posisi menulis dan satu posisi baca. Anda tidak dapat menulis lebih banyak data daripada ruang yang tersedia di antrean atau membaca lebih banyak data daripada yang saat ini disimpan di antrean. Bergantung pada apakah fungsi tulis atau baca pemblokiran atau tidak pemblokiran dipanggil, upaya untuk melebihi ruang atau data yang tersedia akan segera menampilkan kegagalan atau memblokir hingga operasi yang diinginkan dapat diselesaikan. Upaya untuk membaca atau menulis lebih banyak data daripada kapasitas antrean selalu gagal.

Menyiapkan FMQ

Antrean pesan memerlukan beberapa objek MessageQueue: satu objek untuk ditulis, dan satu atau beberapa objek untuk dibaca. Tidak ada konfigurasi eksplisit objek mana yang digunakan untuk menulis atau membaca; pengguna bertanggung jawab untuk memastikan bahwa tidak ada objek yang digunakan untuk membaca dan menulis, bahwa ada maksimal satu penulis, dan untuk antrean yang disinkronkan, bahwa ada maksimal satu pembaca.

Membuat objek MessageQueue pertama

Antrean pesan dibuat dan dikonfigurasi dengan satu panggilan:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Penginisialisasi MessageQueue<T, flavor>(numElements) membuat dan menginisialisasi objek yang mendukung fungsi antrean pesan.
  • Penginisialisasi MessageQueue<T, flavor>(numElements, configureEventFlagWord) membuat dan melakukan inisialisasi objek yang mendukung fungsi antrean pesan dengan pemblokiran.
  • flavor dapat berupa kSynchronizedReadWrite untuk antrean yang disinkronkan, atau kUnsynchronizedWrite untuk antrean yang tidak disinkronkan.
  • uint16_t (dalam contoh ini) dapat berupa jenis yang ditentukan HIDL yang tidak melibatkan buffering bertingkat (tidak ada jenis string atau vec), handle, atau antarmuka.
  • kNumElementsInQueue menunjukkan ukuran antrean dalam jumlah entri; ukuran ini menentukan ukuran buffering memori bersama yang dialokasikan untuk antrean.

Membuat objek MessageQueue kedua

Sisi kedua antrean pesan dibuat menggunakan objek MQDescriptor yang diperoleh dari sisi pertama. Objek MQDescriptor dikirim melalui panggilan RPC HIDL atau AIDL ke proses yang menyimpan ujung kedua antrean pesan. MQDescriptor berisi informasi tentang antrean, termasuk:

  • Informasi untuk memetakan buffer dan pointer tulis.
  • Informasi untuk memetakan pointer baca (jika antrean disinkronkan).
  • Informasi untuk memetakan kata flag peristiwa (jika antrean memblokir).
  • Jenis objek (<T, flavor>), yang mencakup jenis yang ditentukan HIDL elemen antrean dan ragam antrean (sinkron atau tidak sinkron).

Anda dapat menggunakan objek MQDescriptor untuk membuat objek MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Parameter resetPointers menunjukkan apakah akan mereset posisi baca dan tulis ke 0 saat membuat objek MessageQueue ini. Dalam antrean yang tidak disinkronkan, posisi baca (yang bersifat lokal untuk setiap objek MessageQueue dalam antrean yang tidak disinkronkan) selalu ditetapkan ke 0 selama pembuatan. Biasanya, MQDescriptor diinisialisasi selama pembuatan objek antrean pesan pertama. Untuk kontrol tambahan atas memori bersama, Anda dapat menyiapkan MQDescriptor secara manual (MQDescriptor ditentukan dalam system/libhidl/base/include/hidl/MQDescriptor.h), lalu membuat setiap objek MessageQueue seperti yang dijelaskan di bagian ini.

Antrean blok dan flag peristiwa

Secara default, antrean tidak mendukung pemblokiran operasi baca dan tulis. Ada dua jenis pemblokiran panggilan baca dan tulis:

  • Bentuk singkat, dengan tiga parameter (pointer data, jumlah item, waktu tunggu), mendukung pemblokiran pada setiap operasi baca dan tulis pada satu antrean. Saat menggunakan formulir ini, antrean menangani flag peristiwa dan bitmask secara internal, dan objek antrean pesan pertama harus diinisialisasi dengan parameter kedua true. Contoh:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Bentuk panjang, dengan enam parameter (termasuk flag peristiwa dan bitmask), mendukung penggunaan objek EventFlag bersama di antara beberapa antrean dan memungkinkan penentuan bitmask notifikasi yang akan digunakan. Dalam hal ini, flag peristiwa dan bitmask harus disediakan ke setiap panggilan baca dan tulis.

Untuk format panjang, Anda dapat menyediakan EventFlag secara eksplisit dalam setiap panggilan readBlocking() dan writeBlocking(). Anda dapat menginisialisasi salah satu antrean dengan flag peristiwa internal, yang kemudian harus diekstrak dari objek MessageQueue antrean tersebut menggunakan getEventFlagWord() dan digunakan untuk membuat objek EventFlag di setiap proses untuk digunakan dengan FMQ lainnya. Atau, Anda dapat melakukan inisialisasi objek EventFlag dengan memori bersama yang sesuai.

Secara umum, setiap antrean sebaiknya hanya menggunakan satu pemblokiran non-pemblokiran, pemblokiran video pendek, atau pemblokiran video panjang. Menggabungkannya bukanlah sebuah error, tetapi pemrograman yang cermat diperlukan untuk mendapatkan hasil yang diinginkan.

Menandai memori sebagai hanya baca

Secara default, memori bersama memiliki izin baca dan tulis. Untuk antrean yang tidak disinkronkan (kUnsynchronizedWrite), penulis mungkin ingin menghapus izin tulis untuk semua pembaca sebelum menyerahkan objek MQDescriptorUnsync. Hal ini memastikan proses lain tidak dapat menulis ke antrean, yang direkomendasikan untuk melindungi dari bug atau perilaku buruk dalam proses pembaca. Jika penulis ingin pembaca dapat mereset antrean setiap kali mereka menggunakan MQDescriptorUnsync untuk membuat sisi baca antrean, memori tidak dapat ditandai sebagai hanya baca. Ini adalah perilaku default konstruktor MessageQueue. Jadi, jika ada pengguna antrean ini, kode mereka perlu diubah untuk membuat antrean dengan resetPointer=false.

  • Penulis: Panggil ashmem_set_prot_region dengan deskriptor file MQDescriptor dan region yang ditetapkan ke hanya baca (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Pembaca: Buat antrean pesan dengan resetPointer=false (defaultnya adalah true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Menggunakan MessageQueue

API publik objek MessageQueue adalah:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Anda dapat menggunakan availableToWrite() dan availableToRead() untuk menentukan jumlah data yang dapat ditransfer dalam satu operasi. Dalam antrean yang tidak disinkronkan:

  • availableToWrite() selalu menampilkan kapasitas antrean.
  • Setiap pembaca memiliki posisi bacanya sendiri dan melakukan penghitungannya sendiri untuk availableToRead().
  • Dari sudut pandang pembaca lambat, antrean diperbolehkan untuk kelebihan beban; hal ini dapat menyebabkan availableToRead() menampilkan nilai yang lebih besar dari ukuran antrean. Pembacaan pertama setelah overflow gagal dan mengakibatkan posisi baca untuk pembaca tersebut ditetapkan sama dengan pointer tulis saat ini, baik overflow dilaporkan melalui availableToRead() maupun tidak.

Metode read() dan write() menampilkan true jika semua data yang diminta dapat (dan telah) ditransfer ke dan dari antrean. Metode ini tidak memblokir; metode ini berhasil (dan menampilkan true), atau segera menampilkan kegagalan (false).

Metode readBlocking() dan writeBlocking() menunggu hingga operasi yang diminta dapat diselesaikan, atau hingga waktu tunggunya habis (nilai timeOutNanos 0 berarti tidak pernah habis waktu tunggunya).

Operasi pemblokiran diterapkan menggunakan kata penanda peristiwa. Secara default, setiap antrean membuat dan menggunakan kata flag-nya sendiri untuk mendukung bentuk singkat readBlocking() dan writeBlocking(). Beberapa antrean dapat berbagi satu kata, sehingga proses dapat menunggu operasi tulis atau baca ke antrean mana pun. Dengan memanggil getEventFlagWord(), Anda bisa mendapatkan pointer ke kata flag peristiwa antrean, dan Anda dapat menggunakan pointer tersebut (atau pointer ke lokasi memori bersama yang sesuai) untuk membuat objek EventFlag yang akan diteruskan ke bentuk panjang readBlocking() dan writeBlocking()untuk antrean yang berbeda. Parameter readNotification dan writeNotification menunjukkan bit mana dalam flag peristiwa yang harus digunakan untuk menandai operasi baca dan menulis pada antrean tersebut. readNotification dan writeNotification adalah bitmask 32-bit.

readBlocking() menunggu pada bit writeNotification; jika parameter tersebut bernilai 0, panggilan akan selalu gagal. Jika nilai readNotification adalah 0, panggilan tidak akan gagal, tetapi pembacaan yang berhasil tidak akan menetapkan bit notifikasi apa pun. Dalam antrean yang disinkronkan, hal ini berarti panggilan writeBlocking() yang sesuai tidak pernah aktif kecuali jika bit disetel di tempat lain. Dalam antrean yang tidak disinkronkan, writeBlocking() tidak menunggu (masih harus digunakan untuk menyetel bit notifikasi penulisan), dan pembacaan tanpa menetapkan bit notifikasi apa pun sesuai untuk pembacaan. Demikian pula, writeblocking() akan gagal jika readNotification adalah 0, dan operasi tulis yang berhasil akan menetapkan writeNotification bit yang ditentukan.

Untuk menunggu beberapa antrean sekaligus, gunakan metode wait() objek EventFlag untuk menunggu bitmask notifikasi. Metode wait() menampilkan kata status dengan bit yang menyebabkan set pengaktifan. Informasi ini kemudian digunakan untuk memverifikasi bahwa antrean yang sesuai memiliki ruang atau data yang cukup untuk operasi baca dan tulis yang diinginkan serta melakukan write() dan read() yang tidak memblokir. Untuk mendapatkan notifikasi setelah operasi, gunakan panggilan lain ke metode wake() objek EventFlag. Untuk mengetahui definisi abstraksi EventFlag, lihat system/libfmq/include/fmq/EventFlag.h.

Operasi zero copy

Metode read, write, readBlocking, dan writeBlocking() mengambil pointer ke buffering input-output sebagai argumen dan menggunakan panggilan memcpy() secara internal untuk menyalin data antara buffer ring yang sama dan FMQ. Untuk meningkatkan performa, Android 8.0 dan yang lebih tinggi menyertakan kumpulan API yang memberikan akses pointer langsung ke buffer ring, sehingga menghilangkan kebutuhan untuk menggunakan panggilan memcpy.

Gunakan API publik berikut untuk operasi FMQ tanpa salinan:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Metode beginWrite menyediakan pointer dasar ke buffering ring FMQ. Setelah data ditulis, commit menggunakan commitWrite(). Metode beginRead dan commitRead berfungsi dengan cara yang sama.
  • Metode beginRead dan Write menggunakan sebagai input jumlah pesan yang akan dibaca dan ditulis, serta menampilkan boolean yang menunjukkan apakah baca atau tulis dapat dilakukan. Jika operasi baca atau tulis memungkinkan, struct memTx akan diisi dengan pointer dasar yang dapat digunakan untuk akses pointer langsung ke dalam memori bersama buffering ring.
  • Struktur MemRegion berisi detail tentang blok memori, termasuk pointer dasar (alamat dasar blok memori) dan panjangnya dalam T (panjang blok memori dalam hal jenis antrean pesan yang ditentukan HIDL).
  • Struktur MemTransaction berisi dua struktur MemRegion, first, dan second karena operasi baca atau tulis ke ring buffer mungkin memerlukan penggabungan ke awal antrean. Hal ini berarti bahwa dua pointer dasar diperlukan untuk membaca dan menulis data ke dalam buffer ring FMQ.

Untuk mendapatkan alamat dasar dan panjang dari struct MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Untuk mendapatkan referensi ke struct MemRegion pertama dan kedua dalam objek MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Contoh penulisan ke FMQ menggunakan API zero copy:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Metode bantuan berikut juga merupakan bagian dari MemTransaction:

  • T* getSlot(size_t idx); menampilkan pointer ke slot idx dalam MemRegions yang merupakan bagian dari objek MemTransaction ini. Jika objek MemTransaction mewakili region memori untuk membaca dan menulis N item jenis T, rentang idx yang valid adalah antara 0 dan N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); menulis nMessages item jenis T ke dalam region memori yang dijelaskan oleh objek, mulai dari indeks startIdx. Metode ini menggunakan memcpy() dan tidak dimaksudkan untuk digunakan untuk operasi tanpa salinan. Jika objek MemTransaction mewakili memori untuk membaca dan menulis N item jenis T, rentang idx yang valid adalah antara 0 dan N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); adalah metode bantuan untuk membaca item nMessages jenis T dari region memori yang dijelaskan oleh objek mulai dari startIdx. Metode ini menggunakan memcpy() dan tidak dimaksudkan untuk digunakan untuk operasi zero copy.

Mengirim antrean melalui HIDL

Di sisi pembuatan:

  1. Buat objek antrean pesan seperti yang dijelaskan di atas.
  2. Pastikan bahwa objek valid dengan isValid().
  3. Jika Anda menunggu beberapa antrean dengan meneruskan EventFlag ke dalam bentuk panjang readBlocking() atau writeBlocking(), Anda dapat mengekstrak pointer flag peristiwa (menggunakan getEventFlagWord()) dari objek MessageQueue yang diinisialisasi untuk membuat flag, dan menggunakan flag tersebut untuk membuat objek EventFlag yang diperlukan.
  4. Gunakan metode MessageQueue getDesc() untuk mendapatkan objek deskripsi.
  5. Dalam file HAL, berikan parameter jenis fmq_sync atau fmq_unsync pada metode dengan T sebagai jenis yang ditentukan HIDL yang sesuai. Gunakan ini untuk mengirim objek yang ditampilkan oleh getDesc() ke proses penerimaan.

Di sisi penerima:

  1. Gunakan objek deskripsi untuk membuat objek MessageQueue. Gunakan ragam antrean dan jenis data yang sama, atau template akan gagal dikompilasi.
  2. Jika Anda mengekstrak flag peristiwa, ekstrak flag dari objek MessageQueue yang sesuai dalam proses penerimaan.
  3. Gunakan objek MessageQueue untuk mentransfer data.