From 614e4b5f16b41e47e557c1c5d239c4ec1e1cb8c7 Mon Sep 17 00:00:00 2001 From: Mikhail Naganov Date: Thu, 30 Jun 2022 21:05:11 +0000 Subject: [PATCH] audio: Add StreamWorker to aidl/common This utility class has been copied from HIDL VTS. It will be used both for the default implementation and AIDL VTS, and might need modifications. Bug: 205884982 Test: atest libaudioaidlcommon_test Merged-In: I43b35b0c23ae45305dca66e15b60820cad19635e Change-Id: I43b35b0c23ae45305dca66e15b60820cad19635e (cherry picked from commit c17f0484bcc71b4a9103d8809739167cf1fef89a) --- audio/aidl/common/Android.bp | 61 +++++ audio/aidl/common/TEST_MAPPING | 7 + audio/aidl/common/include/StreamWorker.h | 156 +++++++++++++ .../aidl/common/tests/streamworker_tests.cpp | 210 ++++++++++++++++++ 4 files changed, 434 insertions(+) create mode 100644 audio/aidl/common/Android.bp create mode 100644 audio/aidl/common/TEST_MAPPING create mode 100644 audio/aidl/common/include/StreamWorker.h create mode 100644 audio/aidl/common/tests/streamworker_tests.cpp diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp new file mode 100644 index 0000000000..6a1c4a4677 --- /dev/null +++ b/audio/aidl/common/Android.bp @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package { + // See: http://go/android-license-faq + // A large-scale-change added 'default_applicable_licenses' to import + // all of the 'license_kinds' from "hardware_interfaces_license" + // to get the below license kinds: + // SPDX-license-identifier-Apache-2.0 + default_applicable_licenses: ["hardware_interfaces_license"], +} + +cc_library_headers { + name: "libaudioaidlcommon", + host_supported: true, + vendor_available: true, + export_include_dirs: ["include"], + header_libs: [ + "libbase_headers", + ], + export_header_lib_headers: [ + "libbase_headers", + ], +} + +cc_test { + name: "libaudioaidlcommon_test", + host_supported: true, + vendor_available: true, + header_libs: [ + "libaudioaidlcommon", + ], + shared_libs: [ + "liblog", + ], + cflags: [ + "-Wall", + "-Wextra", + "-Werror", + "-Wthread-safety", + ], + srcs: [ + "tests/streamworker_tests.cpp", + ], + test_suites: [ + "general-tests", + ], +} diff --git a/audio/aidl/common/TEST_MAPPING b/audio/aidl/common/TEST_MAPPING new file mode 100644 index 0000000000..9dcf44ed7c --- /dev/null +++ b/audio/aidl/common/TEST_MAPPING @@ -0,0 +1,7 @@ +{ + "presubmit": [ + { + "name": "libaudioaidlcommon_test" + } + ] +} diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h new file mode 100644 index 0000000000..8a273dc7f2 --- /dev/null +++ b/audio/aidl/common/include/StreamWorker.h @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +#include + +template +class StreamWorker { + enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR }; + + public: + StreamWorker() = default; + ~StreamWorker() { stop(); } + bool start() { + mWorker = std::thread(&StreamWorker::workerThread, this); + std::unique_lock lock(mWorkerLock); + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + mWorkerCv.wait(lock, [&]() { + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + return mWorkerState != WorkerState::STOPPED; + }); + return mWorkerState == WorkerState::RUNNING; + } + void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); } + void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } + bool hasError() { + std::lock_guard lock(mWorkerLock); + return mWorkerState == WorkerState::ERROR; + } + void stop() { + { + std::lock_guard lock(mWorkerLock); + if (mWorkerState == WorkerState::STOPPED) return; + mWorkerState = WorkerState::STOPPED; + } + if (mWorker.joinable()) { + mWorker.join(); + } + } + bool waitForAtLeastOneCycle() { + WorkerState newState; + switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState); + if (newState != WorkerState::PAUSED) return false; + switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState); + return newState == WorkerState::RUNNING; + } + + // Methods that need to be provided by subclasses: + // + // Called once at the beginning of the thread loop. Must return + // 'true' to enter the thread loop, otherwise the thread loop + // exits and the worker switches into the 'error' state. + // bool workerInit(); + // + // Called for each thread loop unless the thread is in 'paused' state. + // Must return 'true' to continue running, otherwise the thread loop + // exits and the worker switches into the 'error' state. + // bool workerCycle(); + + private: + void switchWorkerStateSync(WorkerState oldState, WorkerState newState, + WorkerState* finalState = nullptr) { + std::unique_lock lock(mWorkerLock); + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + if (mWorkerState != oldState) { + if (finalState) *finalState = mWorkerState; + return; + } + mWorkerState = newState; + mWorkerCv.wait(lock, [&]() { + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + return mWorkerState != newState; + }); + if (finalState) *finalState = mWorkerState; + } + void workerThread() { + bool success = static_cast(this)->workerInit(); + { + std::lock_guard lock(mWorkerLock); + mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR; + } + mWorkerCv.notify_one(); + if (!success) return; + + for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { + bool needToNotify = false; + if (state != WorkerState::PAUSED ? static_cast(this)->workerCycle() + : (sched_yield(), true)) { + // + // Pause and resume are synchronous. One worker cycle must complete + // before the worker indicates a state change. This is how 'mWorkerState' and + // 'state' interact: + // + // mWorkerState == RUNNING + // client sets mWorkerState := PAUSE_REQUESTED + // last workerCycle gets executed, state := mWorkerState := PAUSED by us + // (or the workers enters the 'error' state if workerCycle fails) + // client gets notified about state change in any case + // thread is doing a busy wait while 'state == PAUSED' + // client sets mWorkerState := RESUME_REQUESTED + // state := mWorkerState (RESUME_REQUESTED) + // mWorkerState := RUNNING, but we don't notify the client yet + // first workerCycle gets executed, the code below triggers a client notification + // (or if workerCycle fails, worker enters 'error' state and also notifies) + // state := mWorkerState (RUNNING) + if (state == WorkerState::RESUME_REQUESTED) { + needToNotify = true; + } + std::lock_guard lock(mWorkerLock); + state = mWorkerState; + if (mWorkerState == WorkerState::PAUSE_REQUESTED) { + state = mWorkerState = WorkerState::PAUSED; + needToNotify = true; + } else if (mWorkerState == WorkerState::RESUME_REQUESTED) { + mWorkerState = WorkerState::RUNNING; + } + } else { + std::lock_guard lock(mWorkerLock); + if (state == WorkerState::RESUME_REQUESTED || + mWorkerState == WorkerState::PAUSE_REQUESTED) { + needToNotify = true; + } + mWorkerState = WorkerState::ERROR; + state = WorkerState::STOPPED; + } + if (needToNotify) { + mWorkerCv.notify_one(); + } + } + } + + std::thread mWorker; + std::mutex mWorkerLock; + std::condition_variable mWorkerCv; + WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED; +}; diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp new file mode 100644 index 0000000000..bb354e58d4 --- /dev/null +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include +#define LOG_TAG "StreamWorker_Test" +#include + +struct TestStream { + std::atomic error = false; +}; + +class TestWorker : public StreamWorker { + public: + // Use nullptr to test error reporting from the worker thread. + explicit TestWorker(TestStream* stream) : mStream(stream) {} + + size_t getWorkerCycles() const { return mWorkerCycles; } + bool hasWorkerCycleCalled() const { return mWorkerCycles != 0; } + bool hasNoWorkerCycleCalled(useconds_t usec) { + const size_t cyclesBefore = mWorkerCycles; + usleep(usec); + return mWorkerCycles == cyclesBefore; + } + + bool workerInit() { return mStream; } + bool workerCycle() { + do { + mWorkerCycles++; + } while (mWorkerCycles == 0); + return !mStream->error; + } + + private: + TestStream* const mStream; + std::atomic mWorkerCycles = 0; +}; + +// The parameter specifies whether an extra call to 'stop' is made at the end. +class StreamWorkerInvalidTest : public testing::TestWithParam { + public: + StreamWorkerInvalidTest() : StreamWorkerInvalidTest(nullptr) {} + void TearDown() override { + if (GetParam()) { + worker.stop(); + } + } + + protected: + StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam(), worker(stream) {} + TestWorker worker; +}; + +TEST_P(StreamWorkerInvalidTest, Uninitialized) { + EXPECT_FALSE(worker.hasWorkerCycleCalled()); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerInvalidTest, UninitializedPauseIgnored) { + EXPECT_FALSE(worker.hasError()); + worker.pause(); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerInvalidTest, UninitializedResumeIgnored) { + EXPECT_FALSE(worker.hasError()); + worker.resume(); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerInvalidTest, Start) { + EXPECT_FALSE(worker.start()); + EXPECT_FALSE(worker.hasWorkerCycleCalled()); + EXPECT_TRUE(worker.hasError()); +} + +TEST_P(StreamWorkerInvalidTest, PauseIgnored) { + EXPECT_FALSE(worker.start()); + EXPECT_TRUE(worker.hasError()); + worker.pause(); + EXPECT_TRUE(worker.hasError()); +} + +TEST_P(StreamWorkerInvalidTest, ResumeIgnored) { + EXPECT_FALSE(worker.start()); + EXPECT_TRUE(worker.hasError()); + worker.resume(); + EXPECT_TRUE(worker.hasError()); +} + +INSTANTIATE_TEST_SUITE_P(StreamWorkerInvalid, StreamWorkerInvalidTest, testing::Bool()); + +class StreamWorkerTest : public StreamWorkerInvalidTest { + public: + StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {} + + protected: + TestStream stream; +}; + +static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000; + +TEST_P(StreamWorkerTest, Uninitialized) { + EXPECT_FALSE(worker.hasWorkerCycleCalled()); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, Start) { + ASSERT_TRUE(worker.start()); + worker.waitForAtLeastOneCycle(); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, WorkerError) { + ASSERT_TRUE(worker.start()); + stream.error = true; + worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.hasError()); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); +} + +TEST_P(StreamWorkerTest, PauseResume) { + ASSERT_TRUE(worker.start()); + worker.waitForAtLeastOneCycle(); + EXPECT_FALSE(worker.hasError()); + worker.pause(); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); + EXPECT_FALSE(worker.hasError()); + const size_t workerCyclesBefore = worker.getWorkerCycles(); + worker.resume(); + // 'resume' is synchronous and returns after the worker has looped at least once. + EXPECT_GT(worker.getWorkerCycles(), workerCyclesBefore); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, StopPaused) { + ASSERT_TRUE(worker.start()); + worker.waitForAtLeastOneCycle(); + EXPECT_FALSE(worker.hasError()); + worker.pause(); + worker.stop(); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) { + ASSERT_TRUE(worker.start()); + stream.error = true; + worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.hasError()); + worker.pause(); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); + EXPECT_TRUE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) { + ASSERT_TRUE(worker.start()); + stream.error = true; + worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.hasError()); + worker.resume(); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); + EXPECT_TRUE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, WorkerErrorOnResume) { + ASSERT_TRUE(worker.start()); + worker.waitForAtLeastOneCycle(); + EXPECT_FALSE(worker.hasError()); + worker.pause(); + EXPECT_FALSE(worker.hasError()); + stream.error = true; + EXPECT_FALSE(worker.hasError()); + worker.resume(); + worker.waitForAtLeastOneCycle(); + EXPECT_TRUE(worker.hasError()); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); +} + +TEST_P(StreamWorkerTest, WaitForAtLeastOneCycle) { + ASSERT_TRUE(worker.start()); + const size_t workerCyclesBefore = worker.getWorkerCycles(); + EXPECT_TRUE(worker.waitForAtLeastOneCycle()); + EXPECT_GT(worker.getWorkerCycles(), workerCyclesBefore); +} + +TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) { + ASSERT_TRUE(worker.start()); + stream.error = true; + EXPECT_FALSE(worker.waitForAtLeastOneCycle()); +} + +INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());