mirror of
https://github.com/Evolution-X/hardware_interfaces
synced 2026-02-02 20:24:19 +00:00
Merge "MH2 | Implement pending writes thread"
This commit is contained in:
@@ -49,8 +49,15 @@ HalProxy::HalProxy(std::vector<ISensorsSubHal*>& subHalList) : mSubHalList(subHa
|
||||
}
|
||||
|
||||
HalProxy::~HalProxy() {
|
||||
// TODO: Join any running threads and clean up FMQs and any other allocated
|
||||
// state.
|
||||
{
|
||||
std::lock_guard<std::mutex> lockGuard(mEventQueueWriteMutex);
|
||||
mPendingWritesRun = false;
|
||||
mEventQueueWriteCV.notify_one();
|
||||
}
|
||||
if (mPendingWritesThread.joinable()) {
|
||||
mPendingWritesThread.join();
|
||||
}
|
||||
// TODO: Cleanup wakeup thread once it is implemented
|
||||
}
|
||||
|
||||
Return<void> HalProxy::getSensorsList(getSensorsList_cb _hidl_cb) {
|
||||
@@ -120,7 +127,8 @@ Return<Result> HalProxy::initialize(
|
||||
result = Result::BAD_VALUE;
|
||||
}
|
||||
|
||||
// TODO: start threads to read wake locks and process events from sub HALs.
|
||||
mPendingWritesThread = std::thread(startPendingWritesThread, this);
|
||||
// TODO: start threads to read wake locks.
|
||||
|
||||
for (size_t i = 0; i < mSubHalList.size(); i++) {
|
||||
auto subHal = mSubHalList[i];
|
||||
@@ -277,21 +285,66 @@ void HalProxy::initializeSubHalCallbacksAndSensorList() {
|
||||
initializeSensorList();
|
||||
}
|
||||
|
||||
void HalProxy::startPendingWritesThread(HalProxy* halProxy) {
|
||||
halProxy->handlePendingWrites();
|
||||
}
|
||||
|
||||
void HalProxy::handlePendingWrites() {
|
||||
// TODO: Find a way to optimize locking strategy maybe using two mutexes instead of one.
|
||||
std::unique_lock<std::mutex> lock(mEventQueueWriteMutex);
|
||||
while (mPendingWritesRun) {
|
||||
mEventQueueWriteCV.wait(
|
||||
lock, [&] { return !mPendingWriteEventsQueue.empty() || !mPendingWritesRun; });
|
||||
if (!mPendingWriteEventsQueue.empty() && mPendingWritesRun) {
|
||||
std::vector<Event>& pendingWriteEvents = mPendingWriteEventsQueue.front();
|
||||
size_t eventQueueSize = mEventQueue->getQuantumCount();
|
||||
size_t numToWrite = std::min(pendingWriteEvents.size(), eventQueueSize);
|
||||
lock.unlock();
|
||||
// TODO: Find a way to interrup writeBlocking if the thread should exit
|
||||
// so we don't have to wait for timeout on framework restarts.
|
||||
if (!mEventQueue->writeBlocking(
|
||||
pendingWriteEvents.data(), numToWrite,
|
||||
static_cast<uint32_t>(EventQueueFlagBits::EVENTS_READ),
|
||||
static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS),
|
||||
kWakelockTimeoutNs, mEventQueueFlag)) {
|
||||
ALOGE("Dropping %zu events after blockingWrite failed.", numToWrite);
|
||||
} else {
|
||||
mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
|
||||
}
|
||||
lock.lock();
|
||||
if (pendingWriteEvents.size() > eventQueueSize) {
|
||||
// TODO: Check if this erase operation is too inefficient. It will copy all the
|
||||
// events ahead of it down to fill gap off array at front after the erase.
|
||||
pendingWriteEvents.erase(pendingWriteEvents.begin(),
|
||||
pendingWriteEvents.begin() + eventQueueSize);
|
||||
} else {
|
||||
mPendingWriteEventsQueue.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void HalProxy::postEventsToMessageQueue(const std::vector<Event>& events) {
|
||||
std::lock_guard<std::mutex> lock(mEventQueueMutex);
|
||||
size_t numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
|
||||
if (numToWrite > 0) {
|
||||
if (mEventQueue->write(events.data(), numToWrite)) {
|
||||
// TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more writes
|
||||
// immediately
|
||||
mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
|
||||
} else {
|
||||
numToWrite = 0;
|
||||
size_t numToWrite = 0;
|
||||
std::lock_guard<std::mutex> lock(mEventQueueWriteMutex);
|
||||
if (mPendingWriteEventsQueue.empty()) {
|
||||
numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
|
||||
if (numToWrite > 0) {
|
||||
if (mEventQueue->write(events.data(), numToWrite)) {
|
||||
// TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more
|
||||
// writes immediately
|
||||
mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
|
||||
} else {
|
||||
numToWrite = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (numToWrite < events.size()) {
|
||||
// TODO: Post from events[numToWrite -> end] to background events queue
|
||||
// Signal background thread
|
||||
// TODO: Bound the mPendingWriteEventsQueue so that we do not trigger OOMs if framework
|
||||
// stalls
|
||||
mPendingWriteEventsQueue.push(
|
||||
std::vector<Event>(events.begin() + numToWrite, events.end()));
|
||||
mEventQueueWriteCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,12 @@
|
||||
#include <hidl/MQDescriptor.h>
|
||||
#include <hidl/Status.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
#include <thread>
|
||||
|
||||
namespace android {
|
||||
namespace hardware {
|
||||
@@ -159,6 +164,7 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
|
||||
*/
|
||||
std::vector<ISensorsSubHal*> mSubHalList;
|
||||
|
||||
//! The list of subhal callbacks for each subhal where the indices correlate with mSubHalList
|
||||
std::vector<const sp<IHalProxyCallback>> mSubHalCallbacks;
|
||||
|
||||
/**
|
||||
@@ -179,6 +185,9 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
|
||||
//! The mutex for the event queue.
|
||||
std::mutex mEventQueueMutex;
|
||||
|
||||
//! The timeout for each pending write on background thread for events.
|
||||
static const int64_t kWakelockTimeoutNs = 5 * INT64_C(1000000000) /* 5 seconds */;
|
||||
|
||||
//! The scoped wakelock ref count.
|
||||
size_t mWakelockRefCount = 0;
|
||||
|
||||
@@ -188,6 +197,21 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
|
||||
//! The bit mask used to get the subhal index from a sensor handle.
|
||||
static constexpr uint32_t kSensorHandleSubHalIndexMask = 0xFF000000;
|
||||
|
||||
//! The events that were not able to be written to fmq right away
|
||||
std::queue<std::vector<Event>> mPendingWriteEventsQueue;
|
||||
|
||||
//! The mutex protecting writing to the fmq and the pending events queue
|
||||
std::mutex mEventQueueWriteMutex;
|
||||
|
||||
//! The condition variable waiting on pending write events to stack up
|
||||
std::condition_variable mEventQueueWriteCV;
|
||||
|
||||
//! The thread object ptr that handles pending writes
|
||||
std::thread mPendingWritesThread;
|
||||
|
||||
//! The bool indicating whether to end the pending writes background thread or not
|
||||
bool mPendingWritesRun = true;
|
||||
|
||||
/**
|
||||
* Initialize the list of SubHal objects in mSubHalList by reading from dynamic libraries
|
||||
* listed in a config file.
|
||||
@@ -210,6 +234,16 @@ class HalProxy : public ISensors, public IScopedWakelockRefCounter {
|
||||
*/
|
||||
void initializeSubHalCallbacksAndSensorList();
|
||||
|
||||
/**
|
||||
* Starts the thread that handles pending writes to event fmq.
|
||||
*
|
||||
* @param halProxy The HalProxy object pointer.
|
||||
*/
|
||||
static void startPendingWritesThread(HalProxy* halProxy);
|
||||
|
||||
//! Handles the pending writes on events to eventqueue.
|
||||
void handlePendingWrites();
|
||||
|
||||
/**
|
||||
* Clear direct channel flags if the HalProxy has already chosen a subhal as its direct channel
|
||||
* subhal. Set the directChannelSubHal pointer to the subHal passed in if this is the first
|
||||
|
||||
@@ -22,11 +22,10 @@
|
||||
#include "ScopedWakelock.h"
|
||||
#include "SensorsSubHal.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#undef LOG_TAG
|
||||
#define LOG_TAG "HalProxy_test"
|
||||
|
||||
namespace {
|
||||
|
||||
using ::android::hardware::hidl_vec;
|
||||
@@ -98,7 +97,7 @@ void testSensorsListForOneDirectChannelEnabledSubHal(const std::vector<SensorInf
|
||||
* Construct and return a HIDL Event type thats sensorHandle refers to a proximity sensor
|
||||
* which is a wakeup type sensor.
|
||||
*
|
||||
* @ return A proximity event.
|
||||
* @return A proximity event.
|
||||
*/
|
||||
Event makeProximityEvent();
|
||||
|
||||
@@ -106,10 +105,30 @@ Event makeProximityEvent();
|
||||
* Construct and return a HIDL Event type thats sensorHandle refers to a proximity sensor
|
||||
* which is a wakeup type sensor.
|
||||
*
|
||||
* @ return A proximity event.
|
||||
* @return A proximity event.
|
||||
*/
|
||||
Event makeAccelerometerEvent();
|
||||
|
||||
/**
|
||||
* Make a certain number of proximity type events with the sensorHandle field set to
|
||||
* the proper number for AllSensorsSubHal subhal type.
|
||||
*
|
||||
* @param numEvents The number of events to make.
|
||||
*
|
||||
* @return The created list of events.
|
||||
*/
|
||||
std::vector<Event> makeMultipleProximityEvents(size_t numEvents);
|
||||
|
||||
/**
|
||||
* Make a certain number of accelerometer type events with the sensorHandle field set to
|
||||
* the proper number for AllSensorsSubHal subhal type.
|
||||
*
|
||||
* @param numEvents The number of events to make.
|
||||
*
|
||||
* @return The created list of events.
|
||||
*/
|
||||
std::vector<Event> makeMultipleAccelerometerEvents(size_t numEvents);
|
||||
|
||||
// Tests follow
|
||||
TEST(HalProxyTest, GetSensorsListOneSubHalTest) {
|
||||
AllSensorsSubHal subHal;
|
||||
@@ -232,10 +251,7 @@ TEST(HalProxyTest, PostMultipleNonWakeupEvent) {
|
||||
::android::sp<ISensorsCallback> callback = new SensorsCallback();
|
||||
proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
|
||||
|
||||
std::vector<Event> events;
|
||||
for (size_t i = 0; i < kNumEvents; i++) {
|
||||
events.push_back(makeAccelerometerEvent());
|
||||
}
|
||||
std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
|
||||
subHal.postEvents(events, false /* wakeup */);
|
||||
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
|
||||
@@ -272,15 +288,114 @@ TEST(HalProxyTest, PostMultipleWakeupEvents) {
|
||||
::android::sp<ISensorsCallback> callback = new SensorsCallback();
|
||||
proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
|
||||
|
||||
std::vector<Event> events;
|
||||
for (size_t i = 0; i < kNumEvents; i++) {
|
||||
events.push_back(makeProximityEvent());
|
||||
}
|
||||
std::vector<Event> events = makeMultipleProximityEvents(kNumEvents);
|
||||
subHal.postEvents(events, true /* wakeup */);
|
||||
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
|
||||
}
|
||||
|
||||
TEST(HalProxyTest, PostEventsMultipleSubhals) {
|
||||
constexpr size_t kQueueSize = 5;
|
||||
constexpr size_t kNumEvents = 2;
|
||||
AllSensorsSubHal subHal1, subHal2;
|
||||
std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
|
||||
HalProxy proxy(subHals);
|
||||
std::unique_ptr<EventMessageQueue> eventQueue =
|
||||
std::make_unique<EventMessageQueue>(kQueueSize, true);
|
||||
std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
|
||||
std::make_unique<WakeupMessageQueue>(kQueueSize, true);
|
||||
::android::sp<ISensorsCallback> callback = new SensorsCallback();
|
||||
proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
|
||||
|
||||
std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
|
||||
subHal1.postEvents(events, false /* wakeup */);
|
||||
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
|
||||
|
||||
subHal2.postEvents(events, false /* wakeup */);
|
||||
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kNumEvents * 2);
|
||||
}
|
||||
|
||||
TEST(HalProxyTest, PostEventsDelayedWrite) {
|
||||
constexpr size_t kQueueSize = 5;
|
||||
constexpr size_t kNumEvents = 6;
|
||||
AllSensorsSubHal subHal1, subHal2;
|
||||
std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
|
||||
HalProxy proxy(subHals);
|
||||
std::unique_ptr<EventMessageQueue> eventQueue =
|
||||
std::make_unique<EventMessageQueue>(kQueueSize, true);
|
||||
std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
|
||||
std::make_unique<WakeupMessageQueue>(kQueueSize, true);
|
||||
::android::sp<ISensorsCallback> callback = new SensorsCallback();
|
||||
proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
|
||||
|
||||
std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
|
||||
subHal1.postEvents(events, false /* wakeup */);
|
||||
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kQueueSize);
|
||||
|
||||
Event eventOut;
|
||||
// writeblock 1 event out of queue, timeout for half a second
|
||||
EXPECT_TRUE(eventQueue->readBlocking(&eventOut, 1, 500000000));
|
||||
|
||||
// Sleep for a half second so that blocking write has time complete in background thread
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
|
||||
// proxy background thread should have wrote last event when it saw space
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kQueueSize);
|
||||
}
|
||||
|
||||
TEST(HalProxyTest, PostEventsMultipleSubhalsThreaded) {
|
||||
constexpr size_t kQueueSize = 5;
|
||||
constexpr size_t kNumEvents = 2;
|
||||
AllSensorsSubHal subHal1, subHal2;
|
||||
std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
|
||||
HalProxy proxy(subHals);
|
||||
std::unique_ptr<EventMessageQueue> eventQueue =
|
||||
std::make_unique<EventMessageQueue>(kQueueSize, true);
|
||||
std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
|
||||
std::make_unique<WakeupMessageQueue>(kQueueSize, true);
|
||||
::android::sp<ISensorsCallback> callback = new SensorsCallback();
|
||||
proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
|
||||
|
||||
std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
|
||||
|
||||
std::thread t1(&AllSensorsSubHal::postEvents, &subHal1, events, false);
|
||||
std::thread t2(&AllSensorsSubHal::postEvents, &subHal2, events, false);
|
||||
|
||||
t1.join();
|
||||
t2.join();
|
||||
|
||||
EXPECT_EQ(eventQueue->availableToRead(), kNumEvents * 2);
|
||||
}
|
||||
|
||||
TEST(HalProxyTest, DestructingWithEventsPendingOnBackgroundThreadTest) {
|
||||
constexpr size_t kQueueSize = 5;
|
||||
constexpr size_t kNumEvents = 6;
|
||||
AllSensorsSubHal subHal;
|
||||
std::vector<ISensorsSubHal*> subHals{&subHal};
|
||||
|
||||
std::unique_ptr<EventMessageQueue> eventQueue =
|
||||
std::make_unique<EventMessageQueue>(kQueueSize, true);
|
||||
std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
|
||||
std::make_unique<WakeupMessageQueue>(kQueueSize, true);
|
||||
::android::sp<ISensorsCallback> callback = new SensorsCallback();
|
||||
HalProxy proxy(subHals);
|
||||
proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
|
||||
|
||||
std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
|
||||
subHal.postEvents(events, false /* wakeup */);
|
||||
|
||||
// Sleep for a half second so that background thread has time to attempt it's blocking write
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
|
||||
// Should see a 5 second wait for blocking write timeout here
|
||||
|
||||
// Should be one events left on pending writes queue here and proxy will destruct
|
||||
// If this TEST completes then it was a success, if it hangs we will see a crash
|
||||
}
|
||||
|
||||
// Helper implementations follow
|
||||
void testSensorsListFromProxyAndSubHal(const std::vector<SensorInfo>& proxySensorsList,
|
||||
const std::vector<SensorInfo>& subHalSensorsList) {
|
||||
@@ -332,4 +447,20 @@ Event makeAccelerometerEvent() {
|
||||
return event;
|
||||
}
|
||||
|
||||
std::vector<Event> makeMultipleProximityEvents(size_t numEvents) {
|
||||
std::vector<Event> events;
|
||||
for (size_t i = 0; i < numEvents; i++) {
|
||||
events.push_back(makeProximityEvent());
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
std::vector<Event> makeMultipleAccelerometerEvents(size_t numEvents) {
|
||||
std::vector<Event> events;
|
||||
for (size_t i = 0; i < numEvents; i++) {
|
||||
events.push_back(makeAccelerometerEvent());
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Reference in New Issue
Block a user