Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Find runs by glob-ing levels of subdirectories #1087

Merged
merged 4 commits into from
Apr 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion tensorboard/backend/event_processing/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ py_library(
srcs_version = "PY2AND3",
deps = [
"//tensorboard:expect_tensorflow_installed",
"@org_pythonhosted_six",
],
)

Expand All @@ -24,6 +25,7 @@ py_test(
deps = [
":io_wrapper",
"//tensorboard:expect_tensorflow_installed",
"@org_pythonhosted_six",
],
)

Expand Down Expand Up @@ -94,6 +96,7 @@ py_library(
deps = [
":directory_watcher",
":event_file_loader",
":io_wrapper",
":plugin_asset_util",
":reservoir",
"//tensorboard:data_compat",
Expand Down Expand Up @@ -188,7 +191,7 @@ py_library(
srcs_version = "PY2AND3",
deps = [
":event_accumulator",
":event_multiplexer",
":io_wrapper",
"//tensorboard:expect_tensorflow_installed",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ def Fake(*args, **kwargs):

FakeFactory.has_been_called = False

for stub_name in ['ListDirectoryAbsolute', 'ListRecursively']:
stub_names = [
'ListDirectoryAbsolute',
'ListRecursivelyViaGlobbing',
'ListRecursivelyViaWalking',
]
for stub_name in stub_names:
self.stubs.Set(io_wrapper, stub_name,
FakeFactory(getattr(io_wrapper, stub_name)))
for stub_name in ['IsDirectory', 'Exists', 'Stat']:
Expand Down
25 changes: 5 additions & 20 deletions tensorboard/backend/event_processing/event_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
from __future__ import print_function

import collections
import os
import threading

import tensorflow as tf

from tensorboard.backend.event_processing import directory_watcher
from tensorboard.backend.event_processing import event_file_loader
from tensorboard.backend.event_processing import io_wrapper
from tensorboard.backend.event_processing import plugin_asset_util
from tensorboard.backend.event_processing import reservoir
from tensorboard.plugins.distribution import compressor
Expand Down Expand Up @@ -98,23 +98,6 @@
}


def IsTensorFlowEventsFile(path):
"""Check the path name to see if it is probably a TF Events file.

Args:
path: A file path to check if it is an event file.

Raises:
ValueError: If the path is an empty string.

Returns:
If path is formatted like a TensorFlowEventsFile.
"""
if not path:
raise ValueError('Path must be a nonempty string')
return 'tfevents' in tf.compat.as_str_any(os.path.basename(path))


class EventAccumulator(object):
"""An `EventAccumulator` takes an event generator, and accumulates the values.

Expand Down Expand Up @@ -747,11 +730,13 @@ def _GeneratorFromPath(path):
"""Create an event generator for file or directory at given path string."""
if not path:
raise ValueError('path must be a valid string')
if IsTensorFlowEventsFile(path):
if io_wrapper.IsTensorFlowEventsFile(path):
return event_file_loader.EventFileLoader(path)
else:
return directory_watcher.DirectoryWatcher(
path, event_file_loader.EventFileLoader, IsTensorFlowEventsFile)
path,
event_file_loader.EventFileLoader,
io_wrapper.IsTensorFlowEventsFile)


def _ParseFileVersion(file_version):
Expand Down
12 changes: 6 additions & 6 deletions tensorboard/backend/event_processing/event_file_inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@

from tensorboard.backend.event_processing import event_accumulator
from tensorboard.backend.event_processing import event_file_loader
from tensorboard.backend.event_processing import event_multiplexer
from tensorboard.backend.event_processing import io_wrapper

FLAGS = tf.flags.FLAGS

Expand Down Expand Up @@ -323,12 +323,12 @@ def generators_from_logdir(logdir):
Returns:
List of event generators for each subdirectory with event files.
"""
subdirs = event_multiplexer.GetLogdirSubdirectories(logdir)
subdirs = io_wrapper.GetLogdirSubdirectories(logdir)
generators = [
itertools.chain(*[
generator_from_event_file(os.path.join(subdir, f))
for f in tf.gfile.ListDirectory(subdir)
if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f))
if io_wrapper.IsTensorFlowEventsFile(os.path.join(subdir, f))
]) for subdir in subdirs
]
return generators
Expand Down Expand Up @@ -356,13 +356,13 @@ def get_inspection_units(logdir='', event_file='', tag=''):
A list of InspectionUnit objects.
"""
if logdir:
subdirs = event_multiplexer.GetLogdirSubdirectories(logdir)
subdirs = io_wrapper.GetLogdirSubdirectories(logdir)
inspection_units = []
for subdir in subdirs:
generator = itertools.chain(*[
generator_from_event_file(os.path.join(subdir, f))
for f in tf.gfile.ListDirectory(subdir)
if event_accumulator.IsTensorFlowEventsFile(os.path.join(subdir, f))
if io_wrapper.IsTensorFlowEventsFile(os.path.join(subdir, f))
])
inspection_units.append(InspectionUnit(
name=subdir,
Expand All @@ -371,7 +371,7 @@ def get_inspection_units(logdir='', event_file='', tag=''):
if inspection_units:
print('Found event files in:\n{}\n'.format('\n'.join(
[u.name for u in inspection_units])))
elif event_accumulator.IsTensorFlowEventsFile(logdir):
elif io_wrapper.IsTensorFlowEventsFile(logdir):
print(
'It seems that {} may be an event file instead of a logdir. If this '
'is the case, use --event_file instead of --logdir to pass '
Expand Down
16 changes: 1 addition & 15 deletions tensorboard/backend/event_processing/event_multiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def AddRunsFromDirectory(self, path, name=None):
The `EventMultiplexer`.
"""
tf.logging.info('Starting AddRunsFromDirectory: %s', path)
for subdir in GetLogdirSubdirectories(path):
for subdir in io_wrapper.GetLogdirSubdirectories(path):
tf.logging.info('Adding events from directory %s', subdir)
rpath = os.path.relpath(subdir, path)
subname = os.path.join(name, rpath) if name else rpath
Expand Down Expand Up @@ -480,17 +480,3 @@ def GetAccumulator(self, run):
"""
with self._accumulators_mutex:
return self._accumulators[run]


def GetLogdirSubdirectories(path):
"""Returns subdirectories with event files on path."""
if tf.gfile.Exists(path) and not tf.gfile.IsDirectory(path):
raise ValueError('GetLogdirSubdirectories: path exists and is not a '
'directory, %s' % path)

# ListRecursively just yields nothing if the path doesn't exist.
return (
subdir
for (subdir, files) in io_wrapper.ListRecursively(path)
if list(filter(event_accumulator.IsTensorFlowEventsFile, files))
)
158 changes: 152 additions & 6 deletions tensorboard/backend/event_processing/io_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,182 @@
from __future__ import division
from __future__ import print_function

import collections
import os
import re

import six
import tensorflow as tf

_ESCAPE_GLOB_CHARACTERS_REGEX = re.compile('([*?[])')


# TODO(chihuahua): Rename this method to use camel-case for GCS (Gcs).
def IsGCSPath(path):
  """Returns True if `path` is a Google Cloud Storage URI (gs://...)."""
  return path[:5] == "gs://"


def IsCnsPath(path):
  """Returns True if `path` lives under the Colossus (/cns/) namespace."""
  return path[:5] == "/cns/"


def IsTensorFlowEventsFile(path):
  """Check the path name to see if it is probably a TF Events file.

  Args:
    path: A file path to check if it is an event file.

  Raises:
    ValueError: If the path is an empty string.

  Returns:
    If path is formatted like a TensorFlowEventsFile.
  """
  if not path:
    raise ValueError('Path must be a nonempty string')
  # Only the final path component is inspected; events files conventionally
  # carry "tfevents" somewhere in their base name.
  base_name = os.path.basename(path)
  return 'tfevents' in tf.compat.as_str_any(base_name)


def ListDirectoryAbsolute(directory):
  """Yields all files in the given directory. The paths are absolute."""
  # List eagerly (so errors surface at call time), but yield lazily.
  entries = tf.gfile.ListDirectory(directory)
  return (os.path.join(directory, entry) for entry in entries)


def ListRecursively(top):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it could be good to keep the word "recursive" in the new function names just to signal that they're listing the entire directory tree. It's clear in the docstring but not so much from the function name itself.

I wonder if it might make sense to call these ListRecursivelyViaGlobbing() and ListRecursivelyViaWalking()? While the implementations are different, functionally they are basically interchangeable aside from slightly different output structures (which actually could be made the same by just doing per-subdir grouping in the globbing function before yielding).

Copy link
Member Author

@chihuahua chihuahua Apr 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those names sound great! They indicate recursion. Furthermore, the methods (almost) return the same values now. ListRecursivelyViaGlobbing raises an exception when the directory does not exist.

def _EscapeGlobCharacters(path):
  """Escapes the glob characters in a path.

  Python 3 has a glob.escape method, but python 2 lacks it, so we manually
  implement this method.

  Args:
    path: The absolute path to escape.

  Returns:
    The escaped path string.
  """
  # The drive portion (Windows) must not be escaped; only the remainder can
  # contain glob metacharacters that need bracketing.
  drive, remainder = os.path.splitdrive(path)
  escaped = _ESCAPE_GLOB_CHARACTERS_REGEX.sub(r'[\1]', remainder)
  return drive + escaped


def ListRecursivelyViaGlobbing(top):
  """Recursively lists all files within the directory.

  This method does so by glob-ing deeper and deeper directories, ie
  foo/*, foo/*/*, foo/*/*/* and so on until all files are listed. All file
  paths are absolute, and this method lists subdirectories too. If the
  directory does not exist, this yields nothing.

  For certain file systems, globbing via this method may prove
  significantly faster than recursively walking a directory.
  Specifically, file systems that implement analogs to TensorFlow's
  FileSystem.GetMatchingPaths method could save costly disk reads by using
  this method. However, for other file systems, this method might prove slower
  because the file system performs a walk per call to glob (in which case it
  might as well just perform 1 walk).

  Args:
    top: A path to a directory.

  Yields:
    A (dir_path, file_paths) tuple for each directory/subdirectory.
  """
  current_glob_string = os.path.join(_EscapeGlobCharacters(top), '*')
  level = 0

  while True:
    tf.logging.info('GlobAndListFiles: Starting to glob level %d', level)
    glob = tf.gfile.Glob(current_glob_string)
    tf.logging.info(
        'GlobAndListFiles: %d files glob-ed at level %d', len(glob), level)

    if not glob:
      # This subdirectory level lacks files. Terminate.
      return

    # Map subdirectory to a list of files.
    pairs = collections.defaultdict(list)
    for file_path in glob:
      pairs[os.path.dirname(file_path)].append(file_path)
    for dir_name, file_paths in six.iteritems(pairs):
      yield (dir_name, tuple(file_paths))

    if len(pairs) == 1:
      # If at any point the glob returns files that are all in a single
      # directory, replace the current globbing path with that directory as
      # the literal prefix. This should improve efficiency in cases where a
      # single subdir is significantly deeper than the rest of the subdirs.
      current_glob_string = os.path.join(list(pairs.keys())[0], '*')

    # Iterate to the next level of subdirectories.
    current_glob_string = os.path.join(current_glob_string, '*')
    level += 1


def ListRecursivelyViaWalking(top):
  """Walks a directory tree, yielding (dir_path, file_paths) tuples.

  For each of `top` and its subdirectories, yields a tuple containing the path
  to the directory and the path to each of the contained files. Note that
  unlike os.Walk()/tf.gfile.Walk()/ListRecursivelyViaGlobbing, this does not
  list subdirectories. The file paths are all absolute. If the directory does
  not exist, this yields nothing.

  Walking may be incredibly slow on certain file systems.

  Args:
    top: A path to a directory.

  Yields:
    A (dir_path, file_paths) tuple for each directory/subdirectory.
  """
  for dir_path, _, filenames in tf.gfile.Walk(top):
    # The inner generator joins each bare filename onto its directory so that
    # yielded file paths are absolute, matching ListRecursivelyViaGlobbing.
    yield (dir_path, (os.path.join(dir_path, filename)
                      for filename in filenames))


def GetLogdirSubdirectories(path):
  """Obtains all subdirectories with events files.

  The order of the subdirectories returned is unspecified. The internal logic
  that determines order varies by scenario.

  Args:
    path: The path to a directory under which to find subdirectories.

  Returns:
    A generator of absolute paths of all subdirectories each with at least
    1 events file directly within the subdirectory.

  Raises:
    ValueError: If the path passed to the method exists and is not a directory.
  """
  if not tf.gfile.Exists(path):
    # No directory to traverse.
    return ()

  if not tf.gfile.IsDirectory(path):
    raise ValueError('GetLogdirSubdirectories: path exists and is not a '
                     'directory, %s' % path)

  if IsGCSPath(path) or IsCnsPath(path):
    # Glob-ing for files can be significantly faster than recursively
    # walking through directories for some file systems.
    tf.logging.info(
        'GetLogdirSubdirectories: Starting to list directories via glob-ing.')
    traversal_method = ListRecursivelyViaGlobbing
  else:
    # For other file systems, the glob-ing based method might be slower because
    # each call to glob could involve performing a recursive walk.
    tf.logging.info(
        'GetLogdirSubdirectories: Starting to list directories via walking.')
    traversal_method = ListRecursivelyViaWalking

  return (
      subdir
      for (subdir, files) in traversal_method(path)
      if any(IsTensorFlowEventsFile(f) for f in files)
  )
Loading