From 028f27618d990c41b6cea77b81b8e4f695b4e625 Mon Sep 17 00:00:00 2001 From: sadiqsada Date: Tue, 31 Oct 2023 16:09:25 -0700 Subject: [PATCH] Demux thread reads data after filter start Demux thread should read socket data only when there are active filters reading data. When a filter is started, the reading thread on the demux is notified of the active filter, and it starts reading data. When the last filter is stoped, the thread is notified and it stops reading data. Bug: 288170590 Test: manual Change-Id: Idd380bc0d86c445ce9faef8e445d636bbe4e91fc --- tv/tuner/aidl/default/Demux.cpp | 33 ++++++++++++++++++-------------- tv/tuner/aidl/default/Demux.h | 15 +++++++++++++-- tv/tuner/aidl/default/Filter.cpp | 5 +++++ 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp index 34e3442df0..de94467d14 100644 --- a/tv/tuner/aidl/default/Demux.cpp +++ b/tv/tuner/aidl/default/Demux.cpp @@ -53,6 +53,9 @@ void Demux::setTunerService(std::shared_ptr tuner) { Demux::~Demux() { ALOGV("%s", __FUNCTION__); + if (mDemuxIptvReadThread.joinable()) { + mDemuxIptvReadThread.join(); + } close(); } @@ -114,16 +117,26 @@ Demux::~Demux() { } } -void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, - size_t buf_size, int timeout_ms, int buffer_timeout) { +void Demux::setIptvThreadRunning(bool isIptvThreadRunning) { + std::unique_lock lock(mIsIptvThreadRunningMutex); + mIsIptvReadThreadRunning = isIptvThreadRunning; + mIsIptvThreadRunningCv.notify_all(); +} + +void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size, + int timeout_ms, int buffer_timeout) { Timer *timer, *fullBufferTimer; - while (mDemuxIptvReadThreadRunning) { + while (true) { + std::unique_lock lock(mIsIptvThreadRunningMutex); + mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; }); if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) { ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout); delete fullBufferTimer; break; } timer = new Timer(); + void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); + if (buf == nullptr) ALOGI("Buffer allocation failed"); ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms); if (bytes_read == 0) { double elapsed_time = timer->get_elapsed_time_ms(); @@ -157,8 +170,9 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo default: ALOGI("Invalid DVR Status"); } + + free(buf); } - mDemuxIptvReadThreadRunning = false; } ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) { @@ -216,17 +230,8 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo // while thread is alive, keep reading data int timeout_ms = 20; int buffer_timeout = 10000; // 10s - void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); - if (buf == nullptr) ALOGI("malloc buf failed"); - ALOGI("[ INFO ] Allocated buffer of size %d", IPTV_BUFFER_SIZE); - ALOGI("Getting FMQ from DVR instance to write socket data"); - mDemuxIptvReadThreadRunning = true; mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer, - buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); - if (mDemuxIptvReadThread.joinable()) { - mDemuxIptvReadThread.join(); - } - free(buf); + IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); } return ::ndk::ScopedAStatus::ok(); } diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h index a23063f8ad..ad7b7a77a1 100644 --- a/tv/tuner/aidl/default/Demux.h +++ b/tv/tuner/aidl/default/Demux.h @@ -103,7 +103,7 @@ class Demux : public BnDemux { void setIsRecording(bool isRecording); bool isRecording(); void startFrontendInputLoop(); - void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size, + void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size, int timeout_ms, int buffer_timeout); /** @@ -124,6 +124,11 @@ class Demux : public BnDemux { void setInUse(bool inUse); void setTunerService(std::shared_ptr tuner); + /** + * Setter for IPTV Reading thread + */ + void setIptvThreadRunning(bool isIptvThreadRunning); + private: // Tuner service std::shared_ptr mTuner; @@ -196,9 +201,15 @@ class Demux : public BnDemux { * If a specific filter's writing loop is still running */ std::atomic mFrontendInputThreadRunning; - std::atomic mDemuxIptvReadThreadRunning; std::atomic mKeepFetchingDataFromFrontend; + /** + * Controls IPTV reading thread status + */ + bool mIsIptvReadThreadRunning; + std::mutex mIsIptvThreadRunningMutex; + std::condition_variable mIsIptvThreadRunningCv; + /** * If the dvr recording is running. */ diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp index d8f5dd5f28..212d329cdc 100644 --- a/tv/tuner/aidl/default/Filter.cpp +++ b/tv/tuner/aidl/default/Filter.cpp @@ -328,6 +328,8 @@ Filter::~Filter() { std::vector events; mFilterCount += 1; + mDemux->setIptvThreadRunning(true); + // All the filter event callbacks in start are for testing purpose. switch (mType.mainType) { case DemuxFilterMainType::TS: @@ -365,6 +367,9 @@ Filter::~Filter() { ALOGV("%s", __FUNCTION__); mFilterCount -= 1; + if (mFilterCount == 0) { + mDemux->setIptvThreadRunning(false); + } mFilterThreadRunning = false; if (mFilterThread.joinable()) {