diff --git a/tv/tuner/1.0/default/Demux.cpp b/tv/tuner/1.0/default/Demux.cpp index 4016c5a0e9..889e42ed06 100644 --- a/tv/tuner/1.0/default/Demux.cpp +++ b/tv/tuner/1.0/default/Demux.cpp @@ -73,34 +73,6 @@ Demux::Demux(uint32_t demuxId) { Demux::~Demux() {} -bool Demux::createAndSaveMQ(uint32_t bufferSize, uint32_t filterId) { - ALOGV("%s", __FUNCTION__); - - // Create a synchronized FMQ that supports blocking read/write - std::unique_ptr tmpFilterMQ = - std::unique_ptr(new (std::nothrow) FilterMQ(bufferSize, true)); - if (!tmpFilterMQ->isValid()) { - ALOGW("Failed to create FMQ of filter with id: %d", filterId); - return false; - } - - mFilterMQs.resize(filterId + 1); - mFilterMQs[filterId] = std::move(tmpFilterMQ); - - EventFlag* mFilterEventFlag; - if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &mFilterEventFlag) != - OK) { - return false; - } - mFilterEventFlags.resize(filterId + 1); - mFilterEventFlags[filterId] = mFilterEventFlag; - mFilterWriteCount.resize(filterId + 1); - mFilterWriteCount[filterId] = 0; - mThreadRunning.resize(filterId + 1); - - return true; -} - Return Demux::setFrontendDataSource(uint32_t frontendId) { ALOGV("%s", __FUNCTION__); @@ -113,23 +85,42 @@ Return Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, const sp& cb, addFilter_cb _hidl_cb) { ALOGV("%s", __FUNCTION__); - uint32_t filterId = mLastUsedFilterId + 1; - mLastUsedFilterId += 1; + uint32_t filterId; + + if (!mUnusedFilterIds.empty()) { + filterId = *mUnusedFilterIds.begin(); + + mUnusedFilterIds.erase(filterId); + } else { + filterId = ++mLastUsedFilterId; + + mDemuxCallbacks.resize(filterId + 1); + mFilterMQs.resize(filterId + 1); + mFilterEvents.resize(filterId + 1); + mFilterEventFlags.resize(filterId + 1); + mFilterThreadRunning.resize(filterId + 1); + mFilterThreads.resize(filterId + 1); + } + + mUsedFilterIds.insert(filterId); if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) { ALOGW("callback can't be null"); _hidl_cb(Result::INVALID_ARGUMENT, filterId); return Void(); } + // Add callback - mDemuxCallbacks.resize(filterId + 1); mDemuxCallbacks[filterId] = cb; - // Mapping from the filter ID to the filter type - mFilterTypes.resize(filterId + 1); - mFilterTypes[filterId] = type; + // Mapping from the filter ID to the filter event + DemuxFilterEvent event{ + .filterId = filterId, + .filterType = type, + }; + mFilterEvents[filterId] = event; - if (!createAndSaveMQ(bufferSize, filterId)) { + if (!createFilterMQ(bufferSize, filterId)) { _hidl_cb(Result::UNKNOWN_ERROR, -1); return Void(); } @@ -141,8 +132,8 @@ Return Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, Return Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) { ALOGV("%s", __FUNCTION__); - if (filterId < 0 || filterId > mLastUsedFilterId) { - ALOGW("No filter with id: %d exists", filterId); + if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) { + ALOGW("No filter with id: %d exists to get desc", filterId); _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor()); return Void(); } @@ -160,35 +151,29 @@ Return Demux::configureFilter(uint32_t /* filterId */, Return Demux::startFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); + Result result; - if (filterId < 0 || filterId > mLastUsedFilterId) { - ALOGW("No filter with id: %d exists", filterId); + if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) { + ALOGW("No filter with id: %d exists to start filter", filterId); return Result::INVALID_ARGUMENT; } - DemuxFilterType filterType = mFilterTypes[filterId]; - Result result; - DemuxFilterEvent event{ - .filterId = filterId, - .filterType = filterType, - }; - - switch (filterType) { + switch (mFilterEvents[filterId].filterType) { case DemuxFilterType::SECTION: - result = startSectionFilterHandler(event); + result = startFilterLoop(filterId); break; case DemuxFilterType::PES: - result = startPesFilterHandler(event); + result = startPesFilterHandler(filterId); break; case DemuxFilterType::TS: result = startTsFilterHandler(); return Result::SUCCESS; case DemuxFilterType::AUDIO: case DemuxFilterType::VIDEO: - result = startMediaFilterHandler(event); + result = startMediaFilterHandler(filterId); break; case DemuxFilterType::RECORD: - result = startRecordFilterHandler(event); + result = startRecordFilterHandler(filterId); break; case DemuxFilterType::PCR: result = startPcrFilterHandler(); @@ -212,9 +197,13 @@ Return Demux::flushFilter(uint32_t /* filterId */) { return Result::SUCCESS; } -Return Demux::removeFilter(uint32_t /* filterId */) { +Return Demux::removeFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); + // resetFilterRecords(filterId); + mUsedFilterIds.erase(filterId); + mUnusedFilterIds.insert(filterId); + return Result::SUCCESS; } @@ -239,25 +228,291 @@ Return Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb Return Demux::close() { ALOGV("%s", __FUNCTION__); + set::iterator it; + mInputThread = 0; + mOutputThread = 0; + mFilterThreads.clear(); + mUnusedFilterIds.clear(); + mUsedFilterIds.clear(); + mDemuxCallbacks.clear(); + mFilterMQs.clear(); + mFilterEvents.clear(); + mFilterEventFlags.clear(); + mLastUsedFilterId = -1; + return Result::SUCCESS; } -bool Demux::writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum) { - event.events.resize(sectionNum); - for (int i = 0; i < sectionNum; i++) { - DemuxFilterSectionEvent secEvent; - secEvent = { - // temp dump meta data - .tableId = 0, - .version = 1, - .sectionNum = 1, - .dataLength = 530, - }; - event.events[i].section(secEvent); - if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) { - return false; - } +Return Demux::addOutput(uint32_t bufferSize, const sp& cb) { + ALOGV("%s", __FUNCTION__); + + // Create a synchronized FMQ that supports blocking read/write + std::unique_ptr tmpFilterMQ = + std::unique_ptr(new (std::nothrow) FilterMQ(bufferSize, true)); + if (!tmpFilterMQ->isValid()) { + ALOGW("Failed to create output FMQ"); + return Result::UNKNOWN_ERROR; } + + mOutputMQ = std::move(tmpFilterMQ); + + if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) { + return Result::UNKNOWN_ERROR; + } + + mOutputCallback = cb; + + return Result::SUCCESS; +} + +Return Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) { + ALOGV("%s", __FUNCTION__); + + if (!mOutputMQ) { + _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor()); + return Void(); + } + + _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc()); + return Void(); +} + +Return Demux::configureOutput(const DemuxOutputSettings& /* settings */) { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::attachOutputTsFilter(uint32_t /*filterId*/) { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::detachOutputTsFilter(uint32_t /* filterId */) { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::startOutput() { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::stopOutput() { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::flushOutput() { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::removeOutput() { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::addInput(uint32_t bufferSize, const sp& cb) { + ALOGV("%s", __FUNCTION__); + + // Create a synchronized FMQ that supports blocking read/write + std::unique_ptr tmpInputMQ = + std::unique_ptr(new (std::nothrow) FilterMQ(bufferSize, true)); + if (!tmpInputMQ->isValid()) { + ALOGW("Failed to create input FMQ"); + return Result::UNKNOWN_ERROR; + } + + mInputMQ = std::move(tmpInputMQ); + + if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) { + return Result::UNKNOWN_ERROR; + } + + mInputCallback = cb; + + return Result::SUCCESS; +} + +Return Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) { + ALOGV("%s", __FUNCTION__); + + if (!mInputMQ) { + _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor()); + return Void(); + } + + _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc()); + return Void(); +} + +Return Demux::configureInput(const DemuxInputSettings& /* settings */) { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::startInput() { + ALOGV("%s", __FUNCTION__); + + pthread_create(&mInputThread, NULL, __threadLoopInput, this); + pthread_setname_np(mInputThread, "demux_input_waiting_loop"); + + // TODO start another thread to send filter status callback to the framework + + return Result::SUCCESS; +} + +Return Demux::stopInput() { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::flushInput() { + ALOGV("%s", __FUNCTION__); + + return Result::SUCCESS; +} + +Return Demux::removeInput() { + ALOGV("%s", __FUNCTION__); + + mInputMQ = nullptr; + + return Result::SUCCESS; +} + +Result Demux::startFilterLoop(uint32_t filterId) { + struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs)); + threadArgs->user = this; + threadArgs->filterId = filterId; + + pthread_t mFilterThread; + pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs); + mFilterThreads[filterId] = mFilterThread; + pthread_setname_np(mFilterThread, "demux_filter_waiting_loop"); + + return Result::SUCCESS; +} + +Result Demux::startSectionFilterHandler(uint32_t filterId, vector data) { + if (!writeSectionsAndCreateEvent(filterId, data)) { + ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId); + return Result::UNKNOWN_ERROR; + } + + return Result::SUCCESS; +} + +Result Demux::startPesFilterHandler(uint32_t filterId) { + // TODO generate multiple events in one event callback + DemuxFilterPesEvent pesEvent; + pesEvent = { + // temp dump meta data + .streamId = 0, + .dataLength = 530, + }; + mFilterEvents[filterId].events.resize(1); + mFilterEvents[filterId].events[0].pes(pesEvent); + /*pthread_create(&mThreadId, NULL, __threadLoop, this); + pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/ + if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) { + return Result::INVALID_STATE; + } + + if (mDemuxCallbacks[filterId] == nullptr) { + return Result::NOT_INITIALIZED; + } + + mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); + return Result::SUCCESS; +} + +Result Demux::startTsFilterHandler() { + // TODO handle starting TS filter + return Result::SUCCESS; +} + +Result Demux::startMediaFilterHandler(uint32_t filterId) { + DemuxFilterMediaEvent mediaEvent; + mediaEvent = { + // temp dump meta data + .pts = 0, + .dataLength = 530, + .secureMemory = nullptr, + }; + mFilterEvents[filterId].events.resize(1); + mFilterEvents[filterId].events[0].media() = mediaEvent; + // TODO handle write FQM for media stream + return Result::SUCCESS; +} + +Result Demux::startRecordFilterHandler(uint32_t filterId) { + DemuxFilterRecordEvent recordEvent; + recordEvent = { + // temp dump meta data + .tpid = 0, + .packetNum = 0, + }; + recordEvent.indexMask.tsIndexMask() = 0x01; + mFilterEvents[filterId].events.resize(1); + mFilterEvents[filterId].events[0].ts() = recordEvent; + return Result::SUCCESS; +} + +Result Demux::startPcrFilterHandler() { + // TODO handle starting PCR filter + return Result::SUCCESS; +} + +bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) { + ALOGV("%s", __FUNCTION__); + + // Create a synchronized FMQ that supports blocking read/write + std::unique_ptr tmpFilterMQ = + std::unique_ptr(new (std::nothrow) FilterMQ(bufferSize, true)); + if (!tmpFilterMQ->isValid()) { + ALOGW("Failed to create FMQ of filter with id: %d", filterId); + return false; + } + + mFilterMQs[filterId] = std::move(tmpFilterMQ); + + EventFlag* filterEventFlag; + if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) != + OK) { + return false; + } + mFilterEventFlags[filterId] = filterEventFlag; + + return true; +} + +bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector data) { + // TODO check how many sections has been read + std::lock_guard lock(mFilterEventLock); + int size = mFilterEvents[filterId].events.size(); + mFilterEvents[filterId].events.resize(size + 1); + if (!writeDataToFilterMQ(data, filterId)) { + return false; + } + DemuxFilterSectionEvent secEvent; + secEvent = { + // temp dump meta data + .tableId = 0, + .version = 1, + .sectionNum = 1, + .dataLength = 530, + }; + mFilterEvents[filterId].events[size].section(secEvent); return true; } @@ -269,116 +524,82 @@ bool Demux::writeDataToFilterMQ(const std::vector& data, uint32_t filte return false; } -Result Demux::startSectionFilterHandler(DemuxFilterEvent event) { - struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs)); - threadArgs->user = this; - threadArgs->event = &event; +bool Demux::filterAndOutputData() { + ALOGD("[Demux] start to dispatch data to filters"); + // Read input data from the input FMQ + int size = mInputMQ->availableToRead(); + vector dataOutputBuffer; + dataOutputBuffer.resize(size); + mInputMQ->read(dataOutputBuffer.data(), size); - pthread_create(&mThreadId, NULL, __threadLoop, (void*)threadArgs); - pthread_setname_np(mThreadId, "demux_filter_waiting_loop"); - - return Result::SUCCESS; -} - -Result Demux::startPesFilterHandler(DemuxFilterEvent& event) { - // TODO generate multiple events in one event callback - DemuxFilterPesEvent pesEvent; - pesEvent = { - // temp dump meta data - .streamId = 0, - .dataLength = 530, - }; - event.events.resize(1); - event.events[0].pes(pesEvent); - /*pthread_create(&mThreadId, NULL, __threadLoop, this); - pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/ - if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) { - return Result::INVALID_STATE; + Result result; + // Filter the data and feed the output to each filter + set::iterator it; + for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) { + switch (mFilterEvents[*it].filterType) { + case DemuxFilterType::SECTION: + result = startSectionFilterHandler(*it, dataOutputBuffer); + break; + case DemuxFilterType::PES: + result = startPesFilterHandler(*it); + break; + case DemuxFilterType::TS: + result = startTsFilterHandler(); + break; + case DemuxFilterType::AUDIO: + case DemuxFilterType::VIDEO: + result = startMediaFilterHandler(*it); + break; + case DemuxFilterType::RECORD: + result = startRecordFilterHandler(*it); + break; + case DemuxFilterType::PCR: + result = startPcrFilterHandler(); + break; + default: + return false; + } } - if (mDemuxCallbacks[event.filterId] == nullptr) { - return Result::NOT_INITIALIZED; - } - - mDemuxCallbacks[event.filterId]->onFilterEvent(event); - return Result::SUCCESS; + return result == Result::SUCCESS; } -Result Demux::startTsFilterHandler() { - // TODO handle starting TS filter - return Result::SUCCESS; -} - -Result Demux::startMediaFilterHandler(DemuxFilterEvent& event) { - DemuxFilterMediaEvent mediaEvent; - mediaEvent = { - // temp dump meta data - .pts = 0, - .dataLength = 530, - .secureMemory = nullptr, - }; - event.events.resize(1); - event.events[0].media() = mediaEvent; - // TODO handle write FQM for media stream - return Result::SUCCESS; -} - -Result Demux::startRecordFilterHandler(DemuxFilterEvent& event) { - DemuxFilterRecordEvent recordEvent; - recordEvent = { - // temp dump meta data - .tpid = 0, - .packetNum = 0, - }; - recordEvent.indexMask.tsIndexMask() = 0x01; - event.events.resize(1); - event.events[0].ts() = recordEvent; - return Result::SUCCESS; -} - -Result Demux::startPcrFilterHandler() { - // TODO handle starting PCR filter - return Result::SUCCESS; -} - -void* Demux::__threadLoop(void* threadArg) { +void* Demux::__threadLoopFilter(void* threadArg) { Demux* const self = static_cast(((struct ThreadArgs*)threadArg)->user); - self->filterThreadLoop(((struct ThreadArgs*)threadArg)->event); + self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId); return 0; } -void Demux::filterThreadLoop(DemuxFilterEvent* event) { - uint32_t filterId = event->filterId; - ALOGD("[Demux] filter %d threadLoop start.", filterId); - mThreadRunning[filterId] = true; +void* Demux::__threadLoopInput(void* user) { + Demux* const self = static_cast(user); + self->inputThreadLoop(); + return 0; +} - while (mThreadRunning[filterId]) { +void Demux::filterThreadLoop(uint32_t filterId) { + ALOGD("[Demux] filter %d threadLoop start.", filterId); + mFilterThreadRunning[filterId] = true; + + // For the first time of filter output, implementation needs to send the filter + // Event Callback without waiting for the DATA_CONSUMED to init the process. + while (mFilterThreadRunning[filterId]) { + if (mFilterEvents[filterId].events.size() == 0) { + ALOGD("[Demux] wait for filter data output."); + usleep(1000 * 1000); + continue; + } + // After successfully write, send a callback and wait for the read to be done + mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); + mFilterEvents[filterId].events.resize(0); + break; + } + + while (mFilterThreadRunning[filterId]) { uint32_t efState = 0; // We do not wait for the last round of writen data to be read to finish the thread // because the VTS can verify the reading itself. for (int i = 0; i < SECTION_WRITE_COUNT; i++) { - DemuxFilterEvent filterEvent{ - .filterId = filterId, - .filterType = event->filterType, - }; - if (!writeSectionsAndCreateEvent(filterEvent, 2)) { - ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId); - break; - } - mFilterWriteCount[filterId]++; - if (mDemuxCallbacks[filterId] == nullptr) { - ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); - break; - } - // After successfully write, send a callback and wait for the read to be done - mDemuxCallbacks[filterId]->onFilterEvent(filterEvent); - // We do not wait for the last read to be done - // VTS can verify the read result itself. - if (i == SECTION_WRITE_COUNT - 1) { - ALOGD("[Demux] filter %d writing done. Ending thread", filterId); - break; - } - while (mThreadRunning[filterId]) { + while (mFilterThreadRunning[filterId]) { status_t status = mFilterEventFlags[filterId]->wait( static_cast(DemuxQueueNotifyBits::DATA_CONSUMED), &efState, WAIT_TIMEOUT, true /* retry on spurious wake */); @@ -388,15 +609,60 @@ void Demux::filterThreadLoop(DemuxFilterEvent* event) { } break; } - } - mFilterWriteCount[filterId] = 0; - mThreadRunning[filterId] = false; + if (mDemuxCallbacks[filterId] == nullptr) { + ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); + break; + } + + while (mFilterThreadRunning[filterId]) { + std::lock_guard lock(mFilterEventLock); + if (mFilterEvents[filterId].events.size() == 0) { + continue; + } + // After successfully write, send a callback and wait for the read to be done + mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); + mFilterEvents[filterId].events.resize(0); + break; + } + // We do not wait for the last read to be done + // VTS can verify the read result itself. + if (i == SECTION_WRITE_COUNT - 1) { + ALOGD("[Demux] filter %d writing done. Ending thread", filterId); + break; + } + } + mFilterThreadRunning[filterId] = false; } ALOGD("[Demux] filter thread ended."); } +void Demux::inputThreadLoop() { + ALOGD("[Demux] input threadLoop start."); + mInputThreadRunning = true; + + while (mInputThreadRunning) { + uint32_t efState = 0; + status_t status = + mInputEventFlag->wait(static_cast(DemuxQueueNotifyBits::DATA_READY), + &efState, WAIT_TIMEOUT, true /* retry on spurious wake */); + if (status != OK) { + ALOGD("[Demux] wait for data ready on the input FMQ"); + continue; + } + // Our current implementation filter the data and write it into the filter FMQ immedaitely + // after the DATA_READY from the VTS/framework + if (!filterAndOutputData()) { + ALOGD("[Demux] input data failed to be filtered. Ending thread"); + break; + } + } + + mInputThreadRunning = false; + ALOGD("[Demux] input thread ended."); +} + } // namespace implementation } // namespace V1_0 } // namespace tuner diff --git a/tv/tuner/1.0/default/Demux.h b/tv/tuner/1.0/default/Demux.h index 8b002669dd..2fdde8dcf8 100644 --- a/tv/tuner/1.0/default/Demux.h +++ b/tv/tuner/1.0/default/Demux.h @@ -19,6 +19,7 @@ #include #include +#include using namespace std; @@ -43,6 +44,8 @@ class Demux : public IDemux { public: Demux(uint32_t demuxId); + ~Demux(); + virtual Return setFrontendDataSource(uint32_t frontendId) override; virtual Return close() override; @@ -68,8 +71,58 @@ class Demux : public IDemux { virtual Return getAvSyncTime(AvSyncHwId avSyncHwId, getAvSyncTime_cb _hidl_cb) override; + virtual Return addInput(uint32_t bufferSize, const sp& cb) override; + + virtual Return getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) override; + + virtual Return configureInput(const DemuxInputSettings& settings) override; + + virtual Return startInput() override; + + virtual Return stopInput() override; + + virtual Return flushInput() override; + + virtual Return removeInput() override; + + virtual Return addOutput(uint32_t bufferSize, const sp& cb) override; + + virtual Return getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) override; + + virtual Return configureOutput(const DemuxOutputSettings& settings) override; + + virtual Return attachOutputTsFilter(uint32_t filterId) override; + + virtual Return detachOutputTsFilter(uint32_t filterId) override; + + virtual Return startOutput() override; + + virtual Return stopOutput() override; + + virtual Return flushOutput() override; + + virtual Return removeOutput() override; + private: - virtual ~Demux(); + // A struct that passes the arguments to a newly created filter thread + struct ThreadArgs { + Demux* user; + uint32_t filterId; + }; + + /** + * Filter handlers to handle the data filtering. + * They are also responsible to write the filtered output into the filter FMQ + * and update the filterEvent bound with the same filterId. + */ + Result startSectionFilterHandler(uint32_t filterId, vector data); + Result startPesFilterHandler(uint32_t filterId); + Result startTsFilterHandler(); + Result startMediaFilterHandler(uint32_t filterId); + Result startRecordFilterHandler(uint32_t filterId); + Result startPcrFilterHandler(); + Result startFilterLoop(uint32_t filterId); + /** * To create a FilterMQ with the the next available Filter ID. * Creating Event Flag at the same time. @@ -77,60 +130,80 @@ class Demux : public IDemux { * * Return false is any of the above processes fails. */ - bool createAndSaveMQ(uint32_t bufferSize, uint32_t filterId); + bool createFilterMQ(uint32_t bufferSize, uint32_t filterId); + bool createMQ(FilterMQ* queue, EventFlag* eventFlag, uint32_t bufferSize); void deleteEventFlag(); bool writeDataToFilterMQ(const std::vector& data, uint32_t filterId); - Result startSectionFilterHandler(DemuxFilterEvent event); - Result startPesFilterHandler(DemuxFilterEvent& event); - Result startTsFilterHandler(); - Result startMediaFilterHandler(DemuxFilterEvent& event); - Result startRecordFilterHandler(DemuxFilterEvent& event); - Result startPcrFilterHandler(); - bool writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum); - void filterThreadLoop(DemuxFilterEvent* event); - static void* __threadLoop(void* data); + bool readDataFromMQ(); + bool writeSectionsAndCreateEvent(uint32_t filterId, vector data); + /** + * A dispatcher to read and dispatch input data to all the started filters. + * Each filter handler handles the data filtering/output writing/filterEvent updating. + */ + bool filterAndOutputData(); + static void* __threadLoopFilter(void* data); + static void* __threadLoopInput(void* user); + void filterThreadLoop(uint32_t filterId); + void inputThreadLoop(); uint32_t mDemuxId; uint32_t mSourceFrontendId; /** - * Record the last used filer id. Initial value is -1. + * Record the last used filter id. Initial value is -1. * Filter Id starts with 0. */ uint32_t mLastUsedFilterId = -1; + /** + * Record all the used filter Ids. + * Any removed filter id should be removed from this set. + */ + set mUsedFilterIds; + /** + * Record all the unused filter Ids within mLastUsedFilterId. + * Removed filter Id should be added into this set. + * When this set is not empty, ids here should be allocated first + * and added into usedFilterIds. + */ + set mUnusedFilterIds; /** * A list of created FilterMQ ptrs. * The array number is the filter ID. */ vector> mFilterMQs; - vector mFilterTypes; vector mFilterEventFlags; + vector mFilterEvents; + unique_ptr mInputMQ; + unique_ptr mOutputMQ; + EventFlag* mInputEventFlag; + EventFlag* mOutputEventFlag; /** * Demux callbacks used on filter events or IO buffer status */ vector> mDemuxCallbacks; - /** - * How many times a specific filter has written since started - */ - vector mFilterWriteCount; - pthread_t mThreadId = 0; + sp mInputCallback; + sp mOutputCallback; + // Thread handlers + pthread_t mInputThread; + pthread_t mOutputThread; + vector mFilterThreads; /** * If a specific filter's writing loop is still running */ - vector mThreadRunning; + vector mFilterThreadRunning; + bool mInputThreadRunning; /** * Lock to protect writes to the FMQs */ std::mutex mWriteLock; + /** + * Lock to protect writes to the filter event + */ + std::mutex mFilterEventLock; /** * How many times a filter should write * TODO make this dynamic/random/can take as a parameter */ const uint16_t SECTION_WRITE_COUNT = 10; - // A struct that passes the arguments to a newly created filter thread - struct ThreadArgs { - Demux* user; - DemuxFilterEvent* event; - }; }; } // namespace implementation