Skip to content
This repository has been archived by the owner on Apr 15, 2023. It is now read-only.

[pvr.hts] fill the VFS buffer on a separate thread #404

Merged
merged 3 commits into from
Jan 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion addons/pvr.hts/addon/addon.xml.in
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon
id="pvr.hts"
version="2.0.2"
version="2.0.3"
name="Tvheadend HTSP Client"
provider-name="Adam Sutton, Sam Stenvall, Lars Op den Kamp">
<requires>
Expand Down
4 changes: 4 additions & 0 deletions addons/pvr.hts/addon/changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2.0.3
- rebrand of HTSP client identifier
- improve the VFS slightly by doing reading and buffering on separate threads, as well as tweaking the read length

2.0.2
- language files from Transifex
- minor changes to conform with C++11
Expand Down
8 changes: 8 additions & 0 deletions addons/pvr.hts/src/CircBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <stdlib.h>
#include <stdio.h>

using namespace PLATFORM;

CCircBuffer::CCircBuffer(void)
: m_buffer(NULL), m_alloc(0), m_size(0), m_count(0), m_pin(0), m_pout(0)
{
Expand Down Expand Up @@ -64,28 +66,33 @@ void CCircBuffer::unalloc(void)

void CCircBuffer::reset(void)
{
CLockObject lock(m_mutex);
m_pin = 0;
m_pout = 0;
m_count = 0;
}

size_t CCircBuffer::size(void) const
{
CLockObject lock(m_mutex);
return m_size;
}

size_t CCircBuffer::avail(void) const
{
CLockObject lock(m_mutex);
return m_count;
}

size_t CCircBuffer::free(void) const
{
CLockObject lock(m_mutex);
return m_size - m_count - 1;
}

ssize_t CCircBuffer::write(const unsigned char* data, size_t len)
{
CLockObject lock(m_mutex);
if (m_size < 2)
return -1;
if (len > free())
Expand All @@ -111,6 +118,7 @@ ssize_t CCircBuffer::write(const unsigned char* data, size_t len)

ssize_t CCircBuffer::read(unsigned char* data, size_t len)
{
CLockObject lock(m_mutex);
if (m_size < 2)
return -1;
if (len > avail())
Expand Down
4 changes: 4 additions & 0 deletions addons/pvr.hts/src/CircBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
*/

#include "platform/os.h"
#include "platform/threads/mutex.h"

class CCircBuffer
{
Expand All @@ -48,4 +49,7 @@ class CCircBuffer
size_t m_pin;
size_t m_pout;

private:
mutable PLATFORM::CMutex m_mutex;

};
162 changes: 119 additions & 43 deletions addons/pvr.hts/src/HTSPVFS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,48 @@ using namespace std;
using namespace ADDON;
using namespace PLATFORM;

/*
* The buffer thread
*/
void *CHTSPVFS::Process(void)
{
while (!IsStopped())
{
while (m_fileId && m_buffer.free() > 0)
{
if (!SendFileRead())
continue;

CLockObject lock(m_mutex);
m_bHasData = true;
m_condition.Broadcast();
}

// Take a break, we're either stopped or full
CLockObject lock(m_mutex);
m_condition.Wait(m_mutex, 1000);
}

return NULL;
}

/*
* VFS handler
*/
CHTSPVFS::CHTSPVFS ( CHTSPConnection &conn )
: m_conn(conn), m_path(""), m_fileId(0), m_offset(0)
: m_conn(conn), m_path(""), m_fileId(0), m_offset(0),
m_currentReadLength(INITAL_READ_LENGTH)
{
m_buffer.alloc(MAX_BUFFER_SIZE);

// Start the buffer thread
CreateThread();
}

CHTSPVFS::~CHTSPVFS ( void )
{
// Stop the buffer thread
StopThread();
}

void CHTSPVFS::Connected ( void )
Expand Down Expand Up @@ -90,10 +124,19 @@ void CHTSPVFS::Close ( void )
if (m_fileId != 0)
SendFileClose();

m_buffer.reset();
m_offset = 0;
m_fileId = 0;
m_path = "";

Reset();
}

void CHTSPVFS::Reset()
{
CLockObject lock(m_mutex);
m_buffer.reset();
m_bHasData = false;
m_currentReadLength = INITAL_READ_LENGTH;
}

int CHTSPVFS::Read ( unsigned char *buf, unsigned int len )
Expand All @@ -104,50 +147,22 @@ int CHTSPVFS::Read ( unsigned char *buf, unsigned int len )
if (!m_fileId)
return -1;

/* Fetch data */
if (m_buffer.avail() <= len)
/* Signal that we need more data in the buffer. Reset the read length to the
requested length so we don't wait unnecessarily long */
if (m_buffer.avail() < len)
{
htsmsg_t *m;
const void *buf;
size_t len;

/* Build */
m = htsmsg_create_map();
htsmsg_add_u32(m, "id", m_fileId);
htsmsg_add_s64(m, "size", m_buffer.free());

tvhtrace("vfs read id=%d size=%lld",
m_fileId, (long long)m_buffer.free());

/* Send */
{
CLockObject lock(m_conn.Mutex());
m = m_conn.SendAndWait("fileRead", m);
}

if (m == NULL)
return -1;

/* Process */
if (htsmsg_get_bin(m, "data", &buf, &len))
{
htsmsg_destroy(m);
tvherror("vfs fileRead malformed response");
return -1;
}

/* Store */
if (m_buffer.write((unsigned char*)buf, len) != (ssize_t)len)
{
htsmsg_destroy(m);
tvherror("vfs partial buffer write");
return -1;
}
htsmsg_destroy(m);
CLockObject lock(m_mutex);
m_bHasData = false;
m_currentReadLength = len;
m_condition.Broadcast();
}

/* Wait for data */
CLockObject lock(m_mutex);
m_condition.Wait(m_mutex, m_bHasData, 5000);

/* Read */
ret = m_buffer.read(buf, len);
ret = m_buffer.read(buf, len);
m_offset += ret;
return (int)ret;
}
Expand Down Expand Up @@ -277,10 +292,71 @@ long long CHTSPVFS::SendFileSeek ( int64_t pos, int whence, bool force )
{
tvhtrace("vfs seek offset=%lld", (long long)ret);
m_offset = ret;
m_buffer.reset();

Reset();
}
else
tvherror("vfs fileSeek failed");

return ret;
}

bool CHTSPVFS::SendFileRead()
{
htsmsg_t *m;
const void *buf;
size_t len;
size_t readLength;

{
CLockObject lock(m_mutex);

/* Determine read length */
if (m_currentReadLength > m_buffer.free())
readLength = m_buffer.free();
else
readLength = m_currentReadLength;
}

/* Build */
m = htsmsg_create_map();
htsmsg_add_u32(m, "id", m_fileId);
htsmsg_add_s64(m, "size", readLength);

tvhtrace("vfs read id=%d size=%d",
m_fileId, readLength);

/* Send */
{
CLockObject lock(m_conn.Mutex());
m = m_conn.SendAndWait("fileRead", m);
}

if (m == NULL)
return false;

/* Process */
if (htsmsg_get_bin(m, "data", &buf, &len))
{
htsmsg_destroy(m);
tvherror("vfs fileRead malformed response");
return false;
}

/* Store */
if (m_buffer.write((unsigned char*)buf, len) != (ssize_t)len)
{
htsmsg_destroy(m);
tvherror("vfs partial buffer write");
return false;
}

/* Gradually increase read length */
CLockObject lock(m_mutex);

if (m_currentReadLength * 2 < MAX_READ_LENGTH)
m_currentReadLength *= 2;

htsmsg_destroy(m);
return true;
}
19 changes: 16 additions & 3 deletions addons/pvr.hts/src/Tvheadend.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ class CHTSPDemuxer
/*
* HTSP VFS - recordings
*/
class CHTSPVFS
class CHTSPVFS
: public PLATFORM::CThread
{
friend class CTvheadend;

Expand All @@ -296,21 +297,33 @@ class CHTSPVFS
CHTSPConnection &m_conn;
CStdString m_path;
uint32_t m_fileId;
CCircBuffer m_buffer;
int64_t m_offset;

CCircBuffer m_buffer;
PLATFORM::CMutex m_mutex;
bool m_bHasData;
PLATFORM::CCondition<bool> m_condition;
size_t m_currentReadLength;

bool Open ( const PVR_RECORDING &rec );
void Close ( void );
int Read ( unsigned char *buf, unsigned int len );
long long Seek ( long long pos, int whence );
long long Tell ( void );
long long Size ( void );
void Reset ( void );

void *Process();

bool SendFileOpen ( bool force = false );
void SendFileClose ( void );
bool SendFileRead ( void );
long long SendFileSeek ( int64_t pos, int whence, bool force = false );

static const int MAX_BUFFER_SIZE = 1000000;
static const int MAX_BUFFER_SIZE = 5242880; // 5 MB
static const int INITAL_READ_LENGTH = 131072; // 128 KB
static const int MAX_READ_LENGTH = 1048576; // 1 MB

};

/*
Expand Down