Skip to content

Commit

Permalink
pull real s3 logic in and refactor for use with click
Browse files Browse the repository at this point in the history
  • Loading branch information
bradrf committed Aug 26, 2016
1 parent ddea0a2 commit 165f13f
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 40 deletions.
48 changes: 19 additions & 29 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1,22 @@
# Config file for automatic testing at travis-ci.org
# This file will be regenerated if you run travis_pypi_setup.py

language: python
python: 3.5

env:
- TOXENV=py35
- TOXENV=py34
- TOXENV=py33
- TOXENV=py27
- TOXENV=py26
- TOXENV=pypy

# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors
install: pip install -U tox

# command to run tests, e.g. python setup.py test
script: tox -e ${TOXENV}

# After you create the Github repo and add it to Travis, run the
# travis_pypi_setup.py script to finish PyPI deployment setup
# This file was autogenerated and will overwrite each time you run travis_pypi_setup.py
deploy:
provider: pypi
true:
condition: $TOXENV == py27
repo: bradrf/s3tail
tags: true
distributions: sdist bdist_wheel
user: bradrf
password:
secure: PLEASE_REPLACE_ME
on:
tags: true
repo: bradrf/s3tail
condition: $TOXENV == py27
secure: gsGx86vc8eR/NTzF0seRO/1hJNPzeq0G+Kxo+LrzMSBHuU8gpQdaiRddv3grLTneWcBkUqZ20pWuFjbcIT1xMP1AndUX+JKhZvNTxmXxf+YO3wV0Erha9QFrA19gZUeorYpR3mNIF4zopC1sC/+4ZW4RRgAs+GWHaiTrvOss4XODAPsbiFdP3xiEJRxyCk+9+OyY58YNIC6XWCcq25ZKiNouPCrv30Nyrxu4mHRJNxw8LnTiWzgpBqhJGsHjvjxszHXoUJ+JSVXWaHV8Wh6xq/bU5+AoMQ/Fs3R7ItElLIwMx4IszRpqXGp4fI41tsObTlsd9S8lZpcGPgLeOhmntXcGFbSDTaTPtIoLc0CT+8KeDjxWkmNLr21smI4VRyX57lraOY6A+STz2pVgG35PDmWlQZdT+jWo32kgZE9TXaevn9v1FyWiYvpmTZ9WK4CSYZKRKTOVxbr8cm3RnNzfihksFYL5Hyvvtf+mV25j6eYZtrw/dd5KCmIFZg4XOARQWtlaUqras4FEbbTKZ6ZO7nPHv1AxDbTTSmcXqEMhewQPTJczZxMazdmU2Zk2ADYN83R6BLJvaaLkBFaa6qikDtzYnMeQq3/A11+z14CghEpgjh/Xcit6jErEKRMKfI+AR81Gia/ewTd5MW6crY3w2gOqCD8TUxLeGTAMNnqbz9k=
provider: pypi
user: bradrf
env:
- TOXENV=py35
- TOXENV=py34
- TOXENV=py33
- TOXENV=py27
- TOXENV=py26
- TOXENV=pypy
install: pip install -U tox
language: python
python: 3.5
script: tox -e ${TOXENV}
1 change: 1 addition & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
boto==2.42.0
pip==8.1.2
bumpversion==0.5.3
wheel==0.29.0
Expand Down
79 changes: 73 additions & 6 deletions s3tail/cli.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,81 @@
# -*- coding: utf-8 -*-

'''
Utility to help "tail" AWS logs stored in S3 generated by S3 bucket
logging or ELB logging.
'''

import click
import sys
import signal
import errno
import logging
import re

from boto import s3
from s3tail import S3Tail

@click.command()
@click.option('-r', '--region', type=click.Choice(r.name for r in s3.regions()),
              help='AWS region to use when connecting')
@click.option('-b', '--bookmark', help='Bookmark of last key:line shown')
@click.option('-l', '--log-level', type=click.Choice(['debug','info','warning','error','critical']),
              help='set logging level', default='info')
@click.option('--log-file', metavar='FILENAME',
              help='write logs to FILENAME', default='STDOUT')
@click.option('--cache-hours', type=int, default=24,
              help='Number of hours to keep in cache before removing on next run')
@click.argument('s3_uri')
def main(region, bookmark, log_level, log_file, cache_hours, s3_uri):
    '''Begins tailing files found at [s3://]BUCKET[/PREFIX]'''

    # Strip an optional "s3://" (or "s3:/", or bare leading slashes) scheme marker.
    s3_uri = re.sub(r'^(s3:)?/+', '', s3_uri)
    # PREFIX is optional; partition (unlike split) will not raise when only
    # BUCKET is provided -- prefix simply ends up empty.
    bucket, _, prefix = s3_uri.partition('/')

    log_kwargs = {
        'level': getattr(logging, log_level.upper())
    }
    if log_file != 'STDOUT':
        log_kwargs['filename'] = log_file
    logging.basicConfig(**log_kwargs)
    logger = logging.getLogger('s3tail')

    class Track:
        # mutable holder shared with the nested callbacks below
        # (Python 2 has no "nonlocal", so attributes stand in for it)
        last_key = None
        last_num = None
        show_pick_up = bookmark is not None

    def signal_handler(signum, frame):
        # report where we stopped so the user can resume with --bookmark
        # (%s, not %d: last_num is None if no line was processed yet)
        logger.info('Stopped processing at %s:%s', Track.last_key, Track.last_num)
        logger.info('Bookmark: %s', tail.get_bookmark())
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)

    def progress(key):
        # key handler: remember the key for bookmarking and keep processing
        Track.last_key = key
        logger.info('Starting %s', key)
        return True

    def dump(num, line):
        # line handler: emit every line, announcing the resume point once
        Track.last_num = num
        if Track.show_pick_up:
            logger.info('Picked up at line %s', num)
            Track.show_pick_up = False
        print(line)

    tail = S3Tail(bucket, prefix, dump,
                  key_handler=progress, bookmark=bookmark,
                  region=region, hours=cache_hours)
    try:
        tail.watch()
    except IOError as exc:
        if exc.errno != errno.EPIPE:
            raise
        sys.exit(0)  # just exit if piped to something that has terminated (i.e. head or tail)

    logger.info('No more logs. Bookmark: %s', tail.get_bookmark())
    sys.exit(0)

if __name__ == '__main__':
    main()
226 changes: 226 additions & 0 deletions s3tail/s3tail.py
Original file line number Diff line number Diff line change
@@ -1 +1,227 @@
# -*- coding: utf-8 -*-

'''
Utility to help "tail" AWS logs stored in S3 generated by S3 bucket
logging or ELB logging.
'''

import os
import logging
from threading import Thread
from Queue import Queue
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from hashlib import sha256
from boto import connect_s3
from boto.s3 import connect_to_region

# allow for digit ranges to be looked up!

# add ability to express bookmark "manually" by setting the actual key of the +CURRENT+ file and do search looking for one previous? consider having all bookmarks like this! way better usability

class Cache(object):
    '''Local disk cache of S3 key contents, sharded by SHA-256 of the key name.

    Cached files live under 256 two-hex-character shard directories so no
    single directory grows unboundedly.
    '''

    # Readers whose background cache writes may still be in flight; see cleanup().
    readers = []

    class Reader(object):
        '''File-like wrapper that tees everything read into the cache.'''

        def __init__(self, reader, cache_pn):
            self.closed = False
            self._logger = logging.getLogger('s3tail.cache.reader')
            self._reader = reader
            self._cache_pn = cache_pn
            # write to a tempfile in case of failure; move into place when writing is complete
            head, tail = os.path.split(cache_pn)
            self._tempfile = NamedTemporaryFile(dir=head, prefix=tail)
            self._writer = BackgroundWriter(self._tempfile, self._move_into_place)
            self._writer.start()
            Cache.readers.append(self)

        def read(self, size=-1):
            '''Read from the wrapped reader, queuing the data for the cache file.'''
            data = self._reader.read(size)
            self._writer.write(data)
            return data

        def close(self):
            '''Close the wrapped reader; the cache write finishes asynchronously.'''
            self._reader.close()
            self._writer.mark_done()  # allow writer to finish async, not requiring caller to wait
            self.closed = True

        def cleanup(self):
            '''Block until the background writer has drained and stopped.'''
            self._writer.join()

        def _move_into_place(self, _):
            # runs on the writer thread after all queued data has been written
            self._tempfile.delete = False  # prevent removal on close
            self._tempfile.close()
            os.rename(self._tempfile.name, self._cache_pn)
            Cache.readers.remove(self)
            self._logger.debug('Placed: %s', self._cache_pn)

    def __init__(self, path, hours):
        '''Create/open the cache at path; entries unused for `hours` are purged.'''
        self._logger = logging.getLogger('s3tail.cache')
        self.path = path
        if not os.path.isdir(path):
            os.mkdir(path)
            # create shard buckets for sha hexstring names
            # (a literal hexdigit string is equivalent to the original
            # range(ord(...)) arithmetic and works on both Python 2 and 3)
            hexdigits = '0123456789abcdef'
            for i in hexdigits:
                for j in hexdigits:
                    os.mkdir(os.path.join(path, i + j))
        else:
            # cache already exists: expire stale entries in the background
            cleaner = OldFileCleaner(path, hours)
            cleaner.start()

    def open(self, name, reader):
        '''Return a readable object for name.

        If name is already cached the wrapped reader is closed and the cached
        file is returned; otherwise a tee-ing Reader populates the cache as
        the caller consumes it.
        '''
        safe_name = sha256(name).hexdigest()
        cache_pn = os.path.join(self.path, safe_name[0:2], safe_name)
        if os.path.exists(cache_pn):
            if self._logger.isEnabledFor(logging.DEBUG):
                self._logger.debug('Found %s in cache: %s', name, cache_pn)
            else:
                self._logger.info('Found %s in cache', name)
            reader.close()
            return open(cache_pn)
        return Cache.Reader(reader, cache_pn)

    def cleanup(self):
        '''Wait for all in-flight background cache writes to finish.

        Iterate over a snapshot: each cleanup() join triggers
        _move_into_place, which removes the reader from Cache.readers --
        mutating the list mid-iteration would skip entries.
        '''
        for reader in list(Cache.readers):
            reader.cleanup()

class S3Tail(object):
    '''Streams lines from every S3 key under a prefix, in listing order.

    Each key's content is pulled through a local disk Cache, and progress can
    be captured/restored via a "key:line" bookmark string.
    '''

    # Chunk size for each read from the key/cache reader.
    BUFFER_SIZE = 1 * (1024*1024) # MiB

    def __init__(self, bucket_name, prefix, line_handler,
                 key_handler=None, bookmark=None, region=None, hours=24):
        '''Connect to S3 and prepare to tail bucket_name/prefix.

        line_handler(num, line) is called for every line; returning a
        non-None value stops the watch. key_handler(key_name), if given, is
        called before each key; returning falsy skips that key. bookmark is a
        "key:line" string from get_bookmark(). hours is the cache retention.
        '''
        # cache lives in the user's home directory, shared across runs
        self._cache = Cache(os.path.join(os.path.expanduser('~'), '.s3tailcache'), hours)
        if region:
            self._conn = connect_to_region(region)
        else:
            self._conn = connect_s3()
        self._bucket = self._conn.get_bucket(bucket_name)
        self._prefix = prefix
        self._line_handler = line_handler
        self._key_handler = key_handler
        if bookmark:
            self._bookmark_key, self._bookmark_line_num = bookmark.split(':')
            if len(self._bookmark_key) == 0:
                # NOTE(review): in this branch _bookmark_line_num stays a
                # string, so the int comparison in _read() is suspect for
                # bookmarks of the form ":N" -- confirm intended behavior
                self._bookmark_key = None
            else:
                self._bookmark_line_num = int(self._bookmark_line_num)
        else:
            self._bookmark_key = None
            self._bookmark_line_num = 0
        self._marker = None      # name of the last fully-processed key
        self._buffer = None      # unconsumed tail of the current key's data
        self._line_num = None    # 1-based line counter within the current key

    def get_bookmark(self):
        '''Return a "key:line" resume string, or None if nothing was read.'''
        if self._marker:
            return self._marker + ':' + str(self._line_num)
        if self._line_num:
            return ':' + str(self._line_num)

    def watch(self):
        '''Iterate keys under the prefix, feeding lines to the handler.

        Returns the first non-None handler result, or None when the listing
        is exhausted.
        '''
        # marker= makes the listing start *after* the bookmarked key
        for key in self._bucket.list(prefix=self._prefix, marker=self._bookmark_key):
            self._bookmark_key = None
            if self._key_handler:
                result = self._key_handler(key.name)
                if not result:
                    continue
            result = self._read(key)
            if result is not None:
                return result
            self._marker = key.name # marker always has to be _previous_ entry, not current

    def cleanup(self):
        '''Flush any pending background cache writes.'''
        self._cache.cleanup()

    ######################################################################
    # private

    def _read(self, key):
        '''Read one key line by line, skipping up to the bookmarked line.'''
        self._buffer = ''
        self._line_num = 0
        key.open()
        reader = self._cache.open(key.name, key)
        while not reader.closed:
            line = self._next_line(reader)
            self._line_num += 1
            if self._line_num < self._bookmark_line_num:
                continue
            # resume point reached: stop skipping for this and later keys
            self._bookmark_line_num = 0
            result = self._line_handler(self._line_num, line)
            if result is not None:
                return result
        self._bookmark_line_num = 0 # safety in case bookmark count was larger than actual lines

    def _next_line(self, reader):
        '''Return the next newline-terminated line, refilling the buffer.

        When the reader is exhausted it is closed and whatever remains in the
        buffer is returned as the final line.
        '''
        i = None
        for _ in range(0, 3): # try reading up to three times the buffer size
            i = self._buffer.find("\n")
            if i > -1:
                break
            more_data = reader.read(S3Tail.BUFFER_SIZE)
            if len(more_data) > 0:
                self._buffer += more_data
            else:
                reader.close()
                i = len(self._buffer) + 1 # use remaining info in buffer
                break
        # NOTE(review): if the data ends exactly on a newline, the next call
        # returns one spurious empty final line -- confirm whether callers care
        line = self._buffer[0:i]
        self._buffer = self._buffer[i+1:]
        return line

class BackgroundWriter(Thread):
    '''Thread that drains queued write() payloads into a wrapped writer.'''

    def __init__(self, writer, done_callback=None):
        '''Wraps a writer I/O object with background write calls.
        Optionally, will call the done_callback just before the thread stops (to allow caller to
        close/operate on the writer)
        '''
        super(BackgroundWriter, self).__init__()
        self._done = False
        self._done_callback = done_callback
        self._queue = Queue()
        self._writer = writer

    def write(self, data):
        # hand the payload to the worker thread; returns immediately
        self._queue.put(data)

    def mark_done(self):
        # enqueue the shutdown sentinel, at most once
        if self._done:
            return
        self._done = True
        self._queue.put(True)

    def join(self, timeout=None):
        # request shutdown, wait for the queue to drain, then for the thread
        self.mark_done()
        self._queue.join()
        super(BackgroundWriter, self).join(timeout)

    def run(self):
        # worker loop: `True` on the queue is the shutdown sentinel
        item = self._queue.get()
        while item is not True:
            self._writer.write(item)
            self._queue.task_done()
            item = self._queue.get()
        self._queue.task_done()
        if self._done_callback:
            self._done_callback(self._writer)

class OldFileCleaner(Thread):
    '''Background thread that deletes cache files not accessed recently.'''

    def __init__(self, path, hours):
        '''Prepare to remove files under path unused for more than hours.'''
        super(OldFileCleaner, self).__init__()
        self._logger = logging.getLogger('s3tail.old_file_cleaner')
        self._path = path
        self._hours = hours

    def run(self):
        cutoff = timedelta(hours=self._hours)
        removed = 0
        for dirpath, _, filenames in os.walk(self._path):
            for name in filenames:
                pathname = os.path.join(dirpath, name)
                # access time (not mtime): recently *read* cache entries survive
                last_used = datetime.fromtimestamp(os.path.getatime(pathname))
                if datetime.now() - last_used > cutoff:
                    self._logger.debug('Removing %s', pathname)
                    os.remove(pathname)
                    removed += 1
        if removed > 0:
            self._logger.info('Cleaned up %d files', removed)
2 changes: 1 addition & 1 deletion setup.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

requirements = [
'Click>=6.0',
# TODO: put package requirements here
'boto>=2.42.0',
]

test_requirements = [
Expand Down
7 changes: 3 additions & 4 deletions tests/test_s3tail.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ def test_something(self):
def test_command_line_interface(self):
runner = CliRunner()
result = runner.invoke(cli.main)
assert result.exit_code == 0
assert 's3tail.cli.main' in result.output
assert result.exit_code == 2
assert 'Missing argument "s3_uri"' in result.output
help_result = runner.invoke(cli.main, ['--help'])
assert help_result.exit_code == 0
assert '--help Show this message and exit.' in help_result.output
assert 'Show this message and exit.' in help_result.output

@classmethod
def teardown_class(cls):
pass

0 comments on commit 165f13f

Please sign in to comment.