From f429c03d49fc2254be24a1408e054d725321ed4b Mon Sep 17 00:00:00 2001 From: Mikhail Naganov Date: Sat, 7 Jan 2023 00:24:50 +0000 Subject: [PATCH] audio: Generalize stream implementations This allows for more code reuse and composability when implementing streams for a particular audio "backend." The existing "stub" code has been moved to StreamStub* files. Bug: 264712385 Test: atest VtsHalAudioCoreTargetTest Change-Id: I97fd41f87eb6d01e1d57f0d70a86d3b2b3555837 --- audio/aidl/default/Android.bp | 1 + audio/aidl/default/Module.cpp | 11 +- audio/aidl/default/Stream.cpp | 259 ++++++++++-------- audio/aidl/default/StreamStub.cpp | 125 +++++++++ audio/aidl/default/include/core-impl/Stream.h | 179 ++++++++---- .../default/include/core-impl/StreamStub.h | 69 +++++ 6 files changed, 475 insertions(+), 169 deletions(-) create mode 100644 audio/aidl/default/StreamStub.cpp create mode 100644 audio/aidl/default/include/core-impl/StreamStub.h diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp index 1e6785f940..248ba84a71 100644 --- a/audio/aidl/default/Android.bp +++ b/audio/aidl/default/Android.bp @@ -69,6 +69,7 @@ cc_library_static { "Module.cpp", "SoundDose.cpp", "Stream.cpp", + "StreamStub.cpp", "Telephony.cpp", ], generated_sources: [ diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp index 55e5bff1d7..acad70f52d 100644 --- a/audio/aidl/default/Module.cpp +++ b/audio/aidl/default/Module.cpp @@ -28,6 +28,7 @@ #include "core-impl/Bluetooth.h" #include "core-impl/Module.h" #include "core-impl/SoundDose.h" +#include "core-impl/StreamStub.h" #include "core-impl/Telephony.h" #include "core-impl/utils.h" @@ -552,8 +553,9 @@ ndk::ScopedAStatus Module::openInputStream(const OpenInputStreamArguments& in_ar } context.fillDescriptor(&_aidl_return->desc); std::shared_ptr stream; - if (auto status = StreamIn::createInstance(in_args.sinkMetadata, std::move(context), - mConfig->microphones, &stream); + // TODO: Add a mapping from module instance names to a corresponding 'createInstance'. + if (auto status = StreamInStub::createInstance(in_args.sinkMetadata, std::move(context), + mConfig->microphones, &stream); !status.isOk()) { return status; } @@ -606,8 +608,9 @@ ndk::ScopedAStatus Module::openOutputStream(const OpenOutputStreamArguments& in_ } context.fillDescriptor(&_aidl_return->desc); std::shared_ptr stream; - if (auto status = StreamOut::createInstance(in_args.sourceMetadata, std::move(context), - in_args.offloadInfo, &stream); + // TODO: Add a mapping from module instance names to a corresponding 'createInstance'. + if (auto status = StreamOutStub::createInstance(in_args.sourceMetadata, std::move(context), + in_args.offloadInfo, &stream); !status.isOk()) { return status; } diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp index 0520cba581..25814e445a 100644 --- a/audio/aidl/default/Stream.cpp +++ b/audio/aidl/default/Stream.cpp @@ -85,16 +85,19 @@ std::string StreamWorkerCommonLogic::init() { if (mCommandMQ == nullptr) return "Command MQ is null"; if (mReplyMQ == nullptr) return "Reply MQ is null"; if (mDataMQ == nullptr) return "Data MQ is null"; - if (sizeof(decltype(mDataBuffer)::element_type) != mDataMQ->getQuantumSize()) { + if (sizeof(DataBufferElement) != mDataMQ->getQuantumSize()) { return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize()); } mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize(); - mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]); + mDataBuffer.reset(new (std::nothrow) DataBufferElement[mDataBufferSize]); if (mDataBuffer == nullptr) { return "Failed to allocate data buffer for element count " + std::to_string(mDataMQ->getQuantumCount()) + ", size in bytes: " + std::to_string(mDataBufferSize); } + if (::android::status_t status = mDriver->init(); status != STATUS_OK) { + return "Failed to initialize the driver: " + std::to_string(status); + } return ""; } @@ -191,46 +194,59 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { } break; case Tag::drain: - if (command.get() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) { + if (const auto mode = command.get(); + mode == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) { if (mState == StreamDescriptor::State::ACTIVE) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::DRAINING; + if (::android::status_t status = mDriver->drain(mode); + status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::DRAINING; + } else { + LOG(ERROR) << __func__ << ": drain failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } else { populateReplyWrongState(&reply, command); } } else { - LOG(WARNING) << __func__ - << ": invalid drain mode: " << toString(command.get()); + LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode); } break; case Tag::standby: if (mState == StreamDescriptor::State::IDLE) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::STANDBY; + if (::android::status_t status = mDriver->standby(); status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(ERROR) << __func__ << ": standby failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } else { populateReplyWrongState(&reply, command); } break; case Tag::pause: if (mState == StreamDescriptor::State::ACTIVE) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::PAUSED; + if (::android::status_t status = mDriver->pause(); status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::PAUSED; + } else { + LOG(ERROR) << __func__ << ": pause failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } else { populateReplyWrongState(&reply, command); } break; case Tag::flush: if (mState == StreamDescriptor::State::PAUSED) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::STANDBY; + if (::android::status_t status = mDriver->flush(); status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(ERROR) << __func__ << ": flush failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } else { populateReplyWrongState(&reply, command); } @@ -247,33 +263,39 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { } bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) { - // Can switch the state to ERROR if a driver error occurs. const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize}); const bool isConnected = mIsConnected; + size_t actualFrameCount = 0; bool fatal = false; - // Simulate reading of data, or provide zeroes if the stream is not connected. - for (size_t i = 0; i < byteCount; ++i) { - using buffer_type = decltype(mDataBuffer)::element_type; - constexpr int kBufferValueRange = std::numeric_limits::max() - - std::numeric_limits::min() + 1; - mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) + - std::numeric_limits::min() - : 0; + int32_t latency = Module::kLatencyMs; + if (isConnected) { + if (::android::status_t status = mDriver->transfer( + mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency); + status != ::android::OK) { + fatal = true; + LOG(ERROR) << __func__ << ": read failed: " << status; + } + } else { + usleep(3000); // Simulate blocking transfer delay. + for (size_t i = 0; i < byteCount; ++i) mDataBuffer[i] = 0; + actualFrameCount = byteCount / mFrameSize; } - usleep(3000); // Simulate a blocking call into the driver. - // Set 'fatal = true' if a driver error occurs. - if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) { - LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ" + const size_t actualByteCount = actualFrameCount * mFrameSize; + if (bool success = + actualByteCount > 0 ? mDataMQ->write(&mDataBuffer[0], actualByteCount) : true; + success) { + LOG(DEBUG) << __func__ << ": writing of " << actualByteCount << " bytes into data MQ" << " succeeded; connected? " << isConnected; // Frames are provided and counted regardless of connection status. - reply->fmqByteCount += byteCount; - mFrameCount += byteCount / mFrameSize; + reply->fmqByteCount += actualByteCount; + mFrameCount += actualFrameCount; populateReply(reply, isConnected); } else { - LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed"; + LOG(WARNING) << __func__ << ": writing of " << actualByteCount + << " bytes of data to MQ failed"; reply->status = STATUS_NOT_ENOUGH_DATA; } - reply->latencyMs = Module::kLatencyMs; + reply->latencyMs = latency; return !fatal; } @@ -395,17 +417,22 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { } break; case Tag::drain: - if (command.get() == StreamDescriptor::DrainMode::DRAIN_ALL || - command.get() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) { + if (const auto mode = command.get(); + mode == StreamDescriptor::DrainMode::DRAIN_ALL || + mode == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) { if (mState == StreamDescriptor::State::ACTIVE || mState == StreamDescriptor::State::TRANSFERRING) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) { - mState = StreamDescriptor::State::IDLE; + if (::android::status_t status = mDriver->drain(mode); + status == ::android::OK) { + populateReply(&reply, mIsConnected); + if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) { + mState = StreamDescriptor::State::IDLE; + } else { + switchToTransientState(StreamDescriptor::State::DRAINING); + } } else { - switchToTransientState(StreamDescriptor::State::DRAINING); + LOG(ERROR) << __func__ << ": drain failed: " << status; + mState = StreamDescriptor::State::ERROR; } } else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) { mState = StreamDescriptor::State::DRAIN_PAUSED; @@ -414,46 +441,58 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { populateReplyWrongState(&reply, command); } } else { - LOG(WARNING) << __func__ - << ": invalid drain mode: " << toString(command.get()); + LOG(WARNING) << __func__ << ": invalid drain mode: " << toString(mode); } break; case Tag::standby: if (mState == StreamDescriptor::State::IDLE) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::STANDBY; + if (::android::status_t status = mDriver->standby(); status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(ERROR) << __func__ << ": standby failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } else { populateReplyWrongState(&reply, command); } break; case Tag::pause: { - bool commandAccepted = true; + std::optional nextState; switch (mState) { case StreamDescriptor::State::ACTIVE: - mState = StreamDescriptor::State::PAUSED; + nextState = StreamDescriptor::State::PAUSED; break; case StreamDescriptor::State::DRAINING: - mState = StreamDescriptor::State::DRAIN_PAUSED; + nextState = StreamDescriptor::State::DRAIN_PAUSED; break; case StreamDescriptor::State::TRANSFERRING: - mState = StreamDescriptor::State::TRANSFER_PAUSED; + nextState = StreamDescriptor::State::TRANSFER_PAUSED; break; default: populateReplyWrongState(&reply, command); - commandAccepted = false; } - if (commandAccepted) { - populateReply(&reply, mIsConnected); + if (nextState.has_value()) { + if (::android::status_t status = mDriver->pause(); status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = nextState.value(); + } else { + LOG(ERROR) << __func__ << ": pause failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } } break; case Tag::flush: if (mState == StreamDescriptor::State::PAUSED || mState == StreamDescriptor::State::DRAIN_PAUSED || mState == StreamDescriptor::State::TRANSFER_PAUSED) { - populateReply(&reply, mIsConnected); - mState = StreamDescriptor::State::IDLE; + if (::android::status_t status = mDriver->flush(); status == ::android::OK) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::IDLE; + } else { + LOG(ERROR) << __func__ << ": flush failed: " << status; + mState = StreamDescriptor::State::ERROR; + } } else { populateReplyWrongState(&reply, command); } @@ -472,6 +511,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) { const size_t readByteCount = mDataMQ->availableToRead(); bool fatal = false; + int32_t latency = Module::kLatencyMs; if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) { const bool isConnected = mIsConnected; LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ" @@ -483,23 +523,36 @@ bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* rep // simulate partial write. byteCount -= mFrameSize; } + size_t actualFrameCount = 0; + if (isConnected) { + if (::android::status_t status = mDriver->transfer( + mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency); + status != ::android::OK) { + fatal = true; + LOG(ERROR) << __func__ << ": write failed: " << status; + } + } else { + if (mAsyncCallback == nullptr) { + usleep(3000); // Simulate blocking transfer delay. + } + actualFrameCount = byteCount / mFrameSize; + } + const size_t actualByteCount = actualFrameCount * mFrameSize; // Frames are consumed and counted regardless of the connection status. - reply->fmqByteCount += byteCount; - mFrameCount += byteCount / mFrameSize; + reply->fmqByteCount += actualByteCount; + mFrameCount += actualFrameCount; populateReply(reply, isConnected); - usleep(3000); // Simulate a blocking call into the driver. - // Set 'fatal = true' if a driver error occurs. } else { LOG(WARNING) << __func__ << ": reading of " << readByteCount << " bytes of data from MQ failed"; reply->status = STATUS_NOT_ENOUGH_DATA; } - reply->latencyMs = Module::kLatencyMs; + reply->latencyMs = latency; return !fatal; } -template -StreamCommonImpl::~StreamCommonImpl() { +template +StreamCommonImpl::~StreamCommonImpl() { if (!isClosed()) { LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak"; stopWorker(); @@ -507,8 +560,8 @@ StreamCommonImpl::~StreamCommonImpl() { } } -template -void StreamCommonImpl::createStreamCommon( +template +void StreamCommonImpl::createStreamCommon( const std::shared_ptr& delegate) { if (mCommon != nullptr) { LOG(FATAL) << __func__ << ": attempting to create the common interface twice"; @@ -518,8 +571,8 @@ void StreamCommonImpl::createStreamCommon( AIBinder_setMinSchedulerPolicy(mCommonBinder.get(), SCHED_NORMAL, ANDROID_PRIORITY_AUDIO); } -template -ndk::ScopedAStatus StreamCommonImpl::getStreamCommon( +template +ndk::ScopedAStatus StreamCommonImpl::getStreamCommon( std::shared_ptr* _aidl_return) { if (mCommon == nullptr) { LOG(FATAL) << __func__ << ": the common interface was not created"; @@ -529,31 +582,30 @@ ndk::ScopedAStatus StreamCommonImpl::getStreamCommon( return ndk::ScopedAStatus::ok(); } -template -ndk::ScopedAStatus StreamCommonImpl::updateHwAvSyncId( - int32_t in_hwAvSyncId) { +template +ndk::ScopedAStatus StreamCommonImpl::updateHwAvSyncId(int32_t in_hwAvSyncId) { LOG(DEBUG) << __func__ << ": id " << in_hwAvSyncId; return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION); } -template -ndk::ScopedAStatus StreamCommonImpl::getVendorParameters( +template +ndk::ScopedAStatus StreamCommonImpl::getVendorParameters( const std::vector& in_ids, std::vector* _aidl_return) { LOG(DEBUG) << __func__ << ": id count: " << in_ids.size(); (void)_aidl_return; return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION); } -template -ndk::ScopedAStatus StreamCommonImpl::setVendorParameters( +template +ndk::ScopedAStatus StreamCommonImpl::setVendorParameters( const std::vector& in_parameters, bool in_async) { LOG(DEBUG) << __func__ << ": parameters count " << in_parameters.size() << ", async: " << in_async; return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION); } -template -ndk::ScopedAStatus StreamCommonImpl::addEffect( +template +ndk::ScopedAStatus StreamCommonImpl::addEffect( const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) { if (in_effect == nullptr) { LOG(DEBUG) << __func__ << ": null effect"; @@ -563,8 +615,8 @@ ndk::ScopedAStatus StreamCommonImpl::addEffect( return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION); } -template -ndk::ScopedAStatus StreamCommonImpl::removeEffect( +template +ndk::ScopedAStatus StreamCommonImpl::removeEffect( const std::shared_ptr<::aidl::android::hardware::audio::effect::IEffect>& in_effect) { if (in_effect == nullptr) { LOG(DEBUG) << __func__ << ": null effect"; @@ -574,16 +626,16 @@ ndk::ScopedAStatus StreamCommonImpl::removeEffect( return ndk::ScopedAStatus::fromExceptionCode(EX_UNSUPPORTED_OPERATION); } -template -ndk::ScopedAStatus StreamCommonImpl::close() { +template +ndk::ScopedAStatus StreamCommonImpl::close() { LOG(DEBUG) << __func__; if (!isClosed()) { stopWorker(); LOG(DEBUG) << __func__ << ": joining the worker thread..."; - mWorker.stop(); + mWorker->stop(); LOG(DEBUG) << __func__ << ": worker thread joined"; mContext.reset(); - mWorker.setClosed(); + mWorker->setClosed(); return ndk::ScopedAStatus::ok(); } else { LOG(ERROR) << __func__ << ": stream was already closed"; @@ -591,8 +643,8 @@ ndk::ScopedAStatus StreamCommonImpl::close() { } } -template -void StreamCommonImpl::stopWorker() { +template +void StreamCommonImpl::stopWorker() { if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) { LOG(DEBUG) << __func__ << ": asking the worker to exit..."; auto cmd = StreamDescriptor::Command::make( @@ -608,9 +660,8 @@ void StreamCommonImpl::stopWorker() { } } -template -ndk::ScopedAStatus StreamCommonImpl::updateMetadata( - const Metadata& metadata) { +template +ndk::ScopedAStatus StreamCommonImpl::updateMetadata(const Metadata& metadata) { LOG(DEBUG) << __func__; if (!isClosed()) { mMetadata = metadata; @@ -621,16 +672,11 @@ ndk::ScopedAStatus StreamCommonImpl::updateMetadata( } // static -ndk::ScopedAStatus StreamIn::createInstance(const common::SinkMetadata& sinkMetadata, - StreamContext context, - const std::vector& microphones, - std::shared_ptr* result) { - auto stream = ndk::SharedRefBase::make(sinkMetadata, std::move(context), microphones); +ndk::ScopedAStatus StreamIn::initInstance(const std::shared_ptr& stream) { if (auto status = stream->init(); !status.isOk()) { return status; } stream->createStreamCommon(stream); - *result = std::move(stream); return ndk::ScopedAStatus::ok(); } @@ -645,8 +691,10 @@ static std::map transformMicrophones( } // namespace StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext&& context, + const DriverInterface::CreateInstance& createDriver, + const StreamWorkerInterface::CreateInstance& createWorker, const std::vector& microphones) - : StreamCommonImpl(sinkMetadata, std::move(context)), + : StreamCommonImpl(sinkMetadata, std::move(context), createDriver, createWorker), mMicrophones(transformMicrophones(microphones)) { LOG(DEBUG) << __func__; } @@ -704,23 +752,20 @@ ndk::ScopedAStatus StreamIn::setHwGain(const std::vector& in_channelGains } // static -ndk::ScopedAStatus StreamOut::createInstance(const SourceMetadata& sourceMetadata, - StreamContext context, - const std::optional& offloadInfo, - std::shared_ptr* result) { - auto stream = - ndk::SharedRefBase::make(sourceMetadata, std::move(context), offloadInfo); +ndk::ScopedAStatus StreamOut::initInstance(const std::shared_ptr& stream) { if (auto status = stream->init(); !status.isOk()) { return status; } stream->createStreamCommon(stream); - *result = std::move(stream); return ndk::ScopedAStatus::ok(); } StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext&& context, + const DriverInterface::CreateInstance& createDriver, + const StreamWorkerInterface::CreateInstance& createWorker, const std::optional& offloadInfo) - : StreamCommonImpl(sourceMetadata, std::move(context)), + : StreamCommonImpl(sourceMetadata, std::move(context), createDriver, + createWorker), mOffloadInfo(offloadInfo) { LOG(DEBUG) << __func__; } diff --git a/audio/aidl/default/StreamStub.cpp b/audio/aidl/default/StreamStub.cpp new file mode 100644 index 0000000000..544217992f --- /dev/null +++ b/audio/aidl/default/StreamStub.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2023 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "AHAL_Stream" +#include + +#include "core-impl/Module.h" +#include "core-impl/StreamStub.h" + +using aidl::android::hardware::audio::common::SinkMetadata; +using aidl::android::hardware::audio::common::SourceMetadata; +using aidl::android::media::audio::common::AudioOffloadInfo; + +namespace aidl::android::hardware::audio::core { + +DriverStub::DriverStub(const StreamContext& context, bool isInput) + : mFrameSizeBytes(context.getFrameSize()), mIsInput(isInput) {} + +::android::status_t DriverStub::init() { + usleep(1000); + return ::android::OK; +} + +::android::status_t DriverStub::drain(StreamDescriptor::DrainMode) { + usleep(1000); + return ::android::OK; +} + +::android::status_t DriverStub::flush() { + usleep(1000); + return ::android::OK; +} + +::android::status_t DriverStub::pause() { + usleep(1000); + return ::android::OK; +} + +::android::status_t DriverStub::transfer(void* buffer, size_t frameCount, size_t* actualFrameCount, + int32_t* latencyMs) { + usleep(3000); + if (mIsInput) { + uint8_t* byteBuffer = static_cast(buffer); + for (size_t i = 0; i < frameCount * mFrameSizeBytes; ++i) { + byteBuffer[i] = std::rand() % 255; + } + } + *actualFrameCount = frameCount; + *latencyMs = Module::kLatencyMs; + return ::android::OK; +} + +::android::status_t DriverStub::standby() { + usleep(1000); + return ::android::OK; +} + +// static +ndk::ScopedAStatus StreamInStub::createInstance(const SinkMetadata& sinkMetadata, + StreamContext&& context, + const std::vector& microphones, + std::shared_ptr* result) { + std::shared_ptr stream = + ndk::SharedRefBase::make(sinkMetadata, std::move(context), microphones); + if (auto status = initInstance(stream); !status.isOk()) { + return status; + } + *result = std::move(stream); + return ndk::ScopedAStatus::ok(); +} + +StreamInStub::StreamInStub(const SinkMetadata& sinkMetadata, StreamContext&& context, + const std::vector& microphones) + : StreamIn( + sinkMetadata, std::move(context), + [](const StreamContext& ctx) -> DriverInterface* { + return new DriverStub(ctx, true /*isInput*/); + }, + [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* { + // The default worker implementation is used. + return new StreamInWorker(ctx, driver); + }, + microphones) {} + +// static +ndk::ScopedAStatus StreamOutStub::createInstance(const SourceMetadata& sourceMetadata, + StreamContext&& context, + const std::optional& offloadInfo, + std::shared_ptr* result) { + std::shared_ptr stream = ndk::SharedRefBase::make( + sourceMetadata, std::move(context), offloadInfo); + if (auto status = initInstance(stream); !status.isOk()) { + return status; + } + *result = std::move(stream); + return ndk::ScopedAStatus::ok(); +} + +StreamOutStub::StreamOutStub(const SourceMetadata& sourceMetadata, StreamContext&& context, + const std::optional& offloadInfo) + : StreamOut( + sourceMetadata, std::move(context), + [](const StreamContext& ctx) -> DriverInterface* { + return new DriverStub(ctx, false /*isInput*/); + }, + [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* { + // The default worker implementation is used. + return new StreamOutWorker(ctx, driver); + }, + offloadInfo) {} + +} // namespace aidl::android::hardware::audio::core diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h index 8c60efa628..7cd4259948 100644 --- a/audio/aidl/default/include/core-impl/Stream.h +++ b/audio/aidl/default/include/core-impl/Stream.h @@ -38,6 +38,7 @@ #include #include #include +#include #include "core-impl/utils.h" @@ -145,6 +146,20 @@ class StreamContext { DebugParameters mDebugParameters; }; +struct DriverInterface { + using CreateInstance = std::function; + virtual ~DriverInterface() = default; + // This function is called once, on the main thread, before starting the worker thread. + virtual ::android::status_t init() = 0; + // All the functions below are called on the worker thread. + virtual ::android::status_t drain(StreamDescriptor::DrainMode mode) = 0; + virtual ::android::status_t flush() = 0; + virtual ::android::status_t pause() = 0; + virtual ::android::status_t transfer(void* buffer, size_t frameCount, size_t* actualFrameCount, + int32_t* latencyMs) = 0; + virtual ::android::status_t standby() = 0; +}; + class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic { public: bool isClosed() const { @@ -154,8 +169,11 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea void setIsConnected(bool connected) { mIsConnected = connected; } protected: - explicit StreamWorkerCommonLogic(const StreamContext& context) - : mInternalCommandCookie(context.getInternalCommandCookie()), + using DataBufferElement = int8_t; + + StreamWorkerCommonLogic(const StreamContext& context, DriverInterface* driver) + : mDriver(driver), + mInternalCommandCookie(context.getInternalCommandCookie()), mFrameSize(context.getFrameSize()), mCommandMQ(context.getCommandMQ()), mReplyMQ(context.getReplyMQ()), @@ -173,6 +191,7 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea mTransientStateStart = std::chrono::steady_clock::now(); } + DriverInterface* const mDriver; // Atomic fields are used both by the main and worker threads. std::atomic mIsConnected = false; static_assert(std::atomic::is_always_lock_free); @@ -180,9 +199,9 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea // All fields are used on the worker thread only. const int mInternalCommandCookie; const size_t mFrameSize; - StreamContext::CommandMQ* mCommandMQ; - StreamContext::ReplyMQ* mReplyMQ; - StreamContext::DataMQ* mDataMQ; + StreamContext::CommandMQ* const mCommandMQ; + StreamContext::ReplyMQ* const mReplyMQ; + StreamContext::DataMQ* const mDataMQ; std::shared_ptr mAsyncCallback; const std::chrono::duration mTransientStateDelayMs; std::chrono::time_point mTransientStateStart; @@ -190,15 +209,46 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea const bool mForceSynchronousDrain; // We use an array and the "size" field instead of a vector to be able to detect // memory allocation issues. - std::unique_ptr mDataBuffer; + std::unique_ptr mDataBuffer; size_t mDataBufferSize; long mFrameCount = 0; }; +// This interface is used to decouple stream implementations from a concrete StreamWorker +// implementation. +struct StreamWorkerInterface { + using CreateInstance = std::function; + virtual ~StreamWorkerInterface() = default; + virtual bool isClosed() const = 0; + virtual void setIsConnected(bool isConnected) = 0; + virtual void setClosed() = 0; + virtual bool start() = 0; + virtual void stop() = 0; +}; + +template +class StreamWorkerImpl : public StreamWorkerInterface, + public ::android::hardware::audio::common::StreamWorker { + using WorkerImpl = ::android::hardware::audio::common::StreamWorker; + + public: + StreamWorkerImpl(const StreamContext& context, DriverInterface* driver) + : WorkerImpl(context, driver) {} + bool isClosed() const override { return WorkerImpl::isClosed(); } + void setIsConnected(bool isConnected) override { WorkerImpl::setIsConnected(isConnected); } + void setClosed() override { WorkerImpl::setClosed(); } + bool start() override { + return WorkerImpl::start(WorkerImpl::kThreadName, ANDROID_PRIORITY_AUDIO); + } + void stop() override { return WorkerImpl::stop(); } +}; + class StreamInWorkerLogic : public StreamWorkerCommonLogic { public: static const std::string kThreadName; - explicit StreamInWorkerLogic(const StreamContext& context) : StreamWorkerCommonLogic(context) {} + StreamInWorkerLogic(const StreamContext& context, DriverInterface* driver) + : StreamWorkerCommonLogic(context, driver) {} protected: Status cycle() override; @@ -206,13 +256,13 @@ class StreamInWorkerLogic : public StreamWorkerCommonLogic { private: bool read(size_t clientSize, StreamDescriptor::Reply* reply); }; -using StreamInWorker = ::android::hardware::audio::common::StreamWorker; +using StreamInWorker = StreamWorkerImpl; class StreamOutWorkerLogic : public StreamWorkerCommonLogic { public: static const std::string kThreadName; - explicit StreamOutWorkerLogic(const StreamContext& context) - : StreamWorkerCommonLogic(context), mEventCallback(context.getOutEventCallback()) {} + StreamOutWorkerLogic(const StreamContext& context, DriverInterface* driver) + : StreamWorkerCommonLogic(context, driver), mEventCallback(context.getOutEventCallback()) {} protected: Status cycle() override; @@ -222,7 +272,7 @@ class StreamOutWorkerLogic : public StreamWorkerCommonLogic { std::shared_ptr mEventCallback; }; -using StreamOutWorker = ::android::hardware::audio::common::StreamWorker; +using StreamOutWorker = StreamWorkerImpl; // This provides a C++ interface with methods of the IStreamCommon Binder interface, // but intentionally does not inherit from it. This is needed to avoid inheriting @@ -294,7 +344,7 @@ class StreamCommon : public BnStreamCommon { std::weak_ptr mDelegate; }; -template +template class StreamCommonImpl : public StreamCommonInterface { public: ndk::ScopedAStatus close() override; @@ -312,21 +362,25 @@ class StreamCommonImpl : public StreamCommonInterface { ndk::ScopedAStatus getStreamCommon(std::shared_ptr* _aidl_return); ndk::ScopedAStatus init() { - return mWorker.start(StreamWorker::kThreadName, ANDROID_PRIORITY_AUDIO) - ? ndk::ScopedAStatus::ok() - : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE); + return mWorker->start() ? ndk::ScopedAStatus::ok() + : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE); } - bool isClosed() const { return mWorker.isClosed(); } + bool isClosed() const { return mWorker->isClosed(); } void setIsConnected( const std::vector<::aidl::android::media::audio::common::AudioDevice>& devices) { - mWorker.setIsConnected(!devices.empty()); + mWorker->setIsConnected(!devices.empty()); mConnectedDevices = devices; } ndk::ScopedAStatus updateMetadata(const Metadata& metadata); protected: - StreamCommonImpl(const Metadata& metadata, StreamContext&& context) - : mMetadata(metadata), mContext(std::move(context)), mWorker(mContext) {} + StreamCommonImpl(const Metadata& metadata, StreamContext&& context, + const DriverInterface::CreateInstance& createDriver, + const StreamWorkerInterface::CreateInstance& createWorker) + : mMetadata(metadata), + mContext(std::move(context)), + mDriver(createDriver(mContext)), + mWorker(createWorker(mContext, mDriver.get())) {} ~StreamCommonImpl(); void stopWorker(); void createStreamCommon(const std::shared_ptr& delegate); @@ -335,16 +389,16 @@ class StreamCommonImpl : public StreamCommonInterface { ndk::SpAIBinder mCommonBinder; Metadata mMetadata; StreamContext mContext; - StreamWorker mWorker; + std::unique_ptr mDriver; + std::unique_ptr mWorker; std::vector<::aidl::android::media::audio::common::AudioDevice> mConnectedDevices; }; -class StreamIn : public StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata, - StreamInWorker>, +class StreamIn : public StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata>, public BnStreamIn { ndk::ScopedAStatus getStreamCommon(std::shared_ptr* _aidl_return) override { - return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata, - StreamInWorker>::getStreamCommon(_aidl_return); + return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata>:: + getStreamCommon(_aidl_return); } ndk::ScopedAStatus getActiveMicrophones( std::vector* _aidl_return) override; @@ -354,42 +408,46 @@ class StreamIn : public StreamCommonImpl<::aidl::android::hardware::audio::commo ndk::ScopedAStatus setMicrophoneFieldDimension(float in_zoom) override; ndk::ScopedAStatus updateMetadata(const ::aidl::android::hardware::audio::common::SinkMetadata& in_sinkMetadata) override { - return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata, - StreamInWorker>::updateMetadata(in_sinkMetadata); + return StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata>:: + updateMetadata(in_sinkMetadata); } ndk::ScopedAStatus getHwGain(std::vector* _aidl_return) override; ndk::ScopedAStatus setHwGain(const std::vector& in_channelGains) override; - public: - static ndk::ScopedAStatus createInstance( - const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata, - StreamContext context, const std::vector& microphones, - std::shared_ptr* result); - - private: + protected: friend class ndk::SharedRefBase; + + static ndk::ScopedAStatus initInstance(const std::shared_ptr& stream); + StreamIn(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata, - StreamContext&& context, const std::vector& microphones); + StreamContext&& context, const DriverInterface::CreateInstance& createDriver, + const StreamWorkerInterface::CreateInstance& createWorker, + const std::vector& microphones); void createStreamCommon(const std::shared_ptr& myPtr) { - StreamCommonImpl<::aidl::android::hardware::audio::common::SinkMetadata, - StreamInWorker>::createStreamCommon(myPtr); + StreamCommonImpl< + ::aidl::android::hardware::audio::common::SinkMetadata>::createStreamCommon(myPtr); } const std::map<::aidl::android::media::audio::common::AudioDevice, std::string> mMicrophones; + + public: + using CreateInstance = std::function& microphones, + std::shared_ptr* result)>; }; -class StreamOut : public StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata, - StreamOutWorker>, +class StreamOut : public StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>, public BnStreamOut { ndk::ScopedAStatus getStreamCommon(std::shared_ptr* _aidl_return) override { - return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata, - StreamOutWorker>::getStreamCommon(_aidl_return); + return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>:: + getStreamCommon(_aidl_return); } ndk::ScopedAStatus updateMetadata( const ::aidl::android::hardware::audio::common::SourceMetadata& in_sourceMetadata) override { - return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata, - StreamOutWorker>::updateMetadata(in_sourceMetadata); + return StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>:: + updateMetadata(in_sourceMetadata); } ndk::ScopedAStatus getHwVolume(std::vector* _aidl_return) override; ndk::ScopedAStatus setHwVolume(const std::vector& in_channelVolumes) override; @@ -411,26 +469,31 @@ class StreamOut : public StreamCommonImpl<::aidl::android::hardware::audio::comm override; ndk::ScopedAStatus selectPresentation(int32_t in_presentationId, int32_t in_programId) override; - public: - static ndk::ScopedAStatus createInstance( - const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata, - StreamContext context, - const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>& - offloadInfo, - std::shared_ptr* result); - - private: - friend class ndk::SharedRefBase; - StreamOut(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata, - StreamContext&& context, - const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>& - offloadInfo); void createStreamCommon(const std::shared_ptr& myPtr) { - StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata, - StreamOutWorker>::createStreamCommon(myPtr); + StreamCommonImpl<::aidl::android::hardware::audio::common::SourceMetadata>:: + createStreamCommon(myPtr); } + protected: + friend class ndk::SharedRefBase; + + static ndk::ScopedAStatus initInstance(const std::shared_ptr& stream); + + StreamOut(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata, + StreamContext&& context, const DriverInterface::CreateInstance& createDriver, + const StreamWorkerInterface::CreateInstance& createWorker, + const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>& + offloadInfo); + std::optional<::aidl::android::media::audio::common::AudioOffloadInfo> mOffloadInfo; + + public: + using CreateInstance = std::function& + offloadInfo, + std::shared_ptr* result)>; }; class StreamWrapper { diff --git a/audio/aidl/default/include/core-impl/StreamStub.h b/audio/aidl/default/include/core-impl/StreamStub.h new file mode 100644 index 0000000000..98a062a02e --- /dev/null +++ b/audio/aidl/default/include/core-impl/StreamStub.h @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2023 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "core-impl/Stream.h" + +namespace aidl::android::hardware::audio::core { + +class DriverStub : public DriverInterface { + public: + DriverStub(const StreamContext& context, bool isInput); + ::android::status_t init() override; + ::android::status_t drain(StreamDescriptor::DrainMode) override; + ::android::status_t flush() override; + ::android::status_t pause() override; + ::android::status_t transfer(void* buffer, size_t frameCount, size_t* actualFrameCount, + int32_t* latencyMs) override; + ::android::status_t standby() override; + + private: + const size_t mFrameSizeBytes; + const bool mIsInput; +}; + +class StreamInStub final : public StreamIn { + public: + static ndk::ScopedAStatus createInstance( + const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata, + StreamContext&& context, const std::vector& microphones, + std::shared_ptr* result); + + private: + friend class ndk::SharedRefBase; + StreamInStub(const ::aidl::android::hardware::audio::common::SinkMetadata& sinkMetadata, + StreamContext&& context, const std::vector& microphones); +}; + +class StreamOutStub final : public StreamOut { + public: + static ndk::ScopedAStatus createInstance( + const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata, + StreamContext&& context, + const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>& + offloadInfo, + std::shared_ptr* result); + + private: + friend class ndk::SharedRefBase; + StreamOutStub(const ::aidl::android::hardware::audio::common::SourceMetadata& sourceMetadata, + StreamContext&& context, + const std::optional<::aidl::android::media::audio::common::AudioOffloadInfo>& + offloadInfo); +}; + +} // namespace aidl::android::hardware::audio::core