diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl index db1ac22c52..da24a10439 100644 --- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl +++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl @@ -39,15 +39,34 @@ parcelable StreamDescriptor { int frameSizeBytes; long bufferSizeFrames; android.hardware.audio.core.StreamDescriptor.AudioBuffer audio; - const int COMMAND_BURST = 1; + const int LATENCY_UNKNOWN = -1; @FixedSize @VintfStability parcelable Position { long frames; long timeNs; } + @Backing(type="int") @VintfStability + enum State { + STANDBY = 1, + IDLE = 2, + ACTIVE = 3, + PAUSED = 4, + DRAINING = 5, + DRAIN_PAUSED = 6, + ERROR = 100, + } + @Backing(type="int") @VintfStability + enum CommandCode { + START = 1, + BURST = 2, + DRAIN = 3, + STANDBY = 4, + PAUSE = 5, + FLUSH = 6, + } @FixedSize @VintfStability parcelable Command { - int code; + android.hardware.audio.core.StreamDescriptor.CommandCode code = android.hardware.audio.core.StreamDescriptor.CommandCode.START; int fmqByteCount; } @FixedSize @VintfStability @@ -57,6 +76,8 @@ parcelable StreamDescriptor { android.hardware.audio.core.StreamDescriptor.Position observable; android.hardware.audio.core.StreamDescriptor.Position hardware; int latencyMs; + int xrunFrames; + android.hardware.audio.core.StreamDescriptor.State state = android.hardware.audio.core.StreamDescriptor.State.STANDBY; } @VintfStability union AudioBuffer { diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl index 735f87ff17..095984059a 100644 --- a/audio/aidl/android/hardware/audio/core/IModule.aidl +++ b/audio/aidl/android/hardware/audio/core/IModule.aidl @@ -263,6 +263,9 @@ interface IModule { * be completing with an error, although data (zero filled) will still be * provided. * + * After the stream has been opened, it remains in the STANDBY state, see + * StreamDescriptor for more details. + * * @return An opened input stream and the associated descriptor. * @param args The pack of arguments, see 'OpenInputStreamArguments' parcelable. * @throws EX_ILLEGAL_ARGUMENT In the following cases: @@ -325,6 +328,9 @@ interface IModule { * StreamDescriptor will be completing with an error, although the data * will still be accepted and immediately discarded. * + * After the stream has been opened, it remains in the STANDBY state, see + * StreamDescriptor for more details. + * * @return An opened output stream and the associated descriptor. * @param args The pack of arguments, see 'OpenOutputStreamArguments' parcelable. * @throws EX_ILLEGAL_ARGUMENT In the following cases: diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl index 2b1ed8ceb1..e5e56fcf21 100644 --- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl +++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl @@ -33,6 +33,72 @@ import android.hardware.common.fmq.SynchronizedReadWrite; * internal components of the stream while serving commands invoked via the * stream's AIDL interface and commands invoked via the command queue of the * descriptor. + * + * There is a state machine defined for the stream, which executes on the + * thread handling the commands from the queue. The states are defined based + * on the model of idealized producer and consumer connected via a ring buffer. + * For input streams, the "producer" is hardware, the "consumer" is software, + * for outputs streams it's the opposite. When the producer is active, but + * the buffer is full, the following actions are possible: + * - if the consumer is active, the producer blocks until there is space, + * this behavior is only possible for software producers; + * - if the consumer is passive: + * - the producer can preserve the buffer contents—a s/w producer can + * keep the data on their side, while a h/w producer can only drop captured + * data in this case; + * - or the producer overwrites old data in the buffer. + * Similarly, when an active consumer faces an empty buffer, it can: + * - block until there is data (producer must be active), only possible + * for software consumers; + * - walk away with no data; when the consumer is hardware, it must emit + * silence in this case. + * + * The model is defined below, note the asymmetry regarding the 'IDLE' state + * between input and output streams: + * + * Producer | Buffer state | Consumer | Applies | State + * active? | | active? | to | + * ==========|==============|==========|=========|============================== + * No | Empty | No | Both | STANDBY + * ----------|--------------|----------|---------|----------------------------- + * Yes | Filling up | No | Input | IDLE, overwrite behavior + * ----------|--------------|----------|---------|----------------------------- + * No | Empty | Yes† | Output | IDLE, h/w emits silence + * ----------|--------------|----------|---------|----------------------------- + * Yes | Not empty | Yes | Both | ACTIVE, s/w x-runs counted + * ----------|--------------|----------|---------|----------------------------- + * Yes | Filling up | No | Input | PAUSED, drop behavior + * ----------|--------------|----------|---------|----------------------------- + * Yes | Filling up | No† | Output | PAUSED, s/w stops writing once + * | | | | the buffer is filled up; + * | | | | h/w emits silence. + * ----------|--------------|----------|---------|----------------------------- + * No | Not empty | Yes | Both | DRAINING + * ----------|--------------|----------|---------|----------------------------- + * No | Not empty | No† | Output | DRAIN_PAUSED, + * | | | | h/w emits silence. + * + * † - note that for output, "buffer empty, h/w consuming" has the same outcome + * as "buffer not empty, h/w not consuming", but logically these conditions + * are different. + * + * State machines of both input and output streams start from the 'STANDBY' + * state. Transitions between states happen naturally with changes in the + * states of the model elements. For simplicity, we restrict the change to one + * element only, for example, in the 'STANDBY' state, either the producer or the + * consumer can become active, but not both at the same time. States 'STANDBY', + * 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event, + * whereas a change from the 'DRAINING' state can happen with time as the buffer + * gets empty. + * + * The state machine for input streams is defined in the `stream-in-sm.gv` file, + * for output streams—in the `stream-out-sm.gv` file. State machines define how + * commands (from the enum 'CommandCode') trigger state changes. The full list + * of states and commands is defined by constants of the 'State' enum. Note that + * the 'CLOSED' state does not have a constant in the interface because the + * client can never observe a stream with a functioning command queue in this + * state. The 'ERROR' state is a special state which the state machine enters + * when an unrecoverable hardware error is detected by the HAL module. */ @JavaDerive(equals=true, toString=true) @VintfStability @@ -55,12 +121,110 @@ parcelable StreamDescriptor { long timeNs; } - /** - * The command used for audio I/O, see 'AudioBuffer'. For MMap No IRQ mode - * this command only provides updated positions and latency because actual - * audio I/O is done via the 'AudioBuffer.mmap' shared buffer. - */ - const int COMMAND_BURST = 1; + @VintfStability + @Backing(type="int") + enum State { + /** + * 'STANDBY' is the initial state of the stream, entered after + * opening. Since both the producer and the consumer are inactive in + * this state, it allows the HAL module to put associated hardware into + * "standby" mode to save power. + */ + STANDBY = 1, + /** + * In the 'IDLE' state the audio hardware is active. For input streams, + * the hardware is filling buffer with captured data, overwriting old + * contents on buffer wraparounds. For output streams, the buffer is + * still empty, and the hardware is outputting zeroes. The HAL module + * must not account for any under- or overruns as the client is not + * expected to perform audio I/O. + */ + IDLE = 2, + /** + * The active state of the stream in which it handles audio I/O. The HAL + * module can assume that the audio I/O will be periodic, thus inability + * of the client to provide or consume audio data on time must be + * considered as an under- or overrun and indicated via the 'xrunFrames' + * field of the reply. + */ + ACTIVE = 3, + /** + * In the 'PAUSED' state the consumer is inactive. For input streams, + * the hardware stops updating the buffer as soon as it fills up (this + * is the difference from the 'IDLE' state). For output streams, + * "inactivity" of hardware means that it does not consume audio data, + * but rather emits silence. + */ + PAUSED = 4, + /** + * In the 'DRAINING' state the producer is inactive, the consumer is + * finishing up on the buffer contents, emptying it up. As soon as it + * gets empty, the stream transfers itself into the next state. + */ + DRAINING = 5, + /** + * Used for output streams only, pauses draining. This state is similar + * to the 'PAUSED' state, except that the client is not adding any + * new data. If it emits a 'BURST' command, this brings the stream + * into the regular 'PAUSED' state. + */ + DRAIN_PAUSED = 6, + /** + * The ERROR state is entered when the stream has encountered an + * irrecoverable error from the lower layer. After entering it, the + * stream can only be closed. + */ + ERROR = 100, + } + + @VintfStability + @Backing(type="int") + enum CommandCode { + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + START = 1, + /** + * The BURST command used for audio I/O, see 'AudioBuffer'. Differences + * for the MMap No IRQ mode: + * + * - this command only provides updated positions and latency because + * actual audio I/O is done via the 'AudioBuffer.mmap' shared buffer. + * The client does not synchronize reads and writes into the buffer + * with sending of this command. + * + * - the 'fmqByteCount' must always be set to 0. + */ + BURST = 2, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + DRAIN = 3, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + * + * Note that it's left on the discretion of the HAL implementation to + * assess all the necessary conditions that could prevent hardware from + * being suspended. Even if it can not be suspended, the state machine + * must still enter the 'STANDBY' state for consistency. Since the + * buffer must remain empty in this state, even if capturing hardware is + * still active, captured data must be discarded. + */ + STANDBY = 4, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + PAUSE = 5, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + FLUSH = 6, + } /** * Used for sending commands to the HAL module. The client writes into @@ -71,12 +235,16 @@ parcelable StreamDescriptor { @FixedSize parcelable Command { /** - * One of COMMAND_* codes. + * The code of the command. */ - int code; + CommandCode code = CommandCode.START; /** + * This field is only used for the BURST command. For all other commands + * it must be set to 0. The following description applies to the use + * of this field for the BURST command. + * * For output streams: the amount of bytes that the client requests the - * HAL module to read from the 'audio.fmq' queue. + * HAL module to use out of the data contained in the 'audio.fmq' queue. * For input streams: the amount of bytes requested by the client to * read from the hardware into the 'audio.fmq' queue. * @@ -95,6 +263,12 @@ parcelable StreamDescriptor { } MQDescriptor command; + /** + * The value used for the 'Reply.latencyMs' field when the effective + * latency can not be reported by the HAL module. + */ + const int LATENCY_UNKNOWN = -1; + /** * Used for providing replies to commands. The HAL module writes into * the queue, the client reads. The queue can only contain a single reply, @@ -107,17 +281,22 @@ parcelable StreamDescriptor { * One of Binder STATUS_* statuses: * - STATUS_OK: the command has completed successfully; * - STATUS_BAD_VALUE: invalid value in the 'Command' structure; - * - STATUS_INVALID_OPERATION: the mix port is not connected - * to any producer or consumer, thus - * positions can not be reported; + * - STATUS_INVALID_OPERATION: the command is not applicable in the + * current state of the stream, or to this + * type of the stream; + * - STATUS_NO_INIT: positions can not be reported because the mix port + * is not connected to any producer or consumer, or + * because the HAL module does not support positions + * reporting for this AudioSource (on input streams). * - STATUS_NOT_ENOUGH_DATA: a read or write error has * occurred for the 'audio.fmq' queue; - * */ int status; /** - * For output streams: the amount of bytes actually consumed by the HAL - * module from the 'audio.fmq' queue. + * Used with the BURST command only. + * + * For output streams: the amount of bytes of data actually consumed + * by the HAL module. * For input streams: the amount of bytes actually provided by the HAL * in the 'audio.fmq' queue. * @@ -126,10 +305,18 @@ parcelable StreamDescriptor { */ int fmqByteCount; /** + * It is recommended to report the current position for any command. + * If the position can not be reported, the 'status' field must be + * set to 'NO_INIT'. + * * For output streams: the moment when the specified stream position * was presented to an external observer (i.e. presentation position). * For input streams: the moment when data at the specified stream position * was acquired (i.e. capture position). + * + * The observable position must never be reset by the HAL module. + * The data type of the frame counter is large enough to support + * continuous counting for years of operation. */ Position observable; /** @@ -138,9 +325,22 @@ parcelable StreamDescriptor { */ Position hardware; /** - * Current latency reported by the hardware. + * Current latency reported by the hardware. It is recommended to + * report the current latency for any command. If the value of latency + * can not be determined, this field must be set to 'LATENCY_UNKNOWN'. */ int latencyMs; + /** + * Number of frames lost due to an underrun (for input streams), + * or not provided on time (for output streams) for the **previous** + * transfer operation. + */ + int xrunFrames; + /** + * The state that the stream was in while the HAL module was sending the + * reply. + */ + State state = State.STANDBY; } MQDescriptor reply; @@ -170,42 +370,59 @@ parcelable StreamDescriptor { @VintfStability union AudioBuffer { /** - * The fast message queue used for all modes except MMap No IRQ. Both - * reads and writes into this queue are non-blocking because access to - * this queue is synchronized via the 'command' and 'reply' queues as - * described below. The queue nevertheless uses 'SynchronizedReadWrite' - * because there is only one reader, and the reading position must be - * shared. + * The fast message queue used for BURST commands in all modes except + * MMap No IRQ. Both reads and writes into this queue are non-blocking + * because access to this queue is synchronized via the 'command' and + * 'reply' queues as described below. The queue nevertheless uses + * 'SynchronizedReadWrite' because there is only one reader, and the + * reading position must be shared. + * + * Note that the fast message queue is a transient buffer, only used for + * data transfer. Neither of the sides can use it to store any data + * outside of the 'BURST' operation. The consumer must always retrieve + * all data available in the fast message queue, even if it can not use + * it. The producer must re-send any unconsumed data on the next + * transfer operation. This restriction is posed in order to make the + * fast message queue fully transparent from the latency perspective. * * For output streams the following sequence of operations is used: * 1. The client writes audio data into the 'audio.fmq' queue. - * 2. The client writes the 'BURST' command into the 'command' queue, + * 2. The client writes the BURST command into the 'command' queue, * and hangs on waiting on a read from the 'reply' queue. * 3. The high priority thread in the HAL module wakes up due to 2. - * 4. The HAL module reads the command and audio data. + * 4. The HAL module reads the command and audio data. According + * to the statement above, the HAL module must always read + * from the FMQ all the data it contains. The amount of data that + * the HAL module has actually consumed is indicated to the client + * via the 'reply.fmqByteCount' field. * 5. The HAL module writes the command status and current positions * into 'reply' queue, and hangs on waiting on a read from * the 'command' queue. * 6. The client wakes up due to 5. and reads the reply. * * For input streams the following sequence of operations is used: - * 1. The client writes the 'BURST' command into the 'command' queue, + * 1. The client writes the BURST command into the 'command' queue, * and hangs on waiting on a read from the 'reply' queue. * 2. The high priority thread in the HAL module wakes up due to 1. * 3. The HAL module writes audio data into the 'audio.fmq' queue. + * The value of 'reply.fmqByteCount' must be the equal to the amount + * of data in the queue. * 4. The HAL module writes the command status and current positions * into 'reply' queue, and hangs on waiting on a read from * the 'command' queue. * 5. The client wakes up due to 4. - * 6. The client reads the reply and audio data. + * 6. The client reads the reply and audio data. The client must + * always read from the FMQ all the data it contains. + * */ MQDescriptor fmq; /** * MMap buffers are shared directly with the DSP, which operates - * independently from the CPU. Writes and reads into these buffers - * are not synchronized with 'command' and 'reply' queues. However, - * the client still uses the 'BURST' command for obtaining current - * positions from the HAL module. + * independently from the CPU. Writes and reads into these buffers are + * not synchronized with 'command' and 'reply' queues. However, the + * client still uses the same commands for controlling the audio data + * exchange and for obtaining current positions and latency from the HAL + * module. */ MmapBufferDescriptor mmap; } diff --git a/audio/aidl/android/hardware/audio/core/stream-in-sm.gv b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv new file mode 100644 index 0000000000..889a14b5c1 --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv @@ -0,0 +1,42 @@ +// Copyright (C) 2022 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// To render: dot -Tpng stream-in-sm.gv -o stream-in-sm.png +digraph stream_in_state_machine { + node [shape=doublecircle style=filled fillcolor=black width=0.5] I; + node [shape=point width=0.5] F; + node [shape=oval width=1]; + node [fillcolor=lightgreen] STANDBY; // buffer is empty + node [fillcolor=tomato] CLOSED; + node [fillcolor=tomato] ERROR; + node [style=dashed] ANY_STATE; + node [fillcolor=lightblue style=filled]; + I -> STANDBY; + STANDBY -> IDLE [label="START"]; // producer -> active + IDLE -> STANDBY [label="STANDBY"]; // producer -> passive, buffer is cleared + IDLE -> ACTIVE [label="BURST"]; // consumer -> active + ACTIVE -> ACTIVE [label="BURST"]; + ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive + ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive + PAUSED -> ACTIVE [label="BURST"]; // consumer -> active + PAUSED -> STANDBY [label="FLUSH"]; // producer -> passive, buffer is cleared + DRAINING -> DRAINING [label="BURST"]; + DRAINING -> ACTIVE [label="START"]; // producer -> active + DRAINING -> STANDBY [label=""]; // consumer deactivates + IDLE -> ERROR [label=""]; + ACTIVE -> ERROR [label=""]; + PAUSED -> ERROR [label=""]; + ANY_STATE -> CLOSED [label="→IStream*.close"]; + CLOSED -> F; +} diff --git a/audio/aidl/android/hardware/audio/core/stream-out-sm.gv b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv new file mode 100644 index 0000000000..56dd52904c --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv @@ -0,0 +1,48 @@ +// Copyright (C) 2022 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// To render: dot -Tpng stream-out-sm.gv -o stream-out-sm.png +digraph stream_out_state_machine { + node [shape=doublecircle style=filled fillcolor=black width=0.5] I; + node [shape=point width=0.5] F; + node [shape=oval width=1]; + node [fillcolor=lightgreen] STANDBY; // buffer is empty + node [fillcolor=lightgreen] IDLE; // buffer is empty + node [fillcolor=tomato] CLOSED; + node [fillcolor=tomato] ERROR; + node [style=dashed] ANY_STATE; + node [fillcolor=lightblue style=filled]; + I -> STANDBY; + STANDBY -> IDLE [label="START"]; // consumer -> active + STANDBY -> PAUSED [label="BURST"]; // producer -> active + IDLE -> STANDBY [label="STANDBY"]; // consumer -> passive + IDLE -> ACTIVE [label="BURST"]; // producer -> active + ACTIVE -> ACTIVE [label="BURST"]; + ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive (not consuming) + ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive + PAUSED -> PAUSED [label="BURST"]; + PAUSED -> ACTIVE [label="START"]; // consumer -> active + PAUSED -> IDLE [label="FLUSH"]; // producer -> passive, buffer is cleared + DRAINING -> IDLE [label=""]; + DRAINING -> ACTIVE [label="BURST"]; // producer -> active + DRAINING -> DRAIN_PAUSED [label="PAUSE"]; // consumer -> passive (not consuming) + DRAIN_PAUSED -> DRAINING [label="START"]; // consumer -> active + DRAIN_PAUSED -> PAUSED [label="BURST"]; // producer -> active + DRAIN_PAUSED -> IDLE [label="FLUSH"]; // buffer is cleared + IDLE -> ERROR [label=""]; + ACTIVE -> ERROR [label=""]; + DRAINING -> ERROR [label=""]; + ANY_STATE -> CLOSED [label="→IStream*.close"]; + CLOSED -> F; +} diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp index 9bca7609fd..dda0e4a6fd 100644 --- a/audio/aidl/common/StreamWorker.cpp +++ b/audio/aidl/common/StreamWorker.cpp @@ -44,6 +44,10 @@ void ThreadController::stop() { mWorkerStateChangeRequest = true; } } + join(); +} + +void ThreadController::join() { if (mWorker.joinable()) { mWorker.join(); } diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 6260eca49a..ab2ec26485 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -39,6 +39,9 @@ class ThreadController { ~ThreadController() { stop(); } bool start(const std::string& name, int priority); + // Note: 'pause' and 'resume' methods should only be used on the "driving" side. + // In the case of audio HAL I/O, the driving side is the client, because the HAL + // implementation always blocks on getting a command. void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); } void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } bool hasError() { @@ -50,6 +53,10 @@ class ThreadController { return mError; } void stop(); + // Direct use of 'join' assumes that the StreamLogic is not intended + // to run forever, and is guaranteed to exit by itself. This normally + // only happen in tests. + void join(); bool waitForAtLeastOneCycle(); // Only used by unit tests. @@ -133,7 +140,8 @@ class StreamWorker : public LogicImpl { void resume() { mThread.resume(); } bool hasError() { return mThread.hasError(); } std::string getError() { return mThread.getError(); } - void stop() { return mThread.stop(); } + void stop() { mThread.stop(); } + void join() { mThread.join(); } bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); } // Only used by unit tests. diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index e3e484d7e3..8ea8424ab5 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -160,6 +160,14 @@ TEST_P(StreamWorkerTest, WorkerExit) { EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); } +TEST_P(StreamWorkerTest, WorkerJoin) { + ASSERT_TRUE(worker.start()); + stream.setStopStatus(); + worker.join(); + EXPECT_FALSE(worker.hasError()); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); +} + TEST_P(StreamWorkerTest, WorkerError) { ASSERT_TRUE(worker.start()); stream.setErrorStatus(); diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp index 312df720eb..7b544a19a9 100644 --- a/audio/aidl/default/Stream.cpp +++ b/audio/aidl/default/Stream.cpp @@ -85,126 +85,315 @@ std::string StreamWorkerCommonLogic::init() { return ""; } +void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply, + bool isConnected) const { + if (isConnected) { + reply->status = STATUS_OK; + reply->observable.frames = mFrameCount; + reply->observable.timeNs = ::android::elapsedRealtimeNano(); + } else { + reply->status = STATUS_NO_INIT; + } +} + const std::string StreamInWorkerLogic::kThreadName = "reader"; StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { StreamDescriptor::Command command{}; if (!mCommandMQ->readBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": reading of command from MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } StreamDescriptor::Reply reply{}; - if (command.code == StreamContext::COMMAND_EXIT && + if (static_cast(command.code) == StreamContext::COMMAND_EXIT && command.fmqByteCount == mInternalCommandCookie) { LOG(DEBUG) << __func__ << ": received EXIT command"; + setClosed(); // This is an internal command, no need to reply. return Status::EXIT; - } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) { + } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) { + LOG(DEBUG) << __func__ << ": received START read command"; + if (mState == StreamDescriptor::State::STANDBY || + mState == StreamDescriptor::State::DRAINING) { + populateReply(&reply, mIsConnected); + mState = mState == StreamDescriptor::State::STANDBY ? StreamDescriptor::State::IDLE + : StreamDescriptor::State::ACTIVE; + } else { + LOG(WARNING) << __func__ << ": START command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) { LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount << " bytes"; - usleep(3000); // Simulate a blocking call into the driver. - const size_t byteCount = std::min({static_cast(command.fmqByteCount), - mDataMQ->availableToWrite(), mDataBufferSize}); - const bool isConnected = mIsConnected; - // Simulate reading of data, or provide zeroes if the stream is not connected. - for (size_t i = 0; i < byteCount; ++i) { - using buffer_type = decltype(mDataBuffer)::element_type; - constexpr int kBufferValueRange = std::numeric_limits::max() - - std::numeric_limits::min() + 1; - mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) + - std::numeric_limits::min() - : 0; - } - bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; - if (success) { - LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ" - << " succeeded; connected? " << isConnected; - // Frames are provided and counted regardless of connection status. - reply.fmqByteCount = byteCount; - mFrameCount += byteCount / mFrameSize; - if (isConnected) { - reply.status = STATUS_OK; - reply.observable.frames = mFrameCount; - reply.observable.timeNs = ::android::elapsedRealtimeNano(); - } else { - reply.status = STATUS_INVALID_OPERATION; + if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE || + mState == StreamDescriptor::State::PAUSED || + mState == StreamDescriptor::State::DRAINING) { + if (!read(command.fmqByteCount, &reply)) { + mState = StreamDescriptor::State::ERROR; + } + if (mState == StreamDescriptor::State::IDLE || + mState == StreamDescriptor::State::PAUSED) { + mState = StreamDescriptor::State::ACTIVE; + } else if (mState == StreamDescriptor::State::DRAINING) { + // To simplify the reference code, we assume that the read operation + // has consumed all the data remaining in the hardware buffer. + // TODO: Provide parametrization on the duration of draining to test + // handling of commands during the 'DRAINING' state. + mState = StreamDescriptor::State::STANDBY; } } else { - LOG(WARNING) << __func__ << ": writing of " << byteCount - << " bytes of data to MQ failed"; - reply.status = STATUS_NOT_ENOUGH_DATA; + LOG(WARNING) << __func__ << ": BURST command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received DRAIN read command"; + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::DRAINING; + } else { + LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received PAUSE read command"; + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::PAUSED; + } else { + LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received FLUSH read command"; + if (mState == StreamDescriptor::State::PAUSED) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::STANDBY && + command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received STANDBY read command"; + if (mState == StreamDescriptor::State::IDLE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; } - reply.latencyMs = Module::kLatencyMs; } else { LOG(WARNING) << __func__ << ": invalid command (" << command.toString() << ") or count: " << command.fmqByteCount; reply.status = STATUS_BAD_VALUE; } + reply.state = mState; LOG(DEBUG) << __func__ << ": writing reply " << reply.toString(); if (!mReplyMQ->writeBlocking(&reply, 1)) { LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } return Status::CONTINUE; } +bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) { + // Can switch the state to ERROR if a driver error occurs. + const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize}); + const bool isConnected = mIsConnected; + bool fatal = false; + // Simulate reading of data, or provide zeroes if the stream is not connected. + for (size_t i = 0; i < byteCount; ++i) { + using buffer_type = decltype(mDataBuffer)::element_type; + constexpr int kBufferValueRange = std::numeric_limits::max() - + std::numeric_limits::min() + 1; + mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) + + std::numeric_limits::min() + : 0; + } + usleep(3000); // Simulate a blocking call into the driver. + // Set 'fatal = true' if a driver error occurs. + if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) { + LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ" + << " succeeded; connected? " << isConnected; + // Frames are provided and counted regardless of connection status. + reply->fmqByteCount += byteCount; + mFrameCount += byteCount / mFrameSize; + populateReply(reply, isConnected); + } else { + LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed"; + reply->status = STATUS_NOT_ENOUGH_DATA; + } + reply->latencyMs = Module::kLatencyMs; + return !fatal; +} + const std::string StreamOutWorkerLogic::kThreadName = "writer"; StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { StreamDescriptor::Command command{}; if (!mCommandMQ->readBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": reading of command from MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } StreamDescriptor::Reply reply{}; - if (command.code == StreamContext::COMMAND_EXIT && + if (static_cast(command.code) == StreamContext::COMMAND_EXIT && command.fmqByteCount == mInternalCommandCookie) { LOG(DEBUG) << __func__ << ": received EXIT command"; + setClosed(); // This is an internal command, no need to reply. return Status::EXIT; - } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) { + } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) { + LOG(DEBUG) << __func__ << ": received START read command"; + switch (mState) { + case StreamDescriptor::State::STANDBY: + mState = StreamDescriptor::State::IDLE; + break; + case StreamDescriptor::State::PAUSED: + mState = StreamDescriptor::State::ACTIVE; + break; + case StreamDescriptor::State::DRAIN_PAUSED: + mState = StreamDescriptor::State::PAUSED; + break; + default: + LOG(WARNING) << __func__ << ": START command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + if (reply.status != STATUS_INVALID_OPERATION) { + populateReply(&reply, mIsConnected); + } + } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) { LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount << " bytes"; - const size_t byteCount = std::min({static_cast(command.fmqByteCount), - mDataMQ->availableToRead(), mDataBufferSize}); - bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true; - if (success) { - const bool isConnected = mIsConnected; - LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ" - << " succeeded; connected? " << isConnected; - // Frames are consumed and counted regardless of connection status. - reply.fmqByteCount = byteCount; - mFrameCount += byteCount / mFrameSize; - if (isConnected) { - reply.status = STATUS_OK; - reply.observable.frames = mFrameCount; - reply.observable.timeNs = ::android::elapsedRealtimeNano(); - } else { - reply.status = STATUS_INVALID_OPERATION; + if (mState != StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states + if (!write(command.fmqByteCount, &reply)) { + mState = StreamDescriptor::State::ERROR; } - usleep(3000); // Simulate a blocking call into the driver. + if (mState == StreamDescriptor::State::STANDBY || + mState == StreamDescriptor::State::DRAIN_PAUSED) { + mState = StreamDescriptor::State::PAUSED; + } else if (mState == StreamDescriptor::State::IDLE || + mState == StreamDescriptor::State::DRAINING) { + mState = StreamDescriptor::State::ACTIVE; + } // When in 'ACTIVE' and 'PAUSED' do not need to change the state. } else { - LOG(WARNING) << __func__ << ": reading of " << byteCount - << " bytes of data from MQ failed"; - reply.status = STATUS_NOT_ENOUGH_DATA; + LOG(WARNING) << __func__ << ": BURST command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received DRAIN write command"; + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::IDLE; + // Since there is no actual hardware that would be draining the buffer, + // in order to simplify the reference code, we assume that draining + // happens instantly, thus skipping the 'DRAINING' state. + // TODO: Provide parametrization on the duration of draining to test + // handling of commands during the 'DRAINING' state. + } else { + LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::STANDBY && + command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received STANDBY write command"; + if (mState == StreamDescriptor::State::IDLE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received PAUSE write command"; + if (mState == StreamDescriptor::State::ACTIVE || + mState == StreamDescriptor::State::DRAINING) { + populateReply(&reply, mIsConnected); + mState = mState == StreamDescriptor::State::ACTIVE + ? StreamDescriptor::State::PAUSED + : StreamDescriptor::State::DRAIN_PAUSED; + } else { + LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received FLUSH write command"; + if (mState == StreamDescriptor::State::PAUSED || + mState == StreamDescriptor::State::DRAIN_PAUSED) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::IDLE; + } else { + LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; } - reply.latencyMs = Module::kLatencyMs; } else { LOG(WARNING) << __func__ << ": invalid command (" << command.toString() << ") or count: " << command.fmqByteCount; reply.status = STATUS_BAD_VALUE; } + reply.state = mState; LOG(DEBUG) << __func__ << ": writing reply " << reply.toString(); if (!mReplyMQ->writeBlocking(&reply, 1)) { LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } return Status::CONTINUE; } +bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) { + const size_t readByteCount = mDataMQ->availableToRead(); + // Amount of data that the HAL module is going to actually use. + const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize}); + bool fatal = false; + if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) { + const bool isConnected = mIsConnected; + LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ" + << " succeeded; connected? " << isConnected; + // Frames are consumed and counted regardless of connection status. + reply->fmqByteCount += byteCount; + mFrameCount += byteCount / mFrameSize; + populateReply(reply, isConnected); + usleep(3000); // Simulate a blocking call into the driver. + // Set 'fatal = true' if a driver error occurs. + } else { + LOG(WARNING) << __func__ << ": reading of " << readByteCount + << " bytes of data from MQ failed"; + reply->status = STATUS_NOT_ENOUGH_DATA; + } + reply->latencyMs = Module::kLatencyMs; + return !fatal; +} + template StreamCommon::~StreamCommon() { - if (!mIsClosed) { + if (!isClosed()) { LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak"; stopWorker(); // The worker and the context should clean up by themselves via destructors. @@ -214,13 +403,13 @@ StreamCommon::~StreamCommon() { template ndk::ScopedAStatus StreamCommon::close() { LOG(DEBUG) << __func__; - if (!mIsClosed) { + if (!isClosed()) { stopWorker(); LOG(DEBUG) << __func__ << ": joining the worker thread..."; mWorker.stop(); LOG(DEBUG) << __func__ << ": worker thread joined"; mContext.reset(); - mIsClosed = true; + mWorker.setClosed(); return ndk::ScopedAStatus::ok(); } else { LOG(ERROR) << __func__ << ": stream was already closed"; @@ -231,13 +420,14 @@ ndk::ScopedAStatus StreamCommon::close() { template void StreamCommon::stopWorker() { if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) { - LOG(DEBUG) << __func__ << ": asking the worker to stop..."; + LOG(DEBUG) << __func__ << ": asking the worker to exit..."; StreamDescriptor::Command cmd; - cmd.code = StreamContext::COMMAND_EXIT; + cmd.code = StreamDescriptor::CommandCode(StreamContext::COMMAND_EXIT); cmd.fmqByteCount = mContext.getInternalCommandCookie(); - // FIXME: This can block in the case when the client wrote a command - // while the stream worker's cycle is not running. Need to revisit - // when implementing standby and pause/resume. + // Note: never call 'pause' and 'resume' methods of StreamWorker + // in the HAL implementation. These methods are to be used by + // the client side only. Preventing the worker loop from running + // on the HAL side can cause a deadlock. if (!commandMQ->writeBlocking(&cmd, 1)) { LOG(ERROR) << __func__ << ": failed to write exit command to the MQ"; } @@ -248,7 +438,7 @@ void StreamCommon::stopWorker() { template ndk::ScopedAStatus StreamCommon::updateMetadata(const Metadata& metadata) { LOG(DEBUG) << __func__; - if (!mIsClosed) { + if (!isClosed()) { mMetadata = metadata; return ndk::ScopedAStatus::ok(); } diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h index 488edf116a..539fa8b85c 100644 --- a/audio/aidl/default/include/core-impl/Stream.h +++ b/audio/aidl/default/include/core-impl/Stream.h @@ -54,8 +54,10 @@ class StreamContext { int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite> DataMQ; - // Ensure that this value is not used by any of StreamDescriptor.COMMAND_* - static constexpr int COMMAND_EXIT = -1; + // Ensure that this value is not used by any of StreamDescriptor.CommandCode enums + static constexpr int32_t COMMAND_EXIT = -1; + // Ensure that this value is not used by any of StreamDescriptor.State enums + static constexpr int32_t STATE_CLOSED = -1; StreamContext() = default; StreamContext(std::unique_ptr commandMQ, std::unique_ptr replyMQ, @@ -99,6 +101,10 @@ class StreamContext { class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic { public: + bool isClosed() const { + return static_cast(mState.load()) == StreamContext::STATE_CLOSED; + } + void setClosed() { mState = static_cast(StreamContext::STATE_CLOSED); } void setIsConnected(bool connected) { mIsConnected = connected; } protected: @@ -109,9 +115,12 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea mReplyMQ(context.getReplyMQ()), mDataMQ(context.getDataMQ()) {} std::string init() override; + void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const; - // Used both by the main and worker threads. + // Atomic fields are used both by the main and worker threads. std::atomic mIsConnected = false; + static_assert(std::atomic::is_always_lock_free); + std::atomic mState = StreamDescriptor::State::STANDBY; // All fields are used on the worker thread only. const int mInternalCommandCookie; const size_t mFrameSize; @@ -132,6 +141,9 @@ class StreamInWorkerLogic : public StreamWorkerCommonLogic { protected: Status cycle() override; + + private: + bool read(size_t clientSize, StreamDescriptor::Reply* reply); }; using StreamInWorker = ::android::hardware::audio::common::StreamWorker; @@ -143,6 +155,9 @@ class StreamOutWorkerLogic : public StreamWorkerCommonLogic { protected: Status cycle() override; + + private: + bool write(size_t clientSize, StreamDescriptor::Reply* reply); }; using StreamOutWorker = ::android::hardware::audio::common::StreamWorker; @@ -155,7 +170,7 @@ class StreamCommon { ? ndk::ScopedAStatus::ok() : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE); } - bool isClosed() const { return mIsClosed; } + bool isClosed() const { return mWorker.isClosed(); } void setIsConnected(bool connected) { mWorker.setIsConnected(connected); } ndk::ScopedAStatus updateMetadata(const Metadata& metadata); @@ -168,9 +183,6 @@ class StreamCommon { Metadata mMetadata; StreamContext mContext; StreamWorker mWorker; - // This variable is checked in the destructor which can be called on an arbitrary Binder thread, - // thus we need to ensure that any changes made by other threads are sequentially consistent. - std::atomic mIsClosed = false; }; class StreamIn diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp index 23812008a6..b415da4465 100644 --- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp +++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "AudioHalBinderServiceUtil.h" @@ -69,6 +70,7 @@ using aidl::android::media::audio::common::AudioUsage; using android::hardware::audio::common::isBitPositionFlagSet; using android::hardware::audio::common::StreamLogic; using android::hardware::audio::common::StreamWorker; +using ndk::enum_range; using ndk::ScopedAStatus; template @@ -171,25 +173,26 @@ class WithAudioPortConfig { AudioPortConfig mConfig; }; -class AudioCoreModule : public testing::TestWithParam { +// Can be used as a base for any test here, does not depend on the fixture GTest parameters. +class AudioCoreModuleBase { public: // The default buffer size is used mostly for negative tests. static constexpr int kDefaultBufferSizeFrames = 256; - void SetUp() override { - ASSERT_NO_FATAL_FAILURE(ConnectToService()); + void SetUpImpl(const std::string& moduleName) { + ASSERT_NO_FATAL_FAILURE(ConnectToService(moduleName)); debug.flags().simulateDeviceConnections = true; ASSERT_NO_FATAL_FAILURE(debug.SetUp(module.get())); } - void TearDown() override { + void TearDownImpl() { if (module != nullptr) { EXPECT_IS_OK(module->setModuleDebug(ModuleDebug{})); } } - void ConnectToService() { - module = IModule::fromBinder(binderUtil.connectToService(GetParam())); + void ConnectToService(const std::string& moduleName) { + module = IModule::fromBinder(binderUtil.connectToService(moduleName)); ASSERT_NE(module, nullptr); } @@ -269,6 +272,13 @@ class AudioCoreModule : public testing::TestWithParam { WithDebugFlags debug; }; +class AudioCoreModule : public AudioCoreModuleBase, public testing::TestWithParam { + public: + void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpImpl(GetParam())); } + + void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownImpl()); } +}; + class WithDevicePortConnectedState { public: explicit WithDevicePortConnectedState(const AudioPort& idAndData) : mIdAndData(idAndData) {} @@ -352,21 +362,36 @@ class StreamContext { std::unique_ptr mDataMQ; }; -class StreamCommonLogic : public StreamLogic { +class StreamLogicDriver { public: - StreamDescriptor::Position getLastObservablePosition() { - std::lock_guard lock(mLock); - return mLastReply.observable; - } + virtual ~StreamLogicDriver() = default; + // Return 'true' to stop the worker. + virtual bool done() = 0; + // For 'Writer' logic, if the 'actualSize' is 0, write is skipped. + // The 'fmqByteCount' from the returned command is passed as is to the HAL. + virtual StreamDescriptor::Command getNextCommand(int maxDataSize, + int* actualSize = nullptr) = 0; + // Return 'true' to indicate that no further processing is needed, + // for example, the driver is expecting a bad status to be returned. + // The logic cycle will return with 'CONTINUE' status. Otherwise, + // the reply will be validated and then passed to 'processValidReply'. + virtual bool interceptRawReply(const StreamDescriptor::Reply& reply) = 0; + // Return 'false' to indicate that the contents of the reply are unexpected. + // Will abort the logic cycle. + virtual bool processValidReply(const StreamDescriptor::Reply& reply) = 0; +}; +class StreamCommonLogic : public StreamLogic { protected: - explicit StreamCommonLogic(const StreamContext& context) + StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver) : mCommandMQ(context.getCommandMQ()), mReplyMQ(context.getReplyMQ()), mDataMQ(context.getDataMQ()), - mData(context.getBufferSizeBytes()) {} + mData(context.getBufferSizeBytes()), + mDriver(driver) {} StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; } StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; } + StreamLogicDriver* getDriver() const { return mDriver; } std::string init() override { return ""; } @@ -374,19 +399,20 @@ class StreamCommonLogic : public StreamLogic { StreamContext::ReplyMQ* mReplyMQ; StreamContext::DataMQ* mDataMQ; std::vector mData; - std::mutex mLock; - StreamDescriptor::Reply mLastReply GUARDED_BY(mLock); + StreamLogicDriver* const mDriver; }; class StreamReaderLogic : public StreamCommonLogic { public: - explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {} + StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver) + : StreamCommonLogic(context, driver) {} protected: Status cycle() override { - StreamDescriptor::Command command{}; - command.code = StreamDescriptor::COMMAND_BURST; - command.fmqByteCount = mData.size(); + if (getDriver()->done()) { + return Status::EXIT; + } + StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size()); if (!mCommandMQ->writeBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": writing of command into MQ failed"; return Status::ABORT; @@ -396,6 +422,9 @@ class StreamReaderLogic : public StreamCommonLogic { LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; return Status::ABORT; } + if (getDriver()->interceptRawReply(reply)) { + return Status::CONTINUE; + } if (reply.status != STATUS_OK) { LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status); return Status::ABORT; @@ -405,16 +434,41 @@ class StreamReaderLogic : public StreamCommonLogic { << ": received invalid byte count in the reply: " << reply.fmqByteCount; return Status::ABORT; } - { - std::lock_guard lock(mLock); - mLastReply = reply; + if (static_cast(reply.fmqByteCount) != mDataMQ->availableToRead()) { + LOG(ERROR) << __func__ + << ": the byte count in the reply is not the same as the amount of " + << "data available in the MQ: " << reply.fmqByteCount + << " != " << mDataMQ->availableToRead(); } - const size_t readCount = std::min({mDataMQ->availableToRead(), - static_cast(reply.fmqByteCount), mData.size()}); - if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) { + if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) { + LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs; + return Status::ABORT; + } + if (reply.xrunFrames < 0) { + LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames; + return Status::ABORT; + } + if (std::find(enum_range().begin(), + enum_range().end(), + reply.state) == enum_range().end()) { + LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state); + return Status::ABORT; + } + const bool acceptedReply = getDriver()->processValidReply(reply); + if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) { + std::vector data(readCount); + if (mDataMQ->read(data.data(), readCount)) { + memcpy(mData.data(), data.data(), std::min(mData.size(), data.size())); + goto checkAcceptedReply; + } + LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed"; + return Status::ABORT; + } // readCount == 0 + checkAcceptedReply: + if (acceptedReply) { return Status::CONTINUE; } - LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed"; + LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString(); return Status::ABORT; } }; @@ -422,17 +476,20 @@ using StreamReader = StreamWorker; class StreamWriterLogic : public StreamCommonLogic { public: - explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {} + StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver) + : StreamCommonLogic(context, driver) {} protected: Status cycle() override { - if (!mDataMQ->write(mData.data(), mData.size())) { + if (getDriver()->done()) { + return Status::EXIT; + } + int actualSize = 0; + StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize); + if (actualSize != 0 && !mDataMQ->write(mData.data(), mData.size())) { LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed"; return Status::ABORT; } - StreamDescriptor::Command command{}; - command.code = StreamDescriptor::COMMAND_BURST; - command.fmqByteCount = mData.size(); if (!mCommandMQ->writeBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": writing of command into MQ failed"; return Status::ABORT; @@ -442,6 +499,9 @@ class StreamWriterLogic : public StreamCommonLogic { LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; return Status::ABORT; } + if (getDriver()->interceptRawReply(reply)) { + return Status::CONTINUE; + } if (reply.status != STATUS_OK) { LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status); return Status::ABORT; @@ -451,11 +511,31 @@ class StreamWriterLogic : public StreamCommonLogic { << ": received invalid byte count in the reply: " << reply.fmqByteCount; return Status::ABORT; } - { - std::lock_guard lock(mLock); - mLastReply = reply; + if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) { + LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: " + << "available to write " << mDataMQ->availableToWrite() + << ", total size: " << mDataMQ->getQuantumCount(); + return Status::ABORT; } - return Status::CONTINUE; + if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) { + LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs; + return Status::ABORT; + } + if (reply.xrunFrames < 0) { + LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames; + return Status::ABORT; + } + if (std::find(enum_range().begin(), + enum_range().end(), + reply.state) == enum_range().end()) { + LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state); + return Status::ABORT; + } + if (getDriver()->processValidReply(reply)) { + return Status::CONTINUE; + } + LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString(); + return Status::ABORT; } }; using StreamWriter = StreamWorker; @@ -466,52 +546,6 @@ struct IOTraits { using Worker = std::conditional_t; }; -// A dedicated version to test replies to invalid commands. -class StreamInvalidCommandLogic : public StreamCommonLogic { - public: - StreamInvalidCommandLogic(const StreamContext& context, - const std::vector& commands) - : StreamCommonLogic(context), mCommands(commands) {} - - std::vector getUnexpectedStatuses() { - std::lock_guard lock(mLock); - return mUnexpectedStatuses; - } - - protected: - Status cycle() override { - // Send all commands in one cycle to simplify testing. - // Extra logging helps to sort out issues with unexpected HAL behavior. - for (const auto& command : mCommands) { - LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ..."; - if (!getCommandMQ()->writeBlocking(&command, 1)) { - LOG(ERROR) << __func__ << ": writing of command into MQ failed"; - return Status::ABORT; - } - StreamDescriptor::Reply reply{}; - LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "..."; - if (!getReplyMQ()->readBlocking(&reply, 1)) { - LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; - return Status::ABORT; - } - LOG(INFO) << __func__ << ": received status " << statusToString(reply.status) - << " for command " << command.toString(); - if (reply.status != STATUS_BAD_VALUE) { - std::string s = command.toString(); - s.append(", ").append(statusToString(reply.status)); - std::lock_guard lock(mLock); - mUnexpectedStatuses.push_back(std::move(s)); - } - }; - return Status::EXIT; - } - - private: - const std::vector mCommands; - std::mutex mLock; - std::vector mUnexpectedStatuses GUARDED_BY(mLock); -}; - template class WithStream { public: @@ -1208,6 +1242,46 @@ TEST_P(AudioCoreModule, ExternalDevicePortRoutes) { } } +class StreamLogicDriverInvalidCommand : public StreamLogicDriver { + public: + StreamLogicDriverInvalidCommand(const std::vector& commands) + : mCommands(commands) {} + + std::string getUnexpectedStatuses() { + // This method is intended to be called after the worker thread has joined, + // thus no extra synchronization is needed. + std::string s; + if (!mStatuses.empty()) { + s = std::string("Pairs of (command, actual status): ") + .append((android::internal::ToString(mStatuses))); + } + return s; + } + + bool done() override { return mNextCommand >= mCommands.size(); } + StreamDescriptor::Command getNextCommand(int, int* actualSize) override { + if (actualSize != nullptr) *actualSize = 0; + return mCommands[mNextCommand++]; + } + bool interceptRawReply(const StreamDescriptor::Reply& reply) override { + if (reply.status != STATUS_BAD_VALUE) { + std::string s = mCommands[mNextCommand - 1].toString(); + s.append(", ").append(statusToString(reply.status)); + mStatuses.push_back(std::move(s)); + // If the HAL does not recognize the command as invalid, + // retrieve the data etc. + return reply.status != STATUS_OK; + } + return true; + } + bool processValidReply(const StreamDescriptor::Reply&) override { return true; } + + private: + const std::vector mCommands; + size_t mNextCommand = 0; + std::vector mStatuses; +}; + template class AudioStream : public AudioCoreModule { public: @@ -1315,19 +1389,6 @@ class AudioStream : public AudioCoreModule { EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value())); } - void ReadOrWrite(bool useSetupSequence2, bool validateObservablePosition) { - const auto allPortConfigs = - moduleConfig->getPortConfigsForMixPorts(IOTraits::is_input); - if (allPortConfigs.empty()) { - GTEST_SKIP() << "No mix ports have attached devices"; - } - for (const auto& portConfig : allPortConfigs) { - EXPECT_NO_FATAL_FAILURE( - ReadOrWriteImpl(portConfig, useSetupSequence2, validateObservablePosition)) - << portConfig.toString(); - } - } - void ResetPortConfigWithOpenStream() { const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits::is_input); if (!portConfig.has_value()) { @@ -1357,131 +1418,43 @@ class AudioStream : public AudioCoreModule { << stream1.getPortId(); } - template - void WaitForObservablePositionAdvance(Worker& worker) { - static constexpr int kWriteDurationUs = 50 * 1000; - static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000}; - int64_t framesInitial; - framesInitial = worker.getLastObservablePosition().frames; - ASSERT_FALSE(worker.hasError()); - bool timedOut = false; - int64_t frames = framesInitial; - for (android::base::Timer elapsed; - frames <= framesInitial && !worker.hasError() && - !(timedOut = (elapsed.duration() >= kPositionChangeTimeout));) { - usleep(kWriteDurationUs); - frames = worker.getLastObservablePosition().frames; - } - EXPECT_FALSE(timedOut); - EXPECT_FALSE(worker.hasError()) << worker.getError(); - EXPECT_GT(frames, framesInitial); - } - - void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useSetupSequence2, - bool validateObservablePosition) { - if (!useSetupSequence2) { - ASSERT_NO_FATAL_FAILURE( - ReadOrWriteSetupSequence1(portConfig, validateObservablePosition)); - } else { - ASSERT_NO_FATAL_FAILURE( - ReadOrWriteSetupSequence2(portConfig, validateObservablePosition)); - } - } - - // Set up a patch first, then open a stream. - void ReadOrWriteSetupSequence1(const AudioPortConfig& portConfig, - bool validateObservablePosition) { - auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( - IOTraits::is_input, portConfig); - ASSERT_FALSE(devicePorts.empty()); - auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); - WithAudioPatch patch(IOTraits::is_input, portConfig, devicePortConfig); - ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); - - WithStream stream(patch.getPortConfig(IOTraits::is_input)); - ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - typename IOTraits::Worker worker(*stream.getContext()); - - ASSERT_TRUE(worker.start()); - ASSERT_TRUE(worker.waitForAtLeastOneCycle()); - if (validateObservablePosition) { - ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker)); - } - } - - // Open a stream, then set up a patch for it. - void ReadOrWriteSetupSequence2(const AudioPortConfig& portConfig, - bool validateObservablePosition) { - WithStream stream(portConfig); - ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - typename IOTraits::Worker worker(*stream.getContext()); - - auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( - IOTraits::is_input, portConfig); - ASSERT_FALSE(devicePorts.empty()); - auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); - WithAudioPatch patch(IOTraits::is_input, stream.getPortConfig(), devicePortConfig); - ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); - - ASSERT_TRUE(worker.start()); - ASSERT_TRUE(worker.waitForAtLeastOneCycle()); - if (validateObservablePosition) { - ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker)); - } - } - void SendInvalidCommandImpl(const AudioPortConfig& portConfig) { std::vector commands(6); - commands[0].code = -1; - commands[1].code = StreamDescriptor::COMMAND_BURST - 1; - commands[2].code = std::numeric_limits::min(); - commands[3].code = std::numeric_limits::max(); - commands[4].code = StreamDescriptor::COMMAND_BURST; + commands[0].code = StreamDescriptor::CommandCode(-1); + commands[1].code = StreamDescriptor::CommandCode( + static_cast(StreamDescriptor::CommandCode::START) - 1); + commands[2].code = StreamDescriptor::CommandCode(std::numeric_limits::min()); + commands[3].code = StreamDescriptor::CommandCode(std::numeric_limits::max()); + // TODO: For proper testing of input streams, need to put the stream into + // a state which accepts BURST commands. + commands[4].code = StreamDescriptor::CommandCode::BURST; commands[4].fmqByteCount = -1; - commands[5].code = StreamDescriptor::COMMAND_BURST; + commands[5].code = StreamDescriptor::CommandCode::BURST; commands[5].fmqByteCount = std::numeric_limits::min(); WithStream stream(portConfig); ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - StreamWorker writer(*stream.getContext(), commands); - ASSERT_TRUE(writer.start()); - writer.waitForAtLeastOneCycle(); - auto unexpectedStatuses = writer.getUnexpectedStatuses(); - EXPECT_EQ(0UL, unexpectedStatuses.size()) - << "Pairs of (command, actual status): " - << android::internal::ToString(unexpectedStatuses); + StreamLogicDriverInvalidCommand driver(commands); + typename IOTraits::Worker worker(*stream.getContext(), &driver); + ASSERT_TRUE(worker.start()); + worker.join(); + EXPECT_EQ("", driver.getUnexpectedStatuses()); } }; using AudioStreamIn = AudioStream; using AudioStreamOut = AudioStream; -#define TEST_IO_STREAM(method_name) \ +#define TEST_IN_AND_OUT_STREAM(method_name) \ TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \ TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } -#define TEST_IO_STREAM_2(method_name, arg1, arg2) \ - TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) { \ - ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \ - } \ - TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \ - ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \ - } -TEST_IO_STREAM(CloseTwice); -TEST_IO_STREAM(OpenAllConfigs); -TEST_IO_STREAM(OpenInvalidBufferSize); -TEST_IO_STREAM(OpenInvalidDirection); -TEST_IO_STREAM(OpenOverMaxCount); -TEST_IO_STREAM(OpenTwiceSamePortConfig); -// Use of constants makes comprehensible test names. -constexpr bool SetupSequence1 = false; -constexpr bool SetupSequence2 = true; -constexpr bool SetupOnly = false; -constexpr bool ValidateObservablePosition = true; -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, SetupOnly); -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, SetupOnly); -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, ValidateObservablePosition); -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, ValidateObservablePosition); -TEST_IO_STREAM(ResetPortConfigWithOpenStream); -TEST_IO_STREAM(SendInvalidCommand); +TEST_IN_AND_OUT_STREAM(CloseTwice); +TEST_IN_AND_OUT_STREAM(OpenAllConfigs); +TEST_IN_AND_OUT_STREAM(OpenInvalidBufferSize); +TEST_IN_AND_OUT_STREAM(OpenInvalidDirection); +TEST_IN_AND_OUT_STREAM(OpenOverMaxCount); +TEST_IN_AND_OUT_STREAM(OpenTwiceSamePortConfig); +TEST_IN_AND_OUT_STREAM(ResetPortConfigWithOpenStream); +TEST_IN_AND_OUT_STREAM(SendInvalidCommand); TEST_P(AudioStreamOut, OpenTwicePrimary) { const auto mixPorts = moduleConfig->getMixPorts(false); @@ -1523,6 +1496,163 @@ TEST_P(AudioStreamOut, RequireOffloadInfo) { << "when no offload info is provided for a compressed offload mix port"; } +using CommandAndState = std::pair; + +class StreamLogicDefaultDriver : public StreamLogicDriver { + public: + explicit StreamLogicDefaultDriver(const std::vector& commands) + : mCommands(commands) {} + + // The three methods below is intended to be called after the worker + // thread has joined, thus no extra synchronization is needed. + bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; } + bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; } + std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; } + + bool done() override { return mNextCommand >= mCommands.size(); } + StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override { + StreamDescriptor::Command command{}; + command.code = mCommands[mNextCommand++].first; + const int dataSize = command.code == StreamDescriptor::CommandCode::BURST ? maxDataSize : 0; + command.fmqByteCount = dataSize; + if (actualSize != nullptr) { + // In the output scenario, reduce slightly the fmqByteCount to verify + // that the HAL module always consumes all data from the MQ. + if (command.fmqByteCount > 1) command.fmqByteCount--; + *actualSize = dataSize; + } + return command; + } + bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; } + bool processValidReply(const StreamDescriptor::Reply& reply) override { + if (mPreviousFrames.has_value()) { + if (reply.observable.frames > mPreviousFrames.value()) { + mObservablePositionIncrease = true; + } else if (reply.observable.frames < mPreviousFrames.value()) { + mRetrogradeObservablePosition = true; + } + } + mPreviousFrames = reply.observable.frames; + + const auto& lastCommandState = mCommands[mNextCommand - 1]; + if (lastCommandState.second != reply.state) { + std::string s = std::string("Unexpected transition from the state ") + .append(mPreviousState) + .append(" to ") + .append(toString(reply.state)) + .append(" caused by the command ") + .append(toString(lastCommandState.first)); + LOG(ERROR) << __func__ << ": " << s; + mUnexpectedTransition = std::move(s); + return false; + } + return true; + } + + protected: + const std::vector& mCommands; + size_t mNextCommand = 0; + std::optional mPreviousFrames; + std::string mPreviousState = ""; + bool mObservablePositionIncrease = false; + bool mRetrogradeObservablePosition = false; + std::string mUnexpectedTransition; +}; + +using NamedCommandSequence = std::pair>; +enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ }; +using StreamIoTestParameters = + std::tuple; +template +class AudioStreamIo : public AudioCoreModuleBase, + public testing::TestWithParam { + public: + void SetUp() override { + ASSERT_NO_FATAL_FAILURE(SetUpImpl(std::get(GetParam()))); + ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig()); + } + + void Run() { + const auto allPortConfigs = + moduleConfig->getPortConfigsForMixPorts(IOTraits::is_input); + if (allPortConfigs.empty()) { + GTEST_SKIP() << "No mix ports have attached devices"; + } + for (const auto& portConfig : allPortConfigs) { + SCOPED_TRACE(portConfig.toString()); + const auto& commandsAndStates = std::get(GetParam()).second; + if (!std::get(GetParam())) { + ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates)); + } else { + ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq2(portConfig, commandsAndStates)); + } + } + } + + bool ValidateObservablePosition(const AudioPortConfig& /*portConfig*/) { + // May return false based on the portConfig, e.g. for telephony ports. + return true; + } + + // Set up a patch first, then open a stream. + void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig, + const std::vector& commandsAndStates) { + auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( + IOTraits::is_input, portConfig); + ASSERT_FALSE(devicePorts.empty()); + auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); + WithAudioPatch patch(IOTraits::is_input, portConfig, devicePortConfig); + ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); + + WithStream stream(patch.getPortConfig(IOTraits::is_input)); + ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); + StreamLogicDefaultDriver driver(commandsAndStates); + typename IOTraits::Worker worker(*stream.getContext(), &driver); + + ASSERT_TRUE(worker.start()); + worker.join(); + EXPECT_FALSE(worker.hasError()) << worker.getError(); + EXPECT_EQ("", driver.getUnexpectedStateTransition()); + if (ValidateObservablePosition(portConfig)) { + EXPECT_TRUE(driver.hasObservablePositionIncrease()); + EXPECT_FALSE(driver.hasRetrogradeObservablePosition()); + } + } + + // Open a stream, then set up a patch for it. + void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig, + const std::vector& commandsAndStates) { + WithStream stream(portConfig); + ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); + StreamLogicDefaultDriver driver(commandsAndStates); + typename IOTraits::Worker worker(*stream.getContext(), &driver); + + auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( + IOTraits::is_input, portConfig); + ASSERT_FALSE(devicePorts.empty()); + auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); + WithAudioPatch patch(IOTraits::is_input, stream.getPortConfig(), devicePortConfig); + ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); + + ASSERT_TRUE(worker.start()); + worker.join(); + EXPECT_FALSE(worker.hasError()) << worker.getError(); + EXPECT_EQ("", driver.getUnexpectedStateTransition()); + if (ValidateObservablePosition(portConfig)) { + EXPECT_TRUE(driver.hasObservablePositionIncrease()); + EXPECT_FALSE(driver.hasRetrogradeObservablePosition()); + } + } +}; +using AudioStreamIoIn = AudioStreamIo; +using AudioStreamIoOut = AudioStreamIo; + +#define TEST_IN_AND_OUT_STREAM_IO(method_name) \ + TEST_P(AudioStreamIoIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \ + TEST_P(AudioStreamIoOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } + +TEST_IN_AND_OUT_STREAM_IO(Run); + // Tests specific to audio patches. The fixure class is named 'AudioModulePatch' // to avoid clashing with 'AudioPatch' class. class AudioModulePatch : public AudioCoreModule { @@ -1704,6 +1834,139 @@ INSTANTIATE_TEST_SUITE_P(AudioStreamOutTest, AudioStreamOut, testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), android::PrintInstanceNameToString); GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut); + +static const NamedCommandSequence kReadOrWriteSeq = std::make_pair( + std::string("ReadOrWrite"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE)}); +static const NamedCommandSequence kDrainInSeq = std::make_pair( + std::string("Drain"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::DRAINING), + std::make_pair(StreamDescriptor::CommandCode::START, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::DRAINING), + // TODO: This will need to be changed once DRAIN starts taking time. + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::STANDBY)}); +static const NamedCommandSequence kDrainOutSeq = std::make_pair( + std::string("Drain"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + // TODO: This will need to be changed once DRAIN starts taking time. + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::IDLE)}); +// TODO: This will need to be changed once DRAIN starts taking time so we can pause it. +static const NamedCommandSequence kDrainPauseOutSeq = std::make_pair( + std::string("DrainPause"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::IDLE)}); +static const NamedCommandSequence kStandbySeq = std::make_pair( + std::string("Standby"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::STANDBY, + StreamDescriptor::State::STANDBY), + // Perform a read or write in order to advance observable position + // (this is verified by tests). + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE)}); +static const NamedCommandSequence kPauseInSeq = std::make_pair( + std::string("Pause"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::FLUSH, + StreamDescriptor::State::STANDBY)}); +static const NamedCommandSequence kPauseOutSeq = std::make_pair( + std::string("Pause"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::START, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::START, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED)}); +static const NamedCommandSequence kFlushInSeq = std::make_pair( + std::string("Flush"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::FLUSH, + StreamDescriptor::State::STANDBY)}); +static const NamedCommandSequence kFlushOutSeq = std::make_pair( + std::string("Flush"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::FLUSH, + StreamDescriptor::State::IDLE)}); +std::string GetStreamIoTestName(const testing::TestParamInfo& info) { + return android::PrintInstanceNameToString( + testing::TestParamInfo{std::get(info.param), + info.index}) + .append("_") + .append(std::get(info.param).first) + .append("_SetupSeq") + .append(std::get(info.param) ? "2" : "1"); +} +INSTANTIATE_TEST_SUITE_P( + AudioStreamIoInTest, AudioStreamIoIn, + testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), + testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq, + kFlushInSeq), + testing::Values(false, true)), + GetStreamIoTestName); +GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoIn); +INSTANTIATE_TEST_SUITE_P( + AudioStreamIoOutTest, AudioStreamIoOut, + testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), + testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq, + kStandbySeq, kPauseOutSeq, kFlushOutSeq), + testing::Values(false, true)), + GetStreamIoTestName); +GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut); + INSTANTIATE_TEST_SUITE_P(AudioPatchTest, AudioModulePatch, testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), android::PrintInstanceNameToString);