diff --git a/.travis.yml b/.travis.yml
index b89ea00..1cf2106 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,7 +15,7 @@ env:
 - TOXENV=py33
 - TOXENV=py27
 - TOXENV=py26
-- TOXENV=pypy
+#- TOXENV=pypy
 install: pip install -U tox
 language: python
 python: 3.5
diff --git a/README.rst b/README.rst
index 1a13c35..07df109 100644
--- a/README.rst
+++ b/README.rst
@@ -2,7 +2,6 @@
 s3tail
 ===============================

-
 .. image:: https://img.shields.io/pypi/v/s3tail.svg
         :target: https://pypi.python.org/pypi/s3tail

@@ -18,8 +17,8 @@
         :alt: Updates

-Console utility app to retrieve and cat files stored in AWS S3
-
+S3tail is a simple tool to help access log files stored in an S3 bucket in the same way one might
+use the *nix `tail` command (with far fewer options, notably the lack of a `follow` option).

 * Free software: MIT license
 * Documentation: https://s3tail.readthedocs.io.
@@ -28,8 +27,45 @@ Console utility app to retrieve and cat files stored in AWS S3
 Features
 --------

+S3tail downloads and displays the content of files stored in S3, optionally starting at a specific
+prefix. For example, the following will start dumping all the log file contents found for August the
+fourth in the order S3 provides from that prefix onward::
+
+    $ s3tail s3://my-logs/production/s3/production-s3-access-2016-08-04
+
+When s3tail is stopped or interrupted, it'll print a bookmark to be used to pick up at the exact
+spot following the last log printed in a previous run. Something like the following might be used to
+leverage this ability to continue tailing from a previous stopping point::
+
+    $ s3tail s3://my-logs/production/s3/production-s3-access-2016-08-04
+    ...
+    ...a-bunch-of-file-output...
+    ...
+    INFO:s3tail:Bookmark: production/s3/production-s3-access-2016-08-04-00-20-31-61059F36E0DBF36E:706
+
+This can then be used to pick up at line `707` later on, like this::
+
+    $ s3tail s3://my-logs/production/s3/production-s3-access-2016-08-04 \
+        -b production/s3/production-s3-access-2016-08-04-00-20-31-61059F36E0DBF36E:706
+
+It's safe to re-run s3tail sessions when working with piped commands searching for data in the
+stream (e.g. `grep`). S3tail keeps files in a local file system cache for 24 hours by default, and
+will always read and display from the cache, if found, before trying to download the content from
+S3. This is all done in a best-effort background thread to avoid impacting performance.
+
+The file cache is stored in the user's `HOME` directory, in an `.s3tailcache` subdirectory, hashed
+by the S3 key names using SHA-256 to avoid collisions.
+
+Check out `s3tail --help` for full usage information.
+
 * TODO
+
+  * allow for digit ranges to be looked up
+
+  * add ability to express a bookmark "manually" by setting the actual key of the *CURRENT* file and
+    do a search looking for one previous? consider having all bookmarks like this! way better
+    usability

 Credits
 ---------

@@ -37,4 +73,3 @@ This package was created with Cookiecutter_ and the `audreyr/cookiecutter-pypack

 .. _Cookiecutter: https://github.com/audreyr/cookiecutter
 .. _`audreyr/cookiecutter-pypackage`: https://github.com/audreyr/cookiecutter-pypackage
-
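The cache naming scheme described in the README addition above is easy to see in isolation. The
sketch below only illustrates how an S3 key maps to a cache path, mirroring the `Cache.open` code
added further down in this diff; the helper name `cache_path` and the printed key are made up for
the example, and the `.encode()` is an assumption for Python 3 (the code in this diff hashes the
raw key string)::

    from os.path import expanduser, join
    from hashlib import sha256

    def cache_path(key, cache_dir=expanduser('~/.s3tailcache')):
        # keys are hashed with SHA-256; the first two hex characters pick the
        # shard bucket directory created by Cache.__init__
        safe_name = sha256(key.encode('utf-8')).hexdigest()
        return join(cache_dir, safe_name[0:2], safe_name)

    print(cache_path('production/s3/production-s3-access-2016-08-04-00-20-31-61059F36E0DBF36E'))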
diff --git a/s3tail/background_writer.py b/s3tail/background_writer.py
new file mode 100644
index 0000000..78aac34
--- /dev/null
+++ b/s3tail/background_writer.py
@@ -0,0 +1,39 @@
+from queue import Queue
+from threading import Thread
+
+class BackgroundWriter(Thread):
+    def __init__(self, writer, done_callback=None):
+        '''Wraps a writer I/O object with background write calls.
+
+        Optionally, will call the done_callback just before the thread stops (to allow caller to
+        close/operate on the writer)
+        '''
+        super(BackgroundWriter, self).__init__()
+        self._done = False
+        self._done_callback = done_callback
+        self._queue = Queue()
+        self._writer = writer
+
+    def write(self, data):
+        self._queue.put(data)
+
+    def mark_done(self):
+        if not self._done:
+            self._done = True
+            self._queue.put(True)
+
+    def join(self, timeout=None):
+        self.mark_done()
+        self._queue.join()
+        super(BackgroundWriter, self).join(timeout)
+
+    def run(self):
+        while True:
+            data = self._queue.get()
+            if data is True:
+                self._queue.task_done()
+                if self._done_callback:
+                    self._done_callback(self._writer)
+                return
+            self._writer.write(data)
+            self._queue.task_done()
diff --git a/s3tail/cache.py b/s3tail/cache.py
new file mode 100644
index 0000000..0cba0d7
--- /dev/null
+++ b/s3tail/cache.py
@@ -0,0 +1,77 @@
+import os
+import logging
+
+from hashlib import sha256
+from tempfile import NamedTemporaryFile
+
+from .background_writer import BackgroundWriter
+from .old_file_cleaner import OldFileCleaner
+
+class Cache(object):
+    readers = []
+
+    def __init__(self, path, hours):
+        self._logger = logging.getLogger('s3tail.cache')
+        self.path = path
+        if not os.path.isdir(path):
+            os.mkdir(path)
+            # create shard buckets for sha hexstring names
+            chars = range(ord('0'), ord('9')+1) + range(ord('a'), ord('f')+1)
+            for i in chars:
+                for j in chars:
+                    os.mkdir(os.path.join(path, chr(i)+chr(j)))
+        else:
+            cleaner = OldFileCleaner(path, hours)
+            cleaner.start()
+
+    def open(self, name, reader):
+        safe_name = sha256(name).hexdigest()
+        cache_pn = os.path.join(self.path, safe_name[0:2], safe_name)
+        if os.path.exists(cache_pn):
+            if self._logger.isEnabledFor(logging.DEBUG):
+                self._logger.debug('Found %s in cache: %s', name, cache_pn)
+            else:
+                self._logger.info('Found %s in cache', name)
+            reader.close()
+            return open(cache_pn)
+        return Cache._Reader(reader, cache_pn)
+
+    def cleanup(self):
+        for reader in Cache.readers:
+            reader.cleanup()
+
+    ######################################################################
+    # private
+
+    class _Reader(object):
+        def __init__(self, reader, cache_pn):
+            self.closed = False
+            self._logger = logging.getLogger('s3tail.cache.reader')
+            self._reader = reader
+            self._cache_pn = cache_pn
+            # write to a tempfile in case of failure; move into place when writing is complete
+            head, tail = os.path.split(cache_pn)
+            self._tempfile = NamedTemporaryFile(dir=head, prefix=tail)
+            self._writer = BackgroundWriter(self._tempfile, self._move_into_place)
+            self._writer.start()
+            Cache.readers.append(self)
+
+        def read(self, size=-1):
+            data = self._reader.read(size)
+            self._writer.write(data)
+            return data
+
+        def close(self):
+            self._reader.close()
+            self._writer.mark_done() # allow writer to finish async, not requiring caller to wait
+            self.closed = True
+
+        def cleanup(self):
+            self._writer.join()
+
+        def _move_into_place(self, _):
+            self._tempfile.delete = False # prevent removal on close
+            self._tempfile.close()
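Taken together, `BackgroundWriter` and `Cache._Reader` implement a simple write-behind pattern:
data read from S3 is queued, written to a temporary file on a separate thread, and the finished
file is renamed into the cache once the queue drains. A condensed, standalone sketch of that
pattern follows; the file names here are hypothetical and the real logic lives in `Cache._Reader`
above::

    import os
    from tempfile import NamedTemporaryFile
    from s3tail.background_writer import BackgroundWriter

    final_pn = '/tmp/s3tail-example-cache-entry'  # hypothetical destination
    tmp = NamedTemporaryFile(dir='/tmp', prefix='s3tail-example-', delete=False)

    def move_into_place(writer):
        # invoked on the writer thread once every queued chunk has been written
        writer.close()
        os.rename(writer.name, final_pn)

    bg = BackgroundWriter(tmp, move_into_place)
    bg.start()
    bg.write(b'chunk one\n')
    bg.write(b'chunk two\n')
    bg.join()  # drains the queue, runs the callback, then stops the thread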
+            os.rename(self._tempfile.name, self._cache_pn)
+            Cache.readers.remove(self)
+            self._logger.debug('Placed: %s', self._cache_pn)
diff --git a/s3tail/cli.py b/s3tail/cli.py
index 5a3acd3..43163ee 100644
--- a/s3tail/cli.py
+++ b/s3tail/cli.py
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-
 '''
 Utility to help "tail" AWS logs stored in S3 generated by S3 bucket logging or ELB
 logging.
@@ -13,6 +11,7 @@
 import re

 from boto import s3
+
 from .s3tail import S3Tail

 @click.command()
@@ -72,6 +71,7 @@ def dump(num, line):
     except IOError as exc:
         if exc.errno != errno.EPIPE:
             raise
+        logger.info('Interrupted pipe. Bookmark: %s', tail.get_bookmark())
         sys.exit(0) # just exit if piped to something that has terminated (i.e. head or tail)

     logger.info('No more logs. Bookmark: %s', tail.get_bookmark())
diff --git a/s3tail/old_file_cleaner.py b/s3tail/old_file_cleaner.py
new file mode 100644
index 0000000..62c5eef
--- /dev/null
+++ b/s3tail/old_file_cleaner.py
@@ -0,0 +1,25 @@
+import os
+import logging
+
+from threading import Thread
+from datetime import datetime, timedelta
+
+class OldFileCleaner(Thread):
+    def __init__(self, path, hours):
+        super(OldFileCleaner, self).__init__()
+        self._logger = logging.getLogger('s3tail.old_file_cleaner')
+        self._path = path
+        self._hours = hours
+
+    def run(self):
+        count = 0
+        for dirpath, _, filenames in os.walk(self._path):
+            for ent in filenames:
+                curpath = os.path.join(dirpath, ent)
+                file_modified = datetime.fromtimestamp(os.path.getatime(curpath))
+                if datetime.now() - file_modified > timedelta(hours=self._hours):
+                    self._logger.debug('Removing %s', curpath)
+                    os.remove(curpath)
+                    count += 1
+        if count > 0:
+            self._logger.info('Cleaned up %d files', count)
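`OldFileCleaner` is what enforces the cache age limit mentioned in the README: it walks the cache
directory on its own thread and removes anything whose last access time is older than the
configured number of hours. A minimal usage sketch, with a hypothetical directory::

    from s3tail.old_file_cleaner import OldFileCleaner

    # prune entries not read within the last 24 hours without blocking the caller
    cleaner = OldFileCleaner('/tmp/s3tail-cache-example', hours=24)
    cleaner.start()
    cleaner.join()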
diff --git a/s3tail/s3tail.py b/s3tail/s3tail.py
index e404144..28994bc 100644
--- a/s3tail/s3tail.py
+++ b/s3tail/s3tail.py
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-
 '''
 Utility to help "tail" AWS logs stored in S3 generated by S3 bucket logging or ELB
 logging.
@@ -7,83 +5,11 @@
 import os
 import logging

-from threading import Thread
-from queue import Queue
-from datetime import datetime, timedelta
-from tempfile import NamedTemporaryFile
-from hashlib import sha256
+
 from boto import connect_s3
 from boto.s3 import connect_to_region

-# allow for digit ranges to be looked up!
-
-# add ability to expresss bookmark "manually" by setting the actual key of the +CURRENT+ file and do search looking for one previous? consider having all bookmarks like this! way better usability
-
-class Cache(object):
-    readers = []
-
-    class Reader(object):
-        def __init__(self, reader, cache_pn):
-            self.closed = False
-            self._logger = logging.getLogger('s3tail.cache.reader')
-            self._reader = reader
-            self._cache_pn = cache_pn
-            # write to a tempfile in case of failure; move into place when writing is complete
-            head, tail = os.path.split(cache_pn)
-            self._tempfile = NamedTemporaryFile(dir=head, prefix=tail)
-            self._writer = BackgroundWriter(self._tempfile, self._move_into_place)
-            self._writer.start()
-            Cache.readers.append(self)
-
-        def read(self, size=-1):
-            data = self._reader.read(size)
-            self._writer.write(data)
-            return data
-
-        def close(self):
-            self._reader.close()
-            self._writer.mark_done() # allow writer to finish async, not requiring caller to wait
-            self.closed = True
-
-        def cleanup(self):
-            self._writer.join()
-
-        def _move_into_place(self, _):
-            self._tempfile.delete = False # prevent removal on close
-            self._tempfile.close()
-            os.rename(self._tempfile.name, self._cache_pn)
-            Cache.readers.remove(self)
-            self._logger.debug('Placed: %s', self._cache_pn)
-
-    def __init__(self, path, hours):
-        self._logger = logging.getLogger('s3tail.cache')
-        self.path = path
-        if not os.path.isdir(path):
-            os.mkdir(path)
-            # create shard buckets for sha hexstring names
-            chars = range(ord('0'), ord('9')+1) + range(ord('a'), ord('f')+1)
-            for i in chars:
-                for j in chars:
-                    os.mkdir(os.path.join(path, chr(i)+chr(j)))
-        else:
-            cleaner = OldFileCleaner(path, hours)
-            cleaner.start()
-
-    def open(self, name, reader):
-        safe_name = sha256(name).hexdigest()
-        cache_pn = os.path.join(self.path, safe_name[0:2], safe_name)
-        if os.path.exists(cache_pn):
-            if self._logger.isEnabledFor(logging.DEBUG):
-                self._logger.debug('Found %s in cache: %s', name, cache_pn)
-            else:
-                self._logger.info('Found %s in cache', name)
-            reader.close()
-            return open(cache_pn)
-        return Cache.Reader(reader, cache_pn)
-
-    def cleanup(self):
-        for reader in Cache.readers:
-            reader.cleanup()
+from .cache import Cache

 class S3Tail(object):
     BUFFER_SIZE = 1 * (1024*1024) # MiB
@@ -168,60 +94,3 @@ def _next_line(self, reader):
         line = self._buffer[0:i]
         self._buffer = self._buffer[i+1:]
         return line
-
-class BackgroundWriter(Thread):
-    def __init__(self, writer, done_callback=None):
-        '''Wraps a writer I/O object with background write calls.
-
-        Optionally, will call the done_callback just before the thread stops (to allow caller to
-        close/operate on the writer)
-        '''
-        super(BackgroundWriter, self).__init__()
-        self._done = False
-        self._done_callback = done_callback
-        self._queue = Queue()
-        self._writer = writer
-
-    def write(self, data):
-        self._queue.put(data)
-
-    def mark_done(self):
-        if not self._done:
-            self._done = True
-            self._queue.put(True)
-
-    def join(self, timeout=None):
-        self.mark_done()
-        self._queue.join()
-        super(BackgroundWriter, self).join(timeout)
-
-    def run(self):
-        while True:
-            data = self._queue.get()
-            if data is True:
-                self._queue.task_done()
-                if self._done_callback:
-                    self._done_callback(self._writer)
-                return
-            self._writer.write(data)
-            self._queue.task_done()
-
-class OldFileCleaner(Thread):
-    def __init__(self, path, hours):
-        super(OldFileCleaner, self).__init__()
-        self._logger = logging.getLogger('s3tail.old_file_cleaner')
-        self._path = path
-        self._hours = hours
-
-    def run(self):
-        count = 0
-        for dirpath, _, filenames in os.walk(self._path):
-            for ent in filenames:
-                curpath = os.path.join(dirpath, ent)
-                file_modified = datetime.fromtimestamp(os.path.getatime(curpath))
-                if datetime.now() - file_modified > timedelta(hours=self._hours):
-                    self._logger.debug('Removing %s', curpath)
-                    os.remove(curpath)
-                    count += 1
-        if count > 0:
-            self._logger.info('Cleaned up %d files', count)
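With this change the helpers are importable on their own rather than living inside
`s3tail/s3tail.py`. Roughly, the new layout looks like this; it's only a sketch of the module
structure, not a complete program::

    from s3tail.background_writer import BackgroundWriter  # queue-backed write-behind thread
    from s3tail.cache import Cache                          # local file cache keyed by SHA-256 of S3 key names
    from s3tail.old_file_cleaner import OldFileCleaner      # drops cache entries older than N hours
    from s3tail.s3tail import S3Tail                        # the tailing engine used by the CLI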