תור הודעות מהיר (FMQ)

התשתית של הקריאה לשירות מרוחק (RPC) של HIDL משתמשת במנגנוני קישור (binder), כלומר הקריאות כוללות תקורה, דורשות פעולות ליבה ויכולות להפעיל פעולה של מתזמן. עם זאת, במקרים שבהם צריך להעביר נתונים בין תהליכים עם פחות תקורה וללא מעורבות של הליבה, משתמשים במערכת Fast Message Queue‏ (FMQ).

FMQ יוצר תורים של הודעות עם המאפיינים הרצויים. אפשר לשלוח אובייקט MQDescriptorSync או MQDescriptorUnsync באמצעות קריאה ל-RPC של HIDL, והתהליך המקבל משתמש באובייקט כדי לגשת לתור ההודעות.

סוגי תורים

ב-Android יש תמיכה בשני סוגי תורים (שנקראים טעמים):

  • בתורים לא מסונכרנים מותר לנתונים לחרוג מהנפח, ויכולים להיות להם הרבה קוראים. כל קורא צריך לקרוא את הנתונים בזמן, אחרת הם יאבדו.
  • אסור שתור סונכרן יעלה על המכסה, ויכול להיות לו רק קורא אחד.

בשני סוגי התורים אסור להגיע למצב של זרימה נמוכה מדי (קריאה מתור ריק נכשלת), ויכול להיות רק כותב אחד.

תורים לא מסונכרנים

לתור לא מסונכרן יש רק כותב אחד, אבל יכול להיות בו מספר בלתי מוגבל של קוראים. בתור, יש מיקום אחד לכתיבה, אבל כל קורא עוקב אחרי מיקום הקריאה העצמאי שלו.

פעולות כתיבה לתור תמיד מצליחות (לא מתבצעת בדיקה לחריגה ממלאי) כל עוד הן לא גדולות מיכולת האחסון שהוגדרה לתור (פעולות כתיבה גדולות מיכולת האחסון של התור נכשלות באופן מיידי). מכיוון שלכל קורא יכולה להיות מיקום קריאה שונה, במקום להמתין לכל קורא כדי שיקריא כל פיסת נתונים, הנתונים יוצאים מהתור בכל פעם שכתיבה חדשה זקוקה למרחב.

הקוראים אחראים לאחזר את הנתונים לפני שהם נופלים מהקצה של התור. קריאה שמנסה לקרוא יותר נתונים ממה שזמין תיכשל באופן מיידי (אם היא לא חוסמת) או תמתין עד שיהיו זמינים מספיק נתונים (אם היא חוסמת). קריאה שמנסה לקרוא יותר נתונים ממה שקיבולת התור תמיד נכשלת.

אם הקורא לא מצליח לעמוד בקצב של הסופר, כך שכמות הנתונים שנכתבו ועדיין לא נקראים על ידו גדולה מיכולת האחסון של התור, הקריאה הבאה לא מחזירה נתונים. במקום זאת, מיקום הקריאה של הקורא מתאפס כך שיהיה שווה למיקום הכתיבה האחרון, ואז מוחזר סטטוס כשל. אם הנתונים שזמינים לקריאה נבדקים אחרי חריגה ממלאי אבל לפני הקריאה הבאה, יוצגו יותר נתונים שזמינים לקריאה מאשר קיבולת התור, דבר שמציין שחריגה ממלאי התרחשה. (אם התור יתמלא בין בדיקת הנתונים הזמינים לבין הניסיון לקרוא את הנתונים האלה, הקריאה תיכשל ותהיה זו הדרך היחידה לדעת שהתור מלא).

תורים מסונכרנים

בתור מסונכרן יש כותב אחד וקורא אחד, עם מיקום כתיבה אחד ומיקום קריאה אחד. אי אפשר לכתוב יותר נתונים ממה שיש בתור, או לקרוא יותר נתונים ממה שיש בתור כרגע. בהתאם לקריאה לפונקציית הכתיבה או הקריאה החוסמת או הלא חוסמת, ניסיונות לחרוג מהמקום או מהנתונים הזמינים יחזירו כשל באופן מיידי או ייחסמו עד שאפשר יהיה להשלים את הפעולה הרצויה. ניסיונות לקרוא או לכתוב יותר נתונים מאשר קיבולת התור תמיד נכשלים באופן מיידי.

הגדרת FMQ

כדי ליצור תור הודעות נדרשים כמה אובייקטים של MessageQueue: אחד לכתיבה, ואחד או יותר לקריאה. אין הגדרה מפורשת של אובייקט שמשמש לכתיבה או לקריאה. המשתמש אחראי לוודא שאף אובייקט לא משמש גם לקריאה וגם לכתיבה, שיש רק כותב אחד, ובתורות מסונכרנות, שיש רק קורא אחד.

יצירת אובייקט MessageQueue הראשון

אפשר ליצור ולקבוע את ההגדרות של תור הודעות באמצעות קריאה אחת:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • ה-initializer של MessageQueue<T, flavor>(numElements) יוצר ומפעיל אובייקט שתומך בפונקציונליות של תור ההודעות.
  • ה-initializer של MessageQueue<T, flavor>(numElements, configureEventFlagWord) יוצר ומפעיל אובייקט שתומך בפונקציונליות של תור ההודעות עם חסימה.
  • הערך של flavor יכול להיות kSynchronizedReadWrite עבור תור מסונכרן או kUnsynchronizedWrite עבור תור לא מסונכרן.
  • uint16_t (בדוגמה הזו) יכול להיות כל סוג שהוגדר ב-HIDL שלא כולל מאגרים בתצוגת עץ (אין סוגי string או vec), אחזקים או ממשקים.
  • kNumElementsInQueue מציין את גודל התור במספר הרשאות הגישה. הוא קובע את גודל מאגר הנתונים הזמני של הזיכרון המשותף שהוקצה לתור.

יצירת האובייקט השני של MessageQueue

הצד השני של תור ההודעות נוצר באמצעות אובייקט MQDescriptor שהתקבל מהצד הראשון. האובייקט MQDescriptor נשלח בקריאה ל-HIDL או ל-AIDL RPC אל התהליך שבו נשמר הקצה השני של תור ההודעות. השדה MQDescriptor מכיל מידע על התור, כולל:

  • מידע למיפוי המאגר ולמצב של מצביע הכתיבה.
  • מידע למיפוי של מצבית הקריאה (אם התור מסונכרן).
  • מידע למיפוי של מילה של דגל אירוע (אם התור חסום).
  • סוג האובייקט (<T, flavor>), שכולל את הסוג המוגדר HIDL של רכיבים בתור ואת סוג האובייקט (מסונכרן או לא מסונכרן).

אפשר להשתמש באובייקט MQDescriptor כדי ליצור אובייקט MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

הפרמטר resetPointers מציין אם לאפס את מיקומי הקריאה והכתיבה ל-0 במהלך יצירת האובייקט MessageQueue. בתור לא מסונכרן, מיקום הקריאה (שמייצג את המיקום המקומי של כל אובייקט MessageQueue בתורים לא מסונכרנים) תמיד מוגדר ל-0 במהלך היצירה. בדרך כלל, MQDescriptor מופעל במהלך יצירת אובייקט תור ההודעות הראשון. כדי לשלוט יותר טוב בזיכרון המשותף, אפשר להגדיר את MQDescriptor באופן ידני (MQDescriptor מוגדר בקובץ system/libhidl/base/include/hidl/MQDescriptor.h), ואז ליצור כל אובייקט MessageQueue כפי שמתואר בקטע הזה.

רצפי חסימה ודגלים של אירועים

כברירת מחדל, תורים לא תומכים בחסימת קריאה וכתיבה. יש שני סוגים של שיחות חסימה של קריאה וכתיבה:

  • טופס קצר, עם שלושה פרמטרים (מצביע נתונים, מספר פריטים, זמן קצוב לתפוגה), תומך בחסימה של פעולות קריאה וכתיבה נפרדות בתור יחיד. כשמשתמשים בטופס הזה, המערכת מטפלת בדגל האירוע ובמסכות הביטים באופן פנימי, ואובייקט תור ההודעות הראשון צריך להיות מאותחלל עם פרמטר שני של true. לדוגמה:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • צורה ארוכה, עם שישה פרמטרים (כולל דגל אירוע ומסכות ביטים), תומכת בשימוש באובייקט EventFlag משותף בין תורים מרובים ומאפשרת לציין את מסכות הביט של ההתראות שבהן צריך להשתמש. במקרה כזה, צריך לספק את דגל האירוע ואת מסיכות הביטים לכל קריאה וכתיבה.

בטופס הארוך, אפשר לספק את EventFlag באופן מפורש בכל קריאה ל-readBlocking() ול-writeBlocking(). אפשר לאתחל אחת מהתורניות באמצעות דגל אירוע פנימי, ולאחר מכן לחלץ אותו מהאובייקטים מסוג MessageQueue של התורנית הזו באמצעות getEventFlagWord() ולהשתמש בו כדי ליצור אובייקטים מסוג EventFlag בכל תהליך לשימוש עם תורניות FMQ אחרות. לחלופין, אפשר לאתחל את האובייקטים של EventFlag באמצעות כל זיכרון משותף מתאים.

באופן כללי, בכל תור צריך להשתמש רק באחת מהאפשרויות הבאות: חסימה ללא ניתוק, חסימה של סרטוני Shorts או חסימה של סרטונים ארוכים. אין שגיאה בשימוש בשניהם, אבל נדרש תכנות זהיר כדי לקבל את התוצאה הרצויה.

סימון הזיכרון כזיכרון לקריאה בלבד

כברירת מחדל, לזיכרון המשותף יש הרשאות קריאה וכתיבה. במקרה של תורים לא מסונכרנים (kUnsynchronizedWrite), יכול להיות שמי שכותב את הנתונים ירצה להסיר את הרשאות הכתיבה מכל הקוראים לפני שהוא נותן את האובייקטים מסוג MQDescriptorUnsync. כך ניתן לוודא שהתהליכים האחרים לא יכולים לכתוב לתור. מומלץ לעשות זאת כדי להגן מפני באגים או התנהגות לא תקינה בתהליכי הקוראים. אם הכותבים רוצה לאפשר לקוראים לאפס את התור בכל פעם שהם משתמשים ב-MQDescriptorUnsync כדי ליצור את הצד לקריאה של התור, הזיכרון לא יסומן לקריאה בלבד. זוהי התנהגות ברירת המחדל של ה-constructor של MessageQueue. לכן, אם יש משתמשים קיימים בתור הזה, צריך לשנות את הקוד שלהם כדי ליצור את התור באמצעות resetPointer=false.

  • כותב: קוראים ל-ashmem_set_prot_region עם מתאר קובץ MQDescriptor והאזור מוגדר לקריאה בלבד (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • קוראים: יוצרים תור הודעות באמצעות resetPointer=false (ברירת המחדל היא true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

שימוש ב-MessageQueue

ממשק ה-API הציבורי של האובייקט MessageQueue הוא:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

אפשר להשתמש ב-availableToWrite() וב-availableToRead() כדי לקבוע כמה נתונים אפשר להעביר בפעולה אחת. בתור לא מסונכרן:

  • הפונקציה availableToWrite() תמיד מחזירה את הקיבולת של התור.
  • לכל קורא יש מיקום קריאה משלו והוא מבצע חישוב משלו של availableToRead().
  • מנקודת המבט של קורא איטי, התור יכול לגלוש. כתוצאה מכך, availableToRead() יכול להחזיר ערך גדול יותר מגודל התור. הקריאה הראשונה אחרי חריגה ממלאי הנתונים נכשלת, וכתוצאה מכך מיקום הקריאה של הקורא הזה מוגדר כערך של מצביע הכתיבה הנוכחי, גם אם חריגת המליאה דווחה דרך availableToRead() וגם אם לא.

השיטות read() ו-write() מחזירות את הערך true אם ניתן היה להעביר (והועברו) את כל הנתונים המבוקשים אל התור וממנו. השיטות האלה לא חוסמות את הקוד. הן מצליחות (ומחזירות את הערך true) או חוזרות עם שגיאה (false) באופן מיידי.

השיטות readBlocking() ו-writeBlocking() ממתינות עד שאפשר להשלים את הפעולה המבוקשת, או עד שהן מגיעות לזמן קצוב לתפוגה (ערך timeOutNanos של 0 מציין שהזמן הקצוב לתפוגה לא יפוג אף פעם).

פעולות חסימה מיושמות באמצעות מילה של סימון אירוע. כברירת מחדל, כל תור יוצר מילה משלו לדגל ומשתמש בה כדי לתמוך בפורמט המקוצר של readBlocking() ו-writeBlocking(). מספר תורים יכולים לשתף מילה אחת, כך שתהליך יכול להמתין לכתיבת נתונים או לקריאת נתונים מכל אחד מהתור. בקריאה ל-getEventFlagWord(), אפשר לקבל מצביע למילת הדגל של האירוע בתור, ולהשתמש בו (או בכל מצביע למיקום זיכרון מתאים מתאים) כדי ליצור אובייקט EventFlag שיועבר בצורה הארוכה של readBlocking() ו-writeBlocking() לתור אחר. הפרמטרים readNotification ו-writeNotification קובעים באילו ביטים בדגל האירוע צריך להשתמש כדי לסמן קריאות וכתיבה באותה תור. readNotification ו-writeNotification הם מסיכות ביט ב-32 ביט.

readBlocking() ממתין לביטים של writeNotification. אם הערך של הפרמטר הזה הוא 0, הקריאה תמיד נכשלת. אם הערך של readNotification הוא 0, הקריאה לא תיכשל, אבל קריאה שהושלמה לא תגדיר ביטים של התראות. בתור מסונכרן, המשמעות היא שהקריאה התואמת של writeBlocking() אף פעם לא תתעורר אלא אם הביט מוגדר במקום אחר. בתור לא מסונכרן, הפונקציה writeBlocking() לא ממתינה (עדיין צריך להשתמש בה כדי להגדיר את הביט של ההתראה על הכתיבה), ומומלץ שלא להגדיר ביטים של התראות בקריאות. באופן דומה, הפונקציה writeblocking() נכשלת אם הערך של readNotification הוא 0, וכתיבה מוצלחת מגדירה את הביטים של writeNotification שצוינו.

כדי להמתין בכמה תורים בו-זמנית, משתמשים ב-method‏ wait() של אובייקט EventFlag כדי להמתין למסכת ביט של התראות. ה-method wait() מחזירה מילת סטטוס עם הביטים שגרמו להגדרת ההתעוררות. לאחר מכן המידע הזה ישמש כדי לוודא שבתור התואם יש מספיק מקום או נתונים לפעולת הכתיבה והקריאה הרצויה, ולבצע פעולת כתיבה וקריאה ללא חסימה של write() ושל read(). כדי לקבל התראה בסיום הפעולה, צריך לבצע קריאה נוספת לשיטה wake() של האובייקט EventFlag. להגדרה של הפונקציה הזו, ראו system/libfmq/include/fmq/EventFlag.h.EventFlag

אפס פעולות העתקה

ה-methods read, write, readBlocking ו-writeBlocking() לוקחות מצביע למאגר נתונים זמני של קלט כארגומנט, ומשתמשים בהפעלות memcpy() באופן פנימי כדי להעתיק נתונים בין אותו למאגר הצלצול של FMQ. כדי לשפר את הביצועים, Android בגרסה 8.0 ואילך כולל קבוצה של ממשקי API שמספקים גישה ישירה למצביע למאגר הטבעת, ומבטלים את הצורך בשימוש בקריאות memcpy.

אפשר להשתמש בממשקי ה-API הציבוריים הבאים לפעולות FMQ ללא העתקה:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • השיטה beginWrite מספקת נקודות בסיס למאגר הטבעת של FMQ. אחרי כתיבת הנתונים, מבצעים אותם באמצעות commitWrite(). השיטות beginRead ו-commitRead פועלות באותו אופן.
  • השיטות beginRead ו-Write מקבלות כקלט את מספר ההודעות שרוצים לקרוא ולכתוב, ומחזירות ערך בוליאני שמציין אם אפשר לקרוא או לכתוב. אם אפשר לקרוא או לכתוב, המבנה memTx מאוכלס ב-base pointers שאפשר להשתמש בהם כדי לגשת ישירות למצביעים בזיכרון המשותף של מאגר הטבעות.
  • המבנה MemRegion מכיל פרטים על בלוק של זיכרון, כולל מצביע הבסיס (כתובת הבסיס של בלוק הזיכרון) והאורך במונחים של T (אורך בלוק הזיכרון במונחים של הסוג של תור ההודעות שהוגדר ב-HIDL).
  • המבנה MemTransaction מכיל שני מבני MemRegion, first ו-second, כי קריאה או כתיבה למאגר הטבעת עשויה לדרוש חזרה לתחילת התור. המשמעות היא שנדרשים שני מצביעי בסיס כדי לקרוא ולכתוב נתונים במאגר הנתונים הזמני של FMQ.

כדי לקבל את כתובת הבסיס ואת האורך ממבנה MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

כדי לקבל הפניות למבני MemRegion הראשון והשני בתוך אובייקט MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

דוגמה לכתיבת נתונים ב-FMQ באמצעות ממשקי API ללא העתקה (zero copy):

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

שיטות העזר הבאות הן גם חלק מ-MemTransaction:

  • הפונקציה T* getSlot(size_t idx); מחזירה הפניה למק"ט idx ב-MemRegions שנמצאים באובייקט MemTransaction הזה. אם האובייקט MemTransaction מייצג את אזורי הזיכרון לקריאה ולכתיבה של N פריטים מסוג T, הטווח החוקי של idx הוא בין 0 ל-N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); כותב nMessages פריטים מסוג T באזורי הזיכרון שמתוארים על ידי האובייקט, החל מהאינדקס startIdx. השיטה הזו משתמשת ב-memcpy() ואינה מיועדת לפעולות של העברה ללא העתקה (zero copy). אם האובייקט MemTransaction מייצג זיכרון לקריאה ולכתיבה של N פריטים מסוג T, טווח הערכים החוקי של idx הוא בין 0 ל-N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); היא שיטת עזר לקריאת פריטים מסוג nMessages מסוג T מאזורי הזיכרון שמתוארים על ידי האובייקט, החל מ-startIdx. השיטה הזו משתמשת ב-memcpy() ואינה מיועדת לשימוש בפעולה ללא העתקה.

שליחת התור ב-HIDL

בצד היצירה:

  1. יוצרים אובייקט של תור הודעות כפי שמתואר למעלה.
  2. מוודאים שהאובייקט תקין באמצעות isValid().
  3. אם אתם ממתינים בכמה תורים על ידי העברת EventFlag לטופס הארוך של readBlocking() או writeBlocking(), תוכלו לחלץ את הפונקציה של אירוע הדגל (באמצעות getEventFlagWord()) מאובייקט MessageQueue שהותחל כדי ליצור את הדגל, ולהשתמש בדגל הזה כדי ליצור את האובייקט הנדרש EventFlag.
  4. משתמשים בשיטה MessageQueuegetDesc() כדי לקבל אובייקט מתאר.
  5. בקובץ ה-HAL, מקצים לשיטה פרמטר מסוג fmq_sync או fmq_unsync, כאשר T הוא סוג מתאים שהוגדר ב-HIDL. משתמשים בפונקציה הזו כדי לשלוח את האובייקט שהוחזר על ידי getDesc() לתהליך המקבל.

בצד המקבל:

  1. משתמשים באובייקט המתאר כדי ליצור אובייקט MessageQueue. צריך להשתמש באותו הטעם של התור ובאותו סוג הנתונים, או שהתבנית לא תצליח לעבור הידור (compile).
  2. אם חילוצתם דגל אירוע, חילוצו את הדגל מהאובייקט MessageQueue התואם בתהליך המקבל.
  3. כדי להעביר נתונים צריך להשתמש באובייקט MessageQueue.