diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp index 34e3442df0..de94467d14 100644 --- a/tv/tuner/aidl/default/Demux.cpp +++ b/tv/tuner/aidl/default/Demux.cpp @@ -53,6 +53,9 @@ void Demux::setTunerService(std::shared_ptr tuner) { Demux::~Demux() { ALOGV("%s", __FUNCTION__); + if (mDemuxIptvReadThread.joinable()) { + mDemuxIptvReadThread.join(); + } close(); } @@ -114,16 +117,26 @@ Demux::~Demux() { } } -void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, - size_t buf_size, int timeout_ms, int buffer_timeout) { +void Demux::setIptvThreadRunning(bool isIptvThreadRunning) { + std::unique_lock lock(mIsIptvThreadRunningMutex); + mIsIptvReadThreadRunning = isIptvThreadRunning; + mIsIptvThreadRunningCv.notify_all(); +} + +void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size, + int timeout_ms, int buffer_timeout) { Timer *timer, *fullBufferTimer; - while (mDemuxIptvReadThreadRunning) { + while (true) { + std::unique_lock lock(mIsIptvThreadRunningMutex); + mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; }); if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) { ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout); delete fullBufferTimer; break; } timer = new Timer(); + void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); + if (buf == nullptr) ALOGI("Buffer allocation failed"); ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms); if (bytes_read == 0) { double elapsed_time = timer->get_elapsed_time_ms(); @@ -157,8 +170,9 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo default: ALOGI("Invalid DVR Status"); } + + free(buf); } - mDemuxIptvReadThreadRunning = false; } ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) { @@ -216,17 +230,8 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo // while thread is alive, keep reading data int timeout_ms = 20; int buffer_timeout = 10000; // 10s - void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); - if (buf == nullptr) ALOGI("malloc buf failed"); - ALOGI("[ INFO ] Allocated buffer of size %d", IPTV_BUFFER_SIZE); - ALOGI("Getting FMQ from DVR instance to write socket data"); - mDemuxIptvReadThreadRunning = true; mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer, - buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); - if (mDemuxIptvReadThread.joinable()) { - mDemuxIptvReadThread.join(); - } - free(buf); + IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); } return ::ndk::ScopedAStatus::ok(); } diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h index a23063f8ad..ad7b7a77a1 100644 --- a/tv/tuner/aidl/default/Demux.h +++ b/tv/tuner/aidl/default/Demux.h @@ -103,7 +103,7 @@ class Demux : public BnDemux { void setIsRecording(bool isRecording); bool isRecording(); void startFrontendInputLoop(); - void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size, + void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size, int timeout_ms, int buffer_timeout); /** @@ -124,6 +124,11 @@ class Demux : public BnDemux { void setInUse(bool inUse); void setTunerService(std::shared_ptr tuner); + /** + * Setter for IPTV Reading thread + */ + void setIptvThreadRunning(bool isIptvThreadRunning); + private: // Tuner service std::shared_ptr mTuner; @@ -196,9 +201,15 @@ class Demux : public BnDemux { * If a specific filter's writing loop is still running */ std::atomic mFrontendInputThreadRunning; - std::atomic mDemuxIptvReadThreadRunning; std::atomic mKeepFetchingDataFromFrontend; + /** + * Controls IPTV reading thread status + */ + bool mIsIptvReadThreadRunning; + std::mutex mIsIptvThreadRunningMutex; + std::condition_variable mIsIptvThreadRunningCv; + /** * If the dvr recording is running. */ diff --git a/tv/tuner/aidl/default/Filter.cpp b/tv/tuner/aidl/default/Filter.cpp index d8f5dd5f28..212d329cdc 100644 --- a/tv/tuner/aidl/default/Filter.cpp +++ b/tv/tuner/aidl/default/Filter.cpp @@ -328,6 +328,8 @@ Filter::~Filter() { std::vector events; mFilterCount += 1; + mDemux->setIptvThreadRunning(true); + // All the filter event callbacks in start are for testing purpose. switch (mType.mainType) { case DemuxFilterMainType::TS: @@ -365,6 +367,9 @@ Filter::~Filter() { ALOGV("%s", __FUNCTION__); mFilterCount -= 1; + if (mFilterCount == 0) { + mDemux->setIptvThreadRunning(false); + } mFilterThreadRunning = false; if (mFilterThread.joinable()) {