Skip to content

Commit

Permalink
Merge pull request #76 from mlburgett/master
Browse files Browse the repository at this point in the history
Bug fixes in seeking and fix for uninitialized usec in struct timeval
  • Loading branch information
sub3 authored Dec 22, 2018
2 parents 450caeb + 21b9cdd commit 42a9a4b
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 38 deletions.
2 changes: 1 addition & 1 deletion pvr.nextpvr/addon.xml.in
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<addon
id="pvr.nextpvr"
version="3.3.7"
version="3.3.8"
name="NextPVR PVR Client"
provider-name="Graeme Blackley">
<requires>@ADDON_DEPENDS@</requires>
Expand Down
6 changes: 6 additions & 0 deletions pvr.nextpvr/changelog.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
v3.3.8
- bugfixes for live tv tsb seeking, fix uninitialzed struct timeval in socket.cpp

v3.3.3
- language string updates.

v3.3.2
- use iUniqueId instead of iChannelNumber for live streaming.

Expand Down
4 changes: 2 additions & 2 deletions src/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ bool Socket::read_ready()
FD_ZERO(&fdset);
FD_SET(_sd, &fdset);

struct timeval tv;
tv.tv_sec = 1;
struct timeval tv = { 1, 0 };
// tv.tv_sec = 1;

int retVal = select(_sd+1, &fdset, NULL, NULL, &tv);
if (retVal > 0)
Expand Down
49 changes: 36 additions & 13 deletions src/buffers/Seeker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ bool Seeker::InitSeek(int64_t offset, int whence)
}
m_iBlockOffset = temp % m_pSd->inputBlockSize;
m_xStreamOffset = temp - m_iBlockOffset;
XBMC->Log(LOG_DEBUG, "block: %d, stream: %lli", m_iBlockOffset, m_xStreamOffset);
m_bSeeking = true;
XBMC->Log(LOG_DEBUG, "block: %d, stream: %lli, m_bSeeking: %d", m_iBlockOffset, m_xStreamOffset, m_bSeeking);
return true;
}

Expand Down Expand Up @@ -111,26 +111,49 @@ bool Seeker::PreprocessSeek()

void Seeker::ProcessRequests()
{
if (m_bSeeking && !m_bSeekBlockRequested)
if (m_bSeeking)
{
m_pSd->requestBlock = m_xStreamOffset;
m_pSd->currentWindowSize = 0; // Request all blocks in window
m_bSeekBlockRequested = true;
m_streamPositionSet = false;
if (!m_bSeekBlockRequested)
{
m_pSd->requestBlock = m_xStreamOffset;
m_pSd->currentWindowSize = 0; // Request all blocks in window
m_bSeekBlockRequested = true;
}
}
}

bool Seeker::PostprocessSeek(int64_t blockNo)
{
// seeked block has just been buffered!
// reset seek mechanism
if (blockNo == m_xStreamOffset)
bool retVal = false;
if (m_bSeeking)
{
m_pSd->streamPosition.store(m_xStreamOffset + m_iBlockOffset);
m_cirBuf->AdjustBytes(m_iBlockOffset);
m_bSeekBlockRequested = false;
m_bSeeking = false;
m_xStreamOffset = -1;
return true;
if (blockNo == m_xStreamOffset)
{
if (!m_streamPositionSet)
{
m_pSd->streamPosition.store(m_xStreamOffset + m_iBlockOffset);
m_cirBuf->AdjustBytes(m_iBlockOffset);
m_streamPositionSet = true;
XBMC->Log(LOG_DEBUG, "%s:%d - m_xStreamOffset: %llu, m_iBlockOffset: %d", __FUNCTION__, __LINE__, m_xStreamOffset, m_iBlockOffset);
}
if (m_iBlockOffset)
{ // Go around one more time.
XBMC->Log(LOG_DEBUG, "%s:%d", __FUNCTION__, __LINE__);
m_iBlockOffset = 0;
m_xStreamOffset += m_pSd->inputBlockSize;
retVal = false;
}
else
{
m_bSeekBlockRequested = false;
m_bSeeking = false;
m_xStreamOffset = -1;
retVal = true;
}
}
}
return false;
return retVal;
}
6 changes: 4 additions & 2 deletions src/buffers/Seeker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace timeshift {
public:
Seeker(session_data_t *sd, CircularBuffer *cirBuf) :
m_pSd(sd), m_cirBuf(cirBuf), m_xStreamOffset(0), m_iBlockOffset(0), m_bSeeking(false),
m_bSeekBlockRequested(false), m_bSeekBlockReceived(false) {}
m_bSeekBlockRequested(false), m_bSeekBlockReceived(false), m_streamPositionSet(false) {}
~Seeker() {}
bool InitSeek(int64_t offset, int whence);
bool Active() { return m_bSeeking; }
Expand All @@ -42,7 +42,7 @@ namespace timeshift {
void ProcessRequests();
bool PostprocessSeek(int64_t);
int64_t SeekStreamOffset() { if (m_bSeeking) return m_xStreamOffset; return -1; }
void Clear() { m_xStreamOffset = 0; m_iBlockOffset = 0; m_bSeeking = m_bSeekBlockRequested = m_bSeekBlockReceived = false; }
void Clear() { m_xStreamOffset = 0; m_iBlockOffset = 0; m_bSeeking = m_bSeekBlockRequested = m_bSeekBlockReceived = m_streamPositionSet = false; }


private:
Expand All @@ -53,5 +53,7 @@ namespace timeshift {
bool m_bSeeking;
bool m_bSeekBlockRequested;
bool m_bSeekBlockReceived;
bool m_streamPositionSet;

};
}
61 changes: 45 additions & 16 deletions src/buffers/TimeshiftBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using namespace timeshift;
using namespace ADDON;

const int TimeshiftBuffer::INPUT_READ_LENGTH = 32768;
const int TimeshiftBuffer::BUFFER_BLOCKS = 48;
const int TimeshiftBuffer::BUFFER_BLOCKS = 24;
const int TimeshiftBuffer::WINDOW_SIZE = std::max(6, (BUFFER_BLOCKS/2));

// Fix a stupid #define on Windows which causes XBMC->DeleteFile() to break
Expand All @@ -41,7 +41,7 @@ const int TimeshiftBuffer::WINDOW_SIZE = std::max(6, (BUFFER_BLOCKS/2));

TimeshiftBuffer::TimeshiftBuffer()
: Buffer(), m_circularBuffer(INPUT_READ_LENGTH * BUFFER_BLOCKS),
m_seek(&m_sd, &m_circularBuffer), m_streamingclient(nullptr), m_tsbStartTime(0)
m_seek(&m_sd, &m_circularBuffer), m_streamingclient(nullptr), m_CanPause(true)
{
XBMC->Log(LOG_DEBUG, "TimeshiftBuffer created!");
m_sd.lastKnownLength.store(0);
Expand Down Expand Up @@ -97,6 +97,8 @@ bool TimeshiftBuffer::Open(const std::string inputUrl)
char buf[1024];
int read = m_streamingclient->receive(buf, sizeof buf, 0);

if (read < 0)
return false;

for (int i=0; i<read; i++)
{
Expand Down Expand Up @@ -208,11 +210,11 @@ void TimeshiftBuffer::Reset()
int TimeshiftBuffer::Read(byte *buffer, size_t length)
{
int bytesRead = 0;
std::unique_lock<std::mutex> lock(m_mutex);
XBMC->Log(LOG_DEBUG, "TimeshiftBuffer::Read() %d @ %lli", length, m_sd.streamPosition.load());

// Wait until we have enough data

std::unique_lock<std::mutex> lock(m_mutex);
if (! m_reader.wait_for(lock, std::chrono::seconds(m_readTimeout),
[this, length]()
{
Expand All @@ -229,6 +231,8 @@ int TimeshiftBuffer::Read(byte *buffer, size_t length)
m_writer.notify_one();
}

if (bytesRead != length)
XBMC->Log(LOG_DEBUG, "Read returns %d for %d request.", bytesRead, length);
return bytesRead;
}

Expand All @@ -239,20 +243,36 @@ int TimeshiftBuffer::Read(byte *buffer, size_t length)

int64_t TimeshiftBuffer::Seek(int64_t position, int whence)
{
bool sleep = false;
XBMC->Log(LOG_DEBUG, "TimeshiftBuffer::Seek()");
int64_t lastKnownLength = m_sd.lastKnownLength.load();

if (position > lastKnownLength)
{
XBMC->Log(LOG_DEBUG, "Seek requested to %lld, limiting to %lld\n", position, lastKnownLength);
position = lastKnownLength;
}

std::unique_lock<std::mutex> lock(m_mutex);
// m_streamPositon is the offset in the stream that will be read next,
// so if that matches the seek position, don't seek.
XBMC->Log(LOG_DEBUG, "Seek: %d %d %llu %llu", SEEK_SET, whence, m_sd.streamPosition.load(), position);
if ((whence == SEEK_SET) && (position == m_sd.streamPosition.load()))
return position;
m_seek.InitSeek(position, whence);
if (m_seek.PreprocessSeek())
{
internalRequestBlocks();
m_writer.notify_one(); // wake consumer.
m_seeker.wait(lock);
std::unique_lock<std::mutex> lock(m_mutex);
// m_streamPositon is the offset in the stream that will be read next,
// so if that matches the seek position, don't seek.
XBMC->Log(LOG_DEBUG, "Seek: %d %d %llu %llu", SEEK_SET, whence, m_sd.streamPosition.load(), position);
if ((whence == SEEK_SET) && (position == m_sd.streamPosition.load()))
return position;
m_seek.InitSeek(position, whence);
if (m_seek.PreprocessSeek())
{
internalRequestBlocks();
m_writer.notify_one(); // wake consumer.
sleep = true;
}
}
if (sleep)
{
std::unique_lock<std::mutex> sLock(m_sLock);
XBMC->Log(LOG_DEBUG, "Seek Waiting");
m_seeker.wait(sLock);
}
XBMC->Log(LOG_DEBUG, "Seek() returning %lli", position);
return position;
Expand Down Expand Up @@ -380,6 +400,7 @@ uint32_t TimeshiftBuffer::WatchForBlock(byte *buffer, uint64_t *block)
if (m_seek.BlockRequested())
{ // Can't watch for blocks that haven't been requested!
watchFor = m_seek.SeekStreamOffset();
XBMC->Log(LOG_DEBUG, "%s:%d: watching for bloc %llu", __FUNCTION__, __LINE__, watchFor);
}
else
{
Expand All @@ -404,18 +425,23 @@ uint32_t TimeshiftBuffer::WatchForBlock(byte *buffer, uint64_t *block)
char response[128];
memset(response, 0, sizeof(response));
int responseByteCount = m_streamingclient->receive(response, sizeof(response), sizeof(response));
XBMC->Log(LOG_DEBUG, "%s:%d: responseByteCount: %d\n", __FUNCTION__, __LINE__, responseByteCount);
if (responseByteCount > 0)
{
XBMC->Log(LOG_DEBUG, "%s:%d: got: %s\n", __FUNCTION__, __LINE__, response);
}
else if (responseByteCount < 0)
{
return 0;
}
#if defined(TARGET_WINDOWS)
else if (responseByteCount < 0 && errno == WSAEWOULDBLOCK)
#else
else if (responseByteCount < 0 && errno == EAGAIN)
#endif
{
#if defined(TARGET_WINDOWS)
Sleep(50);
Sleep(50);
#else
usleep(50000);
#endif
Expand All @@ -430,12 +456,13 @@ uint32_t TimeshiftBuffer::WatchForBlock(byte *buffer, uint64_t *block)
long long fileSize;
int dummy;
sscanf(response, "%llu:%d %llu %d", &payloadOffset, &payloadSize, &fileSize, &dummy);
XBMC->Log(LOG_DEBUG, "PKT_IN: %llu:%d %llu %d", payloadOffset, payloadSize, fileSize, dummy);
if (m_sd.lastKnownLength.load() != fileSize)
{
XBMC->Log(LOG_DEBUG, "Adjust lastKnownLength, and reset m_sd.lastBufferTime!");
m_sd.lastBufferTime = time(NULL);
time_t elapsed = m_sd.lastBufferTime - m_sd.sessionStartTime;
m_sd.iBytesPerSecond = (int )(elapsed ? fileSize / elapsed : fileSize); // Running estimate of 1 second worth of stream bytes.
XBMC->Log(LOG_DEBUG, "Adjust lastKnownLength, and reset m_sd.lastBufferTime! [%d]", m_sd.iBytesPerSecond);
m_sd.lastKnownLength.store(fileSize);
}

Expand Down Expand Up @@ -506,6 +533,7 @@ void TimeshiftBuffer::ConsumeInput()
{
if (m_seek.PostprocessSeek(blockNo))
{
XBMC->Log(LOG_DEBUG, "Notify Seek");
m_seeker.notify_one();
}
}
Expand All @@ -515,6 +543,7 @@ void TimeshiftBuffer::ConsumeInput()
{
XBMC->Log(LOG_DEBUG, "Error Buffering Data!!");
}
std::this_thread::yield();
std::unique_lock<std::mutex> lock(m_mutex);
if (m_circularBuffer.BytesFree() < INPUT_READ_LENGTH)
{
Expand Down
9 changes: 7 additions & 2 deletions src/buffers/TimeshiftBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace timeshift {

virtual bool CanPauseStream() const override
{
return true;
return m_CanPause;
}

virtual bool CanSeekStream() const override
Expand Down Expand Up @@ -129,6 +129,11 @@ namespace timeshift {
*/
mutable std::mutex m_mutex;

/**
* Protects seek completion
*/
mutable std::mutex m_sLock;

/**
* Signaled whenever new packets have been added to the buffer
*/
Expand All @@ -150,6 +155,6 @@ namespace timeshift {
Seeker m_seek;
CircularBuffer m_circularBuffer;
session_data_t m_sd;
time_t m_tsbStartTime;
bool m_CanPause;
};
}
3 changes: 1 addition & 2 deletions src/pvrclient-nextpvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,7 @@ bool cPVRClientNextPVR::IsUp()
}
else
{
m_lastRecordingUpdateTime = MAXINT64;
XBMC->Log(LOG_NOTICE, "Disabling recording update. Update NextPVR to v3.4");
m_lastRecordingUpdateTime = time(0);
}
}
return m_bConnected;
Expand Down

0 comments on commit 42a9a4b

Please sign in to comment.