-
Notifications
You must be signed in to change notification settings - Fork 1
/
tail.py
64 lines (58 loc) · 1.93 KB
/
tail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os
import time
import logging
import threading
from os import path
# Directory (relative to the working directory at import time) that holds
# one "<basename>.offset" file per tailed file.
OFFSETS_DIR = path.abspath('offsets')

# Create first and inspect afterwards: checking for existence before
# creating leaves a window in which another process can create the
# directory, turning a harmless situation into a crash at import time.
try:
    os.makedirs(OFFSETS_DIR)
except OSError:
    # Something already at that path is fine; any other failure
    # (permissions, read-only filesystem, ...) is re-raised.
    if not path.exists(OFFSETS_DIR):
        raise
def get_offset_file(fpath):
    """Return the path of the offset file associated with ``fpath``.

    The offset file lives in ``OFFSETS_DIR`` and is named after the base
    name of ``fpath`` with an ``.offset`` suffix. It is created empty when
    missing; an existing file is left untouched.
    """
    offset_path = path.join(OFFSETS_DIR, '%s.offset' % path.basename(fpath))
    # Append mode creates the file if needed without truncating it.
    with open(offset_path, 'a'):
        pass
    return offset_path
class Tail(threading.Thread):
    """Thread that follows a file and pushes every new line onto a queue.

    The read position is written to an offset file after each queued line,
    so a later instance for the same file resumes where this one stopped.

    NOTE: the stored offset is never compared with the size of the tailed
    file; if the file shrinks below the offset, reads return nothing. A
    trailing line without a newline is queued as-is.
    """

    def __init__(self, filepath, q, stop_event, fields, interval):
        """Open ``filepath`` and restore the last persisted offset.

        filepath   -- path of the file to tail; opened in text read mode.
        q          -- queue-like object; ``put()`` and ``qsize()`` are used.
        stop_event -- object whose ``is_set()`` ends the polling loop once
                      it returns true (e.g. a ``threading.Event``).
        fields     -- mapping copied (``.copy()``) for every line; the line
                      itself is stored under the ``'data'`` key.
        interval   -- seconds passed to ``time.sleep`` between polls.
        """
        threading.Thread.__init__(self)
        self.filepath = filepath
        self.q = q
        self.stop_event = stop_event
        self.fields = fields
        self.interval = interval
        self.offset_file = get_offset_file(filepath)
        self.offset = self._load_offset()
        self.fh = open(self.filepath, 'r')

    def _load_offset(self):
        """Return the persisted offset, resetting the file when unusable."""
        with open(self.offset_file) as off_fh:
            raw = off_fh.read().strip()
        try:
            saved = int(raw)
        except ValueError:
            saved = -1
        if saved < 0:
            # Empty, corrupt or negative content: start from the beginning
            # and leave a valid value behind for the next reader.
            saved = 0
            with open(self.offset_file, 'w') as off_fh:
                off_fh.write('0')
        return saved

    def run(self):
        """Poll the file until ``stop_event`` is set.

        On a clean stop the final offset is written with ``fsync``. The
        tailed file is closed on every exit path, including when queueing
        or offset persistence raises.
        """
        try:
            while not self.stop_event.is_set():
                self._drain()
                time.sleep(self.interval)
            self.flush_offset(fsync=True)
        finally:
            self.fh.close()

    def _drain(self):
        """Queue every line currently readable past ``self.offset``."""
        self.fh.seek(self.offset)
        # readline() returns '' at end of file, which ends the iteration.
        for line in iter(self.fh.readline, ''):
            entry = self.fields.copy()
            entry['data'] = line
            self.q.put(entry)
            logging.debug('added to queue, size(%d)', self.q.qsize())
            # Persist progress only after the line has been handed over.
            self.offset = self.fh.tell()
            self.flush_offset()

    def flush_offset(self, fsync=False):
        """Overwrite the offset file with the current offset.

        When ``fsync`` is true the write is additionally flushed and
        ``os.fsync``-ed before the file is closed.
        """
        with open(self.offset_file, 'w') as off_fh:
            off_fh.write(str(self.offset))
            if fsync:
                off_fh.flush()
                os.fsync(off_fh.fileno())