Merge changes I0a18a6d9,I13a83113,I13c9c8d1,I8717acac

* changes:
  audio: Add non-blocking I/O stream operations
  audio: Fix handling of quick worker completion in StreamWorker
  audio: Report unknown stream positions explicitly
  audio: Implement transient state testing
This commit is contained in:
Treehugger Robot
2022-12-05 22:08:02 +00:00
committed by Gerrit Code Review
21 changed files with 1252 additions and 346 deletions

View File

@@ -113,6 +113,7 @@ aidl_interface {
"android/hardware/audio/core/AudioRoute.aidl",
"android/hardware/audio/core/IConfig.aidl",
"android/hardware/audio/core/IModule.aidl",
"android/hardware/audio/core/IStreamCallback.aidl",
"android/hardware/audio/core/IStreamIn.aidl",
"android/hardware/audio/core/IStreamOut.aidl",
"android/hardware/audio/core/ITelephony.aidl",

View File

@@ -76,6 +76,7 @@ interface IModule {
android.hardware.audio.common.SourceMetadata sourceMetadata;
@nullable android.media.audio.common.AudioOffloadInfo offloadInfo;
long bufferSizeFrames;
@nullable android.hardware.audio.core.IStreamCallback callback;
}
@VintfStability
parcelable OpenOutputStreamReturn {

View File

@@ -0,0 +1,40 @@
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
///////////////////////////////////////////////////////////////////////////////
// THIS FILE IS IMMUTABLE. DO NOT EDIT IN ANY CASE. //
///////////////////////////////////////////////////////////////////////////////
// This file is a snapshot of an AIDL file. Do not edit it manually. There are
// two cases:
// 1). this is a frozen version file - do not edit this in any case.
// 2). this is a 'current' file. If you make a backwards compatible change to
// the interface (from the latest frozen version), the build system will
// prompt you to update this file with `m <name>-update-api`.
//
// You must not make a backward incompatible change to any AIDL file built
// with the aidl_interface module type with versions property set. The module
// type is used to build AIDL files in a way that they can be used across
// independently updatable components of the system. If a device is shipped
// with such a backward incompatible change, it has a high risk of breaking
// later when a module using the interface is updated, e.g., Mainline modules.
package android.hardware.audio.core;
@VintfStability
interface IStreamCallback {
oneway void onTransferReady();
oneway void onError();
oneway void onDrainReady();
}

View File

@@ -35,4 +35,5 @@ package android.hardware.audio.core;
@JavaDerive(equals=true, toString=true) @VintfStability
parcelable ModuleDebug {
boolean simulateDeviceConnections;
int streamTransientStateDelayMs;
}

View File

@@ -42,8 +42,9 @@ parcelable StreamDescriptor {
const int LATENCY_UNKNOWN = -1;
@FixedSize @VintfStability
parcelable Position {
long frames;
long timeNs;
long frames = -1;
long timeNs = -1;
const long UNKNOWN = -1;
}
@Backing(type="int") @VintfStability
enum State {
@@ -53,14 +54,23 @@ parcelable StreamDescriptor {
PAUSED = 4,
DRAINING = 5,
DRAIN_PAUSED = 6,
TRANSFERRING = 7,
TRANSFER_PAUSED = 8,
ERROR = 100,
}
@Backing(type="byte") @VintfStability
enum DrainMode {
DRAIN_UNSPECIFIED = 0,
DRAIN_ALL = 1,
DRAIN_EARLY_NOTIFY = 2,
}
@FixedSize @VintfStability
union Command {
int hal_reserved_exit;
int halReservedExit;
android.media.audio.common.Void getStatus;
android.media.audio.common.Void start;
int burst;
android.media.audio.common.Void drain;
android.hardware.audio.core.StreamDescriptor.DrainMode drain;
android.media.audio.common.Void standby;
android.media.audio.common.Void pause;
android.media.audio.common.Void flush;

View File

@@ -21,6 +21,7 @@ import android.hardware.audio.common.SourceMetadata;
import android.hardware.audio.core.AudioMode;
import android.hardware.audio.core.AudioPatch;
import android.hardware.audio.core.AudioRoute;
import android.hardware.audio.core.IStreamCallback;
import android.hardware.audio.core.IStreamIn;
import android.hardware.audio.core.IStreamOut;
import android.hardware.audio.core.ITelephony;
@@ -53,9 +54,13 @@ interface IModule {
* the HAL module behavior that would otherwise require human intervention.
*
* The HAL module must throw an error if there is an attempt to change
* the debug behavior for the aspect which is currently in use.
* the debug behavior for the aspect which is currently in use, or when
* the value of any of the debug flags is invalid. See 'ModuleDebug' for
* the full list of constraints.
*
* @param debug The debug options.
* @throws EX_ILLEGAL_ARGUMENT If some of the configuration parameters are
* invalid.
* @throws EX_ILLEGAL_STATE If the flag(s) being changed affect functionality
* which is currently in use.
*/
@@ -316,9 +321,13 @@ interface IModule {
* 'setAudioPortConfig' method. Existence of an audio patch involving this
* port configuration is not required for successful opening of a stream.
*
* If the port configuration has 'COMPRESS_OFFLOAD' output flag set,
* the framework must provide additional information about the encoded
* audio stream in 'offloadInfo' argument.
* If the port configuration has the 'COMPRESS_OFFLOAD' output flag set,
* the client must provide additional information about the encoded
* audio stream in the 'offloadInfo' argument.
*
* If the port configuration has the 'NON_BLOCKING' output flag set,
* the client must provide a callback for asynchronous notifications
* in the 'callback' argument.
*
* The requested buffer size is expressed in frames, thus the actual size
* in bytes depends on the audio port configuration. Also, the HAL module
@@ -354,6 +363,8 @@ interface IModule {
* - If the offload info is not provided for an offload
* port configuration.
* - If a buffer of the requested size can not be provided.
* - If the callback is not provided for a non-blocking
* port configuration.
* @throws EX_ILLEGAL_STATE In the following cases:
* - If the port config already has a stream opened on it.
* - If the limit on the open stream count for the port has
@@ -372,6 +383,8 @@ interface IModule {
@nullable AudioOffloadInfo offloadInfo;
/** Requested audio I/O buffer minimum size, in frames. */
long bufferSizeFrames;
/** Client callback interface for the non-blocking output mode. */
@nullable IStreamCallback callback;
}
@VintfStability
parcelable OpenOutputStreamReturn {

View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package android.hardware.audio.core;
/**
* This interface is used to indicate completion of asynchronous operations.
* See the state machines referenced by StreamDescriptor for details.
*/
@VintfStability
oneway interface IStreamCallback {
/**
* Indicate that the stream is ready for next data exchange.
*/
void onTransferReady();
/**
* Indicate that an irrecoverable error has occurred during the last I/O
* operation. After sending this callback, the stream enters the 'ERROR'
* state.
*/
void onError();
/**
* Indicate that the stream has finished draining. This is only used
* for output streams because for input streams draining is performed
* by the client.
*/
void onDrainReady();
}

View File

@@ -35,4 +35,19 @@ parcelable ModuleDebug {
* profiles.
*/
boolean simulateDeviceConnections;
/**
* Must be non-negative. When set to non-zero, HAL module must delay
* transition from "transient" stream states (see StreamDescriptor.aidl)
* by the specified amount of milliseconds. The purpose of this delay
* is to allow VTS to test sending of stream commands while the stream is
* in a transient state. The delay must apply to newly created streams,
* it is not required to apply the delay to already opened streams.
*
* Note: the drawback of enabling this delay for asynchronous (non-blocking)
* modes is that sending of callbacks will also be delayed, because
* callbacks are sent once the stream state machine exits a transient
* state. Thus, it's not recommended to use it with tests that require
* waiting for an async callback.
*/
int streamTransientStateDelayMs;
}

View File

@@ -84,13 +84,13 @@ import android.media.audio.common.Void;
* are different.
*
* State machines of both input and output streams start from the 'STANDBY'
* state. Transitions between states happen naturally with changes in the
* state. Transitions between states happen naturally with changes in the
* states of the model elements. For simplicity, we restrict the change to one
* element only, for example, in the 'STANDBY' state, either the producer or the
* consumer can become active, but not both at the same time. States 'STANDBY',
* 'IDLE', 'READY', and '*PAUSED' are "stable"—they require an external event,
* whereas a change from the 'DRAINING' state can happen with time as the buffer
* gets empty.
* gets empty, thus it's a "transient" state.
*
* The state machine for input streams is defined in the `stream-in-sm.gv` file,
* for output streams—in the `stream-out-sm.gv` file. State machines define how
@@ -100,6 +100,28 @@ import android.media.audio.common.Void;
* client can never observe a stream with a functioning command queue in this
* state. The 'ERROR' state is a special state which the state machine enters
* when an unrecoverable hardware error is detected by the HAL module.
*
* Non-blocking (asynchronous) modes introduce a new 'TRANSFERRING' state, which
* the state machine can enter after replying to the 'burst' command, instead of
* staying in the 'ACTIVE' state. In this case the client gets unblocked
* earlier, while the actual audio delivery to / from the observer is not
* complete yet. Once the HAL module is ready for the next transfer, it notifies
* the client via a oneway callback, and the machine switches to 'ACTIVE'
* state. The 'TRANSFERRING' state is thus "transient", similar to the
* 'DRAINING' state. For output streams, asynchronous transfer can be paused,
* and it's another new state: 'TRANSFER_PAUSED'. It differs from 'PAUSED' by
* the fact that no new writes are allowed. Please see 'stream-in-async-sm.gv'
* and 'stream-out-async-sm.gv' files for details. Below is the table summary
* for asynchronous only-states:
*
* Producer | Buffer state | Consumer | Applies | State
* active? | | active? | to |
* ==========|==============|==========|=========|==============================
* Yes | Not empty | Yes | Both | TRANSFERRING, s/w x-runs counted
* ----------|--------------|----------|---------|-----------------------------
* Yes | Not empty | No | Output | TRANSFER_PAUSED,
* | | | | h/w emits silence.
*
*/
@JavaDerive(equals=true, toString=true)
@VintfStability
@@ -116,10 +138,15 @@ parcelable StreamDescriptor {
@VintfStability
@FixedSize
parcelable Position {
/**
* The value used when the position can not be reported by the HAL
* module.
*/
const long UNKNOWN = -1;
/** Frame count. */
long frames;
long frames = UNKNOWN;
/** Timestamp in nanoseconds. */
long timeNs;
long timeNs = UNKNOWN;
}
@VintfStability
@@ -166,10 +193,23 @@ parcelable StreamDescriptor {
/**
* Used for output streams only, pauses draining. This state is similar
* to the 'PAUSED' state, except that the client is not adding any
* new data. If it emits a 'BURST' command, this brings the stream
* new data. If it emits a 'burst' command, this brings the stream
* into the regular 'PAUSED' state.
*/
DRAIN_PAUSED = 6,
/**
* Used only for streams in asynchronous mode. The stream enters this
* state after receiving a 'burst' command and returning control back
* to the client, thus unblocking it.
*/
TRANSFERRING = 7,
/**
* Used only for output streams in asynchronous mode only. The stream
* enters this state after receiving a 'pause' command while being in
* the 'TRANSFERRING' state. Unlike 'PAUSED' state, this state does not
* accept new writes.
*/
TRANSFER_PAUSED = 8,
/**
* The ERROR state is entered when the stream has encountered an
* irrecoverable error from the lower layer. After entering it, the
@@ -178,6 +218,29 @@ parcelable StreamDescriptor {
ERROR = 100,
}
@VintfStability
@Backing(type="byte")
enum DrainMode {
/**
* Unspecified—used with input streams only, because the client controls
* draining.
*/
DRAIN_UNSPECIFIED = 0,
/**
* Used with output streams only, the HAL module indicates drain
* completion when all remaining audio data has been consumed.
*/
DRAIN_ALL = 1,
/**
* Used with output streams only, the HAL module indicates drain
* completion shortly before all audio data has been consumed in order
* to give the client an opportunity to provide data for the next track
* for gapless playback. The exact amount of provided time is specific
* to the HAL implementation.
*/
DRAIN_EARLY_NOTIFY = 2,
}
/**
* Used for sending commands to the HAL module. The client writes into
* the queue, the HAL module reads. The queue can only contain a single
@@ -198,7 +261,14 @@ parcelable StreamDescriptor {
* implementation must pass a random cookie as the command argument,
* which is only known to the implementation.
*/
int hal_reserved_exit;
int halReservedExit;
/**
* Retrieve the current state of the stream. This command must be
* processed by the stream in any state. The stream must provide current
* positions, counters, and its state in the reply. This command must be
* handled by the HAL module without any observable side effects.
*/
Void getStatus;
/**
* See the state machines on the applicability of this command to
* different states.
@@ -215,15 +285,14 @@ parcelable StreamDescriptor {
* read from the hardware into the 'audio.fmq' queue.
*
* In both cases it is allowed for this field to contain any
* non-negative number. The value 0 can be used if the client only needs
* to retrieve current positions and latency. Any sufficiently big value
* which exceeds the size of the queue's area which is currently
* available for reading or writing by the HAL module must be trimmed by
* the HAL module to the available size. Note that the HAL module is
* allowed to consume or provide less data than requested, and it must
* return the amount of actually read or written data via the
* 'Reply.fmqByteCount' field. Thus, only attempts to pass a negative
* number must be constituted as a client's error.
* non-negative number. Any sufficiently big value which exceeds the
* size of the queue's area which is currently available for reading or
* writing by the HAL module must be trimmed by the HAL module to the
* available size. Note that the HAL module is allowed to consume or
* provide less data than requested, and it must return the amount of
* actually read or written data via the 'Reply.fmqByteCount'
* field. Thus, only attempts to pass a negative number must be
* constituted as a client's error.
*
* Differences for the MMap No IRQ mode:
*
@@ -233,13 +302,16 @@ parcelable StreamDescriptor {
* with sending of this command.
*
* - the value must always be set to 0.
*
* See the state machines on the applicability of this command to
* different states.
*/
int burst;
/**
* See the state machines on the applicability of this command to
* different states.
*/
Void drain;
DrainMode drain;
/**
* See the state machines on the applicability of this command to
* different states.
@@ -286,10 +358,6 @@ parcelable StreamDescriptor {
* - STATUS_INVALID_OPERATION: the command is not applicable in the
* current state of the stream, or to this
* type of the stream;
* - STATUS_NO_INIT: positions can not be reported because the mix port
* is not connected to any producer or consumer, or
* because the HAL module does not support positions
* reporting for this AudioSource (on input streams).
* - STATUS_NOT_ENOUGH_DATA: a read or write error has
* occurred for the 'audio.fmq' queue;
*/
@@ -307,9 +375,11 @@ parcelable StreamDescriptor {
*/
int fmqByteCount;
/**
* It is recommended to report the current position for any command.
* If the position can not be reported, the 'status' field must be
* set to 'NO_INIT'.
* It is recommended to report the current position for any command. If
* the position can not be reported, for example because the mix port is
* not connected to any producer or consumer, or because the HAL module
* does not support positions reporting for this AudioSource (on input
* streams), the 'Position::UNKNOWN' value must be used.
*
* For output streams: the moment when the specified stream position
* was presented to an external observer (i.e. presentation position).
@@ -401,6 +471,10 @@ parcelable StreamDescriptor {
* into 'reply' queue, and hangs on waiting on a read from
* the 'command' queue.
* 6. The client wakes up due to 5. and reads the reply.
* Note: in non-blocking mode, when the HAL module goes to
* the 'TRANSFERRING' state (as indicated by the 'reply.state'
* field), the client must wait for the 'IStreamCallback.onTransferReady'
* notification to arrive before starting the next burst.
*
* For input streams the following sequence of operations is used:
* 1. The client writes the BURST command into the 'command' queue,
@@ -415,6 +489,10 @@ parcelable StreamDescriptor {
* 5. The client wakes up due to 4.
* 6. The client reads the reply and audio data. The client must
* always read from the FMQ all the data it contains.
* Note: in non-blocking mode, when the HAL module goes to
* the 'TRANSFERRING' state (as indicated by the 'reply.state'
* field) the client must wait for the 'IStreamCallback.onTransferReady'
* notification to arrive before starting the next burst.
*
*/
MQDescriptor<byte, SynchronizedReadWrite> fmq;

View File

@@ -0,0 +1,47 @@
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// To render: dot -Tpng stream-in-async-sm.gv -o stream-in-async-sm.png
digraph stream_in_async_state_machine {
node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
node [shape=point width=0.5] F;
node [shape=oval width=1];
node [fillcolor=lightgreen] STANDBY; // buffer is empty
node [fillcolor=tomato] CLOSED;
node [fillcolor=tomato] ERROR;
node [style=dashed] ANY_STATE;
node [fillcolor=lightblue style=filled];
// Note that when the producer (h/w) is passive, "burst" operations
// complete synchronously, bypassing the TRANSFERRING state.
I -> STANDBY;
STANDBY -> IDLE [label="start"]; // producer -> active
IDLE -> STANDBY [label="standby"]; // producer -> passive, buffer is cleared
IDLE -> TRANSFERRING [label="burst"]; // consumer -> active
ACTIVE -> PAUSED [label="pause"]; // consumer -> passive
ACTIVE -> DRAINING [label="drain"]; // producer -> passive
ACTIVE -> TRANSFERRING [label="burst"];
TRANSFERRING -> ACTIVE [label="←IStreamCallback.onTransferReady"];
TRANSFERRING -> PAUSED [label="pause"]; // consumer -> passive
TRANSFERRING -> DRAINING [label="drain"]; // producer -> passive
PAUSED -> TRANSFERRING [label="burst"]; // consumer -> active
PAUSED -> STANDBY [label="flush"]; // producer -> passive, buffer is cleared
DRAINING -> DRAINING [label="burst"];
DRAINING -> ACTIVE [label="start"]; // producer -> active
DRAINING -> STANDBY [label="<empty buffer>"]; // consumer deactivates
IDLE -> ERROR [label="←IStreamCallback.onError"];
PAUSED -> ERROR [label="←IStreamCallback.onError"];
TRANSFERRING -> ERROR [label="←IStreamCallback.onError"];
ANY_STATE -> CLOSED [label="→IStream*.close"];
CLOSED -> F;
}

View File

@@ -0,0 +1,57 @@
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// To render: dot -Tpng stream-out-async-sm.gv -o stream-out-async-sm.png
digraph stream_out_async_state_machine {
node [shape=doublecircle style=filled fillcolor=black width=0.5] I;
node [shape=point width=0.5] F;
node [shape=oval width=1];
node [fillcolor=lightgreen] STANDBY; // buffer is empty
node [fillcolor=lightgreen] IDLE; // buffer is empty
node [fillcolor=tomato] CLOSED;
node [fillcolor=tomato] ERROR;
node [style=dashed] ANY_STATE;
node [fillcolor=lightblue style=filled];
// Note that when the consumer (h/w) is passive, "burst" operations
// complete synchronously, bypassing the TRANSFERRING state.
I -> STANDBY;
STANDBY -> IDLE [label="start"]; // consumer -> active
STANDBY -> PAUSED [label="burst"]; // producer -> active
IDLE -> STANDBY [label="standby"]; // consumer -> passive
IDLE -> TRANSFERRING [label="burst"]; // producer -> active
ACTIVE -> PAUSED [label="pause"]; // consumer -> passive (not consuming)
ACTIVE -> DRAINING [label="drain"]; // producer -> passive
ACTIVE -> TRANSFERRING [label="burst"]; // early unblocking
ACTIVE -> ACTIVE [label="burst"]; // full write
TRANSFERRING -> ACTIVE [label="←IStreamCallback.onTransferReady"];
TRANSFERRING -> TRANSFER_PAUSED [label="pause"]; // consumer -> passive (not consuming)
TRANSFERRING -> DRAINING [label="drain"]; // producer -> passive
TRANSFER_PAUSED -> TRANSFERRING [label="start"]; // consumer -> active
TRANSFER_PAUSED -> DRAIN_PAUSED [label="drain"]; // producer -> passive
TRANSFER_PAUSED -> IDLE [label="flush"]; // buffer is cleared
PAUSED -> PAUSED [label="burst"];
PAUSED -> ACTIVE [label="start"]; // consumer -> active
PAUSED -> IDLE [label="flush"]; // producer -> passive, buffer is cleared
DRAINING -> IDLE [label="←IStreamCallback.onDrainReady"];
DRAINING -> TRANSFERRING [label="burst"]; // producer -> active
DRAINING -> DRAIN_PAUSED [label="pause"]; // consumer -> passive (not consuming)
DRAIN_PAUSED -> DRAINING [label="start"]; // consumer -> active
DRAIN_PAUSED -> TRANSFER_PAUSED [label="burst"]; // producer -> active
DRAIN_PAUSED -> IDLE [label="flush"]; // buffer is cleared
IDLE -> ERROR [label="←IStreamCallback.onError"];
DRAINING -> ERROR [label="←IStreamCallback.onError"];
TRANSFERRING -> ERROR [label="←IStreamCallback.onError"];
ANY_STATE -> CLOSED [label="→IStream*.close"];
CLOSED -> F;
}

View File

@@ -25,15 +25,20 @@ namespace android::hardware::audio::common::internal {
bool ThreadController::start(const std::string& name, int priority) {
mThreadName = name;
mThreadPriority = priority;
mWorker = std::thread(&ThreadController::workerThread, this);
if (kTestSingleThread != name) {
mWorker = std::thread(&ThreadController::workerThread, this);
} else {
// Simulate the case when the workerThread completes prior
// to the moment when we being waiting for its start.
workerThread();
}
std::unique_lock<std::mutex> lock(mWorkerLock);
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
mWorkerCv.wait(lock, [&]() {
android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
return mWorkerState == WorkerState::RUNNING || !mError.empty();
return mWorkerState != WorkerState::INITIAL || !mError.empty();
});
mWorkerStateChangeRequest = false;
return mWorkerState == WorkerState::RUNNING;
return mError.empty();
}
void ThreadController::stop() {
@@ -81,8 +86,8 @@ void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState n
void ThreadController::workerThread() {
using Status = StreamLogic::Status;
std::string error = mLogic->init();
if (error.empty() && !mThreadName.empty()) {
std::string error;
if (!mThreadName.empty()) {
std::string compliantName(mThreadName.substr(0, 15));
if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
error.append("Failed to set thread name: ").append(strerror(errCode));
@@ -94,6 +99,9 @@ void ThreadController::workerThread() {
error.append("Failed to set thread priority: ").append(strerror(errCode));
}
}
if (error.empty()) {
error.append(mLogic->init());
}
{
std::lock_guard<std::mutex> lock(mWorkerLock);
mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;

View File

@@ -32,7 +32,7 @@ class StreamLogic;
namespace internal {
class ThreadController {
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
enum class WorkerState { INITIAL, STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
public:
explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
@@ -76,7 +76,7 @@ class ThreadController {
std::thread mWorker;
std::mutex mWorkerLock;
std::condition_variable mWorkerCv;
WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::INITIAL;
std::string mError GUARDED_BY(mWorkerLock);
// The atomic lock-free variable is used to prevent priority inversions
// that can occur when a high priority worker tries to acquire the lock
@@ -90,6 +90,9 @@ class ThreadController {
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};
// A special thread name used in tests only.
static const std::string kTestSingleThread = "__testST__";
} // namespace internal
class StreamLogic {

View File

@@ -283,4 +283,16 @@ TEST_P(StreamWorkerTest, ThreadPriority) {
EXPECT_EQ(priority, worker.getPriority());
}
TEST_P(StreamWorkerTest, DeferredStartCheckNoError) {
stream.setStopStatus();
EXPECT_TRUE(worker.start(android::hardware::audio::common::internal::kTestSingleThread));
EXPECT_FALSE(worker.hasError());
}
TEST_P(StreamWorkerTest, DeferredStartCheckWithError) {
stream.setErrorStatus();
EXPECT_FALSE(worker.start(android::hardware::audio::common::internal::kTestSingleThread));
EXPECT_TRUE(worker.hasError());
}
INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());

View File

@@ -97,6 +97,7 @@ void Module::cleanUpPatch(int32_t patchId) {
}
ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t in_bufferSizeFrames,
std::shared_ptr<IStreamCallback> asyncCallback,
StreamContext* out_context) {
if (in_bufferSizeFrames <= 0) {
LOG(ERROR) << __func__ << ": non-positive buffer size " << in_bufferSizeFrames;
@@ -135,8 +136,8 @@ ndk::ScopedAStatus Module::createStreamContext(int32_t in_portConfigId, int64_t
StreamContext temp(
std::make_unique<StreamContext::CommandMQ>(1, true /*configureEventFlagWord*/),
std::make_unique<StreamContext::ReplyMQ>(1, true /*configureEventFlagWord*/),
frameSize,
std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames));
frameSize, std::make_unique<StreamContext::DataMQ>(frameSize * in_bufferSizeFrames),
asyncCallback, mDebug.streamTransientStateDelayMs);
if (temp.isValid()) {
*out_context = std::move(temp);
} else {
@@ -242,6 +243,11 @@ ndk::ScopedAStatus Module::setModuleDebug(
<< "while having external devices connected";
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
}
if (in_debug.streamTransientStateDelayMs < 0) {
LOG(ERROR) << __func__ << ": streamTransientStateDelayMs is negative: "
<< in_debug.streamTransientStateDelayMs;
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
mDebug = in_debug;
return ndk::ScopedAStatus::ok();
}
@@ -456,7 +462,8 @@ ndk::ScopedAStatus Module::openInputStream(const OpenInputStreamArguments& in_ar
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
StreamContext context;
if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, nullptr,
&context);
!status.isOk()) {
return status;
}
@@ -496,8 +503,16 @@ ndk::ScopedAStatus Module::openOutputStream(const OpenOutputStreamArguments& in_
<< " has COMPRESS_OFFLOAD flag set, requires offload info";
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
const bool isNonBlocking = isBitPositionFlagSet(port->flags.get<AudioIoFlags::Tag::output>(),
AudioOutputFlags::NON_BLOCKING);
if (isNonBlocking && in_args.callback == nullptr) {
LOG(ERROR) << __func__ << ": port id " << port->id
<< " has NON_BLOCKING flag set, requires async callback";
return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_ARGUMENT);
}
StreamContext context;
if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames, &context);
if (auto status = createStreamContext(in_args.portConfigId, in_args.bufferSizeFrames,
isNonBlocking ? in_args.callback : nullptr, &context);
!status.isOk()) {
return status;
}

View File

@@ -87,32 +87,46 @@ std::string StreamWorkerCommonLogic::init() {
void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
bool isConnected) const {
reply->status = STATUS_OK;
if (isConnected) {
reply->status = STATUS_OK;
reply->observable.frames = mFrameCount;
reply->observable.timeNs = ::android::elapsedRealtimeNano();
} else {
reply->status = STATUS_NO_INIT;
reply->observable.frames = StreamDescriptor::Position::UNKNOWN;
reply->observable.timeNs = StreamDescriptor::Position::UNKNOWN;
}
}
void StreamWorkerCommonLogic::populateReplyWrongState(
StreamDescriptor::Reply* reply, const StreamDescriptor::Command& command) const {
LOG(WARNING) << "command '" << toString(command.getTag())
<< "' can not be handled in the state " << toString(mState);
reply->status = STATUS_INVALID_OPERATION;
}
const std::string StreamInWorkerLogic::kThreadName = "reader";
StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
// Note: for input streams, draining is driven by the client, thus
// "empty buffer" condition can only happen while handling the 'burst'
// command. Thus, unlike for output streams, it does not make sense to
// delay the 'DRAINING' state here by 'mTransientStateDelayMs'.
// TODO: Add a delay for transitions of async operations when/if they added.
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
StreamDescriptor::Reply reply{};
reply.status = STATUS_BAD_VALUE;
using Tag = StreamDescriptor::Command::Tag;
switch (command.getTag()) {
case Tag::hal_reserved_exit:
if (const int32_t cookie = command.get<Tag::hal_reserved_exit>();
case Tag::halReservedExit:
if (const int32_t cookie = command.get<Tag::halReservedExit>();
cookie == mInternalCommandCookie) {
LOG(DEBUG) << __func__ << ": received EXIT command";
setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
@@ -120,8 +134,10 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
}
break;
case Tag::getStatus:
populateReply(&reply, mIsConnected);
break;
case Tag::start:
LOG(DEBUG) << __func__ << ": received START read command";
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAINING) {
populateReply(&reply, mIsConnected);
@@ -129,15 +145,13 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
? StreamDescriptor::State::IDLE
: StreamDescriptor::State::ACTIVE;
} else {
LOG(WARNING) << __func__ << ": START command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
break;
case Tag::burst:
if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received BURST read command for " << fmqByteCount
<< " bytes";
LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
<< fmqByteCount << " bytes";
if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::PAUSED ||
@@ -151,69 +165,61 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
} else if (mState == StreamDescriptor::State::DRAINING) {
// To simplify the reference code, we assume that the read operation
// has consumed all the data remaining in the hardware buffer.
// TODO: Provide parametrization on the duration of draining to test
// handling of commands during the 'DRAINING' state.
// In a real implementation, here we would either remain in
// the 'DRAINING' state, or transfer to 'STANDBY' depending on the
// buffer state.
mState = StreamDescriptor::State::STANDBY;
}
} else {
LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
}
break;
case Tag::drain:
LOG(DEBUG) << __func__ << ": received DRAIN read command";
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::DRAINING;
if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_UNSPECIFIED) {
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::DRAINING;
} else {
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
LOG(WARNING) << __func__
<< ": invalid drain mode: " << toString(command.get<Tag::drain>());
}
break;
case Tag::standby:
LOG(DEBUG) << __func__ << ": received STANDBY read command";
if (mState == StreamDescriptor::State::IDLE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause:
LOG(DEBUG) << __func__ << ": received PAUSE read command";
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::PAUSED;
} else {
LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
break;
case Tag::flush:
LOG(DEBUG) << __func__ << ": received FLUSH read command";
if (mState == StreamDescriptor::State::PAUSED) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
break;
}
@@ -261,20 +267,52 @@ bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply
const std::string StreamOutWorkerLogic::kThreadName = "writer";
StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
if (mState == StreamDescriptor::State::DRAINING ||
mState == StreamDescriptor::State::TRANSFERRING) {
if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - mTransientStateStart);
stateDurationMs >= mTransientStateDelayMs) {
if (mAsyncCallback == nullptr) {
// In blocking mode, mState can only be DRAINING.
mState = StreamDescriptor::State::IDLE;
} else {
// In a real implementation, the driver should notify the HAL about
// drain or transfer completion. In the stub, we switch unconditionally.
if (mState == StreamDescriptor::State::DRAINING) {
mState = StreamDescriptor::State::IDLE;
ndk::ScopedAStatus status = mAsyncCallback->onDrainReady();
if (!status.isOk()) {
LOG(ERROR) << __func__ << ": error from onDrainReady: " << status;
}
} else {
mState = StreamDescriptor::State::ACTIVE;
ndk::ScopedAStatus status = mAsyncCallback->onTransferReady();
if (!status.isOk()) {
LOG(ERROR) << __func__ << ": error from onTransferReady: " << status;
}
}
}
if (mTransientStateDelayMs.count() != 0) {
LOG(DEBUG) << __func__ << ": switched to state " << toString(mState)
<< " after a timeout";
}
}
}
StreamDescriptor::Command command{};
if (!mCommandMQ->readBlocking(&command, 1)) {
LOG(ERROR) << __func__ << ": reading of command from MQ failed";
mState = StreamDescriptor::State::ERROR;
return Status::ABORT;
}
LOG(DEBUG) << __func__ << ": received command " << command.toString() << " in " << kThreadName;
StreamDescriptor::Reply reply{};
reply.status = STATUS_BAD_VALUE;
using Tag = StreamDescriptor::Command::Tag;
switch (command.getTag()) {
case Tag::hal_reserved_exit:
if (const int32_t cookie = command.get<Tag::hal_reserved_exit>();
case Tag::halReservedExit:
if (const int32_t cookie = command.get<Tag::halReservedExit>();
cookie == mInternalCommandCookie) {
LOG(DEBUG) << __func__ << ": received EXIT command";
setClosed();
// This is an internal command, no need to reply.
return Status::EXIT;
@@ -282,8 +320,11 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
LOG(WARNING) << __func__ << ": EXIT command has a bad cookie: " << cookie;
}
break;
case Tag::start:
LOG(DEBUG) << __func__ << ": received START write command";
case Tag::getStatus:
populateReply(&reply, mIsConnected);
break;
case Tag::start: {
bool commandAccepted = true;
switch (mState) {
case StreamDescriptor::State::STANDBY:
mState = StreamDescriptor::State::IDLE;
@@ -292,97 +333,112 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
mState = StreamDescriptor::State::ACTIVE;
break;
case StreamDescriptor::State::DRAIN_PAUSED:
mState = StreamDescriptor::State::PAUSED;
switchToTransientState(StreamDescriptor::State::DRAINING);
break;
case StreamDescriptor::State::TRANSFER_PAUSED:
switchToTransientState(StreamDescriptor::State::TRANSFERRING);
break;
default:
LOG(WARNING) << __func__ << ": START command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
commandAccepted = false;
}
if (reply.status != STATUS_INVALID_OPERATION) {
if (commandAccepted) {
populateReply(&reply, mIsConnected);
}
break;
} break;
case Tag::burst:
if (const int32_t fmqByteCount = command.get<Tag::burst>(); fmqByteCount >= 0) {
LOG(DEBUG) << __func__ << ": received BURST write command for " << fmqByteCount
<< " bytes";
if (mState !=
StreamDescriptor::State::ERROR) { // BURST can be handled in all valid states
LOG(DEBUG) << __func__ << ": '" << toString(command.getTag()) << "' command for "
<< fmqByteCount << " bytes";
if (mState != StreamDescriptor::State::ERROR &&
mState != StreamDescriptor::State::TRANSFERRING &&
mState != StreamDescriptor::State::TRANSFER_PAUSED) {
if (!write(fmqByteCount, &reply)) {
mState = StreamDescriptor::State::ERROR;
}
if (mState == StreamDescriptor::State::STANDBY ||
mState == StreamDescriptor::State::DRAIN_PAUSED) {
mState = StreamDescriptor::State::PAUSED;
mState == StreamDescriptor::State::DRAIN_PAUSED ||
mState == StreamDescriptor::State::PAUSED) {
if (mAsyncCallback == nullptr ||
mState != StreamDescriptor::State::DRAIN_PAUSED) {
mState = StreamDescriptor::State::PAUSED;
} else {
mState = StreamDescriptor::State::TRANSFER_PAUSED;
}
} else if (mState == StreamDescriptor::State::IDLE ||
mState == StreamDescriptor::State::DRAINING) {
mState = StreamDescriptor::State::ACTIVE;
} // When in 'ACTIVE' and 'PAUSED' do not need to change the state.
mState == StreamDescriptor::State::DRAINING ||
mState == StreamDescriptor::State::ACTIVE) {
if (mAsyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) {
mState = StreamDescriptor::State::ACTIVE;
} else {
switchToTransientState(StreamDescriptor::State::TRANSFERRING);
}
}
} else {
LOG(WARNING) << __func__ << ": BURST command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": invalid burst byte count: " << fmqByteCount;
}
break;
case Tag::drain:
LOG(DEBUG) << __func__ << ": received DRAIN write command";
if (mState == StreamDescriptor::State::ACTIVE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::IDLE;
// Since there is no actual hardware that would be draining the buffer,
// in order to simplify the reference code, we assume that draining
// happens instantly, thus skipping the 'DRAINING' state.
// TODO: Provide parametrization on the duration of draining to test
// handling of commands during the 'DRAINING' state.
if (command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_ALL ||
command.get<Tag::drain>() == StreamDescriptor::DrainMode::DRAIN_EARLY_NOTIFY) {
if (mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::TRANSFERRING) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
switchToTransientState(StreamDescriptor::State::DRAINING);
} else if (mState == StreamDescriptor::State::TRANSFER_PAUSED) {
mState = StreamDescriptor::State::DRAIN_PAUSED;
populateReply(&reply, mIsConnected);
} else {
populateReplyWrongState(&reply, command);
}
} else {
LOG(WARNING) << __func__ << ": DRAIN command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
LOG(WARNING) << __func__
<< ": invalid drain mode: " << toString(command.get<Tag::drain>());
}
break;
case Tag::standby:
LOG(DEBUG) << __func__ << ": received STANDBY write command";
if (mState == StreamDescriptor::State::IDLE) {
usleep(1000); // Simulate a blocking call into the driver.
populateReply(&reply, mIsConnected);
// Can switch the state to ERROR if a driver error occurs.
mState = StreamDescriptor::State::STANDBY;
} else {
LOG(WARNING) << __func__ << ": STANDBY command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
break;
case Tag::pause:
LOG(DEBUG) << __func__ << ": received PAUSE write command";
if (mState == StreamDescriptor::State::ACTIVE ||
mState == StreamDescriptor::State::DRAINING) {
case Tag::pause: {
bool commandAccepted = true;
switch (mState) {
case StreamDescriptor::State::ACTIVE:
mState = StreamDescriptor::State::PAUSED;
break;
case StreamDescriptor::State::DRAINING:
mState = StreamDescriptor::State::DRAIN_PAUSED;
break;
case StreamDescriptor::State::TRANSFERRING:
mState = StreamDescriptor::State::TRANSFER_PAUSED;
break;
default:
populateReplyWrongState(&reply, command);
commandAccepted = false;
}
if (commandAccepted) {
populateReply(&reply, mIsConnected);
mState = mState == StreamDescriptor::State::ACTIVE
? StreamDescriptor::State::PAUSED
: StreamDescriptor::State::DRAIN_PAUSED;
} else {
LOG(WARNING) << __func__ << ": PAUSE command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
}
break;
} break;
case Tag::flush:
LOG(DEBUG) << __func__ << ": received FLUSH write command";
if (mState == StreamDescriptor::State::PAUSED ||
mState == StreamDescriptor::State::DRAIN_PAUSED) {
mState == StreamDescriptor::State::DRAIN_PAUSED ||
mState == StreamDescriptor::State::TRANSFER_PAUSED) {
populateReply(&reply, mIsConnected);
mState = StreamDescriptor::State::IDLE;
} else {
LOG(WARNING) << __func__ << ": FLUSH command can not be handled in the state "
<< toString(mState);
reply.status = STATUS_INVALID_OPERATION;
populateReplyWrongState(&reply, command);
}
break;
}
@@ -450,9 +506,8 @@ template <class Metadata, class StreamWorker>
void StreamCommon<Metadata, StreamWorker>::stopWorker() {
if (auto commandMQ = mContext.getCommandMQ(); commandMQ != nullptr) {
LOG(DEBUG) << __func__ << ": asking the worker to exit...";
auto cmd =
StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::hal_reserved_exit>(
mContext.getInternalCommandCookie());
auto cmd = StreamDescriptor::Command::make<StreamDescriptor::Command::Tag::halReservedExit>(
mContext.getInternalCommandCookie());
// Note: never call 'pause' and 'resume' methods of StreamWorker
// in the HAL implementation. These methods are to be used by
// the client side only. Preventing the worker loop from running

View File

@@ -86,6 +86,7 @@ class Module : public BnModule {
void cleanUpPatch(int32_t patchId);
ndk::ScopedAStatus createStreamContext(
int32_t in_portConfigId, int64_t in_bufferSizeFrames,
std::shared_ptr<IStreamCallback> asyncCallback,
::aidl::android::hardware::audio::core::StreamContext* out_context);
ndk::ScopedAStatus findPortIdForNewStream(
int32_t in_portConfigId, ::aidl::android::media::audio::common::AudioPort** port);

View File

@@ -17,6 +17,7 @@
#pragma once
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <map>
#include <memory>
@@ -28,6 +29,7 @@
#include <aidl/android/hardware/audio/common/SourceMetadata.h>
#include <aidl/android/hardware/audio/core/BnStreamIn.h>
#include <aidl/android/hardware/audio/core/BnStreamOut.h>
#include <aidl/android/hardware/audio/core/IStreamCallback.h>
#include <aidl/android/hardware/audio/core/StreamDescriptor.h>
#include <aidl/android/media/audio/common/AudioOffloadInfo.h>
#include <fmq/AidlMessageQueue.h>
@@ -59,33 +61,42 @@ class StreamContext {
StreamContext() = default;
StreamContext(std::unique_ptr<CommandMQ> commandMQ, std::unique_ptr<ReplyMQ> replyMQ,
size_t frameSize, std::unique_ptr<DataMQ> dataMQ)
size_t frameSize, std::unique_ptr<DataMQ> dataMQ,
std::shared_ptr<IStreamCallback> asyncCallback, int transientStateDelayMs)
: mCommandMQ(std::move(commandMQ)),
mInternalCommandCookie(std::rand()),
mReplyMQ(std::move(replyMQ)),
mFrameSize(frameSize),
mDataMQ(std::move(dataMQ)) {}
mDataMQ(std::move(dataMQ)),
mAsyncCallback(asyncCallback),
mTransientStateDelayMs(transientStateDelayMs) {}
StreamContext(StreamContext&& other)
: mCommandMQ(std::move(other.mCommandMQ)),
mInternalCommandCookie(other.mInternalCommandCookie),
mReplyMQ(std::move(other.mReplyMQ)),
mFrameSize(other.mFrameSize),
mDataMQ(std::move(other.mDataMQ)) {}
mDataMQ(std::move(other.mDataMQ)),
mAsyncCallback(other.mAsyncCallback),
mTransientStateDelayMs(other.mTransientStateDelayMs) {}
StreamContext& operator=(StreamContext&& other) {
mCommandMQ = std::move(other.mCommandMQ);
mInternalCommandCookie = other.mInternalCommandCookie;
mReplyMQ = std::move(other.mReplyMQ);
mFrameSize = other.mFrameSize;
mDataMQ = std::move(other.mDataMQ);
mAsyncCallback = other.mAsyncCallback;
mTransientStateDelayMs = other.mTransientStateDelayMs;
return *this;
}
void fillDescriptor(StreamDescriptor* desc);
std::shared_ptr<IStreamCallback> getAsyncCallback() const { return mAsyncCallback; }
CommandMQ* getCommandMQ() const { return mCommandMQ.get(); }
DataMQ* getDataMQ() const { return mDataMQ.get(); }
size_t getFrameSize() const { return mFrameSize; }
int getInternalCommandCookie() const { return mInternalCommandCookie; }
ReplyMQ* getReplyMQ() const { return mReplyMQ.get(); }
int getTransientStateDelayMs() const { return mTransientStateDelayMs; }
bool isValid() const;
void reset();
@@ -95,6 +106,8 @@ class StreamContext {
std::unique_ptr<ReplyMQ> mReplyMQ;
size_t mFrameSize;
std::unique_ptr<DataMQ> mDataMQ;
std::shared_ptr<IStreamCallback> mAsyncCallback;
int mTransientStateDelayMs;
};
class StreamWorkerCommonLogic : public ::android::hardware::audio::common::StreamLogic {
@@ -111,9 +124,17 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
mFrameSize(context.getFrameSize()),
mCommandMQ(context.getCommandMQ()),
mReplyMQ(context.getReplyMQ()),
mDataMQ(context.getDataMQ()) {}
mDataMQ(context.getDataMQ()),
mAsyncCallback(context.getAsyncCallback()),
mTransientStateDelayMs(context.getTransientStateDelayMs()) {}
std::string init() override;
void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
void populateReplyWrongState(StreamDescriptor::Reply* reply,
const StreamDescriptor::Command& command) const;
void switchToTransientState(StreamDescriptor::State state) {
mState = state;
mTransientStateStart = std::chrono::steady_clock::now();
}
// Atomic fields are used both by the main and worker threads.
std::atomic<bool> mIsConnected = false;
@@ -125,6 +146,9 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
StreamContext::CommandMQ* mCommandMQ;
StreamContext::ReplyMQ* mReplyMQ;
StreamContext::DataMQ* mDataMQ;
std::shared_ptr<IStreamCallback> mAsyncCallback;
const std::chrono::duration<int, std::milli> mTransientStateDelayMs;
std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart;
// We use an array and the "size" field instead of a vector to be able to detect
// memory allocation issues.
std::unique_ptr<int8_t[]> mDataBuffer;

View File

@@ -125,21 +125,21 @@ std::vector<AudioPort> ModuleConfig::getOutputMixPorts() const {
return result;
}
std::vector<AudioPort> ModuleConfig::getNonBlockingMixPorts(bool attachedOnly,
bool singlePort) const {
return findMixPorts(false /*isInput*/, singlePort, [&](const AudioPort& port) {
return isBitPositionFlagSet(port.flags.get<AudioIoFlags::Tag::output>(),
AudioOutputFlags::NON_BLOCKING) &&
(!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty());
});
}
std::vector<AudioPort> ModuleConfig::getOffloadMixPorts(bool attachedOnly, bool singlePort) const {
std::vector<AudioPort> result;
const auto mixPorts = getMixPorts(false /*isInput*/);
auto offloadPortIt = mixPorts.begin();
while (offloadPortIt != mixPorts.end()) {
offloadPortIt = std::find_if(offloadPortIt, mixPorts.end(), [&](const AudioPort& port) {
return isBitPositionFlagSet(port.flags.get<AudioIoFlags::Tag::output>(),
AudioOutputFlags::COMPRESS_OFFLOAD) &&
(!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty());
});
if (offloadPortIt == mixPorts.end()) break;
result.push_back(*offloadPortIt++);
if (singlePort) break;
}
return result;
return findMixPorts(false /*isInput*/, singlePort, [&](const AudioPort& port) {
return isBitPositionFlagSet(port.flags.get<AudioIoFlags::Tag::output>(),
AudioOutputFlags::COMPRESS_OFFLOAD) &&
(!attachedOnly || !getAttachedSinkDevicesPortsForMixPort(port).empty());
});
}
std::vector<AudioPort> ModuleConfig::getAttachedDevicesPortsForMixPort(
@@ -343,6 +343,19 @@ static bool isDynamicProfile(const AudioProfile& profile) {
profile.sampleRates.empty() || profile.channelMasks.empty();
}
std::vector<AudioPort> ModuleConfig::findMixPorts(
bool isInput, bool singlePort, std::function<bool(const AudioPort&)> pred) const {
std::vector<AudioPort> result;
const auto mixPorts = getMixPorts(isInput);
for (auto mixPortIt = mixPorts.begin(); mixPortIt != mixPorts.end();) {
mixPortIt = std::find_if(mixPortIt, mixPorts.end(), pred);
if (mixPortIt == mixPorts.end()) break;
result.push_back(*mixPortIt++);
if (singlePort) break;
}
return result;
}
std::vector<AudioPortConfig> ModuleConfig::generateAudioMixPortConfigs(
const std::vector<AudioPort>& ports, bool isInput, bool singleProfile) const {
std::vector<AudioPortConfig> result;

View File

@@ -16,6 +16,7 @@
#pragma once
#include <functional>
#include <optional>
#include <set>
#include <utility>
@@ -48,6 +49,8 @@ class ModuleConfig {
std::vector<aidl::android::media::audio::common::AudioPort> getMixPorts(bool isInput) const {
return isInput ? getInputMixPorts() : getOutputMixPorts();
}
std::vector<aidl::android::media::audio::common::AudioPort> getNonBlockingMixPorts(
bool attachedOnly, bool singlePort) const;
std::vector<aidl::android::media::audio::common::AudioPort> getOffloadMixPorts(
bool attachedOnly, bool singlePort) const;
@@ -121,6 +124,9 @@ class ModuleConfig {
std::string toString() const;
private:
std::vector<aidl::android::media::audio::common::AudioPort> findMixPorts(
bool isInput, bool singlePort,
std::function<bool(const aidl::android::media::audio::common::AudioPort&)> pred) const;
std::vector<aidl::android::media::audio::common::AudioPortConfig> generateAudioMixPortConfigs(
const std::vector<aidl::android::media::audio::common::AudioPort>& ports, bool isInput,
bool singleProfile) const;

File diff suppressed because it is too large Load Diff