mirror of
https://github.com/Evolution-X/hardware_interfaces
synced 2026-02-01 16:50:18 +00:00
Merge "[UWB HAL] Use AsyncFd to read the buffer only when it's readable." into main am: 4e3c5aca14 am: 625d52ae10 am: 7235328a66 am: 8975814bda
Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2766185 Change-Id: I160ccca54d9cbc2efcd83f990e7dc7e4a457b532 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
@@ -20,6 +20,7 @@ rust_binary {
|
||||
"libbinder_rs",
|
||||
"libbinder_tokio_rs",
|
||||
"libtokio",
|
||||
"libtokio_util",
|
||||
"libnix",
|
||||
"libanyhow",
|
||||
],
|
||||
|
||||
@@ -4,32 +4,34 @@ use android_hardware_uwb::aidl::android::hardware::uwb::{
|
||||
};
|
||||
use android_hardware_uwb::binder;
|
||||
use async_trait::async_trait;
|
||||
use binder::{Result, Strong};
|
||||
use binder::{DeathRecipient, IBinder, Result, Strong};
|
||||
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use log::info;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::unix::AsyncFd;
|
||||
use tokio::select;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::os::fd::AsRawFd;
|
||||
|
||||
use std::io;
|
||||
|
||||
use nix::sys::termios;
|
||||
|
||||
enum State {
|
||||
Closed,
|
||||
Opened {
|
||||
callbacks: Strong<dyn IUwbClientCallback>,
|
||||
#[allow(dead_code)]
|
||||
tasks: tokio::task::JoinSet<()>,
|
||||
_handle: tokio::task::JoinHandle<()>,
|
||||
serial: File,
|
||||
death_recipient: DeathRecipient,
|
||||
token: CancellationToken,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct UwbChip {
|
||||
name: String,
|
||||
path: String,
|
||||
state: Mutex<State>,
|
||||
state: Arc<Mutex<State>>,
|
||||
}
|
||||
|
||||
impl UwbChip {
|
||||
@@ -37,23 +39,59 @@ impl UwbChip {
|
||||
Self {
|
||||
name,
|
||||
path,
|
||||
state: Mutex::new(State::Closed),
|
||||
state: Arc::new(Mutex::new(State::Closed)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Terminate the reader task.
|
||||
#[allow(dead_code)]
|
||||
fn close(&mut self) -> Result<()> {
|
||||
if let State::Opened { ref mut token, ref callbacks, ref mut death_recipient, .. } = *self {
|
||||
log::info!("waiting for task cancellation");
|
||||
callbacks.as_binder().unlink_to_death(death_recipient)?;
|
||||
token.cancel();
|
||||
log::info!("task successfully cancelled");
|
||||
callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
|
||||
*self = State::Closed;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn makeraw(file: File) -> io::Result<File> {
|
||||
let fd = file.as_raw_fd();
|
||||
|
||||
let mut attrs = termios::tcgetattr(fd)?;
|
||||
// Configure the file descritpro as raw fd.
|
||||
use nix::sys::termios::*;
|
||||
let mut attrs = tcgetattr(fd)?;
|
||||
cfmakeraw(&mut attrs);
|
||||
tcsetattr(fd, SetArg::TCSANOW, &attrs)?;
|
||||
|
||||
termios::cfmakeraw(&mut attrs);
|
||||
|
||||
termios::tcsetattr(fd, termios::SetArg::TCSANOW, &attrs)?;
|
||||
// Configure the file descriptor as non blocking.
|
||||
use nix::fcntl::*;
|
||||
let flags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?).unwrap();
|
||||
fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK))?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
/// Wrapper around Read::read to handle EWOULDBLOCK.
|
||||
/// /!\ will actively wait for more data, make sure to call
|
||||
/// this method only when data is immediately expected.
|
||||
fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
|
||||
while buf.len() > 0 {
|
||||
match file.read(buf) {
|
||||
Ok(0) => panic!("unexpectedly reached end of file"),
|
||||
Ok(read_len) => buf = &mut buf[read_len..],
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl binder::Interface for UwbChip {}
|
||||
|
||||
#[async_trait]
|
||||
@@ -65,60 +103,109 @@ impl IUwbChipAsyncServer for UwbChip {
|
||||
async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
|
||||
log::debug!("open: {:?}", &self.path);
|
||||
|
||||
let mut state = self.state.lock().await;
|
||||
|
||||
if matches!(*state, State::Opened { .. }) {
|
||||
log::error!("the state is already opened");
|
||||
return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
|
||||
}
|
||||
|
||||
let serial = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(false)
|
||||
.open(&self.path)
|
||||
.await
|
||||
.and_then(makeraw)
|
||||
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
|
||||
|
||||
let mut state = self.state.lock().await;
|
||||
let state_death_recipient = self.state.clone();
|
||||
let mut death_recipient = DeathRecipient::new(move || {
|
||||
let mut state = state_death_recipient.blocking_lock();
|
||||
log::info!("Uwb service has died");
|
||||
state.close().unwrap();
|
||||
});
|
||||
|
||||
if let State::Closed = *state {
|
||||
let client_callbacks = callbacks.clone();
|
||||
callbacks.as_binder().link_to_death(&mut death_recipient)?;
|
||||
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
let mut reader = serial
|
||||
.try_clone()
|
||||
.await
|
||||
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
|
||||
let token = CancellationToken::new();
|
||||
let cloned_token = token.clone();
|
||||
|
||||
tasks.spawn(async move {
|
||||
loop {
|
||||
const UWB_HEADER_SIZE: usize = 4;
|
||||
let client_callbacks = callbacks.clone();
|
||||
|
||||
let mut buffer = vec![0; UWB_HEADER_SIZE];
|
||||
reader
|
||||
.read_exact(&mut buffer[0..UWB_HEADER_SIZE])
|
||||
.await
|
||||
.unwrap();
|
||||
let reader = serial
|
||||
.try_clone()
|
||||
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
|
||||
|
||||
let length = buffer[3] as usize + UWB_HEADER_SIZE;
|
||||
let join_handle = tokio::task::spawn(async move {
|
||||
info!("UCI reader task started");
|
||||
let mut reader = AsyncFd::new(reader).unwrap();
|
||||
|
||||
buffer.resize(length, 0);
|
||||
reader
|
||||
.read_exact(&mut buffer[UWB_HEADER_SIZE..length])
|
||||
.await
|
||||
.unwrap();
|
||||
loop {
|
||||
const UWB_HEADER_SIZE: usize = 4;
|
||||
let mut buffer = vec![0; UWB_HEADER_SIZE];
|
||||
|
||||
client_callbacks.onUciMessage(&buffer[..]).unwrap();
|
||||
}
|
||||
});
|
||||
// The only time where the task can be safely
|
||||
// cancelled is when no packet bytes have been read.
|
||||
//
|
||||
// - read_exact() cannot be used here since it is not
|
||||
// cancellation safe.
|
||||
// - read() cannot be used because it cannot be cancelled:
|
||||
// the syscall is executed blocking on the threadpool
|
||||
// and completes after termination of the task when
|
||||
// the pipe receives more data.
|
||||
let read_len = loop {
|
||||
// On some platforms, the readiness detecting mechanism
|
||||
// relies on edge-triggered notifications. This means that
|
||||
// the OS will only notify Tokio when the file descriptor
|
||||
// transitions from not-ready to ready. For this to work
|
||||
// you should first try to read or write and only poll for
|
||||
// readiness if that fails with an error of
|
||||
// std::io::ErrorKind::WouldBlock.
|
||||
match reader.get_mut().read(&mut buffer) {
|
||||
Ok(0) => {
|
||||
log::error!("file unexpectedly closed");
|
||||
return;
|
||||
}
|
||||
Ok(read_len) => break read_len,
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
|
||||
Err(_) => panic!("unexpected read failure"),
|
||||
}
|
||||
|
||||
callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
|
||||
let mut guard = select! {
|
||||
_ = cloned_token.cancelled() => {
|
||||
info!("task is cancelled!");
|
||||
return;
|
||||
},
|
||||
result = reader.readable() => result.unwrap()
|
||||
};
|
||||
|
||||
*state = State::Opened {
|
||||
callbacks: callbacks.clone(),
|
||||
tasks,
|
||||
serial,
|
||||
};
|
||||
guard.clear_ready();
|
||||
};
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(binder::ExceptionCode::ILLEGAL_STATE.into())
|
||||
}
|
||||
// Read the remaining header bytes, if truncated.
|
||||
read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();
|
||||
|
||||
let length = buffer[3] as usize + UWB_HEADER_SIZE;
|
||||
buffer.resize(length, 0);
|
||||
|
||||
// Read the payload bytes.
|
||||
read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();
|
||||
|
||||
client_callbacks.onUciMessage(&buffer).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
|
||||
|
||||
*state = State::Opened {
|
||||
callbacks: callbacks.clone(),
|
||||
_handle: join_handle,
|
||||
serial,
|
||||
death_recipient,
|
||||
token,
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<()> {
|
||||
@@ -126,10 +213,8 @@ impl IUwbChipAsyncServer for UwbChip {
|
||||
|
||||
let mut state = self.state.lock().await;
|
||||
|
||||
if let State::Opened { ref callbacks, .. } = *state {
|
||||
callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
|
||||
*state = State::Closed;
|
||||
Ok(())
|
||||
if matches!(*state, State::Opened { .. }) {
|
||||
state.close()
|
||||
} else {
|
||||
Err(binder::ExceptionCode::ILLEGAL_STATE.into())
|
||||
}
|
||||
@@ -162,7 +247,6 @@ impl IUwbChipAsyncServer for UwbChip {
|
||||
if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
|
||||
serial
|
||||
.write(data)
|
||||
.await
|
||||
.map(|written| written as i32)
|
||||
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user