Add PendingRequestPool to handle pending requests.

PendingRequestPool stores all pending requests for which we have not
yet received a response from the hardware. If a request has been
pending for too long, the timeout callback is called and the request
is removed.

Test: atest DefaultVehicleHalTest
Bug: 203713317
Change-Id: I4d7ae2c72b960347be70ac4cc8ce3d66eb8128f9
Yu Shan
2021-10-26 15:22:28 -07:00
parent eac016a771
commit 54cfc5a102
6 changed files with 603 additions and 3 deletions
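
For context, here is a minimal usage sketch of the new pool, based only on the API added in
this change; the standalone main(), the clientTag variable, and the printf logging are
illustrative and not part of the patch:

#include "PendingRequestPool.h"

#include <inttypes.h>
#include <stdio.h>

#include <memory>
#include <unordered_set>

using ::android::hardware::automotive::vehicle::PendingRequestPool;

int main() {
    // One-second timeout, expressed in nanoseconds.
    PendingRequestPool pool(1'000'000'000);

    // The timeout callback receives the IDs of the requests that expired.
    auto onTimeout = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
            [](const std::unordered_set<int64_t>& timedOutIds) {
                for (int64_t id : timedOutIds) {
                    printf("request %" PRId64 " timed out\n", id);
                }
            });

    // Any stable pointer may serve as the client key.
    int clientTag = 0;
    const void* clientId = &clientTag;

    // Track two outstanding hardware requests.
    if (auto result = pool.addRequests(clientId, {1, 2}, onTimeout); !result.ok()) {
        printf("addRequests failed: %s\n", result.error().message().c_str());
        return 1;
    }

    // When the hardware responds to request 1, mark it finished. Request 2 stays pending
    // and is reported through onTimeout if it expires.
    auto finished = pool.tryFinishRequests(clientId, {1});
    printf("finished %zu request(s), %zu still pending\n", finished.size(),
           pool.countPendingRequests(clientId));
    return 0;
}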

@@ -154,7 +154,7 @@ class FakeVehicleHardwareTest : public ::testing::Test {
return toInt(result.error());
}
- void onSetValues(const std::vector<SetValueResult> results) {
+ void onSetValues(std::vector<SetValueResult> results) {
for (auto& result : results) {
mSetValueResults.push_back(result);
}
@@ -162,7 +162,7 @@ class FakeVehicleHardwareTest : public ::testing::Test {
const std::vector<SetValueResult>& getSetValueResults() { return mSetValueResults; }
- void onGetValues(const std::vector<GetValueResult> results) {
+ void onGetValues(std::vector<GetValueResult> results) {
for (auto& result : results) {
mGetValueResults.push_back(result);
}
@@ -170,7 +170,7 @@ class FakeVehicleHardwareTest : public ::testing::Test {
const std::vector<GetValueResult>& getGetValueResults() { return mGetValueResults; }
- void onPropertyChangeEvent(const std::vector<VehiclePropValue>& values) {
+ void onPropertyChangeEvent(std::vector<VehiclePropValue> values) {
for (auto& value : values) {
mChangedProperties.push_back(value);
}

@@ -56,6 +56,7 @@ cc_library {
srcs: [
"src/ConnectedClient.cpp",
"src/DefaultVehicleHal.cpp",
"src/PendingRequestPool.cpp",
],
static_libs: [
"VehicleHalUtils",

@@ -88,12 +88,17 @@ class DefaultVehicleHal final : public ::aidl::android::hardware::automotive::ve
GetSetValuesClient<::aidl::android::hardware::automotive::vehicle::SetValueResult,
::aidl::android::hardware::automotive::vehicle::SetValueResults>;
// The default timeout of get or set value requests is 30s.
// TODO(b/214605968): define TIMEOUT_IN_NANO in IVehicle and allow getValues/setValues/subscribe
// to specify custom timeouts.
static constexpr int64_t TIMEOUT_IN_NANO = 30'000'000'000;
const std::unique_ptr<IVehicleHardware> mVehicleHardware;
// mConfigsByPropId and mConfigFile are only modified during initialization, so no need to
// lock guard them.
std::unordered_map<int32_t, ::aidl::android::hardware::automotive::vehicle::VehiclePropConfig>
mConfigsByPropId;
// Only modified in constructor, so thread-safe.
std::unique_ptr<::ndk::ScopedFileDescriptor> mConfigFile;
std::mutex mLock;

@@ -0,0 +1,100 @@
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef android_hardware_automotive_vehicle_aidl_impl_vhal_include_PendingRequestPool_H_
#define android_hardware_automotive_vehicle_aidl_impl_vhal_include_PendingRequestPool_H_
#include <android-base/result.h>
#include <android-base/thread_annotations.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>
namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {
// A thread-safe pending request pool that tracks whether each request has timed out.
class PendingRequestPool final {
public:
using TimeoutCallbackFunc = std::function<void(const std::unordered_set<int64_t>&)>;
explicit PendingRequestPool(int64_t timeoutInNano);
~PendingRequestPool();
// Adds a list of requests to the request pool.
// The clientId is the key for all the requests. It could be a number or an address of a data
// structure that represents a client. The caller must maintain this data structure.
// All the request IDs must be unique for one client. If any of the requestIds duplicates a
// pending request ID for the client, this function returns an error and no requests are
// added. Otherwise, all of them are added to the request pool.
// The callback is called if the requests are not finished within {@code mTimeoutInNano}
// nanoseconds.
android::base::Result<void> addRequests(const void* clientId,
const std::unordered_set<int64_t>& requestIds,
std::shared_ptr<TimeoutCallbackFunc> callback);
// Checks whether the request is currently pending.
bool isRequestPending(const void* clientId, int64_t requestId) const;
// Tries to mark the requests as finished and remove them from the pool if they are
// currently pending. Returns the set of requests that were pending and have been finished
// successfully. This function finishes all valid requestIds even if some of the
// requestIds are not valid.
std::unordered_set<int64_t> tryFinishRequests(const void* clientId,
const std::unordered_set<int64_t>& requestIds);
// Returns the number of pending requests in the pool for the given client, for testing purposes.
size_t countPendingRequests(const void* clientId) const;
private:
// The maximum number of pending requests allowed per client. If this limit would be exceeded,
// adding more requests fails. This prevents a client from spamming the pool.
static constexpr size_t MAX_PENDING_REQUEST_PER_CLIENT = 10000;
struct PendingRequest {
std::unordered_set<int64_t> requestIds;
int64_t timeoutTimestamp;
std::shared_ptr<TimeoutCallbackFunc> callback;
};
int64_t mTimeoutInNano;
mutable std::mutex mLock;
std::unordered_map<const void*, std::list<PendingRequest>> mPendingRequestsByClient
GUARDED_BY(mLock);
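// Background thread that periodically checks for timed-out requests.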
std::thread mThread;
std::atomic<bool> mThreadStop = false;
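// Used to wake up and stop the timeout-checking thread.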
std::condition_variable mCv;
std::mutex mCvLock;
bool isRequestPendingLocked(const void* clientId, int64_t requestId) const REQUIRES(mLock);
// Checks whether any requests in the pool have timed out; runs periodically in a separate thread.
void checkTimeout();
};
} // namespace vehicle
} // namespace automotive
} // namespace hardware
} // namespace android
#endif // android_hardware_automotive_vehicle_aidl_impl_vhal_include_PendingRequestPool_H_

@@ -0,0 +1,220 @@
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PendingRequestPool.h"
#include <VehicleHalTypes.h>
#include <VehicleUtils.h>
#include <utils/Log.h>
#include <utils/SystemClock.h>
#include <vector>
namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {
namespace {
using ::aidl::android::hardware::automotive::vehicle::StatusCode;
using ::android::base::Error;
using ::android::base::Result;
// Check for timed-out requests at least once every second.
constexpr int64_t CHECK_TIME_IN_NANO = 1'000'000'000;
} // namespace
PendingRequestPool::PendingRequestPool(int64_t timeoutInNano)
: mTimeoutInNano(timeoutInNano), mThread([this] {
// [this] must remain alive for the lifetime of this thread because the destructor waits for
// the thread to exit.
int64_t sleepTime = std::min(mTimeoutInNano, static_cast<int64_t>(CHECK_TIME_IN_NANO));
std::unique_lock<std::mutex> lk(mCvLock);
while (!mCv.wait_for(lk, std::chrono::nanoseconds(sleepTime),
[this] { return mThreadStop.load(); })) {
checkTimeout();
}
}) {}
PendingRequestPool::~PendingRequestPool() {
mThreadStop = true;
mCv.notify_all();
if (mThread.joinable()) {
mThread.join();
}
// If this pool is being destructed, send out all pending requests as timeout.
{
std::scoped_lock<std::mutex> lockGuard(mLock);
for (auto& [_, pendingRequests] : mPendingRequestsByClient) {
for (const auto& request : pendingRequests) {
(*request.callback)(request.requestIds);
}
}
mPendingRequestsByClient.clear();
}
}
Result<void> PendingRequestPool::addRequests(const void* clientId,
const std::unordered_set<int64_t>& requestIds,
std::shared_ptr<TimeoutCallbackFunc> callback) {
std::scoped_lock<std::mutex> lockGuard(mLock);
std::list<PendingRequest>* pendingRequests;
size_t pendingRequestCount = 0;
if (mPendingRequestsByClient.find(clientId) != mPendingRequestsByClient.end()) {
pendingRequests = &mPendingRequestsByClient[clientId];
for (const auto& pendingRequest : *pendingRequests) {
const auto& pendingRequestIds = pendingRequest.requestIds;
for (int64_t requestId : requestIds) {
if (pendingRequestIds.find(requestId) != pendingRequestIds.end()) {
return Error(toInt(StatusCode::INVALID_ARG))
<< "duplicate request ID: " << requestId;
}
}
pendingRequestCount += pendingRequestIds.size();
}
} else {
// Create a new empty list for this client.
pendingRequests = &mPendingRequestsByClient[clientId];
}
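// Reject the whole batch if it would push this client above MAX_PENDING_REQUEST_PER_CLIENT.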
if (requestIds.size() > MAX_PENDING_REQUEST_PER_CLIENT - pendingRequestCount) {
return Error(toInt(StatusCode::TRY_AGAIN)) << "too many pending requests";
}
int64_t currentTime = elapsedRealtimeNano();
int64_t timeoutTimestamp = currentTime + mTimeoutInNano;
pendingRequests->push_back({
.requestIds = std::unordered_set<int64_t>(requestIds.begin(), requestIds.end()),
.timeoutTimestamp = timeoutTimestamp,
.callback = callback,
});
return {};
}
bool PendingRequestPool::isRequestPending(const void* clientId, int64_t requestId) const {
std::scoped_lock<std::mutex> lockGuard(mLock);
return isRequestPendingLocked(clientId, requestId);
}
size_t PendingRequestPool::countPendingRequests(const void* clientId) const {
std::scoped_lock<std::mutex> lockGuard(mLock);
auto it = mPendingRequestsByClient.find(clientId);
if (it == mPendingRequestsByClient.end()) {
return 0;
}
size_t count = 0;
for (const auto& pendingRequest : it->second) {
count += pendingRequest.requestIds.size();
}
return count;
}
bool PendingRequestPool::isRequestPendingLocked(const void* clientId, int64_t requestId) const {
auto it = mPendingRequestsByClient.find(clientId);
if (it == mPendingRequestsByClient.end()) {
return false;
}
for (const auto& pendingRequest : it->second) {
const auto& requestIds = pendingRequest.requestIds;
if (requestIds.find(requestId) != requestIds.end()) {
return true;
}
}
return false;
}
void PendingRequestPool::checkTimeout() {
std::vector<PendingRequest> timeoutRequests;
{
std::scoped_lock<std::mutex> lockGuard(mLock);
int64_t currentTime = elapsedRealtimeNano();
std::vector<const void*> clientsWithEmptyRequests;
for (auto& [clientId, pendingRequests] : mPendingRequestsByClient) {
auto it = pendingRequests.begin();
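// Requests are appended with non-decreasing timeout timestamps, so we can stop at the
// first entry that has not timed out yet.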
while (it != pendingRequests.end()) {
if (it->timeoutTimestamp >= currentTime) {
break;
}
timeoutRequests.push_back(std::move(*it));
it = pendingRequests.erase(it);
}
if (pendingRequests.empty()) {
clientsWithEmptyRequests.push_back(clientId);
}
}
for (const void* clientId : clientsWithEmptyRequests) {
mPendingRequestsByClient.erase(clientId);
}
}
// Call the callback outside the lock.
for (const auto& request : timeoutRequests) {
(*request.callback)(request.requestIds);
}
}
std::unordered_set<int64_t> PendingRequestPool::tryFinishRequests(
const void* clientId, const std::unordered_set<int64_t>& requestIds) {
std::scoped_lock<std::mutex> lockGuard(mLock);
std::unordered_set<int64_t> foundIds;
if (mPendingRequestsByClient.find(clientId) == mPendingRequestsByClient.end()) {
return foundIds;
}
auto& pendingRequests = mPendingRequestsByClient[clientId];
auto it = pendingRequests.begin();
while (it != pendingRequests.end()) {
auto& pendingRequestIds = it->requestIds;
for (int64_t requestId : requestIds) {
auto idIt = pendingRequestIds.find(requestId);
if (idIt == pendingRequestIds.end()) {
continue;
}
pendingRequestIds.erase(idIt);
foundIds.insert(requestId);
}
if (pendingRequestIds.empty()) {
it = pendingRequests.erase(it);
continue;
}
it++;
}
return foundIds;
}
} // namespace vehicle
} // namespace automotive
} // namespace hardware
} // namespace android

@@ -0,0 +1,274 @@
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PendingRequestPool.h"
#include <VehicleHalTypes.h>
#include <VehicleUtils.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <unordered_set>
#include <vector>
namespace android {
namespace hardware {
namespace automotive {
namespace vehicle {
using ::aidl::android::hardware::automotive::vehicle::StatusCode;
using ::testing::ElementsAre;
using ::testing::UnorderedElementsAre;
using ::testing::WhenSorted;
class PendingRequestPoolTest : public ::testing::Test {
public:
void SetUp() override { mPool = std::make_unique<PendingRequestPool>(TEST_TIMEOUT); }
void TearDown() override {
if (mPool != nullptr) {
ASSERT_EQ(mPool->countPendingRequests(getTestClientId()), static_cast<size_t>(0))
<< "at least one pending request still exists in the pool when finish";
}
}
PendingRequestPool* getPool() { return mPool.get(); }
void destroyPool() { mPool.reset(); }
int64_t getTimeout() { return TEST_TIMEOUT; }
void* getTestClientId() { return reinterpret_cast<void*>(0); }
private:
// Test timeout is 0.1s.
static constexpr int64_t TEST_TIMEOUT = 100'000'000;
std::unique_ptr<PendingRequestPool> mPool;
};
TEST_F(PendingRequestPoolTest, testFinishAllRequests) {
std::mutex lock;
std::vector<int64_t> timeoutRequestIds;
std::unordered_set<int64_t> requestIds;
for (int64_t i = 0; i < 10; i++) {
requestIds.insert(i);
}
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[&lock, &timeoutRequestIds](const std::unordered_set<int64_t>& requests) {
std::scoped_lock<std::mutex> lockGuard(lock);
for (int64_t request : requests) {
timeoutRequestIds.push_back(request);
}
});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), requestIds, callback));
for (int64_t i = 0; i < 10; i++) {
ASSERT_TRUE(getPool()->isRequestPending(getTestClientId(), i));
}
for (int64_t i = 0; i < 10; i++) {
ASSERT_THAT(getPool()->tryFinishRequests(getTestClientId(), {i}), UnorderedElementsAre(i));
}
for (int64_t i = 0; i < 10; i++) {
ASSERT_FALSE(getPool()->isRequestPending(getTestClientId(), i));
}
}
TEST_F(PendingRequestPoolTest, testFinishHalfOfRequest) {
int64_t timeout = getTimeout();
std::mutex lock;
std::vector<int64_t> timeoutRequestIds;
std::unordered_set<int64_t> requestIds;
for (int64_t i = 0; i < 10; i++) {
requestIds.insert(i);
}
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[&lock, &timeoutRequestIds](const std::unordered_set<int64_t>& requests) {
std::scoped_lock<std::mutex> lockGuard(lock);
for (int64_t request : requests) {
timeoutRequestIds.push_back(request);
}
});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), requestIds, callback));
for (int64_t i = 0; i < 10; i++) {
ASSERT_TRUE(getPool()->isRequestPending(getTestClientId(), i));
}
// Finish half of the requests.
requestIds.clear();
for (int64_t i = 0; i < 5; i++) {
requestIds.insert(i);
}
ASSERT_EQ(getPool()->tryFinishRequests(getTestClientId(), requestIds), requestIds);
for (int64_t i = 0; i < 5; i++) {
ASSERT_FALSE(getPool()->isRequestPending(getTestClientId(), i));
}
for (int64_t i = 5; i < 10; i++) {
ASSERT_TRUE(getPool()->isRequestPending(getTestClientId(), i));
}
// Wait until the unfinished requests time out. The check interval equals the timeout, so in
// the worst case we wait one extra interval, i.e. 2 * timeout, before the callback is called.
std::this_thread::sleep_for(2 * std::chrono::nanoseconds(timeout));
ASSERT_THAT(timeoutRequestIds, WhenSorted(ElementsAre(5, 6, 7, 8, 9)));
}
TEST_F(PendingRequestPoolTest, testFinishRequestTwice) {
std::mutex lock;
std::vector<int64_t> timeoutRequestIds;
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[&lock, &timeoutRequestIds](const std::unordered_set<int64_t>& requests) {
std::scoped_lock<std::mutex> lockGuard(lock);
for (int64_t request : requests) {
timeoutRequestIds.push_back(request);
}
});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), {0}, callback));
ASSERT_THAT(getPool()->tryFinishRequests(getTestClientId(), {0}), UnorderedElementsAre(0))
<< "failed to finish an added request";
ASSERT_TRUE(getPool()->tryFinishRequests(getTestClientId(), {0}).empty())
<< "finish a request second time must return empty result";
}
TEST_F(PendingRequestPoolTest, testFinishRequestNonExistingId) {
std::mutex lock;
std::vector<int64_t> timeoutRequestIds;
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[&lock, &timeoutRequestIds](const std::unordered_set<int64_t>& requests) {
std::scoped_lock<std::mutex> lockGuard(lock);
for (int64_t request : requests) {
timeoutRequestIds.push_back(request);
}
});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), {0, 1, 2}, callback));
ASSERT_THAT(getPool()->tryFinishRequests(getTestClientId(), {0, 1, 2, 3}),
UnorderedElementsAre(0, 1, 2))
<< "finished request IDs must not contain non-existing request ID";
// Even though one of the requests to finish does not exist, the rest of the requests should
// be finished.
ASSERT_EQ(getPool()->countPendingRequests(getTestClientId()), static_cast<size_t>(0))
<< "requests not being finished correctly";
}
TEST_F(PendingRequestPoolTest, testFinishAfterTimeout) {
std::mutex lock;
std::vector<int64_t> timeoutRequestIds;
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[&lock, &timeoutRequestIds](const std::unordered_set<int64_t>& requests) {
std::scoped_lock<std::mutex> lockGuard(lock);
for (int64_t request : requests) {
timeoutRequestIds.push_back(request);
}
});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), {0}, callback));
std::this_thread::sleep_for(2 * std::chrono::nanoseconds(getTimeout()));
ASSERT_TRUE(getPool()->tryFinishRequests(getTestClientId(), {0}).empty())
<< "finish a request after timeout must do nothing";
}
TEST_F(PendingRequestPoolTest, testDestroyWithPendingRequests) {
std::mutex lock;
std::vector<int64_t> timeoutRequestIds;
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[&lock, &timeoutRequestIds](const std::unordered_set<int64_t>& requests) {
std::scoped_lock<std::mutex> lockGuard(lock);
for (int64_t request : requests) {
timeoutRequestIds.push_back(request);
}
});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), {0}, callback));
destroyPool();
// When the pool is destroyed, any still-pending requests should be notified as timed out.
ASSERT_THAT(timeoutRequestIds, UnorderedElementsAre(0))
<< "timeout not triggered when the pool is destroyed";
}
TEST_F(PendingRequestPoolTest, testDuplicateRequestId) {
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[](std::unordered_set<int64_t>) {});
ASSERT_RESULT_OK(getPool()->addRequests(getTestClientId(), {0}, callback));
ASSERT_FALSE(getPool()->addRequests(getTestClientId(), {1, 2, 0}, callback).ok())
<< "adding duplicate request IDs must fail";
ASSERT_THAT(getPool()->tryFinishRequests(getTestClientId(), {0}), UnorderedElementsAre(0));
}
TEST_F(PendingRequestPoolTest, testSameRequestIdForDifferentClient) {
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[](std::unordered_set<int64_t>) {});
ASSERT_RESULT_OK(getPool()->addRequests(reinterpret_cast<void*>(0), {0}, callback));
ASSERT_RESULT_OK(getPool()->addRequests(reinterpret_cast<void*>(1), {1, 2, 0}, callback));
ASSERT_THAT(getPool()->tryFinishRequests(reinterpret_cast<void*>(0), {0}),
UnorderedElementsAre(0));
ASSERT_THAT(getPool()->tryFinishRequests(reinterpret_cast<void*>(1), {1, 2, 0}),
UnorderedElementsAre(0, 1, 2));
}
TEST_F(PendingRequestPoolTest, testPendingRequestCountLimit) {
auto callback = std::make_shared<PendingRequestPool::TimeoutCallbackFunc>(
[](std::unordered_set<int64_t>) {});
std::unordered_set<int64_t> requests;
// MAX_PENDING_REQUEST_PER_CLIENT = 10000
for (size_t i = 0; i < 10000; i++) {
requests.insert(static_cast<int64_t>(i));
}
ASSERT_RESULT_OK(getPool()->addRequests(reinterpret_cast<void*>(0), requests, callback));
auto result = getPool()->addRequests(reinterpret_cast<void*>(0), {static_cast<int64_t>(10000)},
callback);
ASSERT_FALSE(result.ok()) << "adding more pending requests than limit must fail";
ASSERT_EQ(result.error().code(), toInt(StatusCode::TRY_AGAIN));
getPool()->tryFinishRequests(reinterpret_cast<void*>(0), requests);
}
} // namespace vehicle
} // namespace automotive
} // namespace hardware
} // namespace android