-
Notifications
You must be signed in to change notification settings - Fork 1
/
scribe_tail
executable file
·363 lines (311 loc) · 10.4 KB
/
scribe_tail
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#!/usr/bin/env python
'''scribe_tail: A simple script for sending logfiles to scribe.'''
import fcntl
import logging
logger = logging.getLogger()
import logging.handlers
import optparse
import os
import platform
import signal
import subprocess
import sys
import time
import traceback
from datetime import datetime
from datetime import timedelta
from scribe import scribe
from thrift.transport import TTransport, TSocket
from thrift.protocol import TBinaryProtocol
# Name of this machine as reported by platform.node(); do_tail() puts it in
# front of every line when --prepend_hostname is given.
hostname = platform.node()
# Parsed command-line options and leftover positional arguments. Both stay
# None until main() fills them in from parser.parse_args().
options = None
args = None
def handler(signum, frame):
    """Signal handler that terminates the process when SIGTERM arrives.

    Any other signal number is ignored.

    @param signum Number of the signal that was delivered.
    @param frame Stack frame that was active when the signal arrived (unused).
    """
    if signum != signal.SIGTERM:
        return
    logger.info('Exiting due to SIGTERM')
    sys.exit()
def run_command(cmd):
    """Execute a command line through the shell and return what it printed.

    The outcome is logged: a non-zero exit status is reported at CRITICAL
    level together with the captured stderr, a zero exit status at INFO level.

    @param cmd Command line, as a single string, handed to the shell.
    @return The captured stdout with leading/trailing whitespace removed.
    """
    pipe = subprocess.PIPE
    child = subprocess.Popen(cmd, shell=True,
                             stdin=pipe, stdout=pipe, stderr=pipe)
    out, err = child.communicate()
    if not child.returncode:
        logger.info('Successfully ran: %s' % cmd)
    else:
        logger.critical('Error running: %s' % cmd)
        logger.critical('Error message: %s' % err)
    return out.strip()
def publish_stats(stats):
    """Report every counter in stats to Ganglia through the gmetric tool.

    One gmetric command is run per entry. The metric is named
    scribetail_<category>_<key>, where the category is taken from the global
    command-line options.

    @param stats Dictionary mapping a counter name to its value.
    """
    for name, count in stats.items():
        metric = 'scribetail_%s_%s' % (options.category, name)
        pieces = ['/usr/bin/gmetric',
                  '--name', metric,
                  '--value', str(count),
                  '--type int32',
                  '--units count']
        # run_command() goes through the shell, so the pieces are joined into
        # one command-line string.
        run_command(' '.join(pieces))
# Locked pid files are parked here so they stay open (and therefore locked)
# until the process exits. A plain local variable would be closed as soon as
# write_pid() returned, silently dropping the lock.
_PID_FILE_HANDLES = []


def write_pid(pid_filename):
    """Write this process' pid to a file and hold an exclusive lock on it.

    The lock is what prevents a second instance from starting: it is taken
    without blocking, and the file is kept open for the lifetime of the
    process so the lock is only released when the process ends.

    If the file cannot be opened or locked, the pid currently stored in the
    file (when readable) is reported and the process exits with status 1.

    @param pid_filename Path of the pid file to create or overwrite.
    """
    logger.info('Writing pid file: %s' % pid_filename)
    fdw = None
    try:
        # Append mode creates the file if needed without clobbering the pid of
        # a running instance before we know whether we can get the lock.
        fdw = open(pid_filename, 'a')
        fcntl.lockf(fdw, fcntl.LOCK_NB | fcntl.LOCK_EX)
        os.ftruncate(fdw.fileno(), 0)
        fdw.write(str(os.getpid()))
        fdw.flush()
    except IOError:
        # This usually means that there is already a running process.
        if fdw is not None:
            fdw.close()
        # Get the pid of the other process, if the file can be read at all
        # (the open above may have failed because the path is unusable).
        try:
            fdr = open(pid_filename, 'r')
            try:
                pid = fdr.read()
            finally:
                fdr.close()
        except IOError:
            pid = 'unknown'
        logger.error('Unable to obtain lock on %s' % pid_filename)
        logger.error('Is another process already running? Perhaps %s' % pid)
        sys.exit(1)
    # Keep the file object referenced so the lock outlives this call.
    _PID_FILE_HANDLES.append(fdw)
def daemonize():
    """Detach the process from its terminal using a double fork.

    Steps: change directory to '/', fork and let the parent exit, start a new
    session, fork again and let that parent exit, then reset the umask to 0.
    Only the final child returns from this function.

    If any of these calls fails with OSError the error is logged and the
    function returns in whichever process hit the error.
    """
    logger.info('Daemonizing')
    try:
        os.chdir('/')
        if os.fork() != 0:
            os._exit(0)
        # Become a session leader.
        os.setsid()
        if os.fork() != 0:
            os._exit(0)
        # Set the default umask.
        os.umask(0)
    except OSError as e:
        # Format the exception itself: the old e.message() call tried to call
        # a non-callable attribute and raised inside the error handler.
        logger.error('Unable to daemonize: %s', e)
def cleanup_fds():
    """Redirect stdin, stdout and stderr (descriptors 0, 1, 2) to /dev/null.

    The Python-level sys.stdin/sys.stdout/sys.stderr objects are left open so
    that later writes to them are silently discarded instead of raising
    "I/O operation on closed file".
    """
    # Best effort: push out anything still buffered before the descriptors
    # are replaced. A stream that is already closed or broken is skipped.
    for stream in (sys.stdout, sys.stderr):
        try:
            stream.flush()
        except (IOError, ValueError):
            pass
    devnull_fd = os.open(os.devnull, os.O_RDWR)
    # Duplicate the /dev/null descriptor explicitly rather than assuming it
    # was handed out as descriptor 0.
    for std_fd in (0, 1, 2):
        if devnull_fd != std_fd:
            os.dup2(devnull_fd, std_fd)
    if devnull_fd > 2:
        os.close(devnull_fd)
def tail_lines(fd, linesback = 10):
    """Return the last complete lines of an open file.

    The file is read from a guessed offset before its end; the guess is
    enlarged by 30% at a time until enough lines were read or the start of the
    file was reached. On return the file position is at the end of the file.

    Based on a Python Cookbook recipe by Ed Pascoe (2003).

    @param fd File object opened for reading (needs seek, tell and read).
    @param linesback Number of trailing lines wanted. Zero or a negative
                     number yields an empty list.
    @return List of at most linesback lines, without their newline
            characters. Text after the final newline is not included.
    """
    if linesback <= 0:
        # Nothing to collect; without this guard the loop below could never
        # gather "more than linesback + 1" pieces from a zero-byte read.
        fd.seek(0, 2)
        return []
    avgcharsperline = 75
    while True:
        try:
            # The estimate turns into a float once it has been scaled, but a
            # seek offset has to be an integer.
            fd.seek(-int(avgcharsperline * linesback), 2)
        except IOError:
            # Offset reaches before the start of the file (or the file does
            # not support end-relative seeks): read everything instead.
            fd.seek(0)
        atstart = fd.tell() == 0
        lines = fd.read().split("\n")
        if len(lines) > linesback + 1 or atstart:
            break
        avgcharsperline = avgcharsperline * 1.3
    # The last element is whatever follows the final newline (usually ''), so
    # it is left out of the result.
    start = max(len(lines) - linesback - 1, 0)
    return lines[start:-1]
def _send_batch(client, log_buffer, stats):
    """Send the buffered entries to Scribe, update stats, empty the buffer."""
    if not log_buffer:
        return
    result = client.Log(log_buffer)
    sent = len(log_buffer)
    if result == scribe.ResultCode.OK:
        stats['ok'] += sent
    elif result == scribe.ResultCode.TRY_LATER:
        # NOTE: entries answered with TRY_LATER are only counted, not resent.
        stats['try_later'] += sent
    del log_buffer[:]


def do_tail(client, filename):
    """Tail a file sending new lines to Scribe.

    Starts at the current end of the file and loops forever, following the
    file across truncation and rotation (detected by a changed inode number).
    Lines are sent in batches of up to 50; a partial batch is sent as soon as
    no further line is available, so trailing lines are not held back while
    the log is quiet. Once a minute the counters are logged, optionally
    published to Ganglia, and reset.

    This function only returns by way of an exception (for example a Thrift
    transport error raised by the client).

    @param client Scribe client object.
    @param filename Filename to send lines to scribe from.
    """
    logger.info('Opening log file: %s' % filename)
    fd = open(filename, 'r')
    try:
        # tail_lines() reads through to the end of the file, which leaves the
        # position at EOF; its return value is not needed here.
        tail_lines(fd, 10)
        stats = {'ok': 0,
                 'try_later': 0,
                 }
        logger.info('Starting log loop.')
        last_log_file_update = datetime.now()
        delta = timedelta(seconds=60)
        log_buffer = []
        max_buffer = 50
        line = ""
        while True:
            where = fd.tell()
            line = fd.readline()
            if line:
                if options.prepend_hostname:
                    line = '%s: %s' % (hostname, line)
                log_buffer.append(scribe.LogEntry(category=options.category,
                                                  message=line))
                if len(log_buffer) >= max_buffer:
                    _send_batch(client, log_buffer, stats)
            else:
                # Nothing more to read right now: deliver what has been
                # collected so far instead of waiting for a full batch.
                _send_batch(client, log_buffer, stats)
                fd_results = os.fstat(fd.fileno())
                try:
                    st_results = os.stat(filename)
                except OSError:
                    # The path is momentarily missing; keep following the
                    # file that is already open.
                    st_results = fd_results
                if st_results.st_size < where:
                    logger.info('%s was truncated. Jump back to 0.', filename)
                    fd.seek(0)
                elif st_results.st_ino == fd_results.st_ino:
                    time.sleep(1)
                    fd.seek(where)
                else:
                    logger.info("%s changed inode numbers from %d to %d" %
                                (filename, fd_results[1], st_results[1]))
                    # Open the replacement first so a failure leaves fd valid.
                    new_fd = open(filename, 'r')
                    fd.close()
                    fd = new_fd
            now = datetime.now()
            if (now - delta) > last_log_file_update:
                last_log_file_update = now
                logger.info('Now process log: %s' % line)
                logger.info('Messages successfully logged: %d' % stats['ok'])
                logger.info('Messages to try later: %d' % stats['try_later'])
                # Publish before resetting, otherwise only zeros are reported.
                if options.publish_stats:
                    publish_stats(stats)
                stats['ok'] = 0
                stats['try_later'] = 0
    finally:
        fd.close()
def _build_option_parser():
    """Create the optparse parser describing the scribe_tail command line."""
    # Disable the built in -h/--help because we already used -h for host.
    parser = optparse.OptionParser(add_help_option=False)
    parser.add_option('-v', '--verbose',
                      dest='verbose',
                      default=False,
                      action='store_true',
                      help='Verbose logging. (default: %default)')
    parser.add_option('--log-file',
                      dest='log_file',
                      default=None,
                      help=('Log file name. When logging to file no output is '
                            'displayed on stdout. When running in daemonize mode default is '
                            '/var/log/scribetail-CATEGORY.log'))
    parser.add_option('--publish-stats',
                      action='store_true',
                      dest='publish_stats',
                      default=False,
                      help='Publish statistics to Ganglia with gmetric. (default: %default)')
    parser.add_option('--prepend_hostname',
                      action='store_true',
                      dest='prepend_hostname',
                      default=False,
                      help='Prepend the hostname to all log lines.')
    parser.add_option('--pid_file',
                      action='store',
                      dest='pid_file',
                      default=None,
                      help=('The path to write the pid of the running scribe_tail '
                            'default is to not write a pid file.'))
    parser.add_option('--daemonize',
                      action='store_true',
                      dest='daemonize',
                      default=False,
                      help='Have the process fork and run in the background.')
    parser.add_option('-h', '--scribe_host', action='store', dest='host',
                      default='127.0.0.1',
                      help='The scribe host to connect to.')
    # type='int' so a port given on the command line is a number, exactly
    # like the default and like a port taken from HOST:PORT.
    parser.add_option('--port', action='store', dest='port', type='int',
                      default=1463,
                      help='The port on the scribe server to connect to.')
    parser.add_option('--filename', action='store', dest='filename',
                      help='The file name to tail into scribe.')
    parser.add_option('--category', action='store', dest='category',
                      help='The category to use when talking to scribe.')
    parser.add_option('--help', action='store_true', dest='help',
                      default=False, help='Display this help message.')
    return parser


def _configure_logging():
    """Set the root logger's level and handler from the global options."""
    if options.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s %(filename)s:%(lineno)d - %(message)s")
    if options.daemonize and options.log_file is None:
        options.log_file = '/var/log/scribetail-%s.log' % options.category
    if options.log_file:
        # Honour --log-file whether or not we daemonize, as its help promises.
        log_handler = logging.handlers.RotatingFileHandler(
            options.log_file, maxBytes=10*1024*1024, backupCount=3)
    else:
        log_handler = logging.StreamHandler()
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)


def main():
    """Parse the command line and tail the requested file into Scribe.

    Accepts either the option style (--category/--filename) or the old
    positional style "CATEGORY FILENAME". The host may be given as HOST:PORT.
    Exits with status 1 when the file name or category is missing or when the
    pid file lock cannot be obtained. Otherwise it runs until the process is
    terminated, reconnecting to Scribe 15 seconds after every transport
    error.
    """
    global options
    global args
    parser = _build_option_parser()
    (options, args) = parser.parse_args(sys.argv[1:])
    if options.help:
        print(parser.format_help())
        sys.exit(0)

    mixed_style = False
    if len(args) == 2:
        mixed_style = len(sys.argv) not in (3, 5)
        options.category = args[0]
        options.filename = args[1]
    host_port = options.host.split(':')
    options.host = host_port[0]
    if len(host_port) > 1:
        options.port = int(host_port[1])

    # Validate before anything touches the file system, so no pid file named
    # after a missing category gets created.
    if options.filename is None:
        sys.stderr.write('You must specify a file name using --filename\n')
        sys.stderr.write(parser.format_help() + '\n')
        sys.exit(1)
    if options.category is None:
        sys.stderr.write('You must specify a category using --category\n')
        sys.stderr.write(parser.format_help() + '\n')
        sys.exit(1)

    if options.daemonize:
        daemonize()
    # The module-level logger is the root logger; it is configured in place
    # rather than rebound to a local name, which would make every earlier use
    # of "logger" in this function fail.
    _configure_logging()
    if options.daemonize:
        cleanup_fds()
    signal.signal(signal.SIGTERM, handler)
    logger.info('Starting scribe_tail: %s', ' '.join(sys.argv))
    if mixed_style:
        logger.info('Mixed use of old and new parameter style is not recommended.')

    # Take the per-category lock only now: after daemonizing, the pid written
    # and the lock held belong to the process that keeps running.
    write_pid('/tmp/scribetail-%s.pid' % options.category)
    # save our pid to allow this to be run in daemon mode.
    if options.pid_file:
        write_pid(options.pid_file)

    # connect to thrift
    sock = TSocket.TSocket(host=options.host, port=options.port)
    transport = TTransport.TFramedTransport(sock)
    protocol = TBinaryProtocol.TBinaryProtocol(trans=transport,
                                               strictRead=False,
                                               strictWrite=False)
    client = scribe.Client(iprot=protocol, oprot=protocol)

    # start tail
    try:
        while True:
            try:
                transport.open()
                do_tail(client, options.filename)
            except TTransport.TTransportException as e:
                # Report through the logger: in daemon mode stderr goes to
                # /dev/null, so a message written there would be lost.
                logger.error('CAUGHT: TTransport.TTransportException: %s', e)
                transport.close()
                time.sleep(15)
    finally:
        # Reached when the process is asked to stop (SystemExit from the
        # SIGTERM handler, KeyboardInterrupt) or on an unexpected error.
        transport.close()
# Script entry point: run main() and turn Ctrl-C into a logged, clean exit
# instead of a KeyboardInterrupt traceback.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        logger.info('Exiting on KeyboardInterrupt.')
        sys.exit()