Wait for all results to come before return.

Wait for all results from hardware to return before returning
the results to the client. It is not guaranteed that one
callback contains all the results.

Flag: EXEMPT host side component
Test: atest CtsCarTestCases
Bug: 352182279
Change-Id: I01f65a4da391727dc94aff36b52c14f7459b8221
This commit is contained in:
Yu Shan
2024-07-11 15:11:29 -07:00
parent 249a9d065c
commit b561e444db

View File

@@ -69,10 +69,13 @@ GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAd
const proto::VehiclePropValueRequests* requests,
proto::SetValueResults* results) {
std::vector<aidlvhal::SetValueRequest> aidlRequests;
std::unordered_set<int64_t> requestIds;
for (const auto& protoRequest : requests->requests()) {
auto& aidlRequest = aidlRequests.emplace_back();
aidlRequest.requestId = protoRequest.request_id();
int64_t requestId = protoRequest.request_id();
aidlRequest.requestId = requestId;
proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.value);
requestIds.insert(requestId);
}
auto waitMtx = std::make_shared<std::mutex>();
auto waitCV = std::make_shared<std::condition_variable>();
@@ -80,19 +83,27 @@ GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAd
auto tmpResults = std::make_shared<proto::SetValueResults>();
auto aidlStatus = mHardware->setValues(
std::make_shared<const IVehicleHardware::SetValuesCallback>(
[waitMtx, waitCV, complete,
tmpResults](std::vector<aidlvhal::SetValueResult> setValueResults) {
for (const auto& aidlResult : setValueResults) {
auto& protoResult = *tmpResults->add_results();
protoResult.set_request_id(aidlResult.requestId);
protoResult.set_status(
static_cast<proto::StatusCode>(aidlResult.status));
}
[waitMtx, waitCV, complete, tmpResults,
&requestIds](std::vector<aidlvhal::SetValueResult> setValueResults) {
bool receivedAllResults = false;
{
std::lock_guard lck(*waitMtx);
*complete = true;
for (const auto& aidlResult : setValueResults) {
auto& protoResult = *tmpResults->add_results();
int64_t requestIdForResult = aidlResult.requestId;
protoResult.set_request_id(requestIdForResult);
protoResult.set_status(
static_cast<proto::StatusCode>(aidlResult.status));
requestIds.erase(requestIdForResult);
}
if (requestIds.empty()) {
receivedAllResults = true;
*complete = true;
}
}
if (receivedAllResults) {
waitCV->notify_all();
}
waitCV->notify_all();
}),
aidlRequests);
if (aidlStatus != aidlvhal::StatusCode::OK) {
@@ -114,10 +125,13 @@ GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAd
const proto::VehiclePropValueRequests* requests,
proto::GetValueResults* results) {
std::vector<aidlvhal::GetValueRequest> aidlRequests;
std::unordered_set<int64_t> requestIds;
for (const auto& protoRequest : requests->requests()) {
auto& aidlRequest = aidlRequests.emplace_back();
aidlRequest.requestId = protoRequest.request_id();
int64_t requestId = protoRequest.request_id();
aidlRequest.requestId = requestId;
proto_msg_converter::protoToAidl(protoRequest.value(), &aidlRequest.prop);
requestIds.insert(requestId);
}
auto waitMtx = std::make_shared<std::mutex>();
auto waitCV = std::make_shared<std::condition_variable>();
@@ -125,23 +139,31 @@ GrpcVehicleProxyServer::GrpcVehicleProxyServer(std::vector<std::string> serverAd
auto tmpResults = std::make_shared<proto::GetValueResults>();
auto aidlStatus = mHardware->getValues(
std::make_shared<const IVehicleHardware::GetValuesCallback>(
[waitMtx, waitCV, complete,
tmpResults](std::vector<aidlvhal::GetValueResult> getValueResults) {
for (const auto& aidlResult : getValueResults) {
auto& protoResult = *tmpResults->add_results();
protoResult.set_request_id(aidlResult.requestId);
protoResult.set_status(
static_cast<proto::StatusCode>(aidlResult.status));
if (aidlResult.prop) {
auto* valuePtr = protoResult.mutable_value();
proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
}
}
[waitMtx, waitCV, complete, tmpResults,
&requestIds](std::vector<aidlvhal::GetValueResult> getValueResults) {
bool receivedAllResults = false;
{
std::lock_guard lck(*waitMtx);
*complete = true;
for (const auto& aidlResult : getValueResults) {
auto& protoResult = *tmpResults->add_results();
int64_t requestIdForResult = aidlResult.requestId;
protoResult.set_request_id(requestIdForResult);
protoResult.set_status(
static_cast<proto::StatusCode>(aidlResult.status));
if (aidlResult.prop) {
auto* valuePtr = protoResult.mutable_value();
proto_msg_converter::aidlToProto(*aidlResult.prop, valuePtr);
}
requestIds.erase(requestIdForResult);
}
if (requestIds.empty()) {
receivedAllResults = true;
*complete = true;
}
}
if (receivedAllResults) {
waitCV->notify_all();
}
waitCV->notify_all();
}),
aidlRequests);
if (aidlStatus != aidlvhal::StatusCode::OK) {