Implement scheduleTask in TestWakeupClientService.

Handles the schedule task request from GRPC. Starts a thread
for the scheduled tasks and injects the task data through the grpc
channel.

Test: m -j TestWakeupClientServerHost && atest TestWakeupClientServerHostUnitTest
Bug: 297271235
Change-Id: Ia9f1d7fa3dadb3be68e31987622bc6df9271e929
This commit is contained in:
Yu Shan
2023-08-30 19:22:08 -07:00
parent 72d6f8944d
commit bfc29b2442
5 changed files with 712 additions and 76 deletions

View File

@@ -61,3 +61,27 @@ cc_binary_host {
"-DHOST",
],
}
cc_test_host {
name: "TestWakeupClientServerHostUnitTest",
srcs: [
"test/*.cpp",
"src/TestWakeupClientServiceImpl.cpp",
],
local_include_dirs: ["include"],
shared_libs: [
"libbase",
"libutils",
"libgrpc++",
"libprotobuf-cpp-full",
],
static_libs: [
"libgtest",
],
whole_static_libs: [
"wakeup_client_protos",
],
cflags: [
"-Wno-unused-parameter",
],
}

View File

@@ -71,30 +71,41 @@ class TaskTimeoutMessageHandler final : public android::MessageHandler {
// TaskQueue is thread-safe.
class TaskQueue final {
public:
TaskQueue();
~TaskQueue();
TaskQueue(android::sp<Looper> looper);
void add(const GetRemoteTasksResponse& response);
std::optional<GetRemoteTasksResponse> maybePopOne();
void waitForTask();
void stopWait();
void handleTaskTimeout();
bool isEmpty();
private:
std::thread mCheckTaskTimeoutThread;
friend class TaskTimeoutMessageHandler;
std::mutex mLock;
std::priority_queue<TaskInfo, std::vector<TaskInfo>, TaskInfoComparator> mTasks
GUARDED_BY(mLock);
// A variable to notify mTasks is not empty.
std::condition_variable mTasksNotEmptyCv;
bool mStopped GUARDED_BY(mLock);
std::atomic<bool> mStopped;
android::sp<Looper> mLooper;
android::sp<TaskTimeoutMessageHandler> mTaskTimeoutMessageHandler;
std::atomic<int> mTaskIdCounter = 0;
void checkForTestTimeoutLoop();
void waitForTaskWithLock(std::unique_lock<std::mutex>& lock);
void loop();
void handleTaskTimeout();
};
// forward-declaration
class TestWakeupClientServiceImpl;
class TaskScheduleMsgHandler final : public android::MessageHandler {
public:
TaskScheduleMsgHandler(TestWakeupClientServiceImpl* mImpl);
void handleMessage(const android::Message& message) override;
private:
TestWakeupClientServiceImpl* mImpl;
};
class TestWakeupClientServiceImpl : public WakeupClient::Service {
@@ -103,6 +114,9 @@ class TestWakeupClientServiceImpl : public WakeupClient::Service {
~TestWakeupClientServiceImpl();
// Stop the handling for all income requests. Prepare for shutdown.
void stopServer();
grpc::Status GetRemoteTasks(grpc::ServerContext* context, const GetRemoteTasksRequest* request,
grpc::ServerWriter<GetRemoteTasksResponse>* writer) override;
@@ -110,6 +124,24 @@ class TestWakeupClientServiceImpl : public WakeupClient::Service {
const NotifyWakeupRequiredRequest* request,
NotifyWakeupRequiredResponse* response) override;
grpc::Status ScheduleTask(grpc::ServerContext* context, const ScheduleTaskRequest* request,
ScheduleTaskResponse* response) override;
grpc::Status UnscheduleTask(grpc::ServerContext* context, const UnscheduleTaskRequest* request,
UnscheduleTaskResponse* response) override;
grpc::Status UnscheduleAllTasks(grpc::ServerContext* context,
const UnscheduleAllTasksRequest* request,
UnscheduleAllTasksResponse* response) override;
grpc::Status IsTaskScheduled(grpc::ServerContext* context,
const IsTaskScheduledRequest* request,
IsTaskScheduledResponse* response) override;
grpc::Status GetAllScheduledTasks(grpc::ServerContext* context,
const GetAllScheduledTasksRequest* request,
GetAllScheduledTasksResponse* response) override;
/**
* Starts generating fake tasks for the specific client repeatedly.
*
@@ -146,10 +178,34 @@ class TestWakeupClientServiceImpl : public WakeupClient::Service {
*/
virtual void wakeupApplicationProcessor() = 0;
/**
* Cleans up a scheduled task info.
*/
void cleanupScheduledTaskLocked(const std::string& clientId, const std::string& scheduleId)
REQUIRES(mLock);
private:
// This is a thread for communicating with remote wakeup server (via network) and receive tasks
// from it.
std::thread mThread;
friend class TaskScheduleMsgHandler;
struct ScheduleInfo {
std::unique_ptr<GrpcScheduleInfo> grpcScheduleInfo;
// This is a unique ID to represent this schedule. Each repeated tasks will have different
// task ID but will have the same scheduleMsgId so that we can use to unschedule. This has
// to be an int so we cannot use the scheduleId provided by the client.
int scheduleMsgId;
int64_t periodicInSeconds;
int32_t currentCount;
int32_t totalCount;
};
std::atomic<int> mScheduleMsgCounter = 0;
// This is a looper for scheduling tasks to be executed in the future.
android::sp<Looper> mLooper;
android::sp<TaskScheduleMsgHandler> mTaskScheduleMsgHandler;
// This is a thread for generating fake tasks.
std::thread mFakeTaskThread;
// This is a thread for the looper.
std::thread mLooperThread;
// A variable to notify server is stopping.
std::condition_variable mTaskLoopStoppedCv;
// Whether wakeup AP is required for executing tasks.
@@ -158,14 +214,21 @@ class TestWakeupClientServiceImpl : public WakeupClient::Service {
std::atomic<bool> mRemoteTaskConnectionAlive = false;
std::mutex mLock;
bool mGeneratingFakeTask GUARDED_BY(mLock);
std::atomic<bool> mServerStopped;
std::unordered_map<std::string, std::unordered_map<std::string, ScheduleInfo>>
mInfoByScheduleIdByClientId GUARDED_BY(mLock);
// Thread-safe. For test impl only.
FakeTaskGenerator mFakeTaskGenerator;
// Thread-sfae.
TaskQueue mTaskQueue;
// Thread-safe.
std::unique_ptr<TaskQueue> mTaskQueue;
void fakeTaskGenerateLoop(const std::string& clientId);
void injectTaskResponse(const GetRemoteTasksResponse& response);
bool getScheduleInfoLocked(int scheduleMsgId, ScheduleInfo** outScheduleInfoPtr)
REQUIRES(mLock);
void handleAddTask(int scheduleMsgId);
void loop();
};
} // namespace remoteaccess

View File

@@ -37,8 +37,17 @@ using ::grpc::ServerContext;
using ::grpc::ServerWriter;
using ::grpc::Status;
constexpr int kTaskIntervalInMs = 5'000;
constexpr int64_t KTaskTimeoutInMs = 20'000;
constexpr int64_t kTaskIntervalInMs = 5'000;
constexpr int64_t kTaskTimeoutInMs = 20'000;
int64_t msToNs(int64_t ms) {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(ms))
.count();
}
int64_t sToNs(int64_t s) {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(s)).count();
}
} // namespace
@@ -56,26 +65,9 @@ void TaskTimeoutMessageHandler::handleMessage(const android::Message& message) {
mTaskQueue->handleTaskTimeout();
}
TaskQueue::TaskQueue() {
TaskQueue::TaskQueue(android::sp<Looper> looper) {
mTaskTimeoutMessageHandler = android::sp<TaskTimeoutMessageHandler>::make(this);
mLooper = Looper::prepare(/*opts=*/0);
mCheckTaskTimeoutThread = std::thread([this] { checkForTestTimeoutLoop(); });
}
TaskQueue::~TaskQueue() {
{
std::lock_guard<std::mutex> lockGuard(mLock);
mStopped = true;
}
while (true) {
// Remove all pending timeout handlers from queue.
if (!maybePopOne().has_value()) {
break;
}
}
if (mCheckTaskTimeoutThread.joinable()) {
mCheckTaskTimeoutThread.join();
}
mLooper = looper;
}
std::optional<GetRemoteTasksResponse> TaskQueue::maybePopOne() {
@@ -101,16 +93,12 @@ void TaskQueue::add(const GetRemoteTasksResponse& task) {
.taskData = task,
});
android::Message message(taskId);
mLooper->sendMessageDelayed(KTaskTimeoutInMs * 1000, mTaskTimeoutMessageHandler, message);
mLooper->sendMessageDelayed(msToNs(kTaskTimeoutInMs), mTaskTimeoutMessageHandler, message);
mTasksNotEmptyCv.notify_all();
}
void TaskQueue::waitForTask() {
std::unique_lock<std::mutex> lock(mLock);
waitForTaskWithLock(lock);
}
void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
mTasksNotEmptyCv.wait(lock, [this] {
ScopedLockAssertion lockAssertion(mLock);
return mTasks.size() > 0 || mStopped;
@@ -118,9 +106,11 @@ void TaskQueue::waitForTaskWithLock(std::unique_lock<std::mutex>& lock) {
}
void TaskQueue::stopWait() {
std::lock_guard<std::mutex> lockGuard(mLock);
mStopped = true;
mTasksNotEmptyCv.notify_all();
{
std::lock_guard<std::mutex> lockGuard(mLock);
mTasksNotEmptyCv.notify_all();
}
}
bool TaskQueue::isEmpty() {
@@ -128,21 +118,6 @@ bool TaskQueue::isEmpty() {
return mTasks.size() == 0 || mStopped;
}
void TaskQueue::checkForTestTimeoutLoop() {
Looper::setForThread(mLooper);
while (true) {
{
std::unique_lock<std::mutex> lock(mLock);
if (mStopped) {
return;
}
}
mLooper->pollAll(/*timeoutMillis=*/-1);
}
}
void TaskQueue::handleTaskTimeout() {
// We know which task timed-out from the taskId in the message. However, there is no easy way
// to remove a specific task with the task ID from the priority_queue, so we just check from
@@ -151,22 +126,50 @@ void TaskQueue::handleTaskTimeout() {
int64_t now = uptimeMillis();
while (mTasks.size() > 0) {
const TaskInfo& taskInfo = mTasks.top();
if (taskInfo.timestampInMs + KTaskTimeoutInMs > now) {
if (taskInfo.timestampInMs + kTaskTimeoutInMs > now) {
break;
}
// In real implementation, this should report task failure to remote wakeup server.
printf("Task for client ID: %s timed-out, added at %" PRId64 " ms, now %" PRId64 " ms",
printf("Task for client ID: %s timed-out, added at %" PRId64 " ms, now %" PRId64 " ms\n",
taskInfo.taskData.clientid().c_str(), taskInfo.timestampInMs, now);
mTasks.pop();
}
}
TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {}
TestWakeupClientServiceImpl::TestWakeupClientServiceImpl() {
mTaskScheduleMsgHandler = android::sp<TaskScheduleMsgHandler>::make(this);
mLooper = android::sp<Looper>::make(/*opts=*/0);
mLooperThread = std::thread([this] { loop(); });
mTaskQueue = std::make_unique<TaskQueue>(mLooper);
}
TestWakeupClientServiceImpl::~TestWakeupClientServiceImpl() {
{ std::lock_guard<std::mutex> lockGuard(mLock); }
mTaskQueue.stopWait();
if (mServerStopped) {
return;
}
stopServer();
}
void TestWakeupClientServiceImpl::stopServer() {
mTaskQueue->stopWait();
stopGeneratingFakeTask();
// Set the flag so that the loop thread will exit.
mServerStopped = true;
mLooper->wake();
if (mLooperThread.joinable()) {
mLooperThread.join();
}
}
void TestWakeupClientServiceImpl::loop() {
Looper::setForThread(mLooper);
while (true) {
mLooper->pollAll(/*timeoutMillis=*/-1);
if (mServerStopped) {
return;
}
}
}
void TestWakeupClientServiceImpl::injectTask(const std::string& taskData,
@@ -178,8 +181,8 @@ void TestWakeupClientServiceImpl::injectTask(const std::string& taskData,
}
void TestWakeupClientServiceImpl::injectTaskResponse(const GetRemoteTasksResponse& response) {
printf("Received a new task\n");
mTaskQueue.add(response);
printf("Receive a new task\n");
mTaskQueue->add(response);
if (mWakeupRequired) {
wakeupApplicationProcessor();
}
@@ -192,7 +195,7 @@ void TestWakeupClientServiceImpl::startGeneratingFakeTask(const std::string& cli
return;
}
mGeneratingFakeTask = true;
mThread = std::thread([this, clientId] { fakeTaskGenerateLoop(clientId); });
mFakeTaskThread = std::thread([this, clientId] { fakeTaskGenerateLoop(clientId); });
printf("Started generating fake tasks\n");
}
@@ -206,8 +209,8 @@ void TestWakeupClientServiceImpl::stopGeneratingFakeTask() {
mTaskLoopStoppedCv.notify_all();
mGeneratingFakeTask = false;
}
if (mThread.joinable()) {
mThread.join();
if (mFakeTaskThread.joinable()) {
mFakeTaskThread.join();
}
printf("Stopped generating fake tasks\n");
}
@@ -217,7 +220,7 @@ void TestWakeupClientServiceImpl::fakeTaskGenerateLoop(const std::string& client
// from it. Here we simulate receiving one remote task every {kTaskIntervalInMs}ms.
while (true) {
injectTaskResponse(mFakeTaskGenerator.generateTask(clientId));
printf("Sleeping for %d seconds until next task\n", kTaskIntervalInMs);
printf("Sleeping for %" PRId64 " seconds until next task\n", kTaskIntervalInMs);
std::unique_lock lk(mLock);
if (mTaskLoopStoppedCv.wait_for(lk, std::chrono::milliseconds(kTaskIntervalInMs), [this] {
@@ -236,10 +239,16 @@ Status TestWakeupClientServiceImpl::GetRemoteTasks(ServerContext* context,
printf("GetRemoteTasks called\n");
mRemoteTaskConnectionAlive = true;
while (true) {
mTaskQueue.waitForTask();
mTaskQueue->waitForTask();
if (mServerStopped) {
// Server stopped, exit the loop.
printf("Server stopped exit loop\n");
break;
}
while (true) {
auto maybeTask = mTaskQueue.maybePopOne();
auto maybeTask = mTaskQueue->maybePopOne();
if (!maybeTask.has_value()) {
// No task left, loop again and wait for another task(s).
break;
@@ -252,21 +261,21 @@ Status TestWakeupClientServiceImpl::GetRemoteTasks(ServerContext* context,
printf("Failed to deliver remote task to remote access HAL\n");
// The task failed to be sent, add it back to the queue. The order might change, but
// it is okay.
mTaskQueue.add(response);
mTaskQueue->add(response);
mRemoteTaskConnectionAlive = false;
return Status::CANCELLED;
}
}
}
mRemoteTaskConnectionAlive = false;
return Status::OK;
// Server stopped, exit the loop.
return Status::CANCELLED;
}
Status TestWakeupClientServiceImpl::NotifyWakeupRequired(ServerContext* context,
const NotifyWakeupRequiredRequest* request,
NotifyWakeupRequiredResponse* response) {
printf("NotifyWakeupRequired called\n");
if (request->iswakeuprequired() && !mWakeupRequired && !mTaskQueue.isEmpty()) {
if (request->iswakeuprequired() && !mWakeupRequired && !mTaskQueue->isEmpty()) {
// If wakeup is now required and previously not required, this means we have finished
// shutting down the device. If there are still pending tasks, try waking up AP again
// to finish executing those tasks.
@@ -281,6 +290,203 @@ Status TestWakeupClientServiceImpl::NotifyWakeupRequired(ServerContext* context,
return Status::OK;
}
void TestWakeupClientServiceImpl::cleanupScheduledTaskLocked(const std::string& clientId,
const std::string& scheduleId) {
mInfoByScheduleIdByClientId[clientId].erase(scheduleId);
if (mInfoByScheduleIdByClientId[clientId].size() == 0) {
mInfoByScheduleIdByClientId.erase(clientId);
}
}
TaskScheduleMsgHandler::TaskScheduleMsgHandler(TestWakeupClientServiceImpl* impl) : mImpl(impl) {}
void TaskScheduleMsgHandler::handleMessage(const android::Message& message) {
mImpl->handleAddTask(message.what);
}
Status TestWakeupClientServiceImpl::ScheduleTask(ServerContext* context,
const ScheduleTaskRequest* request,
ScheduleTaskResponse* response) {
std::lock_guard<std::mutex> lockGuard(mLock);
const GrpcScheduleInfo& grpcScheduleInfo = request->scheduleinfo();
const std::string& scheduleId = grpcScheduleInfo.scheduleid();
const std::string& clientId = grpcScheduleInfo.clientid();
response->set_errorcode(ErrorCode::OK);
if (mInfoByScheduleIdByClientId.find(clientId) != mInfoByScheduleIdByClientId.end() &&
mInfoByScheduleIdByClientId[clientId].find(scheduleId) !=
mInfoByScheduleIdByClientId[clientId].end()) {
printf("Duplicate schedule Id: %s for client Id: %s\n", scheduleId.c_str(),
clientId.c_str());
response->set_errorcode(ErrorCode::INVALID_ARG);
return Status::OK;
}
int64_t startTimeInEpochSeconds = grpcScheduleInfo.starttimeinepochseconds();
int64_t periodicInSeconds = grpcScheduleInfo.periodicinseconds();
int32_t count = grpcScheduleInfo.count();
int scheduleMsgId = mScheduleMsgCounter++;
mInfoByScheduleIdByClientId[clientId][scheduleId] = {
.grpcScheduleInfo = std::make_unique<GrpcScheduleInfo>(grpcScheduleInfo),
.scheduleMsgId = scheduleMsgId,
.periodicInSeconds = periodicInSeconds,
.currentCount = 0,
.totalCount = count,
};
int64_t delayInSeconds =
startTimeInEpochSeconds - std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (delayInSeconds < 0) {
delayInSeconds = 0;
}
printf("ScheduleTask called with client Id: %s, schedule Id: %s, delay: %" PRId64 " s\n",
clientId.c_str(), scheduleId.c_str(), delayInSeconds);
mLooper->sendMessageDelayed(sToNs(delayInSeconds), mTaskScheduleMsgHandler,
android::Message(scheduleMsgId));
return Status::OK;
}
bool TestWakeupClientServiceImpl::getScheduleInfoLocked(int scheduleMsgId,
ScheduleInfo** outScheduleInfoPtr) {
for (auto& [_, infoByScheduleId] : mInfoByScheduleIdByClientId) {
for (auto& [_, scheduleInfo] : infoByScheduleId) {
if (scheduleInfo.scheduleMsgId == scheduleMsgId) {
*outScheduleInfoPtr = &scheduleInfo;
return true;
}
}
}
return false;
}
void TestWakeupClientServiceImpl::handleAddTask(int scheduleMsgId) {
std::lock_guard<std::mutex> lockGuard(mLock);
ScheduleInfo* scheduleInfoPtr;
bool found = getScheduleInfoLocked(scheduleMsgId, &scheduleInfoPtr);
if (!found) {
printf("The schedule msg Id: %d is not found\n", scheduleMsgId);
return;
}
const GrpcScheduleInfo& grpcScheduleInfo = *scheduleInfoPtr->grpcScheduleInfo;
const std::string scheduleId = grpcScheduleInfo.scheduleid();
const std::string clientId = grpcScheduleInfo.clientid();
GetRemoteTasksResponse injectResponse;
injectResponse.set_data(grpcScheduleInfo.data().data(), grpcScheduleInfo.data().size());
injectResponse.set_clientid(clientId);
injectTaskResponse(injectResponse);
scheduleInfoPtr->currentCount++;
printf("Sending scheduled tasks for scheduleId: %s, clientId: %s, taskCount: %d\n",
scheduleId.c_str(), clientId.c_str(), scheduleInfoPtr->currentCount);
if (scheduleInfoPtr->totalCount != 0 &&
scheduleInfoPtr->currentCount == scheduleInfoPtr->totalCount) {
// This schedule is finished.
cleanupScheduledTaskLocked(clientId, scheduleId);
return;
}
// Schedule the task for the next period.
mLooper->sendMessageDelayed(sToNs(scheduleInfoPtr->periodicInSeconds), mTaskScheduleMsgHandler,
android::Message(scheduleMsgId));
}
Status TestWakeupClientServiceImpl::UnscheduleTask(ServerContext* context,
const UnscheduleTaskRequest* request,
UnscheduleTaskResponse* response) {
std::lock_guard<std::mutex> lockGuard(mLock);
const std::string& clientId = request->clientid();
const std::string& scheduleId = request->scheduleid();
printf("UnscheduleTask called with client Id: %s, schedule Id: %s\n", clientId.c_str(),
scheduleId.c_str());
if (mInfoByScheduleIdByClientId.find(clientId) == mInfoByScheduleIdByClientId.end() ||
mInfoByScheduleIdByClientId[clientId].find(scheduleId) ==
mInfoByScheduleIdByClientId[clientId].end()) {
printf("UnscheduleTask: no task associated with clientId: %s, scheduleId: %s\n",
clientId.c_str(), scheduleId.c_str());
return Status::OK;
}
mLooper->removeMessages(mTaskScheduleMsgHandler,
mInfoByScheduleIdByClientId[clientId][scheduleId].scheduleMsgId);
cleanupScheduledTaskLocked(clientId, scheduleId);
return Status::OK;
}
Status TestWakeupClientServiceImpl::UnscheduleAllTasks(ServerContext* context,
const UnscheduleAllTasksRequest* request,
UnscheduleAllTasksResponse* response) {
std::lock_guard<std::mutex> lockGuard(mLock);
const std::string& clientId = request->clientid();
printf("UnscheduleAllTasks called with client Id: %s\n", clientId.c_str());
if (mInfoByScheduleIdByClientId.find(clientId) == mInfoByScheduleIdByClientId.end()) {
printf("UnscheduleTask: no task associated with clientId: %s\n", clientId.c_str());
return Status::OK;
}
const auto& infoByScheduleId = mInfoByScheduleIdByClientId[clientId];
std::vector<int> scheduleMsgIds;
for (const auto& [_, scheduleInfo] : infoByScheduleId) {
mLooper->removeMessages(mTaskScheduleMsgHandler, /*what=*/scheduleInfo.scheduleMsgId);
}
mInfoByScheduleIdByClientId.erase(clientId);
return Status::OK;
}
Status TestWakeupClientServiceImpl::IsTaskScheduled(ServerContext* context,
const IsTaskScheduledRequest* request,
IsTaskScheduledResponse* response) {
std::lock_guard<std::mutex> lockGuard(mLock);
const std::string& clientId = request->clientid();
const std::string& scheduleId = request->scheduleid();
printf("IsTaskScheduled called with client Id: %s, scheduleId: %s\n", clientId.c_str(),
scheduleId.c_str());
if (mInfoByScheduleIdByClientId.find(clientId) == mInfoByScheduleIdByClientId.end()) {
response->set_istaskscheduled(false);
return Status::OK;
}
if (mInfoByScheduleIdByClientId[clientId].find(scheduleId) ==
mInfoByScheduleIdByClientId[clientId].end()) {
response->set_istaskscheduled(false);
return Status::OK;
}
response->set_istaskscheduled(true);
return Status::OK;
}
Status TestWakeupClientServiceImpl::GetAllScheduledTasks(ServerContext* context,
const GetAllScheduledTasksRequest* request,
GetAllScheduledTasksResponse* response) {
const std::string& clientId = request->clientid();
printf("GetAllScheduledTasks called with client Id: %s\n", clientId.c_str());
response->clear_allscheduledtasks();
{
std::unique_lock lk(mLock);
if (mInfoByScheduleIdByClientId.find(clientId) == mInfoByScheduleIdByClientId.end()) {
return Status::OK;
}
for (const auto& [_, scheduleInfo] : mInfoByScheduleIdByClientId[clientId]) {
(*response->add_allscheduledtasks()) = *scheduleInfo.grpcScheduleInfo;
}
}
return Status::OK;
}
bool TestWakeupClientServiceImpl::isWakeupRequired() {
return mWakeupRequired;
}

View File

@@ -101,13 +101,17 @@ bool powerOn() {
#endif
}
const char* getSetPropCommand(int propId) {
int size = snprintf(nullptr, 0, COMMAND_SET_VHAL_PROP, propId, 1);
const char* getSetPropCommand(int propId, int value) {
int size = snprintf(nullptr, 0, COMMAND_SET_VHAL_PROP, propId, value);
char* command = new char[size + 1];
snprintf(command, size + 1, COMMAND_SET_VHAL_PROP, propId, 1);
snprintf(command, size + 1, COMMAND_SET_VHAL_PROP, propId, value);
return command;
}
const char* getSetPropCommand(int propId) {
return getSetPropCommand(propId, /*value=*/1);
}
void powerOffEmu() {
updateEmuStatus();
if (emuPid == 0) {
@@ -136,7 +140,7 @@ void setVehicleInUse(bool vehicleInUse) {
if (vehicleInUse) {
value = 1;
}
const char* command = getSetPropCommand(VEHICLE_IN_USE);
const char* command = getSetPropCommand(VEHICLE_IN_USE, value);
runCommand(command);
delete[] command;
#else

View File

@@ -0,0 +1,339 @@
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TestWakeupClientServiceImpl.h"
#include <grpcpp/channel.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <gtest/gtest.h>
#include <chrono>
namespace android::hardware::automotive::remoteaccess::test {
using ::android::base::ScopedLockAssertion;
using ::grpc::Channel;
using ::grpc::ClientContext;
using ::grpc::Server;
using ::grpc::ServerBuilder;
using ::grpc::Status;
const std::string kTestClientId = "test client id";
const std::string kTestScheduleId = "test schedule id";
const std::vector<uint8_t> kTestData = {0xde, 0xad, 0xbe, 0xef};
constexpr int32_t kTestCount = 1234;
constexpr int64_t kTestStartTimeInEpochSeconds = 2345;
constexpr int64_t kTestPeriodicInSeconds = 123;
const std::string kTestGrpcAddr = "localhost:50051";
class MyTestWakeupClientServiceImpl final : public TestWakeupClientServiceImpl {
public:
void wakeupApplicationProcessor() override {
// Do nothing.
}
};
class TestWakeupClientServiceImplUnitTest : public ::testing::Test {
public:
virtual void SetUp() override {
mServerThread = std::thread([this] {
{
std::unique_lock<std::mutex> lock(mLock);
mService = std::make_unique<MyTestWakeupClientServiceImpl>();
ServerBuilder builder;
builder.AddListeningPort(kTestGrpcAddr, grpc::InsecureServerCredentials());
builder.RegisterService(mService.get());
mServer = builder.BuildAndStart();
mServerStartCv.notify_one();
}
mServer->Wait();
});
{
std::unique_lock<std::mutex> lock(mLock);
mServerStartCv.wait(lock, [this] {
ScopedLockAssertion lockAssertion(mLock);
return mServer != nullptr;
});
}
mChannel = grpc::CreateChannel(kTestGrpcAddr, grpc::InsecureChannelCredentials());
mStub = WakeupClient::NewStub(mChannel);
}
virtual void TearDown() override {
printf("Start server shutdown\n");
mService->stopServer();
mServer->Shutdown();
printf("Server shutdown complete\n");
mServerThread.join();
printf("Server thread exits\n");
mServer.reset();
mService.reset();
printf("Server and service classes reset\n");
}
WakeupClient::Stub* getStub() { return mStub.get(); }
size_t waitForRemoteTasks(size_t count) {
ClientContext context = {};
GetRemoteTasksResponse response;
auto reader = mStub->GetRemoteTasks(&context, GetRemoteTasksRequest{});
size_t got = 0;
while (reader->Read(&response)) {
got++;
mRemoteTaskResponses.push_back(response);
if (got == count) {
break;
}
}
// If there is more messages to be read in the reader, cancel them all so that we can
// finish.
context.TryCancel();
reader->Finish();
return got;
}
std::vector<GetRemoteTasksResponse> getRemoteTaskResponses() { return mRemoteTaskResponses; }
Status scheduleTask(int32_t count, int64_t startTimeInEpochSeconds, int64_t periodicInSeconds) {
return scheduleTask(kTestScheduleId, count, startTimeInEpochSeconds, periodicInSeconds);
}
Status scheduleTask(const std::string& scheduleId, int32_t count,
int64_t startTimeInEpochSeconds, int64_t periodicInSeconds) {
ClientContext context;
ScheduleTaskRequest request;
ScheduleTaskResponse response;
int64_t now = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
request.mutable_scheduleinfo()->set_clientid(kTestClientId);
request.mutable_scheduleinfo()->set_scheduleid(scheduleId);
request.mutable_scheduleinfo()->set_data(kTestData.data(), kTestData.size());
request.mutable_scheduleinfo()->set_count(count);
request.mutable_scheduleinfo()->set_starttimeinepochseconds(startTimeInEpochSeconds);
request.mutable_scheduleinfo()->set_periodicinseconds(periodicInSeconds);
return getStub()->ScheduleTask(&context, request, &response);
}
int64_t getNow() {
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
}
private:
std::condition_variable mServerStartCv;
std::mutex mLock;
std::thread mServerThread;
std::unique_ptr<MyTestWakeupClientServiceImpl> mService;
std::unique_ptr<Server> mServer;
std::shared_ptr<Channel> mChannel;
std::unique_ptr<WakeupClient::Stub> mStub;
std::vector<GetRemoteTasksResponse> mRemoteTaskResponses;
};
TEST_F(TestWakeupClientServiceImplUnitTest, TestScheduleTask) {
ClientContext context = {};
ScheduleTaskRequest request = {};
ScheduleTaskResponse response = {};
request.mutable_scheduleinfo()->set_clientid(kTestClientId);
request.mutable_scheduleinfo()->set_scheduleid(kTestScheduleId);
request.mutable_scheduleinfo()->set_data(kTestData.data(), kTestData.size());
request.mutable_scheduleinfo()->set_count(2);
// Schedule the task to be executed 1s later.
request.mutable_scheduleinfo()->set_starttimeinepochseconds(getNow() + 1);
request.mutable_scheduleinfo()->set_periodicinseconds(1);
Status status = getStub()->ScheduleTask(&context, request, &response);
ASSERT_TRUE(status.ok());
ASSERT_EQ(response.errorcode(), ErrorCode::OK);
size_t gotTaskCount = waitForRemoteTasks(/*count=*/2);
EXPECT_EQ(gotTaskCount, 2);
auto responses = getRemoteTaskResponses();
for (const auto& response : responses) {
EXPECT_EQ(response.clientid(), kTestClientId);
EXPECT_EQ(response.data(), std::string(kTestData.begin(), kTestData.end()));
}
}
TEST_F(TestWakeupClientServiceImplUnitTest, TestScheduleTask_conflictScheduleId) {
Status status = scheduleTask(/*count=*/2, /*startTimeInEpochSeconds=*/getNow() + 1,
/*periodicInSeconds=*/1);
ASSERT_TRUE(status.ok());
// Schedule the same task again.
ClientContext context = {};
ScheduleTaskRequest request = {};
ScheduleTaskResponse response = {};
request.mutable_scheduleinfo()->set_clientid(kTestClientId);
request.mutable_scheduleinfo()->set_scheduleid(kTestScheduleId);
request.mutable_scheduleinfo()->set_data(kTestData.data(), kTestData.size());
request.mutable_scheduleinfo()->set_count(2);
request.mutable_scheduleinfo()->set_starttimeinepochseconds(getNow() + 1);
request.mutable_scheduleinfo()->set_periodicinseconds(1);
status = getStub()->ScheduleTask(&context, request, &response);
ASSERT_TRUE(status.ok());
ASSERT_EQ(response.errorcode(), ErrorCode::INVALID_ARG);
}
TEST_F(TestWakeupClientServiceImplUnitTest, TestUnscheduleTask) {
Status status = scheduleTask(/*count=*/2, /*startTimeInEpochSeconds=*/getNow() + 1,
/*periodicInSeconds=*/1);
ASSERT_TRUE(status.ok());
ClientContext context;
UnscheduleTaskRequest request;
UnscheduleTaskResponse response;
request.set_clientid(kTestClientId);
request.set_scheduleid(kTestScheduleId);
status = getStub()->UnscheduleTask(&context, request, &response);
ASSERT_TRUE(status.ok());
sleep(2);
// There should be no remote tasks received after 2s because the task was unscheduled.
EXPECT_EQ(getRemoteTaskResponses().size(), 0);
}
TEST_F(TestWakeupClientServiceImplUnitTest, TestIsTaskScheduled) {
int64_t startTimeInEpochSeconds = getNow() + 1;
int64_t periodicInSeconds = 1234;
Status status = scheduleTask(/*count=*/2, startTimeInEpochSeconds, periodicInSeconds);
ASSERT_TRUE(status.ok());
ClientContext context;
IsTaskScheduledRequest request;
IsTaskScheduledResponse response;
request.set_clientid(kTestClientId);
request.set_scheduleid(kTestScheduleId);
status = getStub()->IsTaskScheduled(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_TRUE(response.istaskscheduled());
ClientContext context2;
IsTaskScheduledRequest request2;
IsTaskScheduledResponse response2;
request.set_clientid(kTestClientId);
request.set_scheduleid("invalid id");
status = getStub()->IsTaskScheduled(&context2, request2, &response2);
ASSERT_TRUE(status.ok());
EXPECT_FALSE(response2.istaskscheduled());
}
TEST_F(TestWakeupClientServiceImplUnitTest, TestUnscheduleAllTasks) {
std::string scheduleId1 = "scheduleId1";
std::string scheduleId2 = "scheduleId2";
int64_t time1 = getNow();
int64_t time2 = getNow() + 1;
int64_t periodicInSeconds1 = 1;
int64_t periodicInSeconds2 = 1;
int32_t count1 = 2;
int64_t count2 = 5;
Status status = scheduleTask(scheduleId1, count1, time1, periodicInSeconds1);
ASSERT_TRUE(status.ok());
status = scheduleTask(scheduleId2, count2, time2, periodicInSeconds2);
ASSERT_TRUE(status.ok());
ClientContext context;
UnscheduleAllTasksRequest request;
UnscheduleAllTasksResponse response;
request.set_clientid(kTestClientId);
status = getStub()->UnscheduleAllTasks(&context, request, &response);
ASSERT_TRUE(status.ok());
sleep(2);
// There should be no remote tasks received after 2s because the tasks were unscheduled.
EXPECT_EQ(getRemoteTaskResponses().size(), 0);
}
TEST_F(TestWakeupClientServiceImplUnitTest, TestGetAllScheduledTasks) {
std::string scheduleId1 = "scheduleId1";
std::string scheduleId2 = "scheduleId2";
int64_t time1 = getNow();
int64_t time2 = getNow() + 1;
int64_t periodicInSeconds1 = 1;
int64_t periodicInSeconds2 = 1;
int32_t count1 = 2;
int64_t count2 = 5;
Status status = scheduleTask(scheduleId1, count1, time1, periodicInSeconds1);
ASSERT_TRUE(status.ok());
status = scheduleTask(scheduleId2, count2, time2, periodicInSeconds2);
ASSERT_TRUE(status.ok());
ClientContext context;
GetAllScheduledTasksRequest request;
GetAllScheduledTasksResponse response;
request.set_clientid("invalid client Id");
status = getStub()->GetAllScheduledTasks(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ(response.allscheduledtasks_size(), 0);
ClientContext context2;
GetAllScheduledTasksRequest request2;
GetAllScheduledTasksResponse response2;
request2.set_clientid(kTestClientId);
status = getStub()->GetAllScheduledTasks(&context2, request2, &response2);
ASSERT_TRUE(status.ok());
ASSERT_EQ(response2.allscheduledtasks_size(), 2);
for (int i = 0; i < 2; i++) {
EXPECT_EQ(response2.allscheduledtasks(i).clientid(), kTestClientId);
if (response2.allscheduledtasks(i).scheduleid() == scheduleId1) {
EXPECT_EQ(response2.allscheduledtasks(i).data(),
std::string(kTestData.begin(), kTestData.end()));
EXPECT_EQ(response2.allscheduledtasks(i).count(), count1);
EXPECT_EQ(response2.allscheduledtasks(i).starttimeinepochseconds(), time1);
EXPECT_EQ(response2.allscheduledtasks(i).periodicinseconds(), periodicInSeconds1);
} else {
EXPECT_EQ(response2.allscheduledtasks(i).scheduleid(), scheduleId2);
EXPECT_EQ(response2.allscheduledtasks(i).data(),
std::string(kTestData.begin(), kTestData.end()));
EXPECT_EQ(response2.allscheduledtasks(i).count(), count2);
EXPECT_EQ(response2.allscheduledtasks(i).starttimeinepochseconds(), time2);
EXPECT_EQ(response2.allscheduledtasks(i).periodicinseconds(), periodicInSeconds2);
}
}
}
} // namespace android::hardware::automotive::remoteaccess::test
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}