backend: implement event file replacement detection (tensorflow#5529)
nfelt authored and yatbear committed Mar 27, 2023
1 parent 8c3e538 commit 05935bd
Showing 3 changed files with 184 additions and 7 deletions.
tensorboard/backend/event_processing/BUILD (2 additions, 0 deletions)
@@ -186,6 +186,7 @@ py_test(
":event_file_loader",
"//tensorboard:expect_tensorflow_installed",
"//tensorboard:test",
"//tensorboard/compat",
"//tensorboard/compat/proto:protos_all_py_pb2",
"//tensorboard/summary/writer",
],
@@ -200,6 +201,7 @@
deps = [
":event_file_loader",
"//tensorboard:test",
"//tensorboard/compat",
"//tensorboard/compat:no_tensorflow",
"//tensorboard/compat/proto:protos_all_py_pb2",
"//tensorboard/summary/writer",
tensorboard/backend/event_processing/event_file_loader.py (84 additions, 3 deletions)
@@ -117,11 +117,32 @@ def __next__(self):
class RawEventFileLoader(object):
"""An iterator that yields Event protos as serialized bytestrings."""

-    def __init__(self, file_path):
+    def __init__(self, file_path, detect_file_replacement=False):
"""Constructs a RawEventFileLoader for the given file path.
Args:
file_path: the event file path to read from
detect_file_replacement: if True, when Load() is called, the loader
will make a stat() call to check the size of the file. If it sees
that the file has grown, it will reopen the file entirely (while
preserving the current offset) before attempting to read from it.
Otherwise, Load() will simply poll at EOF for new data.
"""
if file_path is None:
raise ValueError("A file path is required")
self._file_path = platform_util.readahead_file_path(file_path)
self._detect_file_replacement = detect_file_replacement
self._file_size = None
self._iterator = _make_tf_record_iterator(self._file_path)
if self._detect_file_replacement and not hasattr(
self._iterator, "reopen"
):
logger.warning(
"File replacement detection requested, but not enabled because "
"TF record iterator impl does not support reopening. This "
"functionality requires TensorFlow 2.9+"
)
self._detect_file_replacement = False
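
For orientation, here is a minimal usage sketch of the new flag (illustrative only, not part of the diff; the event-file path and polling interval are hypothetical, and reopening requires TensorFlow 2.9+, below which the loader logs the warning above and falls back to plain polling):

import time

from tensorboard.backend.event_processing import event_file_loader

loader = event_file_loader.RawEventFileLoader(
    "/tmp/logs/events.out.tfevents.example",  # hypothetical path
    detect_file_replacement=True,
)
while True:
    # Load() yields only records not yet seen; with the flag on, it stats
    # the file first and reopens it (preserving the offset) if it grew.
    for raw_event in loader.Load():
        print("read %d bytes" % len(raw_event))
    time.sleep(5)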

def Load(self):
"""Loads all new events from disk as raw serialized proto bytestrings.
@@ -133,6 +154,25 @@ def Load(self):
All event proto bytestrings in the file that have not been yielded yet.
"""
logger.debug("Loading events from %s", self._file_path)
if self._detect_file_replacement:
has_increased = self.CheckForIncreasedFileSize()
# Only act on the file size information if we got a concrete result.
if has_increased is not None:
if has_increased:
logger.debug(
"Reopening %s since file size has changed",
self._file_path,
)
self._iterator.close()
self._iterator.reopen()
else:
logger.debug(
"Skipping attempt to poll %s since file size has not "
"changed (still %d)",
self._file_path,
self._file_size,
)
return
while True:
try:
yield next(self._iterator)
@@ -147,6 +187,47 @@ def Load(self):
break
logger.debug("No more events in %s", self._file_path)

def CheckForIncreasedFileSize(self):
"""Stats the file to get its updated size, returning True if it grew.
If the stat call fails or reports a smaller size than was previously
seen, then any previously cached size is left unchanged.
Returns:
boolean or None: True if the file size increased; False if it was
the same or decreased; or None if neither case could be detected
(either because the previous size had not been recorded yet, or
because the stat call for the current size failed).
"""
previous_size = self._file_size
try:
self._file_size = tf.io.gfile.stat(self._file_path).length
except tf.errors.OpError as e:
logger.error("Failed to stat %s: %s", self._file_path, e)
return None
logger.debug(
"Stat on %s got size %d, previous size %s",
self._file_path,
self._file_size,
previous_size,
)
if previous_size is None:
return None
if self._file_size > previous_size:
return True
if self._file_size < previous_size:
logger.warning(
"File %s shrank from previous size %d to size %d",
self._file_path,
previous_size,
self._file_size,
)
# In case this was transient, preserve the previously cached size,
# to avoid reporting a spurious increase next time. If the file was
# actually truncated, we can't recover anyway, so just ignore it.
self._file_size = previous_size
return False
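
As a concrete walk-through of the tri-state contract: successive stat sizes of 100, 100, 40, and 120 bytes return None (no previous size recorded yet), False (unchanged), False (shrank; the cached size is kept at 100), and finally True (120 exceeds the cached 100), so a transient short read cannot mask a later genuine increase.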


class LegacyEventFileLoader(RawEventFileLoader):
"""An iterator that yields parsed Event protos."""
@@ -170,8 +251,8 @@ class EventFileLoader(LegacyEventFileLoader):
Specifically, this includes `data_compat` and `dataclass_compat`.
"""

-    def __init__(self, file_path):
-        super(EventFileLoader, self).__init__(file_path)
+    def __init__(self, *args, **kwargs):
+        super(EventFileLoader, self).__init__(*args, **kwargs)
# Track initial metadata for each tag, for `dataclass_compat`.
# This is meant to be tracked per run, not per event file, so
# there is a potential failure case when the second event file
tensorboard/backend/event_processing/event_file_loader_test.py (98 additions, 4 deletions)
@@ -19,23 +19,30 @@
import abc
import io
import os
import unittest
from unittest import mock

from tensorboard import test as tb_test
from tensorboard.backend.event_processing import event_file_loader
from tensorboard.compat import tf
from tensorboard.compat.proto import event_pb2
from tensorboard.summary.writer import record_writer


FILENAME = "test.events"
USING_STUB_TF = tf.__version__ == "stub"


class EventFileLoaderTestBase(metaclass=abc.ABCMeta):
def _get_filename(self):
return os.path.join(self.get_temp_dir(), FILENAME)

def _append_record(self, data):
-        with open(os.path.join(self.get_temp_dir(), FILENAME), "ab") as f:
+        with open(self._get_filename(), "ab") as f:
record_writer.RecordWriter(f).write(data)

-    def _make_loader(self):
-        return self._loader_class(os.path.join(self.get_temp_dir(), FILENAME))
+    def _make_loader(self, **kwargs):
+        return self._loader_class(self._get_filename(), **kwargs)

@abc.abstractproperty
def _loader_class(self):
@@ -48,7 +55,7 @@ def assertEventWallTimes(self, load_result, event_wall_times_in_order):
raise NotImplementedError()

def testLoad_emptyEventFile(self):
-        with open(os.path.join(self.get_temp_dir(), FILENAME), "ab") as f:
+        with open(self._get_filename(), "ab") as f:
f.write(b"")
loader = self._make_loader()
self.assertEmpty(list(loader.Load()))
@@ -95,6 +102,93 @@ def testLoad_noIterationDoesNotConsumeEvents(self):
loader.Load()
self.assertEventWallTimes(loader.Load(), [1.0])

def testLoad_detectFileReplacement_simple(self):
# This test confirms that detect_file_replacement=True doesn't break the
# existing basic loading behavior, including for no-TF mode (where it
# cannot reopen iterators, so it just falls back to polling).
self._append_record(_make_event(wall_time=1.0))
loader = self._make_loader(detect_file_replacement=True)
self._append_record(_make_event(wall_time=2.0))
self.assertEventWallTimes(loader.Load(), [1.0, 2.0])
self._append_record(_make_event(wall_time=3.0))
self.assertEventWallTimes(loader.Load(), [3.0])

@unittest.skipIf(USING_STUB_TF, "detect_file_replacement requires real TF")
def testLoad_detectFileReplacement_reopensIteratorAfterFileGrows(self):
self._append_record(_make_event(wall_time=1.0))
loader = self._make_loader(detect_file_replacement=True)
self._append_record(_make_event(wall_time=2.0))
self.assertEventWallTimes(loader.Load(), [1.0, 2.0])
self.assertEventWallTimes(loader.Load(), [])
# Remove the original file; replace it with a file containing 1 additional
# event. Without detect_file_replacement=True, the new data would go
# undetected.
os.remove(self._get_filename())
self._append_record(_make_event(wall_time=1.0))
self._append_record(_make_event(wall_time=2.0))
self._append_record(_make_event(wall_time=3.0))
self.assertEventWallTimes(loader.Load(), [3.0])

@unittest.skipIf(USING_STUB_TF, "detect_file_replacement requires real TF")
def testLoad_detectFileReplacement_statFails_shouldPollInstead(self):
self._append_record(_make_event(wall_time=1.0))
with mock.patch.object(tf.io.gfile, "stat") as mock_stat:
mock_stat.side_effect = tf.errors.UnimplementedError(
None, None, "stat is unsupported"
)
loader = self._make_loader(detect_file_replacement=True)
self._append_record(_make_event(wall_time=2.0))
self.assertEventWallTimes(loader.Load(), [1.0, 2.0])
mock_stat.assert_called_once()
mock_stat.reset_mock()
self._append_record(_make_event(wall_time=3.0))
self.assertEventWallTimes(loader.Load(), [3.0])
mock_stat.assert_called_once()

@unittest.skipIf(USING_STUB_TF, "detect_file_replacement requires real TF")
def testLoad_detectFileReplacement_statFailsTransiently_shouldRecover(self):
self._append_record(_make_event(wall_time=1.0))
loader = self._make_loader(detect_file_replacement=True)
self._append_record(_make_event(wall_time=2.0))
self.assertEventWallTimes(loader.Load(), [1.0, 2.0])
with mock.patch.object(tf.io.gfile, "stat") as mock_stat:
mock_stat.side_effect = tf.errors.DeadlineExceededError(
None, None, "transient failure"
)
# Stat call fails; we fall back to polling, which shows no new data.
self.assertEventWallTimes(loader.Load(), [])
mock_stat.assert_called_once()
mock_stat.reset_mock()
# Simulate file growing via replacement.
os.remove(self._get_filename())
self._append_record(_make_event(wall_time=1.0))
self._append_record(_make_event(wall_time=2.0))
self._append_record(_make_event(wall_time=3.0))
# Stat call fails again; since we're only polling, there's still no new data.
self.assertEventWallTimes(loader.Load(), [])
mock_stat.assert_called_once()
mock_stat.reset_mock()
# Removing the mock and getting a successful stat result triggers
# reopening the file, and we see the new data.
self.assertEventWallTimes(loader.Load(), [3.0])

@unittest.skipIf(USING_STUB_TF, "detect_file_replacement requires real TF")
def testLoad_detectFileReplacement_statShowsSizeDecrease_shouldIgnore(self):
self._append_record(_make_event(wall_time=1.0))
loader = self._make_loader(detect_file_replacement=True)
self._append_record(_make_event(wall_time=2.0))
self.assertEventWallTimes(loader.Load(), [1.0, 2.0])
self.assertEventWallTimes(loader.Load(), [])
# Remove original file; replace it with a truncated file. Since the
# stat call shows a reduced file size, we should ignore it until we
# see an increase relative to the previously cached size.
os.remove(self._get_filename())
self._append_record(_make_event(wall_time=1.0))
self.assertEventWallTimes(loader.Load(), [])
self._append_record(_make_event(wall_time=2.0))
self.assertEventWallTimes(loader.Load(), [])
self._append_record(_make_event(wall_time=3.0))
self.assertEventWallTimes(loader.Load(), [3.0])


class RawEventFileLoaderTest(EventFileLoaderTestBase, tb_test.TestCase):
@property
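
Assuming the Bazel targets implied by the BUILD changes above, the new tests can presumably be run with: bazel test //tensorboard/backend/event_processing:event_file_loader_test (the target name is an inference from the BUILD file, not stated in this commit).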
