diff --git a/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h b/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h index 94404a1c7e..21592b3610 100644 --- a/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h +++ b/audio/aidl/default/include/core-impl/StreamRemoteSubmix.h @@ -46,7 +46,7 @@ class StreamRemoteSubmix : public StreamCommonImpl { ndk::ScopedAStatus prepareToClose() override; private: - size_t getPipeSizeInFrames(); + long getDelayInUsForFrameCount(size_t frameCount); size_t getStreamPipeSizeInFrames(); ::android::status_t outWrite(void* buffer, size_t frameCount, size_t* actualFrameCount); ::android::status_t inRead(void* buffer, size_t frameCount, size_t* actualFrameCount); diff --git a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp index 9c9c08b048..38281b9d5f 100644 --- a/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp +++ b/audio/aidl/default/r_submix/StreamRemoteSubmix.cpp @@ -17,8 +17,6 @@ #define LOG_TAG "AHAL_StreamRemoteSubmix" #include -#include - #include "core-impl/StreamRemoteSubmix.h" using aidl::android::hardware::audio::common::SinkMetadata; @@ -158,27 +156,8 @@ void StreamRemoteSubmix::shutdown() { ::android::status_t StreamRemoteSubmix::transfer(void* buffer, size_t frameCount, size_t* actualFrameCount, int32_t* latencyMs) { - *latencyMs = (getStreamPipeSizeInFrames() * MILLIS_PER_SECOND) / mStreamConfig.sampleRate; + *latencyMs = getDelayInUsForFrameCount(getStreamPipeSizeInFrames()) / 1000; LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms"; - - sp sink = mCurrentRoute->getSink(); - if (sink != nullptr) { - if (sink->isShutdown()) { - sink.clear(); - LOG(VERBOSE) << __func__ << ": pipe shutdown, ignoring the transfer."; - // the pipe has already been shutdown, this buffer will be lost but we must simulate - // timing so we don't drain the output faster than realtime - const size_t delayUs = static_cast( - std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate)); - usleep(delayUs); - - *actualFrameCount = frameCount; - return ::android::OK; - } - } else { - LOG(ERROR) << __func__ << ": transfer without a pipe!"; - return ::android::UNEXPECTED_NULL; - } mCurrentRoute->exitStandby(mIsInput); return (mIsInput ? inRead(buffer, frameCount, actualFrameCount) : outWrite(buffer, frameCount, actualFrameCount)); @@ -202,6 +181,10 @@ void StreamRemoteSubmix::shutdown() { return ::android::OK; } +long StreamRemoteSubmix::getDelayInUsForFrameCount(size_t frameCount) { + return frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate; +} + // Calculate the maximum size of the pipe buffer in frames for the specified stream. size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { auto pipeConfig = mCurrentRoute->mPipeConfig; @@ -215,11 +198,11 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { if (sink != nullptr) { if (sink->isShutdown()) { sink.clear(); - LOG(VERBOSE) << __func__ << ": pipe shutdown, ignoring the write."; + const auto delayUs = getDelayInUsForFrameCount(frameCount); + LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write, sleeping for " + << delayUs << " us"; // the pipe has already been shutdown, this buffer will be lost but we must // simulate timing so we don't drain the output faster than realtime - const size_t delayUs = static_cast( - std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate)); usleep(delayUs); *actualFrameCount = frameCount; return ::android::OK; @@ -229,17 +212,18 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { return ::android::UNKNOWN_ERROR; } - const size_t availableToWrite = sink->availableToWrite(); + const bool shouldBlockWrite = mCurrentRoute->shouldBlockWrite(); + size_t availableToWrite = sink->availableToWrite(); // NOTE: sink has been checked above and sink and source life cycles are synchronized sp source = mCurrentRoute->getSource(); // If the write to the sink should be blocked, flush enough frames from the pipe to make space // to write the most recent data. - if (!mCurrentRoute->shouldBlockWrite() && availableToWrite < frameCount) { + if (!shouldBlockWrite && availableToWrite < frameCount) { static uint8_t flushBuffer[64]; const size_t flushBufferSizeFrames = sizeof(flushBuffer) / mStreamConfig.frameSize; size_t framesToFlushFromSource = frameCount - availableToWrite; - LOG(VERBOSE) << __func__ << ": flushing " << framesToFlushFromSource - << " frames from the pipe to avoid blocking"; + LOG(DEBUG) << __func__ << ": flushing " << framesToFlushFromSource + << " frames from the pipe to avoid blocking"; while (framesToFlushFromSource) { const size_t flushSize = std::min(framesToFlushFromSource, flushBufferSizeFrames); framesToFlushFromSource -= flushSize; @@ -247,7 +231,12 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { source->read(flushBuffer, flushSize); } } + availableToWrite = sink->availableToWrite(); + if (!shouldBlockWrite && frameCount > availableToWrite) { + // Truncate the request to avoid blocking. + frameCount = availableToWrite; + } ssize_t writtenFrames = sink->write(buffer, frameCount); if (writtenFrames < 0) { if (writtenFrames == (ssize_t)::android::NEGOTIATE) { @@ -261,7 +250,6 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { writtenFrames = sink->write(buffer, frameCount); } } - sink.clear(); if (writtenFrames < 0) { LOG(ERROR) << __func__ << ": failed writing to pipe with " << writtenFrames; @@ -286,8 +274,9 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { } else { LOG(ERROR) << __func__ << ": Read errors " << readErrorCount; } - const size_t delayUs = static_cast( - std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate)); + const auto delayUs = getDelayInUsForFrameCount(frameCount); + LOG(DEBUG) << __func__ << ": no source, ignoring the read, sleeping for " << delayUs + << " us"; usleep(delayUs); memset(buffer, 0, mStreamConfig.frameSize * frameCount); *actualFrameCount = frameCount; @@ -296,7 +285,7 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { // read the data from the pipe int attempts = 0; - const size_t delayUs = static_cast(std::roundf(kReadAttemptSleepUs)); + const long delayUs = kReadAttemptSleepUs; char* buff = (char*)buffer; size_t remainingFrames = frameCount; int availableToRead = source->availableToRead(); @@ -313,11 +302,12 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { buff += framesRead * mStreamConfig.frameSize; availableToRead -= framesRead; LOG(VERBOSE) << __func__ << ": (attempts = " << attempts << ") got " << framesRead - << " frames, remaining=" << remainingFrames; + << " frames, remaining =" << remainingFrames; } else { attempts++; LOG(WARNING) << __func__ << ": read returned " << framesRead - << " , read failure attempts = " << attempts; + << " , read failure attempts = " << attempts << ", sleeping for " + << delayUs << " us"; usleep(delayUs); } } @@ -337,18 +327,18 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() { // compute how much we need to sleep after reading the data by comparing the wall clock with // the projected time at which we should return. // wall clock after reading from the pipe - auto recordDurationUs = std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime(); + auto recordDurationUs = std::chrono::duration_cast( + std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime()); // readCounterFrames contains the number of frames that have been read since the beginning of // recording (including this call): it's converted to usec and compared to how long we've been // recording for, which gives us how long we must wait to sync the projected recording time, and // the observed recording time. - const int projectedVsObservedOffsetUs = - std::roundf((readCounterFrames * MICROS_PER_SECOND / mStreamConfig.sampleRate) - - recordDurationUs.count()); + const long projectedVsObservedOffsetUs = + getDelayInUsForFrameCount(readCounterFrames) - recordDurationUs.count(); LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count() - << " microseconds, will wait: " << projectedVsObservedOffsetUs << " microseconds"; + << " us, will wait: " << projectedVsObservedOffsetUs << " us"; if (projectedVsObservedOffsetUs > 0) { usleep(projectedVsObservedOffsetUs); } diff --git a/audio/aidl/default/r_submix/SubmixRoute.h b/audio/aidl/default/r_submix/SubmixRoute.h index 1a98df2340..1fe9ea2181 100644 --- a/audio/aidl/default/r_submix/SubmixRoute.h +++ b/audio/aidl/default/r_submix/SubmixRoute.h @@ -14,16 +14,19 @@ * limitations under the License. */ +#pragma once + +#include #include +#include #include #include #include #include - -#include "core-impl/Stream.h" +#include using aidl::android::media::audio::common::AudioChannelLayout; using aidl::android::media::audio::common::AudioFormatDescription; diff --git a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp index 53e51f427c..d0fc4a498a 100644 --- a/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp +++ b/audio/aidl/vts/VtsHalAudioCoreModuleTargetTest.cpp @@ -1112,6 +1112,7 @@ class DefaultStreamCallback : public ::aidl::android::hardware::audio::core::BnS template struct IOTraits { static constexpr bool is_input = std::is_same_v; + static constexpr const char* directionStr = is_input ? "input" : "output"; using Worker = std::conditional_t; }; @@ -4289,22 +4290,41 @@ class WithRemoteSubmix { ASSERT_NO_FATAL_FAILURE(SetUpPortConnection(module, moduleConfig)); SetUp(module, moduleConfig, mConnectedPort->get()); } - void sendBurstCommands() { - const StreamContext* context = mStream->getContext(); - StreamLogicDefaultDriver driver(makeBurstCommands(true), context->getFrameSizeBytes()); - typename IOTraits::Worker worker(*context, &driver, mStream->getEventReceiver()); - LOG(DEBUG) << __func__ << ": starting worker..."; - ASSERT_TRUE(worker.start()); - LOG(DEBUG) << __func__ << ": joining worker..."; - worker.join(); - EXPECT_FALSE(worker.hasError()) << worker.getError(); - EXPECT_EQ("", driver.getUnexpectedStateTransition()); - if (IOTraits::is_input) { - EXPECT_TRUE(driver.hasObservablePositionIncrease()); - } - EXPECT_FALSE(driver.hasRetrogradeObservablePosition()); + void sendBurstCommandsStartWorker() { + const StreamContext* context = mStream->getContext(); + mWorkerDriver = std::make_unique(makeBurstCommands(true), + context->getFrameSizeBytes()); + mWorker = std::make_unique::Worker>(*context, mWorkerDriver.get(), + mStream->getEventReceiver()); + + LOG(DEBUG) << __func__ << ": starting " << IOTraits::directionStr << " worker..."; + ASSERT_TRUE(mWorker->start()); } + + void sendBurstCommandsJoinWorker() { + // Must call 'prepareToClose' before attempting to join because the stream may be + // stuck due to absence of activity from the other side of the remote submix pipe. + std::shared_ptr common; + ASSERT_IS_OK(mStream->get()->getStreamCommon(&common)); + ASSERT_IS_OK(common->prepareToClose()); + LOG(DEBUG) << __func__ << ": joining " << IOTraits::directionStr << " worker..."; + mWorker->join(); + EXPECT_FALSE(mWorker->hasError()) << mWorker->getError(); + EXPECT_EQ("", mWorkerDriver->getUnexpectedStateTransition()); + if (IOTraits::is_input) { + EXPECT_TRUE(mWorkerDriver->hasObservablePositionIncrease()); + } + EXPECT_FALSE(mWorkerDriver->hasRetrogradeObservablePosition()); + mWorker.reset(); + mWorkerDriver.reset(); + } + + void sendBurstCommands() { + ASSERT_NO_FATAL_FAILURE(sendBurstCommandsStartWorker()); + ASSERT_NO_FATAL_FAILURE(sendBurstCommandsJoinWorker()); + } + bool skipTest() const { return mSkipTest; } private: @@ -4337,6 +4357,8 @@ class WithRemoteSubmix { std::unique_ptr mConnectedPort; std::unique_ptr mPatch; std::unique_ptr> mStream; + std::unique_ptr mWorkerDriver; + std::unique_ptr::Worker> mWorker; }; class AudioModuleRemoteSubmix : public AudioCoreModule { @@ -4350,18 +4372,15 @@ class AudioModuleRemoteSubmix : public AudioCoreModule { }; TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenNoInput) { - // open output stream WithRemoteSubmix streamOut; ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get())); if (streamOut.skipTest()) { GTEST_SKIP() << "No mix port for attached devices"; } - // write something to stream ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands()); } TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) { - // open output stream WithRemoteSubmix streamOut; ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get())); if (streamOut.skipTest()) { @@ -4370,19 +4389,16 @@ TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) { auto address = streamOut.getAudioDeviceAddress(); ASSERT_TRUE(address.has_value()); - // open input stream WithRemoteSubmix streamIn(address.value()); ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get())); if (streamIn.skipTest()) { GTEST_SKIP() << "No mix port for attached devices"; } - // write something to stream ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands()); } TEST_P(AudioModuleRemoteSubmix, OutputAndInput) { - // open output stream WithRemoteSubmix streamOut; ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get())); if (streamOut.skipTest()) { @@ -4391,21 +4407,20 @@ TEST_P(AudioModuleRemoteSubmix, OutputAndInput) { auto address = streamOut.getAudioDeviceAddress(); ASSERT_TRUE(address.has_value()); - // open input stream WithRemoteSubmix streamIn(address.value()); ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get())); if (streamIn.skipTest()) { GTEST_SKIP() << "No mix port for attached devices"; } - // write something to stream - ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands()); - // read from input stream + // Start writing into the output stream. + ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsStartWorker()); + // Simultaneously, read from the input stream. ASSERT_NO_FATAL_FAILURE(streamIn.sendBurstCommands()); + ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsJoinWorker()); } TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) { - // open output stream WithRemoteSubmix streamOut; ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get())); if (streamOut.skipTest()) { @@ -4414,14 +4429,13 @@ TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) { auto address = streamOut.getAudioDeviceAddress(); ASSERT_TRUE(address.has_value()); - // connect remote submix input device port + // Connect remote submix input device port. auto port = WithRemoteSubmix::getRemoteSubmixAudioPort(moduleConfig.get(), address.value()); ASSERT_TRUE(port.has_value()) << "Device AudioPort for remote submix not found"; WithDevicePortConnectedState connectedInputPort(port.value()); ASSERT_NO_FATAL_FAILURE(connectedInputPort.SetUp(module.get(), moduleConfig.get())); - // open input streams const int streamInCount = 3; std::vector>> streamIns(streamInCount); for (int i = 0; i < streamInCount; i++) { @@ -4432,13 +4446,16 @@ TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) { GTEST_SKIP() << "No mix port for attached devices"; } } - // write something to output stream - ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands()); - - // read from input streams + // Start writing into the output stream. + ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsStartWorker()); + // Simultaneously, read from input streams. for (int i = 0; i < streamInCount; i++) { - ASSERT_NO_FATAL_FAILURE(streamIns[i]->sendBurstCommands()); + ASSERT_NO_FATAL_FAILURE(streamIns[i]->sendBurstCommandsStartWorker()); } + for (int i = 0; i < streamInCount; i++) { + ASSERT_NO_FATAL_FAILURE(streamIns[i]->sendBurstCommandsJoinWorker()); + } + ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsJoinWorker()); } INSTANTIATE_TEST_SUITE_P(AudioModuleRemoteSubmixTest, AudioModuleRemoteSubmix,