From a90f9c4a43d23a635a6e79d1a452df5b613f5dc0 Mon Sep 17 00:00:00 2001 From: Yu Shan Date: Wed, 21 Sep 2022 18:15:50 -0700 Subject: [PATCH] Add timeout logic to TestWakeupClientServiceImpl. Add timeout logic for fake tasks. They will timeout after 20s and print an error message if not received by the remote access HAL. Test: Manually run TestWakeupClientServiceImpl and verify the log: Task for client ID: [ID] timed-out is printed. Bug: 246841306 Change-Id: I2173c931da9e0ea40c7b16f9e25a75592fa255c0 --- .../include/TestWakeupClientServiceImpl.h | 43 +++++++- .../impl/src/TestWakeupClientServiceImpl.cpp | 99 +++++++++++++++++-- 2 files changed, 133 insertions(+), 9 deletions(-) diff --git a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h index 4c440b8c00..9d6ef0aced 100644 --- a/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h +++ b/automotive/remoteaccess/test_grpc_server/impl/include/TestWakeupClientServiceImpl.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -41,20 +42,60 @@ class FakeTaskGenerator final { constexpr static uint8_t DATA[] = {0xde, 0xad, 0xbe, 0xef}; }; +struct TaskInfo { + // This is unique per-task. Note that a task might be popped and put back into the task queue, + // it will have a new task ID but the same clientId in the task data. + int taskId; + long timestampInMs; + GetRemoteTasksResponse taskData; +}; + +struct TaskInfoComparator { + // We want the smallest timestamp and smallest task ID on top. + bool operator()(const TaskInfo& l, const TaskInfo& r) { + return l.timestampInMs > r.timestampInMs || + (l.timestampInMs == r.timestampInMs && l.taskId > r.taskId); + } +}; + +// forward-declaration. +class TaskQueue; + +class TaskTimeoutMessageHandler final : public android::MessageHandler { + public: + TaskTimeoutMessageHandler(TaskQueue* taskQueue); + void handleMessage(const android::Message& message) override; + + private: + TaskQueue* mTaskQueue; +}; + // TaskQueue is thread-safe. class TaskQueue final { public: + TaskQueue(); + ~TaskQueue(); + void add(const GetRemoteTasksResponse& response); std::optional maybePopOne(); void waitForTask(); void stopWait(); + void handleTaskTimeout(); private: + std::thread mCheckTaskTimeoutThread; std::mutex mLock; - std::queue mTasks GUARDED_BY(mLock); + std::priority_queue, TaskInfoComparator> mTasks + GUARDED_BY(mLock); // A variable to notify mTasks is not empty. std::condition_variable mTasksNotEmptyCv; bool mStopped GUARDED_BY(mLock); + android::sp mLooper; + android::sp mTaskTimeoutMessageHandler; + std::atomic mTaskIdCounter = 0; + + void checkForTestTimeoutLoop(); + void waitForTaskWithLock(std::unique_lock& lock); }; class TestWakeupClientServiceImpl final : public WakeupClient::Service { diff --git a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp index 1eb87e285e..8e6669f7cc 100644 --- a/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp +++ b/automotive/remoteaccess/test_grpc_server/impl/src/TestWakeupClientServiceImpl.cpp @@ -18,6 +18,8 @@ #include #include +#include +#include #include #include @@ -28,13 +30,15 @@ namespace remoteaccess { namespace { +using ::android::uptimeMillis; using ::android::base::ScopedLockAssertion; using ::android::base::StringPrintf; using ::grpc::ServerContext; using ::grpc::ServerWriter; using ::grpc::Status; -constexpr int kTaskIntervalInSec = 5; +constexpr int kTaskIntervalInMs = 5'000; +constexpr int KTaskTimeoutInMs = 20'000; } // namespace @@ -47,24 +51,68 @@ GetRemoteTasksResponse FakeTaskGenerator::generateTask() { return response; } +TaskTimeoutMessageHandler::TaskTimeoutMessageHandler(TaskQueue* taskQueue) + : mTaskQueue(taskQueue) {} + +void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) { + mTaskQueue->handleTaskTimeout(); +} + +TaskQueue::TaskQueue() { + mTaskTimeoutMessageHandler = android::sp::make(this); + mLooper = Looper::prepare(/*opts=*/0); + mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); }); +} + +TaskQueue::~TaskQueue() { + { + std::lock_guard lockGuard(mLock); + mStopped = true; + } + while (true) { + // Remove all pending timeout handlers from queue. + if (!maybePopOne().has_value()) { + break; + } + } + if (mCheckTaskTimeoutThread.joinable()) { + mCheckTaskTimeoutThread.join(); + } +} + std::optional TaskQueue::maybePopOne() { std::lock_guard lockGuard(mLock); if (mTasks.size() == 0) { return std::nullopt; } - GetRemoteTasksResponse response = mTasks.front(); + TaskInfo response = std::move(mTasks.top()); mTasks.pop(); - return std::move(response); + mLooper->removeMessages(mTaskTimeoutMessageHandler, response.taskId); + return std::move(response.taskData); } + void TaskQueue::add(const GetRemoteTasksResponse& task) { - // TODO (b/246841306): add timeout to tasks. std::lock_guard lockGuard(mLock); - mTasks.push(task); + if (mStopped) { + return; + } + int taskId = mTaskIdCounter++; + mTasks.push(TaskInfo{ + .taskId = taskId, + .timestampInMs = uptimeMillis(), + .taskData = task, + }); + android::Message message(taskId); + mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message); mTasksNotEmptyCv.notify_all(); } void TaskQueue::waitForTask() { std::unique_lock lock(mLock); + waitForTaskWithLock(lock); +} + +void TaskQueue::waitForTaskWithLock(std::unique_lock& lock) { mTasksNotEmptyCv.wait(lock, [this] { ScopedLockAssertion lockAssertion(mLock); return mTasks.size() > 0 || mStopped; @@ -77,6 +125,41 @@ void TaskQueue::stopWait() { mTasksNotEmptyCv.notify_all(); } +void TaskQueue::checkForTestTimeoutLoop() { + Looper::setForThread(mLooper); + + while (true) { + { + std::unique_lock lock(mLock); + if (mStopped) { + ALOGW("The TestWakeupClientServiceImpl is stopping, " + "exiting checkForTestTimeoutLoop"); + return; + } + } + + mLooper->pollAll(/*timeoutMillis=*/-1); + } +} + +void TaskQueue::handleTaskTimeout() { + // We know which task timed-out from the taskId in the message. However, there is no easy way + // to remove a specific task with the task ID from the priority_queue, so we just check from + // the top of the queue (which have the oldest tasks). + std::lock_guard lockGuard(mLock); + long now = uptimeMillis(); + while (mTasks.size() > 0) { + const TaskInfo& taskInfo = mTasks.top(); + if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) { + break; + } + // In real implementation, this should report task failure to remote wakeup server. + ALOGW("Task for client ID: %s timed-out, added at %ld ms, now %ld ms", + taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now); + mTasks.pop(); + } +} + TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() { mThread = std::thread([this] { fakeTaskGenerateLoop(); }); } @@ -95,13 +178,13 @@ TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() { void TestWakeupClientServiceImpl::fakeTaskGenerateLoop() { // In actual implementation, this should communicate with the remote server and receives tasks - // from it. Here we simulate receiving one remote task every {kTaskIntervalInSec}s. + // from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms. while (true) { mTaskQueue.add(mFakeTaskGenerator.generateTask()); - ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInSec); + ALOGI("Sleeping for %d seconds until next task", kTaskIntervalInMs); std::unique_lock lk(mLock); - if (mServerStoppedCv.wait_for(lk, std::chrono::seconds(kTaskIntervalInSec), [this] { + if (mServerStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] { ScopedLockAssertion lockAssertion(mLock); return mServerStopped; })) {