audio: Move frame counter to StreamContext

Moving frame counter to the StreamContext class enables
switching stream drivers on the fly while keeping the frame
count monotonically increasing.

StreamWorkerCommonLogic now holds a pointer to StreamContext,
which makes redundant storing copies of the fields of the latter.

Bug: 264712385
Test: atest VtsHalAudioCoreTargetTest
Change-Id: If6716f4051c484b52927cbfe4032df7c907eb3a5
This commit is contained in:
Mikhail Naganov
2023-07-21 17:45:28 -07:00
parent 780fefb331
commit 1eedc130e8
10 changed files with 93 additions and 89 deletions

View File

@@ -91,17 +91,18 @@ void StreamContext::reset() {
} }
std::string StreamWorkerCommonLogic::init() { std::string StreamWorkerCommonLogic::init() {
if (mCommandMQ == nullptr) return "Command MQ is null"; if (mContext->getCommandMQ() == nullptr) return "Command MQ is null";
if (mReplyMQ == nullptr) return "Reply MQ is null"; if (mContext->getReplyMQ() == nullptr) return "Reply MQ is null";
if (mDataMQ == nullptr) return "Data MQ is null"; StreamContext::DataMQ* const dataMQ = mContext->getDataMQ();
if (sizeof(DataBufferElement) != mDataMQ->getQuantumSize()) { if (dataMQ == nullptr) return "Data MQ is null";
return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize()); if (sizeof(DataBufferElement) != dataMQ->getQuantumSize()) {
return "Unexpected Data MQ quantum size: " + std::to_string(dataMQ->getQuantumSize());
} }
mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize(); mDataBufferSize = dataMQ->getQuantumCount() * dataMQ->getQuantumSize();
mDataBuffer.reset(new (std::nothrow) DataBufferElement[mDataBufferSize]); mDataBuffer.reset(new (std::nothrow) DataBufferElement[mDataBufferSize]);
if (mDataBuffer == nullptr) { if (mDataBuffer == nullptr) {
return "Failed to allocate data buffer for element count " + return "Failed to allocate data buffer for element count " +
std::to_string(mDataMQ->getQuantumCount()) + std::to_string(dataMQ->getQuantumCount()) +
", size in bytes: " + std::to_string(mDataBufferSize); ", size in bytes: " + std::to_string(mDataBufferSize);
} }
if (::android::status_t status = mDriver->init(); status != STATUS_OK) { if (::android::status_t status = mDriver->init(); status != STATUS_OK) {
@@ -114,7 +115,7 @@ void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
bool isConnected) const { bool isConnected) const {
reply->status = STATUS_OK; reply->status = STATUS_OK;
if (isConnected) { if (isConnected) {
reply->observable.frames = mFrameCount; reply->observable.frames = mContext->getFrameCount();
reply->observable.timeNs = ::android::elapsedRealtimeNano(); reply->observable.timeNs = ::android::elapsedRealtimeNano();
if (auto status = mDriver->getPosition(&reply->observable); status == ::android::OK) { if (auto status = mDriver->getPosition(&reply->observable); status == ::android::OK) {
return; return;
@@ -141,7 +142,7 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
// TODO: Add a delay for transitions of async operations when/if they added. // TODO: Add a delay for transitions of async operations when/if they added.
StreamDescriptor::Command command{}; StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) { if (!mContext->getCommandMQ()->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed"; LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR; mState = StreamDescriptor::State::ERROR;
return Status::ABORT; return Status::ABORT;
@@ -159,7 +160,7 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
switch (command.getTag()) { switch (command.getTag()) {
case Tag::halReservedExit: case Tag::halReservedExit:
if (const int32_t cookie = command.get<Tag::halReservedExit>(); if (const int32_t cookie = command.get<Tag::halReservedExit>();
cookie == mInternalCommandCookie) { cookie == mContext->getInternalCommandCookie()) {
mDriver->shutdown(); mDriver->shutdown();
setClosed(); setClosed();
// This is an internal command, no need to reply. // This is an internal command, no need to reply.
@@ -277,7 +278,7 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
} }
reply.state = mState; reply.state = mState;
LOG(severity) << __func__ << ": writing reply " << reply.toString(); LOG(severity) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) { if (!mContext->getReplyMQ()->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed"; LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR; mState = StreamDescriptor::State::ERROR;
return Status::ABORT; return Status::ABORT;
@@ -286,14 +287,16 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
} }
bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) { bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize}); StreamContext::DataMQ* const dataMQ = mContext->getDataMQ();
const size_t byteCount = std::min({clientSize, dataMQ->availableToWrite(), mDataBufferSize});
const bool isConnected = mIsConnected; const bool isConnected = mIsConnected;
const size_t frameSize = mContext->getFrameSize();
size_t actualFrameCount = 0; size_t actualFrameCount = 0;
bool fatal = false; bool fatal = false;
int32_t latency = Module::kLatencyMs; int32_t latency = Module::kLatencyMs;
if (isConnected) { if (isConnected) {
if (::android::status_t status = mDriver->transfer( if (::android::status_t status = mDriver->transfer(mDataBuffer.get(), byteCount / frameSize,
mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency); &actualFrameCount, &latency);
status != ::android::OK) { status != ::android::OK) {
fatal = true; fatal = true;
LOG(ERROR) << __func__ << ": read failed: " << status; LOG(ERROR) << __func__ << ": read failed: " << status;
@@ -301,17 +304,16 @@ bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply
} else { } else {
usleep(3000); // Simulate blocking transfer delay. usleep(3000); // Simulate blocking transfer delay.
for (size_t i = 0; i < byteCount; ++i) mDataBuffer[i] = 0; for (size_t i = 0; i < byteCount; ++i) mDataBuffer[i] = 0;
actualFrameCount = byteCount / mFrameSize; actualFrameCount = byteCount / frameSize;
} }
const size_t actualByteCount = actualFrameCount * mFrameSize; const size_t actualByteCount = actualFrameCount * frameSize;
if (bool success = if (bool success = actualByteCount > 0 ? dataMQ->write(&mDataBuffer[0], actualByteCount) : true;
actualByteCount > 0 ? mDataMQ->write(&mDataBuffer[0], actualByteCount) : true;
success) { success) {
LOG(VERBOSE) << __func__ << ": writing of " << actualByteCount << " bytes into data MQ" LOG(VERBOSE) << __func__ << ": writing of " << actualByteCount << " bytes into data MQ"
<< " succeeded; connected? " << isConnected; << " succeeded; connected? " << isConnected;
// Frames are provided and counted regardless of connection status. // Frames are provided and counted regardless of connection status.
reply->fmqByteCount += actualByteCount; reply->fmqByteCount += actualByteCount;
mFrameCount += actualFrameCount; mContext->advanceFrameCount(actualFrameCount);
populateReply(reply, isConnected); populateReply(reply, isConnected);
} else { } else {
LOG(WARNING) << __func__ << ": writing of " << actualByteCount LOG(WARNING) << __func__ << ": writing of " << actualByteCount
@@ -330,7 +332,8 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>( if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - mTransientStateStart); std::chrono::steady_clock::now() - mTransientStateStart);
stateDurationMs >= mTransientStateDelayMs) { stateDurationMs >= mTransientStateDelayMs) {
if (mAsyncCallback == nullptr) { std::shared_ptr<IStreamCallback> asyncCallback = mContext->getAsyncCallback();
if (asyncCallback == nullptr) {
// In blocking mode, mState can only be DRAINING. // In blocking mode, mState can only be DRAINING.
mState = StreamDescriptor::State::IDLE; mState = StreamDescriptor::State::IDLE;
} else { } else {
@@ -338,13 +341,13 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
// drain or transfer completion. In the stub, we switch unconditionally. // drain or transfer completion. In the stub, we switch unconditionally.
if (mState == StreamDescriptor::State::DRAINING) { if (mState == StreamDescriptor::State::DRAINING) {
mState = StreamDescriptor::State::IDLE; mState = StreamDescriptor::State::IDLE;
ndk::ScopedAStatus status = mAsyncCallback->onDrainReady(); ndk::ScopedAStatus status = asyncCallback->onDrainReady();
if (!status.isOk()) { if (!status.isOk()) {
LOG(ERROR) << __func__ << ": error from onDrainReady: " << status; LOG(ERROR) << __func__ << ": error from onDrainReady: " << status;
} }
} else { } else {
mState = StreamDescriptor::State::ACTIVE; mState = StreamDescriptor::State::ACTIVE;
ndk::ScopedAStatus status = mAsyncCallback->onTransferReady(); ndk::ScopedAStatus status = asyncCallback->onTransferReady();
if (!status.isOk()) { if (!status.isOk()) {
LOG(ERROR) << __func__ << ": error from onTransferReady: " << status; LOG(ERROR) << __func__ << ": error from onTransferReady: " << status;
} }
@@ -358,7 +361,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
} }
StreamDescriptor::Command command{}; StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) { if (!mContext->getCommandMQ()->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed"; LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR; mState = StreamDescriptor::State::ERROR;
return Status::ABORT; return Status::ABORT;
@@ -377,7 +380,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
switch (command.getTag()) { switch (command.getTag()) {
case Tag::halReservedExit: case Tag::halReservedExit:
if (const int32_t cookie = command.get<Tag::halReservedExit>(); if (const int32_t cookie = command.get<Tag::halReservedExit>();
cookie == mInternalCommandCookie) { cookie == mContext->getInternalCommandCookie()) {
mDriver->shutdown(); mDriver->shutdown();
setClosed(); setClosed();
// This is an internal command, no need to reply. // This is an internal command, no need to reply.
@@ -432,10 +435,11 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
if (!write(fmqByteCount, &reply)) { if (!write(fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR; mState = StreamDescriptor::State::ERROR;
} }
std::shared_ptr<IStreamCallback> asyncCallback = mContext->getAsyncCallback();
if (mState == StreamDescriptor::State::STANDBY || if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAIN_PAUSED || mState == StreamDescriptor::State::DRAIN_PAUSED ||
mState == StreamDescriptor::State::PAUSED) { mState == StreamDescriptor::State::PAUSED) {
if (mAsyncCallback == nullptr || if (asyncCallback == nullptr ||
mState != StreamDescriptor::State::DRAIN_PAUSED) { mState != StreamDescriptor::State::DRAIN_PAUSED) {
mState = StreamDescriptor::State::PAUSED; mState = StreamDescriptor::State::PAUSED;
} else { } else {
@@ -444,7 +448,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
} else if (mState == StreamDescriptor::State::IDLE || } else if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::DRAINING || mState == StreamDescriptor::State::DRAINING ||
mState == StreamDescriptor::State::ACTIVE) { mState == StreamDescriptor::State::ACTIVE) {
if (mAsyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) { if (asyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) {
mState = StreamDescriptor::State::ACTIVE; mState = StreamDescriptor::State::ACTIVE;
} else { } else {
switchToTransientState(StreamDescriptor::State::TRANSFERRING); switchToTransientState(StreamDescriptor::State::TRANSFERRING);
@@ -466,7 +470,8 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
if (::android::status_t status = mDriver->drain(mode); if (::android::status_t status = mDriver->drain(mode);
status == ::android::OK) { status == ::android::OK) {
populateReply(&reply, mIsConnected); populateReply(&reply, mIsConnected);
if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) { if (mState == StreamDescriptor::State::ACTIVE &&
mContext->getForceSynchronousDrain()) {
mState = StreamDescriptor::State::IDLE; mState = StreamDescriptor::State::IDLE;
} else { } else {
switchToTransientState(StreamDescriptor::State::DRAINING); switchToTransientState(StreamDescriptor::State::DRAINING);
@@ -541,7 +546,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
} }
reply.state = mState; reply.state = mState;
LOG(severity) << __func__ << ": writing reply " << reply.toString(); LOG(severity) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) { if (!mContext->getReplyMQ()->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed"; LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR; mState = StreamDescriptor::State::ERROR;
return Status::ABORT; return Status::ABORT;
@@ -550,38 +555,40 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
} }
bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) { bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
const size_t readByteCount = mDataMQ->availableToRead(); StreamContext::DataMQ* const dataMQ = mContext->getDataMQ();
const size_t readByteCount = dataMQ->availableToRead();
const size_t frameSize = mContext->getFrameSize();
bool fatal = false; bool fatal = false;
int32_t latency = Module::kLatencyMs; int32_t latency = Module::kLatencyMs;
if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) { if (bool success = readByteCount > 0 ? dataMQ->read(&mDataBuffer[0], readByteCount) : true) {
const bool isConnected = mIsConnected; const bool isConnected = mIsConnected;
LOG(VERBOSE) << __func__ << ": reading of " << readByteCount << " bytes from data MQ" LOG(VERBOSE) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
<< " succeeded; connected? " << isConnected; << " succeeded; connected? " << isConnected;
// Amount of data that the HAL module is going to actually use. // Amount of data that the HAL module is going to actually use.
size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize}); size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
if (byteCount >= mFrameSize && mForceTransientBurst) { if (byteCount >= frameSize && mContext->getForceTransientBurst()) {
// In order to prevent the state machine from going to ACTIVE state, // In order to prevent the state machine from going to ACTIVE state,
// simulate partial write. // simulate partial write.
byteCount -= mFrameSize; byteCount -= frameSize;
} }
size_t actualFrameCount = 0; size_t actualFrameCount = 0;
if (isConnected) { if (isConnected) {
if (::android::status_t status = mDriver->transfer( if (::android::status_t status = mDriver->transfer(
mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency); mDataBuffer.get(), byteCount / frameSize, &actualFrameCount, &latency);
status != ::android::OK) { status != ::android::OK) {
fatal = true; fatal = true;
LOG(ERROR) << __func__ << ": write failed: " << status; LOG(ERROR) << __func__ << ": write failed: " << status;
} }
} else { } else {
if (mAsyncCallback == nullptr) { if (mContext->getAsyncCallback() == nullptr) {
usleep(3000); // Simulate blocking transfer delay. usleep(3000); // Simulate blocking transfer delay.
} }
actualFrameCount = byteCount / mFrameSize; actualFrameCount = byteCount / frameSize;
} }
const size_t actualByteCount = actualFrameCount * mFrameSize; const size_t actualByteCount = actualFrameCount * frameSize;
// Frames are consumed and counted regardless of the connection status. // Frames are consumed and counted regardless of the connection status.
reply->fmqByteCount += actualByteCount; reply->fmqByteCount += actualByteCount;
mFrameCount += actualFrameCount; mContext->advanceFrameCount(actualFrameCount);
populateReply(reply, isConnected); populateReply(reply, isConnected);
} else { } else {
LOG(WARNING) << __func__ << ": reading of " << readByteCount LOG(WARNING) << __func__ << ": reading of " << readByteCount

View File

@@ -27,7 +27,7 @@
namespace aidl::android::hardware::audio::core { namespace aidl::android::hardware::audio::core {
StreamAlsa::StreamAlsa(const StreamContext& context, const Metadata& metadata, int readWriteRetries) StreamAlsa::StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries)
: StreamCommonImpl(context, metadata), : StreamCommonImpl(context, metadata),
mFrameSizeBytes(getContext().getFrameSize()), mFrameSizeBytes(getContext().getFrameSize()),
mIsInput(isInput(metadata)), mIsInput(isInput(metadata)),

View File

@@ -113,7 +113,8 @@ class StreamContext {
mDataMQ(std::move(other.mDataMQ)), mDataMQ(std::move(other.mDataMQ)),
mAsyncCallback(std::move(other.mAsyncCallback)), mAsyncCallback(std::move(other.mAsyncCallback)),
mOutEventCallback(std::move(other.mOutEventCallback)), mOutEventCallback(std::move(other.mOutEventCallback)),
mDebugParameters(std::move(other.mDebugParameters)) {} mDebugParameters(std::move(other.mDebugParameters)),
mFrameCount(other.mFrameCount) {}
StreamContext& operator=(StreamContext&& other) { StreamContext& operator=(StreamContext&& other) {
mCommandMQ = std::move(other.mCommandMQ); mCommandMQ = std::move(other.mCommandMQ);
mInternalCommandCookie = other.mInternalCommandCookie; mInternalCommandCookie = other.mInternalCommandCookie;
@@ -128,6 +129,7 @@ class StreamContext {
mAsyncCallback = std::move(other.mAsyncCallback); mAsyncCallback = std::move(other.mAsyncCallback);
mOutEventCallback = std::move(other.mOutEventCallback); mOutEventCallback = std::move(other.mOutEventCallback);
mDebugParameters = std::move(other.mDebugParameters); mDebugParameters = std::move(other.mDebugParameters);
mFrameCount = other.mFrameCount;
return *this; return *this;
} }
@@ -156,7 +158,12 @@ class StreamContext {
int getTransientStateDelayMs() const { return mDebugParameters.transientStateDelayMs; } int getTransientStateDelayMs() const { return mDebugParameters.transientStateDelayMs; }
int getSampleRate() const { return mSampleRate; } int getSampleRate() const { return mSampleRate; }
bool isValid() const; bool isValid() const;
// 'reset' is called on a Binder thread when closing the stream. Does not use
// locking because it only cleans MQ pointers which were also set on the Binder thread.
void reset(); void reset();
// 'advanceFrameCount' and 'getFrameCount' are only called on the worker thread.
long advanceFrameCount(size_t increase) { return mFrameCount += increase; }
long getFrameCount() const { return mFrameCount; }
private: private:
std::unique_ptr<CommandMQ> mCommandMQ; std::unique_ptr<CommandMQ> mCommandMQ;
@@ -172,6 +179,7 @@ class StreamContext {
std::shared_ptr<IStreamCallback> mAsyncCallback; std::shared_ptr<IStreamCallback> mAsyncCallback;
std::shared_ptr<IStreamOutEventCallback> mOutEventCallback; // Only used by output streams std::shared_ptr<IStreamOutEventCallback> mOutEventCallback; // Only used by output streams
DebugParameters mDebugParameters; DebugParameters mDebugParameters;
long mFrameCount = 0;
}; };
// This interface provides operations of the stream which are executed on the worker thread. // This interface provides operations of the stream which are executed on the worker thread.
@@ -206,17 +214,10 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
protected: protected:
using DataBufferElement = int8_t; using DataBufferElement = int8_t;
StreamWorkerCommonLogic(const StreamContext& context, DriverInterface* driver) StreamWorkerCommonLogic(StreamContext* context, DriverInterface* driver)
: mDriver(driver), : mContext(context),
mInternalCommandCookie(context.getInternalCommandCookie()), mDriver(driver),
mFrameSize(context.getFrameSize()), mTransientStateDelayMs(context->getTransientStateDelayMs()) {}
mCommandMQ(context.getCommandMQ()),
mReplyMQ(context.getReplyMQ()),
mDataMQ(context.getDataMQ()),
mAsyncCallback(context.getAsyncCallback()),
mTransientStateDelayMs(context.getTransientStateDelayMs()),
mForceTransientBurst(context.getForceTransientBurst()),
mForceSynchronousDrain(context.getForceSynchronousDrain()) {}
std::string init() override; std::string init() override;
void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const; void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
void populateReplyWrongState(StreamDescriptor::Reply* reply, void populateReplyWrongState(StreamDescriptor::Reply* reply,
@@ -226,34 +227,28 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
mTransientStateStart = std::chrono::steady_clock::now(); mTransientStateStart = std::chrono::steady_clock::now();
} }
// The context is only used for reading, except for updating the frame count,
// which happens on the worker thread only.
StreamContext* const mContext;
DriverInterface* const mDriver; DriverInterface* const mDriver;
// Atomic fields are used both by the main and worker threads. // Atomic fields are used both by the main and worker threads.
std::atomic<bool> mIsConnected = false; std::atomic<bool> mIsConnected = false;
static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free); static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY; std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY;
// All fields are used on the worker thread only. // All fields below are used on the worker thread only.
const int mInternalCommandCookie;
const size_t mFrameSize;
StreamContext::CommandMQ* const mCommandMQ;
StreamContext::ReplyMQ* const mReplyMQ;
StreamContext::DataMQ* const mDataMQ;
std::shared_ptr<IStreamCallback> mAsyncCallback;
const std::chrono::duration<int, std::milli> mTransientStateDelayMs; const std::chrono::duration<int, std::milli> mTransientStateDelayMs;
std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart; std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart;
const bool mForceTransientBurst;
const bool mForceSynchronousDrain;
// We use an array and the "size" field instead of a vector to be able to detect // We use an array and the "size" field instead of a vector to be able to detect
// memory allocation issues. // memory allocation issues.
std::unique_ptr<DataBufferElement[]> mDataBuffer; std::unique_ptr<DataBufferElement[]> mDataBuffer;
size_t mDataBufferSize; size_t mDataBufferSize;
long mFrameCount = 0;
}; };
// This interface is used to decouple stream implementations from a concrete StreamWorker // This interface is used to decouple stream implementations from a concrete StreamWorker
// implementation. // implementation.
struct StreamWorkerInterface { struct StreamWorkerInterface {
using CreateInstance = std::function<StreamWorkerInterface*(const StreamContext& context, using CreateInstance =
DriverInterface* driver)>; std::function<StreamWorkerInterface*(StreamContext* context, DriverInterface* driver)>;
virtual ~StreamWorkerInterface() = default; virtual ~StreamWorkerInterface() = default;
virtual bool isClosed() const = 0; virtual bool isClosed() const = 0;
virtual void setIsConnected(bool isConnected) = 0; virtual void setIsConnected(bool isConnected) = 0;
@@ -268,7 +263,7 @@ class StreamWorkerImpl : public StreamWorkerInterface,
using WorkerImpl = ::android::hardware::audio::common::StreamWorker<WorkerLogic>; using WorkerImpl = ::android::hardware::audio::common::StreamWorker<WorkerLogic>;
public: public:
StreamWorkerImpl(const StreamContext& context, DriverInterface* driver) StreamWorkerImpl(StreamContext* context, DriverInterface* driver)
: WorkerImpl(context, driver) {} : WorkerImpl(context, driver) {}
bool isClosed() const override { return WorkerImpl::isClosed(); } bool isClosed() const override { return WorkerImpl::isClosed(); }
void setIsConnected(bool isConnected) override { WorkerImpl::setIsConnected(isConnected); } void setIsConnected(bool isConnected) override { WorkerImpl::setIsConnected(isConnected); }
@@ -282,7 +277,7 @@ class StreamWorkerImpl : public StreamWorkerInterface,
class StreamInWorkerLogic : public StreamWorkerCommonLogic { class StreamInWorkerLogic : public StreamWorkerCommonLogic {
public: public:
static const std::string kThreadName; static const std::string kThreadName;
StreamInWorkerLogic(const StreamContext& context, DriverInterface* driver) StreamInWorkerLogic(StreamContext* context, DriverInterface* driver)
: StreamWorkerCommonLogic(context, driver) {} : StreamWorkerCommonLogic(context, driver) {}
protected: protected:
@@ -296,8 +291,9 @@ using StreamInWorker = StreamWorkerImpl<StreamInWorkerLogic>;
class StreamOutWorkerLogic : public StreamWorkerCommonLogic { class StreamOutWorkerLogic : public StreamWorkerCommonLogic {
public: public:
static const std::string kThreadName; static const std::string kThreadName;
StreamOutWorkerLogic(const StreamContext& context, DriverInterface* driver) StreamOutWorkerLogic(StreamContext* context, DriverInterface* driver)
: StreamWorkerCommonLogic(context, driver), mEventCallback(context.getOutEventCallback()) {} : StreamWorkerCommonLogic(context, driver),
mEventCallback(context->getOutEventCallback()) {}
protected: protected:
Status cycle() override; Status cycle() override;
@@ -416,10 +412,10 @@ class StreamCommonDelegator : public BnStreamCommon {
// who must be owner of the context. // who must be owner of the context.
class StreamCommonImpl : virtual public StreamCommonInterface, virtual public DriverInterface { class StreamCommonImpl : virtual public StreamCommonInterface, virtual public DriverInterface {
public: public:
StreamCommonImpl(const StreamContext& context, const Metadata& metadata, StreamCommonImpl(StreamContext* context, const Metadata& metadata,
const StreamWorkerInterface::CreateInstance& createWorker) const StreamWorkerInterface::CreateInstance& createWorker)
: mContext(context), mMetadata(metadata), mWorker(createWorker(mContext, this)) {} : mContext(*context), mMetadata(metadata), mWorker(createWorker(context, this)) {}
StreamCommonImpl(const StreamContext& context, const Metadata& metadata) StreamCommonImpl(StreamContext* context, const Metadata& metadata)
: StreamCommonImpl( : StreamCommonImpl(
context, metadata, context, metadata,
isInput(metadata) ? getDefaultInWorkerCreator() : getDefaultOutWorkerCreator()) {} isInput(metadata) ? getDefaultInWorkerCreator() : getDefaultOutWorkerCreator()) {}
@@ -453,12 +449,12 @@ class StreamCommonImpl : virtual public StreamCommonInterface, virtual public Dr
protected: protected:
static StreamWorkerInterface::CreateInstance getDefaultInWorkerCreator() { static StreamWorkerInterface::CreateInstance getDefaultInWorkerCreator() {
return [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* { return [](StreamContext* ctx, DriverInterface* driver) -> StreamWorkerInterface* {
return new StreamInWorker(ctx, driver); return new StreamInWorker(ctx, driver);
}; };
} }
static StreamWorkerInterface::CreateInstance getDefaultOutWorkerCreator() { static StreamWorkerInterface::CreateInstance getDefaultOutWorkerCreator() {
return [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* { return [](StreamContext* ctx, DriverInterface* driver) -> StreamWorkerInterface* {
return new StreamOutWorker(ctx, driver); return new StreamOutWorker(ctx, driver);
}; };
} }

View File

@@ -31,7 +31,7 @@ namespace aidl::android::hardware::audio::core {
// provide necessary overrides for all interface methods omitted here. // provide necessary overrides for all interface methods omitted here.
class StreamAlsa : public StreamCommonImpl { class StreamAlsa : public StreamCommonImpl {
public: public:
StreamAlsa(const StreamContext& context, const Metadata& metadata, int readWriteRetries); StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries);
// Methods of 'DriverInterface'. // Methods of 'DriverInterface'.
::android::status_t init() override; ::android::status_t init() override;
::android::status_t drain(StreamDescriptor::DrainMode) override; ::android::status_t drain(StreamDescriptor::DrainMode) override;

View File

@@ -29,7 +29,7 @@ using aidl::android::hardware::audio::core::r_submix::SubmixRoute;
class StreamRemoteSubmix : public StreamCommonImpl { class StreamRemoteSubmix : public StreamCommonImpl {
public: public:
StreamRemoteSubmix(const StreamContext& context, const Metadata& metadata); StreamRemoteSubmix(StreamContext* context, const Metadata& metadata);
::android::status_t init() override; ::android::status_t init() override;
::android::status_t drain(StreamDescriptor::DrainMode) override; ::android::status_t drain(StreamDescriptor::DrainMode) override;

View File

@@ -22,7 +22,7 @@ namespace aidl::android::hardware::audio::core {
class StreamStub : public StreamCommonImpl { class StreamStub : public StreamCommonImpl {
public: public:
StreamStub(const StreamContext& context, const Metadata& metadata); StreamStub(StreamContext* context, const Metadata& metadata);
// Methods of 'DriverInterface'. // Methods of 'DriverInterface'.
::android::status_t init() override; ::android::status_t init() override;
::android::status_t drain(StreamDescriptor::DrainMode) override; ::android::status_t drain(StreamDescriptor::DrainMode) override;

View File

@@ -28,7 +28,7 @@ namespace aidl::android::hardware::audio::core {
class StreamUsb : public StreamAlsa { class StreamUsb : public StreamAlsa {
public: public:
StreamUsb(const StreamContext& context, const Metadata& metadata); StreamUsb(StreamContext* context, const Metadata& metadata);
// Methods of 'DriverInterface'. // Methods of 'DriverInterface'.
::android::status_t transfer(void* buffer, size_t frameCount, size_t* actualFrameCount, ::android::status_t transfer(void* buffer, size_t frameCount, size_t* actualFrameCount,
int32_t* latencyMs) override; int32_t* latencyMs) override;

View File

@@ -29,14 +29,14 @@ using aidl::android::media::audio::common::MicrophoneInfo;
namespace aidl::android::hardware::audio::core { namespace aidl::android::hardware::audio::core {
StreamRemoteSubmix::StreamRemoteSubmix(const StreamContext& context, const Metadata& metadata) StreamRemoteSubmix::StreamRemoteSubmix(StreamContext* context, const Metadata& metadata)
: StreamCommonImpl(context, metadata), : StreamCommonImpl(context, metadata),
mPortId(context.getPortId()), mPortId(context->getPortId()),
mIsInput(isInput(metadata)) { mIsInput(isInput(metadata)) {
mStreamConfig.frameSize = context.getFrameSize(); mStreamConfig.frameSize = context->getFrameSize();
mStreamConfig.format = context.getFormat(); mStreamConfig.format = context->getFormat();
mStreamConfig.channelLayout = context.getChannelLayout(); mStreamConfig.channelLayout = context->getChannelLayout();
mStreamConfig.sampleRate = context.getSampleRate(); mStreamConfig.sampleRate = context->getSampleRate();
} }
std::mutex StreamRemoteSubmix::sSubmixRoutesLock; std::mutex StreamRemoteSubmix::sSubmixRoutesLock;
@@ -357,7 +357,7 @@ StreamInRemoteSubmix::StreamInRemoteSubmix(StreamContext&& context,
const SinkMetadata& sinkMetadata, const SinkMetadata& sinkMetadata,
const std::vector<MicrophoneInfo>& microphones) const std::vector<MicrophoneInfo>& microphones)
: StreamIn(std::move(context), microphones), : StreamIn(std::move(context), microphones),
StreamRemoteSubmix(StreamIn::mContext, sinkMetadata) {} StreamRemoteSubmix(&(StreamIn::mContext), sinkMetadata) {}
ndk::ScopedAStatus StreamInRemoteSubmix::getActiveMicrophones( ndk::ScopedAStatus StreamInRemoteSubmix::getActiveMicrophones(
std::vector<MicrophoneDynamicInfo>* _aidl_return) { std::vector<MicrophoneDynamicInfo>* _aidl_return) {
@@ -370,6 +370,6 @@ StreamOutRemoteSubmix::StreamOutRemoteSubmix(StreamContext&& context,
const SourceMetadata& sourceMetadata, const SourceMetadata& sourceMetadata,
const std::optional<AudioOffloadInfo>& offloadInfo) const std::optional<AudioOffloadInfo>& offloadInfo)
: StreamOut(std::move(context), offloadInfo), : StreamOut(std::move(context), offloadInfo),
StreamRemoteSubmix(StreamOut::mContext, sourceMetadata) {} StreamRemoteSubmix(&(StreamOut::mContext), sourceMetadata) {}
} // namespace aidl::android::hardware::audio::core } // namespace aidl::android::hardware::audio::core

View File

@@ -31,7 +31,7 @@ using aidl::android::media::audio::common::MicrophoneInfo;
namespace aidl::android::hardware::audio::core { namespace aidl::android::hardware::audio::core {
StreamStub::StreamStub(const StreamContext& context, const Metadata& metadata) StreamStub::StreamStub(StreamContext* context, const Metadata& metadata)
: StreamCommonImpl(context, metadata), : StreamCommonImpl(context, metadata),
mFrameSizeBytes(getContext().getFrameSize()), mFrameSizeBytes(getContext().getFrameSize()),
mSampleRate(getContext().getSampleRate()), mSampleRate(getContext().getSampleRate()),
@@ -120,10 +120,11 @@ void StreamStub::shutdown() {
StreamInStub::StreamInStub(StreamContext&& context, const SinkMetadata& sinkMetadata, StreamInStub::StreamInStub(StreamContext&& context, const SinkMetadata& sinkMetadata,
const std::vector<MicrophoneInfo>& microphones) const std::vector<MicrophoneInfo>& microphones)
: StreamIn(std::move(context), microphones), StreamStub(StreamIn::mContext, sinkMetadata) {} : StreamIn(std::move(context), microphones), StreamStub(&(StreamIn::mContext), sinkMetadata) {}
StreamOutStub::StreamOutStub(StreamContext&& context, const SourceMetadata& sourceMetadata, StreamOutStub::StreamOutStub(StreamContext&& context, const SourceMetadata& sourceMetadata,
const std::optional<AudioOffloadInfo>& offloadInfo) const std::optional<AudioOffloadInfo>& offloadInfo)
: StreamOut(std::move(context), offloadInfo), StreamStub(StreamOut::mContext, sourceMetadata) {} : StreamOut(std::move(context), offloadInfo),
StreamStub(&(StreamOut::mContext), sourceMetadata) {}
} // namespace aidl::android::hardware::audio::core } // namespace aidl::android::hardware::audio::core

View File

@@ -35,7 +35,7 @@ using aidl::android::media::audio::common::MicrophoneInfo;
namespace aidl::android::hardware::audio::core { namespace aidl::android::hardware::audio::core {
StreamUsb::StreamUsb(const StreamContext& context, const Metadata& metadata) StreamUsb::StreamUsb(StreamContext* context, const Metadata& metadata)
: StreamAlsa(context, metadata, 1 /*readWriteRetries*/) {} : StreamAlsa(context, metadata, 1 /*readWriteRetries*/) {}
ndk::ScopedAStatus StreamUsb::setConnectedDevices( ndk::ScopedAStatus StreamUsb::setConnectedDevices(
@@ -85,7 +85,7 @@ std::vector<alsa::DeviceProfile> StreamUsb::getDeviceProfiles() {
StreamInUsb::StreamInUsb(StreamContext&& context, const SinkMetadata& sinkMetadata, StreamInUsb::StreamInUsb(StreamContext&& context, const SinkMetadata& sinkMetadata,
const std::vector<MicrophoneInfo>& microphones) const std::vector<MicrophoneInfo>& microphones)
: StreamIn(std::move(context), microphones), StreamUsb(StreamIn::mContext, sinkMetadata) {} : StreamIn(std::move(context), microphones), StreamUsb(&(StreamIn::mContext), sinkMetadata) {}
ndk::ScopedAStatus StreamInUsb::getActiveMicrophones( ndk::ScopedAStatus StreamInUsb::getActiveMicrophones(
std::vector<MicrophoneDynamicInfo>* _aidl_return __unused) { std::vector<MicrophoneDynamicInfo>* _aidl_return __unused) {
@@ -96,7 +96,7 @@ ndk::ScopedAStatus StreamInUsb::getActiveMicrophones(
StreamOutUsb::StreamOutUsb(StreamContext&& context, const SourceMetadata& sourceMetadata, StreamOutUsb::StreamOutUsb(StreamContext&& context, const SourceMetadata& sourceMetadata,
const std::optional<AudioOffloadInfo>& offloadInfo) const std::optional<AudioOffloadInfo>& offloadInfo)
: StreamOut(std::move(context), offloadInfo), : StreamOut(std::move(context), offloadInfo),
StreamUsb(StreamOut::mContext, sourceMetadata), StreamUsb(&(StreamOut::mContext), sourceMetadata),
mChannelCount(getChannelCount(getContext().getChannelLayout())) {} mChannelCount(getChannelCount(getContext().getChannelLayout())) {}
ndk::ScopedAStatus StreamOutUsb::getHwVolume(std::vector<float>* _aidl_return) { ndk::ScopedAStatus StreamOutUsb::getHwVolume(std::vector<float>* _aidl_return) {