diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp index 37da9d66ec..f2d8fc2a53 100644 --- a/audio/aidl/common/Android.bp +++ b/audio/aidl/common/Android.bp @@ -23,7 +23,7 @@ package { default_applicable_licenses: ["hardware_interfaces_license"], } -cc_library_headers { +cc_library { name: "libaudioaidlcommon", host_supported: true, vendor_available: true, @@ -36,13 +36,16 @@ cc_library_headers { "libbase_headers", "libsystem_headers", ], + srcs: [ + "StreamWorker.cpp", + ], } cc_test { name: "libaudioaidlcommon_test", host_supported: true, vendor_available: true, - header_libs: [ + static_libs: [ "libaudioaidlcommon", ], shared_libs: [ diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp new file mode 100644 index 0000000000..9bca7609fd --- /dev/null +++ b/audio/aidl/common/StreamWorker.cpp @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "include/StreamWorker.h" + +namespace android::hardware::audio::common::internal { + +bool ThreadController::start(const std::string& name, int priority) { + mThreadName = name; + mThreadPriority = priority; + mWorker = std::thread(&ThreadController::workerThread, this); + std::unique_lock lock(mWorkerLock); + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + mWorkerCv.wait(lock, [&]() { + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + return mWorkerState == WorkerState::RUNNING || !mError.empty(); + }); + mWorkerStateChangeRequest = false; + return mWorkerState == WorkerState::RUNNING; +} + +void ThreadController::stop() { + { + std::lock_guard lock(mWorkerLock); + if (mWorkerState != WorkerState::STOPPED) { + mWorkerState = WorkerState::STOPPED; + mWorkerStateChangeRequest = true; + } + } + if (mWorker.joinable()) { + mWorker.join(); + } +} + +bool ThreadController::waitForAtLeastOneCycle() { + WorkerState newState; + switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState); + if (newState != WorkerState::PAUSED) return false; + switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState); + return newState == WorkerState::RUNNING; +} + +void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState, + WorkerState* finalState) { + std::unique_lock lock(mWorkerLock); + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + if (mWorkerState != oldState) { + if (finalState) *finalState = mWorkerState; + return; + } + mWorkerState = newState; + mWorkerStateChangeRequest = true; + mWorkerCv.wait(lock, [&]() { + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + return mWorkerState != newState; + }); + if (finalState) *finalState = mWorkerState; +} + +void ThreadController::workerThread() { + using Status = StreamLogic::Status; + + std::string error = mLogic->init(); + if (error.empty() && !mThreadName.empty()) { + std::string compliantName(mThreadName.substr(0, 15)); + if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) { + error.append("Failed to set thread name: ").append(strerror(errCode)); + } + } + if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) { + if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) { + int errCode = errno; + error.append("Failed to set thread priority: ").append(strerror(errCode)); + } + } + { + std::lock_guard lock(mWorkerLock); + mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED; + mError = error; + } + mWorkerCv.notify_one(); + if (!error.empty()) return; + + for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { + bool needToNotify = false; + if (Status status = state != WorkerState::PAUSED ? mLogic->cycle() + : (sched_yield(), Status::CONTINUE); + status == Status::CONTINUE) { + { + // See https://developer.android.com/training/articles/smp#nonracing + android::base::ScopedLockAssertion lock_assertion(mWorkerLock); + if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue; + } + // + // Pause and resume are synchronous. One worker cycle must complete + // before the worker indicates a state change. This is how 'mWorkerState' and + // 'state' interact: + // + // mWorkerState == RUNNING + // client sets mWorkerState := PAUSE_REQUESTED + // last workerCycle gets executed, state := mWorkerState := PAUSED by us + // (or the workers enters the 'error' state if workerCycle fails) + // client gets notified about state change in any case + // thread is doing a busy wait while 'state == PAUSED' + // client sets mWorkerState := RESUME_REQUESTED + // state := mWorkerState (RESUME_REQUESTED) + // mWorkerState := RUNNING, but we don't notify the client yet + // first workerCycle gets executed, the code below triggers a client notification + // (or if workerCycle fails, worker enters 'error' state and also notifies) + // state := mWorkerState (RUNNING) + std::lock_guard lock(mWorkerLock); + if (state == WorkerState::RESUME_REQUESTED) { + needToNotify = true; + } + state = mWorkerState; + if (mWorkerState == WorkerState::PAUSE_REQUESTED) { + state = mWorkerState = WorkerState::PAUSED; + needToNotify = true; + } else if (mWorkerState == WorkerState::RESUME_REQUESTED) { + mWorkerState = WorkerState::RUNNING; + } + } else { + std::lock_guard lock(mWorkerLock); + if (state == WorkerState::RESUME_REQUESTED || + mWorkerState == WorkerState::PAUSE_REQUESTED) { + needToNotify = true; + } + state = mWorkerState = WorkerState::STOPPED; + if (status == Status::ABORT) { + mError = "Received ABORT from the logic cycle"; + } + } + if (needToNotify) { + { + std::lock_guard lock(mWorkerLock); + mWorkerStateChangeRequest = false; + } + mWorkerCv.notify_one(); + } + } +} + +} // namespace android::hardware::audio::common::internal diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 03685fcf29..6260eca49a 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -16,10 +16,6 @@ #pragma once -#include -#include -#include - #include #include #include @@ -31,32 +27,18 @@ namespace android::hardware::audio::common { -template -class StreamWorker { +class StreamLogic; + +namespace internal { + +class ThreadController { enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }; public: - enum class WorkerStatus { ABORT, CONTINUE, EXIT }; + explicit ThreadController(StreamLogic* logic) : mLogic(logic) {} + ~ThreadController() { stop(); } - StreamWorker() = default; - ~StreamWorker() { stop(); } - // Note that 'priority' here is what is known as the 'nice number' in *nix systems. - // The nice number is used with the default scheduler. For threads that - // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it, - // it is recommended to implement an appropriate configuration sequence within `workerInit`. - bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) { - mThreadName = name; - mThreadPriority = priority; - mWorker = std::thread(&StreamWorker::workerThread, this); - std::unique_lock lock(mWorkerLock); - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - mWorkerCv.wait(lock, [&]() { - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - return mWorkerState == WorkerState::RUNNING || !mError.empty(); - }); - mWorkerStateChangeRequest = false; - return mWorkerState == WorkerState::RUNNING; - } + bool start(const std::string& name, int priority); void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); } void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } bool hasError() { @@ -67,150 +49,21 @@ class StreamWorker { std::lock_guard lock(mWorkerLock); return mError; } - void stop() { - { - std::lock_guard lock(mWorkerLock); - if (mWorkerState != WorkerState::STOPPED) { - mWorkerState = WorkerState::STOPPED; - mWorkerStateChangeRequest = true; - } - } - if (mWorker.joinable()) { - mWorker.join(); - } - } - bool waitForAtLeastOneCycle() { - WorkerState newState; - switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState); - if (newState != WorkerState::PAUSED) return false; - switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState); - return newState == WorkerState::RUNNING; - } + void stop(); + bool waitForAtLeastOneCycle(); + // Only used by unit tests. - void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS { + void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS { lock ? mWorkerLock.lock() : mWorkerLock.unlock(); } - std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); } - - // Methods that need to be provided by subclasses: - // - // /* Called once at the beginning of the thread loop. Must return - // * an empty string to enter the thread loop, otherwise the thread loop - // * exits and the worker switches into the 'error' state, setting - // * the error to the returned value. - // */ - // std::string workerInit(); - // - // /* Called for each thread loop unless the thread is in 'paused' state. - // * Must return 'CONTINUE' to continue running, otherwise the thread loop - // * exits. If the result from worker cycle is 'ABORT' then the worker switches - // * into the 'error' state with a generic error message. It is recommended that - // * the subclass reports any problems via logging facilities. Returning the 'EXIT' - // * status is equivalent to calling 'stop()' method. This is just a way of - // * of stopping the worker by its own initiative. - // */ - // WorkerStatus workerCycle(); + std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); } private: void switchWorkerStateSync(WorkerState oldState, WorkerState newState, - WorkerState* finalState = nullptr) { - std::unique_lock lock(mWorkerLock); - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - if (mWorkerState != oldState) { - if (finalState) *finalState = mWorkerState; - return; - } - mWorkerState = newState; - mWorkerStateChangeRequest = true; - mWorkerCv.wait(lock, [&]() { - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - return mWorkerState != newState; - }); - if (finalState) *finalState = mWorkerState; - } - void workerThread() { - std::string error = static_cast(this)->workerInit(); - if (error.empty() && !mThreadName.empty()) { - std::string compliantName(mThreadName.substr(0, 15)); - if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); - errCode != 0) { - error.append("Failed to set thread name: ").append(strerror(errCode)); - } - } - if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) { - if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) { - int errCode = errno; - error.append("Failed to set thread priority: ").append(strerror(errCode)); - } - } - { - std::lock_guard lock(mWorkerLock); - mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED; - mError = error; - } - mWorkerCv.notify_one(); - if (!error.empty()) return; - - for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) { - bool needToNotify = false; - if (WorkerStatus status = state != WorkerState::PAUSED - ? static_cast(this)->workerCycle() - : (sched_yield(), WorkerStatus::CONTINUE); - status == WorkerStatus::CONTINUE) { - { - // See https://developer.android.com/training/articles/smp#nonracing - android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue; - } - // - // Pause and resume are synchronous. One worker cycle must complete - // before the worker indicates a state change. This is how 'mWorkerState' and - // 'state' interact: - // - // mWorkerState == RUNNING - // client sets mWorkerState := PAUSE_REQUESTED - // last workerCycle gets executed, state := mWorkerState := PAUSED by us - // (or the workers enters the 'error' state if workerCycle fails) - // client gets notified about state change in any case - // thread is doing a busy wait while 'state == PAUSED' - // client sets mWorkerState := RESUME_REQUESTED - // state := mWorkerState (RESUME_REQUESTED) - // mWorkerState := RUNNING, but we don't notify the client yet - // first workerCycle gets executed, the code below triggers a client notification - // (or if workerCycle fails, worker enters 'error' state and also notifies) - // state := mWorkerState (RUNNING) - std::lock_guard lock(mWorkerLock); - if (state == WorkerState::RESUME_REQUESTED) { - needToNotify = true; - } - state = mWorkerState; - if (mWorkerState == WorkerState::PAUSE_REQUESTED) { - state = mWorkerState = WorkerState::PAUSED; - needToNotify = true; - } else if (mWorkerState == WorkerState::RESUME_REQUESTED) { - mWorkerState = WorkerState::RUNNING; - } - } else { - std::lock_guard lock(mWorkerLock); - if (state == WorkerState::RESUME_REQUESTED || - mWorkerState == WorkerState::PAUSE_REQUESTED) { - needToNotify = true; - } - state = mWorkerState = WorkerState::STOPPED; - if (status == WorkerStatus::ABORT) { - mError = "workerCycle aborted"; - } - } - if (needToNotify) { - { - std::lock_guard lock(mWorkerLock); - mWorkerStateChangeRequest = false; - } - mWorkerCv.notify_one(); - } - } - } + WorkerState* finalState = nullptr); + void workerThread(); + StreamLogic* const mLogic; std::string mThreadName; int mThreadPriority = ANDROID_PRIORITY_DEFAULT; std::thread mWorker; @@ -230,4 +83,71 @@ class StreamWorker { std::atomic mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false; }; +} // namespace internal + +class StreamLogic { + public: + friend class internal::ThreadController; + + virtual ~StreamLogic() = default; + + protected: + enum class Status { ABORT, CONTINUE, EXIT }; + + /* Called once at the beginning of the thread loop. Must return + * an empty string to enter the thread loop, otherwise the thread loop + * exits and the worker switches into the 'error' state, setting + * the error to the returned value. + */ + virtual std::string init() = 0; + + /* Called for each thread loop unless the thread is in 'paused' state. + * Must return 'CONTINUE' to continue running, otherwise the thread loop + * exits. If the result from worker cycle is 'ABORT' then the worker switches + * into the 'error' state with a generic error message. It is recommended that + * the subclass reports any problems via logging facilities. Returning the 'EXIT' + * status is equivalent to calling 'stop()' method. This is just a way of + * of stopping the worker by its own initiative. + */ + virtual Status cycle() = 0; +}; + +template +class StreamWorker : public LogicImpl { + public: + template + explicit StreamWorker(Args&&... args) : LogicImpl(std::forward(args)...), mThread(this) {} + + // Methods of LogicImpl are available via inheritance. + // Forwarded methods of ThreadController follow. + + // Note that 'priority' here is what is known as the 'nice number' in *nix systems. + // The nice number is used with the default scheduler. For threads that + // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it, + // it is recommended to implement an appropriate configuration sequence within + // 'LogicImpl' or 'StreamLogic::init'. + bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) { + return mThread.start(name, priority); + } + void pause() { mThread.pause(); } + void resume() { mThread.resume(); } + bool hasError() { return mThread.hasError(); } + std::string getError() { return mThread.getError(); } + void stop() { return mThread.stop(); } + bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); } + + // Only used by unit tests. + void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); } + std::thread::native_handle_type testGetThreadNativeHandle() { + return mThread.getThreadNativeHandle(); + } + + private: + // The ThreadController gets destroyed before LogicImpl. + // After the controller has been destroyed, it is guaranteed that + // the thread was joined, thus the 'cycle' method of LogicImpl + // will not be called anymore, and it is safe to destroy LogicImpl. + internal::ThreadController mThread; +}; + } // namespace android::hardware::audio::common diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index df81c69716..e3e484d7e3 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -26,18 +27,19 @@ #define LOG_TAG "StreamWorker_Test" #include +using android::hardware::audio::common::StreamLogic; using android::hardware::audio::common::StreamWorker; -class TestWorker : public StreamWorker { +class TestWorkerLogic : public StreamLogic { public: struct Stream { - void setErrorStatus() { status = WorkerStatus::ABORT; } - void setStopStatus() { status = WorkerStatus::EXIT; } - std::atomic status = WorkerStatus::CONTINUE; + void setErrorStatus() { status = Status::ABORT; } + void setStopStatus() { status = Status::EXIT; } + std::atomic status = Status::CONTINUE; }; // Use nullptr to test error reporting from the worker thread. - explicit TestWorker(Stream* stream) : mStream(stream) {} + explicit TestWorkerLogic(Stream* stream) : mStream(stream) {} size_t getWorkerCycles() const { return mWorkerCycles; } int getPriority() const { return mPriority; } @@ -48,8 +50,10 @@ class TestWorker : public StreamWorker { return mWorkerCycles == cyclesBefore; } - std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; } - WorkerStatus workerCycle() { + protected: + // StreamLogic implementation + std::string init() override { return mStream != nullptr ? "" : "Expected error"; } + Status cycle() override { mPriority = getpriority(PRIO_PROCESS, 0); do { mWorkerCycles++; @@ -62,6 +66,7 @@ class TestWorker : public StreamWorker { std::atomic mWorkerCycles = 0; std::atomic mPriority = ANDROID_PRIORITY_DEFAULT; }; +using TestWorker = StreamWorker; // The parameter specifies whether an extra call to 'stop' is made at the end. class StreamWorkerInvalidTest : public testing::TestWithParam { diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp index 027d92873f..07b10976f7 100644 --- a/audio/aidl/default/Android.bp +++ b/audio/aidl/default/Android.bp @@ -11,6 +11,7 @@ cc_library_static { name: "libaudioserviceexampleimpl", vendor: true, shared_libs: [ + "libaudioaidlcommon", "libbase", "libbinder_ndk", "libstagefright_foundation", diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp index 75ff37f088..1d0ec7c3f6 100644 --- a/audio/aidl/vts/Android.bp +++ b/audio/aidl/vts/Android.bp @@ -26,6 +26,7 @@ cc_test { "android.hardware.common-V2-ndk", "android.hardware.common.fmq-V1-ndk", "android.media.audio.common.types-V1-ndk", + "libaudioaidlcommon", ], test_suites: [ "general-tests",