Adding a TS filter functionality into the Demux default impl am: c13371c650

am: 9505fa8f8a

Change-Id: I46e2ef5089544712bf0f5a0a535ab4840206d999
This commit is contained in:
Amy
2019-10-08 15:51:28 -07:00
committed by android-build-merger
8 changed files with 175 additions and 74 deletions

View File

@@ -100,6 +100,8 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
mFilterEventFlags.resize(filterId + 1);
mFilterThreadRunning.resize(filterId + 1);
mFilterThreads.resize(filterId + 1);
mFilterPids.resize(filterId + 1);
mFilterOutputs.resize(filterId + 1);
}
mUsedFilterIds.insert(filterId);
@@ -142,10 +144,34 @@ Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb
return Void();
}
Return<Result> Demux::configureFilter(uint32_t /* filterId */,
const DemuxFilterSettings& /* settings */) {
Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
ALOGV("%s", __FUNCTION__);
switch (mFilterEvents[filterId].filterType) {
case DemuxFilterType::SECTION:
mFilterPids[filterId] = settings.section().tpid;
break;
case DemuxFilterType::PES:
mFilterPids[filterId] = settings.pesData().tpid;
break;
case DemuxFilterType::TS:
mFilterPids[filterId] = settings.ts().tpid;
break;
case DemuxFilterType::AUDIO:
mFilterPids[filterId] = settings.audio().tpid;
break;
case DemuxFilterType::VIDEO:
mFilterPids[filterId] = settings.video().tpid;
break;
case DemuxFilterType::RECORD:
mFilterPids[filterId] = settings.record().tpid;
break;
case DemuxFilterType::PCR:
mFilterPids[filterId] = settings.pcr().tpid;
break;
default:
return Result::UNKNOWN_ERROR;
}
return Result::SUCCESS;
}
@@ -158,36 +184,16 @@ Return<Result> Demux::startFilter(uint32_t filterId) {
return Result::INVALID_ARGUMENT;
}
switch (mFilterEvents[filterId].filterType) {
case DemuxFilterType::SECTION:
result = startFilterLoop(filterId);
break;
case DemuxFilterType::PES:
result = startPesFilterHandler(filterId);
break;
case DemuxFilterType::TS:
result = startTsFilterHandler();
return Result::SUCCESS;
case DemuxFilterType::AUDIO:
case DemuxFilterType::VIDEO:
result = startMediaFilterHandler(filterId);
break;
case DemuxFilterType::RECORD:
result = startRecordFilterHandler(filterId);
break;
case DemuxFilterType::PCR:
result = startPcrFilterHandler();
return Result::SUCCESS;
default:
return Result::UNKNOWN_ERROR;
}
result = startFilterLoop(filterId);
return result;
}
Return<Result> Demux::stopFilter(uint32_t /* filterId */) {
Return<Result> Demux::stopFilter(uint32_t filterId) {
ALOGV("%s", __FUNCTION__);
mFilterThreadRunning[filterId] = false;
return Result::SUCCESS;
}
@@ -238,6 +244,8 @@ Return<Result> Demux::close() {
mFilterMQs.clear();
mFilterEvents.clear();
mFilterEventFlags.clear();
mFilterOutputs.clear();
mFilterPids.clear();
mLastUsedFilterId = -1;
return Result::SUCCESS;
@@ -277,19 +285,21 @@ Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
return Void();
}
Return<Result> Demux::configureOutput(const DemuxOutputSettings& /* settings */) {
Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
ALOGV("%s", __FUNCTION__);
mOutputConfigured = true;
mOutputSettings = settings;
return Result::SUCCESS;
}
Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::attachOutputTsFilter(uint32_t /*filterId*/) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Demux::detachOutputTsFilter(uint32_t /* filterId */) {
Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
@@ -353,15 +363,26 @@ Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
return Void();
}
Return<Result> Demux::configureInput(const DemuxInputSettings& /* settings */) {
Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
ALOGV("%s", __FUNCTION__);
mInputConfigured = true;
mInputSettings = settings;
return Result::SUCCESS;
}
Return<Result> Demux::startInput() {
ALOGV("%s", __FUNCTION__);
if (!mInputCallback) {
return Result::NOT_INITIALIZED;
}
if (!mInputConfigured) {
return Result::INVALID_STATE;
}
pthread_create(&mInputThread, NULL, __threadLoopInput, this);
pthread_setname_np(mInputThread, "demux_input_waiting_loop");
@@ -373,6 +394,8 @@ Return<Result> Demux::startInput() {
Return<Result> Demux::stopInput() {
ALOGV("%s", __FUNCTION__);
mInputThreadRunning = false;
return Result::SUCCESS;
}
@@ -403,36 +426,43 @@ Result Demux::startFilterLoop(uint32_t filterId) {
return Result::SUCCESS;
}
Result Demux::startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data) {
if (!writeSectionsAndCreateEvent(filterId, data)) {
Result Demux::startSectionFilterHandler(uint32_t filterId) {
if (mFilterOutputs[filterId].empty()) {
return Result::SUCCESS;
}
if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
return Result::UNKNOWN_ERROR;
}
mFilterOutputs[filterId].clear();
return Result::SUCCESS;
}
Result Demux::startPesFilterHandler(uint32_t filterId) {
// TODO generate multiple events in one event callback
std::lock_guard<std::mutex> lock(mFilterEventLock);
DemuxFilterPesEvent pesEvent;
if (mFilterOutputs[filterId].empty()) {
return Result::SUCCESS;
}
// TODO extract PES from TS
if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) {
mFilterOutputs[filterId].clear();
return Result::INVALID_STATE;
}
pesEvent = {
// temp dump meta data
.streamId = 0,
.dataLength = 530,
.dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()),
};
mFilterEvents[filterId].events.resize(1);
mFilterEvents[filterId].events[0].pes(pesEvent);
/*pthread_create(&mThreadId, NULL, __threadLoop, this);
pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) {
return Result::INVALID_STATE;
}
int size = mFilterEvents[filterId].events.size();
mFilterEvents[filterId].events.resize(size + 1);
mFilterEvents[filterId].events[size].pes(pesEvent);
if (mDemuxCallbacks[filterId] == nullptr) {
return Result::NOT_INITIALIZED;
}
mFilterOutputs[filterId].clear();
mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
return Result::SUCCESS;
}
@@ -499,18 +529,18 @@ bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
// TODO check how many sections has been read
std::lock_guard<std::mutex> lock(mFilterEventLock);
int size = mFilterEvents[filterId].events.size();
mFilterEvents[filterId].events.resize(size + 1);
if (!writeDataToFilterMQ(data, filterId)) {
return false;
}
int size = mFilterEvents[filterId].events.size();
mFilterEvents[filterId].events.resize(size + 1);
DemuxFilterSectionEvent secEvent;
secEvent = {
// temp dump meta data
.tableId = 0,
.version = 1,
.sectionNum = 1,
.dataLength = 530,
.dataLength = static_cast<uint16_t>(data.size()),
};
mFilterEvents[filterId].events[size].section(secEvent);
return true;
@@ -525,20 +555,32 @@ bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filte
}
bool Demux::filterAndOutputData() {
ALOGD("[Demux] start to dispatch data to filters");
Result result;
set<uint32_t>::iterator it;
// Read input data from the input FMQ
int size = mInputMQ->availableToRead();
int inputPacketSize = mInputSettings.packetSize;
vector<uint8_t> dataOutputBuffer;
dataOutputBuffer.resize(size);
mInputMQ->read(dataOutputBuffer.data(), size);
dataOutputBuffer.resize(inputPacketSize);
Result result;
// Filter the data and feed the output to each filter
set<uint32_t>::iterator it;
// Dispatch the packet to the PID matching filter output buffer
for (int i = 0; i < size / inputPacketSize; i++) {
mInputMQ->read(dataOutputBuffer.data(), inputPacketSize);
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff));
if (pid == mFilterPids[*it]) {
mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(),
dataOutputBuffer.end());
}
}
}
// Handle the output data per filter type
for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
switch (mFilterEvents[*it].filterType) {
case DemuxFilterType::SECTION:
result = startSectionFilterHandler(*it, dataOutputBuffer);
result = startSectionFilterHandler(*it);
break;
case DemuxFilterType::PES:
result = startPesFilterHandler(*it);
@@ -657,12 +699,42 @@ void Demux::inputThreadLoop() {
ALOGD("[Demux] input data failed to be filtered. Ending thread");
break;
}
maySendInputStatusCallback();
}
mInputThreadRunning = false;
ALOGD("[Demux] input thread ended.");
}
void Demux::maySendInputStatusCallback() {
std::lock_guard<std::mutex> lock(mInputStatusLock);
int availableToRead = mInputMQ->availableToRead();
int availableToWrite = mInputMQ->availableToWrite();
DemuxInputStatus newStatus =
checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
mInputSettings.lowThreshold);
if (mIntputStatus != newStatus) {
mInputCallback->onInputStatus(newStatus);
mIntputStatus = newStatus;
}
}
DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
uint32_t highThreshold, uint32_t lowThreshold) {
if (availableToWrite == 0) {
return DemuxInputStatus::SPACE_FULL;
} else if (availableToRead > highThreshold) {
return DemuxInputStatus::SPACE_ALMOST_FULL;
} else if (availableToRead < lowThreshold) {
return DemuxInputStatus::SPACE_ALMOST_EMPTY;
} else if (availableToRead == 0) {
return DemuxInputStatus::SPACE_EMPTY;
}
return mIntputStatus;
}
} // namespace implementation
} // namespace V1_0
} // namespace tuner

View File

@@ -91,9 +91,9 @@ class Demux : public IDemux {
virtual Return<Result> configureOutput(const DemuxOutputSettings& settings) override;
virtual Return<Result> attachOutputTsFilter(uint32_t filterId) override;
virtual Return<Result> attachOutputFilter(uint32_t filterId) override;
virtual Return<Result> detachOutputTsFilter(uint32_t filterId) override;
virtual Return<Result> detachOutputFilter(uint32_t filterId) override;
virtual Return<Result> startOutput() override;
@@ -115,7 +115,7 @@ class Demux : public IDemux {
* They are also responsible to write the filtered output into the filter FMQ
* and update the filterEvent bound with the same filterId.
*/
Result startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data);
Result startSectionFilterHandler(uint32_t filterId);
Result startPesFilterHandler(uint32_t filterId);
Result startTsFilterHandler();
Result startMediaFilterHandler(uint32_t filterId);
@@ -136,6 +136,9 @@ class Demux : public IDemux {
bool writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId);
bool readDataFromMQ();
bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
void maySendInputStatusCallback();
DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
uint32_t highThreshold, uint32_t lowThreshold);
/**
* A dispatcher to read and dispatch input data to all the started filters.
* Each filter handler handles the data filtering/output writing/filterEvent updating.
@@ -169,6 +172,8 @@ class Demux : public IDemux {
* A list of created FilterMQ ptrs.
* The array number is the filter ID.
*/
vector<uint16_t> mFilterPids;
vector<vector<uint8_t>> mFilterOutputs;
vector<unique_ptr<FilterMQ>> mFilterMQs;
vector<EventFlag*> mFilterEventFlags;
vector<DemuxFilterEvent> mFilterEvents;
@@ -182,10 +187,18 @@ class Demux : public IDemux {
vector<sp<IDemuxCallback>> mDemuxCallbacks;
sp<IDemuxCallback> mInputCallback;
sp<IDemuxCallback> mOutputCallback;
bool mInputConfigured = false;
bool mOutputConfigured = false;
DemuxInputSettings mInputSettings;
DemuxOutputSettings mOutputSettings;
// Thread handlers
pthread_t mInputThread;
pthread_t mOutputThread;
vector<pthread_t> mFilterThreads;
// FMQ status local records
DemuxInputStatus mIntputStatus;
/**
* If a specific filter's writing loop is still running
*/
@@ -198,7 +211,12 @@ class Demux : public IDemux {
/**
* Lock to protect writes to the filter event
*/
// TODO make each filter separate event lock
std::mutex mFilterEventLock;
/**
* Lock to protect writes to the input status
*/
std::mutex mInputStatusLock;
/**
* How many times a filter should write
* TODO make this dynamic/random/can take as a parameter

View File

@@ -105,13 +105,7 @@ Return<Result> Frontend::setLna(bool /* bEnable */) {
return Result::SUCCESS;
}
Return<Result> Frontend::setLnb(const sp<ILnb>& /* lnb */) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Frontend::sendDiseqcMessage(const hidl_vec<uint8_t>& /* diseqcMessage */) {
Return<Result> Frontend::setLnb(uint32_t /* lnb */) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;

View File

@@ -56,11 +56,9 @@ class Frontend : public IFrontend {
virtual Return<void> getStatus(const hidl_vec<FrontendStatusType>& statusTypes,
getStatus_cb _hidl_cb) override;
virtual Return<Result> sendDiseqcMessage(const hidl_vec<uint8_t>& diseqcMessage) override;
virtual Return<Result> setLna(bool bEnable) override;
virtual Return<Result> setLnb(const sp<ILnb>& lnb) override;
virtual Return<Result> setLnb(uint32_t lnb) override;
FrontendType getFrontendType();

View File

@@ -48,6 +48,12 @@ Return<Result> Lnb::setSatellitePosition(FrontendLnbPosition /* position */) {
return Result::SUCCESS;
}
Return<Result> Lnb::sendDiseqcMessage(const hidl_vec<uint8_t>& /* diseqcMessage */) {
ALOGV("%s", __FUNCTION__);
return Result::SUCCESS;
}
Return<Result> Lnb::close() {
ALOGV("%s", __FUNCTION__);

View File

@@ -38,12 +38,14 @@ class Lnb : public ILnb {
public:
Lnb();
virtual Return<Result> setVoltage(FrontendLnbVoltage voltage);
virtual Return<Result> setVoltage(FrontendLnbVoltage voltage) override;
virtual Return<Result> setTone(FrontendLnbTone tone) override;
virtual Return<Result> setSatellitePosition(FrontendLnbPosition position) override;
virtual Return<Result> sendDiseqcMessage(const hidl_vec<uint8_t>& diseqcMessage) override;
virtual Return<Result> close() override;
private:

View File

@@ -87,6 +87,15 @@ Return<void> Tuner::openDemux(openDemux_cb _hidl_cb) {
return Void();
}
Return<void> Tuner::getDemuxCaps(getDemuxCaps_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);
DemuxCapabilities caps;
_hidl_cb(Result::SUCCESS, caps);
return Void();
}
Return<void> Tuner::openDescrambler(openDescrambler_cb _hidl_cb) {
ALOGV("%s", __FUNCTION__);

View File

@@ -39,6 +39,8 @@ class Tuner : public ITuner {
virtual Return<void> openDemux(openDemux_cb _hidl_cb) override;
virtual Return<void> getDemuxCaps(getDemuxCaps_cb _hidl_cb) override;
virtual Return<void> openDescrambler(openDescrambler_cb _hidl_cb) override;
virtual Return<void> getFrontendInfo(FrontendId frontendId,