diff --git a/tv/tuner/aidl/default/Demux.cpp b/tv/tuner/aidl/default/Demux.cpp index de94467d14..2445a3e2a1 100644 --- a/tv/tuner/aidl/default/Demux.cpp +++ b/tv/tuner/aidl/default/Demux.cpp @@ -123,26 +123,28 @@ void Demux::setIptvThreadRunning(bool isIptvThreadRunning) { mIsIptvThreadRunningCv.notify_all(); } -void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size, - int timeout_ms, int buffer_timeout) { +void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer) { Timer *timer, *fullBufferTimer; while (true) { std::unique_lock lock(mIsIptvThreadRunningMutex); mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; }); - if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) { - ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout); + if (mIsIptvDvrFMQFull && + fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) { + ALOGE("DVR FMQ has not been flushed within timeout of %d ms", + IPTV_PLAYBACK_BUFFER_TIMEOUT); delete fullBufferTimer; break; } timer = new Timer(); void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); if (buf == nullptr) ALOGI("Buffer allocation failed"); - ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms); + ssize_t bytes_read = + interface->read_stream(streamer, buf, IPTV_BUFFER_SIZE, IPTV_PLAYBACK_TIMEOUT); if (bytes_read == 0) { double elapsed_time = timer->get_elapsed_time_ms(); - if (elapsed_time > timeout_ms) { + if (elapsed_time > IPTV_PLAYBACK_TIMEOUT) { ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time, - timeout_ms); + IPTV_PLAYBACK_TIMEOUT); } ALOGE("[Demux] Cannot read data from the socket"); delete timer; @@ -206,32 +208,37 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, si // get plugin interface from frontend dtv_plugin* interface = mFrontend->getIptvPluginInterface(); + // if plugin interface is not on frontend, create a new plugin interface if (interface == nullptr) { - ALOGE("[Demux] getIptvPluginInterface(): plugin interface is null"); - return ::ndk::ScopedAStatus::fromServiceSpecificError( - static_cast(Result::INVALID_STATE)); + interface = mFrontend->createIptvPluginInterface(); + if (interface == nullptr) { + ALOGE("[ INFO ] Failed to load plugin."); + return ::ndk::ScopedAStatus::fromServiceSpecificError( + static_cast(Result::INVALID_STATE)); + } } - ALOGI("[Demux] getIptvPluginInterface(): plugin interface is not null"); + + // get transport description from frontend + string transport_desc = mFrontend->getIptvTransportDescription(); + if (transport_desc.empty()) { + string content_url = "rtp://127.0.0.1:12345"; + transport_desc = "{ \"uri\": \"" + content_url + "\"}"; + } + ALOGI("[Demux] transport_desc: %s", transport_desc.c_str()); // get streamer object from Frontend instance dtv_streamer* streamer = mFrontend->getIptvPluginStreamer(); if (streamer == nullptr) { - ALOGE("[Demux] getIptvPluginStreamer(): streamer is null"); - return ::ndk::ScopedAStatus::fromServiceSpecificError( - static_cast(Result::INVALID_STATE)); + streamer = mFrontend->createIptvPluginStreamer(interface, transport_desc.c_str()); + if (streamer == nullptr) { + ALOGE("[ INFO ] Failed to open stream"); + return ::ndk::ScopedAStatus::fromServiceSpecificError( + static_cast(Result::INVALID_STATE)); + } } - ALOGI("[Demux] getIptvPluginStreamer(): streamer is not null"); - // get transport description from frontend - string transport_desc = mFrontend->getIptvTransportDescription(); - ALOGI("[Demux] getIptvTransportDescription(): transport_desc: %s", transport_desc.c_str()); - - // call read_stream on the socket to populate the buffer with TS data - // while thread is alive, keep reading data - int timeout_ms = 20; - int buffer_timeout = 10000; // 10s - mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer, - IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); + mDemuxIptvReadThread = + std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer); } return ::ndk::ScopedAStatus::ok(); } diff --git a/tv/tuner/aidl/default/Demux.h b/tv/tuner/aidl/default/Demux.h index ad7b7a77a1..b8d57dffea 100644 --- a/tv/tuner/aidl/default/Demux.h +++ b/tv/tuner/aidl/default/Demux.h @@ -56,6 +56,9 @@ class Frontend; class TimeFilter; class Tuner; +const int IPTV_PLAYBACK_TIMEOUT = 20; // ms +const int IPTV_PLAYBACK_BUFFER_TIMEOUT = 20000; // ms + class DvrPlaybackCallback : public BnDvrCallback { public: virtual ::ndk::ScopedAStatus onPlaybackStatus(PlaybackStatus status) override { @@ -103,8 +106,7 @@ class Demux : public BnDemux { void setIsRecording(bool isRecording); bool isRecording(); void startFrontendInputLoop(); - void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size, - int timeout_ms, int buffer_timeout); + void frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer); /** * A dispatcher to read and dispatch input data to all the started filters. diff --git a/tv/tuner/aidl/default/Frontend.cpp b/tv/tuner/aidl/default/Frontend.cpp index 57ed1ba4f9..5cee3c7c59 100644 --- a/tv/tuner/aidl/default/Frontend.cpp +++ b/tv/tuner/aidl/default/Frontend.cpp @@ -215,8 +215,31 @@ Frontend::~Frontend() { return ::ndk::ScopedAStatus::ok(); } -void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size, int timeout_ms) { - ssize_t bytes_read = mIptvPluginInterface->read_stream(streamer, buf, buf_size, timeout_ms); +dtv_plugin* Frontend::createIptvPluginInterface() { + const char* path = "/vendor/lib/iptv_udp_plugin.so"; + DtvPlugin* plugin = new DtvPlugin(path); + bool plugin_loaded = plugin->load(); + if (!plugin_loaded) { + ALOGE("Failed to load plugin"); + return nullptr; + } + return plugin->interface(); +} + +dtv_streamer* Frontend::createIptvPluginStreamer(dtv_plugin* interface, + const char* transport_desc) { + dtv_streamer* streamer = interface->create_streamer(); + int open_fd = interface->open_stream(streamer, transport_desc); + if (open_fd < 0) { + return nullptr; + } + ALOGI("[ INFO ] open_stream successful, open_fd=%d", open_fd); + return streamer; +} + +void Frontend::readTuneByte(void* buf) { + ssize_t bytes_read = mIptvPluginInterface->read_stream(mIptvPluginStreamer, buf, + TUNE_BUFFER_SIZE, TUNE_BUFFER_TIMEOUT); if (bytes_read <= 0) { ALOGI("[ ERROR ] Tune byte couldn't be read."); return; @@ -242,53 +265,39 @@ void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size, ALOGI("[ INFO ] Frontend type is set to IPTV, tag = %d id=%d", in_settings.getTag(), mId); - // load udp plugin for reading TS data - const char* path = "/vendor/lib/iptv_udp_plugin.so"; - DtvPlugin* plugin = new DtvPlugin(path); - if (!plugin) { - ALOGE("Failed to create DtvPlugin, plugin_path is invalid"); + mIptvPluginInterface = createIptvPluginInterface(); + if (mIptvPluginInterface == nullptr) { + ALOGE("[ INFO ] Failed to load plugin."); return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } - bool plugin_loaded = plugin->load(); - if (!plugin_loaded) { - ALOGE("Failed to load plugin"); - return ::ndk::ScopedAStatus::fromServiceSpecificError( - static_cast(Result::INVALID_ARGUMENT)); - } - mIptvPluginInterface = plugin->interface(); // validate content_url format std::string content_url = in_settings.get()->contentUrl; - std::string transport_desc = "{ \"uri\": \"" + content_url + "\"}"; - ALOGI("[ INFO ] transport_desc: %s", transport_desc.c_str()); - bool is_transport_desc_valid = plugin->validate(transport_desc.c_str()); + mIptvTransportDescription = "{ \"uri\": \"" + content_url + "\"}"; + ALOGI("[ INFO ] transport_desc: %s", mIptvTransportDescription.c_str()); + bool is_transport_desc_valid = + mIptvPluginInterface->validate(mIptvTransportDescription.c_str()); if (!is_transport_desc_valid) { // not of format protocol://ip:port ALOGE("[ INFO ] transport_desc is not valid"); return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } - mIptvTransportDescription = transport_desc; // create a streamer and open it for reading data - dtv_streamer* streamer = mIptvPluginInterface->create_streamer(); - mIptvPluginStreamer = streamer; - int open_fd = mIptvPluginInterface->open_stream(streamer, transport_desc.c_str()); - if (open_fd < 0) { - ALOGE("[ INFO ] could not open stream"); - return ::ndk::ScopedAStatus::fromServiceSpecificError( - static_cast(Result::INVALID_ARGUMENT)); - } - ALOGI("[ INFO ] open_stream successful, open_fd=%d", open_fd); + mIptvPluginStreamer = + createIptvPluginStreamer(mIptvPluginInterface, mIptvTransportDescription.c_str()); - size_t buf_size = 1; - int timeout_ms = 2000; - void* buf = malloc(sizeof(char) * buf_size); - if (buf == nullptr) ALOGI("malloc buf failed [TUNE]"); - ALOGI("[ INFO ] [Tune] Allocated buffer of size %zu", buf_size); - mIptvFrontendTuneThread = - std::thread(&Frontend::readTuneByte, this, streamer, buf, buf_size, timeout_ms); - if (mIptvFrontendTuneThread.joinable()) mIptvFrontendTuneThread.join(); + void* buf = malloc(sizeof(char) * TUNE_BUFFER_SIZE); + if (buf == nullptr) { + ALOGE("Failed to allocate 1 byte buffer for tuning."); + return ::ndk::ScopedAStatus::fromServiceSpecificError( + static_cast(Result::INVALID_STATE)); + } + mIptvFrontendTuneThread = std::thread(&Frontend::readTuneByte, this, buf); + if (mIptvFrontendTuneThread.joinable()) { + mIptvFrontendTuneThread.join(); + } free(buf); } diff --git a/tv/tuner/aidl/default/Frontend.h b/tv/tuner/aidl/default/Frontend.h index 17a1aeeb40..7622ec0813 100644 --- a/tv/tuner/aidl/default/Frontend.h +++ b/tv/tuner/aidl/default/Frontend.h @@ -33,6 +33,9 @@ namespace tuner { class Tuner; +const int TUNE_BUFFER_SIZE = 1; // byte +const int TUNE_BUFFER_TIMEOUT = 2000; // ms + class Frontend : public BnFrontend { public: Frontend(FrontendType type, int32_t id); @@ -64,7 +67,9 @@ class Frontend : public BnFrontend { dtv_plugin* getIptvPluginInterface(); string getIptvTransportDescription(); dtv_streamer* getIptvPluginStreamer(); - void readTuneByte(dtv_streamer* streamer, void* buf, size_t size, int timeout_ms); + void readTuneByte(void* buf); + dtv_streamer* createIptvPluginStreamer(dtv_plugin* interface, const char* transport_desc); + dtv_plugin* createIptvPluginInterface(); bool isLocked(); void getFrontendInfo(FrontendInfo* _aidl_return); void setTunerService(std::shared_ptr tuner);