From eeb735269430e2a01a671b57f07f8738066da836 Mon Sep 17 00:00:00 2001 From: Sam Stenvall Date: Mon, 5 Jan 2015 13:35:45 +0200 Subject: [PATCH 1/3] [pvr.tvh] make CircBuffer thread-safe --- addons/pvr.hts/src/CircBuffer.cpp | 8 ++++++++ addons/pvr.hts/src/CircBuffer.h | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/addons/pvr.hts/src/CircBuffer.cpp b/addons/pvr.hts/src/CircBuffer.cpp index 74e38ddec..bed0a90bd 100644 --- a/addons/pvr.hts/src/CircBuffer.cpp +++ b/addons/pvr.hts/src/CircBuffer.cpp @@ -24,6 +24,8 @@ #include #include +using namespace PLATFORM; + CCircBuffer::CCircBuffer(void) : m_buffer(NULL), m_alloc(0), m_size(0), m_count(0), m_pin(0), m_pout(0) { @@ -64,6 +66,7 @@ void CCircBuffer::unalloc(void) void CCircBuffer::reset(void) { + CLockObject lock(m_mutex); m_pin = 0; m_pout = 0; m_count = 0; @@ -71,21 +74,25 @@ void CCircBuffer::reset(void) size_t CCircBuffer::size(void) const { + CLockObject lock(m_mutex); return m_size; } size_t CCircBuffer::avail(void) const { + CLockObject lock(m_mutex); return m_count; } size_t CCircBuffer::free(void) const { + CLockObject lock(m_mutex); return m_size - m_count - 1; } ssize_t CCircBuffer::write(const unsigned char* data, size_t len) { + CLockObject lock(m_mutex); if (m_size < 2) return -1; if (len > free()) @@ -111,6 +118,7 @@ ssize_t CCircBuffer::write(const unsigned char* data, size_t len) ssize_t CCircBuffer::read(unsigned char* data, size_t len) { + CLockObject lock(m_mutex); if (m_size < 2) return -1; if (len > avail()) diff --git a/addons/pvr.hts/src/CircBuffer.h b/addons/pvr.hts/src/CircBuffer.h index 80b99f796..28ab71976 100644 --- a/addons/pvr.hts/src/CircBuffer.h +++ b/addons/pvr.hts/src/CircBuffer.h @@ -22,6 +22,7 @@ */ #include "platform/os.h" +#include "platform/threads/mutex.h" class CCircBuffer { @@ -48,4 +49,7 @@ class CCircBuffer size_t m_pin; size_t m_pout; +private: + mutable PLATFORM::CMutex m_mutex; + }; From 7cc510941e59079c40c5f7f240e24c9eb325bd89 Mon Sep 17 00:00:00 2001 From: Sam Stenvall Date: Mon, 5 Jan 2015 18:05:44 +0200 Subject: [PATCH 2/3] [pvr.tvh] fill the VFS buffer on a separate thread. This means we can gradually increase the read chunk size (allowing for more jitter in the latency between client and server) gradually without making the reader wait for the full chunk to be received. The read length is reset during seeking and if the buffer happens to run dry (ffmpeg does around 10 short seeks for each seek request so it helps to keep the read size small). --- addons/pvr.hts/src/HTSPVFS.cpp | 162 ++++++++++++++++++++++++--------- addons/pvr.hts/src/Tvheadend.h | 19 +++- 2 files changed, 135 insertions(+), 46 deletions(-) diff --git a/addons/pvr.hts/src/HTSPVFS.cpp b/addons/pvr.hts/src/HTSPVFS.cpp index 07b78dbd2..9dbc8c756 100644 --- a/addons/pvr.hts/src/HTSPVFS.cpp +++ b/addons/pvr.hts/src/HTSPVFS.cpp @@ -36,14 +36,48 @@ using namespace std; using namespace ADDON; using namespace PLATFORM; +/* +* The buffer thread +*/ +void *CHTSPVFS::Process(void) +{ + while (!IsStopped()) + { + while (m_fileId && m_buffer.free() > 0) + { + if (!SendFileRead()) + continue; + + CLockObject lock(m_mutex); + m_bHasData = true; + m_condition.Broadcast(); + } + + // Take a break, we're either stopped or full + CLockObject lock(m_mutex); + m_condition.Wait(m_mutex, 1000); + } + + return NULL; +} + +/* +* VFS handler +*/ CHTSPVFS::CHTSPVFS ( CHTSPConnection &conn ) - : m_conn(conn), m_path(""), m_fileId(0), m_offset(0) + : m_conn(conn), m_path(""), m_fileId(0), m_offset(0), + m_currentReadLength(INITAL_READ_LENGTH) { m_buffer.alloc(MAX_BUFFER_SIZE); + + // Start the buffer thread + CreateThread(); } CHTSPVFS::~CHTSPVFS ( void ) { + // Stop the buffer thread + StopThread(); } void CHTSPVFS::Connected ( void ) @@ -90,10 +124,19 @@ void CHTSPVFS::Close ( void ) if (m_fileId != 0) SendFileClose(); - m_buffer.reset(); m_offset = 0; m_fileId = 0; m_path = ""; + + Reset(); +} + +void CHTSPVFS::Reset() +{ + CLockObject lock(m_mutex); + m_buffer.reset(); + m_bHasData = false; + m_currentReadLength = INITAL_READ_LENGTH; } int CHTSPVFS::Read ( unsigned char *buf, unsigned int len ) @@ -104,50 +147,22 @@ int CHTSPVFS::Read ( unsigned char *buf, unsigned int len ) if (!m_fileId) return -1; - /* Fetch data */ - if (m_buffer.avail() <= len) + /* Signal that we need more data in the buffer. Reset the read length to the + requested length so we don't wait unnecessarily long */ + if (m_buffer.avail() < len) { - htsmsg_t *m; - const void *buf; - size_t len; - - /* Build */ - m = htsmsg_create_map(); - htsmsg_add_u32(m, "id", m_fileId); - htsmsg_add_s64(m, "size", m_buffer.free()); - - tvhtrace("vfs read id=%d size=%lld", - m_fileId, (long long)m_buffer.free()); - - /* Send */ - { - CLockObject lock(m_conn.Mutex()); - m = m_conn.SendAndWait("fileRead", m); - } - - if (m == NULL) - return -1; - - /* Process */ - if (htsmsg_get_bin(m, "data", &buf, &len)) - { - htsmsg_destroy(m); - tvherror("vfs fileRead malformed response"); - return -1; - } - - /* Store */ - if (m_buffer.write((unsigned char*)buf, len) != (ssize_t)len) - { - htsmsg_destroy(m); - tvherror("vfs partial buffer write"); - return -1; - } - htsmsg_destroy(m); + CLockObject lock(m_mutex); + m_bHasData = false; + m_currentReadLength = len; + m_condition.Broadcast(); } + /* Wait for data */ + CLockObject lock(m_mutex); + m_condition.Wait(m_mutex, m_bHasData, 5000); + /* Read */ - ret = m_buffer.read(buf, len); + ret = m_buffer.read(buf, len); m_offset += ret; return (int)ret; } @@ -277,10 +292,71 @@ long long CHTSPVFS::SendFileSeek ( int64_t pos, int whence, bool force ) { tvhtrace("vfs seek offset=%lld", (long long)ret); m_offset = ret; - m_buffer.reset(); + + Reset(); } else tvherror("vfs fileSeek failed"); return ret; } + +bool CHTSPVFS::SendFileRead() +{ + htsmsg_t *m; + const void *buf; + size_t len; + size_t readLength; + + { + CLockObject lock(m_mutex); + + /* Determine read length */ + if (m_currentReadLength > m_buffer.free()) + readLength = m_buffer.free(); + else + readLength = m_currentReadLength; + } + + /* Build */ + m = htsmsg_create_map(); + htsmsg_add_u32(m, "id", m_fileId); + htsmsg_add_s64(m, "size", readLength); + + tvhtrace("vfs read id=%d size=%d", + m_fileId, readLength); + + /* Send */ + { + CLockObject lock(m_conn.Mutex()); + m = m_conn.SendAndWait("fileRead", m); + } + + if (m == NULL) + return false; + + /* Process */ + if (htsmsg_get_bin(m, "data", &buf, &len)) + { + htsmsg_destroy(m); + tvherror("vfs fileRead malformed response"); + return false; + } + + /* Store */ + if (m_buffer.write((unsigned char*)buf, len) != (ssize_t)len) + { + htsmsg_destroy(m); + tvherror("vfs partial buffer write"); + return false; + } + + /* Gradually increase read length */ + CLockObject lock(m_mutex); + + if (m_currentReadLength * 2 < MAX_READ_LENGTH) + m_currentReadLength *= 2; + + htsmsg_destroy(m); + return true; +} diff --git a/addons/pvr.hts/src/Tvheadend.h b/addons/pvr.hts/src/Tvheadend.h index 70d907e6a..b8ea4c93f 100644 --- a/addons/pvr.hts/src/Tvheadend.h +++ b/addons/pvr.hts/src/Tvheadend.h @@ -282,7 +282,8 @@ class CHTSPDemuxer /* * HTSP VFS - recordings */ -class CHTSPVFS +class CHTSPVFS + : public PLATFORM::CThread { friend class CTvheadend; @@ -296,21 +297,33 @@ class CHTSPVFS CHTSPConnection &m_conn; CStdString m_path; uint32_t m_fileId; - CCircBuffer m_buffer; int64_t m_offset; + CCircBuffer m_buffer; + PLATFORM::CMutex m_mutex; + bool m_bHasData; + PLATFORM::CCondition m_condition; + size_t m_currentReadLength; + bool Open ( const PVR_RECORDING &rec ); void Close ( void ); int Read ( unsigned char *buf, unsigned int len ); long long Seek ( long long pos, int whence ); long long Tell ( void ); long long Size ( void ); + void Reset ( void ); + + void *Process(); bool SendFileOpen ( bool force = false ); void SendFileClose ( void ); + bool SendFileRead ( void ); long long SendFileSeek ( int64_t pos, int whence, bool force = false ); - static const int MAX_BUFFER_SIZE = 1000000; + static const int MAX_BUFFER_SIZE = 5242880; // 5 MB + static const int INITAL_READ_LENGTH = 131072; // 128 KB + static const int MAX_READ_LENGTH = 1048576; // 1 MB + }; /* From 87487ad8dfac130a58aeed108b9661a47e43a45c Mon Sep 17 00:00:00 2001 From: Sam Stenvall Date: Wed, 21 Jan 2015 19:23:51 +0200 Subject: [PATCH 3/3] [pvr.hts] bump version --- addons/pvr.hts/addon/addon.xml.in | 2 +- addons/pvr.hts/addon/changelog.txt | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/addons/pvr.hts/addon/addon.xml.in b/addons/pvr.hts/addon/addon.xml.in index c4dc06ff9..220560cb8 100644 --- a/addons/pvr.hts/addon/addon.xml.in +++ b/addons/pvr.hts/addon/addon.xml.in @@ -1,7 +1,7 @@ diff --git a/addons/pvr.hts/addon/changelog.txt b/addons/pvr.hts/addon/changelog.txt index 9a2897d1b..91458fec7 100644 --- a/addons/pvr.hts/addon/changelog.txt +++ b/addons/pvr.hts/addon/changelog.txt @@ -1,3 +1,7 @@ +2.0.3 +- rebrand of HTSP client identifier +- improve the VFS slightly by doing reading and buffering on separate threads, as well as tweaking the read length + 2.0.2 - language files from Transifex - minor changes to conform with C++11