From abd92c1fcf0ff9712145fcd1ef988754be1c152e Mon Sep 17 00:00:00 2001 From: Yu Shan Date: Wed, 3 Jul 2024 16:13:18 -0700 Subject: [PATCH] Use AAOS side timestamp for VehiclePropValue. Update the timestamp set by the host-side VHAL proxy server with the Android-side timestamp at AAOS VHAL side. This makes sure that we always expose VehiclePropValue that is perfectly synced with Android elapsedRealtimeNano. This CL also adds logic to deal with cases when a property update event or a get value result is outdated. This CL updates the unit test to cover more cases and remove the flaky test case that requires starting a local GRPC server. Flag: EXEMPT HAL change Test: atest GRPCVehicleHardwareUnitTest Bug: 349678711 Change-Id: I5e2c07e77869f7286a438cb2a04d1b6c130c3c36 --- automotive/vehicle/TEST_MAPPING | 3 + .../aidl/impl/grpc/GRPCVehicleHardware.cpp | 220 ++++++++--- .../aidl/impl/grpc/GRPCVehicleHardware.h | 32 +- .../grpc/test/GRPCVehicleHardwareUnitTest.cpp | 364 ++++++++++++++---- 4 files changed, 482 insertions(+), 137 deletions(-) diff --git a/automotive/vehicle/TEST_MAPPING b/automotive/vehicle/TEST_MAPPING index 77629a9d07..d848774a9a 100644 --- a/automotive/vehicle/TEST_MAPPING +++ b/automotive/vehicle/TEST_MAPPING @@ -44,6 +44,9 @@ { "name": "FakeVehicleHardwareTest" }, + { + "name": "GRPCVehicleHardwareUnitTest" + }, { "name": "CarServiceUnitTest", "options" : [ diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp index f44573ac8e..73bb521ddb 100644 --- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp +++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -28,11 +29,16 @@ namespace android::hardware::automotive::vehicle::virtualization { -static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { - // TODO(chenhaosjtuacm): get secured credentials here +namespace { + +constexpr size_t MAX_RETRY_COUNT = 5; + +std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { return ::grpc::InsecureChannelCredentials(); } +} // namespace + GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr) : mServiceAddr(std::move(service_addr)), mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), @@ -40,11 +46,13 @@ GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr) mValuePollingThread([this] { ValuePollingLoop(); }) {} // Only used for unit testing. -GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr stub) - : mServiceAddr(""), - mGrpcChannel(nullptr), - mGrpcStub(std::move(stub)), - mValuePollingThread([] {}) {} +GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr stub, + bool startValuePollingLoop) + : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) { + if (startValuePollingLoop) { + mValuePollingThread = std::thread([this] { ValuePollingLoop(); }); + } +} GRPCVehicleHardware::~GRPCVehicleHardware() { { @@ -52,7 +60,9 @@ GRPCVehicleHardware::~GRPCVehicleHardware() { mShuttingDownFlag.store(true); } mShutdownCV.notify_all(); - mValuePollingThread.join(); + if (mValuePollingThread.joinable()) { + mValuePollingThread.join(); + } } std::vector GRPCVehicleHardware::getAllPropertyConfigs() const { @@ -109,36 +119,117 @@ aidlvhal::StatusCode GRPCVehicleHardware::setValues( aidlvhal::StatusCode GRPCVehicleHardware::getValues( std::shared_ptr callback, const std::vector& requests) const { - ::grpc::ClientContext context; + std::vector results; + auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0); + if (status != aidlvhal::StatusCode::OK) { + return status; + } + if (!results.empty()) { + (*callback)(std::move(results)); + } + return status; +} + +aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry( + const std::vector& requests, + std::vector* results, size_t retryCount) const { + if (retryCount == MAX_RETRY_COUNT) { + LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after " + << retryCount << " retries"; + return aidlvhal::StatusCode::TRY_AGAIN; + } + proto::VehiclePropValueRequests protoRequests; - proto::GetValueResults protoResults; + std::unordered_map requestById; for (const auto& request : requests) { auto& protoRequest = *protoRequests.add_requests(); protoRequest.set_request_id(request.requestId); proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value()); + requestById[request.requestId] = &request; } + // TODO(chenhaosjtuacm): Make it Async. + ::grpc::ClientContext context; + proto::GetValueResults protoResults; auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } - std::vector results; + + std::vector retryRequests; for (const auto& protoResult : protoResults.results()) { - auto& result = results.emplace_back(); - result.requestId = protoResult.request_id(); - result.status = static_cast(protoResult.status()); - if (protoResult.has_value()) { - aidlvhal::VehiclePropValue value; - proto_msg_converter::protoToAidl(protoResult.value(), &value); - result.prop = std::move(value); + int64_t requestId = protoResult.request_id(); + auto it = requestById.find(requestId); + if (it == requestById.end()) { + LOG(ERROR) << __func__ + << "Invalid getValue request with unknown request ID: " << requestId + << ", ignore"; + continue; } + + if (!protoResult.has_value()) { + auto& result = results->emplace_back(); + result.requestId = requestId; + result.status = static_cast(protoResult.status()); + continue; + } + + aidlvhal::VehiclePropValue value; + proto_msg_converter::protoToAidl(protoResult.value(), &value); + + // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset + // the timestamp. + // TODO(b/350822044): Remove this once we use timestamp from proxy server. + if (!setAndroidTimestamp(&value)) { + // This is a rare case when we receive a property update event reflecting a new value + // for the property before we receive the get value result. This means that the result + // is already outdated, hence we should retry getting the latest value again. + LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop + << " areaId: " << value.areaId << " is oudated, retry"; + retryRequests.push_back(*(it->second)); + continue; + } + + auto& result = results->emplace_back(); + result.requestId = requestId; + result.status = static_cast(protoResult.status()); + result.prop = std::move(value); + } + + if (retryRequests.size() != 0) { + return getValuesWithRetry(retryRequests, results, retryCount++); } - (*callback)(std::move(results)); return aidlvhal::StatusCode::OK; } +bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const { + PropIdAreaId propIdAreaId = { + .propId = propValue->prop, + .areaId = propValue->areaId, + }; + int64_t now = elapsedRealtimeNano(); + int64_t externalTimestamp = propValue->timestamp; + + { + std::lock_guard lck(mLatestUpdateTimestampsMutex); + auto it = mLatestUpdateTimestamps.find(propIdAreaId); + if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) { + mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp; + mLatestUpdateTimestamps[propIdAreaId].second = now; + propValue->timestamp = now; + return true; + } + if (externalTimestamp == (it->second).first) { + propValue->timestamp = (it->second).second; + return true; + } + } + // externalTimestamp < (it->second).first, the value is outdated. + return false; +} + void GRPCVehicleHardware::registerOnPropertyChangeEvent( std::unique_ptr callback) { std::lock_guard lck(mCallbackMutex); @@ -248,46 +339,61 @@ bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) { void GRPCVehicleHardware::ValuePollingLoop() { while (!mShuttingDownFlag.load()) { - ::grpc::ClientContext context; - - bool rpc_stopped{false}; - std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() { - std::unique_lock lck(mShutdownMutex); - mShutdownCV.wait(lck, [this, &rpc_stopped]() { - return rpc_stopped || mShuttingDownFlag.load(); - }); - context.TryCancel(); - }); - - auto value_stream = - mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); - LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; - proto::VehiclePropValues protoValues; - while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) { - std::vector values; - for (const auto protoValue : protoValues.values()) { - values.push_back(aidlvhal::VehiclePropValue()); - proto_msg_converter::protoToAidl(protoValue, &values.back()); - } - std::shared_lock lck(mCallbackMutex); - if (mOnPropChange) { - (*mOnPropChange)(values); - } - } - - { - std::lock_guard lck(mShutdownMutex); - rpc_stopped = true; - } - mShutdownCV.notify_all(); - shuttingdown_watcher.join(); - - auto grpc_status = value_stream->Finish(); - // never reach here until connection lost - LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); - + pollValue(); // try to reconnect } } +void GRPCVehicleHardware::pollValue() { + ::grpc::ClientContext context; + + bool rpc_stopped{false}; + std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() { + std::unique_lock lck(mShutdownMutex); + mShutdownCV.wait( + lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); }); + context.TryCancel(); + }); + + auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); + LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; + proto::VehiclePropValues protoValues; + while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) { + std::vector values; + for (const auto protoValue : protoValues.values()) { + aidlvhal::VehiclePropValue aidlValue = {}; + proto_msg_converter::protoToAidl(protoValue, &aidlValue); + + // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to + // reset the timestamp. + // TODO(b/350822044): Remove this once we use timestamp from proxy server. + if (!setAndroidTimestamp(&aidlValue)) { + LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop + << " areaId: " << aidlValue.areaId << " is outdated, ignore"; + continue; + } + + values.push_back(std::move(aidlValue)); + } + if (values.empty()) { + continue; + } + std::shared_lock lck(mCallbackMutex); + if (mOnPropChange) { + (*mOnPropChange)(values); + } + } + + { + std::lock_guard lck(mShutdownMutex); + rpc_stopped = true; + } + mShutdownCV.notify_all(); + shuttingdown_watcher.join(); + + auto grpc_status = value_stream->Finish(); + // never reach here until connection lost + LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); +} + } // namespace android::hardware::automotive::vehicle::virtualization diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h index 9750f621e9..1edf6580aa 100644 --- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h +++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" @@ -33,6 +34,7 @@ #include #include #include +#include #include namespace android::hardware::automotive::vehicle::virtualization { @@ -43,9 +45,6 @@ class GRPCVehicleHardware : public IVehicleHardware { public: explicit GRPCVehicleHardware(std::string service_addr); - // Only used for unit testing. - explicit GRPCVehicleHardware(std::unique_ptr stub); - ~GRPCVehicleHardware(); // Get all the property configs. @@ -94,7 +93,7 @@ class GRPCVehicleHardware : public IVehicleHardware { std::unique_ptr mOnPropChange; private: - void ValuePollingLoop(); + friend class GRPCVehicleHardwareUnitTest; std::string mServiceAddr; std::shared_ptr<::grpc::Channel> mGrpcChannel; @@ -106,6 +105,31 @@ class GRPCVehicleHardware : public IVehicleHardware { std::mutex mShutdownMutex; std::condition_variable mShutdownCV; std::atomic mShuttingDownFlag{false}; + + mutable std::mutex mLatestUpdateTimestampsMutex; + + // A map from [propId, areaId] to the latest timestamp this property is updated. + // The key is a tuple, the first element is the external timestamp (timestamp set by VHAL + // server), the second element is the Android timestamp (elapsedRealtimeNano). + mutable std::unordered_map, + PropIdAreaIdHash> mLatestUpdateTimestamps + GUARDED_BY(mLatestUpdateTimestampsMutex); + + // Only used for unit testing. + GRPCVehicleHardware(std::unique_ptr stub, + bool startValuePollingLoop); + + void ValuePollingLoop(); + void pollValue(); + + aidlvhal::StatusCode getValuesWithRetry(const std::vector& requests, + std::vector* results, + size_t retryCount) const; + + // Check the external timestamp of propValue against the latest updated external timestamp, if + // this is an outdated value, return false. Otherwise, update the external timestamp to the + // Android timestamp and return true. + bool setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const; }; } // namespace android::hardware::automotive::vehicle::virtualization diff --git a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp index 3bd7e0e56a..20af2311bc 100644 --- a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp +++ b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp @@ -19,8 +19,10 @@ #include #include +#include #include +#include #include #include #include @@ -31,98 +33,48 @@ namespace aidlvhal = ::aidl::android::hardware::automotive::vehicle; using ::testing::_; using ::testing::DoAll; +using ::testing::ElementsAre; using ::testing::NiceMock; using ::testing::Return; using ::testing::SaveArg; using ::testing::SetArgPointee; +using ::testing::SizeIs; + +using ::grpc::testing::MockClientReader; using proto::MockVehicleServerStub; -const std::string kFakeServerAddr = "0.0.0.0:54321"; - -class FakeVehicleServer : public proto::VehicleServer::Service { - public: - ::grpc::Status StartPropertyValuesStream( - ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, - ::grpc::ServerWriter* stream) override { - stream->Write(proto::VehiclePropValues()); - // A fake disconnection. - return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); - } - - // Functions that we do not care. - ::grpc::Status GetAllPropertyConfig( - ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, - ::grpc::ServerWriter* stream) override { - return ::grpc::Status::OK; - } - - ::grpc::Status SetValues(::grpc::ServerContext* context, - const proto::VehiclePropValueRequests* requests, - proto::SetValueResults* results) override { - return ::grpc::Status::OK; - } - - ::grpc::Status GetValues(::grpc::ServerContext* context, - const proto::VehiclePropValueRequests* requests, - proto::GetValueResults* results) override { - return ::grpc::Status::OK; - } -}; - -TEST(GRPCVehicleHardwareUnitTest, Reconnect) { - auto receivedUpdate = std::make_shared>(0); - auto vehicleHardware = std::make_unique(kFakeServerAddr); - vehicleHardware->registerOnPropertyChangeEvent( - std::make_unique( - [receivedUpdate](const auto&) { receivedUpdate->fetch_add(1); })); - - constexpr size_t kServerRestartTimes = 5; - for (size_t serverStart = 0; serverStart < kServerRestartTimes; ++serverStart) { - EXPECT_EQ(receivedUpdate->load(), 0); - auto fakeServer = std::make_unique(); - ::grpc::ServerBuilder builder; - builder.RegisterService(fakeServer.get()); - builder.AddListeningPort(kFakeServerAddr, ::grpc::InsecureServerCredentials()); - auto grpcServer = builder.BuildAndStart(); - - // Wait until the vehicle hardware received the second update (after one fake - // disconnection). - constexpr auto kMaxWaitTime = std::chrono::seconds(5); - auto startTime = std::chrono::steady_clock::now(); - while (receivedUpdate->load() <= 1 && - std::chrono::steady_clock::now() - startTime < kMaxWaitTime) - ; - - grpcServer->Shutdown(); - grpcServer->Wait(); - EXPECT_GT(receivedUpdate->load(), 1); - - // Reset for the next round. - receivedUpdate->store(0); - } -} - -class GRPCVehicleHardwareMockServerUnitTest : public ::testing::Test { +class GRPCVehicleHardwareUnitTest : public ::testing::Test { protected: NiceMock* mGrpcStub; std::unique_ptr mHardware; void SetUp() override { auto stub = std::make_unique>(); - ; mGrpcStub = stub.get(); - mHardware = std::make_unique(std::move(stub)); + // Cannot use make_unique here since the constructor is a private method. + mHardware = std::unique_ptr( + new GRPCVehicleHardware(std::move(stub), /*startValuePollingLoop=*/false)); } void TearDown() override { mHardware.reset(); } + + // Access GRPCVehicleHardware private method. + void pollValue() { mHardware->pollValue(); } + + void startValuePollingLoop(std::unique_ptr stub) { + mHardware = std::unique_ptr( + new GRPCVehicleHardware(std::move(stub), /*startValuePollingLoop=*/true)); + } + + void generatePropertyUpdateEvent(int32_t propId, int64_t timestamp); }; MATCHER_P(RepeatedInt32Eq, expected_values, "") { return std::vector(arg.begin(), arg.end()) == expected_values; } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, Subscribe) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribe) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::OK); proto::SubscribeRequest actualRequest; @@ -147,7 +99,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, Subscribe) { EXPECT_EQ(protoOptions.enable_variable_update_rate(), true); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeLegacyServer) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeLegacyServer) { EXPECT_CALL(*mGrpcStub, Subscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""))); @@ -157,7 +109,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeLegacyServer) { EXPECT_EQ(status, aidlvhal::StatusCode::OK); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeGrpcFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeGrpcFailure) { EXPECT_CALL(*mGrpcStub, Subscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::INTERNAL, "GRPC Error"))); @@ -167,7 +119,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeGrpcFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::INTERNAL_ERROR); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeProtoFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeProtoFailure) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::NOT_AVAILABLE_SPEED_LOW); @@ -181,7 +133,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeProtoFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::NOT_AVAILABLE_SPEED_LOW); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, Unsubscribe) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribe) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::OK); proto::UnsubscribeRequest actualRequest; @@ -199,7 +151,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, Unsubscribe) { EXPECT_EQ(actualRequest.area_id(), areaId); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeLegacyServer) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeLegacyServer) { EXPECT_CALL(*mGrpcStub, Unsubscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""))); @@ -208,7 +160,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeLegacyServer) { EXPECT_EQ(status, aidlvhal::StatusCode::OK); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeGrpcFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeGrpcFailure) { EXPECT_CALL(*mGrpcStub, Unsubscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::INTERNAL, "GRPC Error"))); @@ -217,7 +169,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeGrpcFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::INTERNAL_ERROR); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeProtoFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeProtoFailure) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::NOT_AVAILABLE_SPEED_LOW); @@ -230,4 +182,264 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeProtoFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::NOT_AVAILABLE_SPEED_LOW); } +TEST_F(GRPCVehicleHardwareUnitTest, TestPollValue) { + int64_t testTimestamp = 12345; + int32_t testPropId = 54321; + int64_t startTimestamp = elapsedRealtimeNano(); + + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillOnce([testTimestamp, testPropId](proto::VehiclePropValues* values) { + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(testTimestamp); + value->set_prop(testPropId); + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + std::vector propertyEvents; + + mHardware->registerOnPropertyChangeEvent( + std::make_unique( + [&propertyEvents](const std::vector& events) { + for (const auto& event : events) { + propertyEvents.push_back(event); + } + })); + + pollValue(); + + ASSERT_THAT(propertyEvents, SizeIs(1)); + EXPECT_EQ(propertyEvents[0].prop, testPropId); + EXPECT_GT(propertyEvents[0].timestamp, startTimestamp) + << "Timestamp must be updated to Android timestamp"; + EXPECT_LT(propertyEvents[0].timestamp, elapsedRealtimeNano()) + << "Timestamp must be updated to Android timestamp"; +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestPollValueIgnoreOutdatedValue) { + int64_t testTimestamp1 = 12345; + int32_t value1 = 1324; + int64_t testTimestamp2 = 12340; + int32_t value2 = 1423; + int32_t testPropId = 54321; + int64_t startTimestamp = elapsedRealtimeNano(); + + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillOnce([testTimestamp1, value1, testPropId](proto::VehiclePropValues* values) { + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(testTimestamp1); + value->set_prop(testPropId); + value->add_int32_values(value1); + return true; + }) + .WillOnce([testTimestamp2, value2, testPropId](proto::VehiclePropValues* values) { + values->Clear(); + // This event is outdated, must be ignored. + auto value = values->add_values(); + value->set_timestamp(testTimestamp2); + value->set_prop(testPropId); + value->add_int32_values(value2); + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + std::vector propertyEvents; + + mHardware->registerOnPropertyChangeEvent( + std::make_unique( + [&propertyEvents](const std::vector& events) { + for (const auto& event : events) { + propertyEvents.push_back(event); + } + })); + + pollValue(); + + ASSERT_THAT(propertyEvents, SizeIs(1)) << "Outdated event must be ignored"; + EXPECT_EQ(propertyEvents[0].prop, testPropId); + EXPECT_GT(propertyEvents[0].timestamp, startTimestamp); + EXPECT_LT(propertyEvents[0].timestamp, elapsedRealtimeNano()); + EXPECT_THAT(propertyEvents[0].value.int32Values, ElementsAre(value1)); +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestValuePollingLoop) { + int64_t testTimestamp = 12345; + int32_t testPropId = 54321; + auto stub = std::make_unique>(); + + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*stub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillRepeatedly([testTimestamp, testPropId](proto::VehiclePropValues* values) { + // Sleep for 10ms and always return the same property event. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(testTimestamp); + value->set_prop(testPropId); + return true; + }); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + startValuePollingLoop(std::move(stub)); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // This must stop the loop and wait for the thread to finish. + mHardware.reset(); +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestGetValues) { + int64_t testRequestId = 1234; + int32_t testPropId = 4321; + int32_t testValue = 123456; + proto::VehiclePropValueRequests gotRequests; + EXPECT_CALL(*mGrpcStub, GetValues(_, _, _)) + .WillOnce([&gotRequests, testRequestId, testPropId, testValue]( + ::grpc::ClientContext* context, + const proto::VehiclePropValueRequests& request, + proto::GetValueResults* response) { + gotRequests = request; + response->Clear(); + auto* resultPtr = response->add_results(); + resultPtr->set_request_id(testRequestId); + resultPtr->set_status(proto::StatusCode::OK); + auto* valuePtr = resultPtr->mutable_value(); + valuePtr->set_prop(testPropId); + valuePtr->add_int32_values(testValue); + return ::grpc::Status::OK; + }); + + std::vector requests; + requests.push_back(aidlvhal::GetValueRequest{.requestId = testRequestId, + .prop = { + .prop = testPropId, + }}); + + std::vector gotResults; + + auto status = mHardware->getValues( + std::make_shared( + [&gotResults](std::vector results) { + for (const auto& result : results) { + gotResults.push_back(result); + } + }), + requests); + + ASSERT_EQ(status, aidlvhal::StatusCode::OK); + ASSERT_THAT(gotRequests.requests(), SizeIs(1)); + EXPECT_THAT(gotRequests.requests(0).request_id(), testRequestId); + EXPECT_THAT(gotRequests.requests(0).value().prop(), testPropId); + + ASSERT_THAT(gotResults, SizeIs(1)); + EXPECT_EQ(gotResults[0].requestId, testRequestId); + EXPECT_EQ(gotResults[0].status, aidlvhal::StatusCode::OK); + EXPECT_EQ(gotResults[0].prop->prop, testPropId); + EXPECT_THAT(gotResults[0].prop->value.int32Values, ElementsAre(testValue)); +} + +void GRPCVehicleHardwareUnitTest::generatePropertyUpdateEvent(int32_t propId, int64_t timestamp) { + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillOnce([timestamp, propId](proto::VehiclePropValues* values) { + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(timestamp); + value->set_prop(propId); + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + pollValue(); +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestGetValuesOutdatedRetry) { + int64_t startTimestamp = elapsedRealtimeNano(); + int64_t testRequestId = 1234; + int32_t testPropId = 4321; + int32_t testValue1 = 123456; + int32_t testValue2 = 654321; + int32_t testTimestamp1 = 1000; + int32_t testTimestamp2 = 2000; + + // A property update event for testTimestamp2 happens before getValues returns. + generatePropertyUpdateEvent(testPropId, testTimestamp2); + + // GetValues first returns an outdated result, then an up-to-date result. + EXPECT_CALL(*mGrpcStub, GetValues(_, _, _)) + .WillOnce([testRequestId, testPropId, testValue1, testTimestamp1]( + ::grpc::ClientContext* context, + const proto::VehiclePropValueRequests& request, + proto::GetValueResults* response) { + response->Clear(); + auto* resultPtr = response->add_results(); + resultPtr->set_request_id(testRequestId); + resultPtr->set_status(proto::StatusCode::OK); + auto* valuePtr = resultPtr->mutable_value(); + valuePtr->set_prop(testPropId); + valuePtr->set_timestamp(testTimestamp1); + valuePtr->add_int32_values(testValue1); + return ::grpc::Status::OK; + }) + .WillOnce([testRequestId, testPropId, testValue2, testTimestamp2]( + ::grpc::ClientContext* context, + const proto::VehiclePropValueRequests& request, + proto::GetValueResults* response) { + response->Clear(); + auto* resultPtr = response->add_results(); + resultPtr->set_request_id(testRequestId); + resultPtr->set_status(proto::StatusCode::OK); + auto* valuePtr = resultPtr->mutable_value(); + valuePtr->set_prop(testPropId); + valuePtr->set_timestamp(testTimestamp2); + valuePtr->add_int32_values(testValue2); + return ::grpc::Status::OK; + }); + + std::vector requests; + requests.push_back(aidlvhal::GetValueRequest{.requestId = testRequestId, + .prop = { + .prop = testPropId, + }}); + + std::vector gotResults; + + auto status = mHardware->getValues( + std::make_shared( + [&gotResults](std::vector results) { + for (const auto& result : results) { + gotResults.push_back(result); + } + }), + requests); + + ASSERT_EQ(status, aidlvhal::StatusCode::OK); + ASSERT_THAT(gotResults, SizeIs(1)); + EXPECT_EQ(gotResults[0].requestId, testRequestId); + EXPECT_EQ(gotResults[0].status, aidlvhal::StatusCode::OK); + EXPECT_EQ(gotResults[0].prop->prop, testPropId); + EXPECT_THAT(gotResults[0].prop->value.int32Values, ElementsAre(testValue2)); + EXPECT_GT(gotResults[0].prop->timestamp, startTimestamp); + EXPECT_LT(gotResults[0].prop->timestamp, elapsedRealtimeNano()); +} + } // namespace android::hardware::automotive::vehicle::virtualization