高速メッセージ キュー(FMQ)

HIDL のリモート プロシージャ コール(RPC)インフラストラクチャは、バインダ メカニズムを使用しています。したがって、呼び出しによりオーバーヘッドが発生し、カーネル操作を必要とし、スケジューラのアクションをトリガーする場合があります。ただし、オーバーヘッドが少なく、カーネルの関与がないプロセス間でデータを転送する必要がある場合は、高速メッセージ キュー(FMQ)システムが使用されます。

FMQ は、必要なプロパティを備えたメッセージ キューを作成します。MQDescriptorSync オブジェクトまたは MQDescriptorUnsync オブジェクトを HIDL RPC 呼び出しで送信することができ、受信するプロセスでメッセージ キューにアクセスすると、そのオブジェクトを使用できます。

キューの種類

Android では、2 つのキュータイプ(別名: フレーバー)がサポートされています

  • 非同期キューはオーバーフローが可能で、多数のリーダーを持つことができます。各リーダーは、データが失われないうちに読み取る必要があります
  • 同期キューはオーバーフローできず、持つことができるリーダーは 1 つのみです

どちらのキュータイプもアンダーフローできず(空のキューからの読み取りは失敗します)、持つことができるライターは 1 つのみです。

非同期キュー

非同期キューはライターは 1 つしか持てませんが、任意の数のリーダーを持つことができます。キューの書き込み位置は 1 つですが、各リーダーはそれぞれの読み取り位置を個別にトラッキングします。

キューへの書き込みは、構成されたキューの容量を超えない限り(キューの容量より大きな書き込みは直ちに失敗します)、常に成功します(オーバーフローはチェックされません)。各リーダーの読み取り位置は異なることがあるため、新しい書き込みにスペースが必要となる場合は、各リーダーのデータ読み取り完了を待つことなく、キューの末尾からデータが削除されます。

リーダーは、データがキューの末尾に達して削除される前にデータを取得する必要があります。読み取り可能なデータ量より多くのデータを読み込もうとした場合、読み取りは直ちに失敗するか(非ブロック読み取りの場合)、十分な量のデータが読み取り可能になるのを待ちます(ブロック読み取りの場合)。キューの容量よりも多くのデータを読み取ろうとした場合、読み取りは直ちに失敗します。

リーダーがライターに追い付けず、書き込まれてまだリーダーが読み取っていないデータの量がキューの容量より多くなると、次の読み取りはデータを返しません。代わりに、リーダーの読み取り位置をリセットして最新の書き込み位置に合わせた後で、失敗を返します。オーバーフローしてから次の読み取りまでの間に読み取り可能なデータ量がチェックされた場合、キューの容量よりも多い読み取り可能データが返され、オーバーフローが発生したことが示されます(読み取り可能なデータ量のチェックからデータ読み取りが試行されるまでの間にキューがオーバーフローした場合、オーバーフローを示す兆候は、読み取りが失敗することだけです)。

同期キュー

同期キューは 1 つのライターと 1 つのリーダーを持ちます。ライターは 1 つの書き込み位置を持ち、リーダーは 1 つの読み取り位置を持ちます。キューが書き込み用に用意している容量以上に書き込むことはできません。また、キューが現在保持している以上のデータを読み取ることはできません。使用可能な空き領域またはデータ量を超過して処理しようとした場合の動作は、呼び出された write / read 関数がブロックか非ブロックかに応じて、直ちに失敗が返されるか、目的のオペレーションが完了するまでブロックされるかのいずれかとなります。キューの容量よりも多くのデータを読み書きしようとすると、直ちに失敗します。

FMQ のセットアップ

メッセージ キューには複数の MessageQueue オブジェクトが必要です。つまり、書き込み用に 1 つ、読み取り用に 1 つ以上です。どのオブジェクトが書き込みまたは読み取りに使用されるかは明示的に構成されません。ユーザーは、どのオブジェクトも読み込みと書き込みの両方に使用されないようにするとともに、ライターの数を 1 つ、同期キューの場合はリーダーの数を 1 つに制限する必要があります。

1 つ目の MessageQueue オブジェクトの作成

メッセージ キューは、次のように単一の呼び出しで作成および構成されます。

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements) イニシャライザは、メッセージ キュー機能をサポートするオブジェクトを作成して初期化します。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) イニシャライザは、ブロックをサポートするメッセージ キュー機能に対応したオブジェクトを作成して初期化します。
  • flavor は、同期キューに対応する kSynchronizedReadWrite か、非同期キューに対応する kUnsynchronizedWrite のいずれかです。
  • この例の uint16_t には、ネストされたバッファ(string または vec 型以外)、ハンドル、インターフェースのいずれも含まない任意の HIDL 定義型を指定できます。
  • kNumElementsInQueue は、キューのサイズをエントリ数で示し、キューに割り当てられる共有メモリバッファのサイズを決定します。

2 つ目の MessageQueue オブジェクトの作成

メッセージ キューのもう一端は、最初に作成した端から取得された MQDescriptor オブジェクトを使用して作成されます。MQDescriptor オブジェクトは、HIDL または AIDL RPC 呼び出しを介して、メッセージ キューの 2 番目の端を保持するプロセスに送信されます。MQDescriptor には、次のようなキューに関する情報が含まれています。

  • バッファと書き込みポインタをマッピングするための情報。
  • 読み取りポインタをマッピングするための情報(キューが同期の場合)。
  • イベントフラグ ワードをマッピングするための情報(キューがブロックをする場合)。
  • HIDL 定義型のキュー要素とキュー フレーバー(同期または非同期)を含むオブジェクト型(<T, flavor>)。

次のように、MQDescriptor オブジェクトを使用して MessageQueue オブジェクトを構築できます。

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers パラメータは、この MessageQueue オブジェクトの作成中に、読み取り位置と書き込みの位置を 0 にリセットするかどうかを示します。非同期キューでは、読み取り位置(非同期キューでは各 MessageQueue オブジェクトに対してローカル)は作成時に常に 0 に設定されます。通常、MQDescriptor は、1 つ目のメッセージ キュー オブジェクトの作成時に初期化されます。共有メモリをさらにきめ細かく制御するには、MQDescriptor を手動でセットアップし(MQDescriptorsystem/libhidl/base/include/hidl/MQDescriptor.h で定義されています)、このセクションの説明に従ってすべての MessageQueue オブジェクトを作成します。

ブロックキューとイベントフラグ

デフォルトでは、キューは読み取りと書き込みのブロックに対応していません。読み取りと書き込みの呼び出しのブロックには、次の 2 種類があります。

  • 短形式。3 つのパラメータ(データポインタ、アイテム数、タイムアウト)を使用します。1 つのキューの個々の読み取りと書き込みの操作でのブロックをサポートします。この形式を使用する場合、キューは内部的にイベントフラグとビットマスクを処理します。1 つ目のメッセージ キュー オブジェクトは、2 番目のパラメータに true を指定して初期化する必要があります。次に例を示します。
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • 長形式。6 つのパラメータ(イベントフラグとビットマスクを含む)を使用します。複数のキュー間での共有 EventFlag オブジェクトの使用をサポートするほか、使用する通知ビットマスクを指定できます。この場合、イベントフラグとビットマスクを、各読み取りと書き込みの呼び出しに渡す必要があります。

長形式の場合、readBlocking()writeBlocking() の各呼び出しで明示的に EventFlag を指定できます。いずれか 1 つのキューを内部イベントフラグで初期化できます。フラグは、キューの MessageQueue オブジェクトから getEventFlagWord() を使って抽出し、他の FMQ で利用できるように各プロセスで抽出したフラグを使って EventFlag オブジェクトを作成する必要があります。また、適当な共有メモリを使って EventFlag オブジェクトを初期化するという方法もあります。

一般に、各キューでは、非ブロック、短形式ブロック、長形式ブロックのうち 1 つのみを使用します。これらを混用しても間違いではありませんが、目的の結果を得るには慎重なプログラミングが必要です。

メモリを読み取り専用としてマークする

デフォルトでは、共有メモリには読み取り権限と書き込み権限が設定されています。非同期キュー(kUnsynchronizedWrite)の場合、ライターは MQDescriptorUnsync オブジェクトを割り当てる前に、すべてのリーダーの書き込み権限を削除しようとすることがあります。これにより、別のプロセスでキューを書き込めなくなります(リーダーのプロセスでバグや不正な動作を防ぐためにおすすめの方法です)。MQDescriptorUnsync を使ってキューの読み込みサイドを作成するときにライターがキューのリセットをリーダーに常に許可する場合は、メモリを読み取り専用としてマークすることはできません。これは MessageQueue コンストラクタのデフォルトの動作です。したがって、このキューに既存のユーザーがいる場合、resetPointer=false を使ってキューを構築できるよう、そのユーザーのコードを変更する必要があります。

  • ライター: MQDescriptor ファイル記述子を使って ashmem_set_prot_region を呼び出し、領域を読み込み専用(PROT_READ)に設定します:
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • リーダー: resetPointer=false(デフォルトは true)を使ってメッセージ キューを作成します:
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

MessageQueue の使用

MessageQueue オブジェクトの公開 API は次のとおりです。

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite()availableToRead() を使用すると、1 回のオペレーションで転送できるデータの量がわかります。非同期キューの場合、以下のようになります。

  • availableToWrite() は常にキューの容量を返します。
  • 各リーダーは固有の読み取り位置を持ち、個別に availableToRead() の計算を行います。
  • 遅いリーダーからすると、キューのオーバーフローが許容されているため、availableToRead() がキューのサイズより大きい値を返す結果になることがあります。オーバーフロー後の最初の読み取りは失敗し、availableToRead() でオーバーフローが報告されたかどうかにかかわらず、そのリーダーの読み取り位置は、現在の書き込みポインタと等しい位置に設定されます。

要求されたすべてのデータがキューとの間で転送される可能性がある場合(および転送された場合)read() メソッドと write() メソッドは true を返します。これらのメソッドはブロックを行いません。成功して true を返すか、または直ちに失敗(false)を返します。

readBlocking() メソッドと writeBlocking() メソッドは、要求されたオペレーションが完了するか、タイムアウトするまで待ちます(timeOutNanos 値が 0 の場合はタイムアウトしません)。

ブロック オペレーションは、イベントフラグ ワードを使用して実装されます。デフォルトでは、各キューは独自のフラグワードを作成して使用し、readBlocking()writeBlocking() の短形式をサポートします。プロセスがどのキューでも書き込みまたは読み取りを待機できるように、複数のキューが 1 つのワードを共有することもできます。getEventFlagWord() を呼び出すことでキューのイベントフラグ ワードへのポインタが得られ、このポインタ(または適当な共有メモリ位置へのポインタ)を使用して EventFlag オブジェクトを作成すると、別のキューの長形式の readBlocking() および writeBlocking() に渡すことができます。readNotification および writeNotification パラメータは、イベントフラグのどのビットをキューの読み取りと書き込みの通知に使用すべきかを示します。readNotificationwriteNotification は 32 ビットのビットマスクです。

readBlocking()writeNotification ビットを待機します。このパラメータが 0 の場合、呼び出しは常に失敗します。readNotification 値が 0 の場合、呼び出しは失敗しませんが、読み取りが成功しても通知ビットは設定されません。同期キューでは、ビットが別の場所に設定されていない限り、対応する writeBlocking() 呼び出しはウェイクアップされません。非同期キューでは、writeBlocking() は待機しません(ただし、書き込み通知のビットの設定にこれを使用する必要はあります)。また、読み取りが通知ビットを設定しないようにするのが適切です。同様に、readNotification が 0 で、書き込みが成功して指定した writeNotification ビットが設定されると、writeblocking() は失敗します。

複数のキューを同時に待機するには、EventFlag オブジェクトの wait() メソッドを使用して、通知のビットマスクを待機します。wait() メソッドは、ウェイクアップが設定される原因となったビットとともにステータス ワードを返します。次に、この情報を使用して、対応するキューに、目的の書き込みおよび読み取りオペレーションのための十分な領域またはデータがあることを確認し、非ブロックの write() および read() を実行します。オペレーション後の通知を取得するには、もう一度 EventFlag オブジェクトの wake() メソッドの呼び出しを使用します。EventFlag 抽象化の定義については、system/libfmq/include/fmq/EventFlag.h を参照してください。

ゼロコピー オペレーション

readwritereadBlockingwriteBlocking() の各メソッドは、入出力バッファへのポインタを引数として受け取り、memcpy() 呼び出しを内部的に使用して、入出力バッファと FMQ リングバッファの間でデータをコピーします。Android 8.0 以上では、パフォーマンスを向上させるため、リングバッファへの直接ポインタ アクセスを提供する API のセットが含まれているので、memcpy 呼び出しを使用する必要はなくなりました。

ゼロコピー FMQ オペレーションには次の公開 API を使用します。

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite メソッドは、FMQ リングバッファへのベースポインタを提供します。データが書き込まれた後、commitWrite() を使用してコミットします。beginRead メソッドと commitRead メソッドも同じように動作します。
  • beginRead メソッドと Write メソッドは、読み書きされるメッセージの数を入力として受け取り、読み書きが可能かどうかを示すブール値を返します。読み取りまたは書き込みが可能な場合、memTx 構造体には、リングバッファ共有メモリへの直接ポインタ アクセスに使用できるベースポインタが格納されます。
  • MemRegion 構造体には、ベースポインタ(メモリブロックのベースアドレス)と T 単位の長さ(HIDL 定義型のメッセージ キューを単位としたメモリブロックの長さ)を含むメモリブロックの詳細が含まれます。
  • リングバッファの読み取りまたは書き込みで、キューの先頭へのラップアラウンドが必要になる場合があるため、MemTransaction 構造体には firstsecond の 2 つの MemRegion 構造体が含まれます。これは、FMQ リングバッファへのデータの読み書きに 2 つのベースポインタが必要であることを意味します。

ベースアドレスと長さを MemRegion 構造体から取得するには、次のようにします。

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

MemTransaction オブジェクト内の 1 番目と 2 番目の MemRegion 構造体への参照を取得するには、次のようにします。

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

ゼロコピー API を使用した FMQ への書き込みの例を以下に示します。

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

次のヘルパー メソッドも MemTransaction の一部です。

  • T* getSlot(size_t idx); は、MemTransaction オブジェクトの一部である MemRegions 内のスロット idx へのポインタを返します。MemTransaction オブジェクトが T 型の N 個のアイテムを読み書きするメモリ領域を表す場合、idx の有効な範囲は 0~N-1 です。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); は、T 型の nMessages 個のアイテムを、オブジェクトによって記述されたメモリ領域のインデックス startIdx 以降に書き込みます。このメソッドは memcpy() を使用します。ゼロコピー オペレーションに使用することは想定されていません。MemTransaction オブジェクトが T 型の N 個のアイテムを読み書きするメモリを表す場合、idx の有効範囲は 0~N-1 です。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); は、T 型の nMessages 個のアイテムを、オブジェクトによって記述されたメモリ領域の startIdx 以降から読み取るヘルパー メソッドです。このメソッドは memcpy() を使用します。ゼロコピー オペレーションに使用することは想定されていません。

HIDL を介したキューの送信

作成側:

  1. 上記のようにメッセージ キュー オブジェクトを作成します。
  2. isValid() を使用してオブジェクトが有効であることを確認します。
  3. EventFlag を長形式の readBlocking() または writeBlocking() に渡して複数のキューを待機する場合は、初期化された MessageQueue オブジェクトから(getEventFlagWord() で)イベントフラグ ポインタを抽出してフラグを作成し、そのフラグを使用して必要な EventFlag オブジェクトを作成できます。
  4. MessageQueue のメソッドである getDesc() を使用して記述子オブジェクトを取得します。
  5. HAL ファイルで、T が適切な HIDL 定義型であるような fmq_sync 型または fmq_unsync 型のパラメータをこのメソッドに渡します。これを使用して、getDesc() から返されたオブジェクトを受信プロセスに送信します。

受信側:

  1. 記述子オブジェクトを使用して、MessageQueue オブジェクトを作成します。同じキュー フレーバーとデータ型を使用してください。そうしないと、テンプレートはコンパイルに失敗します。
  2. イベントフラグを抽出した場合は、受信プロセスで対応する MessageQueue オブジェクトからそのフラグを抽出します。
  3. MessageQueue オブジェクトを使用してデータを転送します。