From 705297317b09249142d3f5c9e0f5b044ec1add5a Mon Sep 17 00:00:00 2001 From: Mikhail Naganov Date: Thu, 20 Oct 2022 01:16:34 +0000 Subject: [PATCH 1/2] audio: Add 'join' method to StreamWorker This is intended for use in tests where the worker just executes some actions and then exits by itself. Use of 'join' instead of 'stop' ensures that the worker goes through all actions. Bug: 205884982 Test: atest libaudioaidlcommon_test Change-Id: I8a9f4f0bb786ee606e3b63a9847f414119716a7d --- audio/aidl/common/StreamWorker.cpp | 4 ++++ audio/aidl/common/include/StreamWorker.h | 7 ++++++- audio/aidl/common/tests/streamworker_tests.cpp | 8 ++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp index 9bca7609fd..dda0e4a6fd 100644 --- a/audio/aidl/common/StreamWorker.cpp +++ b/audio/aidl/common/StreamWorker.cpp @@ -44,6 +44,10 @@ void ThreadController::stop() { mWorkerStateChangeRequest = true; } } + join(); +} + +void ThreadController::join() { if (mWorker.joinable()) { mWorker.join(); } diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 6260eca49a..88235c2893 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -50,6 +50,10 @@ class ThreadController { return mError; } void stop(); + // Direct use of 'join' assumes that the StreamLogic is not intended + // to run forever, and is guaranteed to exit by itself. This normally + // only happen in tests. + void join(); bool waitForAtLeastOneCycle(); // Only used by unit tests. @@ -133,7 +137,8 @@ class StreamWorker : public LogicImpl { void resume() { mThread.resume(); } bool hasError() { return mThread.hasError(); } std::string getError() { return mThread.getError(); } - void stop() { return mThread.stop(); } + void stop() { mThread.stop(); } + void join() { mThread.join(); } bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); } // Only used by unit tests. diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index e3e484d7e3..8ea8424ab5 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -160,6 +160,14 @@ TEST_P(StreamWorkerTest, WorkerExit) { EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); } +TEST_P(StreamWorkerTest, WorkerJoin) { + ASSERT_TRUE(worker.start()); + stream.setStopStatus(); + worker.join(); + EXPECT_FALSE(worker.hasError()); + EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime)); +} + TEST_P(StreamWorkerTest, WorkerError) { ASSERT_TRUE(worker.start()); stream.setErrorStatus(); From cce8e5f39dc783d8a67a4576a3e53ebf572dae32 Mon Sep 17 00:00:00 2001 From: Mikhail Naganov Date: Tue, 13 Sep 2022 01:20:45 +0000 Subject: [PATCH 2/2] audio: Add pause, resume, and standby stream operations Clarify and verify in VTS that the data FMQ of StreamDescriptor is a transient buffer. The consumer must always read its entire contents. This is the same behavior as in the HIDL HAL. Define the state machine for streams and the set of commands for transferring between states. Clarify and verify in VTS that the frame counter of the observable position must never be reset. Implement commands for the synchronous I/O case. Refactor stream test logic to simplify testing of state transitions. Bug: 205884982 Test: atest VtsHalAudioCoreTargetTest Change-Id: Ibed7f4c3e77852863714f1910112f664b32d5897 --- .../hardware/audio/core/StreamDescriptor.aidl | 25 +- .../android/hardware/audio/core/IModule.aidl | 6 + .../hardware/audio/core/StreamDescriptor.aidl | 277 ++++++- .../hardware/audio/core/stream-in-sm.gv | 42 ++ .../hardware/audio/core/stream-out-sm.gv | 48 ++ audio/aidl/common/include/StreamWorker.h | 3 + audio/aidl/default/Stream.cpp | 318 +++++++-- audio/aidl/default/include/core-impl/Stream.h | 26 +- audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp | 673 ++++++++++++------ 9 files changed, 1110 insertions(+), 308 deletions(-) create mode 100644 audio/aidl/android/hardware/audio/core/stream-in-sm.gv create mode 100644 audio/aidl/android/hardware/audio/core/stream-out-sm.gv diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl index db1ac22c52..da24a10439 100644 --- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl +++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl @@ -39,15 +39,34 @@ parcelable StreamDescriptor { int frameSizeBytes; long bufferSizeFrames; android.hardware.audio.core.StreamDescriptor.AudioBuffer audio; - const int COMMAND_BURST = 1; + const int LATENCY_UNKNOWN = -1; @FixedSize @VintfStability parcelable Position { long frames; long timeNs; } + @Backing(type="int") @VintfStability + enum State { + STANDBY = 1, + IDLE = 2, + ACTIVE = 3, + PAUSED = 4, + DRAINING = 5, + DRAIN_PAUSED = 6, + ERROR = 100, + } + @Backing(type="int") @VintfStability + enum CommandCode { + START = 1, + BURST = 2, + DRAIN = 3, + STANDBY = 4, + PAUSE = 5, + FLUSH = 6, + } @FixedSize @VintfStability parcelable Command { - int code; + android.hardware.audio.core.StreamDescriptor.CommandCode code = android.hardware.audio.core.StreamDescriptor.CommandCode.START; int fmqByteCount; } @FixedSize @VintfStability @@ -57,6 +76,8 @@ parcelable StreamDescriptor { android.hardware.audio.core.StreamDescriptor.Position observable; android.hardware.audio.core.StreamDescriptor.Position hardware; int latencyMs; + int xrunFrames; + android.hardware.audio.core.StreamDescriptor.State state = android.hardware.audio.core.StreamDescriptor.State.STANDBY; } @VintfStability union AudioBuffer { diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl index 735f87ff17..095984059a 100644 --- a/audio/aidl/android/hardware/audio/core/IModule.aidl +++ b/audio/aidl/android/hardware/audio/core/IModule.aidl @@ -263,6 +263,9 @@ interface IModule { * be completing with an error, although data (zero filled) will still be * provided. * + * After the stream has been opened, it remains in the STANDBY state, see + * StreamDescriptor for more details. + * * @return An opened input stream and the associated descriptor. * @param args The pack of arguments, see 'OpenInputStreamArguments' parcelable. * @throws EX_ILLEGAL_ARGUMENT In the following cases: @@ -325,6 +328,9 @@ interface IModule { * StreamDescriptor will be completing with an error, although the data * will still be accepted and immediately discarded. * + * After the stream has been opened, it remains in the STANDBY state, see + * StreamDescriptor for more details. + * * @return An opened output stream and the associated descriptor. * @param args The pack of arguments, see 'OpenOutputStreamArguments' parcelable. * @throws EX_ILLEGAL_ARGUMENT In the following cases: diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl index 2b1ed8ceb1..e5e56fcf21 100644 --- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl +++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl @@ -33,6 +33,72 @@ import android.hardware.common.fmq.SynchronizedReadWrite; * internal components of the stream while serving commands invoked via the * stream's AIDL interface and commands invoked via the command queue of the * descriptor. + * + * There is a state machine defined for the stream, which executes on the + * thread handling the commands from the queue. The states are defined based + * on the model of idealized producer and consumer connected via a ring buffer. + * For input streams, the "producer" is hardware, the "consumer" is software, + * for outputs streams it's the opposite. When the producer is active, but + * the buffer is full, the following actions are possible: + * - if the consumer is active, the producer blocks until there is space, + * this behavior is only possible for software producers; + * - if the consumer is passive: + * - the producer can preserve the buffer contents—a s/w producer can + * keep the data on their side, while a h/w producer can only drop captured + * data in this case; + * - or the producer overwrites old data in the buffer. + * Similarly, when an active consumer faces an empty buffer, it can: + * - block until there is data (producer must be active), only possible + * for software consumers; + * - walk away with no data; when the consumer is hardware, it must emit + * silence in this case. + * + * The model is defined below, note the asymmetry regarding the 'IDLE' state + * between input and output streams: + * + * Producer | Buffer state | Consumer | Applies | State + * active? | | active? | to | + * ==========|==============|==========|=========|============================== + * No | Empty | No | Both | STANDBY + * ----------|--------------|----------|---------|----------------------------- + * Yes | Filling up | No | Input | IDLE, overwrite behavior + * ----------|--------------|----------|---------|----------------------------- + * No | Empty | Yes† | Output | IDLE, h/w emits silence + * ----------|--------------|----------|---------|----------------------------- + * Yes | Not empty | Yes | Both | ACTIVE, s/w x-runs counted + * ----------|--------------|----------|---------|----------------------------- + * Yes | Filling up | No | Input | PAUSED, drop behavior + * ----------|--------------|----------|---------|----------------------------- + * Yes | Filling up | No† | Output | PAUSED, s/w stops writing once + * | | | | the buffer is filled up; + * | | | | h/w emits silence. + * ----------|--------------|----------|---------|----------------------------- + * No | Not empty | Yes | Both | DRAINING + * ----------|--------------|----------|---------|----------------------------- + * No | Not empty | No† | Output | DRAIN_PAUSED, + * | | | | h/w emits silence. + * + * † - note that for output, "buffer empty, h/w consuming" has the same outcome + * as "buffer not empty, h/w not consuming", but logically these conditions + * are different. + * + * State machines of both input and output streams start from the 'STANDBY' + * state. Transitions between states happen naturally with changes in the + * states of the model elements. For simplicity, we restrict the change to one + * element only, for example, in the 'STANDBY' state, either the producer or the + * consumer can become active, but not both at the same time. States 'STANDBY', + * 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event, + * whereas a change from the 'DRAINING' state can happen with time as the buffer + * gets empty. + * + * The state machine for input streams is defined in the `stream-in-sm.gv` file, + * for output streams—in the `stream-out-sm.gv` file. State machines define how + * commands (from the enum 'CommandCode') trigger state changes. The full list + * of states and commands is defined by constants of the 'State' enum. Note that + * the 'CLOSED' state does not have a constant in the interface because the + * client can never observe a stream with a functioning command queue in this + * state. The 'ERROR' state is a special state which the state machine enters + * when an unrecoverable hardware error is detected by the HAL module. */ @JavaDerive(equals=true, toString=true) @VintfStability @@ -55,12 +121,110 @@ parcelable StreamDescriptor { long timeNs; } - /** - * The command used for audio I/O, see 'AudioBuffer'. For MMap No IRQ mode - * this command only provides updated positions and latency because actual - * audio I/O is done via the 'AudioBuffer.mmap' shared buffer. - */ - const int COMMAND_BURST = 1; + @VintfStability + @Backing(type="int") + enum State { + /** + * 'STANDBY' is the initial state of the stream, entered after + * opening. Since both the producer and the consumer are inactive in + * this state, it allows the HAL module to put associated hardware into + * "standby" mode to save power. + */ + STANDBY = 1, + /** + * In the 'IDLE' state the audio hardware is active. For input streams, + * the hardware is filling buffer with captured data, overwriting old + * contents on buffer wraparounds. For output streams, the buffer is + * still empty, and the hardware is outputting zeroes. The HAL module + * must not account for any under- or overruns as the client is not + * expected to perform audio I/O. + */ + IDLE = 2, + /** + * The active state of the stream in which it handles audio I/O. The HAL + * module can assume that the audio I/O will be periodic, thus inability + * of the client to provide or consume audio data on time must be + * considered as an under- or overrun and indicated via the 'xrunFrames' + * field of the reply. + */ + ACTIVE = 3, + /** + * In the 'PAUSED' state the consumer is inactive. For input streams, + * the hardware stops updating the buffer as soon as it fills up (this + * is the difference from the 'IDLE' state). For output streams, + * "inactivity" of hardware means that it does not consume audio data, + * but rather emits silence. + */ + PAUSED = 4, + /** + * In the 'DRAINING' state the producer is inactive, the consumer is + * finishing up on the buffer contents, emptying it up. As soon as it + * gets empty, the stream transfers itself into the next state. + */ + DRAINING = 5, + /** + * Used for output streams only, pauses draining. This state is similar + * to the 'PAUSED' state, except that the client is not adding any + * new data. If it emits a 'BURST' command, this brings the stream + * into the regular 'PAUSED' state. + */ + DRAIN_PAUSED = 6, + /** + * The ERROR state is entered when the stream has encountered an + * irrecoverable error from the lower layer. After entering it, the + * stream can only be closed. + */ + ERROR = 100, + } + + @VintfStability + @Backing(type="int") + enum CommandCode { + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + START = 1, + /** + * The BURST command used for audio I/O, see 'AudioBuffer'. Differences + * for the MMap No IRQ mode: + * + * - this command only provides updated positions and latency because + * actual audio I/O is done via the 'AudioBuffer.mmap' shared buffer. + * The client does not synchronize reads and writes into the buffer + * with sending of this command. + * + * - the 'fmqByteCount' must always be set to 0. + */ + BURST = 2, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + DRAIN = 3, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + * + * Note that it's left on the discretion of the HAL implementation to + * assess all the necessary conditions that could prevent hardware from + * being suspended. Even if it can not be suspended, the state machine + * must still enter the 'STANDBY' state for consistency. Since the + * buffer must remain empty in this state, even if capturing hardware is + * still active, captured data must be discarded. + */ + STANDBY = 4, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + PAUSE = 5, + /** + * See the state machines on the applicability of this command to + * different states. The 'fmqByteCount' field must always be set to 0. + */ + FLUSH = 6, + } /** * Used for sending commands to the HAL module. The client writes into @@ -71,12 +235,16 @@ parcelable StreamDescriptor { @FixedSize parcelable Command { /** - * One of COMMAND_* codes. + * The code of the command. */ - int code; + CommandCode code = CommandCode.START; /** + * This field is only used for the BURST command. For all other commands + * it must be set to 0. The following description applies to the use + * of this field for the BURST command. + * * For output streams: the amount of bytes that the client requests the - * HAL module to read from the 'audio.fmq' queue. + * HAL module to use out of the data contained in the 'audio.fmq' queue. * For input streams: the amount of bytes requested by the client to * read from the hardware into the 'audio.fmq' queue. * @@ -95,6 +263,12 @@ parcelable StreamDescriptor { } MQDescriptor command; + /** + * The value used for the 'Reply.latencyMs' field when the effective + * latency can not be reported by the HAL module. + */ + const int LATENCY_UNKNOWN = -1; + /** * Used for providing replies to commands. The HAL module writes into * the queue, the client reads. The queue can only contain a single reply, @@ -107,17 +281,22 @@ parcelable StreamDescriptor { * One of Binder STATUS_* statuses: * - STATUS_OK: the command has completed successfully; * - STATUS_BAD_VALUE: invalid value in the 'Command' structure; - * - STATUS_INVALID_OPERATION: the mix port is not connected - * to any producer or consumer, thus - * positions can not be reported; + * - STATUS_INVALID_OPERATION: the command is not applicable in the + * current state of the stream, or to this + * type of the stream; + * - STATUS_NO_INIT: positions can not be reported because the mix port + * is not connected to any producer or consumer, or + * because the HAL module does not support positions + * reporting for this AudioSource (on input streams). * - STATUS_NOT_ENOUGH_DATA: a read or write error has * occurred for the 'audio.fmq' queue; - * */ int status; /** - * For output streams: the amount of bytes actually consumed by the HAL - * module from the 'audio.fmq' queue. + * Used with the BURST command only. + * + * For output streams: the amount of bytes of data actually consumed + * by the HAL module. * For input streams: the amount of bytes actually provided by the HAL * in the 'audio.fmq' queue. * @@ -126,10 +305,18 @@ parcelable StreamDescriptor { */ int fmqByteCount; /** + * It is recommended to report the current position for any command. + * If the position can not be reported, the 'status' field must be + * set to 'NO_INIT'. + * * For output streams: the moment when the specified stream position * was presented to an external observer (i.e. presentation position). * For input streams: the moment when data at the specified stream position * was acquired (i.e. capture position). + * + * The observable position must never be reset by the HAL module. + * The data type of the frame counter is large enough to support + * continuous counting for years of operation. */ Position observable; /** @@ -138,9 +325,22 @@ parcelable StreamDescriptor { */ Position hardware; /** - * Current latency reported by the hardware. + * Current latency reported by the hardware. It is recommended to + * report the current latency for any command. If the value of latency + * can not be determined, this field must be set to 'LATENCY_UNKNOWN'. */ int latencyMs; + /** + * Number of frames lost due to an underrun (for input streams), + * or not provided on time (for output streams) for the **previous** + * transfer operation. + */ + int xrunFrames; + /** + * The state that the stream was in while the HAL module was sending the + * reply. + */ + State state = State.STANDBY; } MQDescriptor reply; @@ -170,42 +370,59 @@ parcelable StreamDescriptor { @VintfStability union AudioBuffer { /** - * The fast message queue used for all modes except MMap No IRQ. Both - * reads and writes into this queue are non-blocking because access to - * this queue is synchronized via the 'command' and 'reply' queues as - * described below. The queue nevertheless uses 'SynchronizedReadWrite' - * because there is only one reader, and the reading position must be - * shared. + * The fast message queue used for BURST commands in all modes except + * MMap No IRQ. Both reads and writes into this queue are non-blocking + * because access to this queue is synchronized via the 'command' and + * 'reply' queues as described below. The queue nevertheless uses + * 'SynchronizedReadWrite' because there is only one reader, and the + * reading position must be shared. + * + * Note that the fast message queue is a transient buffer, only used for + * data transfer. Neither of the sides can use it to store any data + * outside of the 'BURST' operation. The consumer must always retrieve + * all data available in the fast message queue, even if it can not use + * it. The producer must re-send any unconsumed data on the next + * transfer operation. This restriction is posed in order to make the + * fast message queue fully transparent from the latency perspective. * * For output streams the following sequence of operations is used: * 1. The client writes audio data into the 'audio.fmq' queue. - * 2. The client writes the 'BURST' command into the 'command' queue, + * 2. The client writes the BURST command into the 'command' queue, * and hangs on waiting on a read from the 'reply' queue. * 3. The high priority thread in the HAL module wakes up due to 2. - * 4. The HAL module reads the command and audio data. + * 4. The HAL module reads the command and audio data. According + * to the statement above, the HAL module must always read + * from the FMQ all the data it contains. The amount of data that + * the HAL module has actually consumed is indicated to the client + * via the 'reply.fmqByteCount' field. * 5. The HAL module writes the command status and current positions * into 'reply' queue, and hangs on waiting on a read from * the 'command' queue. * 6. The client wakes up due to 5. and reads the reply. * * For input streams the following sequence of operations is used: - * 1. The client writes the 'BURST' command into the 'command' queue, + * 1. The client writes the BURST command into the 'command' queue, * and hangs on waiting on a read from the 'reply' queue. * 2. The high priority thread in the HAL module wakes up due to 1. * 3. The HAL module writes audio data into the 'audio.fmq' queue. + * The value of 'reply.fmqByteCount' must be the equal to the amount + * of data in the queue. * 4. The HAL module writes the command status and current positions * into 'reply' queue, and hangs on waiting on a read from * the 'command' queue. * 5. The client wakes up due to 4. - * 6. The client reads the reply and audio data. + * 6. The client reads the reply and audio data. The client must + * always read from the FMQ all the data it contains. + * */ MQDescriptor fmq; /** * MMap buffers are shared directly with the DSP, which operates - * independently from the CPU. Writes and reads into these buffers - * are not synchronized with 'command' and 'reply' queues. However, - * the client still uses the 'BURST' command for obtaining current - * positions from the HAL module. + * independently from the CPU. Writes and reads into these buffers are + * not synchronized with 'command' and 'reply' queues. However, the + * client still uses the same commands for controlling the audio data + * exchange and for obtaining current positions and latency from the HAL + * module. */ MmapBufferDescriptor mmap; } diff --git a/audio/aidl/android/hardware/audio/core/stream-in-sm.gv b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv new file mode 100644 index 0000000000..889a14b5c1 --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/stream-in-sm.gv @@ -0,0 +1,42 @@ +// Copyright (C) 2022 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// To render: dot -Tpng stream-in-sm.gv -o stream-in-sm.png +digraph stream_in_state_machine { + node [shape=doublecircle style=filled fillcolor=black width=0.5] I; + node [shape=point width=0.5] F; + node [shape=oval width=1]; + node [fillcolor=lightgreen] STANDBY; // buffer is empty + node [fillcolor=tomato] CLOSED; + node [fillcolor=tomato] ERROR; + node [style=dashed] ANY_STATE; + node [fillcolor=lightblue style=filled]; + I -> STANDBY; + STANDBY -> IDLE [label="START"]; // producer -> active + IDLE -> STANDBY [label="STANDBY"]; // producer -> passive, buffer is cleared + IDLE -> ACTIVE [label="BURST"]; // consumer -> active + ACTIVE -> ACTIVE [label="BURST"]; + ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive + ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive + PAUSED -> ACTIVE [label="BURST"]; // consumer -> active + PAUSED -> STANDBY [label="FLUSH"]; // producer -> passive, buffer is cleared + DRAINING -> DRAINING [label="BURST"]; + DRAINING -> ACTIVE [label="START"]; // producer -> active + DRAINING -> STANDBY [label=""]; // consumer deactivates + IDLE -> ERROR [label=""]; + ACTIVE -> ERROR [label=""]; + PAUSED -> ERROR [label=""]; + ANY_STATE -> CLOSED [label="→IStream*.close"]; + CLOSED -> F; +} diff --git a/audio/aidl/android/hardware/audio/core/stream-out-sm.gv b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv new file mode 100644 index 0000000000..56dd52904c --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/stream-out-sm.gv @@ -0,0 +1,48 @@ +// Copyright (C) 2022 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// To render: dot -Tpng stream-out-sm.gv -o stream-out-sm.png +digraph stream_out_state_machine { + node [shape=doublecircle style=filled fillcolor=black width=0.5] I; + node [shape=point width=0.5] F; + node [shape=oval width=1]; + node [fillcolor=lightgreen] STANDBY; // buffer is empty + node [fillcolor=lightgreen] IDLE; // buffer is empty + node [fillcolor=tomato] CLOSED; + node [fillcolor=tomato] ERROR; + node [style=dashed] ANY_STATE; + node [fillcolor=lightblue style=filled]; + I -> STANDBY; + STANDBY -> IDLE [label="START"]; // consumer -> active + STANDBY -> PAUSED [label="BURST"]; // producer -> active + IDLE -> STANDBY [label="STANDBY"]; // consumer -> passive + IDLE -> ACTIVE [label="BURST"]; // producer -> active + ACTIVE -> ACTIVE [label="BURST"]; + ACTIVE -> PAUSED [label="PAUSE"]; // consumer -> passive (not consuming) + ACTIVE -> DRAINING [label="DRAIN"]; // producer -> passive + PAUSED -> PAUSED [label="BURST"]; + PAUSED -> ACTIVE [label="START"]; // consumer -> active + PAUSED -> IDLE [label="FLUSH"]; // producer -> passive, buffer is cleared + DRAINING -> IDLE [label=""]; + DRAINING -> ACTIVE [label="BURST"]; // producer -> active + DRAINING -> DRAIN_PAUSED [label="PAUSE"]; // consumer -> passive (not consuming) + DRAIN_PAUSED -> DRAINING [label="START"]; // consumer -> active + DRAIN_PAUSED -> PAUSED [label="BURST"]; // producer -> active + DRAIN_PAUSED -> IDLE [label="FLUSH"]; // buffer is cleared + IDLE -> ERROR [label=""]; + ACTIVE -> ERROR [label=""]; + DRAINING -> ERROR [label=""]; + ANY_STATE -> CLOSED [label="→IStream*.close"]; + CLOSED -> F; +} diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index 88235c2893..ab2ec26485 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -39,6 +39,9 @@ class ThreadController { ~ThreadController() { stop(); } bool start(const std::string& name, int priority); + // Note: 'pause' and 'resume' methods should only be used on the "driving" side. + // In the case of audio HAL I/O, the driving side is the client, because the HAL + // implementation always blocks on getting a command. void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); } void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); } bool hasError() { diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp index 312df720eb..7b544a19a9 100644 --- a/audio/aidl/default/Stream.cpp +++ b/audio/aidl/default/Stream.cpp @@ -85,126 +85,315 @@ std::string StreamWorkerCommonLogic::init() { return ""; } +void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply, + bool isConnected) const { + if (isConnected) { + reply->status = STATUS_OK; + reply->observable.frames = mFrameCount; + reply->observable.timeNs = ::android::elapsedRealtimeNano(); + } else { + reply->status = STATUS_NO_INIT; + } +} + const std::string StreamInWorkerLogic::kThreadName = "reader"; StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { StreamDescriptor::Command command{}; if (!mCommandMQ->readBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": reading of command from MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } StreamDescriptor::Reply reply{}; - if (command.code == StreamContext::COMMAND_EXIT && + if (static_cast(command.code) == StreamContext::COMMAND_EXIT && command.fmqByteCount == mInternalCommandCookie) { LOG(DEBUG) << __func__ << ": received EXIT command"; + setClosed(); // This is an internal command, no need to reply. return Status::EXIT; - } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) { + } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) { + LOG(DEBUG) << __func__ << ": received START read command"; + if (mState == StreamDescriptor::State::STANDBY || + mState == StreamDescriptor::State::DRAINING) { + populateReply(&reply, mIsConnected); + mState = mState == StreamDescriptor::State::STANDBY ? StreamDescriptor::State::IDLE + : StreamDescriptor::State::ACTIVE; + } else { + LOG(WARNING) << __func__ << ": START command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) { LOG(DEBUG) << __func__ << ": received BURST read command for " << command.fmqByteCount << " bytes"; - usleep(3000); // Simulate a blocking call into the driver. - const size_t byteCount = std::min({static_cast(command.fmqByteCount), - mDataMQ->availableToWrite(), mDataBufferSize}); - const bool isConnected = mIsConnected; - // Simulate reading of data, or provide zeroes if the stream is not connected. - for (size_t i = 0; i < byteCount; ++i) { - using buffer_type = decltype(mDataBuffer)::element_type; - constexpr int kBufferValueRange = std::numeric_limits::max() - - std::numeric_limits::min() + 1; - mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) + - std::numeric_limits::min() - : 0; - } - bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; - if (success) { - LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ" - << " succeeded; connected? " << isConnected; - // Frames are provided and counted regardless of connection status. - reply.fmqByteCount = byteCount; - mFrameCount += byteCount / mFrameSize; - if (isConnected) { - reply.status = STATUS_OK; - reply.observable.frames = mFrameCount; - reply.observable.timeNs = ::android::elapsedRealtimeNano(); - } else { - reply.status = STATUS_INVALID_OPERATION; + if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE || + mState == StreamDescriptor::State::PAUSED || + mState == StreamDescriptor::State::DRAINING) { + if (!read(command.fmqByteCount, &reply)) { + mState = StreamDescriptor::State::ERROR; + } + if (mState == StreamDescriptor::State::IDLE || + mState == StreamDescriptor::State::PAUSED) { + mState = StreamDescriptor::State::ACTIVE; + } else if (mState == StreamDescriptor::State::DRAINING) { + // To simplify the reference code, we assume that the read operation + // has consumed all the data remaining in the hardware buffer. + // TODO: Provide parametrization on the duration of draining to test + // handling of commands during the 'DRAINING' state. + mState = StreamDescriptor::State::STANDBY; } } else { - LOG(WARNING) << __func__ << ": writing of " << byteCount - << " bytes of data to MQ failed"; - reply.status = STATUS_NOT_ENOUGH_DATA; + LOG(WARNING) << __func__ << ": BURST command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received DRAIN read command"; + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::DRAINING; + } else { + LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received PAUSE read command"; + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::PAUSED; + } else { + LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received FLUSH read command"; + if (mState == StreamDescriptor::State::PAUSED) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::STANDBY && + command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received STANDBY read command"; + if (mState == StreamDescriptor::State::IDLE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; } - reply.latencyMs = Module::kLatencyMs; } else { LOG(WARNING) << __func__ << ": invalid command (" << command.toString() << ") or count: " << command.fmqByteCount; reply.status = STATUS_BAD_VALUE; } + reply.state = mState; LOG(DEBUG) << __func__ << ": writing reply " << reply.toString(); if (!mReplyMQ->writeBlocking(&reply, 1)) { LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } return Status::CONTINUE; } +bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) { + // Can switch the state to ERROR if a driver error occurs. + const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize}); + const bool isConnected = mIsConnected; + bool fatal = false; + // Simulate reading of data, or provide zeroes if the stream is not connected. + for (size_t i = 0; i < byteCount; ++i) { + using buffer_type = decltype(mDataBuffer)::element_type; + constexpr int kBufferValueRange = std::numeric_limits::max() - + std::numeric_limits::min() + 1; + mDataBuffer[i] = isConnected ? (std::rand() % kBufferValueRange) + + std::numeric_limits::min() + : 0; + } + usleep(3000); // Simulate a blocking call into the driver. + // Set 'fatal = true' if a driver error occurs. + if (bool success = byteCount > 0 ? mDataMQ->write(&mDataBuffer[0], byteCount) : true; success) { + LOG(DEBUG) << __func__ << ": writing of " << byteCount << " bytes into data MQ" + << " succeeded; connected? " << isConnected; + // Frames are provided and counted regardless of connection status. + reply->fmqByteCount += byteCount; + mFrameCount += byteCount / mFrameSize; + populateReply(reply, isConnected); + } else { + LOG(WARNING) << __func__ << ": writing of " << byteCount << " bytes of data to MQ failed"; + reply->status = STATUS_NOT_ENOUGH_DATA; + } + reply->latencyMs = Module::kLatencyMs; + return !fatal; +} + const std::string StreamOutWorkerLogic::kThreadName = "writer"; StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { StreamDescriptor::Command command{}; if (!mCommandMQ->readBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": reading of command from MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } StreamDescriptor::Reply reply{}; - if (command.code == StreamContext::COMMAND_EXIT && + if (static_cast(command.code) == StreamContext::COMMAND_EXIT && command.fmqByteCount == mInternalCommandCookie) { LOG(DEBUG) << __func__ << ": received EXIT command"; + setClosed(); // This is an internal command, no need to reply. return Status::EXIT; - } else if (command.code == StreamDescriptor::COMMAND_BURST && command.fmqByteCount >= 0) { + } else if (command.code == StreamDescriptor::CommandCode::START && command.fmqByteCount >= 0) { + LOG(DEBUG) << __func__ << ": received START read command"; + switch (mState) { + case StreamDescriptor::State::STANDBY: + mState = StreamDescriptor::State::IDLE; + break; + case StreamDescriptor::State::PAUSED: + mState = StreamDescriptor::State::ACTIVE; + break; + case StreamDescriptor::State::DRAIN_PAUSED: + mState = StreamDescriptor::State::PAUSED; + break; + default: + LOG(WARNING) << __func__ << ": START command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + if (reply.status != STATUS_INVALID_OPERATION) { + populateReply(&reply, mIsConnected); + } + } else if (command.code == StreamDescriptor::CommandCode::BURST && command.fmqByteCount >= 0) { LOG(DEBUG) << __func__ << ": received BURST write command for " << command.fmqByteCount << " bytes"; - const size_t byteCount = std::min({static_cast(command.fmqByteCount), - mDataMQ->availableToRead(), mDataBufferSize}); - bool success = byteCount > 0 ? mDataMQ->read(&mDataBuffer[0], byteCount) : true; - if (success) { - const bool isConnected = mIsConnected; - LOG(DEBUG) << __func__ << ": reading of " << byteCount << " bytes from data MQ" - << " succeeded; connected? " << isConnected; - // Frames are consumed and counted regardless of connection status. - reply.fmqByteCount = byteCount; - mFrameCount += byteCount / mFrameSize; - if (isConnected) { - reply.status = STATUS_OK; - reply.observable.frames = mFrameCount; - reply.observable.timeNs = ::android::elapsedRealtimeNano(); - } else { - reply.status = STATUS_INVALID_OPERATION; + if (mState != StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states + if (!write(command.fmqByteCount, &reply)) { + mState = StreamDescriptor::State::ERROR; } - usleep(3000); // Simulate a blocking call into the driver. + if (mState == StreamDescriptor::State::STANDBY || + mState == StreamDescriptor::State::DRAIN_PAUSED) { + mState = StreamDescriptor::State::PAUSED; + } else if (mState == StreamDescriptor::State::IDLE || + mState == StreamDescriptor::State::DRAINING) { + mState = StreamDescriptor::State::ACTIVE; + } // When in 'ACTIVE' and 'PAUSED' do not need to change the state. } else { - LOG(WARNING) << __func__ << ": reading of " << byteCount - << " bytes of data from MQ failed"; - reply.status = STATUS_NOT_ENOUGH_DATA; + LOG(WARNING) << __func__ << ": BURST command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::DRAIN && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received DRAIN write command"; + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::IDLE; + // Since there is no actual hardware that would be draining the buffer, + // in order to simplify the reference code, we assume that draining + // happens instantly, thus skipping the 'DRAINING' state. + // TODO: Provide parametrization on the duration of draining to test + // handling of commands during the 'DRAINING' state. + } else { + LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::STANDBY && + command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received STANDBY write command"; + if (mState == StreamDescriptor::State::IDLE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::STANDBY; + } else { + LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::PAUSE && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received PAUSE write command"; + if (mState == StreamDescriptor::State::ACTIVE || + mState == StreamDescriptor::State::DRAINING) { + populateReply(&reply, mIsConnected); + mState = mState == StreamDescriptor::State::ACTIVE + ? StreamDescriptor::State::PAUSED + : StreamDescriptor::State::DRAIN_PAUSED; + } else { + LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; + } + } else if (command.code == StreamDescriptor::CommandCode::FLUSH && command.fmqByteCount == 0) { + LOG(DEBUG) << __func__ << ": received FLUSH write command"; + if (mState == StreamDescriptor::State::PAUSED || + mState == StreamDescriptor::State::DRAIN_PAUSED) { + populateReply(&reply, mIsConnected); + mState = StreamDescriptor::State::IDLE; + } else { + LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " + << toString(mState); + reply.status = STATUS_INVALID_OPERATION; } - reply.latencyMs = Module::kLatencyMs; } else { LOG(WARNING) << __func__ << ": invalid command (" << command.toString() << ") or count: " << command.fmqByteCount; reply.status = STATUS_BAD_VALUE; } + reply.state = mState; LOG(DEBUG) << __func__ << ": writing reply " << reply.toString(); if (!mReplyMQ->writeBlocking(&reply, 1)) { LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed"; + mState = StreamDescriptor::State::ERROR; return Status::ABORT; } return Status::CONTINUE; } +bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) { + const size_t readByteCount = mDataMQ->availableToRead(); + // Amount of data that the HAL module is going to actually use. + const size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize}); + bool fatal = false; + if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) { + const bool isConnected = mIsConnected; + LOG(DEBUG) << __func__ << ": reading of " << readByteCount << " bytes from data MQ" + << " succeeded; connected? " << isConnected; + // Frames are consumed and counted regardless of connection status. + reply->fmqByteCount += byteCount; + mFrameCount += byteCount / mFrameSize; + populateReply(reply, isConnected); + usleep(3000); // Simulate a blocking call into the driver. + // Set 'fatal = true' if a driver error occurs. + } else { + LOG(WARNING) << __func__ << ": reading of " << readByteCount + << " bytes of data from MQ failed"; + reply->status = STATUS_NOT_ENOUGH_DATA; + } + reply->latencyMs = Module::kLatencyMs; + return !fatal; +} + template StreamCommon::~StreamCommon() { - if (!mIsClosed) { + if (!isClosed()) { LOG(ERROR) << __func__ << ": stream was not closed prior to destruction, resource leak"; stopWorker(); // The worker and the context should clean up by themselves via destructors. @@ -214,13 +403,13 @@ StreamCommon::~StreamCommon() { template ndk::ScopedAStatus StreamCommon::close() { LOG(DEBUG) << __func__; - if (!mIsClosed) { + if (!isClosed()) { stopWorker(); LOG(DEBUG) << __func__ << ": joining the worker thread..."; mWorker.stop(); LOG(DEBUG) << __func__ << ": worker thread joined"; mContext.reset(); - mIsClosed = true; + mWorker.setClosed(); return ndk::ScopedAStatus::ok(); } else { LOG(ERROR) << __func__ << ": stream was already closed"; @@ -231,13 +420,14 @@ ndk::ScopedAStatus StreamCommon::close() { template void StreamCommon::stopWorker() { if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) { - LOG(DEBUG) << __func__ << ": asking the worker to stop..."; + LOG(DEBUG) << __func__ << ": asking the worker to exit..."; StreamDescriptor::Command cmd; - cmd.code = StreamContext::COMMAND_EXIT; + cmd.code = StreamDescriptor::CommandCode(StreamContext::COMMAND_EXIT); cmd.fmqByteCount = mContext.getInternalCommandCookie(); - // FIXME: This can block in the case when the client wrote a command - // while the stream worker's cycle is not running. Need to revisit - // when implementing standby and pause/resume. + // Note: never call 'pause' and 'resume' methods of StreamWorker + // in the HAL implementation. These methods are to be used by + // the client side only. Preventing the worker loop from running + // on the HAL side can cause a deadlock. if (!commandMQ->writeBlocking(&cmd, 1)) { LOG(ERROR) << __func__ << ": failed to write exit command to the MQ"; } @@ -248,7 +438,7 @@ void StreamCommon::stopWorker() { template ndk::ScopedAStatus StreamCommon::updateMetadata(const Metadata& metadata) { LOG(DEBUG) << __func__; - if (!mIsClosed) { + if (!isClosed()) { mMetadata = metadata; return ndk::ScopedAStatus::ok(); } diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h index 488edf116a..539fa8b85c 100644 --- a/audio/aidl/default/include/core-impl/Stream.h +++ b/audio/aidl/default/include/core-impl/Stream.h @@ -54,8 +54,10 @@ class StreamContext { int8_t, ::aidl::android::hardware::common::fmq::SynchronizedReadWrite> DataMQ; - // Ensure that this value is not used by any of StreamDescriptor.COMMAND_* - static constexpr int COMMAND_EXIT = -1; + // Ensure that this value is not used by any of StreamDescriptor.CommandCode enums + static constexpr int32_t COMMAND_EXIT = -1; + // Ensure that this value is not used by any of StreamDescriptor.State enums + static constexpr int32_t STATE_CLOSED = -1; StreamContext() = default; StreamContext(std::unique_ptr commandMQ, std::unique_ptr replyMQ, @@ -99,6 +101,10 @@ class StreamContext { class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic { public: + bool isClosed() const { + return static_cast(mState.load()) == StreamContext::STATE_CLOSED; + } + void setClosed() { mState = static_cast(StreamContext::STATE_CLOSED); } void setIsConnected(bool connected) { mIsConnected = connected; } protected: @@ -109,9 +115,12 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea mReplyMQ(context.getReplyMQ()), mDataMQ(context.getDataMQ()) {} std::string init() override; + void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const; - // Used both by the main and worker threads. + // Atomic fields are used both by the main and worker threads. std::atomic mIsConnected = false; + static_assert(std::atomic::is_always_lock_free); + std::atomic mState = StreamDescriptor::State::STANDBY; // All fields are used on the worker thread only. const int mInternalCommandCookie; const size_t mFrameSize; @@ -132,6 +141,9 @@ class StreamInWorkerLogic : public StreamWorkerCommonLogic { protected: Status cycle() override; + + private: + bool read(size_t clientSize, StreamDescriptor::Reply* reply); }; using StreamInWorker = ::android::hardware::audio::common::StreamWorker; @@ -143,6 +155,9 @@ class StreamOutWorkerLogic : public StreamWorkerCommonLogic { protected: Status cycle() override; + + private: + bool write(size_t clientSize, StreamDescriptor::Reply* reply); }; using StreamOutWorker = ::android::hardware::audio::common::StreamWorker; @@ -155,7 +170,7 @@ class StreamCommon { ? ndk::ScopedAStatus::ok() : ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE); } - bool isClosed() const { return mIsClosed; } + bool isClosed() const { return mWorker.isClosed(); } void setIsConnected(bool connected) { mWorker.setIsConnected(connected); } ndk::ScopedAStatus updateMetadata(const Metadata& metadata); @@ -168,9 +183,6 @@ class StreamCommon { Metadata mMetadata; StreamContext mContext; StreamWorker mWorker; - // This variable is checked in the destructor which can be called on an arbitrary Binder thread, - // thus we need to ensure that any changes made by other threads are sequentially consistent. - std::atomic mIsClosed = false; }; class StreamIn diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp index 23812008a6..b415da4465 100644 --- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp +++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "AudioHalBinderServiceUtil.h" @@ -69,6 +70,7 @@ using aidl::android::media::audio::common::AudioUsage; using android::hardware::audio::common::isBitPositionFlagSet; using android::hardware::audio::common::StreamLogic; using android::hardware::audio::common::StreamWorker; +using ndk::enum_range; using ndk::ScopedAStatus; template @@ -171,25 +173,26 @@ class WithAudioPortConfig { AudioPortConfig mConfig; }; -class AudioCoreModule : public testing::TestWithParam { +// Can be used as a base for any test here, does not depend on the fixture GTest parameters. +class AudioCoreModuleBase { public: // The default buffer size is used mostly for negative tests. static constexpr int kDefaultBufferSizeFrames = 256; - void SetUp() override { - ASSERT_NO_FATAL_FAILURE(ConnectToService()); + void SetUpImpl(const std::string& moduleName) { + ASSERT_NO_FATAL_FAILURE(ConnectToService(moduleName)); debug.flags().simulateDeviceConnections = true; ASSERT_NO_FATAL_FAILURE(debug.SetUp(module.get())); } - void TearDown() override { + void TearDownImpl() { if (module != nullptr) { EXPECT_IS_OK(module->setModuleDebug(ModuleDebug{})); } } - void ConnectToService() { - module = IModule::fromBinder(binderUtil.connectToService(GetParam())); + void ConnectToService(const std::string& moduleName) { + module = IModule::fromBinder(binderUtil.connectToService(moduleName)); ASSERT_NE(module, nullptr); } @@ -269,6 +272,13 @@ class AudioCoreModule : public testing::TestWithParam { WithDebugFlags debug; }; +class AudioCoreModule : public AudioCoreModuleBase, public testing::TestWithParam { + public: + void SetUp() override { ASSERT_NO_FATAL_FAILURE(SetUpImpl(GetParam())); } + + void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownImpl()); } +}; + class WithDevicePortConnectedState { public: explicit WithDevicePortConnectedState(const AudioPort& idAndData) : mIdAndData(idAndData) {} @@ -352,21 +362,36 @@ class StreamContext { std::unique_ptr mDataMQ; }; -class StreamCommonLogic : public StreamLogic { +class StreamLogicDriver { public: - StreamDescriptor::Position getLastObservablePosition() { - std::lock_guard lock(mLock); - return mLastReply.observable; - } + virtual ~StreamLogicDriver() = default; + // Return 'true' to stop the worker. + virtual bool done() = 0; + // For 'Writer' logic, if the 'actualSize' is 0, write is skipped. + // The 'fmqByteCount' from the returned command is passed as is to the HAL. + virtual StreamDescriptor::Command getNextCommand(int maxDataSize, + int* actualSize = nullptr) = 0; + // Return 'true' to indicate that no further processing is needed, + // for example, the driver is expecting a bad status to be returned. + // The logic cycle will return with 'CONTINUE' status. Otherwise, + // the reply will be validated and then passed to 'processValidReply'. + virtual bool interceptRawReply(const StreamDescriptor::Reply& reply) = 0; + // Return 'false' to indicate that the contents of the reply are unexpected. + // Will abort the logic cycle. + virtual bool processValidReply(const StreamDescriptor::Reply& reply) = 0; +}; +class StreamCommonLogic : public StreamLogic { protected: - explicit StreamCommonLogic(const StreamContext& context) + StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver) : mCommandMQ(context.getCommandMQ()), mReplyMQ(context.getReplyMQ()), mDataMQ(context.getDataMQ()), - mData(context.getBufferSizeBytes()) {} + mData(context.getBufferSizeBytes()), + mDriver(driver) {} StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; } StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; } + StreamLogicDriver* getDriver() const { return mDriver; } std::string init() override { return ""; } @@ -374,19 +399,20 @@ class StreamCommonLogic : public StreamLogic { StreamContext::ReplyMQ* mReplyMQ; StreamContext::DataMQ* mDataMQ; std::vector mData; - std::mutex mLock; - StreamDescriptor::Reply mLastReply GUARDED_BY(mLock); + StreamLogicDriver* const mDriver; }; class StreamReaderLogic : public StreamCommonLogic { public: - explicit StreamReaderLogic(const StreamContext& context) : StreamCommonLogic(context) {} + StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver) + : StreamCommonLogic(context, driver) {} protected: Status cycle() override { - StreamDescriptor::Command command{}; - command.code = StreamDescriptor::COMMAND_BURST; - command.fmqByteCount = mData.size(); + if (getDriver()->done()) { + return Status::EXIT; + } + StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size()); if (!mCommandMQ->writeBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": writing of command into MQ failed"; return Status::ABORT; @@ -396,6 +422,9 @@ class StreamReaderLogic : public StreamCommonLogic { LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; return Status::ABORT; } + if (getDriver()->interceptRawReply(reply)) { + return Status::CONTINUE; + } if (reply.status != STATUS_OK) { LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status); return Status::ABORT; @@ -405,16 +434,41 @@ class StreamReaderLogic : public StreamCommonLogic { << ": received invalid byte count in the reply: " << reply.fmqByteCount; return Status::ABORT; } - { - std::lock_guard lock(mLock); - mLastReply = reply; + if (static_cast(reply.fmqByteCount) != mDataMQ->availableToRead()) { + LOG(ERROR) << __func__ + << ": the byte count in the reply is not the same as the amount of " + << "data available in the MQ: " << reply.fmqByteCount + << " != " << mDataMQ->availableToRead(); } - const size_t readCount = std::min({mDataMQ->availableToRead(), - static_cast(reply.fmqByteCount), mData.size()}); - if (readCount == 0 || mDataMQ->read(mData.data(), readCount)) { + if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) { + LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs; + return Status::ABORT; + } + if (reply.xrunFrames < 0) { + LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames; + return Status::ABORT; + } + if (std::find(enum_range().begin(), + enum_range().end(), + reply.state) == enum_range().end()) { + LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state); + return Status::ABORT; + } + const bool acceptedReply = getDriver()->processValidReply(reply); + if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) { + std::vector data(readCount); + if (mDataMQ->read(data.data(), readCount)) { + memcpy(mData.data(), data.data(), std::min(mData.size(), data.size())); + goto checkAcceptedReply; + } + LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed"; + return Status::ABORT; + } // readCount == 0 + checkAcceptedReply: + if (acceptedReply) { return Status::CONTINUE; } - LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed"; + LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString(); return Status::ABORT; } }; @@ -422,17 +476,20 @@ using StreamReader = StreamWorker; class StreamWriterLogic : public StreamCommonLogic { public: - explicit StreamWriterLogic(const StreamContext& context) : StreamCommonLogic(context) {} + StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver) + : StreamCommonLogic(context, driver) {} protected: Status cycle() override { - if (!mDataMQ->write(mData.data(), mData.size())) { + if (getDriver()->done()) { + return Status::EXIT; + } + int actualSize = 0; + StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize); + if (actualSize != 0 && !mDataMQ->write(mData.data(), mData.size())) { LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed"; return Status::ABORT; } - StreamDescriptor::Command command{}; - command.code = StreamDescriptor::COMMAND_BURST; - command.fmqByteCount = mData.size(); if (!mCommandMQ->writeBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": writing of command into MQ failed"; return Status::ABORT; @@ -442,6 +499,9 @@ class StreamWriterLogic : public StreamCommonLogic { LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; return Status::ABORT; } + if (getDriver()->interceptRawReply(reply)) { + return Status::CONTINUE; + } if (reply.status != STATUS_OK) { LOG(ERROR) << __func__ << ": received error status: " << statusToString(reply.status); return Status::ABORT; @@ -451,11 +511,31 @@ class StreamWriterLogic : public StreamCommonLogic { << ": received invalid byte count in the reply: " << reply.fmqByteCount; return Status::ABORT; } - { - std::lock_guard lock(mLock); - mLastReply = reply; + if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) { + LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: " + << "available to write " << mDataMQ->availableToWrite() + << ", total size: " << mDataMQ->getQuantumCount(); + return Status::ABORT; } - return Status::CONTINUE; + if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) { + LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs; + return Status::ABORT; + } + if (reply.xrunFrames < 0) { + LOG(ERROR) << __func__ << ": received invalid xrunFrames value: " << reply.xrunFrames; + return Status::ABORT; + } + if (std::find(enum_range().begin(), + enum_range().end(), + reply.state) == enum_range().end()) { + LOG(ERROR) << __func__ << ": received invalid stream state: " << toString(reply.state); + return Status::ABORT; + } + if (getDriver()->processValidReply(reply)) { + return Status::CONTINUE; + } + LOG(ERROR) << __func__ << ": unacceptable reply: " << reply.toString(); + return Status::ABORT; } }; using StreamWriter = StreamWorker; @@ -466,52 +546,6 @@ struct IOTraits { using Worker = std::conditional_t; }; -// A dedicated version to test replies to invalid commands. -class StreamInvalidCommandLogic : public StreamCommonLogic { - public: - StreamInvalidCommandLogic(const StreamContext& context, - const std::vector& commands) - : StreamCommonLogic(context), mCommands(commands) {} - - std::vector getUnexpectedStatuses() { - std::lock_guard lock(mLock); - return mUnexpectedStatuses; - } - - protected: - Status cycle() override { - // Send all commands in one cycle to simplify testing. - // Extra logging helps to sort out issues with unexpected HAL behavior. - for (const auto& command : mCommands) { - LOG(INFO) << __func__ << ": writing command " << command.toString() << " into MQ..."; - if (!getCommandMQ()->writeBlocking(&command, 1)) { - LOG(ERROR) << __func__ << ": writing of command into MQ failed"; - return Status::ABORT; - } - StreamDescriptor::Reply reply{}; - LOG(INFO) << __func__ << ": reading reply for command " << command.toString() << "..."; - if (!getReplyMQ()->readBlocking(&reply, 1)) { - LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; - return Status::ABORT; - } - LOG(INFO) << __func__ << ": received status " << statusToString(reply.status) - << " for command " << command.toString(); - if (reply.status != STATUS_BAD_VALUE) { - std::string s = command.toString(); - s.append(", ").append(statusToString(reply.status)); - std::lock_guard lock(mLock); - mUnexpectedStatuses.push_back(std::move(s)); - } - }; - return Status::EXIT; - } - - private: - const std::vector mCommands; - std::mutex mLock; - std::vector mUnexpectedStatuses GUARDED_BY(mLock); -}; - template class WithStream { public: @@ -1208,6 +1242,46 @@ TEST_P(AudioCoreModule, ExternalDevicePortRoutes) { } } +class StreamLogicDriverInvalidCommand : public StreamLogicDriver { + public: + StreamLogicDriverInvalidCommand(const std::vector& commands) + : mCommands(commands) {} + + std::string getUnexpectedStatuses() { + // This method is intended to be called after the worker thread has joined, + // thus no extra synchronization is needed. + std::string s; + if (!mStatuses.empty()) { + s = std::string("Pairs of (command, actual status): ") + .append((android::internal::ToString(mStatuses))); + } + return s; + } + + bool done() override { return mNextCommand >= mCommands.size(); } + StreamDescriptor::Command getNextCommand(int, int* actualSize) override { + if (actualSize != nullptr) *actualSize = 0; + return mCommands[mNextCommand++]; + } + bool interceptRawReply(const StreamDescriptor::Reply& reply) override { + if (reply.status != STATUS_BAD_VALUE) { + std::string s = mCommands[mNextCommand - 1].toString(); + s.append(", ").append(statusToString(reply.status)); + mStatuses.push_back(std::move(s)); + // If the HAL does not recognize the command as invalid, + // retrieve the data etc. + return reply.status != STATUS_OK; + } + return true; + } + bool processValidReply(const StreamDescriptor::Reply&) override { return true; } + + private: + const std::vector mCommands; + size_t mNextCommand = 0; + std::vector mStatuses; +}; + template class AudioStream : public AudioCoreModule { public: @@ -1315,19 +1389,6 @@ class AudioStream : public AudioCoreModule { EXPECT_NO_FATAL_FAILURE(OpenTwiceSamePortConfigImpl(portConfig.value())); } - void ReadOrWrite(bool useSetupSequence2, bool validateObservablePosition) { - const auto allPortConfigs = - moduleConfig->getPortConfigsForMixPorts(IOTraits::is_input); - if (allPortConfigs.empty()) { - GTEST_SKIP() << "No mix ports have attached devices"; - } - for (const auto& portConfig : allPortConfigs) { - EXPECT_NO_FATAL_FAILURE( - ReadOrWriteImpl(portConfig, useSetupSequence2, validateObservablePosition)) - << portConfig.toString(); - } - } - void ResetPortConfigWithOpenStream() { const auto portConfig = moduleConfig->getSingleConfigForMixPort(IOTraits::is_input); if (!portConfig.has_value()) { @@ -1357,131 +1418,43 @@ class AudioStream : public AudioCoreModule { << stream1.getPortId(); } - template - void WaitForObservablePositionAdvance(Worker& worker) { - static constexpr int kWriteDurationUs = 50 * 1000; - static constexpr std::chrono::milliseconds kPositionChangeTimeout{10000}; - int64_t framesInitial; - framesInitial = worker.getLastObservablePosition().frames; - ASSERT_FALSE(worker.hasError()); - bool timedOut = false; - int64_t frames = framesInitial; - for (android::base::Timer elapsed; - frames <= framesInitial && !worker.hasError() && - !(timedOut = (elapsed.duration() >= kPositionChangeTimeout));) { - usleep(kWriteDurationUs); - frames = worker.getLastObservablePosition().frames; - } - EXPECT_FALSE(timedOut); - EXPECT_FALSE(worker.hasError()) << worker.getError(); - EXPECT_GT(frames, framesInitial); - } - - void ReadOrWriteImpl(const AudioPortConfig& portConfig, bool useSetupSequence2, - bool validateObservablePosition) { - if (!useSetupSequence2) { - ASSERT_NO_FATAL_FAILURE( - ReadOrWriteSetupSequence1(portConfig, validateObservablePosition)); - } else { - ASSERT_NO_FATAL_FAILURE( - ReadOrWriteSetupSequence2(portConfig, validateObservablePosition)); - } - } - - // Set up a patch first, then open a stream. - void ReadOrWriteSetupSequence1(const AudioPortConfig& portConfig, - bool validateObservablePosition) { - auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( - IOTraits::is_input, portConfig); - ASSERT_FALSE(devicePorts.empty()); - auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); - WithAudioPatch patch(IOTraits::is_input, portConfig, devicePortConfig); - ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); - - WithStream stream(patch.getPortConfig(IOTraits::is_input)); - ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - typename IOTraits::Worker worker(*stream.getContext()); - - ASSERT_TRUE(worker.start()); - ASSERT_TRUE(worker.waitForAtLeastOneCycle()); - if (validateObservablePosition) { - ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker)); - } - } - - // Open a stream, then set up a patch for it. - void ReadOrWriteSetupSequence2(const AudioPortConfig& portConfig, - bool validateObservablePosition) { - WithStream stream(portConfig); - ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - typename IOTraits::Worker worker(*stream.getContext()); - - auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( - IOTraits::is_input, portConfig); - ASSERT_FALSE(devicePorts.empty()); - auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); - WithAudioPatch patch(IOTraits::is_input, stream.getPortConfig(), devicePortConfig); - ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); - - ASSERT_TRUE(worker.start()); - ASSERT_TRUE(worker.waitForAtLeastOneCycle()); - if (validateObservablePosition) { - ASSERT_NO_FATAL_FAILURE(WaitForObservablePositionAdvance(worker)); - } - } - void SendInvalidCommandImpl(const AudioPortConfig& portConfig) { std::vector commands(6); - commands[0].code = -1; - commands[1].code = StreamDescriptor::COMMAND_BURST - 1; - commands[2].code = std::numeric_limits::min(); - commands[3].code = std::numeric_limits::max(); - commands[4].code = StreamDescriptor::COMMAND_BURST; + commands[0].code = StreamDescriptor::CommandCode(-1); + commands[1].code = StreamDescriptor::CommandCode( + static_cast(StreamDescriptor::CommandCode::START) - 1); + commands[2].code = StreamDescriptor::CommandCode(std::numeric_limits::min()); + commands[3].code = StreamDescriptor::CommandCode(std::numeric_limits::max()); + // TODO: For proper testing of input streams, need to put the stream into + // a state which accepts BURST commands. + commands[4].code = StreamDescriptor::CommandCode::BURST; commands[4].fmqByteCount = -1; - commands[5].code = StreamDescriptor::COMMAND_BURST; + commands[5].code = StreamDescriptor::CommandCode::BURST; commands[5].fmqByteCount = std::numeric_limits::min(); WithStream stream(portConfig); ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - StreamWorker writer(*stream.getContext(), commands); - ASSERT_TRUE(writer.start()); - writer.waitForAtLeastOneCycle(); - auto unexpectedStatuses = writer.getUnexpectedStatuses(); - EXPECT_EQ(0UL, unexpectedStatuses.size()) - << "Pairs of (command, actual status): " - << android::internal::ToString(unexpectedStatuses); + StreamLogicDriverInvalidCommand driver(commands); + typename IOTraits::Worker worker(*stream.getContext(), &driver); + ASSERT_TRUE(worker.start()); + worker.join(); + EXPECT_EQ("", driver.getUnexpectedStatuses()); } }; using AudioStreamIn = AudioStream; using AudioStreamOut = AudioStream; -#define TEST_IO_STREAM(method_name) \ +#define TEST_IN_AND_OUT_STREAM(method_name) \ TEST_P(AudioStreamIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \ TEST_P(AudioStreamOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } -#define TEST_IO_STREAM_2(method_name, arg1, arg2) \ - TEST_P(AudioStreamIn, method_name##_##arg1##_##arg2) { \ - ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \ - } \ - TEST_P(AudioStreamOut, method_name##_##arg1##_##arg2) { \ - ASSERT_NO_FATAL_FAILURE(method_name(arg1, arg2)); \ - } -TEST_IO_STREAM(CloseTwice); -TEST_IO_STREAM(OpenAllConfigs); -TEST_IO_STREAM(OpenInvalidBufferSize); -TEST_IO_STREAM(OpenInvalidDirection); -TEST_IO_STREAM(OpenOverMaxCount); -TEST_IO_STREAM(OpenTwiceSamePortConfig); -// Use of constants makes comprehensible test names. -constexpr bool SetupSequence1 = false; -constexpr bool SetupSequence2 = true; -constexpr bool SetupOnly = false; -constexpr bool ValidateObservablePosition = true; -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, SetupOnly); -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, SetupOnly); -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence1, ValidateObservablePosition); -TEST_IO_STREAM_2(ReadOrWrite, SetupSequence2, ValidateObservablePosition); -TEST_IO_STREAM(ResetPortConfigWithOpenStream); -TEST_IO_STREAM(SendInvalidCommand); +TEST_IN_AND_OUT_STREAM(CloseTwice); +TEST_IN_AND_OUT_STREAM(OpenAllConfigs); +TEST_IN_AND_OUT_STREAM(OpenInvalidBufferSize); +TEST_IN_AND_OUT_STREAM(OpenInvalidDirection); +TEST_IN_AND_OUT_STREAM(OpenOverMaxCount); +TEST_IN_AND_OUT_STREAM(OpenTwiceSamePortConfig); +TEST_IN_AND_OUT_STREAM(ResetPortConfigWithOpenStream); +TEST_IN_AND_OUT_STREAM(SendInvalidCommand); TEST_P(AudioStreamOut, OpenTwicePrimary) { const auto mixPorts = moduleConfig->getMixPorts(false); @@ -1523,6 +1496,163 @@ TEST_P(AudioStreamOut, RequireOffloadInfo) { << "when no offload info is provided for a compressed offload mix port"; } +using CommandAndState = std::pair; + +class StreamLogicDefaultDriver : public StreamLogicDriver { + public: + explicit StreamLogicDefaultDriver(const std::vector& commands) + : mCommands(commands) {} + + // The three methods below is intended to be called after the worker + // thread has joined, thus no extra synchronization is needed. + bool hasObservablePositionIncrease() const { return mObservablePositionIncrease; } + bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; } + std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; } + + bool done() override { return mNextCommand >= mCommands.size(); } + StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override { + StreamDescriptor::Command command{}; + command.code = mCommands[mNextCommand++].first; + const int dataSize = command.code == StreamDescriptor::CommandCode::BURST ? maxDataSize : 0; + command.fmqByteCount = dataSize; + if (actualSize != nullptr) { + // In the output scenario, reduce slightly the fmqByteCount to verify + // that the HAL module always consumes all data from the MQ. + if (command.fmqByteCount > 1) command.fmqByteCount--; + *actualSize = dataSize; + } + return command; + } + bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; } + bool processValidReply(const StreamDescriptor::Reply& reply) override { + if (mPreviousFrames.has_value()) { + if (reply.observable.frames > mPreviousFrames.value()) { + mObservablePositionIncrease = true; + } else if (reply.observable.frames < mPreviousFrames.value()) { + mRetrogradeObservablePosition = true; + } + } + mPreviousFrames = reply.observable.frames; + + const auto& lastCommandState = mCommands[mNextCommand - 1]; + if (lastCommandState.second != reply.state) { + std::string s = std::string("Unexpected transition from the state ") + .append(mPreviousState) + .append(" to ") + .append(toString(reply.state)) + .append(" caused by the command ") + .append(toString(lastCommandState.first)); + LOG(ERROR) << __func__ << ": " << s; + mUnexpectedTransition = std::move(s); + return false; + } + return true; + } + + protected: + const std::vector& mCommands; + size_t mNextCommand = 0; + std::optional mPreviousFrames; + std::string mPreviousState = ""; + bool mObservablePositionIncrease = false; + bool mRetrogradeObservablePosition = false; + std::string mUnexpectedTransition; +}; + +using NamedCommandSequence = std::pair>; +enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ }; +using StreamIoTestParameters = + std::tuple; +template +class AudioStreamIo : public AudioCoreModuleBase, + public testing::TestWithParam { + public: + void SetUp() override { + ASSERT_NO_FATAL_FAILURE(SetUpImpl(std::get(GetParam()))); + ASSERT_NO_FATAL_FAILURE(SetUpModuleConfig()); + } + + void Run() { + const auto allPortConfigs = + moduleConfig->getPortConfigsForMixPorts(IOTraits::is_input); + if (allPortConfigs.empty()) { + GTEST_SKIP() << "No mix ports have attached devices"; + } + for (const auto& portConfig : allPortConfigs) { + SCOPED_TRACE(portConfig.toString()); + const auto& commandsAndStates = std::get(GetParam()).second; + if (!std::get(GetParam())) { + ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates)); + } else { + ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq2(portConfig, commandsAndStates)); + } + } + } + + bool ValidateObservablePosition(const AudioPortConfig& /*portConfig*/) { + // May return false based on the portConfig, e.g. for telephony ports. + return true; + } + + // Set up a patch first, then open a stream. + void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig, + const std::vector& commandsAndStates) { + auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( + IOTraits::is_input, portConfig); + ASSERT_FALSE(devicePorts.empty()); + auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); + WithAudioPatch patch(IOTraits::is_input, portConfig, devicePortConfig); + ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); + + WithStream stream(patch.getPortConfig(IOTraits::is_input)); + ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); + StreamLogicDefaultDriver driver(commandsAndStates); + typename IOTraits::Worker worker(*stream.getContext(), &driver); + + ASSERT_TRUE(worker.start()); + worker.join(); + EXPECT_FALSE(worker.hasError()) << worker.getError(); + EXPECT_EQ("", driver.getUnexpectedStateTransition()); + if (ValidateObservablePosition(portConfig)) { + EXPECT_TRUE(driver.hasObservablePositionIncrease()); + EXPECT_FALSE(driver.hasRetrogradeObservablePosition()); + } + } + + // Open a stream, then set up a patch for it. + void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig, + const std::vector& commandsAndStates) { + WithStream stream(portConfig); + ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); + StreamLogicDefaultDriver driver(commandsAndStates); + typename IOTraits::Worker worker(*stream.getContext(), &driver); + + auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( + IOTraits::is_input, portConfig); + ASSERT_FALSE(devicePorts.empty()); + auto devicePortConfig = moduleConfig->getSingleConfigForDevicePort(devicePorts[0]); + WithAudioPatch patch(IOTraits::is_input, stream.getPortConfig(), devicePortConfig); + ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); + + ASSERT_TRUE(worker.start()); + worker.join(); + EXPECT_FALSE(worker.hasError()) << worker.getError(); + EXPECT_EQ("", driver.getUnexpectedStateTransition()); + if (ValidateObservablePosition(portConfig)) { + EXPECT_TRUE(driver.hasObservablePositionIncrease()); + EXPECT_FALSE(driver.hasRetrogradeObservablePosition()); + } + } +}; +using AudioStreamIoIn = AudioStreamIo; +using AudioStreamIoOut = AudioStreamIo; + +#define TEST_IN_AND_OUT_STREAM_IO(method_name) \ + TEST_P(AudioStreamIoIn, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } \ + TEST_P(AudioStreamIoOut, method_name) { ASSERT_NO_FATAL_FAILURE(method_name()); } + +TEST_IN_AND_OUT_STREAM_IO(Run); + // Tests specific to audio patches. The fixure class is named 'AudioModulePatch' // to avoid clashing with 'AudioPatch' class. class AudioModulePatch : public AudioCoreModule { @@ -1704,6 +1834,139 @@ INSTANTIATE_TEST_SUITE_P(AudioStreamOutTest, AudioStreamOut, testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), android::PrintInstanceNameToString); GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut); + +static const NamedCommandSequence kReadOrWriteSeq = std::make_pair( + std::string("ReadOrWrite"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE)}); +static const NamedCommandSequence kDrainInSeq = std::make_pair( + std::string("Drain"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::DRAINING), + std::make_pair(StreamDescriptor::CommandCode::START, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::DRAINING), + // TODO: This will need to be changed once DRAIN starts taking time. + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::STANDBY)}); +static const NamedCommandSequence kDrainOutSeq = std::make_pair( + std::string("Drain"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + // TODO: This will need to be changed once DRAIN starts taking time. + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::IDLE)}); +// TODO: This will need to be changed once DRAIN starts taking time so we can pause it. +static const NamedCommandSequence kDrainPauseOutSeq = std::make_pair( + std::string("DrainPause"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::DRAIN, + StreamDescriptor::State::IDLE)}); +static const NamedCommandSequence kStandbySeq = std::make_pair( + std::string("Standby"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::STANDBY, + StreamDescriptor::State::STANDBY), + // Perform a read or write in order to advance observable position + // (this is verified by tests). + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE)}); +static const NamedCommandSequence kPauseInSeq = std::make_pair( + std::string("Pause"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::FLUSH, + StreamDescriptor::State::STANDBY)}); +static const NamedCommandSequence kPauseOutSeq = std::make_pair( + std::string("Pause"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::START, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::START, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED)}); +static const NamedCommandSequence kFlushInSeq = std::make_pair( + std::string("Flush"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::FLUSH, + StreamDescriptor::State::STANDBY)}); +static const NamedCommandSequence kFlushOutSeq = std::make_pair( + std::string("Flush"), + std::vector{ + std::make_pair(StreamDescriptor::CommandCode::START, StreamDescriptor::State::IDLE), + std::make_pair(StreamDescriptor::CommandCode::BURST, + StreamDescriptor::State::ACTIVE), + std::make_pair(StreamDescriptor::CommandCode::PAUSE, + StreamDescriptor::State::PAUSED), + std::make_pair(StreamDescriptor::CommandCode::FLUSH, + StreamDescriptor::State::IDLE)}); +std::string GetStreamIoTestName(const testing::TestParamInfo& info) { + return android::PrintInstanceNameToString( + testing::TestParamInfo{std::get(info.param), + info.index}) + .append("_") + .append(std::get(info.param).first) + .append("_SetupSeq") + .append(std::get(info.param) ? "2" : "1"); +} +INSTANTIATE_TEST_SUITE_P( + AudioStreamIoInTest, AudioStreamIoIn, + testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), + testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq, + kFlushInSeq), + testing::Values(false, true)), + GetStreamIoTestName); +GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoIn); +INSTANTIATE_TEST_SUITE_P( + AudioStreamIoOutTest, AudioStreamIoOut, + testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), + testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq, + kStandbySeq, kPauseOutSeq, kFlushOutSeq), + testing::Values(false, true)), + GetStreamIoTestName); +GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut); + INSTANTIATE_TEST_SUITE_P(AudioPatchTest, AudioModulePatch, testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), android::PrintInstanceNameToString);