Files
hardware_interfaces/audio/aidl/default/Stream.cpp
Mikhail Naganov bd483c03b2 audio: Implement transient state testing
Add ModuleDebug.streamTransientStateDelayMs parameter to ensure
that streams stay in transient states for the specified amount of
time. This enables sending commands from VTS while the stream is
still in a transient state.

Add 'getStatus' stream command to retrieve current positions,
counters, and stream state. Previously we were planning to use a
zero-sized burst command for that. However, after the
introduction of stream state machines, the 'burst' command is
not handled in every stream state, and may even affect the
current state, thus it is no longer usable for this purpose.

Bug: 205884982
Test: atest VtsHalAudioCoreTargetTest
Change-Id: I8717acace8d95d76bef2ec9fd6561796d7544992
2022-11-23 01:58:19 +00:00

483 lines
21 KiB
C++

/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "AHAL_Stream"
#include <android-base/logging.h>
#include <utils/SystemClock.h>
#include "core-impl/Module.h"
#include "core-impl/Stream.h"
using aidl::android::hardware::audio::common::SinkMetadata;
using aidl::android::hardware::audio::common::SourceMetadata;
using aidl::android::media::audio::common::AudioOffloadInfo;
namespace aidl::android::hardware::audio::core {
// Fills 'desc' with duplicates of the descriptors of the message queues held by
// this context. For the data MQ, the frame size and the buffer size expressed
// in frames are filled in as well. Fields that correspond to a queue which is
// not set are left untouched.
//
// 'desc' must not be null.
void StreamContext::fillDescriptor(StreamDescriptor* desc) {
    if (mCommandMQ) {
        desc->command = mCommandMQ->dupeDesc();
    }
    if (mReplyMQ) {
        desc->reply = mReplyMQ->dupeDesc();
    }
    if (mDataMQ) {
        desc->frameSizeBytes = mFrameSize;
        // 'isValid' treats a zero frame size as a reachable (invalid) state of
        // the context. Integer division by zero is undefined behavior, thus
        // report an empty buffer instead of dividing in that case.
        desc->bufferSizeFrames =
                mFrameSize != 0
                        ? mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize() / mFrameSize
                        : 0;
        desc->audio.set<StreamDescriptor::AudioBuffer::Tag::fmq>(mDataMQ->dupeDesc());
    }
}
bool StreamContext::isValid() const {
if (mCommandMQ && !mCommandMQ->isValid()) {
LOG(ERROR) << "command FMQ is invalid";
return false;
}
if (mReplyMQ && !mReplyMQ->isValid()) {
LOG(ERROR) << "reply FMQ is invalid";
return false;
}
if (mFrameSize == 0) {
LOG(ERROR) << "frame size is not set";
return false;
}
if (mDataMQ && !mDataMQ->isValid()) {
LOG(ERROR) << "data FMQ is invalid";
return false;
}
return true;
}
// Releases the command, reply, and data message queues held by this context,
// in this order.
void StreamContext::reset() {
    mCommandMQ = nullptr;
    mReplyMQ = nullptr;
    mDataMQ = nullptr;
}
std::string StreamWorkerCommonLogic::init() {
if (mCommandMQ == nullptr) return "Command MQ is null";
if (mReplyMQ == nullptr) return "Reply MQ is null";
if (mDataMQ == nullptr) return "Data MQ is null";
if (sizeof(decltype(mDataBuffer)::element_type) != mDataMQ->getQuantumSize()) {
return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize());
}
mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
mDataBuffer.reset(new (std::nothrow) int8_t[mDataBufferSize]);
if (mDataBuffer == nullptr) {
return "Failed to allocate data buffer for element count " +
std::to_string(mDataMQ->getQuantumCount()) +
", size in bytes: " + std::to_string(mDataBufferSize);
}
return "";
}
// Fills in the status of 'reply'. For a connected stream the status is
// 'STATUS_OK' and the observable position is updated with the current frame
// count and the current elapsed realtime. For a stream which is not connected
// only the status is set, to 'STATUS_NO_INIT'.
void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
                                            bool isConnected) const {
    if (!isConnected) {
        reply->status = STATUS_NO_INIT;
        return;
    }
    reply->status = STATUS_OK;
    reply->observable.frames = mFrameCount;
    reply->observable.timeNs = ::android::elapsedRealtimeNano();
}
// Marks 'reply' as a rejection of 'command' because it is not accepted in the
// current stream state: the status becomes 'STATUS_INVALID_OPERATION', and a
// warning naming both the command and the state is logged.
void StreamWorkerCommonLogic::populateReplyWrongState(
        StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const {
    reply->status = STATUS_INVALID_OPERATION;
    LOG(WARNING) << "command '" << toString(command.getTag())
                 << "' can not be handled in the state " << toString(mState);
}
// Label of the input stream worker; 'cycle' includes it in its debug log.
// NOTE(review): presumably also used as the name of the worker thread — confirm
// against the StreamWorker template.
const std::string StreamInWorkerLogic::kThreadName{"reader"};
// Processes a single command of the input stream: blocks until a command is
// available on the command MQ, applies it to the stream state machine, and
// writes back one reply carrying the resulting state.
//
// Returns:
//   Status::ABORT    if reading the command or writing the reply fails; the
//                    state is set to 'ERROR' before returning.
//   Status::EXIT     on 'halReservedExit' carrying the expected cookie;
//                    'setClosed' is called and no reply is written.
//   Status::CONTINUE in all other cases, including commands that got rejected.
StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
// Note: for input streams, draining is driven by the client, thus
// "empty buffer" condition can only happen while handling the 'burst'
// command. Thus, unlike for output streams, it does not make sense to
// delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
// TODO: Add a delay for transitions of async operations when/if they are added.
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
StreamDescriptor::Reply reply{};
// The status remains 'STATUS_BAD_VALUE' unless a case below overwrites it.
// This is what gets reported for an exit command with a wrong cookie and
// for a burst command with a negative byte count.
reply.status = STATUS_BAD_VALUE;
using Tag = StreamDescriptor::Command::Tag;
switch (command.getTag()) {
case Tag::halReservedExit:
if (const int32_t cookie = command.get<Tag::halReservedExit>();
cookie == mInternalCommandCookie) {
setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
} else {
LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
}
break;
case Tag::getStatus:
// Handled regardless of the current state, and does not change it.
populateReply(&reply, mIsConnected);
break;
case Tag::start:
// 'STANDBY' -> 'IDLE', 'DRAINING' -> 'ACTIVE'; rejected in any other state.
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAINING) {
populateReply(&reply, mIsConnected);
mState = mState == StreamDescriptor::State::STANDBY
? StreamDescriptor::State::IDLE
: StreamDescriptor::State::ACTIVE;
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::burst:
if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
<< fmqByteCount << " bytes";
if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAINING) {
if (!read(fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR;
}
// The transition is selected after 'read' has run: if 'read' has
// failed, the state is 'ERROR' at this point and matches neither
// branch, thus it is kept. 'ACTIVE' is not changed either.
if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::PAUSED) {
mState = StreamDescriptor::State::ACTIVE;
} else if (mState == StreamDescriptor::State::DRAINING) {
// To simplify the reference code, we assume that the read operation
// has consumed all the data remaining in the hardware buffer.
// In a real implementation, here we would either remain in
// the 'DRAINING' state, or transfer to 'STANDBY' depending on the
// buffer state.
mState = StreamDescriptor::State::STANDBY;
}
} else {
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
}
break;
case Tag::drain:
// 'ACTIVE' -> 'DRAINING'; rejected in any other state.
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::DRAINING;
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::standby:
// 'IDLE' -> 'STANDBY'; rejected in any other state.
if (mState == StreamDescriptor::State::IDLE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause:
// 'ACTIVE' -> 'PAUSED'; rejected in any other state.
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::PAUSED;
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::flush:
// 'PAUSED' -> 'STANDBY'; rejected in any other state.
if (mState == StreamDescriptor::State::PAUSED) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
populateReplyWrongState(&reply, command);
}
break;
}
// The reply always reports the state as it is after handling the command.
reply.state = mState;
LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
// Can switch the state to ERROR if a driver error occurs.
const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
const bool isConnected = mIsConnected;
bool fatal = false;
// Simulate reading of data, or provide zeroes if the stream is not connected.
for (size_t i = 0; i < byteCount; ++i) {
using buffer_type = decltype(mDataBuffer)::element_type;
constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
std::numeric_limits<buffer_type>::min() + 1;
mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
std::numeric_limits<buffer_type>::min()
: 0;
}
usleep(3000); // Simulate a blocking call into the driver.
// Set 'fatal = true' if a driver error occurs.
if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are provided and counted regardless of connection status.
reply->fmqByteCount += byteCount;
mFrameCount += byteCount / mFrameSize;
populateReply(reply, isConnected);
} else {
LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
reply->status = STATUS_NOT_ENOUGH_DATA;
}
reply->latencyMs = Module::kLatencyMs;
return !fatal;
}
// Label of the output stream worker; 'cycle' includes it in its debug log.
// NOTE(review): presumably also used as the name of the worker thread — confirm
// against the StreamWorker template.
const std::string StreamOutWorkerLogic::kThreadName{"writer"};
// Processes a single command of the output stream: completes an expired
// 'DRAINING' state if needed, blocks until a command is available on the
// command MQ, applies it to the stream state machine, and writes back one reply
// carrying the resulting state.
//
// Returns:
//   Status::ABORT    if reading the command or writing the reply fails; the
//                    state is set to 'ERROR' before returning.
//   Status::EXIT     on 'halReservedExit' carrying the expected cookie;
//                    'setClosed' is called and no reply is written.
//   Status::CONTINUE in all other cases, including commands that got rejected.
StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
// A 'DRAINING' state which has lasted for at least 'mTransientStateDelayMs'
// gets completed by switching to 'IDLE'. This check runs once per cycle, before
// blocking on the command MQ.
// NOTE(review): 'mTransientStateStart' is presumably set by
// 'switchToTransientState' — confirm against its definition.
if (mState == StreamDescriptor::State::DRAINING) {
if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - mTransientStateStart);
stateDurationMs >= mTransientStateDelayMs) {
mState = StreamDescriptor::State::IDLE;
// With a zero delay the switch happens on every cycle, thus it is not logged.
if (mTransientStateDelayMs.count() != 0) {
LOG(DEBUG) << __func__ << ": switched to state " << toString(mState)
<< " after a timeout";
}
}
}
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
StreamDescriptor::Reply reply{};
// The status remains 'STATUS_BAD_VALUE' unless a case below overwrites it.
// This is what gets reported for an exit command with a wrong cookie and
// for a burst command with a negative byte count.
reply.status = STATUS_BAD_VALUE;
using Tag = StreamDescriptor::Command::Tag;
switch (command.getTag()) {
case Tag::halReservedExit:
if (const int32_t cookie = command.get<Tag::halReservedExit>();
cookie == mInternalCommandCookie) {
setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
} else {
LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
}
break;
case Tag::getStatus:
// Handled regardless of the current state, and does not change it.
populateReply(&reply, mIsConnected);
break;
case Tag::start:
// 'STANDBY' -> 'IDLE', 'PAUSED' -> 'ACTIVE', 'DRAIN_PAUSED' -> 'DRAINING';
// rejected in any other state.
switch (mState) {
case StreamDescriptor::State::STANDBY:
mState = StreamDescriptor::State::IDLE;
populateReply(&reply, mIsConnected);
break;
case StreamDescriptor::State::PAUSED:
mState = StreamDescriptor::State::ACTIVE;
populateReply(&reply, mIsConnected);
break;
case StreamDescriptor::State::DRAIN_PAUSED:
switchToTransientState(StreamDescriptor::State::DRAINING);
populateReply(&reply, mIsConnected);
break;
default:
populateReplyWrongState(&reply, command);
}
break;
case Tag::burst:
if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
<< fmqByteCount << " bytes";
if (mState !=
StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states
if (!write(fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR;
}
// The transition is selected after 'write' has run: if 'write' has
// failed, the state is 'ERROR' at this point and matches neither
// branch, thus it is kept.
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAIN_PAUSED) {
mState = StreamDescriptor::State::PAUSED;
} else if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::DRAINING) {
mState = StreamDescriptor::State::ACTIVE;
} // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
} else {
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
}
break;
case Tag::drain:
// 'ACTIVE' -> 'DRAINING'; rejected in any other state.
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
switchToTransientState(StreamDescriptor::State::DRAINING);
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::standby:
// 'IDLE' -> 'STANDBY'; rejected in any other state.
if (mState == StreamDescriptor::State::IDLE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause:
// 'ACTIVE' -> 'PAUSED', 'DRAINING' -> 'DRAIN_PAUSED'; rejected in any other state.
if (mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::DRAINING) {
populateReply(&reply, mIsConnected);
mState = mState == StreamDescriptor::State::ACTIVE
? StreamDescriptor::State::PAUSED
: StreamDescriptor::State::DRAIN_PAUSED;
} else {
populateReplyWrongState(&reply, command);
}
break;
case Tag::flush:
// 'PAUSED' -> 'IDLE', 'DRAIN_PAUSED' -> 'IDLE'; rejected in any other state.
if (mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAIN_PAUSED) {
populateReply(&reply, mIsConnected);
mState = StreamDescriptor::State::IDLE;
} else {
populateReplyWrongState(&reply, command);
}
break;
}
// The reply always reports the state as it is after handling the command.
reply.state = mState;
LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
const size_t readByteCount = mDataMQ->availableToRead();
// Amount of data that the HAL module is going to actually use.
const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
bool fatal = false;
if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
const bool isConnected = mIsConnected;
LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are consumed and counted regardless of connection status.
reply->fmqByteCount += byteCount;
mFrameCount += byteCount / mFrameSize;
populateReply(reply, isConnected);
usleep(3000); // Simulate a blocking call into the driver.
// Set 'fatal = true' if a driver error occurs.
} else {
LOG(WARNING) << __func__ << ": reading of " << readByteCount
<< " bytes of data from MQ failed";
reply->status = STATUS_NOT_ENOUGH_DATA;
}
reply->latencyMs = Module::kLatencyMs;
return !fatal;
}
// Destroying a stream which was not closed is reported as an error. In this
// case the worker is asked to exit so that it does not keep running.
template <class Metadata, class StreamWorker>
StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
    if (isClosed()) return;
    LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
    stopWorker();
    // The worker and the context should clean up by themselves via destructors.
}
// Closes the stream: asks the worker to exit, stops it, resets the stream
// context, and marks the worker as closed.
//
// Returns 'EX_ILLEGAL_STATE' if the stream has already been closed, otherwise OK.
template <class Metadata, class StreamWorker>
ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
    LOG(DEBUG) << __func__;
    if (isClosed()) {
        LOG(ERROR) << __func__ << ": stream was already closed";
        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
    }
    stopWorker();
    LOG(DEBUG) << __func__ << ": joining the worker thread...";
    mWorker.stop();
    LOG(DEBUG) << __func__ << ": worker thread joined";
    // The context is reset only after the worker has been stopped.
    mContext.reset();
    mWorker.setClosed();
    return ndk::ScopedAStatus::ok();
}
// Asks the worker to exit by sending it the internal 'halReservedExit' command
// with the cookie taken from the stream context. Does nothing if the context
// has no command MQ. A failure to send the command is only logged.
template <class Metadata, class StreamWorker>
void StreamCommon<Metadata, StreamWorker>::stopWorker() {
    auto commandMQ = mContext.getCommandMQ();
    if (commandMQ == nullptr) return;

    LOG(DEBUG) << __func__ << ": asking the worker to exit...";
    using Command = StreamDescriptor::Command;
    auto exitCommand =
            Command::make<Command::Tag::halReservedExit>(mContext.getInternalCommandCookie());
    // Note: never call 'pause' and 'resume' methods of StreamWorker
    // in the HAL implementation. These methods are to be used by
    // the client side only. Preventing the worker loop from running
    // on the HAL side can cause a deadlock.
    const bool sent = commandMQ->writeBlocking(&exitCommand, 1);
    if (!sent) {
        LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
    }
    LOG(DEBUG) << __func__ << ": done";
}
// Replaces the metadata stored for the stream with 'metadata'.
//
// Returns 'EX_ILLEGAL_STATE' if the stream has been closed, otherwise OK.
template <class Metadata, class StreamWorker>
ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
    LOG(DEBUG) << __func__;
    if (isClosed()) {
        LOG(ERROR) << __func__ << ": stream was closed";
        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
    }
    mMetadata = metadata;
    return ndk::ScopedAStatus::ok();
}
StreamIn::StreamIn(const SinkMetadata& sinkMetadata, StreamContext context)
: StreamCommon<SinkMetadata, StreamInWorker>(sinkMetadata, std::move(context)) {
LOG(DEBUG) << __func__;
}
StreamOut::StreamOut(const SourceMetadata& sourceMetadata, StreamContext context,
const std::optional<AudioOffloadInfo>& offloadInfo)
: StreamCommon<SourceMetadata, StreamOutWorker>(sourceMetadata, std::move(context)),
mOffloadInfo(offloadInfo) {
LOG(DEBUG) << __func__;
}
} // namespace aidl::android::hardware::audio::core