diff --git a/.travis.yml b/.travis.yml
index ac4b055..b89ea00 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,32 +1,22 @@
-# Config file for automatic testing at travis-ci.org
-# This file will be regenerated if you run travis_pypi_setup.py
-
-language: python
-python: 3.5
-
-env:
-  - TOXENV=py35
-  - TOXENV=py34
-  - TOXENV=py33
-  - TOXENV=py27
-  - TOXENV=py26
-  - TOXENV=pypy
-
-# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors
-install: pip install -U tox
-
-# command to run tests, e.g. python setup.py test
-script: tox -e ${TOXENV}
-
-# After you create the Github repo and add it to Travis, run the
-# travis_pypi_setup.py script to finish PyPI deployment setup
+# This file was autogenerated and will overwrite each time you run travis_pypi_setup.py
 deploy:
-  provider: pypi
+  true:
+    condition: $TOXENV == py27
+    repo: bradrf/s3tail
+    tags: true
   distributions: sdist bdist_wheel
-  user: bradrf
   password:
-    secure: PLEASE_REPLACE_ME
-  on:
-    tags: true
-    repo: bradrf/s3tail
-    condition: $TOXENV == py27
+    secure: gsGx86vc8eR/NTzF0seRO/1hJNPzeq0G+Kxo+LrzMSBHuU8gpQdaiRddv3grLTneWcBkUqZ20pWuFjbcIT1xMP1AndUX+JKhZvNTxmXxf+YO3wV0Erha9QFrA19gZUeorYpR3mNIF4zopC1sC/+4ZW4RRgAs+GWHaiTrvOss4XODAPsbiFdP3xiEJRxyCk+9+OyY58YNIC6XWCcq25ZKiNouPCrv30Nyrxu4mHRJNxw8LnTiWzgpBqhJGsHjvjxszHXoUJ+JSVXWaHV8Wh6xq/bU5+AoMQ/Fs3R7ItElLIwMx4IszRpqXGp4fI41tsObTlsd9S8lZpcGPgLeOhmntXcGFbSDTaTPtIoLc0CT+8KeDjxWkmNLr21smI4VRyX57lraOY6A+STz2pVgG35PDmWlQZdT+jWo32kgZE9TXaevn9v1FyWiYvpmTZ9WK4CSYZKRKTOVxbr8cm3RnNzfihksFYL5Hyvvtf+mV25j6eYZtrw/dd5KCmIFZg4XOARQWtlaUqras4FEbbTKZ6ZO7nPHv1AxDbTTSmcXqEMhewQPTJczZxMazdmU2Zk2ADYN83R6BLJvaaLkBFaa6qikDtzYnMeQq3/A11+z14CghEpgjh/Xcit6jErEKRMKfI+AR81Gia/ewTd5MW6crY3w2gOqCD8TUxLeGTAMNnqbz9k=
+  provider: pypi
+  user: bradrf
+env:
+- TOXENV=py35
+- TOXENV=py34
+- TOXENV=py33
+- TOXENV=py27
+- TOXENV=py26
+- TOXENV=pypy
+install: pip install -U tox
+language: python
+python: 3.5
+script: tox -e ${TOXENV}
diff --git a/requirements_dev.txt b/requirements_dev.txt
index e1dae6c..29fa636 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -1,3 +1,4 @@
+boto==2.42.0
 pip==8.1.2
 bumpversion==0.5.3
 wheel==0.29.0
diff --git a/s3tail/cli.py b/s3tail/cli.py
index 28bba8f..a2ad0e4 100644
--- a/s3tail/cli.py
+++ b/s3tail/cli.py
@@ -1,14 +1,81 @@
 # -*- coding: utf-8 -*-
+'''
+Utility to help "tail" AWS logs stored in S3 generated by S3 bucket
+logging or ELB logging.
+'''
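+
+# Typical invocations (illustrative only; the bucket names, prefixes, and
+# bookmark values below are hypothetical):
+#
+#   s3tail s3://my-logs/AWSLogs/2016/09/
+#   s3tail --region us-west-2 --bookmark ':1500' my-logs/access_log-
+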
 import click
+import sys
+import signal
+import errno
+import logging
+import re
+
+from boto import s3
+from s3tail import S3Tail
 
 @click.command()
-def main(args=None):
-    """Console script for s3tail"""
-    click.echo("Replace this message by putting your code into "
-               "s3tail.cli.main")
-    click.echo("See click documentation at http://click.pocoo.org/")
+@click.option('-r', '--region', type=click.Choice(r.name for r in s3.regions()),
+              help='AWS region to use when connecting')
+@click.option('-b', '--bookmark', help='Bookmark of last key:line shown')
+@click.option('-l', '--log-level',
+              type=click.Choice(['debug', 'info', 'warning', 'error', 'critical']),
+              help='set logging level', default='info')
+@click.option('--log-file', metavar='FILENAME',
+              help='write logs to FILENAME', default='STDOUT')
+@click.option('--cache-hours', type=int, default=24,
+              help='Number of hours to keep in cache before removing on next run')
+@click.argument('s3_uri')
+def main(region, bookmark, log_level, log_file, cache_hours, s3_uri):
+    '''Begins tailing files found at [s3://]BUCKET[/PREFIX]'''
+
+    s3_uri = re.sub(r'^(s3:)?/+', '', s3_uri)
+    if '/' in s3_uri:
+        bucket, prefix = s3_uri.split('/', 1)
+    else:
+        bucket, prefix = s3_uri, '' # no prefix given; tail the whole bucket
+
+    log_kwargs = {
+        'level': getattr(logging, log_level.upper())
+    }
+    if log_file != 'STDOUT':
+        log_kwargs['filename'] = log_file
+    logging.basicConfig(**log_kwargs)
+    logger = logging.getLogger('s3tail')
+
+    class Track:
+        tail = None
+        last_key = None
+        last_num = None
+        show_pick_up = bookmark is not None
+
+    def signal_handler(sig, frame):
+        logger.info('Stopped processing at %s:%s', Track.last_key, Track.last_num)
+        logger.info('Bookmark: %s', tail.get_bookmark())
+        sys.exit(0)
+    signal.signal(signal.SIGINT, signal_handler)
+
+    def progress(key):
+        Track.last_key = key
+        logger.info('Starting %s', key)
+        return True
+
+    def dump(num, line):
+        Track.last_num = num
+        if Track.show_pick_up:
+            logger.info('Picked up at line %s', num)
+            Track.show_pick_up = False
+        print line
+
+    tail = S3Tail(bucket, prefix, dump,
+                  key_handler=progress, bookmark=bookmark,
+                  region=region, hours=cache_hours)
+    try:
+        tail.watch()
+    except IOError as exc:
+        if exc.errno != errno.EPIPE:
+            raise
+        sys.exit(0) # just exit if piped to something that has terminated (i.e. head or tail)
+    logger.info('No more logs. Bookmark: %s', tail.get_bookmark())
+    sys.exit(0)
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     main()
diff --git a/s3tail/s3tail.py b/s3tail/s3tail.py
index 40a96af..c82e65f 100644
--- a/s3tail/s3tail.py
+++ b/s3tail/s3tail.py
@@ -1 +1,227 @@
 # -*- coding: utf-8 -*-
+
+'''
+Utility to help "tail" AWS logs stored in S3 generated by S3 bucket
+logging or ELB logging.
+'''
+
+import os
+import logging
+
+from threading import Thread
+from Queue import Queue
+from datetime import datetime, timedelta
+from tempfile import NamedTemporaryFile
+from hashlib import sha256
+
+from boto import connect_s3
+from boto.s3 import connect_to_region
+
+# TODO: allow for digit ranges to be looked up!
+
+# TODO: add ability to express a bookmark "manually" by setting the actual key of the
+# current file and do a search looking for the one previous? consider having all
+# bookmarks like this! way better usability
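+
+# Cache layout (for orientation; the hash shown is made up): each S3 key's
+# bytes are mirrored under ~/.s3tailcache in a shard directory named by the
+# first two hex characters of the SHA-256 of the key name, e.g.
+#   ~/.s3tailcache/3f/3f29ab...90e2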
+
+class Cache(object):
+    readers = []
+
+    class Reader(object):
+        def __init__(self, reader, cache_pn):
+            self.closed = False
+            self._logger = logging.getLogger('s3tail.cache.reader')
+            self._reader = reader
+            self._cache_pn = cache_pn
+            # write to a tempfile in case of failure; move into place when writing is complete
+            head, tail = os.path.split(cache_pn)
+            self._tempfile = NamedTemporaryFile(dir=head, prefix=tail)
+            self._writer = BackgroundWriter(self._tempfile, self._move_into_place)
+            self._writer.start()
+            Cache.readers.append(self)
+
+        def read(self, size=-1):
+            data = self._reader.read(size)
+            self._writer.write(data)
+            return data
+
+        def close(self):
+            self._reader.close()
+            self._writer.mark_done() # allow writer to finish async, not requiring caller to wait
+            self.closed = True
+
+        def cleanup(self):
+            self._writer.join()
+
+        def _move_into_place(self, _):
+            self._tempfile.delete = False # prevent removal on close
+            self._tempfile.close()
+            os.rename(self._tempfile.name, self._cache_pn)
+            Cache.readers.remove(self)
+            self._logger.debug('Placed: %s', self._cache_pn)
+
+    def __init__(self, path, hours):
+        self._logger = logging.getLogger('s3tail.cache')
+        self.path = path
+        if not os.path.isdir(path):
+            os.mkdir(path)
+            # create shard buckets for sha hexstring names
+            chars = range(ord('0'), ord('9')+1) + range(ord('a'), ord('f')+1)
+            for i in chars:
+                for j in chars:
+                    os.mkdir(os.path.join(path, chr(i)+chr(j)))
+        else:
+            cleaner = OldFileCleaner(path, hours)
+            cleaner.start()
+
+    def open(self, name, reader):
+        safe_name = sha256(name).hexdigest()
+        cache_pn = os.path.join(self.path, safe_name[0:2], safe_name)
+        if os.path.exists(cache_pn):
+            if self._logger.isEnabledFor(logging.DEBUG):
+                self._logger.debug('Found %s in cache: %s', name, cache_pn)
+            else:
+                self._logger.info('Found %s in cache', name)
+            reader.close()
+            return open(cache_pn)
+        return Cache.Reader(reader, cache_pn)
+
+    def cleanup(self):
+        for reader in Cache.readers:
+            reader.cleanup()
+
+class S3Tail(object):
+    BUFFER_SIZE = 1 * (1024*1024) # MiB
+
+    def __init__(self, bucket_name, prefix, line_handler,
+                 key_handler=None, bookmark=None, region=None, hours=24):
+        self._cache = Cache(os.path.join(os.path.expanduser('~'), '.s3tailcache'), hours)
+        if region:
+            self._conn = connect_to_region(region)
+        else:
+            self._conn = connect_s3()
+        self._bucket = self._conn.get_bucket(bucket_name)
+        self._prefix = prefix
+        self._line_handler = line_handler
+        self._key_handler = key_handler
+        if bookmark:
+            self._bookmark_key, self._bookmark_line_num = bookmark.split(':')
+            self._bookmark_line_num = int(self._bookmark_line_num)
+            if len(self._bookmark_key) == 0:
+                self._bookmark_key = None
+        else:
+            self._bookmark_key = None
+            self._bookmark_line_num = 0
+        self._marker = None
+        self._buffer = None
+        self._line_num = None
+
+    def get_bookmark(self):
+        if self._marker:
+            return self._marker + ':' + str(self._line_num)
+        if self._line_num:
+            return ':' + str(self._line_num)
+
+    def watch(self):
+        for key in self._bucket.list(prefix=self._prefix, marker=self._bookmark_key):
+            self._bookmark_key = None
+            if self._key_handler:
+                result = self._key_handler(key.name)
+                if not result:
+                    continue
+            result = self._read(key)
+            if result is not None:
+                return result
+            self._marker = key.name # marker always has to be _previous_ entry, not current
+
+    def cleanup(self):
+        self._cache.cleanup()
+
+    ######################################################################
+    # private
+
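+    # Bookmarks record progress as KEY:LINE (e.g. 'path/to/key:42', values
+    # illustrative): KEY is the last key fully processed and LINE is the
+    # 1-based line reached in the key that follows it. _read skips lines
+    # until the saved line number is reached, then hands lines off as usual.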
+    def _read(self, key):
+        self._buffer = ''
+        self._line_num = 0
+        key.open()
+        reader = self._cache.open(key.name, key)
+        while not reader.closed:
+            line = self._next_line(reader)
+            self._line_num += 1
+            if self._line_num < self._bookmark_line_num:
+                continue
+            self._bookmark_line_num = 0
+            result = self._line_handler(self._line_num, line)
+            if result is not None:
+                return result
+        self._bookmark_line_num = 0 # safety in case bookmark count was larger than actual lines
+
+    def _next_line(self, reader):
+        i = None
+        for _ in range(0, 3): # try reading up to three times the buffer size
+            i = self._buffer.find("\n")
+            if i > -1:
+                break
+            more_data = reader.read(S3Tail.BUFFER_SIZE)
+            if len(more_data) > 0:
+                self._buffer += more_data
+            else:
+                reader.close()
+                i = len(self._buffer) + 1 # use remaining info in buffer
+                break
+        line = self._buffer[0:i]
+        self._buffer = self._buffer[i+1:]
+        return line
+
+class BackgroundWriter(Thread):
+    def __init__(self, writer, done_callback=None):
+        '''Wraps a writer I/O object with background write calls.
+
+        Optionally, will call the done_callback just before the thread stops (to allow caller to
+        close/operate on the writer)
+        '''
+        super(BackgroundWriter, self).__init__()
+        self._done = False
+        self._done_callback = done_callback
+        self._queue = Queue()
+        self._writer = writer
+
+    def write(self, data):
+        self._queue.put(data)
+
+    def mark_done(self):
+        if not self._done:
+            self._done = True
+            self._queue.put(True) # sentinel: tells run() to fire the callback and stop
+
+    def join(self, timeout=None):
+        self.mark_done()
+        self._queue.join()
+        super(BackgroundWriter, self).join(timeout)
+
+    def run(self):
+        while True:
+            data = self._queue.get()
+            if data is True:
+                self._queue.task_done()
+                if self._done_callback:
+                    self._done_callback(self._writer)
+                return
+            self._writer.write(data)
+            self._queue.task_done()
+
+class OldFileCleaner(Thread):
+    def __init__(self, path, hours):
+        super(OldFileCleaner, self).__init__()
+        self._logger = logging.getLogger('s3tail.old_file_cleaner')
+        self._path = path
+        self._hours = hours
+
+    def run(self):
+        count = 0
+        for dirpath, _, filenames in os.walk(self._path):
+            for ent in filenames:
+                curpath = os.path.join(dirpath, ent)
+                # use access time so recently read cache entries are kept around
+                file_accessed = datetime.fromtimestamp(os.path.getatime(curpath))
+                if datetime.now() - file_accessed > timedelta(hours=self._hours):
+                    self._logger.debug('Removing %s', curpath)
+                    os.remove(curpath)
+                    count += 1
+        if count > 0:
+            self._logger.info('Cleaned up %d files', count)
diff --git a/setup.py b/setup.py
old mode 100644
new mode 100755
index f1392d3..b81a505
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
 
 requirements = [
     'Click>=6.0',
-    # TODO: put package requirements here
+    'boto>=2.42.0',
 ]
 
 test_requirements = [
diff --git a/tests/test_s3tail.py b/tests/test_s3tail.py
old mode 100644
new mode 100755
index f27f28c..4db12a5
--- a/tests/test_s3tail.py
+++ b/tests/test_s3tail.py
@@ -28,13 +28,12 @@ def test_something(self):
     def test_command_line_interface(self):
         runner = CliRunner()
        result = runner.invoke(cli.main)
-        assert result.exit_code == 0
-        assert 's3tail.cli.main' in result.output
+        assert result.exit_code == 2
+        assert 'Missing argument "s3_uri"' in result.output
         help_result = runner.invoke(cli.main, ['--help'])
         assert help_result.exit_code == 0
-        assert '--help  Show this message and exit.' in help_result.output
+        assert 'Show this message and exit.' in help_result.output
 
     @classmethod
     def teardown_class(cls):
         pass
-