Merge changes Ibed7f4c3,I8a9f4f0b

* changes:
  audio: Add pause, resume, and standby stream operations
  audio: Add 'join' method to StreamWorker
This commit is contained in:
Mikhail Naganov
2022-11-07 18:33:21 +00:00
committed by Gerrit Code Review
11 changed files with 1128 additions and 309 deletions

View File

@@ -39,15 +39,34 @@ parcelable StreamDescriptor {
int frameSizeBytes;
long bufferSizeFrames;
android.hardware.audio.core.StreamDescriptor.AudioBuffer audio;
const int COMMAND_BURST = 1;
const int LATENCY_UNKNOWN = -1;
@FixedSize @VintfStability
parcelable Position {
long frames;
long timeNs;
}
@Backing(type="int") @VintfStability
enum State {
STANDBY = 1,
IDLE = 2,
ACTIVE = 3,
PAUSED = 4,
DRAINING = 5,
DRAIN_PAUSED = 6,
ERROR = 100,
}
@Backing(type="int") @VintfStability
enum CommandCode {
START = 1,
BURST = 2,
DRAIN = 3,
STANDBY = 4,
PAUSE = 5,
FLUSH = 6,
}
@FixedSize @VintfStability
parcelable Command {
int code;
android.hardware.audio.core.StreamDescriptor.CommandCode code = android.hardware.audio.core.StreamDescriptor.CommandCode.START;
int fmqByteCount;
}
@FixedSize @VintfStability
@@ -57,6 +76,8 @@ parcelable StreamDescriptor {
android.hardware.audio.core.StreamDescriptor.Position observable;
android.hardware.audio.core.StreamDescriptor.Position hardware;
int latencyMs;
int xrunFrames;
android.hardware.audio.core.StreamDescriptor.State state = android.hardware.audio.core.StreamDescriptor.State.STANDBY;
}
@VintfStability
union AudioBuffer {

View File

@@ -263,6 +263,9 @@ interface IModule {
* be completing with an error, although data (zero filled) will still be
* provided.
*
* After the stream has been opened, it remains in the STANDBY state, see
* StreamDescriptor for more details.
*
* @return An opened input stream and the associated descriptor.
* @param args The pack of arguments, see 'OpenInputStreamArguments' parcelable.
* @throws EX_ILLEGAL_ARGUMENT In the following cases:
@@ -325,6 +328,9 @@ interface IModule {
* StreamDescriptor will be completing with an error, although the data
* will still be accepted and immediately discarded.
*
* After the stream has been opened, it remains in the STANDBY state, see
* StreamDescriptor for more details.
*
* @return An opened output stream and the associated descriptor.
* @param args The pack of arguments, see 'OpenOutputStreamArguments' parcelable.
* @throws EX_ILLEGAL_ARGUMENT In the following cases:

View File

@@ -33,6 +33,72 @@ import android.hardware.common.fmq.SynchronizedReadWrite;
* internal components of the stream while serving commands invoked via the
* stream's AIDL interface and commands invoked via the command queue of the
* descriptor.
*
* There is a state machine defined for the stream, which executes on the
* thread handling the commands from the queue. The states are defined based
* on the model of idealized producer and consumer connected via a ring buffer.
* For input streams, the "producer" is hardware, the "consumer" is software,
* for outputs streams it's the opposite. When the producer is active, but
* the buffer is full, the following actions are possible:
* - if the consumer is active, the producer blocks until there is space,
* this behavior is only possible for software producers;
* - if the consumer is passive:
* - the producer can preserve the buffer contents—a s/w producer can
* keep the data on their side, while a h/w producer can only drop captured
* data in this case;
* - or the producer overwrites old data in the buffer.
* Similarly, when an active consumer faces an empty buffer, it can:
* - block until there is data (producer must be active), only possible
* for software consumers;
* - walk away with no data; when the consumer is hardware, it must emit
* silence in this case.
*
* The model is defined below, note the asymmetry regarding the 'IDLE' state
* between input and output streams:
*
* Producer | Buffer state | Consumer | Applies | State
* active? | | active? | to |
* ==========|==============|==========|=========|==============================
* No | Empty | No | Both | STANDBY
* ----------|--------------|----------|---------|-----------------------------
* Yes | Filling up | No | Input | IDLE, overwrite behavior
* ----------|--------------|----------|---------|-----------------------------
* No | Empty | Yes† | Output | IDLE, h/w emits silence
* ----------|--------------|----------|---------|-----------------------------
* Yes | Not empty | Yes | Both | ACTIVE, s/w x-runs counted
* ----------|--------------|----------|---------|-----------------------------
* Yes | Filling up | No | Input | PAUSED, drop behavior
* ----------|--------------|----------|---------|-----------------------------
* Yes | Filling up | No† | Output | PAUSED, s/w stops writing once
* | | | | the buffer is filled up;
* | | | | h/w emits silence.
* ----------|--------------|----------|---------|-----------------------------
* No | Not empty | Yes | Both | DRAINING
* ----------|--------------|----------|---------|-----------------------------
* No | Not empty | No† | Output | DRAIN_PAUSED,
* | | | | h/w emits silence.
*
* † - note that for output, "buffer empty, h/w consuming" has the same outcome
* as "buffer not empty, h/w not consuming", but logically these conditions
* are different.
*
* State machines of both input and output streams start from the 'STANDBY'
* state. Transitions between states happen naturally with changes in the
* states of the model elements. For simplicity, we restrict the change to one
* element only, for example, in the 'STANDBY' state, either the producer or the
* consumer can become active, but not both at the same time. States 'STANDBY',
* 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event,
* whereas a change from the 'DRAINING' state can happen with time as the buffer
* gets empty.
*
* The state machine for input streams is defined in the `stream-in-sm.gv` file,
* for output streams—in the `stream-out-sm.gv` file. State machines define how
* commands (from the enum 'CommandCode') trigger state changes. The full list
* of states and commands is defined by constants of the 'State' enum. Note that
* the 'CLOSED' state does not have a constant in the interface because the
* client can never observe a stream with a functioning command queue in this
* state. The 'ERROR' state is a special state which the state machine enters
* when an unrecoverable hardware error is detected by the HAL module.
*/
@JavaDerive(equals=true, toString=true)
@VintfStability
@@ -55,12 +121,110 @@ parcelable StreamDescriptor {
long timeNs;
}
/**
* The command used for audio I/O, see 'AudioBuffer'. For MMap No IRQ mode
* this command only provides updated positions and latency because actual
* audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
*/
const int COMMAND_BURST = 1;
@VintfStability
@Backing(type="int")
enum State {
/**
* 'STANDBY' is the initial state of the stream, entered after
* opening. Since both the producer and the consumer are inactive in
* this state, it allows the HAL module to put associated hardware into
* "standby" mode to save power.
*/
STANDBY = 1,
/**
* In the 'IDLE' state the audio hardware is active. For input streams,
* the hardware is filling buffer with captured data, overwriting old
* contents on buffer wraparounds. For output streams, the buffer is
* still empty, and the hardware is outputting zeroes. The HAL module
* must not account for any under- or overruns as the client is not
* expected to perform audio I/O.
*/
IDLE = 2,
/**
* The active state of the stream in which it handles audio I/O. The HAL
* module can assume that the audio I/O will be periodic, thus inability
* of the client to provide or consume audio data on time must be
* considered as an under- or overrun and indicated via the 'xrunFrames'
* field of the reply.
*/
ACTIVE = 3,
/**
* In the 'PAUSED' state the consumer is inactive. For input streams,
* the hardware stops updating the buffer as soon as it fills up (this
* is the difference from the 'IDLE' state). For output streams,
* "inactivity" of hardware means that it does not consume audio data,
* but rather emits silence.
*/
PAUSED = 4,
/**
* In the 'DRAINING' state the producer is inactive, the consumer is
* finishing up on the buffer contents, emptying it up. As soon as it
* gets empty, the stream transfers itself into the next state.
*/
DRAINING = 5,
/**
* Used for output streams only, pauses draining. This state is similar
* to the 'PAUSED' state, except that the client is not adding any
* new data. If it emits a 'BURST' command, this brings the stream
* into the regular 'PAUSED' state.
*/
DRAIN_PAUSED = 6,
/**
* The ERROR state is entered when the stream has encountered an
* irrecoverable error from the lower layer. After entering it, the
* stream can only be closed.
*/
ERROR = 100,
}
@VintfStability
@Backing(type="int")
enum CommandCode {
/**
* See the state machines on the applicability of this command to
* different states. The 'fmqByteCount' field must always be set to 0.
*/
START = 1,
/**
* The BURST command used for audio I/O, see 'AudioBuffer'. Differences
* for the MMap No IRQ mode:
*
* - this command only provides updated positions and latency because
* actual audio I/O is done via the 'AudioBuffer.mmap' shared buffer.
* The client does not synchronize reads and writes into the buffer
* with sending of this command.
*
* - the 'fmqByteCount' must always be set to 0.
*/
BURST = 2,
/**
* See the state machines on the applicability of this command to
* different states. The 'fmqByteCount' field must always be set to 0.
*/
DRAIN = 3,
/**
* See the state machines on the applicability of this command to
* different states. The 'fmqByteCount' field must always be set to 0.
*
* Note that it's left on the discretion of the HAL implementation to
* assess all the necessary conditions that could prevent hardware from
* being suspended. Even if it can not be suspended, the state machine
* must still enter the 'STANDBY' state for consistency. Since the
* buffer must remain empty in this state, even if capturing hardware is
* still active, captured data must be discarded.
*/
STANDBY = 4,
/**
* See the state machines on the applicability of this command to
* different states. The 'fmqByteCount' field must always be set to 0.
*/
PAUSE = 5,
/**
* See the state machines on the applicability of this command to
* different states. The 'fmqByteCount' field must always be set to 0.
*/
FLUSH = 6,
}
/**
* Used for sending commands to the HAL module. The client writes into
@@ -71,12 +235,16 @@ parcelable StreamDescriptor {
@FixedSize
parcelable Command {
/**
* One of COMMAND_* codes.
* The code of the command.
*/
int code;
CommandCode code = CommandCode.START;
/**
* This field is only used for the BURST command. For all other commands
* it must be set to 0. The following description applies to the use
* of this field for the BURST command.
*
* For output streams: the amount of bytes that the client requests the
* HAL module to read from the 'audio.fmq' queue.
* HAL module to use out of the data contained in the 'audio.fmq' queue.
* For input streams: the amount of bytes requested by the client to
* read from the hardware into the 'audio.fmq' queue.
*
@@ -95,6 +263,12 @@ parcelable StreamDescriptor {
}
MQDescriptor<Command, SynchronizedReadWrite> command;
/**
* The value used for the 'Reply.latencyMs' field when the effective
* latency can not be reported by the HAL module.
*/
const int LATENCY_UNKNOWN = -1;
/**
* Used for providing replies to commands. The HAL module writes into
* the queue, the client reads. The queue can only contain a single reply,
@@ -107,17 +281,22 @@ parcelable StreamDescriptor {
* One of Binder STATUS_* statuses:
* - STATUS_OK: the command has completed successfully;
* - STATUS_BAD_VALUE: invalid value in the 'Command' structure;
* - STATUS_INVALID_OPERATION: the mix port is not connected
* to any producer or consumer, thus
* positions can not be reported;
* - STATUS_INVALID_OPERATION: the command is not applicable in the
* current state of the stream, or to this
* type of the stream;
* - STATUS_NO_INIT: positions can not be reported because the mix port
* is not connected to any producer or consumer, or
* because the HAL module does not support positions
* reporting for this AudioSource (on input streams).
* - STATUS_NOT_ENOUGH_DATA: a read or write error has
* occurred for the 'audio.fmq' queue;
*
*/
int status;
/**
* For output streams: the amount of bytes actually consumed by the HAL
* module from the 'audio.fmq' queue.
* Used with the BURST command only.
*
* For output streams: the amount of bytes of data actually consumed
* by the HAL module.
* For input streams: the amount of bytes actually provided by the HAL
* in the 'audio.fmq' queue.
*
@@ -126,10 +305,18 @@ parcelable StreamDescriptor {
*/
int fmqByteCount;
/**
* It is recommended to report the current position for any command.
* If the position can not be reported, the 'status' field must be
* set to 'NO_INIT'.
*
* For output streams: the moment when the specified stream position
* was presented to an external observer (i.e. presentation position).
* For input streams: the moment when data at the specified stream position
* was acquired (i.e. capture position).
*
* The observable position must never be reset by the HAL module.
* The data type of the frame counter is large enough to support
* continuous counting for years of operation.
*/
Position observable;
/**
@@ -138,9 +325,22 @@ parcelable StreamDescriptor {
*/
Position hardware;
/**
* Current latency reported by the hardware.
* Current latency reported by the hardware. It is recommended to
* report the current latency for any command. If the value of latency
* can not be determined, this field must be set to 'LATENCY_UNKNOWN'.
*/
int latencyMs;
/**
* Number of frames lost due to an underrun (for input streams),
* or not provided on time (for output streams) for the **previous**
* transfer operation.
*/
int xrunFrames;
/**
* The state that the stream was in while the HAL module was sending the
* reply.
*/
State state = State.STANDBY;
}
MQDescriptor<Reply, SynchronizedReadWrite> reply;
@@ -170,42 +370,59 @@ parcelable StreamDescriptor {
@VintfStability
union AudioBuffer {
/**
* The fast message queue used for all modes except MMap No IRQ. Both
* reads and writes into this queue are non-blocking because access to
* this queue is synchronized via the 'command' and 'reply' queues as
* described below. The queue nevertheless uses 'SynchronizedReadWrite'
* because there is only one reader, and the reading position must be
* shared.
* The fast message queue used for BURST commands in all modes except
* MMap No IRQ. Both reads and writes into this queue are non-blocking
* because access to this queue is synchronized via the 'command' and
* 'reply' queues as described below. The queue nevertheless uses
* 'SynchronizedReadWrite' because there is only one reader, and the
* reading position must be shared.
*
* Note that the fast message queue is a transient buffer, only used for
* data transfer. Neither of the sides can use it to store any data
* outside of the 'BURST' operation. The consumer must always retrieve
* all data available in the fast message queue, even if it can not use
* it. The producer must re-send any unconsumed data on the next
* transfer operation. This restriction is posed in order to make the
* fast message queue fully transparent from the latency perspective.
*
* For output streams the following sequence of operations is used:
* 1. The client writes audio data into the 'audio.fmq' queue.
* 2. The client writes the 'BURST' command into the 'command' queue,
* 2. The client writes the BURST command into the 'command' queue,
* and hangs on waiting on a read from the 'reply' queue.
* 3. The high priority thread in the HAL module wakes up due to 2.
* 4. The HAL module reads the command and audio data.
* 4. The HAL module reads the command and audio data. According
* to the statement above, the HAL module must always read
* from the FMQ all the data it contains. The amount of data that
* the HAL module has actually consumed is indicated to the client
* via the 'reply.fmqByteCount' field.
* 5. The HAL module writes the command status and current positions
* into 'reply' queue, and hangs on waiting on a read from
* the 'command' queue.
* 6. The client wakes up due to 5. and reads the reply.
*
* For input streams the following sequence of operations is used:
* 1. The client writes the 'BURST' command into the 'command' queue,
* 1. The client writes the BURST command into the 'command' queue,
* and hangs on waiting on a read from the 'reply' queue.
* 2. The high priority thread in the HAL module wakes up due to 1.
* 3. The HAL module writes audio data into the 'audio.fmq' queue.
* The value of 'reply.fmqByteCount' must be the equal to the amount
* of data in the queue.
* 4. The HAL module writes the command status and current positions
* into 'reply' queue, and hangs on waiting on a read from
* the 'command' queue.
* 5. The client wakes up due to 4.
* 6. The client reads the reply and audio data.
* 6. The client reads the reply and audio data. The client must
* always read from the FMQ all the data it contains.
*
*/
MQDescriptor<byte, SynchronizedReadWrite> fmq;
/**
* MMap buffers are shared directly with the DSP, which operates
* independently from the CPU. Writes and reads into these buffers
* are not synchronized with 'command' and 'reply' queues. However,
* the client still uses the 'BURST' command for obtaining current
* positions from the HAL module.
* independently from the CPU. Writes and reads into these buffers are
* not synchronized with 'command' and 'reply' queues. However, the
* client still uses the same commands for controlling the audio data
* exchange and for obtaining current positions and latency from the HAL
* module.
*/
MmapBufferDescriptor mmap;
}

View File

@@ -0,0 +1,42 @@
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// To render: dot -Tpng stream-in-sm.gv -o stream-in-sm.png
digraph stream_in_state_machine {
node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
node [shape=point width=0.5] F;
node [shape=oval width=1];
node [fillcolor=lightgreen] STANDBY; // buffer is empty
node [fillcolor=tomato] CLOSED;
node [fillcolor=tomato] ERROR;
node [style=dashed] ANY_STATE;
node [fillcolor=lightblue style=filled];
I -> STANDBY;
STANDBY -> IDLE [label="START"]; // producer -> active
IDLE -> STANDBY [label="STANDBY"]; // producer -> passive, buffer is cleared
IDLE -> ACTIVE [label="BURST"]; // consumer -> active
ACTIVE -> ACTIVE [label="BURST"];
ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive
ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive
PAUSED -> ACTIVE [label="BURST"]; // consumer -> active
PAUSED -> STANDBY [label="FLUSH"]; // producer -> passive, buffer is cleared
DRAINING -> DRAINING [label="BURST"];
DRAINING -> ACTIVE [label="START"]; // producer -> active
DRAINING -> STANDBY [label="<empty buffer>"]; // consumer deactivates
IDLE -> ERROR [label="<hardware failure>"];
ACTIVE -> ERROR [label="<hardware failure>"];
PAUSED -> ERROR [label="<hardware failure>"];
ANY_STATE -> CLOSED [label="→IStream*.close"];
CLOSED -> F;
}

View File

@@ -0,0 +1,48 @@
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// To render: dot -Tpng stream-out-sm.gv -o stream-out-sm.png
digraph stream_out_state_machine {
node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
node [shape=point width=0.5] F;
node [shape=oval width=1];
node [fillcolor=lightgreen] STANDBY; // buffer is empty
node [fillcolor=lightgreen] IDLE; // buffer is empty
node [fillcolor=tomato] CLOSED;
node [fillcolor=tomato] ERROR;
node [style=dashed] ANY_STATE;
node [fillcolor=lightblue style=filled];
I -> STANDBY;
STANDBY -> IDLE [label="START"]; // consumer -> active
STANDBY -> PAUSED [label="BURST"]; // producer -> active
IDLE -> STANDBY [label="STANDBY"]; // consumer -> passive
IDLE -> ACTIVE [label="BURST"]; // producer -> active
ACTIVE -> ACTIVE [label="BURST"];
ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive (not consuming)
ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive
PAUSED -> PAUSED [label="BURST"];
PAUSED -> ACTIVE [label="START"]; // consumer -> active
PAUSED -> IDLE [label="FLUSH"]; // producer -> passive, buffer is cleared
DRAINING -> IDLE [label="<empty buffer>"];
DRAINING -> ACTIVE [label="BURST"]; // producer -> active
DRAINING -> DRAIN_PAUSED [label="PAUSE"]; // consumer -> passive (not consuming)
DRAIN_PAUSED -> DRAINING [label="START"]; // consumer -> active
DRAIN_PAUSED -> PAUSED [label="BURST"]; // producer -> active
DRAIN_PAUSED -> IDLE [label="FLUSH"]; // buffer is cleared
IDLE -> ERROR [label="<hardware failure>"];
ACTIVE -> ERROR [label="<hardware failure>"];
DRAINING -> ERROR [label="<hardware failure>"];
ANY_STATE -> CLOSED [label="→IStream*.close"];
CLOSED -> F;
}

View File

@@ -44,6 +44,10 @@ void ThreadController::stop() {
mWorkerStateChangeRequest = true;
}
}
join();
}
void ThreadController::join() {
if (mWorker.joinable()) {
mWorker.join();
}

View File

@@ -39,6 +39,9 @@ class ThreadController {
~ThreadController() { stop(); }
bool start(const std::string& name, int priority);
// Note: 'pause' and 'resume' methods should only be used on the "driving" side.
// In the case of audio HAL I/O, the driving side is the client, because the HAL
// implementation always blocks on getting a command.
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
bool hasError() {
@@ -50,6 +53,10 @@ class ThreadController {
return mError;
}
void stop();
// Direct use of 'join' assumes that the StreamLogic is not intended
// to run forever, and is guaranteed to exit by itself. This normally
// only happen in tests.
void join();
bool waitForAtLeastOneCycle();
// Only used by unit tests.
@@ -133,7 +140,8 @@ class StreamWorker : public LogicImpl {
void resume() { mThread.resume(); }
bool hasError() { return mThread.hasError(); }
std::string getError() { return mThread.getError(); }
void stop() { return mThread.stop(); }
void stop() { mThread.stop(); }
void join() { mThread.join(); }
bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
// Only used by unit tests.

View File

@@ -160,6 +160,14 @@ TEST_P(StreamWorkerTest, WorkerExit) {
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
TEST_P(StreamWorkerTest, WorkerJoin) {
ASSERT_TRUE(worker.start());
stream.setStopStatus();
worker.join();
EXPECT_FALSE(worker.hasError());
EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
TEST_P(StreamWorkerTest, WorkerError) {
ASSERT_TRUE(worker.start());
stream.setErrorStatus();

View File

@@ -85,126 +85,315 @@ std::string StreamWorkerCommonLogic::init() {
return "";
}
void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
bool isConnected) const {
if (isConnected) {
reply->status = STATUS_OK;
reply->observable.frames = mFrameCount;
reply->observable.timeNs = ::android::elapsedRealtimeNano();
} else {
reply->status = STATUS_NO_INIT;
}
}
const std::string StreamInWorkerLogic::kThreadName = "reader";
StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
StreamDescriptor::Reply reply{};
if (command.code == StreamContext::COMMAND_EXIT &&
if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
command.fmqByteCount == mInternalCommandCookie) {
LOG(DEBUG) << __func__ << ": received EXIT command";
setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
} else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
} else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received START read command";
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAINING) {
populateReply(&reply, mIsConnected);
mState = mState == StreamDescriptor::State::STANDBY ? StreamDescriptor::State::IDLE
: StreamDescriptor::State::ACTIVE;
} else {
LOG(WARNING) << __func__ << ": START command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount
<< " bytes";
usleep(3000); // Simulate a blocking call into the driver.
const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
mDataMQ->availableToWrite(), mDataBufferSize});
const bool isConnected = mIsConnected;
// Simulate reading of data, or provide zeroes if the stream is not connected.
for (size_t i = 0; i < byteCount; ++i) {
using buffer_type = decltype(mDataBuffer)::element_type;
constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
std::numeric_limits<buffer_type>::min() + 1;
mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
std::numeric_limits<buffer_type>::min()
: 0;
}
bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true;
if (success) {
LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are provided and counted regardless of connection status.
reply.fmqByteCount = byteCount;
mFrameCount += byteCount / mFrameSize;
if (isConnected) {
reply.status = STATUS_OK;
reply.observable.frames = mFrameCount;
reply.observable.timeNs = ::android::elapsedRealtimeNano();
} else {
reply.status = STATUS_INVALID_OPERATION;
if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAINING) {
if (!read(command.fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR;
}
if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::PAUSED) {
mState = StreamDescriptor::State::ACTIVE;
} else if (mState == StreamDescriptor::State::DRAINING) {
// To simplify the reference code, we assume that the read operation
// has consumed all the data remaining in the hardware buffer.
// TODO: Provide parametrization on the duration of draining to test
// handling of commands during the 'DRAINING' state.
mState = StreamDescriptor::State::STANDBY;
}
} else {
LOG(WARNING) << __func__ << ": writing of " << byteCount
<< " bytes of data to MQ failed";
reply.status = STATUS_NOT_ENOUGH_DATA;
LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received DRAIN read command";
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::DRAINING;
} else {
LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received PAUSE read command";
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::PAUSED;
} else {
LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received FLUSH read command";
if (mState == StreamDescriptor::State::PAUSED) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received STANDBY read command";
if (mState == StreamDescriptor::State::IDLE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
reply.latencyMs = Module::kLatencyMs;
} else {
LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
<< ") or count: " << command.fmqByteCount;
reply.status = STATUS_BAD_VALUE;
}
reply.state = mState;
LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
// Can switch the state to ERROR if a driver error occurs.
const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
const bool isConnected = mIsConnected;
bool fatal = false;
// Simulate reading of data, or provide zeroes if the stream is not connected.
for (size_t i = 0; i < byteCount; ++i) {
using buffer_type = decltype(mDataBuffer)::element_type;
constexpr int kBufferValueRange = std::numeric_limits<buffer_type>::max() -
std::numeric_limits<buffer_type>::min() + 1;
mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) +
std::numeric_limits<buffer_type>::min()
: 0;
}
usleep(3000); // Simulate a blocking call into the driver.
// Set 'fatal = true' if a driver error occurs.
if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) {
LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are provided and counted regardless of connection status.
reply->fmqByteCount += byteCount;
mFrameCount += byteCount / mFrameSize;
populateReply(reply, isConnected);
} else {
LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed";
reply->status = STATUS_NOT_ENOUGH_DATA;
}
reply->latencyMs = Module::kLatencyMs;
return !fatal;
}
const std::string StreamOutWorkerLogic::kThreadName = "writer";
StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
StreamDescriptor::Reply reply{};
if (command.code == StreamContext::COMMAND_EXIT &&
if (static_cast<int32_t>(command.code) == StreamContext::COMMAND_EXIT &&
command.fmqByteCount == mInternalCommandCookie) {
LOG(DEBUG) << __func__ << ": received EXIT command";
setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
} else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) {
} else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received START read command";
switch (mState) {
case StreamDescriptor::State::STANDBY:
mState = StreamDescriptor::State::IDLE;
break;
case StreamDescriptor::State::PAUSED:
mState = StreamDescriptor::State::ACTIVE;
break;
case StreamDescriptor::State::DRAIN_PAUSED:
mState = StreamDescriptor::State::PAUSED;
break;
default:
LOG(WARNING) << __func__ << ": START command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
if (reply.status != STATUS_INVALID_OPERATION) {
populateReply(&reply, mIsConnected);
}
} else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount
<< " bytes";
const size_t byteCount = std::min({static_cast<size_t>(command.fmqByteCount),
mDataMQ->availableToRead(), mDataBufferSize});
bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true;
if (success) {
const bool isConnected = mIsConnected;
LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are consumed and counted regardless of connection status.
reply.fmqByteCount = byteCount;
mFrameCount += byteCount / mFrameSize;
if (isConnected) {
reply.status = STATUS_OK;
reply.observable.frames = mFrameCount;
reply.observable.timeNs = ::android::elapsedRealtimeNano();
} else {
reply.status = STATUS_INVALID_OPERATION;
if (mState != StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states
if (!write(command.fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR;
}
usleep(3000); // Simulate a blocking call into the driver.
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAIN_PAUSED) {
mState = StreamDescriptor::State::PAUSED;
} else if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::DRAINING) {
mState = StreamDescriptor::State::ACTIVE;
} // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
} else {
LOG(WARNING) << __func__ << ": reading of " << byteCount
<< " bytes of data from MQ failed";
reply.status = STATUS_NOT_ENOUGH_DATA;
LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received DRAIN write command";
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::IDLE;
// Since there is no actual hardware that would be draining the buffer,
// in order to simplify the reference code, we assume that draining
// happens instantly, thus skipping the 'DRAINING' state.
// TODO: Provide parametrization on the duration of draining to test
// handling of commands during the 'DRAINING' state.
} else {
LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::STANDBY &&
command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received STANDBY write command";
if (mState == StreamDescriptor::State::IDLE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received PAUSE write command";
if (mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::DRAINING) {
populateReply(&reply, mIsConnected);
mState = mState == StreamDescriptor::State::ACTIVE
? StreamDescriptor::State::PAUSED
: StreamDescriptor::State::DRAIN_PAUSED;
} else {
LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
} else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) {
LOG(DEBUG) << __func__ << ": received FLUSH write command";
if (mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAIN_PAUSED) {
populateReply(&reply, mIsConnected);
mState = StreamDescriptor::State::IDLE;
} else {
LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
reply.latencyMs = Module::kLatencyMs;
} else {
LOG(WARNING) << __func__ << ": invalid command (" << command.toString()
<< ") or count: " << command.fmqByteCount;
reply.status = STATUS_BAD_VALUE;
}
reply.state = mState;
LOG(DEBUG) << __func__ << ": writing reply " << reply.toString();
if (!mReplyMQ->writeBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
return Status::CONTINUE;
}
bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
const size_t readByteCount = mDataMQ->availableToRead();
// Amount of data that the HAL module is going to actually use.
const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
bool fatal = false;
if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
const bool isConnected = mIsConnected;
LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
<< " succeeded; connected? " << isConnected;
// Frames are consumed and counted regardless of connection status.
reply->fmqByteCount += byteCount;
mFrameCount += byteCount / mFrameSize;
populateReply(reply, isConnected);
usleep(3000); // Simulate a blocking call into the driver.
// Set 'fatal = true' if a driver error occurs.
} else {
LOG(WARNING) << __func__ << ": reading of " << readByteCount
<< " bytes of data from MQ failed";
reply->status = STATUS_NOT_ENOUGH_DATA;
}
reply->latencyMs = Module::kLatencyMs;
return !fatal;
}
template <class Metadata, class StreamWorker>
StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
if (!mIsClosed) {
if (!isClosed()) {
LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak";
stopWorker();
// The worker and the context should clean up by themselves via destructors.
@@ -214,13 +403,13 @@ StreamCommon<Metadata, StreamWorker>::~StreamCommon() {
template <class Metadata, class StreamWorker>
ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
LOG(DEBUG) << __func__;
if (!mIsClosed) {
if (!isClosed()) {
stopWorker();
LOG(DEBUG) << __func__ << ": joining the worker thread...";
mWorker.stop();
LOG(DEBUG) << __func__ << ": worker thread joined";
mContext.reset();
mIsClosed = true;
mWorker.setClosed();
return ndk::ScopedAStatus::ok();
} else {
LOG(ERROR) << __func__ << ": stream was already closed";
@@ -231,13 +420,14 @@ ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::close() {
template <class Metadata, class StreamWorker>
void StreamCommon<Metadata, StreamWorker>::stopWorker() {
if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
LOG(DEBUG) << __func__ << ": asking the worker to stop...";
LOG(DEBUG) << __func__ << ": asking the worker to exit...";
StreamDescriptor::Command cmd;
cmd.code = StreamContext::COMMAND_EXIT;
cmd.code = StreamDescriptor::CommandCode(StreamContext::COMMAND_EXIT);
cmd.fmqByteCount = mContext.getInternalCommandCookie();
// FIXME: This can block in the case when the client wrote a command
// while the stream worker's cycle is not running. Need to revisit
// when implementing standby and pause/resume.
// Note: never call 'pause' and 'resume' methods of StreamWorker
// in the HAL implementation. These methods are to be used by
// the client side only. Preventing the worker loop from running
// on the HAL side can cause a deadlock.
if (!commandMQ->writeBlocking(&cmd, 1)) {
LOG(ERROR) << __func__ << ": failed to write exit command to the MQ";
}
@@ -248,7 +438,7 @@ void StreamCommon<Metadata, StreamWorker>::stopWorker() {
template <class Metadata, class StreamWorker>
ndk::ScopedAStatus StreamCommon<Metadata, StreamWorker>::updateMetadata(const Metadata& metadata) {
LOG(DEBUG) << __func__;
if (!mIsClosed) {
if (!isClosed()) {
mMetadata = metadata;
return ndk::ScopedAStatus::ok();
}

View File

@@ -54,8 +54,10 @@ class StreamContext {
int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite>
DataMQ;
// Ensure that this value is not used by any of StreamDescriptor.COMMAND_*
static constexpr int COMMAND_EXIT = -1;
// Ensure that this value is not used by any of StreamDescriptor.CommandCode enums
static constexpr int32_t COMMAND_EXIT = -1;
// Ensure that this value is not used by any of StreamDescriptor.State enums
static constexpr int32_t STATE_CLOSED = -1;
StreamContext() = default;
StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
@@ -99,6 +101,10 @@ class StreamContext {
class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
public:
bool isClosed() const {
return static_cast<int32_t>(mState.load()) == StreamContext::STATE_CLOSED;
}
void setClosed() { mState = static_cast<StreamDescriptor::State>(StreamContext::STATE_CLOSED); }
void setIsConnected(bool connected) { mIsConnected = connected; }
protected:
@@ -109,9 +115,12 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
mReplyMQ(context.getReplyMQ()),
mDataMQ(context.getDataMQ()) {}
std::string init() override;
void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
// Used both by the main and worker threads.
// Atomic fields are used both by the main and worker threads.
std::atomic<bool> mIsConnected = false;
static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY;
// All fields are used on the worker thread only.
const int mInternalCommandCookie;
const size_t mFrameSize;
@@ -132,6 +141,9 @@ class StreamInWorkerLogic : public StreamWorkerCommonLogic {
protected:
Status cycle() override;
private:
bool read(size_t clientSize, StreamDescriptor::Reply* reply);
};
using StreamInWorker = ::android::hardware::audio::common::StreamWorker<StreamInWorkerLogic>;
@@ -143,6 +155,9 @@ class StreamOutWorkerLogic : public StreamWorkerCommonLogic {
protected:
Status cycle() override;
private:
bool write(size_t clientSize, StreamDescriptor::Reply* reply);
};
using StreamOutWorker = ::android::hardware::audio::common::StreamWorker<StreamOutWorkerLogic>;
@@ -155,7 +170,7 @@ class StreamCommon {
? ndk::ScopedAStatus::ok()
: ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
}
bool isClosed() const { return mIsClosed; }
bool isClosed() const { return mWorker.isClosed(); }
void setIsConnected(bool connected) { mWorker.setIsConnected(connected); }
ndk::ScopedAStatus updateMetadata(const Metadata& metadata);
@@ -168,9 +183,6 @@ class StreamCommon {
Metadata mMetadata;
StreamContext mContext;
StreamWorker mWorker;
// This variable is checked in the destructor which can be called on an arbitrary Binder thread,
// thus we need to ensure that any changes made by other threads are sequentially consistent.
std::atomic<bool> mIsClosed = false;
};
class StreamIn

View File

@@ -34,6 +34,7 @@
#include <aidl/android/media/audio/common/AudioIoFlags.h>
#include <aidl/android/media/audio/common/AudioOutputFlags.h>
#include <android-base/chrono_utils.h>
#include <android/binder_enums.h>
#include <fmq/AidlMessageQueue.h>
#include "AudioHalBinderServiceUtil.h"
@@ -69,6 +70,7 @@ using aidl::android::media::audio::common::AudioUsage;
using android::hardware::audio::common::isBitPositionFlagSet;
using android::hardware::audio::common::StreamLogic;
using android::hardware::audio::common::StreamWorker;
using ndk::enum_range;
using ndk::ScopedAStatus;
template <typename T>
@@ -171,25 +173,26 @@ class WithAudioPortConfig {
AudioPortConfig mConfig;
};
class AudioCoreModule : public testing::TestWithParam<std::string> {
// Can be used as a base for any test here, does not depend on the fixture GTest parameters.
class AudioCoreModuleBase {
public:
// The default buffer size is used mostly for negative tests.
static constexpr int kDefaultBufferSizeFrames = 256;
void SetUp() override {
ASSERT_NO_FATAL_FAILURE(ConnectToService());
void SetUpImpl(const std::string& moduleName) {
ASSERT_NO_FATAL_FAILURE(ConnectToService(moduleName));
debug.flags().simulateDeviceConnections = true;
ASSERT_NO_FATAL_FAILURE(debug.SetUp(module.get()));
}
void TearDown() override {
void TearDownImpl() {
if (module != nullptr) {
EXPECT_IS_OK(module->setModuleDebug(ModuleDebug{}));
}
}
void ConnectToService() {
module = IModule::fromBinder(binderUtil.connectToService(GetParam()));
void ConnectToService(const std::string& moduleName) {
module = IModule::fromBinder(binderUtil.connectToService(moduleName));
ASSERT_NE(module, nullptr);
}
@@ -269,6 +272,13 @@ class AudioCoreModule : public testing::TestWithParam<std::string> {
WithDebugFlags debug;
};
class AudioCoreModule : public AudioCoreModuleBase, public testing::TestWithParam<std::string> {
public:
void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpImpl(GetParam())); }
void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownImpl()); }
};
class WithDevicePortConnectedState {
public:
explicit WithDevicePortConnectedState(const AudioPort& idAndData) : mIdAndData(idAndData) {}
@@ -352,21 +362,36 @@ class StreamContext {
std::unique_ptr<DataMQ> mDataMQ;
};
class StreamCommonLogic : public StreamLogic {
class StreamLogicDriver {
public:
StreamDescriptor::Position getLastObservablePosition() {
std::lock_guard<std::mutex> lock(mLock);
return mLastReply.observable;
}
virtual ~StreamLogicDriver() = default;
// Return 'true' to stop the worker.
virtual bool done() = 0;
// For 'Writer' logic, if the 'actualSize' is 0, write is skipped.
// The 'fmqByteCount' from the returned command is passed as is to the HAL.
virtual StreamDescriptor::Command getNextCommand(int maxDataSize,
int* actualSize = nullptr) = 0;
// Return 'true' to indicate that no further processing is needed,
// for example, the driver is expecting a bad status to be returned.
// The logic cycle will return with 'CONTINUE' status. Otherwise,
// the reply will be validated and then passed to 'processValidReply'.
virtual bool interceptRawReply(const StreamDescriptor::Reply& reply) = 0;
// Return 'false' to indicate that the contents of the reply are unexpected.
// Will abort the logic cycle.
virtual bool processValidReply(const StreamDescriptor::Reply& reply) = 0;
};
class StreamCommonLogic : public StreamLogic {
protected:
explicit StreamCommonLogic(const StreamContext& context)
StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver)
: mCommandMQ(context.getCommandMQ()),
mReplyMQ(context.getReplyMQ()),
mDataMQ(context.getDataMQ()),
mData(context.getBufferSizeBytes()) {}
mData(context.getBufferSizeBytes()),
mDriver(driver) {}
StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; }
StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; }
StreamLogicDriver* getDriver() const { return mDriver; }
std::string init() override { return ""; }
@@ -374,19 +399,20 @@ class StreamCommonLogic : public StreamLogic {
StreamContext::ReplyMQ* mReplyMQ;
StreamContext::DataMQ* mDataMQ;
std::vector<int8_t> mData;
std::mutex mLock;
StreamDescriptor::Reply mLastReply GUARDED_BY(mLock);
StreamLogicDriver* const mDriver;
};
class StreamReaderLogic : public StreamCommonLogic {
public:
explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {}
StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver)
: StreamCommonLogic(context, driver) {}
protected:
Status cycle() override {
StreamDescriptor::Command command{};
command.code = StreamDescriptor::COMMAND_BURST;
command.fmqByteCount = mData.size();
if (getDriver()->done()) {
return Status::EXIT;
}
StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size());
if (!mCommandMQ->writeBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": writing of command into MQ failed";
return Status::ABORT;
@@ -396,6 +422,9 @@ class StreamReaderLogic : public StreamCommonLogic {
LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
return Status::ABORT;
}
if (getDriver()->interceptRawReply(reply)) {
return Status::CONTINUE;
}
if (reply.status != STATUS_OK) {
LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
return Status::ABORT;
@@ -405,16 +434,41 @@ class StreamReaderLogic : public StreamCommonLogic {
<< ": received invalid byte count in the reply: " << reply.fmqByteCount;
return Status::ABORT;
}
{
std::lock_guard<std::mutex> lock(mLock);
mLastReply = reply;
if (static_cast<size_t>(reply.fmqByteCount) != mDataMQ->availableToRead()) {
LOG(ERROR) << __func__
<< ": the byte count in the reply is not the same as the amount of "
<< "data available in the MQ: " << reply.fmqByteCount
<< " != " << mDataMQ->availableToRead();
}
const size_t readCount = std::min({mDataMQ->availableToRead(),
static_cast<size_t>(reply.fmqByteCount), mData.size()});
if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) {
if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
return Status::ABORT;
}
if (reply.xrunFrames < 0) {
LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames;
return Status::ABORT;
}
if (std::find(enum_range<StreamDescriptor::State>().begin(),
enum_range<StreamDescriptor::State>().end(),
reply.state) == enum_range<StreamDescriptor::State>().end()) {
LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state);
return Status::ABORT;
}
const bool acceptedReply = getDriver()->processValidReply(reply);
if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) {
std::vector<int8_t> data(readCount);
if (mDataMQ->read(data.data(), readCount)) {
memcpy(mData.data(), data.data(), std::min(mData.size(), data.size()));
goto checkAcceptedReply;
}
LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
return Status::ABORT;
} // readCount == 0
checkAcceptedReply:
if (acceptedReply) {
return Status::CONTINUE;
}
LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed";
LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
return Status::ABORT;
}
};
@@ -422,17 +476,20 @@ using StreamReader = StreamWorker<StreamReaderLogic>;
class StreamWriterLogic : public StreamCommonLogic {
public:
explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {}
StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver)
: StreamCommonLogic(context, driver) {}
protected:
Status cycle() override {
if (!mDataMQ->write(mData.data(), mData.size())) {
if (getDriver()->done()) {
return Status::EXIT;
}
int actualSize = 0;
StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize);
if (actualSize != 0 && !mDataMQ->write(mData.data(), mData.size())) {
LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed";
return Status::ABORT;
}
StreamDescriptor::Command command{};
command.code = StreamDescriptor::COMMAND_BURST;
command.fmqByteCount = mData.size();
if (!mCommandMQ->writeBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": writing of command into MQ failed";
return Status::ABORT;
@@ -442,6 +499,9 @@ class StreamWriterLogic : public StreamCommonLogic {
LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
return Status::ABORT;
}
if (getDriver()->interceptRawReply(reply)) {
return Status::CONTINUE;
}
if (reply.status != STATUS_OK) {
LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status);
return Status::ABORT;
@@ -451,11 +511,31 @@ class StreamWriterLogic : public StreamCommonLogic {
<< ": received invalid byte count in the reply: " << reply.fmqByteCount;
return Status::ABORT;
}
{
std::lock_guard<std::mutex> lock(mLock);
mLastReply = reply;
if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) {
LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: "
<< "available to write " << mDataMQ->availableToWrite()
<< ", total size: " << mDataMQ->getQuantumCount();
return Status::ABORT;
}
return Status::CONTINUE;
if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) {
LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs;
return Status::ABORT;
}
if (reply.xrunFrames < 0) {
LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames;
return Status::ABORT;
}
if (std::find(enum_range<StreamDescriptor::State>().begin(),
enum_range<StreamDescriptor::State>().end(),
reply.state) == enum_range<StreamDescriptor::State>().end()) {
LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state);
return Status::ABORT;
}
if (getDriver()->processValidReply(reply)) {
return Status::CONTINUE;
}
LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString();
return Status::ABORT;
}
};
using StreamWriter = StreamWorker<StreamWriterLogic>;
@@ -466,52 +546,6 @@ struct IOTraits {
using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
};
// A dedicated version to test replies to invalid commands.
class StreamInvalidCommandLogic : public StreamCommonLogic {
public:
StreamInvalidCommandLogic(const StreamContext& context,
const std::vector<StreamDescriptor::Command>& commands)
: StreamCommonLogic(context), mCommands(commands) {}
std::vector<std::string> getUnexpectedStatuses() {
std::lock_guard<std::mutex> lock(mLock);
return mUnexpectedStatuses;
}
protected:
Status cycle() override {
// Send all commands in one cycle to simplify testing.
// Extra logging helps to sort out issues with unexpected HAL behavior.
for (const auto& command : mCommands) {
LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ...";
if (!getCommandMQ()->writeBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": writing of command into MQ failed";
return Status::ABORT;
}
StreamDescriptor::Reply reply{};
LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "...";
if (!getReplyMQ()->readBlocking(&reply, 1)) {
LOG(ERROR) << __func__ << ": reading of reply from MQ failed";
return Status::ABORT;
}
LOG(INFO) << __func__ << ": received status " << statusToString(reply.status)
<< " for command " << command.toString();
if (reply.status != STATUS_BAD_VALUE) {
std::string s = command.toString();
s.append(", ").append(statusToString(reply.status));
std::lock_guard<std::mutex> lock(mLock);
mUnexpectedStatuses.push_back(std::move(s));
}
};
return Status::EXIT;
}
private:
const std::vector<StreamDescriptor::Command> mCommands;
std::mutex mLock;
std::vector<std::string> mUnexpectedStatuses GUARDED_BY(mLock);
};
template <typename Stream>
class WithStream {
public:
@@ -1208,6 +1242,46 @@ TEST_P(AudioCoreModule, ExternalDevicePortRoutes) {
}
}
class StreamLogicDriverInvalidCommand : public StreamLogicDriver {
public:
StreamLogicDriverInvalidCommand(const std::vector<StreamDescriptor::Command>& commands)
: mCommands(commands) {}
std::string getUnexpectedStatuses() {
// This method is intended to be called after the worker thread has joined,
// thus no extra synchronization is needed.
std::string s;
if (!mStatuses.empty()) {
s = std::string("Pairs of (command, actual status): ")
.append((android::internal::ToString(mStatuses)));
}
return s;
}
bool done() override { return mNextCommand >= mCommands.size(); }
StreamDescriptor::Command getNextCommand(int, int* actualSize) override {
if (actualSize != nullptr) *actualSize = 0;
return mCommands[mNextCommand++];
}
bool interceptRawReply(const StreamDescriptor::Reply& reply) override {
if (reply.status != STATUS_BAD_VALUE) {
std::string s = mCommands[mNextCommand - 1].toString();
s.append(", ").append(statusToString(reply.status));
mStatuses.push_back(std::move(s));
// If the HAL does not recognize the command as invalid,
// retrieve the data etc.
return reply.status != STATUS_OK;
}
return true;
}
bool processValidReply(const StreamDescriptor::Reply&) override { return true; }
private:
const std::vector<StreamDescriptor::Command> mCommands;
size_t mNextCommand = 0;
std::vector<std::string> mStatuses;
};
template <typename Stream>
class AudioStream : public AudioCoreModule {
public:
@@ -1315,19 +1389,6 @@ class AudioStream : public AudioCoreModule {
EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value()));
}
void ReadOrWrite(bool useSetupSequence2, bool validateObservablePosition) {
const auto allPortConfigs =
moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
if (allPortConfigs.empty()) {
GTEST_SKIP() << "No mix ports have attached devices";
}
for (const auto& portConfig : allPortConfigs) {
EXPECT_NO_FATAL_FAILURE(
ReadOrWriteImpl(portConfig, useSetupSequence2, validateObservablePosition))
<< portConfig.toString();
}
}
void ResetPortConfigWithOpenStream() {
const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits<Stream>::is_input);
if (!portConfig.has_value()) {
@@ -1357,131 +1418,43 @@ class AudioStream : public AudioCoreModule {
<< stream1.getPortId();
}
template <class Worker>
void WaitForObservablePositionAdvance(Worker& worker) {
static constexpr int kWriteDurationUs = 50 * 1000;
static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000};
int64_t framesInitial;
framesInitial = worker.getLastObservablePosition().frames;
ASSERT_FALSE(worker.hasError());
bool timedOut = false;
int64_t frames = framesInitial;
for (android::base::Timer elapsed;
frames <= framesInitial && !worker.hasError() &&
!(timedOut = (elapsed.duration() >= kPositionChangeTimeout));) {
usleep(kWriteDurationUs);
frames = worker.getLastObservablePosition().frames;
}
EXPECT_FALSE(timedOut);
EXPECT_FALSE(worker.hasError()) << worker.getError();
EXPECT_GT(frames, framesInitial);
}
void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useSetupSequence2,
bool validateObservablePosition) {
if (!useSetupSequence2) {
ASSERT_NO_FATAL_FAILURE(
ReadOrWriteSetupSequence1(portConfig, validateObservablePosition));
} else {
ASSERT_NO_FATAL_FAILURE(
ReadOrWriteSetupSequence2(portConfig, validateObservablePosition));
}
}
// Set up a patch first, then open a stream.
void ReadOrWriteSetupSequence1(const AudioPortConfig& portConfig,
bool validateObservablePosition) {
auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
IOTraits<Stream>::is_input, portConfig);
ASSERT_FALSE(devicePorts.empty());
auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
typename IOTraits<Stream>::Worker worker(*stream.getContext());
ASSERT_TRUE(worker.start());
ASSERT_TRUE(worker.waitForAtLeastOneCycle());
if (validateObservablePosition) {
ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
}
}
// Open a stream, then set up a patch for it.
void ReadOrWriteSetupSequence2(const AudioPortConfig& portConfig,
bool validateObservablePosition) {
WithStream<Stream> stream(portConfig);
ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
typename IOTraits<Stream>::Worker worker(*stream.getContext());
auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
IOTraits<Stream>::is_input, portConfig);
ASSERT_FALSE(devicePorts.empty());
auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
ASSERT_TRUE(worker.start());
ASSERT_TRUE(worker.waitForAtLeastOneCycle());
if (validateObservablePosition) {
ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker));
}
}
void SendInvalidCommandImpl(const AudioPortConfig& portConfig) {
std::vector<StreamDescriptor::Command> commands(6);
commands[0].code = -1;
commands[1].code = StreamDescriptor::COMMAND_BURST - 1;
commands[2].code = std::numeric_limits<int32_t>::min();
commands[3].code = std::numeric_limits<int32_t>::max();
commands[4].code = StreamDescriptor::COMMAND_BURST;
commands[0].code = StreamDescriptor::CommandCode(-1);
commands[1].code = StreamDescriptor::CommandCode(
static_cast<int32_t>(StreamDescriptor::CommandCode::START) - 1);
commands[2].code = StreamDescriptor::CommandCode(std::numeric_limits<int32_t>::min());
commands[3].code = StreamDescriptor::CommandCode(std::numeric_limits<int32_t>::max());
// TODO: For proper testing of input streams, need to put the stream into
// a state which accepts BURST commands.
commands[4].code = StreamDescriptor::CommandCode::BURST;
commands[4].fmqByteCount = -1;
commands[5].code = StreamDescriptor::COMMAND_BURST;
commands[5].code = StreamDescriptor::CommandCode::BURST;
commands[5].fmqByteCount = std::numeric_limits<int32_t>::min();
WithStream<Stream> stream(portConfig);
ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
StreamWorker<StreamInvalidCommandLogic> writer(*stream.getContext(), commands);
ASSERT_TRUE(writer.start());
writer.waitForAtLeastOneCycle();
auto unexpectedStatuses = writer.getUnexpectedStatuses();
EXPECT_EQ(0UL, unexpectedStatuses.size())
<< "Pairs of (command, actual status): "
<< android::internal::ToString(unexpectedStatuses);
StreamLogicDriverInvalidCommand driver(commands);
typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
ASSERT_TRUE(worker.start());
worker.join();
EXPECT_EQ("", driver.getUnexpectedStatuses());
}
};
using AudioStreamIn = AudioStream<IStreamIn>;
using AudioStreamOut = AudioStream<IStreamOut>;
#define TEST_IO_STREAM(method_name) \
#define TEST_IN_AND_OUT_STREAM(method_name) \
TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
#define TEST_IO_STREAM_2(method_name, arg1, arg2) \
TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) { \
ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \
} \
TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \
ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \
}
TEST_IO_STREAM(CloseTwice);
TEST_IO_STREAM(OpenAllConfigs);
TEST_IO_STREAM(OpenInvalidBufferSize);
TEST_IO_STREAM(OpenInvalidDirection);
TEST_IO_STREAM(OpenOverMaxCount);
TEST_IO_STREAM(OpenTwiceSamePortConfig);
// Use of constants makes comprehensible test names.
constexpr bool SetupSequence1 = false;
constexpr bool SetupSequence2 = true;
constexpr bool SetupOnly = false;
constexpr bool ValidateObservablePosition = true;
TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, SetupOnly);
TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, SetupOnly);
TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, ValidateObservablePosition);
TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, ValidateObservablePosition);
TEST_IO_STREAM(ResetPortConfigWithOpenStream);
TEST_IO_STREAM(SendInvalidCommand);
TEST_IN_AND_OUT_STREAM(CloseTwice);
TEST_IN_AND_OUT_STREAM(OpenAllConfigs);
TEST_IN_AND_OUT_STREAM(OpenInvalidBufferSize);
TEST_IN_AND_OUT_STREAM(OpenInvalidDirection);
TEST_IN_AND_OUT_STREAM(OpenOverMaxCount);
TEST_IN_AND_OUT_STREAM(OpenTwiceSamePortConfig);
TEST_IN_AND_OUT_STREAM(ResetPortConfigWithOpenStream);
TEST_IN_AND_OUT_STREAM(SendInvalidCommand);
TEST_P(AudioStreamOut, OpenTwicePrimary) {
const auto mixPorts = moduleConfig->getMixPorts(false);
@@ -1523,6 +1496,163 @@ TEST_P(AudioStreamOut, RequireOffloadInfo) {
<< "when no offload info is provided for a compressed offload mix port";
}
using CommandAndState = std::pair<StreamDescriptor::CommandCode, StreamDescriptor::State>;
class StreamLogicDefaultDriver : public StreamLogicDriver {
public:
explicit StreamLogicDefaultDriver(const std::vector<CommandAndState>& commands)
: mCommands(commands) {}
// The three methods below is intended to be called after the worker
// thread has joined, thus no extra synchronization is needed.
bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; }
bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; }
std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; }
bool done() override { return mNextCommand >= mCommands.size(); }
StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override {
StreamDescriptor::Command command{};
command.code = mCommands[mNextCommand++].first;
const int dataSize = command.code == StreamDescriptor::CommandCode::BURST ? maxDataSize : 0;
command.fmqByteCount = dataSize;
if (actualSize != nullptr) {
// In the output scenario, reduce slightly the fmqByteCount to verify
// that the HAL module always consumes all data from the MQ.
if (command.fmqByteCount > 1) command.fmqByteCount--;
*actualSize = dataSize;
}
return command;
}
bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; }
bool processValidReply(const StreamDescriptor::Reply& reply) override {
if (mPreviousFrames.has_value()) {
if (reply.observable.frames > mPreviousFrames.value()) {
mObservablePositionIncrease = true;
} else if (reply.observable.frames < mPreviousFrames.value()) {
mRetrogradeObservablePosition = true;
}
}
mPreviousFrames = reply.observable.frames;
const auto& lastCommandState = mCommands[mNextCommand - 1];
if (lastCommandState.second != reply.state) {
std::string s = std::string("Unexpected transition from the state ")
.append(mPreviousState)
.append(" to ")
.append(toString(reply.state))
.append(" caused by the command ")
.append(toString(lastCommandState.first));
LOG(ERROR) << __func__ << ": " << s;
mUnexpectedTransition = std::move(s);
return false;
}
return true;
}
protected:
const std::vector<CommandAndState>& mCommands;
size_t mNextCommand = 0;
std::optional<int64_t> mPreviousFrames;
std::string mPreviousState = "<initial state>";
bool mObservablePositionIncrease = false;
bool mRetrogradeObservablePosition = false;
std::string mUnexpectedTransition;
};
using NamedCommandSequence = std::pair<std::string, std::vector<CommandAndState>>;
enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ };
using StreamIoTestParameters =
std::tuple<std::string /*moduleName*/, NamedCommandSequence, bool /*useSetupSequence2*/>;
template <typename Stream>
class AudioStreamIo : public AudioCoreModuleBase,
public testing::TestWithParam<StreamIoTestParameters> {
public:
void SetUp() override {
ASSERT_NO_FATAL_FAILURE(SetUpImpl(std::get<PARAM_MODULE_NAME>(GetParam())));
ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig());
}
void Run() {
const auto allPortConfigs =
moduleConfig->getPortConfigsForMixPorts(IOTraits<Stream>::is_input);
if (allPortConfigs.empty()) {
GTEST_SKIP() << "No mix ports have attached devices";
}
for (const auto& portConfig : allPortConfigs) {
SCOPED_TRACE(portConfig.toString());
const auto& commandsAndStates = std::get<PARAM_CMD_SEQ>(GetParam()).second;
if (!std::get<PARAM_SETUP_SEQ>(GetParam())) {
ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates));
} else {
ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq2(portConfig, commandsAndStates));
}
}
}
bool ValidateObservablePosition(const AudioPortConfig& /*portConfig*/) {
// May return false based on the portConfig, e.g. for telephony ports.
return true;
}
// Set up a patch first, then open a stream.
void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig,
const std::vector<CommandAndState>& commandsAndStates) {
auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
IOTraits<Stream>::is_input, portConfig);
ASSERT_FALSE(devicePorts.empty());
auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
WithAudioPatch patch(IOTraits<Stream>::is_input, portConfig, devicePortConfig);
ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
WithStream<Stream> stream(patch.getPortConfig(IOTraits<Stream>::is_input));
ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
StreamLogicDefaultDriver driver(commandsAndStates);
typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
ASSERT_TRUE(worker.start());
worker.join();
EXPECT_FALSE(worker.hasError()) << worker.getError();
EXPECT_EQ("", driver.getUnexpectedStateTransition());
if (ValidateObservablePosition(portConfig)) {
EXPECT_TRUE(driver.hasObservablePositionIncrease());
EXPECT_FALSE(driver.hasRetrogradeObservablePosition());
}
}
// Open a stream, then set up a patch for it.
void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig,
const std::vector<CommandAndState>& commandsAndStates) {
WithStream<Stream> stream(portConfig);
ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames));
StreamLogicDefaultDriver driver(commandsAndStates);
typename IOTraits<Stream>::Worker worker(*stream.getContext(), &driver);
auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort(
IOTraits<Stream>::is_input, portConfig);
ASSERT_FALSE(devicePorts.empty());
auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]);
WithAudioPatch patch(IOTraits<Stream>::is_input, stream.getPortConfig(), devicePortConfig);
ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get()));
ASSERT_TRUE(worker.start());
worker.join();
EXPECT_FALSE(worker.hasError()) << worker.getError();
EXPECT_EQ("", driver.getUnexpectedStateTransition());
if (ValidateObservablePosition(portConfig)) {
EXPECT_TRUE(driver.hasObservablePositionIncrease());
EXPECT_FALSE(driver.hasRetrogradeObservablePosition());
}
}
};
using AudioStreamIoIn = AudioStreamIo<IStreamIn>;
using AudioStreamIoOut = AudioStreamIo<IStreamOut>;
#define TEST_IN_AND_OUT_STREAM_IO(method_name) \
TEST_P(AudioStreamIoIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \
TEST_P(AudioStreamIoOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); }
TEST_IN_AND_OUT_STREAM_IO(Run);
// Tests specific to audio patches. The fixure class is named 'AudioModulePatch'
// to avoid clashing with 'AudioPatch' class.
class AudioModulePatch : public AudioCoreModule {
@@ -1704,6 +1834,139 @@ INSTANTIATE_TEST_SUITE_P(AudioStreamOutTest, AudioStreamOut,
testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
android::PrintInstanceNameToString);
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut);
static const NamedCommandSequence kReadOrWriteSeq = std::make_pair(
std::string("ReadOrWrite"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE)});
static const NamedCommandSequence kDrainInSeq = std::make_pair(
std::string("Drain"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::DRAIN,
StreamDescriptor::State::DRAINING),
std::make_pair(StreamDescriptor::CommandCode::START,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::DRAIN,
StreamDescriptor::State::DRAINING),
// TODO: This will need to be changed once DRAIN starts taking time.
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::STANDBY)});
static const NamedCommandSequence kDrainOutSeq = std::make_pair(
std::string("Drain"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
// TODO: This will need to be changed once DRAIN starts taking time.
std::make_pair(StreamDescriptor::CommandCode::DRAIN,
StreamDescriptor::State::IDLE)});
// TODO: This will need to be changed once DRAIN starts taking time so we can pause it.
static const NamedCommandSequence kDrainPauseOutSeq = std::make_pair(
std::string("DrainPause"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::DRAIN,
StreamDescriptor::State::IDLE)});
static const NamedCommandSequence kStandbySeq = std::make_pair(
std::string("Standby"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::STANDBY,
StreamDescriptor::State::STANDBY),
// Perform a read or write in order to advance observable position
// (this is verified by tests).
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE)});
static const NamedCommandSequence kPauseInSeq = std::make_pair(
std::string("Pause"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::FLUSH,
StreamDescriptor::State::STANDBY)});
static const NamedCommandSequence kPauseOutSeq = std::make_pair(
std::string("Pause"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::START,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::START,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED)});
static const NamedCommandSequence kFlushInSeq = std::make_pair(
std::string("Flush"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::FLUSH,
StreamDescriptor::State::STANDBY)});
static const NamedCommandSequence kFlushOutSeq = std::make_pair(
std::string("Flush"),
std::vector<CommandAndState>{
std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE),
std::make_pair(StreamDescriptor::CommandCode::BURST,
StreamDescriptor::State::ACTIVE),
std::make_pair(StreamDescriptor::CommandCode::PAUSE,
StreamDescriptor::State::PAUSED),
std::make_pair(StreamDescriptor::CommandCode::FLUSH,
StreamDescriptor::State::IDLE)});
std::string GetStreamIoTestName(const testing::TestParamInfo<StreamIoTestParameters>& info) {
return android::PrintInstanceNameToString(
testing::TestParamInfo<std::string>{std::get<PARAM_MODULE_NAME>(info.param),
info.index})
.append("_")
.append(std::get<PARAM_CMD_SEQ>(info.param).first)
.append("_SetupSeq")
.append(std::get<PARAM_SETUP_SEQ>(info.param) ? "2" : "1");
}
INSTANTIATE_TEST_SUITE_P(
AudioStreamIoInTest, AudioStreamIoIn,
testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq,
kFlushInSeq),
testing::Values(false, true)),
GetStreamIoTestName);
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoIn);
INSTANTIATE_TEST_SUITE_P(
AudioStreamIoOutTest, AudioStreamIoOut,
testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq,
kStandbySeq, kPauseOutSeq, kFlushOutSeq),
testing::Values(false, true)),
GetStreamIoTestName);
GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut);
INSTANTIATE_TEST_SUITE_P(AudioPatchTest, AudioModulePatch,
testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)),
android::PrintInstanceNameToString);