diff --git a/uwb/aidl/default/Android.bp b/uwb/aidl/default/Android.bp index c6d1a52f9e..2b7ef57d67 100644 --- a/uwb/aidl/default/Android.bp +++ b/uwb/aidl/default/Android.bp @@ -20,6 +20,7 @@ rust_binary { "libbinder_rs", "libbinder_tokio_rs", "libtokio", + "libtokio_util", "libnix", "libanyhow", ], diff --git a/uwb/aidl/default/src/uwb_chip.rs b/uwb/aidl/default/src/uwb_chip.rs index 9587efba5d..b63aabe010 100644 --- a/uwb/aidl/default/src/uwb_chip.rs +++ b/uwb/aidl/default/src/uwb_chip.rs @@ -4,32 +4,34 @@ use android_hardware_uwb::aidl::android::hardware::uwb::{ }; use android_hardware_uwb::binder; use async_trait::async_trait; -use binder::{Result, Strong}; +use binder::{DeathRecipient, IBinder, Result, Strong}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use log::info; +use std::sync::Arc; +use tokio::io::unix::AsyncFd; +use tokio::select; use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; +use std::fs::{File, OpenOptions}; +use std::io::{self, Read, Write}; use std::os::fd::AsRawFd; -use std::io; - -use nix::sys::termios; - enum State { Closed, Opened { callbacks: Strong, - #[allow(dead_code)] - tasks: tokio::task::JoinSet<()>, + _handle: tokio::task::JoinHandle<()>, serial: File, + death_recipient: DeathRecipient, + token: CancellationToken, }, } pub struct UwbChip { name: String, path: String, - state: Mutex, + state: Arc>, } impl UwbChip { @@ -37,23 +39,59 @@ impl UwbChip { Self { name, path, - state: Mutex::new(State::Closed), + state: Arc::new(Mutex::new(State::Closed)), } } } +impl State { + /// Terminate the reader task. + #[allow(dead_code)] + fn close(&mut self) -> Result<()> { + if let State::Opened { ref mut token, ref callbacks, ref mut death_recipient, .. } = *self { + log::info!("waiting for task cancellation"); + callbacks.as_binder().unlink_to_death(death_recipient)?; + token.cancel(); + log::info!("task successfully cancelled"); + callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?; + *self = State::Closed; + } + Ok(()) + } +} + pub fn makeraw(file: File) -> io::Result { let fd = file.as_raw_fd(); - let mut attrs = termios::tcgetattr(fd)?; + // Configure the file descritpro as raw fd. + use nix::sys::termios::*; + let mut attrs = tcgetattr(fd)?; + cfmakeraw(&mut attrs); + tcsetattr(fd, SetArg::TCSANOW, &attrs)?; - termios::cfmakeraw(&mut attrs); - - termios::tcsetattr(fd, termios::SetArg::TCSANOW, &attrs)?; + // Configure the file descriptor as non blocking. + use nix::fcntl::*; + let flags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?).unwrap(); + fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK))?; Ok(file) } +/// Wrapper around Read::read to handle EWOULDBLOCK. +/// /!\ will actively wait for more data, make sure to call +/// this method only when data is immediately expected. +fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> { + while buf.len() > 0 { + match file.read(buf) { + Ok(0) => panic!("unexpectedly reached end of file"), + Ok(read_len) => buf = &mut buf[read_len..], + Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue, + Err(err) => return Err(err), + } + } + Ok(()) +} + impl binder::Interface for UwbChip {} #[async_trait] @@ -65,60 +103,109 @@ impl IUwbChipAsyncServer for UwbChip { async fn open(&self, callbacks: &Strong) -> Result<()> { log::debug!("open: {:?}", &self.path); + let mut state = self.state.lock().await; + + if matches!(*state, State::Opened { .. }) { + log::error!("the state is already opened"); + return Err(binder::ExceptionCode::ILLEGAL_STATE.into()); + } + let serial = OpenOptions::new() .read(true) .write(true) .create(false) .open(&self.path) - .await .and_then(makeraw) .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?; - let mut state = self.state.lock().await; + let state_death_recipient = self.state.clone(); + let mut death_recipient = DeathRecipient::new(move || { + let mut state = state_death_recipient.blocking_lock(); + log::info!("Uwb service has died"); + state.close().unwrap(); + }); - if let State::Closed = *state { - let client_callbacks = callbacks.clone(); + callbacks.as_binder().link_to_death(&mut death_recipient)?; - let mut tasks = tokio::task::JoinSet::new(); - let mut reader = serial - .try_clone() - .await - .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?; + let token = CancellationToken::new(); + let cloned_token = token.clone(); - tasks.spawn(async move { - loop { - const UWB_HEADER_SIZE: usize = 4; + let client_callbacks = callbacks.clone(); - let mut buffer = vec![0; UWB_HEADER_SIZE]; - reader - .read_exact(&mut buffer[0..UWB_HEADER_SIZE]) - .await - .unwrap(); + let reader = serial + .try_clone() + .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?; - let length = buffer[3] as usize + UWB_HEADER_SIZE; + let join_handle = tokio::task::spawn(async move { + info!("UCI reader task started"); + let mut reader = AsyncFd::new(reader).unwrap(); - buffer.resize(length, 0); - reader - .read_exact(&mut buffer[UWB_HEADER_SIZE..length]) - .await - .unwrap(); + loop { + const UWB_HEADER_SIZE: usize = 4; + let mut buffer = vec![0; UWB_HEADER_SIZE]; - client_callbacks.onUciMessage(&buffer[..]).unwrap(); - } - }); + // The only time where the task can be safely + // cancelled is when no packet bytes have been read. + // + // - read_exact() cannot be used here since it is not + // cancellation safe. + // - read() cannot be used because it cannot be cancelled: + // the syscall is executed blocking on the threadpool + // and completes after termination of the task when + // the pipe receives more data. + let read_len = loop { + // On some platforms, the readiness detecting mechanism + // relies on edge-triggered notifications. This means that + // the OS will only notify Tokio when the file descriptor + // transitions from not-ready to ready. For this to work + // you should first try to read or write and only poll for + // readiness if that fails with an error of  + // std::io::ErrorKind::WouldBlock. + match reader.get_mut().read(&mut buffer) { + Ok(0) => { + log::error!("file unexpectedly closed"); + return; + } + Ok(read_len) => break read_len, + Err(err) if err.kind() == io::ErrorKind::WouldBlock => (), + Err(_) => panic!("unexpected read failure"), + } - callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?; + let mut guard = select! { + _ = cloned_token.cancelled() => { + info!("task is cancelled!"); + return; + }, + result = reader.readable() => result.unwrap() + }; - *state = State::Opened { - callbacks: callbacks.clone(), - tasks, - serial, - }; + guard.clear_ready(); + }; - Ok(()) - } else { - Err(binder::ExceptionCode::ILLEGAL_STATE.into()) - } + // Read the remaining header bytes, if truncated. + read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap(); + + let length = buffer[3] as usize + UWB_HEADER_SIZE; + buffer.resize(length, 0); + + // Read the payload bytes. + read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap(); + + client_callbacks.onUciMessage(&buffer).unwrap(); + } + }); + + callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?; + + *state = State::Opened { + callbacks: callbacks.clone(), + _handle: join_handle, + serial, + death_recipient, + token, + }; + + Ok(()) } async fn close(&self) -> Result<()> { @@ -126,10 +213,8 @@ impl IUwbChipAsyncServer for UwbChip { let mut state = self.state.lock().await; - if let State::Opened { ref callbacks, .. } = *state { - callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?; - *state = State::Closed; - Ok(()) + if matches!(*state, State::Opened { .. }) { + state.close() } else { Err(binder::ExceptionCode::ILLEGAL_STATE.into()) } @@ -162,7 +247,6 @@ impl IUwbChipAsyncServer for UwbChip { if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await { serial .write(data) - .await .map(|written| written as i32) .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into()) } else {