Skip to content

Commit

Permalink
Updated filtered streams
Browse files Browse the repository at this point in the history
  • Loading branch information
rocky4546 committed Dec 3, 2024
1 parent 03ae1be commit cf3dc4d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
2 changes: 1 addition & 1 deletion lib/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import lib.common.exceptions as exceptions

VERSION = '0.9.15.00-RC03'
VERSION = '0.9.15.00-RC04'
CABERNET_URL = 'https://github.com/cabernetwork/cabernet'
CABERNET_ID = 'cabernet'
CABERNET_REPO = 'manifest.json'
Expand Down
7 changes: 5 additions & 2 deletions lib/common/xmltv.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,12 @@ def get_p_credits(self, elem):
return {'actors': actors, 'directors': directors}

def get_p_date(self, elem):
# 2 formats: YYYY or YYYYMMDD
# formats: YYYY or YYYYMMDD that can include dashes that are removed
event, elem = next(self.iterator, (None, None))
return elem.text
p_date = elem.text
if '-' in p_date:
p_date = p_date.replace('-', '')
return p_date

def get_p_video_quality(self, elem):
event, elem = next(self.iterator, (None, None))
Expand Down
53 changes: 40 additions & 13 deletions lib/streams/internal_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,7 @@ def play_queue(self):
self.cue = True
self.logger.debug('Turning M3U8 cue to True')
if data['filtered']:
self.last_atsc_msg = self.idle_counter
self.filter_counter = self.idle_counter
self.logger.info('Filtered Msg {} {}'.format(self.t_m3u8_pid, urllib.parse.unquote(uri)))
self.update_tuner_status('Filtered')
# self.write_buffer(out_queue_item['stream'])
if self.is_starting:
self.is_starting = False
self.write_atsc_msg()
self.logger.debug('2 Requesting Status from m3u8_queue {}'.format(self.t_m3u8_pid))
self.in_queue.put({'thread_id': threading.get_ident(), 'uri': 'status'})
time.sleep(0.5)
self.process_filtered_packet(uri)
else:
self.video.data = out_queue_item['stream']
if self.video.data is not None:
Expand Down Expand Up @@ -278,6 +268,22 @@ def play_queue(self):
time.sleep(0.01)
self.video.terminate()

def process_filtered_packet(self, _uri):
"""
Assumes the queued item has been pulled and is a filtered item.
"""
self.last_atsc_msg = self.idle_counter
self.filter_counter = self.idle_counter
self.logger.info('Filtered Msg {} {}'.format(self.t_m3u8_pid, urllib.parse.unquote(_uri)))
self.update_tuner_status('Filtered')
# self.write_buffer(out_queue_item['stream'])
if self.is_starting:
self.is_starting = False
self.write_atsc_msg()
self.logger.debug('2 Requesting Status from m3u8_queue {}'.format(self.t_m3u8_pid))
self.in_queue.put({'thread_id': threading.get_ident(), 'uri': 'status'})
time.sleep(0.5)

def write_buffer(self, _data):
"""
Plan is to slowly push out bytes until something is
Expand All @@ -289,8 +295,11 @@ def write_buffer(self, _data):
try:
bytes_written = 0
count = 0
bytes_per_write = int(len(_data)/20) # number of seconds to keep transmitting
while self.out_queue.qsize() == 0:
bytes_per_write = int(len(_data)/25) # number of seconds to keep transmitting
while self.out_queue.qsize() < 1 or \
(self.out_queue.qsize() > 0 and \
self.out_queue.queue[0]['data'] is not None and \
self.out_queue.queue[0]['data']['filtered']):
self.wfile.flush()
# Do not use chunk writes! Just send data.
# x = self.wfile.write('{}\r\n'.format(len(_data)).encode())
Expand All @@ -309,6 +318,24 @@ def write_buffer(self, _data):
# x = self.wfile.write('\r\n'.encode())
self.wfile.flush()
time.sleep(1.0)
# special filtered packet processing
if self.out_queue.qsize() > 0 and \
self.out_queue.queue[0]['data'] is not None and \
self.out_queue.queue[0]['data']['filtered']:
# pull queue item and check to confirm it is filtered
try:
out_queue_item = self.out_queue.get(timeout=1)
except queue.Empty:
# no queue item. Should not happen
self.logger.warning('Unexpected Error: Expected filtered packet, but found no items in queue')
continue
if out_queue_item['data']['filtered']:
self.process_filtered_packet(out_queue_item['uri'])
bytes_per_write = 752 # change writes to a
# # small number so it does not exit during the filtered packets
else:
# somehow NOT filtered, log this issue
self.logger.warning('Unexpected Error: Found unfiltered packet when a filtered packet was expected')
if bytes_written != len(_data):
x = self.wfile.write(_data[bytes_written:])
self.wfile.flush()
Expand Down

0 comments on commit cf3dc4d

Please sign in to comment.