[Thread] Implement read on socket interface

Bug: 313425570
Test: build pass & manual test
Change-Id: I4e6dfdfa73f7145e8f36d05abf1531d7796b4b9e
This commit is contained in:
shihchienc
2024-01-29 10:07:47 +00:00
parent 5b2cd32368
commit 1623351bc0
2 changed files with 143 additions and 0 deletions

View File

@@ -89,6 +89,41 @@ otError SocketInterface::SendFrame(const uint8_t* aFrame, uint16_t aLength) {
return OT_ERROR_NONE;
}
otError SocketInterface::WaitForFrame(uint64_t aTimeoutUs) {
otError error = OT_ERROR_NONE;
struct timeval timeout;
timeout.tv_sec = static_cast<time_t>(aTimeoutUs / US_PER_S);
timeout.tv_usec = static_cast<suseconds_t>(aTimeoutUs % US_PER_S);
fd_set readFds;
fd_set errorFds;
int rval;
FD_ZERO(&readFds);
FD_ZERO(&errorFds);
FD_SET(mSockFd, &readFds);
FD_SET(mSockFd, &errorFds);
rval = TEMP_FAILURE_RETRY(select(mSockFd + 1, &readFds, nullptr, &errorFds, &timeout));
if (rval > 0) {
if (FD_ISSET(mSockFd, &readFds)) {
Read();
} else if (FD_ISSET(mSockFd, &errorFds)) {
DieNowWithMessage("RCP error", OT_EXIT_FAILURE);
} else {
DieNow(OT_EXIT_FAILURE);
}
} else if (rval == 0) {
ExitNow(error = OT_ERROR_RESPONSE_TIMEOUT);
} else {
DieNowWithMessage("wait response", OT_EXIT_FAILURE);
}
exit:
return error;
}
void SocketInterface::UpdateFdSet(void* aMainloopContext) {
otSysMainloopContext* context = reinterpret_cast<otSysMainloopContext*>(aMainloopContext);
@@ -101,12 +136,69 @@ void SocketInterface::UpdateFdSet(void* aMainloopContext) {
}
}
void SocketInterface::Process(const void* aMainloopContext) {
const otSysMainloopContext* context =
reinterpret_cast<const otSysMainloopContext*>(aMainloopContext);
assert(context != nullptr);
if (FD_ISSET(mSockFd, &context->mReadFdSet)) {
Read();
}
}
void SocketInterface::Read(void) {
uint8_t buffer[kMaxFrameSize];
ssize_t rval = TEMP_FAILURE_RETRY(read(mSockFd, buffer, sizeof(buffer)));
if (rval > 0) {
ProcessReceivedData(buffer, static_cast<uint16_t>(rval));
} else if (rval < 0) {
DieNow(OT_EXIT_ERROR_ERRNO);
} else {
otLogCritPlat("Socket connection is closed by remote.");
exit(OT_EXIT_FAILURE);
}
}
void SocketInterface::Write(const uint8_t* aFrame, uint16_t aLength) {
ssize_t rval = TEMP_FAILURE_RETRY(write(mSockFd, aFrame, aLength));
VerifyOrDie(rval >= 0, OT_EXIT_ERROR_ERRNO);
VerifyOrDie(rval > 0, OT_EXIT_FAILURE);
}
void SocketInterface::ProcessReceivedData(const uint8_t* aBuffer, uint16_t aLength) {
while (aLength--) {
uint8_t byte = *aBuffer++;
if (mReceiveFrameBuffer->CanWrite(sizeof(uint8_t))) {
IgnoreError(mReceiveFrameBuffer->WriteByte(byte));
} else {
HandleSocketFrame(this, OT_ERROR_NO_BUFS);
return;
}
}
HandleSocketFrame(this, OT_ERROR_NONE);
}
void SocketInterface::HandleSocketFrame(void* aContext, otError aError) {
static_cast<SocketInterface*>(aContext)->HandleSocketFrame(aError);
}
void SocketInterface::HandleSocketFrame(otError aError) {
VerifyOrExit((mReceiveFrameCallback != nullptr) && (mReceiveFrameBuffer != nullptr));
if (aError == OT_ERROR_NONE) {
mReceiveFrameCallback(mReceiveFrameContext);
} else {
mReceiveFrameBuffer->DiscardFrame();
otLogWarnPlat("Process socket frame failed: %s", otThreadErrorToString(aError));
}
exit:
return;
}
int SocketInterface::OpenFile(const ot::Url::Url& aRadioUrl) {
int fd = -1;
sockaddr_un serverAddress;