Merge "[UWB HAL] Use AsyncFd to read the buffer only when it's readable." into main am: 4e3c5aca14

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2766185

Change-Id: I4665614ebfe72260217be365be89e2c2597e58bd
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
Bob Wang
2023-10-03 17:45:25 +00:00
committed by Automerger Merge Worker
2 changed files with 141 additions and 56 deletions

View File

@@ -20,6 +20,7 @@ rust_binary {
"libbinder_rs",
"libbinder_tokio_rs",
"libtokio",
"libtokio_util",
"libnix",
"libanyhow",
],

View File

@@ -4,32 +4,34 @@ use android_hardware_uwb::aidl::android::hardware::uwb::{
};
use android_hardware_uwb::binder;
use async_trait::async_trait;
use binder::{Result, Strong};
use binder::{DeathRecipient, IBinder, Result, Strong};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use log::info;
use std::sync::Arc;
use tokio::io::unix::AsyncFd;
use tokio::select;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Write};
use std::os::fd::AsRawFd;
use std::io;
use nix::sys::termios;
enum State {
Closed,
Opened {
callbacks: Strong<dyn IUwbClientCallback>,
#[allow(dead_code)]
tasks: tokio::task::JoinSet<()>,
_handle: tokio::task::JoinHandle<()>,
serial: File,
death_recipient: DeathRecipient,
token: CancellationToken,
},
}
pub struct UwbChip {
name: String,
path: String,
state: Mutex<State>,
state: Arc<Mutex<State>>,
}
impl UwbChip {
@@ -37,23 +39,59 @@ impl UwbChip {
Self {
name,
path,
state: Mutex::new(State::Closed),
state: Arc::new(Mutex::new(State::Closed)),
}
}
}
impl State {
/// Terminate the reader task.
#[allow(dead_code)]
fn close(&mut self) -> Result<()> {
if let State::Opened { ref mut token, ref callbacks, ref mut death_recipient, .. } = *self {
log::info!("waiting for task cancellation");
callbacks.as_binder().unlink_to_death(death_recipient)?;
token.cancel();
log::info!("task successfully cancelled");
callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
*self = State::Closed;
}
Ok(())
}
}
pub fn makeraw(file: File) -> io::Result<File> {
let fd = file.as_raw_fd();
let mut attrs = termios::tcgetattr(fd)?;
// Configure the file descritpro as raw fd.
use nix::sys::termios::*;
let mut attrs = tcgetattr(fd)?;
cfmakeraw(&mut attrs);
tcsetattr(fd, SetArg::TCSANOW, &attrs)?;
termios::cfmakeraw(&mut attrs);
termios::tcsetattr(fd, termios::SetArg::TCSANOW, &attrs)?;
// Configure the file descriptor as non blocking.
use nix::fcntl::*;
let flags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?).unwrap();
fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK))?;
Ok(file)
}
/// Wrapper around Read::read to handle EWOULDBLOCK.
/// /!\ will actively wait for more data, make sure to call
/// this method only when data is immediately expected.
fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
while buf.len() > 0 {
match file.read(buf) {
Ok(0) => panic!("unexpectedly reached end of file"),
Ok(read_len) => buf = &mut buf[read_len..],
Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
Err(err) => return Err(err),
}
}
Ok(())
}
impl binder::Interface for UwbChip {}
#[async_trait]
@@ -65,60 +103,109 @@ impl IUwbChipAsyncServer for UwbChip {
async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
log::debug!("open: {:?}", &self.path);
let mut state = self.state.lock().await;
if matches!(*state, State::Opened { .. }) {
log::error!("the state is already opened");
return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
}
let serial = OpenOptions::new()
.read(true)
.write(true)
.create(false)
.open(&self.path)
.await
.and_then(makeraw)
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
let mut state = self.state.lock().await;
let state_death_recipient = self.state.clone();
let mut death_recipient = DeathRecipient::new(move || {
let mut state = state_death_recipient.blocking_lock();
log::info!("Uwb service has died");
state.close().unwrap();
});
if let State::Closed = *state {
let client_callbacks = callbacks.clone();
callbacks.as_binder().link_to_death(&mut death_recipient)?;
let mut tasks = tokio::task::JoinSet::new();
let mut reader = serial
.try_clone()
.await
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
let token = CancellationToken::new();
let cloned_token = token.clone();
tasks.spawn(async move {
loop {
const UWB_HEADER_SIZE: usize = 4;
let client_callbacks = callbacks.clone();
let mut buffer = vec![0; UWB_HEADER_SIZE];
reader
.read_exact(&mut buffer[0..UWB_HEADER_SIZE])
.await
.unwrap();
let reader = serial
.try_clone()
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
let length = buffer[3] as usize + UWB_HEADER_SIZE;
let join_handle = tokio::task::spawn(async move {
info!("UCI reader task started");
let mut reader = AsyncFd::new(reader).unwrap();
buffer.resize(length, 0);
reader
.read_exact(&mut buffer[UWB_HEADER_SIZE..length])
.await
.unwrap();
loop {
const UWB_HEADER_SIZE: usize = 4;
let mut buffer = vec![0; UWB_HEADER_SIZE];
client_callbacks.onUciMessage(&buffer[..]).unwrap();
}
});
// The only time where the task can be safely
// cancelled is when no packet bytes have been read.
//
// - read_exact() cannot be used here since it is not
// cancellation safe.
// - read() cannot be used because it cannot be cancelled:
// the syscall is executed blocking on the threadpool
// and completes after termination of the task when
// the pipe receives more data.
let read_len = loop {
// On some platforms, the readiness detecting mechanism
// relies on edge-triggered notifications. This means that
// the OS will only notify Tokio when the file descriptor
// transitions from not-ready to ready. For this to work
// you should first try to read or write and only poll for
// readiness if that fails with an error of 
// std::io::ErrorKind::WouldBlock.
match reader.get_mut().read(&mut buffer) {
Ok(0) => {
log::error!("file unexpectedly closed");
return;
}
Ok(read_len) => break read_len,
Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
Err(_) => panic!("unexpected read failure"),
}
callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
let mut guard = select! {
_ = cloned_token.cancelled() => {
info!("task is cancelled!");
return;
},
result = reader.readable() => result.unwrap()
};
*state = State::Opened {
callbacks: callbacks.clone(),
tasks,
serial,
};
guard.clear_ready();
};
Ok(())
} else {
Err(binder::ExceptionCode::ILLEGAL_STATE.into())
}
// Read the remaining header bytes, if truncated.
read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();
let length = buffer[3] as usize + UWB_HEADER_SIZE;
buffer.resize(length, 0);
// Read the payload bytes.
read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();
client_callbacks.onUciMessage(&buffer).unwrap();
}
});
callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
*state = State::Opened {
callbacks: callbacks.clone(),
_handle: join_handle,
serial,
death_recipient,
token,
};
Ok(())
}
async fn close(&self) -> Result<()> {
@@ -126,10 +213,8 @@ impl IUwbChipAsyncServer for UwbChip {
let mut state = self.state.lock().await;
if let State::Opened { ref callbacks, .. } = *state {
callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
*state = State::Closed;
Ok(())
if matches!(*state, State::Opened { .. }) {
state.close()
} else {
Err(binder::ExceptionCode::ILLEGAL_STATE.into())
}
@@ -162,7 +247,6 @@ impl IUwbChipAsyncServer for UwbChip {
if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
serial
.write(data)
.await
.map(|written| written as i32)
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
} else {