diff --git a/automotive/vehicle/TEST_MAPPING b/automotive/vehicle/TEST_MAPPING index 77629a9d07..d848774a9a 100644 --- a/automotive/vehicle/TEST_MAPPING +++ b/automotive/vehicle/TEST_MAPPING @@ -44,6 +44,9 @@ { "name": "FakeVehicleHardwareTest" }, + { + "name": "GRPCVehicleHardwareUnitTest" + }, { "name": "CarServiceUnitTest", "options" : [ diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp index f44573ac8e..73bb521ddb 100644 --- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp +++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -28,11 +29,16 @@ namespace android::hardware::automotive::vehicle::virtualization { -static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { - // TODO(chenhaosjtuacm): get secured credentials here +namespace { + +constexpr size_t MAX_RETRY_COUNT = 5; + +std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { return ::grpc::InsecureChannelCredentials(); } +} // namespace + GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr) : mServiceAddr(std::move(service_addr)), mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), @@ -40,11 +46,13 @@ GRPCVehicleHardware::GRPCVehicleHardware(std::string service_addr) mValuePollingThread([this] { ValuePollingLoop(); }) {} // Only used for unit testing. -GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr stub) - : mServiceAddr(""), - mGrpcChannel(nullptr), - mGrpcStub(std::move(stub)), - mValuePollingThread([] {}) {} +GRPCVehicleHardware::GRPCVehicleHardware(std::unique_ptr stub, + bool startValuePollingLoop) + : mServiceAddr(""), mGrpcChannel(nullptr), mGrpcStub(std::move(stub)) { + if (startValuePollingLoop) { + mValuePollingThread = std::thread([this] { ValuePollingLoop(); }); + } +} GRPCVehicleHardware::~GRPCVehicleHardware() { { @@ -52,7 +60,9 @@ GRPCVehicleHardware::~GRPCVehicleHardware() { mShuttingDownFlag.store(true); } mShutdownCV.notify_all(); - mValuePollingThread.join(); + if (mValuePollingThread.joinable()) { + mValuePollingThread.join(); + } } std::vector GRPCVehicleHardware::getAllPropertyConfigs() const { @@ -109,36 +119,117 @@ aidlvhal::StatusCode GRPCVehicleHardware::setValues( aidlvhal::StatusCode GRPCVehicleHardware::getValues( std::shared_ptr callback, const std::vector& requests) const { - ::grpc::ClientContext context; + std::vector results; + auto status = getValuesWithRetry(requests, &results, /*retryCount=*/0); + if (status != aidlvhal::StatusCode::OK) { + return status; + } + if (!results.empty()) { + (*callback)(std::move(results)); + } + return status; +} + +aidlvhal::StatusCode GRPCVehicleHardware::getValuesWithRetry( + const std::vector& requests, + std::vector* results, size_t retryCount) const { + if (retryCount == MAX_RETRY_COUNT) { + LOG(ERROR) << __func__ << ": GRPC GetValues Failed, failed to get the latest value after " + << retryCount << " retries"; + return aidlvhal::StatusCode::TRY_AGAIN; + } + proto::VehiclePropValueRequests protoRequests; - proto::GetValueResults protoResults; + std::unordered_map requestById; for (const auto& request : requests) { auto& protoRequest = *protoRequests.add_requests(); protoRequest.set_request_id(request.requestId); proto_msg_converter::aidlToProto(request.prop, protoRequest.mutable_value()); + requestById[request.requestId] = &request; } + // TODO(chenhaosjtuacm): Make it Async. + ::grpc::ClientContext context; + proto::GetValueResults protoResults; auto grpc_status = mGrpcStub->GetValues(&context, protoRequests, &protoResults); if (!grpc_status.ok()) { LOG(ERROR) << __func__ << ": GRPC GetValues Failed: " << grpc_status.error_message(); return aidlvhal::StatusCode::INTERNAL_ERROR; } - std::vector results; + + std::vector retryRequests; for (const auto& protoResult : protoResults.results()) { - auto& result = results.emplace_back(); - result.requestId = protoResult.request_id(); - result.status = static_cast(protoResult.status()); - if (protoResult.has_value()) { - aidlvhal::VehiclePropValue value; - proto_msg_converter::protoToAidl(protoResult.value(), &value); - result.prop = std::move(value); + int64_t requestId = protoResult.request_id(); + auto it = requestById.find(requestId); + if (it == requestById.end()) { + LOG(ERROR) << __func__ + << "Invalid getValue request with unknown request ID: " << requestId + << ", ignore"; + continue; } + + if (!protoResult.has_value()) { + auto& result = results->emplace_back(); + result.requestId = requestId; + result.status = static_cast(protoResult.status()); + continue; + } + + aidlvhal::VehiclePropValue value; + proto_msg_converter::protoToAidl(protoResult.value(), &value); + + // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to reset + // the timestamp. + // TODO(b/350822044): Remove this once we use timestamp from proxy server. + if (!setAndroidTimestamp(&value)) { + // This is a rare case when we receive a property update event reflecting a new value + // for the property before we receive the get value result. This means that the result + // is already outdated, hence we should retry getting the latest value again. + LOG(WARNING) << __func__ << "getValue result for propId: " << value.prop + << " areaId: " << value.areaId << " is oudated, retry"; + retryRequests.push_back(*(it->second)); + continue; + } + + auto& result = results->emplace_back(); + result.requestId = requestId; + result.status = static_cast(protoResult.status()); + result.prop = std::move(value); + } + + if (retryRequests.size() != 0) { + return getValuesWithRetry(retryRequests, results, retryCount++); } - (*callback)(std::move(results)); return aidlvhal::StatusCode::OK; } +bool GRPCVehicleHardware::setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const { + PropIdAreaId propIdAreaId = { + .propId = propValue->prop, + .areaId = propValue->areaId, + }; + int64_t now = elapsedRealtimeNano(); + int64_t externalTimestamp = propValue->timestamp; + + { + std::lock_guard lck(mLatestUpdateTimestampsMutex); + auto it = mLatestUpdateTimestamps.find(propIdAreaId); + if (it == mLatestUpdateTimestamps.end() || externalTimestamp > (it->second).first) { + mLatestUpdateTimestamps[propIdAreaId].first = externalTimestamp; + mLatestUpdateTimestamps[propIdAreaId].second = now; + propValue->timestamp = now; + return true; + } + if (externalTimestamp == (it->second).first) { + propValue->timestamp = (it->second).second; + return true; + } + } + // externalTimestamp < (it->second).first, the value is outdated. + return false; +} + void GRPCVehicleHardware::registerOnPropertyChangeEvent( std::unique_ptr callback) { std::lock_guard lck(mCallbackMutex); @@ -248,46 +339,61 @@ bool GRPCVehicleHardware::waitForConnected(std::chrono::milliseconds waitTime) { void GRPCVehicleHardware::ValuePollingLoop() { while (!mShuttingDownFlag.load()) { - ::grpc::ClientContext context; - - bool rpc_stopped{false}; - std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() { - std::unique_lock lck(mShutdownMutex); - mShutdownCV.wait(lck, [this, &rpc_stopped]() { - return rpc_stopped || mShuttingDownFlag.load(); - }); - context.TryCancel(); - }); - - auto value_stream = - mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); - LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; - proto::VehiclePropValues protoValues; - while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) { - std::vector values; - for (const auto protoValue : protoValues.values()) { - values.push_back(aidlvhal::VehiclePropValue()); - proto_msg_converter::protoToAidl(protoValue, &values.back()); - } - std::shared_lock lck(mCallbackMutex); - if (mOnPropChange) { - (*mOnPropChange)(values); - } - } - - { - std::lock_guard lck(mShutdownMutex); - rpc_stopped = true; - } - mShutdownCV.notify_all(); - shuttingdown_watcher.join(); - - auto grpc_status = value_stream->Finish(); - // never reach here until connection lost - LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); - + pollValue(); // try to reconnect } } +void GRPCVehicleHardware::pollValue() { + ::grpc::ClientContext context; + + bool rpc_stopped{false}; + std::thread shuttingdown_watcher([this, &rpc_stopped, &context]() { + std::unique_lock lck(mShutdownMutex); + mShutdownCV.wait( + lck, [this, &rpc_stopped]() { return rpc_stopped || mShuttingDownFlag.load(); }); + context.TryCancel(); + }); + + auto value_stream = mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); + LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; + proto::VehiclePropValues protoValues; + while (!mShuttingDownFlag.load() && value_stream->Read(&protoValues)) { + std::vector values; + for (const auto protoValue : protoValues.values()) { + aidlvhal::VehiclePropValue aidlValue = {}; + proto_msg_converter::protoToAidl(protoValue, &aidlValue); + + // VHAL proxy server uses a different timestamp then AAOS timestamp, so we have to + // reset the timestamp. + // TODO(b/350822044): Remove this once we use timestamp from proxy server. + if (!setAndroidTimestamp(&aidlValue)) { + LOG(WARNING) << __func__ << ": property event for propId: " << aidlValue.prop + << " areaId: " << aidlValue.areaId << " is outdated, ignore"; + continue; + } + + values.push_back(std::move(aidlValue)); + } + if (values.empty()) { + continue; + } + std::shared_lock lck(mCallbackMutex); + if (mOnPropChange) { + (*mOnPropChange)(values); + } + } + + { + std::lock_guard lck(mShutdownMutex); + rpc_stopped = true; + } + mShutdownCV.notify_all(); + shuttingdown_watcher.join(); + + auto grpc_status = value_stream->Finish(); + // never reach here until connection lost + LOG(ERROR) << __func__ << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); +} + } // namespace android::hardware::automotive::vehicle::virtualization diff --git a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h index 9750f621e9..1edf6580aa 100644 --- a/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h +++ b/automotive/vehicle/aidl/impl/grpc/GRPCVehicleHardware.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "VehicleServer.grpc.pb.h" #include "VehicleServer.pb.h" @@ -33,6 +34,7 @@ #include #include #include +#include #include namespace android::hardware::automotive::vehicle::virtualization { @@ -43,9 +45,6 @@ class GRPCVehicleHardware : public IVehicleHardware { public: explicit GRPCVehicleHardware(std::string service_addr); - // Only used for unit testing. - explicit GRPCVehicleHardware(std::unique_ptr stub); - ~GRPCVehicleHardware(); // Get all the property configs. @@ -94,7 +93,7 @@ class GRPCVehicleHardware : public IVehicleHardware { std::unique_ptr mOnPropChange; private: - void ValuePollingLoop(); + friend class GRPCVehicleHardwareUnitTest; std::string mServiceAddr; std::shared_ptr<::grpc::Channel> mGrpcChannel; @@ -106,6 +105,31 @@ class GRPCVehicleHardware : public IVehicleHardware { std::mutex mShutdownMutex; std::condition_variable mShutdownCV; std::atomic mShuttingDownFlag{false}; + + mutable std::mutex mLatestUpdateTimestampsMutex; + + // A map from [propId, areaId] to the latest timestamp this property is updated. + // The key is a tuple, the first element is the external timestamp (timestamp set by VHAL + // server), the second element is the Android timestamp (elapsedRealtimeNano). + mutable std::unordered_map, + PropIdAreaIdHash> mLatestUpdateTimestamps + GUARDED_BY(mLatestUpdateTimestampsMutex); + + // Only used for unit testing. + GRPCVehicleHardware(std::unique_ptr stub, + bool startValuePollingLoop); + + void ValuePollingLoop(); + void pollValue(); + + aidlvhal::StatusCode getValuesWithRetry(const std::vector& requests, + std::vector* results, + size_t retryCount) const; + + // Check the external timestamp of propValue against the latest updated external timestamp, if + // this is an outdated value, return false. Otherwise, update the external timestamp to the + // Android timestamp and return true. + bool setAndroidTimestamp(aidlvhal::VehiclePropValue* propValue) const; }; } // namespace android::hardware::automotive::vehicle::virtualization diff --git a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp index 3bd7e0e56a..20af2311bc 100644 --- a/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp +++ b/automotive/vehicle/aidl/impl/grpc/test/GRPCVehicleHardwareUnitTest.cpp @@ -19,8 +19,10 @@ #include #include +#include #include +#include #include #include #include @@ -31,98 +33,48 @@ namespace aidlvhal = ::aidl::android::hardware::automotive::vehicle; using ::testing::_; using ::testing::DoAll; +using ::testing::ElementsAre; using ::testing::NiceMock; using ::testing::Return; using ::testing::SaveArg; using ::testing::SetArgPointee; +using ::testing::SizeIs; + +using ::grpc::testing::MockClientReader; using proto::MockVehicleServerStub; -const std::string kFakeServerAddr = "0.0.0.0:54321"; - -class FakeVehicleServer : public proto::VehicleServer::Service { - public: - ::grpc::Status StartPropertyValuesStream( - ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, - ::grpc::ServerWriter* stream) override { - stream->Write(proto::VehiclePropValues()); - // A fake disconnection. - return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); - } - - // Functions that we do not care. - ::grpc::Status GetAllPropertyConfig( - ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, - ::grpc::ServerWriter* stream) override { - return ::grpc::Status::OK; - } - - ::grpc::Status SetValues(::grpc::ServerContext* context, - const proto::VehiclePropValueRequests* requests, - proto::SetValueResults* results) override { - return ::grpc::Status::OK; - } - - ::grpc::Status GetValues(::grpc::ServerContext* context, - const proto::VehiclePropValueRequests* requests, - proto::GetValueResults* results) override { - return ::grpc::Status::OK; - } -}; - -TEST(GRPCVehicleHardwareUnitTest, Reconnect) { - auto receivedUpdate = std::make_shared>(0); - auto vehicleHardware = std::make_unique(kFakeServerAddr); - vehicleHardware->registerOnPropertyChangeEvent( - std::make_unique( - [receivedUpdate](const auto&) { receivedUpdate->fetch_add(1); })); - - constexpr size_t kServerRestartTimes = 5; - for (size_t serverStart = 0; serverStart < kServerRestartTimes; ++serverStart) { - EXPECT_EQ(receivedUpdate->load(), 0); - auto fakeServer = std::make_unique(); - ::grpc::ServerBuilder builder; - builder.RegisterService(fakeServer.get()); - builder.AddListeningPort(kFakeServerAddr, ::grpc::InsecureServerCredentials()); - auto grpcServer = builder.BuildAndStart(); - - // Wait until the vehicle hardware received the second update (after one fake - // disconnection). - constexpr auto kMaxWaitTime = std::chrono::seconds(5); - auto startTime = std::chrono::steady_clock::now(); - while (receivedUpdate->load() <= 1 && - std::chrono::steady_clock::now() - startTime < kMaxWaitTime) - ; - - grpcServer->Shutdown(); - grpcServer->Wait(); - EXPECT_GT(receivedUpdate->load(), 1); - - // Reset for the next round. - receivedUpdate->store(0); - } -} - -class GRPCVehicleHardwareMockServerUnitTest : public ::testing::Test { +class GRPCVehicleHardwareUnitTest : public ::testing::Test { protected: NiceMock* mGrpcStub; std::unique_ptr mHardware; void SetUp() override { auto stub = std::make_unique>(); - ; mGrpcStub = stub.get(); - mHardware = std::make_unique(std::move(stub)); + // Cannot use make_unique here since the constructor is a private method. + mHardware = std::unique_ptr( + new GRPCVehicleHardware(std::move(stub), /*startValuePollingLoop=*/false)); } void TearDown() override { mHardware.reset(); } + + // Access GRPCVehicleHardware private method. + void pollValue() { mHardware->pollValue(); } + + void startValuePollingLoop(std::unique_ptr stub) { + mHardware = std::unique_ptr( + new GRPCVehicleHardware(std::move(stub), /*startValuePollingLoop=*/true)); + } + + void generatePropertyUpdateEvent(int32_t propId, int64_t timestamp); }; MATCHER_P(RepeatedInt32Eq, expected_values, "") { return std::vector(arg.begin(), arg.end()) == expected_values; } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, Subscribe) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribe) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::OK); proto::SubscribeRequest actualRequest; @@ -147,7 +99,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, Subscribe) { EXPECT_EQ(protoOptions.enable_variable_update_rate(), true); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeLegacyServer) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeLegacyServer) { EXPECT_CALL(*mGrpcStub, Subscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""))); @@ -157,7 +109,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeLegacyServer) { EXPECT_EQ(status, aidlvhal::StatusCode::OK); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeGrpcFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeGrpcFailure) { EXPECT_CALL(*mGrpcStub, Subscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::INTERNAL, "GRPC Error"))); @@ -167,7 +119,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeGrpcFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::INTERNAL_ERROR); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeProtoFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestSubscribeProtoFailure) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::NOT_AVAILABLE_SPEED_LOW); @@ -181,7 +133,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, SubscribeProtoFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::NOT_AVAILABLE_SPEED_LOW); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, Unsubscribe) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribe) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::OK); proto::UnsubscribeRequest actualRequest; @@ -199,7 +151,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, Unsubscribe) { EXPECT_EQ(actualRequest.area_id(), areaId); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeLegacyServer) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeLegacyServer) { EXPECT_CALL(*mGrpcStub, Unsubscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""))); @@ -208,7 +160,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeLegacyServer) { EXPECT_EQ(status, aidlvhal::StatusCode::OK); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeGrpcFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeGrpcFailure) { EXPECT_CALL(*mGrpcStub, Unsubscribe(_, _, _)) .WillOnce(Return(::grpc::Status(::grpc::StatusCode::INTERNAL, "GRPC Error"))); @@ -217,7 +169,7 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeGrpcFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::INTERNAL_ERROR); } -TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeProtoFailure) { +TEST_F(GRPCVehicleHardwareUnitTest, TestUnsubscribeProtoFailure) { proto::VehicleHalCallStatus protoStatus; protoStatus.set_status_code(proto::StatusCode::NOT_AVAILABLE_SPEED_LOW); @@ -230,4 +182,264 @@ TEST_F(GRPCVehicleHardwareMockServerUnitTest, UnsubscribeProtoFailure) { EXPECT_EQ(status, aidlvhal::StatusCode::NOT_AVAILABLE_SPEED_LOW); } +TEST_F(GRPCVehicleHardwareUnitTest, TestPollValue) { + int64_t testTimestamp = 12345; + int32_t testPropId = 54321; + int64_t startTimestamp = elapsedRealtimeNano(); + + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillOnce([testTimestamp, testPropId](proto::VehiclePropValues* values) { + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(testTimestamp); + value->set_prop(testPropId); + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + std::vector propertyEvents; + + mHardware->registerOnPropertyChangeEvent( + std::make_unique( + [&propertyEvents](const std::vector& events) { + for (const auto& event : events) { + propertyEvents.push_back(event); + } + })); + + pollValue(); + + ASSERT_THAT(propertyEvents, SizeIs(1)); + EXPECT_EQ(propertyEvents[0].prop, testPropId); + EXPECT_GT(propertyEvents[0].timestamp, startTimestamp) + << "Timestamp must be updated to Android timestamp"; + EXPECT_LT(propertyEvents[0].timestamp, elapsedRealtimeNano()) + << "Timestamp must be updated to Android timestamp"; +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestPollValueIgnoreOutdatedValue) { + int64_t testTimestamp1 = 12345; + int32_t value1 = 1324; + int64_t testTimestamp2 = 12340; + int32_t value2 = 1423; + int32_t testPropId = 54321; + int64_t startTimestamp = elapsedRealtimeNano(); + + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillOnce([testTimestamp1, value1, testPropId](proto::VehiclePropValues* values) { + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(testTimestamp1); + value->set_prop(testPropId); + value->add_int32_values(value1); + return true; + }) + .WillOnce([testTimestamp2, value2, testPropId](proto::VehiclePropValues* values) { + values->Clear(); + // This event is outdated, must be ignored. + auto value = values->add_values(); + value->set_timestamp(testTimestamp2); + value->set_prop(testPropId); + value->add_int32_values(value2); + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + std::vector propertyEvents; + + mHardware->registerOnPropertyChangeEvent( + std::make_unique( + [&propertyEvents](const std::vector& events) { + for (const auto& event : events) { + propertyEvents.push_back(event); + } + })); + + pollValue(); + + ASSERT_THAT(propertyEvents, SizeIs(1)) << "Outdated event must be ignored"; + EXPECT_EQ(propertyEvents[0].prop, testPropId); + EXPECT_GT(propertyEvents[0].timestamp, startTimestamp); + EXPECT_LT(propertyEvents[0].timestamp, elapsedRealtimeNano()); + EXPECT_THAT(propertyEvents[0].value.int32Values, ElementsAre(value1)); +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestValuePollingLoop) { + int64_t testTimestamp = 12345; + int32_t testPropId = 54321; + auto stub = std::make_unique>(); + + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*stub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillRepeatedly([testTimestamp, testPropId](proto::VehiclePropValues* values) { + // Sleep for 10ms and always return the same property event. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(testTimestamp); + value->set_prop(testPropId); + return true; + }); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + startValuePollingLoop(std::move(stub)); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // This must stop the loop and wait for the thread to finish. + mHardware.reset(); +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestGetValues) { + int64_t testRequestId = 1234; + int32_t testPropId = 4321; + int32_t testValue = 123456; + proto::VehiclePropValueRequests gotRequests; + EXPECT_CALL(*mGrpcStub, GetValues(_, _, _)) + .WillOnce([&gotRequests, testRequestId, testPropId, testValue]( + ::grpc::ClientContext* context, + const proto::VehiclePropValueRequests& request, + proto::GetValueResults* response) { + gotRequests = request; + response->Clear(); + auto* resultPtr = response->add_results(); + resultPtr->set_request_id(testRequestId); + resultPtr->set_status(proto::StatusCode::OK); + auto* valuePtr = resultPtr->mutable_value(); + valuePtr->set_prop(testPropId); + valuePtr->add_int32_values(testValue); + return ::grpc::Status::OK; + }); + + std::vector requests; + requests.push_back(aidlvhal::GetValueRequest{.requestId = testRequestId, + .prop = { + .prop = testPropId, + }}); + + std::vector gotResults; + + auto status = mHardware->getValues( + std::make_shared( + [&gotResults](std::vector results) { + for (const auto& result : results) { + gotResults.push_back(result); + } + }), + requests); + + ASSERT_EQ(status, aidlvhal::StatusCode::OK); + ASSERT_THAT(gotRequests.requests(), SizeIs(1)); + EXPECT_THAT(gotRequests.requests(0).request_id(), testRequestId); + EXPECT_THAT(gotRequests.requests(0).value().prop(), testPropId); + + ASSERT_THAT(gotResults, SizeIs(1)); + EXPECT_EQ(gotResults[0].requestId, testRequestId); + EXPECT_EQ(gotResults[0].status, aidlvhal::StatusCode::OK); + EXPECT_EQ(gotResults[0].prop->prop, testPropId); + EXPECT_THAT(gotResults[0].prop->value.int32Values, ElementsAre(testValue)); +} + +void GRPCVehicleHardwareUnitTest::generatePropertyUpdateEvent(int32_t propId, int64_t timestamp) { + // This will be converted to a unique_ptr in StartPropertyValuesStream. The ownership is passed + // there. + auto clientReader = new MockClientReader(); + EXPECT_CALL(*mGrpcStub, StartPropertyValuesStreamRaw(_, _)).WillOnce(Return(clientReader)); + EXPECT_CALL(*clientReader, Read(_)) + .WillOnce([timestamp, propId](proto::VehiclePropValues* values) { + values->Clear(); + auto value = values->add_values(); + value->set_timestamp(timestamp); + value->set_prop(propId); + return true; + }) + .WillOnce(Return(false)); + EXPECT_CALL(*clientReader, Finish()).WillOnce(Return(::grpc::Status::OK)); + + pollValue(); +} + +TEST_F(GRPCVehicleHardwareUnitTest, TestGetValuesOutdatedRetry) { + int64_t startTimestamp = elapsedRealtimeNano(); + int64_t testRequestId = 1234; + int32_t testPropId = 4321; + int32_t testValue1 = 123456; + int32_t testValue2 = 654321; + int32_t testTimestamp1 = 1000; + int32_t testTimestamp2 = 2000; + + // A property update event for testTimestamp2 happens before getValues returns. + generatePropertyUpdateEvent(testPropId, testTimestamp2); + + // GetValues first returns an outdated result, then an up-to-date result. + EXPECT_CALL(*mGrpcStub, GetValues(_, _, _)) + .WillOnce([testRequestId, testPropId, testValue1, testTimestamp1]( + ::grpc::ClientContext* context, + const proto::VehiclePropValueRequests& request, + proto::GetValueResults* response) { + response->Clear(); + auto* resultPtr = response->add_results(); + resultPtr->set_request_id(testRequestId); + resultPtr->set_status(proto::StatusCode::OK); + auto* valuePtr = resultPtr->mutable_value(); + valuePtr->set_prop(testPropId); + valuePtr->set_timestamp(testTimestamp1); + valuePtr->add_int32_values(testValue1); + return ::grpc::Status::OK; + }) + .WillOnce([testRequestId, testPropId, testValue2, testTimestamp2]( + ::grpc::ClientContext* context, + const proto::VehiclePropValueRequests& request, + proto::GetValueResults* response) { + response->Clear(); + auto* resultPtr = response->add_results(); + resultPtr->set_request_id(testRequestId); + resultPtr->set_status(proto::StatusCode::OK); + auto* valuePtr = resultPtr->mutable_value(); + valuePtr->set_prop(testPropId); + valuePtr->set_timestamp(testTimestamp2); + valuePtr->add_int32_values(testValue2); + return ::grpc::Status::OK; + }); + + std::vector requests; + requests.push_back(aidlvhal::GetValueRequest{.requestId = testRequestId, + .prop = { + .prop = testPropId, + }}); + + std::vector gotResults; + + auto status = mHardware->getValues( + std::make_shared( + [&gotResults](std::vector results) { + for (const auto& result : results) { + gotResults.push_back(result); + } + }), + requests); + + ASSERT_EQ(status, aidlvhal::StatusCode::OK); + ASSERT_THAT(gotResults, SizeIs(1)); + EXPECT_EQ(gotResults[0].requestId, testRequestId); + EXPECT_EQ(gotResults[0].status, aidlvhal::StatusCode::OK); + EXPECT_EQ(gotResults[0].prop->prop, testPropId); + EXPECT_THAT(gotResults[0].prop->value.int32Values, ElementsAre(testValue2)); + EXPECT_GT(gotResults[0].prop->timestamp, startTimestamp); + EXPECT_LT(gotResults[0].prop->timestamp, elapsedRealtimeNano()); +} + } // namespace android::hardware::automotive::vehicle::virtualization