diff --git a/audio/aidl/Android.bp b/audio/aidl/Android.bp index 92d7d54f7d..563ee626b5 100644 --- a/audio/aidl/Android.bp +++ b/audio/aidl/Android.bp @@ -113,6 +113,7 @@ aidl_interface { "android/hardware/audio/core/AudioRoute.aidl", "android/hardware/audio/core/IConfig.aidl", "android/hardware/audio/core/IModule.aidl", + "android/hardware/audio/core/IStreamCallback.aidl", "android/hardware/audio/core/IStreamIn.aidl", "android/hardware/audio/core/IStreamOut.aidl", "android/hardware/audio/core/ITelephony.aidl", diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl index be382c5b20..7f960e0b1b 100644 --- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl +++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IModule.aidl @@ -76,6 +76,7 @@ interface IModule { android.hardware.audio.common.SourceMetadata sourceMetadata; @nullable android.media.audio.common.AudioOffloadInfo offloadInfo; long bufferSizeFrames; + @nullable android.hardware.audio.core.IStreamCallback callback; } @VintfStability parcelable OpenOutputStreamReturn { diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IStreamCallback.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IStreamCallback.aidl new file mode 100644 index 0000000000..5a2ab78d88 --- /dev/null +++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/IStreamCallback.aidl @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/////////////////////////////////////////////////////////////////////////////// +// THIS FILE IS IMMUTABLE. DO NOT EDIT IN ANY CASE. // +/////////////////////////////////////////////////////////////////////////////// + +// This file is a snapshot of an AIDL file. Do not edit it manually. There are +// two cases: +// 1). this is a frozen version file - do not edit this in any case. +// 2). this is a 'current' file. If you make a backwards compatible change to +// the interface (from the latest frozen version), the build system will +// prompt you to update this file with `m -update-api`. +// +// You must not make a backward incompatible change to any AIDL file built +// with the aidl_interface module type with versions property set. The module +// type is used to build AIDL files in a way that they can be used across +// independently updatable components of the system. If a device is shipped +// with such a backward incompatible change, it has a high risk of breaking +// later when a module using the interface is updated, e.g., Mainline modules. + +package android.hardware.audio.core; +@VintfStability +interface IStreamCallback { + oneway void onTransferReady(); + oneway void onError(); + oneway void onDrainReady(); +} diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl index 80ee185bc8..467d37b6f7 100644 --- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl +++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/ModuleDebug.aidl @@ -35,4 +35,5 @@ package android.hardware.audio.core; @JavaDerive(equals=true, toString=true) @VintfStability parcelable ModuleDebug { boolean simulateDeviceConnections; + int streamTransientStateDelayMs; } diff --git a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl index 3a77ad1765..3a4271b89a 100644 --- a/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl +++ b/audio/aidl/aidl_api/android.hardware.audio.core/current/android/hardware/audio/core/StreamDescriptor.aidl @@ -42,8 +42,9 @@ parcelable StreamDescriptor { const int LATENCY_UNKNOWN = -1; @FixedSize @VintfStability parcelable Position { - long frames; - long timeNs; + long frames = -1; + long timeNs = -1; + const long UNKNOWN = -1; } @Backing(type="int") @VintfStability enum State { @@ -53,14 +54,23 @@ parcelable StreamDescriptor { PAUSED = 4, DRAINING = 5, DRAIN_PAUSED = 6, + TRANSFERRING = 7, + TRANSFER_PAUSED = 8, ERROR = 100, } + @Backing(type="byte") @VintfStability + enum DrainMode { + DRAIN_UNSPECIFIED = 0, + DRAIN_ALL = 1, + DRAIN_EARLY_NOTIFY = 2, + } @FixedSize @VintfStability union Command { - int hal_reserved_exit; + int halReservedExit; + android.media.audio.common.Void getStatus; android.media.audio.common.Void start; int burst; - android.media.audio.common.Void drain; + android.hardware.audio.core.StreamDescriptor.DrainMode drain; android.media.audio.common.Void standby; android.media.audio.common.Void pause; android.media.audio.common.Void flush; diff --git a/audio/aidl/android/hardware/audio/core/IModule.aidl b/audio/aidl/android/hardware/audio/core/IModule.aidl index be400512cd..974e7e8c57 100644 --- a/audio/aidl/android/hardware/audio/core/IModule.aidl +++ b/audio/aidl/android/hardware/audio/core/IModule.aidl @@ -21,6 +21,7 @@ import android.hardware.audio.common.SourceMetadata; import android.hardware.audio.core.AudioMode; import android.hardware.audio.core.AudioPatch; import android.hardware.audio.core.AudioRoute; +import android.hardware.audio.core.IStreamCallback; import android.hardware.audio.core.IStreamIn; import android.hardware.audio.core.IStreamOut; import android.hardware.audio.core.ITelephony; @@ -53,9 +54,13 @@ interface IModule { * the HAL module behavior that would otherwise require human intervention. * * The HAL module must throw an error if there is an attempt to change - * the debug behavior for the aspect which is currently in use. + * the debug behavior for the aspect which is currently in use, or when + * the value of any of the debug flags is invalid. See 'ModuleDebug' for + * the full list of constraints. * * @param debug The debug options. + * @throws EX_ILLEGAL_ARGUMENT If some of the configuration parameters are + * invalid. * @throws EX_ILLEGAL_STATE If the flag(s) being changed affect functionality * which is currently in use. */ @@ -316,9 +321,13 @@ interface IModule { * 'setAudioPortConfig' method. Existence of an audio patch involving this * port configuration is not required for successful opening of a stream. * - * If the port configuration has 'COMPRESS_OFFLOAD' output flag set, - * the framework must provide additional information about the encoded - * audio stream in 'offloadInfo' argument. + * If the port configuration has the 'COMPRESS_OFFLOAD' output flag set, + * the client must provide additional information about the encoded + * audio stream in the 'offloadInfo' argument. + * + * If the port configuration has the 'NON_BLOCKING' output flag set, + * the client must provide a callback for asynchronous notifications + * in the 'callback' argument. * * The requested buffer size is expressed in frames, thus the actual size * in bytes depends on the audio port configuration. Also, the HAL module @@ -354,6 +363,8 @@ interface IModule { * - If the offload info is not provided for an offload * port configuration. * - If a buffer of the requested size can not be provided. + * - If the callback is not provided for a non-blocking + * port configuration. * @throws EX_ILLEGAL_STATE In the following cases: * - If the port config already has a stream opened on it. * - If the limit on the open stream count for the port has @@ -372,6 +383,8 @@ interface IModule { @nullable AudioOffloadInfo offloadInfo; /** Requested audio I/O buffer minimum size, in frames. */ long bufferSizeFrames; + /** Client callback interface for the non-blocking output mode. */ + @nullable IStreamCallback callback; } @VintfStability parcelable OpenOutputStreamReturn { diff --git a/audio/aidl/android/hardware/audio/core/IStreamCallback.aidl b/audio/aidl/android/hardware/audio/core/IStreamCallback.aidl new file mode 100644 index 0000000000..440ab25ceb --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/IStreamCallback.aidl @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package android.hardware.audio.core; + +/** + * This interface is used to indicate completion of asynchronous operations. + * See the state machines referenced by StreamDescriptor for details. + */ +@VintfStability +oneway interface IStreamCallback { + /** + * Indicate that the stream is ready for next data exchange. + */ + void onTransferReady(); + /** + * Indicate that an irrecoverable error has occurred during the last I/O + * operation. After sending this callback, the stream enters the 'ERROR' + * state. + */ + void onError(); + /** + * Indicate that the stream has finished draining. This is only used + * for output streams because for input streams draining is performed + * by the client. + */ + void onDrainReady(); +} diff --git a/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl b/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl index 858a9bd841..871a5c99d7 100644 --- a/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl +++ b/audio/aidl/android/hardware/audio/core/ModuleDebug.aidl @@ -35,4 +35,19 @@ parcelable ModuleDebug { * profiles. */ boolean simulateDeviceConnections; + /** + * Must be non-negative. When set to non-zero, HAL module must delay + * transition from "transient" stream states (see StreamDescriptor.aidl) + * by the specified amount of milliseconds. The purpose of this delay + * is to allow VTS to test sending of stream commands while the stream is + * in a transient state. The delay must apply to newly created streams, + * it is not required to apply the delay to already opened streams. + * + * Note: the drawback of enabling this delay for asynchronous (non-blocking) + * modes is that sending of callbacks will also be delayed, because + * callbacks are sent once the stream state machine exits a transient + * state. Thus, it's not recommended to use it with tests that require + * waiting for an async callback. + */ + int streamTransientStateDelayMs; } diff --git a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl index 2b1fc993e7..65ea9ef090 100644 --- a/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl +++ b/audio/aidl/android/hardware/audio/core/StreamDescriptor.aidl @@ -84,13 +84,13 @@ import android.media.audio.common.Void; * are different. * * State machines of both input and output streams start from the 'STANDBY' - * state. Transitions between states happen naturally with changes in the + * state. Transitions between states happen naturally with changes in the * states of the model elements. For simplicity, we restrict the change to one * element only, for example, in the 'STANDBY' state, either the producer or the * consumer can become active, but not both at the same time. States 'STANDBY', * 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event, * whereas a change from the 'DRAINING' state can happen with time as the buffer - * gets empty. + * gets empty, thus it's a "transient" state. * * The state machine for input streams is defined in the `stream-in-sm.gv` file, * for output streams—in the `stream-out-sm.gv` file. State machines define how @@ -100,6 +100,28 @@ import android.media.audio.common.Void; * client can never observe a stream with a functioning command queue in this * state. The 'ERROR' state is a special state which the state machine enters * when an unrecoverable hardware error is detected by the HAL module. + * + * Non-blocking (asynchronous) modes introduce a new 'TRANSFERRING' state, which + * the state machine can enter after replying to the 'burst' command, instead of + * staying in the 'ACTIVE' state. In this case the client gets unblocked + * earlier, while the actual audio delivery to / from the observer is not + * complete yet. Once the HAL module is ready for the next transfer, it notifies + * the client via a oneway callback, and the machine switches to 'ACTIVE' + * state. The 'TRANSFERRING' state is thus "transient", similar to the + * 'DRAINING' state. For output streams, asynchronous transfer can be paused, + * and it's another new state: 'TRANSFER_PAUSED'. It differs from 'PAUSED' by + * the fact that no new writes are allowed. Please see 'stream-in-async-sm.gv' + * and 'stream-out-async-sm.gv' files for details. Below is the table summary + * for asynchronous only-states: + * + * Producer | Buffer state | Consumer | Applies | State + * active? | | active? | to | + * ==========|==============|==========|=========|============================== + * Yes | Not empty | Yes | Both | TRANSFERRING, s/w x-runs counted + * ----------|--------------|----------|---------|----------------------------- + * Yes | Not empty | No | Output | TRANSFER_PAUSED, + * | | | | h/w emits silence. + * */ @JavaDerive(equals=true, toString=true) @VintfStability @@ -116,10 +138,15 @@ parcelable StreamDescriptor { @VintfStability @FixedSize parcelable Position { + /** + * The value used when the position can not be reported by the HAL + * module. + */ + const long UNKNOWN = -1; /** Frame count. */ - long frames; + long frames = UNKNOWN; /** Timestamp in nanoseconds. */ - long timeNs; + long timeNs = UNKNOWN; } @VintfStability @@ -166,10 +193,23 @@ parcelable StreamDescriptor { /** * Used for output streams only, pauses draining. This state is similar * to the 'PAUSED' state, except that the client is not adding any - * new data. If it emits a 'BURST' command, this brings the stream + * new data. If it emits a 'burst' command, this brings the stream * into the regular 'PAUSED' state. */ DRAIN_PAUSED = 6, + /** + * Used only for streams in asynchronous mode. The stream enters this + * state after receiving a 'burst' command and returning control back + * to the client, thus unblocking it. + */ + TRANSFERRING = 7, + /** + * Used only for output streams in asynchronous mode only. The stream + * enters this state after receiving a 'pause' command while being in + * the 'TRANSFERRING' state. Unlike 'PAUSED' state, this state does not + * accept new writes. + */ + TRANSFER_PAUSED = 8, /** * The ERROR state is entered when the stream has encountered an * irrecoverable error from the lower layer. After entering it, the @@ -178,6 +218,29 @@ parcelable StreamDescriptor { ERROR = 100, } + @VintfStability + @Backing(type="byte") + enum DrainMode { + /** + * Unspecified—used with input streams only, because the client controls + * draining. + */ + DRAIN_UNSPECIFIED = 0, + /** + * Used with output streams only, the HAL module indicates drain + * completion when all remaining audio data has been consumed. + */ + DRAIN_ALL = 1, + /** + * Used with output streams only, the HAL module indicates drain + * completion shortly before all audio data has been consumed in order + * to give the client an opportunity to provide data for the next track + * for gapless playback. The exact amount of provided time is specific + * to the HAL implementation. + */ + DRAIN_EARLY_NOTIFY = 2, + } + /** * Used for sending commands to the HAL module. The client writes into * the queue, the HAL module reads. The queue can only contain a single @@ -198,7 +261,14 @@ parcelable StreamDescriptor { * implementation must pass a random cookie as the command argument, * which is only known to the implementation. */ - int hal_reserved_exit; + int halReservedExit; + /** + * Retrieve the current state of the stream. This command must be + * processed by the stream in any state. The stream must provide current + * positions, counters, and its state in the reply. This command must be + * handled by the HAL module without any observable side effects. + */ + Void getStatus; /** * See the state machines on the applicability of this command to * different states. @@ -215,15 +285,14 @@ parcelable StreamDescriptor { * read from the hardware into the 'audio.fmq' queue. * * In both cases it is allowed for this field to contain any - * non-negative number. The value 0 can be used if the client only needs - * to retrieve current positions and latency. Any sufficiently big value - * which exceeds the size of the queue's area which is currently - * available for reading or writing by the HAL module must be trimmed by - * the HAL module to the available size. Note that the HAL module is - * allowed to consume or provide less data than requested, and it must - * return the amount of actually read or written data via the - * 'Reply.fmqByteCount' field. Thus, only attempts to pass a negative - * number must be constituted as a client's error. + * non-negative number. Any sufficiently big value which exceeds the + * size of the queue's area which is currently available for reading or + * writing by the HAL module must be trimmed by the HAL module to the + * available size. Note that the HAL module is allowed to consume or + * provide less data than requested, and it must return the amount of + * actually read or written data via the 'Reply.fmqByteCount' + * field. Thus, only attempts to pass a negative number must be + * constituted as a client's error. * * Differences for the MMap No IRQ mode: * @@ -233,13 +302,16 @@ parcelable StreamDescriptor { * with sending of this command. * * - the value must always be set to 0. + * + * See the state machines on the applicability of this command to + * different states. */ int burst; /** * See the state machines on the applicability of this command to * different states. */ - Void drain; + DrainMode drain; /** * See the state machines on the applicability of this command to * different states. @@ -286,10 +358,6 @@ parcelable StreamDescriptor { * - STATUS_INVALID_OPERATION: the command is not applicable in the * current state of the stream, or to this * type of the stream; - * - STATUS_NO_INIT: positions can not be reported because the mix port - * is not connected to any producer or consumer, or - * because the HAL module does not support positions - * reporting for this AudioSource (on input streams). * - STATUS_NOT_ENOUGH_DATA: a read or write error has * occurred for the 'audio.fmq' queue; */ @@ -307,9 +375,11 @@ parcelable StreamDescriptor { */ int fmqByteCount; /** - * It is recommended to report the current position for any command. - * If the position can not be reported, the 'status' field must be - * set to 'NO_INIT'. + * It is recommended to report the current position for any command. If + * the position can not be reported, for example because the mix port is + * not connected to any producer or consumer, or because the HAL module + * does not support positions reporting for this AudioSource (on input + * streams), the 'Position::UNKNOWN' value must be used. * * For output streams: the moment when the specified stream position * was presented to an external observer (i.e. presentation position). @@ -401,6 +471,10 @@ parcelable StreamDescriptor { * into 'reply' queue, and hangs on waiting on a read from * the 'command' queue. * 6. The client wakes up due to 5. and reads the reply. + * Note: in non-blocking mode, when the HAL module goes to + * the 'TRANSFERRING' state (as indicated by the 'reply.state' + * field), the client must wait for the 'IStreamCallback.onTransferReady' + * notification to arrive before starting the next burst. * * For input streams the following sequence of operations is used: * 1. The client writes the BURST command into the 'command' queue, @@ -415,6 +489,10 @@ parcelable StreamDescriptor { * 5. The client wakes up due to 4. * 6. The client reads the reply and audio data. The client must * always read from the FMQ all the data it contains. + * Note: in non-blocking mode, when the HAL module goes to + * the 'TRANSFERRING' state (as indicated by the 'reply.state' + * field) the client must wait for the 'IStreamCallback.onTransferReady' + * notification to arrive before starting the next burst. * */ MQDescriptor fmq; diff --git a/audio/aidl/android/hardware/audio/core/stream-in-async-sm.gv b/audio/aidl/android/hardware/audio/core/stream-in-async-sm.gv new file mode 100644 index 0000000000..818b18eda7 --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/stream-in-async-sm.gv @@ -0,0 +1,47 @@ +// Copyright (C) 2022 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// To render: dot -Tpng stream-in-async-sm.gv -o stream-in-async-sm.png +digraph stream_in_async_state_machine { + node [shape=doublecircle style=filled fillcolor=black width=0.5] I; + node [shape=point width=0.5] F; + node [shape=oval width=1]; + node [fillcolor=lightgreen] STANDBY; // buffer is empty + node [fillcolor=tomato] CLOSED; + node [fillcolor=tomato] ERROR; + node [style=dashed] ANY_STATE; + node [fillcolor=lightblue style=filled]; + // Note that when the producer (h/w) is passive, "burst" operations + // complete synchronously, bypassing the TRANSFERRING state. + I -> STANDBY; + STANDBY -> IDLE [label="start"]; // producer -> active + IDLE -> STANDBY [label="standby"]; // producer -> passive, buffer is cleared + IDLE -> TRANSFERRING [label="burst"]; // consumer -> active + ACTIVE -> PAUSED [label="pause"]; // consumer -> passive + ACTIVE -> DRAINING [label="drain"]; // producer -> passive + ACTIVE -> TRANSFERRING [label="burst"]; + TRANSFERRING -> ACTIVE [label="←IStreamCallback.onTransferReady"]; + TRANSFERRING -> PAUSED [label="pause"]; // consumer -> passive + TRANSFERRING -> DRAINING [label="drain"]; // producer -> passive + PAUSED -> TRANSFERRING [label="burst"]; // consumer -> active + PAUSED -> STANDBY [label="flush"]; // producer -> passive, buffer is cleared + DRAINING -> DRAINING [label="burst"]; + DRAINING -> ACTIVE [label="start"]; // producer -> active + DRAINING -> STANDBY [label=""]; // consumer deactivates + IDLE -> ERROR [label="←IStreamCallback.onError"]; + PAUSED -> ERROR [label="←IStreamCallback.onError"]; + TRANSFERRING -> ERROR [label="←IStreamCallback.onError"]; + ANY_STATE -> CLOSED [label="→IStream*.close"]; + CLOSED -> F; +} diff --git a/audio/aidl/android/hardware/audio/core/stream-out-async-sm.gv b/audio/aidl/android/hardware/audio/core/stream-out-async-sm.gv new file mode 100644 index 0000000000..e25b15a755 --- /dev/null +++ b/audio/aidl/android/hardware/audio/core/stream-out-async-sm.gv @@ -0,0 +1,57 @@ +// Copyright (C) 2022 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// To render: dot -Tpng stream-out-async-sm.gv -o stream-out-async-sm.png +digraph stream_out_async_state_machine { + node [shape=doublecircle style=filled fillcolor=black width=0.5] I; + node [shape=point width=0.5] F; + node [shape=oval width=1]; + node [fillcolor=lightgreen] STANDBY; // buffer is empty + node [fillcolor=lightgreen] IDLE; // buffer is empty + node [fillcolor=tomato] CLOSED; + node [fillcolor=tomato] ERROR; + node [style=dashed] ANY_STATE; + node [fillcolor=lightblue style=filled]; + // Note that when the consumer (h/w) is passive, "burst" operations + // complete synchronously, bypassing the TRANSFERRING state. + I -> STANDBY; + STANDBY -> IDLE [label="start"]; // consumer -> active + STANDBY -> PAUSED [label="burst"]; // producer -> active + IDLE -> STANDBY [label="standby"]; // consumer -> passive + IDLE -> TRANSFERRING [label="burst"]; // producer -> active + ACTIVE -> PAUSED [label="pause"]; // consumer -> passive (not consuming) + ACTIVE -> DRAINING [label="drain"]; // producer -> passive + ACTIVE -> TRANSFERRING [label="burst"]; // early unblocking + ACTIVE -> ACTIVE [label="burst"]; // full write + TRANSFERRING -> ACTIVE [label="←IStreamCallback.onTransferReady"]; + TRANSFERRING -> TRANSFER_PAUSED [label="pause"]; // consumer -> passive (not consuming) + TRANSFERRING -> DRAINING [label="drain"]; // producer -> passive + TRANSFER_PAUSED -> TRANSFERRING [label="start"]; // consumer -> active + TRANSFER_PAUSED -> DRAIN_PAUSED [label="drain"]; // producer -> passive + TRANSFER_PAUSED -> IDLE [label="flush"]; // buffer is cleared + PAUSED -> PAUSED [label="burst"]; + PAUSED -> ACTIVE [label="start"]; // consumer -> active + PAUSED -> IDLE [label="flush"]; // producer -> passive, buffer is cleared + DRAINING -> IDLE [label="←IStreamCallback.onDrainReady"]; + DRAINING -> TRANSFERRING [label="burst"]; // producer -> active + DRAINING -> DRAIN_PAUSED [label="pause"]; // consumer -> passive (not consuming) + DRAIN_PAUSED -> DRAINING [label="start"]; // consumer -> active + DRAIN_PAUSED -> TRANSFER_PAUSED [label="burst"]; // producer -> active + DRAIN_PAUSED -> IDLE [label="flush"]; // buffer is cleared + IDLE -> ERROR [label="←IStreamCallback.onError"]; + DRAINING -> ERROR [label="←IStreamCallback.onError"]; + TRANSFERRING -> ERROR [label="←IStreamCallback.onError"]; + ANY_STATE -> CLOSED [label="→IStream*.close"]; + CLOSED -> F; +} diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp index dda0e4a6fd..0d2121cb29 100644 --- a/audio/aidl/common/StreamWorker.cpp +++ b/audio/aidl/common/StreamWorker.cpp @@ -25,15 +25,20 @@ namespace android::hardware::audio::common::internal { bool ThreadController::start(const std::string& name, int priority) { mThreadName = name; mThreadPriority = priority; - mWorker = std::thread(&ThreadController::workerThread, this); + if (kTestSingleThread != name) { + mWorker = std::thread(&ThreadController::workerThread, this); + } else { + // Simulate the case when the workerThread completes prior + // to the moment when we being waiting for its start. + workerThread(); + } std::unique_lock lock(mWorkerLock); android::base::ScopedLockAssertion lock_assertion(mWorkerLock); mWorkerCv.wait(lock, [&]() { android::base::ScopedLockAssertion lock_assertion(mWorkerLock); - return mWorkerState == WorkerState::RUNNING || !mError.empty(); + return mWorkerState != WorkerState::INITIAL || !mError.empty(); }); - mWorkerStateChangeRequest = false; - return mWorkerState == WorkerState::RUNNING; + return mError.empty(); } void ThreadController::stop() { @@ -81,8 +86,8 @@ void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState n void ThreadController::workerThread() { using Status = StreamLogic::Status; - std::string error = mLogic->init(); - if (error.empty() && !mThreadName.empty()) { + std::string error; + if (!mThreadName.empty()) { std::string compliantName(mThreadName.substr(0, 15)); if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) { error.append("Failed to set thread name: ").append(strerror(errCode)); @@ -94,6 +99,9 @@ void ThreadController::workerThread() { error.append("Failed to set thread priority: ").append(strerror(errCode)); } } + if (error.empty()) { + error.append(mLogic->init()); + } { std::lock_guard lock(mWorkerLock); mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED; diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h index ab2ec26485..e9c1070441 100644 --- a/audio/aidl/common/include/StreamWorker.h +++ b/audio/aidl/common/include/StreamWorker.h @@ -32,7 +32,7 @@ class StreamLogic; namespace internal { class ThreadController { - enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }; + enum class WorkerState { INITIAL, STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED }; public: explicit ThreadController(StreamLogic* logic) : mLogic(logic) {} @@ -76,7 +76,7 @@ class ThreadController { std::thread mWorker; std::mutex mWorkerLock; std::condition_variable mWorkerCv; - WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED; + WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::INITIAL; std::string mError GUARDED_BY(mWorkerLock); // The atomic lock-free variable is used to prevent priority inversions // that can occur when a high priority worker tries to acquire the lock @@ -90,6 +90,9 @@ class ThreadController { std::atomic mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false; }; +// A special thread name used in tests only. +static const std::string kTestSingleThread = "__testST__"; + } // namespace internal class StreamLogic { diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp index 8ea8424ab5..f7a30b9926 100644 --- a/audio/aidl/common/tests/streamworker_tests.cpp +++ b/audio/aidl/common/tests/streamworker_tests.cpp @@ -283,4 +283,16 @@ TEST_P(StreamWorkerTest, ThreadPriority) { EXPECT_EQ(priority, worker.getPriority()); } +TEST_P(StreamWorkerTest, DeferredStartCheckNoError) { + stream.setStopStatus(); + EXPECT_TRUE(worker.start(android::hardware::audio::common::internal::kTestSingleThread)); + EXPECT_FALSE(worker.hasError()); +} + +TEST_P(StreamWorkerTest, DeferredStartCheckWithError) { + stream.setErrorStatus(); + EXPECT_FALSE(worker.start(android::hardware::audio::common::internal::kTestSingleThread)); + EXPECT_TRUE(worker.hasError()); +} + INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool()); diff --git a/audio/aidl/default/Module.cpp b/audio/aidl/default/Module.cpp index 6863fe34cc..9dbd61c442 100644 --- a/audio/aidl/default/Module.cpp +++ b/audio/aidl/default/Module.cpp @@ -97,6 +97,7 @@ void Module::cleanUpPatch(int32_t patchId) { } ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t in_bufferSizeFrames, + std::shared_ptr asyncCallback, StreamContext* out_context) { if (in_bufferSizeFrames <= 0) { LOG(ERROR) << __func__ << ": non-positive buffer size " << in_bufferSizeFrames; @@ -135,8 +136,8 @@ ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t StreamContext temp( std::make_unique(1, true /*configureEventFlagWord*/), std::make_unique(1, true /*configureEventFlagWord*/), - frameSize, - std::make_unique(frameSize * in_bufferSizeFrames)); + frameSize, std::make_unique(frameSize * in_bufferSizeFrames), + asyncCallback, mDebug.streamTransientStateDelayMs); if (temp.isValid()) { *out_context = std::move(temp); } else { @@ -242,6 +243,11 @@ ndk::ScopedAStatus Module::setModuleDebug( << "while having external devices connected"; return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE); } + if (in_debug.streamTransientStateDelayMs < 0) { + LOG(ERROR) << __func__ << ": streamTransientStateDelayMs is negative: " + << in_debug.streamTransientStateDelayMs; + return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT); + } mDebug = in_debug; return ndk::ScopedAStatus::ok(); } @@ -456,7 +462,8 @@ ndk::ScopedAStatus Module::openInputStream(const OpenInputStreamArguments& in_ar return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT); } StreamContext context; - if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context); + if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, nullptr, + &context); !status.isOk()) { return status; } @@ -496,8 +503,16 @@ ndk::ScopedAStatus Module::openOutputStream(const OpenOutputStreamArguments& in_ << " has COMPRESS_OFFLOAD flag set, requires offload info"; return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT); } + const bool isNonBlocking = isBitPositionFlagSet(port->flags.get(), + AudioOutputFlags::NON_BLOCKING); + if (isNonBlocking && in_args.callback == nullptr) { + LOG(ERROR) << __func__ << ": port id " << port->id + << " has NON_BLOCKING flag set, requires async callback"; + return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT); + } StreamContext context; - if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context); + if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, + isNonBlocking ? in_args.callback : nullptr, &context); !status.isOk()) { return status; } diff --git a/audio/aidl/default/Stream.cpp b/audio/aidl/default/Stream.cpp index 21dc4b6c08..d7c352fe86 100644 --- a/audio/aidl/default/Stream.cpp +++ b/audio/aidl/default/Stream.cpp @@ -87,32 +87,46 @@ std::string StreamWorkerCommonLogic::init() { void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply, bool isConnected) const { + reply->status = STATUS_OK; if (isConnected) { - reply->status = STATUS_OK; reply->observable.frames = mFrameCount; reply->observable.timeNs = ::android::elapsedRealtimeNano(); } else { - reply->status = STATUS_NO_INIT; + reply->observable.frames = StreamDescriptor::Position::UNKNOWN; + reply->observable.timeNs = StreamDescriptor::Position::UNKNOWN; } } +void StreamWorkerCommonLogic::populateReplyWrongState( + StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const { + LOG(WARNING) << "command '" << toString(command.getTag()) + << "' can not be handled in the state " << toString(mState); + reply->status = STATUS_INVALID_OPERATION; +} + const std::string StreamInWorkerLogic::kThreadName = "reader"; StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { + // Note: for input streams, draining is driven by the client, thus + // "empty buffer" condition can only happen while handling the 'burst' + // command. Thus, unlike for output streams, it does not make sense to + // delay the 'DRAINING' state here by 'mTransientStateDelayMs'. + // TODO: Add a delay for transitions of async operations when/if they added. + StreamDescriptor::Command command{}; if (!mCommandMQ->readBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": reading of command from MQ failed"; mState = StreamDescriptor::State::ERROR; return Status::ABORT; } + LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName; StreamDescriptor::Reply reply{}; reply.status = STATUS_BAD_VALUE; using Tag = StreamDescriptor::Command::Tag; switch (command.getTag()) { - case Tag::hal_reserved_exit: - if (const int32_t cookie = command.get(); + case Tag::halReservedExit: + if (const int32_t cookie = command.get(); cookie == mInternalCommandCookie) { - LOG(DEBUG) << __func__ << ": received EXIT command"; setClosed(); // This is an internal command, no need to reply. return Status::EXIT; @@ -120,8 +134,10 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie; } break; + case Tag::getStatus: + populateReply(&reply, mIsConnected); + break; case Tag::start: - LOG(DEBUG) << __func__ << ": received START read command"; if (mState == StreamDescriptor::State::STANDBY || mState == StreamDescriptor::State::DRAINING) { populateReply(&reply, mIsConnected); @@ -129,15 +145,13 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { ? StreamDescriptor::State::IDLE : StreamDescriptor::State::ACTIVE; } else { - LOG(WARNING) << __func__ << ": START command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } break; case Tag::burst: if (const int32_t fmqByteCount = command.get(); fmqByteCount >= 0) { - LOG(DEBUG) << __func__ << ": received BURST read command for " << fmqByteCount - << " bytes"; + LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for " + << fmqByteCount << " bytes"; if (mState == StreamDescriptor::State::IDLE || mState == StreamDescriptor::State::ACTIVE || mState == StreamDescriptor::State::PAUSED || @@ -151,69 +165,61 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() { } else if (mState == StreamDescriptor::State::DRAINING) { // To simplify the reference code, we assume that the read operation // has consumed all the data remaining in the hardware buffer. - // TODO: Provide parametrization on the duration of draining to test - // handling of commands during the 'DRAINING' state. + // In a real implementation, here we would either remain in + // the 'DRAINING' state, or transfer to 'STANDBY' depending on the + // buffer state. mState = StreamDescriptor::State::STANDBY; } } else { - LOG(WARNING) << __func__ << ": BURST command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } } else { LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount; } break; case Tag::drain: - LOG(DEBUG) << __func__ << ": received DRAIN read command"; - if (mState == StreamDescriptor::State::ACTIVE) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::DRAINING; + if (command.get() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) { + if (mState == StreamDescriptor::State::ACTIVE) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + mState = StreamDescriptor::State::DRAINING; + } else { + populateReplyWrongState(&reply, command); + } } else { - LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + LOG(WARNING) << __func__ + << ": invalid drain mode: " << toString(command.get()); } break; case Tag::standby: - LOG(DEBUG) << __func__ << ": received STANDBY read command"; if (mState == StreamDescriptor::State::IDLE) { usleep(1000); // Simulate a blocking call into the driver. populateReply(&reply, mIsConnected); // Can switch the state to ERROR if a driver error occurs. mState = StreamDescriptor::State::STANDBY; } else { - LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } break; case Tag::pause: - LOG(DEBUG) << __func__ << ": received PAUSE read command"; if (mState == StreamDescriptor::State::ACTIVE) { usleep(1000); // Simulate a blocking call into the driver. populateReply(&reply, mIsConnected); // Can switch the state to ERROR if a driver error occurs. mState = StreamDescriptor::State::PAUSED; } else { - LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } break; case Tag::flush: - LOG(DEBUG) << __func__ << ": received FLUSH read command"; if (mState == StreamDescriptor::State::PAUSED) { usleep(1000); // Simulate a blocking call into the driver. populateReply(&reply, mIsConnected); // Can switch the state to ERROR if a driver error occurs. mState = StreamDescriptor::State::STANDBY; } else { - LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } break; } @@ -261,20 +267,52 @@ bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply const std::string StreamOutWorkerLogic::kThreadName = "writer"; StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { + if (mState == StreamDescriptor::State::DRAINING || + mState == StreamDescriptor::State::TRANSFERRING) { + if (auto stateDurationMs = std::chrono::duration_cast( + std::chrono::steady_clock::now() - mTransientStateStart); + stateDurationMs >= mTransientStateDelayMs) { + if (mAsyncCallback == nullptr) { + // In blocking mode, mState can only be DRAINING. + mState = StreamDescriptor::State::IDLE; + } else { + // In a real implementation, the driver should notify the HAL about + // drain or transfer completion. In the stub, we switch unconditionally. + if (mState == StreamDescriptor::State::DRAINING) { + mState = StreamDescriptor::State::IDLE; + ndk::ScopedAStatus status = mAsyncCallback->onDrainReady(); + if (!status.isOk()) { + LOG(ERROR) << __func__ << ": error from onDrainReady: " << status; + } + } else { + mState = StreamDescriptor::State::ACTIVE; + ndk::ScopedAStatus status = mAsyncCallback->onTransferReady(); + if (!status.isOk()) { + LOG(ERROR) << __func__ << ": error from onTransferReady: " << status; + } + } + } + if (mTransientStateDelayMs.count() != 0) { + LOG(DEBUG) << __func__ << ": switched to state " << toString(mState) + << " after a timeout"; + } + } + } + StreamDescriptor::Command command{}; if (!mCommandMQ->readBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": reading of command from MQ failed"; mState = StreamDescriptor::State::ERROR; return Status::ABORT; } + LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName; StreamDescriptor::Reply reply{}; reply.status = STATUS_BAD_VALUE; using Tag = StreamDescriptor::Command::Tag; switch (command.getTag()) { - case Tag::hal_reserved_exit: - if (const int32_t cookie = command.get(); + case Tag::halReservedExit: + if (const int32_t cookie = command.get(); cookie == mInternalCommandCookie) { - LOG(DEBUG) << __func__ << ": received EXIT command"; setClosed(); // This is an internal command, no need to reply. return Status::EXIT; @@ -282,8 +320,11 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie; } break; - case Tag::start: - LOG(DEBUG) << __func__ << ": received START write command"; + case Tag::getStatus: + populateReply(&reply, mIsConnected); + break; + case Tag::start: { + bool commandAccepted = true; switch (mState) { case StreamDescriptor::State::STANDBY: mState = StreamDescriptor::State::IDLE; @@ -292,97 +333,112 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() { mState = StreamDescriptor::State::ACTIVE; break; case StreamDescriptor::State::DRAIN_PAUSED: - mState = StreamDescriptor::State::PAUSED; + switchToTransientState(StreamDescriptor::State::DRAINING); + break; + case StreamDescriptor::State::TRANSFER_PAUSED: + switchToTransientState(StreamDescriptor::State::TRANSFERRING); break; default: - LOG(WARNING) << __func__ << ": START command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); + commandAccepted = false; } - if (reply.status != STATUS_INVALID_OPERATION) { + if (commandAccepted) { populateReply(&reply, mIsConnected); } - break; + } break; case Tag::burst: if (const int32_t fmqByteCount = command.get(); fmqByteCount >= 0) { - LOG(DEBUG) << __func__ << ": received BURST write command for " << fmqByteCount - << " bytes"; - if (mState != - StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states + LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for " + << fmqByteCount << " bytes"; + if (mState != StreamDescriptor::State::ERROR && + mState != StreamDescriptor::State::TRANSFERRING && + mState != StreamDescriptor::State::TRANSFER_PAUSED) { if (!write(fmqByteCount, &reply)) { mState = StreamDescriptor::State::ERROR; } if (mState == StreamDescriptor::State::STANDBY || - mState == StreamDescriptor::State::DRAIN_PAUSED) { - mState = StreamDescriptor::State::PAUSED; + mState == StreamDescriptor::State::DRAIN_PAUSED || + mState == StreamDescriptor::State::PAUSED) { + if (mAsyncCallback == nullptr || + mState != StreamDescriptor::State::DRAIN_PAUSED) { + mState = StreamDescriptor::State::PAUSED; + } else { + mState = StreamDescriptor::State::TRANSFER_PAUSED; + } } else if (mState == StreamDescriptor::State::IDLE || - mState == StreamDescriptor::State::DRAINING) { - mState = StreamDescriptor::State::ACTIVE; - } // When in 'ACTIVE' and 'PAUSED' do not need to change the state. + mState == StreamDescriptor::State::DRAINING || + mState == StreamDescriptor::State::ACTIVE) { + if (mAsyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) { + mState = StreamDescriptor::State::ACTIVE; + } else { + switchToTransientState(StreamDescriptor::State::TRANSFERRING); + } + } } else { - LOG(WARNING) << __func__ << ": BURST command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } } else { LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount; } break; case Tag::drain: - LOG(DEBUG) << __func__ << ": received DRAIN write command"; - if (mState == StreamDescriptor::State::ACTIVE) { - usleep(1000); // Simulate a blocking call into the driver. - populateReply(&reply, mIsConnected); - // Can switch the state to ERROR if a driver error occurs. - mState = StreamDescriptor::State::IDLE; - // Since there is no actual hardware that would be draining the buffer, - // in order to simplify the reference code, we assume that draining - // happens instantly, thus skipping the 'DRAINING' state. - // TODO: Provide parametrization on the duration of draining to test - // handling of commands during the 'DRAINING' state. + if (command.get() == StreamDescriptor::DrainMode::DRAIN_ALL || + command.get() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) { + if (mState == StreamDescriptor::State::ACTIVE || + mState == StreamDescriptor::State::TRANSFERRING) { + usleep(1000); // Simulate a blocking call into the driver. + populateReply(&reply, mIsConnected); + // Can switch the state to ERROR if a driver error occurs. + switchToTransientState(StreamDescriptor::State::DRAINING); + } else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) { + mState = StreamDescriptor::State::DRAIN_PAUSED; + populateReply(&reply, mIsConnected); + } else { + populateReplyWrongState(&reply, command); + } } else { - LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + LOG(WARNING) << __func__ + << ": invalid drain mode: " << toString(command.get()); } break; case Tag::standby: - LOG(DEBUG) << __func__ << ": received STANDBY write command"; if (mState == StreamDescriptor::State::IDLE) { usleep(1000); // Simulate a blocking call into the driver. populateReply(&reply, mIsConnected); // Can switch the state to ERROR if a driver error occurs. mState = StreamDescriptor::State::STANDBY; } else { - LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } break; - case Tag::pause: - LOG(DEBUG) << __func__ << ": received PAUSE write command"; - if (mState == StreamDescriptor::State::ACTIVE || - mState == StreamDescriptor::State::DRAINING) { + case Tag::pause: { + bool commandAccepted = true; + switch (mState) { + case StreamDescriptor::State::ACTIVE: + mState = StreamDescriptor::State::PAUSED; + break; + case StreamDescriptor::State::DRAINING: + mState = StreamDescriptor::State::DRAIN_PAUSED; + break; + case StreamDescriptor::State::TRANSFERRING: + mState = StreamDescriptor::State::TRANSFER_PAUSED; + break; + default: + populateReplyWrongState(&reply, command); + commandAccepted = false; + } + if (commandAccepted) { populateReply(&reply, mIsConnected); - mState = mState == StreamDescriptor::State::ACTIVE - ? StreamDescriptor::State::PAUSED - : StreamDescriptor::State::DRAIN_PAUSED; - } else { - LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; } - break; + } break; case Tag::flush: - LOG(DEBUG) << __func__ << ": received FLUSH write command"; if (mState == StreamDescriptor::State::PAUSED || - mState == StreamDescriptor::State::DRAIN_PAUSED) { + mState == StreamDescriptor::State::DRAIN_PAUSED || + mState == StreamDescriptor::State::TRANSFER_PAUSED) { populateReply(&reply, mIsConnected); mState = StreamDescriptor::State::IDLE; } else { - LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state " - << toString(mState); - reply.status = STATUS_INVALID_OPERATION; + populateReplyWrongState(&reply, command); } break; } @@ -450,9 +506,8 @@ template void StreamCommon::stopWorker() { if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) { LOG(DEBUG) << __func__ << ": asking the worker to exit..."; - auto cmd = - StreamDescriptor::Command::make( - mContext.getInternalCommandCookie()); + auto cmd = StreamDescriptor::Command::make( + mContext.getInternalCommandCookie()); // Note: never call 'pause' and 'resume' methods of StreamWorker // in the HAL implementation. These methods are to be used by // the client side only. Preventing the worker loop from running diff --git a/audio/aidl/default/include/core-impl/Module.h b/audio/aidl/default/include/core-impl/Module.h index 00867432c2..f7b85ed7d8 100644 --- a/audio/aidl/default/include/core-impl/Module.h +++ b/audio/aidl/default/include/core-impl/Module.h @@ -86,6 +86,7 @@ class Module : public BnModule { void cleanUpPatch(int32_t patchId); ndk::ScopedAStatus createStreamContext( int32_t in_portConfigId, int64_t in_bufferSizeFrames, + std::shared_ptr asyncCallback, ::aidl::android::hardware::audio::core::StreamContext* out_context); ndk::ScopedAStatus findPortIdForNewStream( int32_t in_portConfigId, ::aidl::android::media::audio::common::AudioPort** port); diff --git a/audio/aidl/default/include/core-impl/Stream.h b/audio/aidl/default/include/core-impl/Stream.h index 5ee0f82946..3c96973f2d 100644 --- a/audio/aidl/default/include/core-impl/Stream.h +++ b/audio/aidl/default/include/core-impl/Stream.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -28,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -59,33 +61,42 @@ class StreamContext { StreamContext() = default; StreamContext(std::unique_ptr commandMQ, std::unique_ptr replyMQ, - size_t frameSize, std::unique_ptr dataMQ) + size_t frameSize, std::unique_ptr dataMQ, + std::shared_ptr asyncCallback, int transientStateDelayMs) : mCommandMQ(std::move(commandMQ)), mInternalCommandCookie(std::rand()), mReplyMQ(std::move(replyMQ)), mFrameSize(frameSize), - mDataMQ(std::move(dataMQ)) {} + mDataMQ(std::move(dataMQ)), + mAsyncCallback(asyncCallback), + mTransientStateDelayMs(transientStateDelayMs) {} StreamContext(StreamContext&& other) : mCommandMQ(std::move(other.mCommandMQ)), mInternalCommandCookie(other.mInternalCommandCookie), mReplyMQ(std::move(other.mReplyMQ)), mFrameSize(other.mFrameSize), - mDataMQ(std::move(other.mDataMQ)) {} + mDataMQ(std::move(other.mDataMQ)), + mAsyncCallback(other.mAsyncCallback), + mTransientStateDelayMs(other.mTransientStateDelayMs) {} StreamContext& operator=(StreamContext&& other) { mCommandMQ = std::move(other.mCommandMQ); mInternalCommandCookie = other.mInternalCommandCookie; mReplyMQ = std::move(other.mReplyMQ); mFrameSize = other.mFrameSize; mDataMQ = std::move(other.mDataMQ); + mAsyncCallback = other.mAsyncCallback; + mTransientStateDelayMs = other.mTransientStateDelayMs; return *this; } void fillDescriptor(StreamDescriptor* desc); + std::shared_ptr getAsyncCallback() const { return mAsyncCallback; } CommandMQ* getCommandMQ() const { return mCommandMQ.get(); } DataMQ* getDataMQ() const { return mDataMQ.get(); } size_t getFrameSize() const { return mFrameSize; } int getInternalCommandCookie() const { return mInternalCommandCookie; } ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); } + int getTransientStateDelayMs() const { return mTransientStateDelayMs; } bool isValid() const; void reset(); @@ -95,6 +106,8 @@ class StreamContext { std::unique_ptr mReplyMQ; size_t mFrameSize; std::unique_ptr mDataMQ; + std::shared_ptr mAsyncCallback; + int mTransientStateDelayMs; }; class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic { @@ -111,9 +124,17 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea mFrameSize(context.getFrameSize()), mCommandMQ(context.getCommandMQ()), mReplyMQ(context.getReplyMQ()), - mDataMQ(context.getDataMQ()) {} + mDataMQ(context.getDataMQ()), + mAsyncCallback(context.getAsyncCallback()), + mTransientStateDelayMs(context.getTransientStateDelayMs()) {} std::string init() override; void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const; + void populateReplyWrongState(StreamDescriptor::Reply* reply, + const StreamDescriptor::Command& command) const; + void switchToTransientState(StreamDescriptor::State state) { + mState = state; + mTransientStateStart = std::chrono::steady_clock::now(); + } // Atomic fields are used both by the main and worker threads. std::atomic mIsConnected = false; @@ -125,6 +146,9 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea StreamContext::CommandMQ* mCommandMQ; StreamContext::ReplyMQ* mReplyMQ; StreamContext::DataMQ* mDataMQ; + std::shared_ptr mAsyncCallback; + const std::chrono::duration mTransientStateDelayMs; + std::chrono::time_point mTransientStateStart; // We use an array and the "size" field instead of a vector to be able to detect // memory allocation issues. std::unique_ptr mDataBuffer; diff --git a/audio/aidl/vts/ModuleConfig.cpp b/audio/aidl/vts/ModuleConfig.cpp index 33c5b72ad4..c081402106 100644 --- a/audio/aidl/vts/ModuleConfig.cpp +++ b/audio/aidl/vts/ModuleConfig.cpp @@ -125,21 +125,21 @@ std::vector ModuleConfig::getOutputMixPorts() const { return result; } +std::vector ModuleConfig::getNonBlockingMixPorts(bool attachedOnly, + bool singlePort) const { + return findMixPorts(false /*isInput*/, singlePort, [&](const AudioPort& port) { + return isBitPositionFlagSet(port.flags.get(), + AudioOutputFlags::NON_BLOCKING) && + (!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty()); + }); +} + std::vector ModuleConfig::getOffloadMixPorts(bool attachedOnly, bool singlePort) const { - std::vector result; - const auto mixPorts = getMixPorts(false /*isInput*/); - auto offloadPortIt = mixPorts.begin(); - while (offloadPortIt != mixPorts.end()) { - offloadPortIt = std::find_if(offloadPortIt, mixPorts.end(), [&](const AudioPort& port) { - return isBitPositionFlagSet(port.flags.get(), - AudioOutputFlags::COMPRESS_OFFLOAD) && - (!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty()); - }); - if (offloadPortIt == mixPorts.end()) break; - result.push_back(*offloadPortIt++); - if (singlePort) break; - } - return result; + return findMixPorts(false /*isInput*/, singlePort, [&](const AudioPort& port) { + return isBitPositionFlagSet(port.flags.get(), + AudioOutputFlags::COMPRESS_OFFLOAD) && + (!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty()); + }); } std::vector ModuleConfig::getAttachedDevicesPortsForMixPort( @@ -343,6 +343,19 @@ static bool isDynamicProfile(const AudioProfile& profile) { profile.sampleRates.empty() || profile.channelMasks.empty(); } +std::vector ModuleConfig::findMixPorts( + bool isInput, bool singlePort, std::function pred) const { + std::vector result; + const auto mixPorts = getMixPorts(isInput); + for (auto mixPortIt = mixPorts.begin(); mixPortIt != mixPorts.end();) { + mixPortIt = std::find_if(mixPortIt, mixPorts.end(), pred); + if (mixPortIt == mixPorts.end()) break; + result.push_back(*mixPortIt++); + if (singlePort) break; + } + return result; +} + std::vector ModuleConfig::generateAudioMixPortConfigs( const std::vector& ports, bool isInput, bool singleProfile) const { std::vector result; diff --git a/audio/aidl/vts/ModuleConfig.h b/audio/aidl/vts/ModuleConfig.h index dc109a792f..a85aa7f0d7 100644 --- a/audio/aidl/vts/ModuleConfig.h +++ b/audio/aidl/vts/ModuleConfig.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -48,6 +49,8 @@ class ModuleConfig { std::vector getMixPorts(bool isInput) const { return isInput ? getInputMixPorts() : getOutputMixPorts(); } + std::vector getNonBlockingMixPorts( + bool attachedOnly, bool singlePort) const; std::vector getOffloadMixPorts( bool attachedOnly, bool singlePort) const; @@ -121,6 +124,9 @@ class ModuleConfig { std::string toString() const; private: + std::vector findMixPorts( + bool isInput, bool singlePort, + std::function pred) const; std::vector generateAudioMixPortConfigs( const std::vector& ports, bool isInput, bool singleProfile) const; diff --git a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp index 5e9aa7f193..79b20fe0b7 100644 --- a/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp +++ b/audio/aidl/vts/VtsHalAudioCoreTargetTest.cpp @@ -15,12 +15,16 @@ */ #include +#include #include +#include #include #include +#include #include #include #include +#include #include #define LOG_TAG "VtsHalAudioCore" @@ -30,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -389,15 +394,132 @@ class StreamContext { std::unique_ptr mDataMQ; }; -class StreamLogicDriver { +struct StreamEventReceiver { + virtual ~StreamEventReceiver() = default; + enum class Event { None, DrainReady, Error, TransferReady }; + virtual std::tuple getLastEvent() const = 0; + virtual std::tuple waitForEvent(int clientEventSeq) = 0; + static constexpr int kEventSeqInit = -1; +}; +std::string toString(StreamEventReceiver::Event event) { + switch (event) { + case StreamEventReceiver::Event::None: + return "None"; + case StreamEventReceiver::Event::DrainReady: + return "DrainReady"; + case StreamEventReceiver::Event::Error: + return "Error"; + case StreamEventReceiver::Event::TransferReady: + return "TransferReady"; + } + return std::to_string(static_cast(event)); +} + +// Transition to the next state happens either due to a command from the client, +// or after an event received from the server. +using TransitionTrigger = std::variant; +using StateTransition = std::pair; +struct StateSequence { + virtual ~StateSequence() = default; + virtual void rewind() = 0; + virtual bool done() const = 0; + virtual TransitionTrigger getTrigger() = 0; + virtual std::set getExpectedStates() = 0; + virtual void advance(StreamDescriptor::State state) = 0; +}; + +static const StreamDescriptor::Command kGetStatusCommand = + StreamDescriptor::Command::make(Void{}); +static const StreamDescriptor::Command kStartCommand = + StreamDescriptor::Command::make(Void{}); +static const StreamDescriptor::Command kBurstCommand = + StreamDescriptor::Command::make(0); +static const StreamDescriptor::Command kDrainInCommand = + StreamDescriptor::Command::make( + StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED); +static const StreamDescriptor::Command kDrainOutAllCommand = + StreamDescriptor::Command::make( + StreamDescriptor::DrainMode::DRAIN_ALL); +static const StreamDescriptor::Command kDrainOutEarlyCommand = + StreamDescriptor::Command::make( + StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY); +static const StreamDescriptor::Command kStandbyCommand = + StreamDescriptor::Command::make(Void{}); +static const StreamDescriptor::Command kPauseCommand = + StreamDescriptor::Command::make(Void{}); +static const StreamDescriptor::Command kFlushCommand = + StreamDescriptor::Command::make(Void{}); +static const StreamEventReceiver::Event kTransferReadyEvent = + StreamEventReceiver::Event::TransferReady; +static const StreamEventReceiver::Event kDrainReadyEvent = StreamEventReceiver::Event::DrainReady; + +// Handle possible bifurcations: +// - on burst and on start: 'TRANSFERRING' -> {'ACTIVE', 'TRANSFERRING'} +// - on pause: 'TRANSFER_PAUSED' -> {'PAUSED', 'TRANSFER_PAUSED'} +// It is assumed that the 'steps' provided on the construction contain the sequence +// for the async case, which gets corrected in the case when the HAL decided to do +// a synchronous transfer. +class SmartStateSequence : public StateSequence { public: + explicit SmartStateSequence(const std::vector& steps) : mSteps(steps) {} + explicit SmartStateSequence(std::vector&& steps) : mSteps(std::move(steps)) {} + void rewind() override { mCurrentStep = 0; } + bool done() const override { return mCurrentStep >= mSteps.size(); } + TransitionTrigger getTrigger() override { return mSteps[mCurrentStep].first; } + std::set getExpectedStates() override { + std::set result = {getState()}; + if (isBurstBifurcation() || isStartBifurcation()) { + result.insert(StreamDescriptor::State::ACTIVE); + } else if (isPauseBifurcation()) { + result.insert(StreamDescriptor::State::PAUSED); + } + return result; + } + void advance(StreamDescriptor::State state) override { + if (isBurstBifurcation() && state == StreamDescriptor::State::ACTIVE && + mCurrentStep + 1 < mSteps.size() && + mSteps[mCurrentStep + 1].first == TransitionTrigger{kTransferReadyEvent}) { + mCurrentStep++; + } + mCurrentStep++; + } + + private: + StreamDescriptor::State getState() const { return mSteps[mCurrentStep].second; } + bool isBurstBifurcation() { + return getTrigger() == TransitionTrigger{kBurstCommand}&& getState() == + StreamDescriptor::State::TRANSFERRING; + } + bool isPauseBifurcation() { + return getTrigger() == TransitionTrigger{kPauseCommand}&& getState() == + StreamDescriptor::State::TRANSFER_PAUSED; + } + bool isStartBifurcation() { + return getTrigger() == TransitionTrigger{kStartCommand}&& getState() == + StreamDescriptor::State::TRANSFERRING; + } + const std::vector mSteps; + size_t mCurrentStep = 0; +}; + +std::string toString(const TransitionTrigger& trigger) { + if (std::holds_alternative(trigger)) { + return std::string("'") + .append(toString(std::get(trigger).getTag())) + .append("' command"); + } + return std::string("'") + .append(toString(std::get(trigger))) + .append("' event"); +} + +struct StreamLogicDriver { virtual ~StreamLogicDriver() = default; // Return 'true' to stop the worker. virtual bool done() = 0; // For 'Writer' logic, if the 'actualSize' is 0, write is skipped. // The 'fmqByteCount' from the returned command is passed as is to the HAL. - virtual StreamDescriptor::Command getNextCommand(int maxDataSize, - int* actualSize = nullptr) = 0; + virtual TransitionTrigger getNextTrigger(int maxDataSize, int* actualSize = nullptr) = 0; // Return 'true' to indicate that no further processing is needed, // for example, the driver is expecting a bad status to be returned. // The logic cycle will return with 'CONTINUE' status. Otherwise, @@ -410,46 +532,102 @@ class StreamLogicDriver { class StreamCommonLogic : public StreamLogic { protected: - StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver) + StreamCommonLogic(const StreamContext& context, StreamLogicDriver* driver, + StreamEventReceiver* eventReceiver) : mCommandMQ(context.getCommandMQ()), mReplyMQ(context.getReplyMQ()), mDataMQ(context.getDataMQ()), mData(context.getBufferSizeBytes()), - mDriver(driver) {} + mDriver(driver), + mEventReceiver(eventReceiver) {} StreamContext::CommandMQ* getCommandMQ() const { return mCommandMQ; } StreamContext::ReplyMQ* getReplyMQ() const { return mReplyMQ; } + StreamContext::DataMQ* getDataMQ() const { return mDataMQ; } StreamLogicDriver* getDriver() const { return mDriver; } + StreamEventReceiver* getEventReceiver() const { return mEventReceiver; } - std::string init() override { return ""; } + std::string init() override { + LOG(DEBUG) << __func__; + return ""; + } + std::optional maybeGetNextCommand(int* actualSize = nullptr) { + TransitionTrigger trigger = mDriver->getNextTrigger(mData.size(), actualSize); + if (StreamEventReceiver::Event* expEvent = + std::get_if(&trigger); + expEvent != nullptr) { + auto [eventSeq, event] = mEventReceiver->waitForEvent(mLastEventSeq); + mLastEventSeq = eventSeq; + if (event != *expEvent) { + LOG(ERROR) << __func__ << ": expected event " << toString(*expEvent) << ", got " + << toString(event); + return {}; + } + // If we were waiting for an event, the new stream state must be retrieved + // via 'getStatus'. + return StreamDescriptor::Command::make( + Void{}); + } + return std::get(trigger); + } + bool readDataFromMQ(size_t readCount) { + std::vector data(readCount); + if (mDataMQ->read(data.data(), readCount)) { + memcpy(mData.data(), data.data(), std::min(mData.size(), data.size())); + return true; + } + LOG(ERROR) << __func__ << ": reading of " << readCount << " bytes from MQ failed"; + return false; + } + bool writeDataToMQ() { + if (mDataMQ->write(mData.data(), mData.size())) { + return true; + } + LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed"; + return false; + } + private: StreamContext::CommandMQ* mCommandMQ; StreamContext::ReplyMQ* mReplyMQ; StreamContext::DataMQ* mDataMQ; std::vector mData; StreamLogicDriver* const mDriver; + StreamEventReceiver* const mEventReceiver; + int mLastEventSeq = StreamEventReceiver::kEventSeqInit; }; class StreamReaderLogic : public StreamCommonLogic { public: - StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver) - : StreamCommonLogic(context, driver) {} + StreamReaderLogic(const StreamContext& context, StreamLogicDriver* driver, + StreamEventReceiver* eventReceiver) + : StreamCommonLogic(context, driver, eventReceiver) {} protected: Status cycle() override { if (getDriver()->done()) { + LOG(DEBUG) << __func__ << ": clean exit"; return Status::EXIT; } - StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size()); - if (!mCommandMQ->writeBlocking(&command, 1)) { + StreamDescriptor::Command command; + if (auto maybeCommand = maybeGetNextCommand(); maybeCommand.has_value()) { + command = std::move(maybeCommand.value()); + } else { + LOG(ERROR) << __func__ << ": no next command"; + return Status::ABORT; + } + LOG(DEBUG) << "Writing command: " << command.toString(); + if (!getCommandMQ()->writeBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": writing of command into MQ failed"; return Status::ABORT; } StreamDescriptor::Reply reply{}; - if (!mReplyMQ->readBlocking(&reply, 1)) { - LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; + LOG(DEBUG) << "Reading reply..."; + if (!getReplyMQ()->readBlocking(&reply, 1)) { return Status::ABORT; } + LOG(DEBUG) << "Reply received: " << reply.toString(); if (getDriver()->interceptRawReply(reply)) { + LOG(DEBUG) << __func__ << ": reply has been intercepted by the driver"; return Status::CONTINUE; } if (reply.status != STATUS_OK) { @@ -463,11 +641,11 @@ class StreamReaderLogic : public StreamCommonLogic { << ": received invalid byte count in the reply: " << reply.fmqByteCount; return Status::ABORT; } - if (static_cast(reply.fmqByteCount) != mDataMQ->availableToRead()) { + if (static_cast(reply.fmqByteCount) != getDataMQ()->availableToRead()) { LOG(ERROR) << __func__ << ": the byte count in the reply is not the same as the amount of " << "data available in the MQ: " << reply.fmqByteCount - << " != " << mDataMQ->availableToRead(); + << " != " << getDataMQ()->availableToRead(); } if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) { LOG(ERROR) << __func__ << ": received invalid latency value: " << reply.latencyMs; @@ -484,10 +662,8 @@ class StreamReaderLogic : public StreamCommonLogic { return Status::ABORT; } const bool acceptedReply = getDriver()->processValidReply(reply); - if (const size_t readCount = mDataMQ->availableToRead(); readCount > 0) { - std::vector data(readCount); - if (mDataMQ->read(data.data(), readCount)) { - memcpy(mData.data(), data.data(), std::min(mData.size(), data.size())); + if (const size_t readCount = getDataMQ()->availableToRead(); readCount > 0) { + if (readDataFromMQ(readCount)) { goto checkAcceptedReply; } LOG(ERROR) << __func__ << ": reading of " << readCount << " data bytes from MQ failed"; @@ -505,29 +681,39 @@ using StreamReader = StreamWorker; class StreamWriterLogic : public StreamCommonLogic { public: - StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver) - : StreamCommonLogic(context, driver) {} + StreamWriterLogic(const StreamContext& context, StreamLogicDriver* driver, + StreamEventReceiver* eventReceiver) + : StreamCommonLogic(context, driver, eventReceiver) {} protected: Status cycle() override { if (getDriver()->done()) { + LOG(DEBUG) << __func__ << ": clean exit"; return Status::EXIT; } int actualSize = 0; - StreamDescriptor::Command command = getDriver()->getNextCommand(mData.size(), &actualSize); - if (actualSize != 0 && !mDataMQ->write(mData.data(), mData.size())) { - LOG(ERROR) << __func__ << ": writing of " << mData.size() << " bytes to MQ failed"; + StreamDescriptor::Command command; + if (auto maybeCommand = maybeGetNextCommand(&actualSize); maybeCommand.has_value()) { + command = std::move(maybeCommand.value()); + } else { + LOG(ERROR) << __func__ << ": no next command"; return Status::ABORT; } - if (!mCommandMQ->writeBlocking(&command, 1)) { + if (actualSize != 0 && !writeDataToMQ()) { + return Status::ABORT; + } + LOG(DEBUG) << "Writing command: " << command.toString(); + if (!getCommandMQ()->writeBlocking(&command, 1)) { LOG(ERROR) << __func__ << ": writing of command into MQ failed"; return Status::ABORT; } StreamDescriptor::Reply reply{}; - if (!mReplyMQ->readBlocking(&reply, 1)) { + LOG(DEBUG) << "Reading reply..."; + if (!getReplyMQ()->readBlocking(&reply, 1)) { LOG(ERROR) << __func__ << ": reading of reply from MQ failed"; return Status::ABORT; } + LOG(DEBUG) << "Reply received: " << reply.toString(); if (getDriver()->interceptRawReply(reply)) { return Status::CONTINUE; } @@ -542,10 +728,10 @@ class StreamWriterLogic : public StreamCommonLogic { << ": received invalid byte count in the reply: " << reply.fmqByteCount; return Status::ABORT; } - if (mDataMQ->availableToWrite() != mDataMQ->getQuantumCount()) { + if (getDataMQ()->availableToWrite() != getDataMQ()->getQuantumCount()) { LOG(ERROR) << __func__ << ": the HAL module did not consume all data from the data MQ: " - << "available to write " << mDataMQ->availableToWrite() - << ", total size: " << mDataMQ->getQuantumCount(); + << "available to write " << getDataMQ()->availableToWrite() + << ", total size: " << getDataMQ()->getQuantumCount(); return Status::ABORT; } if (reply.latencyMs < 0 && reply.latencyMs != StreamDescriptor::LATENCY_UNKNOWN) { @@ -571,6 +757,71 @@ class StreamWriterLogic : public StreamCommonLogic { }; using StreamWriter = StreamWorker; +class DefaultStreamCallback : public ::aidl::android::hardware::audio::core::BnStreamCallback, + public StreamEventReceiver { + ndk::ScopedAStatus onTransferReady() override { + LOG(DEBUG) << __func__; + putLastEvent(Event::TransferReady); + return ndk::ScopedAStatus::ok(); + } + ndk::ScopedAStatus onError() override { + LOG(DEBUG) << __func__; + putLastEvent(Event::Error); + return ndk::ScopedAStatus::ok(); + } + ndk::ScopedAStatus onDrainReady() override { + LOG(DEBUG) << __func__; + putLastEvent(Event::DrainReady); + return ndk::ScopedAStatus::ok(); + } + + public: + // To avoid timing out the whole test suite in case no event is received + // from the HAL, use a local timeout for event waiting. + static constexpr auto kEventTimeoutMs = std::chrono::milliseconds(1000); + + StreamEventReceiver* getEventReceiver() { return this; } + std::tuple getLastEvent() const override { + std::lock_guard l(mLock); + return getLastEvent_l(); + } + std::tuple waitForEvent(int clientEventSeq) override { + std::unique_lock l(mLock); + android::base::ScopedLockAssertion lock_assertion(mLock); + LOG(DEBUG) << __func__ << ": client " << clientEventSeq << ", last " << mLastEventSeq; + if (mCv.wait_for(l, kEventTimeoutMs, [&]() { + android::base::ScopedLockAssertion lock_assertion(mLock); + return clientEventSeq < mLastEventSeq; + })) { + } else { + LOG(WARNING) << __func__ << ": timed out waiting for an event"; + putLastEvent_l(Event::None); + } + return getLastEvent_l(); + } + + private: + std::tuple getLastEvent_l() const REQUIRES(mLock) { + return std::make_tuple(mLastEventSeq, mLastEvent); + } + void putLastEvent(Event event) { + { + std::lock_guard l(mLock); + putLastEvent_l(event); + } + mCv.notify_one(); + } + void putLastEvent_l(Event event) REQUIRES(mLock) { + mLastEventSeq++; + mLastEvent = event; + } + + mutable std::mutex mLock; + std::condition_variable mCv; + int mLastEventSeq GUARDED_BY(mLock) = kEventSeqInit; + Event mLastEvent GUARDED_BY(mLock) = Event::None; +}; + template struct IOTraits { static constexpr bool is_input = std::is_same_v; @@ -607,6 +858,7 @@ class WithStream { } Stream* get() const { return mStream.get(); } const StreamContext* getContext() const { return mContext ? &(mContext.value()) : nullptr; } + StreamEventReceiver* getEventReceiver() { return mStreamCallback->getEventReceiver(); } std::shared_ptr getSharedPointer() const { return mStream; } const AudioPortConfig& getPortConfig() const { return mPortConfig.get(); } int32_t getPortId() const { return mPortConfig.getId(); } @@ -616,6 +868,7 @@ class WithStream { std::shared_ptr mStream; StreamDescriptor mDescriptor; std::optional mContext; + std::shared_ptr mStreamCallback; }; SinkMetadata GenerateSinkMetadata(const AudioPortConfig& portConfig) { @@ -636,11 +889,15 @@ ScopedAStatus WithStream::SetUpNoChecks(IModule* module, args.portConfigId = portConfig.id; args.sinkMetadata = GenerateSinkMetadata(portConfig); args.bufferSizeFrames = bufferSizeFrames; + auto callback = ndk::SharedRefBase::make(); + // TODO: Uncomment when support for asynchronous input is implemented. + // args.callback = callback; aidl::android::hardware::audio::core::IModule::OpenInputStreamReturn ret; ScopedAStatus status = module->openInputStream(args, &ret); if (status.isOk()) { mStream = std::move(ret.stream); mDescriptor = std::move(ret.desc); + mStreamCallback = std::move(callback); } return status; } @@ -665,11 +922,14 @@ ScopedAStatus WithStream::SetUpNoChecks(IModule* module, args.sourceMetadata = GenerateSourceMetadata(portConfig); args.offloadInfo = ModuleConfig::generateOffloadInfoIfNeeded(portConfig); args.bufferSizeFrames = bufferSizeFrames; + auto callback = ndk::SharedRefBase::make(); + args.callback = callback; aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret; ScopedAStatus status = module->openOutputStream(args, &ret); if (status.isOk()) { mStream = std::move(ret.stream); mDescriptor = std::move(ret.desc); + mStreamCallback = std::move(callback); } return status; } @@ -1379,10 +1639,10 @@ TEST_P(AudioCoreTelephony, SwitchAudioMode) { } } +using CommandSequence = std::vector; class StreamLogicDriverInvalidCommand : public StreamLogicDriver { public: - StreamLogicDriverInvalidCommand(const std::vector& commands) - : mCommands(commands) {} + StreamLogicDriverInvalidCommand(const CommandSequence& commands) : mCommands(commands) {} std::string getUnexpectedStatuses() { // This method is intended to be called after the worker thread has joined, @@ -1396,25 +1656,29 @@ class StreamLogicDriverInvalidCommand : public StreamLogicDriver { } bool done() override { return mNextCommand >= mCommands.size(); } - StreamDescriptor::Command getNextCommand(int, int* actualSize) override { + TransitionTrigger getNextTrigger(int, int* actualSize) override { if (actualSize != nullptr) *actualSize = 0; return mCommands[mNextCommand++]; } bool interceptRawReply(const StreamDescriptor::Reply& reply) override { - if (reply.status != STATUS_BAD_VALUE) { - std::string s = mCommands[mNextCommand - 1].toString(); + const size_t currentCommand = mNextCommand - 1; // increased by getNextTrigger + const bool isLastCommand = currentCommand == mCommands.size() - 1; + // All but the last command should run correctly. The last command must return 'BAD_VALUE' + // status. + if ((!isLastCommand && reply.status != STATUS_OK) || + (isLastCommand && reply.status != STATUS_BAD_VALUE)) { + std::string s = mCommands[currentCommand].toString(); s.append(", ").append(statusToString(reply.status)); mStatuses.push_back(std::move(s)); - // If the HAL does not recognize the command as invalid, - // retrieve the data etc. - return reply.status != STATUS_OK; + // Process the reply, since the worker exits in case of an error. + return false; } - return true; + return isLastCommand; } bool processValidReply(const StreamDescriptor::Reply&) override { return true; } private: - const std::vector mCommands; + const CommandSequence mCommands; size_t mNextCommand = 0; std::vector mStatuses; }; @@ -1556,22 +1820,46 @@ class AudioStream : public AudioCoreModule { } void SendInvalidCommandImpl(const AudioPortConfig& portConfig) { - std::vector commands = { - StreamDescriptor::Command::make( - 0), - // TODO: For proper testing of input streams, need to put the stream into - // a state which accepts BURST commands. - StreamDescriptor::Command::make(-1), - StreamDescriptor::Command::make( - std::numeric_limits::min()), - }; - WithStream stream(portConfig); - ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); - StreamLogicDriverInvalidCommand driver(commands); - typename IOTraits::Worker worker(*stream.getContext(), &driver); - ASSERT_TRUE(worker.start()); - worker.join(); - EXPECT_EQ("", driver.getUnexpectedStatuses()); + using TestSequence = std::pair; + // The last command in 'CommandSequence' is the one that must trigger + // an error status. All preceding commands are to put the state machine + // into a state which accepts the last command. + std::vector sequences{ + std::make_pair(std::string("HalReservedExit"), + std::vector{StreamDescriptor::Command::make< + StreamDescriptor::Command::Tag::halReservedExit>(0)}), + std::make_pair(std::string("BurstNeg"), + std::vector{kStartCommand, + StreamDescriptor::Command::make< + StreamDescriptor::Command::Tag::burst>(-1)}), + std::make_pair( + std::string("BurstMinInt"), + std::vector{kStartCommand, StreamDescriptor::Command::make< + StreamDescriptor::Command::Tag::burst>( + std::numeric_limits::min())})}; + if (IOTraits::is_input) { + sequences.emplace_back("DrainAll", + std::vector{kStartCommand, kBurstCommand, kDrainOutAllCommand}); + sequences.emplace_back( + "DrainEarly", std::vector{kStartCommand, kBurstCommand, kDrainOutEarlyCommand}); + } else { + sequences.emplace_back("DrainUnspecified", + std::vector{kStartCommand, kBurstCommand, kDrainInCommand}); + } + for (const auto& seq : sequences) { + SCOPED_TRACE(std::string("Sequence ").append(seq.first)); + LOG(DEBUG) << __func__ << ": Sequence " << seq.first; + WithStream stream(portConfig); + ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); + StreamLogicDriverInvalidCommand driver(seq.second); + typename IOTraits::Worker worker(*stream.getContext(), &driver, + stream.getEventReceiver()); + LOG(DEBUG) << __func__ << ": starting worker..."; + ASSERT_TRUE(worker.start()); + LOG(DEBUG) << __func__ << ": joining worker..."; + worker.join(); + EXPECT_EQ("", driver.getUnexpectedStatuses()); + } } }; using AudioStreamIn = AudioStream; @@ -1615,27 +1903,51 @@ TEST_P(AudioStreamOut, RequireOffloadInfo) { GTEST_SKIP() << "No mix port for compressed offload that could be routed to attached devices"; } - const auto portConfig = - moduleConfig->getSingleConfigForMixPort(false, *offloadMixPorts.begin()); - ASSERT_TRUE(portConfig.has_value()) - << "No profiles specified for the compressed offload mix port"; + const auto config = moduleConfig->getSingleConfigForMixPort(false, *offloadMixPorts.begin()); + ASSERT_TRUE(config.has_value()) << "No profiles specified for the compressed offload mix port"; + WithAudioPortConfig portConfig(config.value()); + ASSERT_NO_FATAL_FAILURE(portConfig.SetUp(module.get())); StreamDescriptor descriptor; std::shared_ptr ignored; aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args; - args.portConfigId = portConfig.value().id; - args.sourceMetadata = GenerateSourceMetadata(portConfig.value()); + args.portConfigId = portConfig.getId(); + args.sourceMetadata = GenerateSourceMetadata(portConfig.get()); args.bufferSizeFrames = kDefaultBufferSizeFrames; aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret; EXPECT_STATUS(EX_ILLEGAL_ARGUMENT, module->openOutputStream(args, &ret)) << "when no offload info is provided for a compressed offload mix port"; } -using CommandAndState = std::pair; +TEST_P(AudioStreamOut, RequireAsyncCallback) { + const auto nonBlockingMixPorts = + moduleConfig->getNonBlockingMixPorts(true /*attachedOnly*/, true /*singlePort*/); + if (nonBlockingMixPorts.empty()) { + GTEST_SKIP() + << "No mix port for non-blocking output that could be routed to attached devices"; + } + const auto config = + moduleConfig->getSingleConfigForMixPort(false, *nonBlockingMixPorts.begin()); + ASSERT_TRUE(config.has_value()) << "No profiles specified for the non-blocking mix port"; + WithAudioPortConfig portConfig(config.value()); + ASSERT_NO_FATAL_FAILURE(portConfig.SetUp(module.get())); + StreamDescriptor descriptor; + std::shared_ptr ignored; + aidl::android::hardware::audio::core::IModule::OpenOutputStreamArguments args; + args.portConfigId = portConfig.getId(); + args.sourceMetadata = GenerateSourceMetadata(portConfig.get()); + args.offloadInfo = ModuleConfig::generateOffloadInfoIfNeeded(portConfig.get()); + args.bufferSizeFrames = kDefaultBufferSizeFrames; + aidl::android::hardware::audio::core::IModule::OpenOutputStreamReturn ret; + EXPECT_STATUS(EX_ILLEGAL_ARGUMENT, module->openOutputStream(args, &ret)) + << "when no async callback is provided for a non-blocking mix port"; +} class StreamLogicDefaultDriver : public StreamLogicDriver { public: - explicit StreamLogicDefaultDriver(const std::vector& commands) - : mCommands(commands) {} + explicit StreamLogicDefaultDriver(std::shared_ptr commands) + : mCommands(commands) { + mCommands->rewind(); + } // The three methods below is intended to be called after the worker // thread has joined, thus no extra synchronization is needed. @@ -1643,59 +1955,72 @@ class StreamLogicDefaultDriver : public StreamLogicDriver { bool hasRetrogradeObservablePosition() const { return mRetrogradeObservablePosition; } std::string getUnexpectedStateTransition() const { return mUnexpectedTransition; } - bool done() override { return mNextCommand >= mCommands.size(); } - StreamDescriptor::Command getNextCommand(int maxDataSize, int* actualSize) override { - auto command = mCommands[mNextCommand++].first; - if (command.getTag() == StreamDescriptor::Command::Tag::burst) { - if (actualSize != nullptr) { - // In the output scenario, reduce slightly the fmqByteCount to verify - // that the HAL module always consumes all data from the MQ. - if (maxDataSize > 1) maxDataSize--; - *actualSize = maxDataSize; + bool done() override { return mCommands->done(); } + TransitionTrigger getNextTrigger(int maxDataSize, int* actualSize) override { + auto trigger = mCommands->getTrigger(); + if (StreamDescriptor::Command* command = std::get_if(&trigger); + command != nullptr) { + if (command->getTag() == StreamDescriptor::Command::Tag::burst) { + if (actualSize != nullptr) { + // In the output scenario, reduce slightly the fmqByteCount to verify + // that the HAL module always consumes all data from the MQ. + if (maxDataSize > 1) maxDataSize--; + *actualSize = maxDataSize; + } + command->set(maxDataSize); + } else { + if (actualSize != nullptr) *actualSize = 0; } - command.set(maxDataSize); - } else { - if (actualSize != nullptr) *actualSize = 0; } - return command; + return trigger; } bool interceptRawReply(const StreamDescriptor::Reply&) override { return false; } bool processValidReply(const StreamDescriptor::Reply& reply) override { - if (mPreviousFrames.has_value()) { - if (reply.observable.frames > mPreviousFrames.value()) { - mObservablePositionIncrease = true; - } else if (reply.observable.frames < mPreviousFrames.value()) { - mRetrogradeObservablePosition = true; + if (reply.observable.frames != StreamDescriptor::Position::UNKNOWN) { + if (mPreviousFrames.has_value()) { + if (reply.observable.frames > mPreviousFrames.value()) { + mObservablePositionIncrease = true; + } else if (reply.observable.frames < mPreviousFrames.value()) { + mRetrogradeObservablePosition = true; + } } + mPreviousFrames = reply.observable.frames; } - mPreviousFrames = reply.observable.frames; - const auto& lastCommandState = mCommands[mNextCommand - 1]; - if (lastCommandState.second != reply.state) { - std::string s = std::string("Unexpected transition from the state ") - .append(mPreviousState) - .append(" to ") - .append(toString(reply.state)) - .append(" caused by the command ") - .append(lastCommandState.first.toString()); + auto expected = mCommands->getExpectedStates(); + if (expected.count(reply.state) == 0) { + std::string s = + std::string("Unexpected transition from the state ") + .append(mPreviousState.has_value() ? toString(mPreviousState.value()) + : "") + .append(" to ") + .append(toString(reply.state)) + .append(" (expected one of ") + .append(::android::internal::ToString(expected)) + .append(") caused by the ") + .append(toString(mCommands->getTrigger())); LOG(ERROR) << __func__ << ": " << s; mUnexpectedTransition = std::move(s); return false; } + mCommands->advance(reply.state); + mPreviousState = reply.state; return true; } protected: - const std::vector& mCommands; - size_t mNextCommand = 0; + std::shared_ptr mCommands; + std::optional mPreviousState; std::optional mPreviousFrames; - std::string mPreviousState = ""; bool mObservablePositionIncrease = false; bool mRetrogradeObservablePosition = false; std::string mUnexpectedTransition; }; -using NamedCommandSequence = std::pair>; +enum { NAMED_CMD_NAME, NAMED_CMD_DELAY_MS, NAMED_CMD_STREAM_TYPE, NAMED_CMD_CMDS }; +enum class StreamTypeFilter { ANY, SYNC, ASYNC }; +using NamedCommandSequence = + std::tuple>; enum { PARAM_MODULE_NAME, PARAM_CMD_SEQ, PARAM_SETUP_SEQ }; using StreamIoTestParameters = std::tuple; @@ -1716,7 +2041,29 @@ class AudioStreamIo : public AudioCoreModuleBase, } for (const auto& portConfig : allPortConfigs) { SCOPED_TRACE(portConfig.toString()); - const auto& commandsAndStates = std::get(GetParam()).second; + const bool isNonBlocking = + IOTraits::is_input + ? false + : + // TODO: Uncomment when support for asynchronous input is implemented. + /*isBitPositionFlagSet( + portConfig.flags.value().template get(), + AudioInputFlags::NON_BLOCKING) :*/ + isBitPositionFlagSet(portConfig.flags.value() + .template get(), + AudioOutputFlags::NON_BLOCKING); + if (auto streamType = + std::get(std::get(GetParam())); + (isNonBlocking && streamType == StreamTypeFilter::SYNC) || + (!isNonBlocking && streamType == StreamTypeFilter::ASYNC)) { + continue; + } + WithDebugFlags delayTransientStates = WithDebugFlags::createNested(debug); + delayTransientStates.flags().streamTransientStateDelayMs = + std::get(std::get(GetParam())); + ASSERT_NO_FATAL_FAILURE(delayTransientStates.SetUp(module.get())); + const auto& commandsAndStates = + std::get(std::get(GetParam())); if (!std::get(GetParam())) { ASSERT_NO_FATAL_FAILURE(RunStreamIoCommandsImplSeq1(portConfig, commandsAndStates)); } else { @@ -1732,7 +2079,7 @@ class AudioStreamIo : public AudioCoreModuleBase, // Set up a patch first, then open a stream. void RunStreamIoCommandsImplSeq1(const AudioPortConfig& portConfig, - const std::vector& commandsAndStates) { + std::shared_ptr commandsAndStates) { auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( IOTraits::is_input, portConfig); ASSERT_FALSE(devicePorts.empty()); @@ -1743,9 +2090,12 @@ class AudioStreamIo : public AudioCoreModuleBase, WithStream stream(patch.getPortConfig(IOTraits::is_input)); ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); StreamLogicDefaultDriver driver(commandsAndStates); - typename IOTraits::Worker worker(*stream.getContext(), &driver); + typename IOTraits::Worker worker(*stream.getContext(), &driver, + stream.getEventReceiver()); + LOG(DEBUG) << __func__ << ": starting worker..."; ASSERT_TRUE(worker.start()); + LOG(DEBUG) << __func__ << ": joining worker..."; worker.join(); EXPECT_FALSE(worker.hasError()) << worker.getError(); EXPECT_EQ("", driver.getUnexpectedStateTransition()); @@ -1757,11 +2107,12 @@ class AudioStreamIo : public AudioCoreModuleBase, // Open a stream, then set up a patch for it. void RunStreamIoCommandsImplSeq2(const AudioPortConfig& portConfig, - const std::vector& commandsAndStates) { + std::shared_ptr commandsAndStates) { WithStream stream(portConfig); ASSERT_NO_FATAL_FAILURE(stream.SetUp(module.get(), kDefaultBufferSizeFrames)); StreamLogicDefaultDriver driver(commandsAndStates); - typename IOTraits::Worker worker(*stream.getContext(), &driver); + typename IOTraits::Worker worker(*stream.getContext(), &driver, + stream.getEventReceiver()); auto devicePorts = moduleConfig->getAttachedDevicesPortsForMixPort( IOTraits::is_input, portConfig); @@ -1770,7 +2121,9 @@ class AudioStreamIo : public AudioCoreModuleBase, WithAudioPatch patch(IOTraits::is_input, stream.getPortConfig(), devicePortConfig); ASSERT_NO_FATAL_FAILURE(patch.SetUp(module.get())); + LOG(DEBUG) << __func__ << ": starting worker..."; ASSERT_TRUE(worker.start()); + LOG(DEBUG) << __func__ << ": joining worker..."; worker.join(); EXPECT_FALSE(worker.hasError()) << worker.getError(); EXPECT_EQ("", driver.getUnexpectedStateTransition()); @@ -1975,103 +2328,210 @@ INSTANTIATE_TEST_SUITE_P(AudioStreamOutTest, AudioStreamOut, android::PrintInstanceNameToString); GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamOut); -static const StreamDescriptor::Command kStartCommand = - StreamDescriptor::Command::make(Void{}); -static const StreamDescriptor::Command kBurstCommand = - StreamDescriptor::Command::make(0); -static const StreamDescriptor::Command kDrainCommand = - StreamDescriptor::Command::make(Void{}); -static const StreamDescriptor::Command kStandbyCommand = - StreamDescriptor::Command::make(Void{}); -static const StreamDescriptor::Command kPauseCommand = - StreamDescriptor::Command::make(Void{}); -static const StreamDescriptor::Command kFlushCommand = - StreamDescriptor::Command::make(Void{}); -static const NamedCommandSequence kReadOrWriteSeq = - std::make_pair(std::string("ReadOrWrite"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE)}); -static const NamedCommandSequence kDrainInSeq = - std::make_pair(std::string("Drain"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kDrainCommand, StreamDescriptor::State::DRAINING), - std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kDrainCommand, StreamDescriptor::State::DRAINING), - // TODO: This will need to be changed once DRAIN starts taking time. - std::make_pair(kBurstCommand, StreamDescriptor::State::STANDBY)}); -static const NamedCommandSequence kDrainOutSeq = - std::make_pair(std::string("Drain"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - // TODO: This will need to be changed once DRAIN starts taking time. - std::make_pair(kDrainCommand, StreamDescriptor::State::IDLE)}); -// TODO: This will need to be changed once DRAIN starts taking time so we can pause it. -static const NamedCommandSequence kDrainPauseOutSeq = std::make_pair( - std::string("DrainPause"), - std::vector{std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kDrainCommand, StreamDescriptor::State::IDLE)}); -static const NamedCommandSequence kStandbySeq = - std::make_pair(std::string("Standby"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kStandbyCommand, StreamDescriptor::State::STANDBY), - // Perform a read or write in order to advance observable position - // (this is verified by tests). - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE)}); +// This is the value used in test sequences for which the test needs to ensure +// that the HAL stays in a transient state long enough to receive the next command. +static const int kStreamTransientStateTransitionDelayMs = 3000; + +// TODO: Add async test cases for input once it is implemented. + +std::shared_ptr makeBurstCommands(bool isSync, size_t burstCount) { + const auto burst = + isSync ? std::vector{std::make_pair(kBurstCommand, + StreamDescriptor::State::ACTIVE)} + : std::vector{ + std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFERRING), + std::make_pair(kTransferReadyEvent, StreamDescriptor::State::ACTIVE)}; + std::vector result{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE)}; + for (size_t i = 0; i < burstCount; ++i) { + result.insert(result.end(), burst.begin(), burst.end()); + } + return std::make_shared(result); +} +static const NamedCommandSequence kReadSeq = + std::make_tuple(std::string("Read"), 0, StreamTypeFilter::ANY, makeBurstCommands(true, 3)); +static const NamedCommandSequence kWriteSyncSeq = std::make_tuple( + std::string("Write"), 0, StreamTypeFilter::SYNC, makeBurstCommands(true, 3)); +static const NamedCommandSequence kWriteAsyncSeq = std::make_tuple( + std::string("Write"), 0, StreamTypeFilter::ASYNC, makeBurstCommands(false, 3)); + +std::shared_ptr makeAsyncDrainCommands(bool isInput) { + return std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, isInput ? StreamDescriptor::State::ACTIVE + : StreamDescriptor::State::TRANSFERRING), + std::make_pair(isInput ? kDrainInCommand : kDrainOutAllCommand, + StreamDescriptor::State::DRAINING), + isInput ? std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE) + : std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFERRING), + std::make_pair(isInput ? kDrainInCommand : kDrainOutAllCommand, + StreamDescriptor::State::DRAINING)}); +} +static const NamedCommandSequence kWriteDrainAsyncSeq = + std::make_tuple(std::string("WriteDrain"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::ASYNC, makeAsyncDrainCommands(false)); +static const NamedCommandSequence kDrainInSeq = std::make_tuple( + std::string("Drain"), 0, StreamTypeFilter::ANY, makeAsyncDrainCommands(true)); + +std::shared_ptr makeDrainOutCommands(bool isSync) { + return std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), + std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAINING), + std::make_pair(isSync ? TransitionTrigger(kGetStatusCommand) + : TransitionTrigger(kDrainReadyEvent), + StreamDescriptor::State::IDLE)}); +} +static const NamedCommandSequence kDrainOutSyncSeq = std::make_tuple( + std::string("Drain"), 0, StreamTypeFilter::SYNC, makeDrainOutCommands(true)); +static const NamedCommandSequence kDrainOutAsyncSeq = std::make_tuple( + std::string("Drain"), 0, StreamTypeFilter::ASYNC, makeDrainOutCommands(false)); + +std::shared_ptr makeDrainOutPauseCommands(bool isSync) { + return std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, isSync ? StreamDescriptor::State::ACTIVE + : StreamDescriptor::State::TRANSFERRING), + std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAINING), + std::make_pair(kPauseCommand, StreamDescriptor::State::DRAIN_PAUSED), + std::make_pair(kStartCommand, StreamDescriptor::State::DRAINING), + std::make_pair(kPauseCommand, StreamDescriptor::State::DRAIN_PAUSED), + std::make_pair(kBurstCommand, isSync ? StreamDescriptor::State::PAUSED + : StreamDescriptor::State::TRANSFER_PAUSED)}); +} +static const NamedCommandSequence kDrainPauseOutSyncSeq = + std::make_tuple(std::string("DrainPause"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::SYNC, makeDrainOutPauseCommands(true)); +static const NamedCommandSequence kDrainPauseOutAsyncSeq = + std::make_tuple(std::string("DrainPause"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::ASYNC, makeDrainOutPauseCommands(false)); + +// This sequence also verifies that the capture / presentation position is not reset on standby. +std::shared_ptr makeStandbyCommands(bool isInput, bool isSync) { + return std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kStandbyCommand, StreamDescriptor::State::STANDBY), + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, isInput || isSync + ? StreamDescriptor::State::ACTIVE + : StreamDescriptor::State::TRANSFERRING), + std::make_pair(kPauseCommand, isInput || isSync + ? StreamDescriptor::State::PAUSED + : StreamDescriptor::State::TRANSFER_PAUSED), + std::make_pair(kFlushCommand, isInput ? StreamDescriptor::State::STANDBY + : StreamDescriptor::State::IDLE), + std::make_pair(isInput ? kGetStatusCommand : kStandbyCommand, // no-op for input + StreamDescriptor::State::STANDBY), + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, isInput || isSync + ? StreamDescriptor::State::ACTIVE + : StreamDescriptor::State::TRANSFERRING)}); +} +static const NamedCommandSequence kStandbyInSeq = std::make_tuple( + std::string("Standby"), 0, StreamTypeFilter::ANY, makeStandbyCommands(true, false)); +static const NamedCommandSequence kStandbyOutSyncSeq = std::make_tuple( + std::string("Standby"), 0, StreamTypeFilter::SYNC, makeStandbyCommands(false, true)); +static const NamedCommandSequence kStandbyOutAsyncSeq = + std::make_tuple(std::string("Standby"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::ASYNC, makeStandbyCommands(false, false)); + static const NamedCommandSequence kPauseInSeq = - std::make_pair(std::string("Pause"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kFlushCommand, StreamDescriptor::State::STANDBY)}); -static const NamedCommandSequence kPauseOutSeq = - std::make_pair(std::string("Pause"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kBurstCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED)}); -static const NamedCommandSequence kFlushInSeq = - std::make_pair(std::string("Flush"), - std::vector{ - std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kFlushCommand, StreamDescriptor::State::STANDBY)}); -static const NamedCommandSequence kFlushOutSeq = std::make_pair( - std::string("Flush"), - std::vector{std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), - std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), - std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), - std::make_pair(kFlushCommand, StreamDescriptor::State::IDLE)}); + std::make_tuple(std::string("Pause"), 0, StreamTypeFilter::ANY, + std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), + std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), + std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), + std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), + std::make_pair(kFlushCommand, StreamDescriptor::State::STANDBY)})); +static const NamedCommandSequence kPauseOutSyncSeq = + std::make_tuple(std::string("Pause"), 0, StreamTypeFilter::SYNC, + std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, StreamDescriptor::State::ACTIVE), + std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), + std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE), + std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED), + std::make_pair(kBurstCommand, StreamDescriptor::State::PAUSED), + std::make_pair(kStartCommand, StreamDescriptor::State::ACTIVE), + std::make_pair(kPauseCommand, StreamDescriptor::State::PAUSED)})); +/* TODO: Figure out a better way for testing sync/async bursts +static const NamedCommandSequence kPauseOutAsyncSeq = std::make_tuple( + std::string("Pause"), kStreamTransientStateTransitionDelayMs, StreamTypeFilter::ASYNC, + std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFERRING), + std::make_pair(kPauseCommand, StreamDescriptor::State::TRANSFER_PAUSED), + std::make_pair(kStartCommand, StreamDescriptor::State::TRANSFERRING), + std::make_pair(kPauseCommand, StreamDescriptor::State::TRANSFER_PAUSED), + std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAIN_PAUSED), + std::make_pair(kBurstCommand, StreamDescriptor::State::TRANSFER_PAUSED)})); +*/ + +std::shared_ptr makeFlushCommands(bool isInput, bool isSync) { + return std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, isInput || isSync + ? StreamDescriptor::State::ACTIVE + : StreamDescriptor::State::TRANSFERRING), + std::make_pair(kPauseCommand, isInput || isSync + ? StreamDescriptor::State::PAUSED + : StreamDescriptor::State::TRANSFER_PAUSED), + std::make_pair(kFlushCommand, isInput ? StreamDescriptor::State::STANDBY + : StreamDescriptor::State::IDLE)}); +} +static const NamedCommandSequence kFlushInSeq = std::make_tuple( + std::string("Flush"), 0, StreamTypeFilter::ANY, makeFlushCommands(true, false)); +static const NamedCommandSequence kFlushOutSyncSeq = std::make_tuple( + std::string("Flush"), 0, StreamTypeFilter::SYNC, makeFlushCommands(false, true)); +static const NamedCommandSequence kFlushOutAsyncSeq = + std::make_tuple(std::string("Flush"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::ASYNC, makeFlushCommands(false, false)); + +std::shared_ptr makeDrainPauseFlushOutCommands(bool isSync) { + return std::make_shared(std::vector{ + std::make_pair(kStartCommand, StreamDescriptor::State::IDLE), + std::make_pair(kBurstCommand, isSync ? StreamDescriptor::State::ACTIVE + : StreamDescriptor::State::TRANSFERRING), + std::make_pair(kDrainOutAllCommand, StreamDescriptor::State::DRAINING), + std::make_pair(kPauseCommand, StreamDescriptor::State::DRAIN_PAUSED), + std::make_pair(kFlushCommand, StreamDescriptor::State::IDLE)}); +} +static const NamedCommandSequence kDrainPauseFlushOutSyncSeq = + std::make_tuple(std::string("DrainPauseFlush"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::SYNC, makeDrainPauseFlushOutCommands(true)); +static const NamedCommandSequence kDrainPauseFlushOutAsyncSeq = + std::make_tuple(std::string("DrainPauseFlush"), kStreamTransientStateTransitionDelayMs, + StreamTypeFilter::ASYNC, makeDrainPauseFlushOutCommands(false)); + +// Note, this isn't the "official" enum printer, it is only used to make the test name suffix. +std::string PrintStreamFilterToString(StreamTypeFilter filter) { + switch (filter) { + case StreamTypeFilter::ANY: + return ""; + case StreamTypeFilter::SYNC: + return "Sync"; + case StreamTypeFilter::ASYNC: + return "Async"; + } + return std::string("Unknown").append(std::to_string(static_cast(filter))); +} std::string GetStreamIoTestName(const testing::TestParamInfo& info) { return android::PrintInstanceNameToString( testing::TestParamInfo{std::get(info.param), info.index}) .append("_") - .append(std::get(info.param).first) + .append(std::get(std::get(info.param))) + .append(PrintStreamFilterToString( + std::get(std::get(info.param)))) .append("_SetupSeq") .append(std::get(info.param) ? "2" : "1"); } + INSTANTIATE_TEST_SUITE_P( AudioStreamIoInTest, AudioStreamIoIn, testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), - testing::Values(kReadOrWriteSeq, kDrainInSeq, kStandbySeq, kPauseInSeq, + testing::Values(kReadSeq, kDrainInSeq, kStandbyInSeq, kPauseInSeq, kFlushInSeq), testing::Values(false, true)), GetStreamIoTestName); @@ -2079,8 +2539,13 @@ GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoIn); INSTANTIATE_TEST_SUITE_P( AudioStreamIoOutTest, AudioStreamIoOut, testing::Combine(testing::ValuesIn(android::getAidlHalInstanceNames(IModule::descriptor)), - testing::Values(kReadOrWriteSeq, kDrainOutSeq, kDrainPauseOutSeq, - kStandbySeq, kPauseOutSeq, kFlushOutSeq), + testing::Values(kWriteSyncSeq, kWriteAsyncSeq, kWriteDrainAsyncSeq, + kDrainOutSyncSeq, kDrainPauseOutSyncSeq, + kDrainPauseOutAsyncSeq, kStandbyOutSyncSeq, + kStandbyOutAsyncSeq, + kPauseOutSyncSeq, // kPauseOutAsyncSeq, + kFlushOutSyncSeq, kFlushOutAsyncSeq, + kDrainPauseFlushOutSyncSeq, kDrainPauseFlushOutAsyncSeq), testing::Values(false, true)), GetStreamIoTestName); GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(AudioStreamIoOut); @@ -2095,7 +2560,6 @@ class TestExecutionTracer : public ::testing::EmptyTestEventListener { void OnTestStart(const ::testing::TestInfo& test_info) override { TraceTestState("Started", test_info); } - void OnTestEnd(const ::testing::TestInfo& test_info) override { TraceTestState("Completed", test_info); } @@ -2109,6 +2573,7 @@ class TestExecutionTracer : public ::testing::EmptyTestEventListener { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::UnitTest::GetInstance()->listeners().Append(new TestExecutionTracer()); + android::base::SetMinimumLogSeverity(::android::base::DEBUG); ABinderProcess_setThreadPoolMaxThreadCount(1); ABinderProcess_startThreadPool(); return RUN_ALL_TESTS();