Skip to content

Commit

Permalink
Merge pull request #9 from LocusEnergy/fix-polling-race-condition
Browse files Browse the repository at this point in the history
Fix polling race condition
  • Loading branch information
lemeryfertitta authored Sep 25, 2017
2 parents 78cc2bf + 61f58c0 commit 4172684
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 58 deletions.
2 changes: 1 addition & 1 deletion dbio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from io import load, query, replicate, replicate_no_fifo

__all__ = ['io', 'databases']
__version__ = '0.5.0'
__version__ = '0.5.1'
53 changes: 19 additions & 34 deletions dbio/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,40 +206,26 @@ def replicate(query_db_url, load_db_url, query, table, append, analyze=False,
logger.debug("Writer call: " + ' '.join(writer_args))
writer_process = subprocess.Popen(writer_args, env=env)

processes = [reader_process, writer_process]
try:
while True:
writer_process.poll()
reader_process.poll()
r_returncode = reader_process.returncode
w_returncode = writer_process.returncode
if w_returncode is None:
if r_returncode is None:
# Both processes are still running. Check again in one second.
time.sleep(1)
else:
raise ReaderError("Reader finished before writer. Subprocess returncode: " + str(r_returncode))
elif w_returncode != os.EX_OK:
raise WriterError("Subprocess returncode: " + str(writer_process.returncode))
else:
if r_returncode is None:
# Wait for reader to finish
reader_process.communicate()
r_returncode = reader_process.returncode
if r_returncode != os.EX_OK:
raise ReaderError("Subprocess returncode: " + str(r_returncode))
break
elif r_returncode != os.EX_OK:
raise ReaderError("Subprocess returncode: " + str(r_returncode))
else:
break
running_processes = list(processes)
while running_processes:
for process in running_processes:
process.poll()
if process.returncode == os.EX_OK:
running_processes.remove(process)

elif process.returncode is not None:
running_processes.remove(process)
raise RuntimeError('Failure inside a database reader/writer process')

finally:
# Ensure no processes are orphaned
reader_process.poll()
if reader_process.returncode is None:
reader_process.kill()
writer_process.poll()
if writer_process.returncode is None:
writer_process.kill()
for process in processes:
process.poll()
if process.returncode is None:
process.kill()

finally:
os.remove(pipe_name)

Expand Down Expand Up @@ -309,6 +295,5 @@ def __get_database(url):
return db_class(url)


class UnsupportedDatabaseError(Exception): pass
class WriterError(Exception): pass
class ReaderError(Exception): pass
class UnsupportedDatabaseError(Exception):
pass
26 changes: 3 additions & 23 deletions dbio/test/test_units.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,8 @@ def mockpopen(args, **kwargs):
monkeypatch.setattr(subprocess, 'Popen', mockpopen)

# Tested method
try:
with pytest.raises(RuntimeError):
dbio.replicate(mock_url, mock_url, mock_query, mock_table, mock_append)
except dbio.io.ReaderError:
pass
except Exception as e:
assert False, "Unexcpted error thrown by replicate: " + e.message
else:
assert False, "Reader failed but replicate did not."

# Reader should show failure
assert mock_reader.returncode != 0
Expand Down Expand Up @@ -214,14 +208,8 @@ def mockpopen(args, **kwargs):
monkeypatch.setattr(subprocess, 'Popen', mockpopen)

# Tested method
try:
with pytest.raises(RuntimeError):
dbio.replicate(mock_url, mock_url, mock_query, mock_table, mock_append)
except dbio.io.WriterError:
pass
except:
assert False, "Unexcpted error thrown by replicate"
else:
assert False, "Writer failed but replicate did not."

# Reader should not be alive
assert mock_reader.returncode is not None
Expand Down Expand Up @@ -263,16 +251,8 @@ def mockpopen(args, **kwargs):
monkeypatch.setattr(subprocess, 'Popen', mockpopen)

# Tested method
try:
with pytest.raises(RuntimeError):
dbio.replicate(mock_url, mock_url, mock_query, mock_table, mock_append)
except dbio.io.ReaderError:
pass
except dbio.io.WriterError:
pass
except:
assert False, "Unexcpted error thrown by replicate"
else:
assert False, "Writer and reader failed but replicate did not."

assert mock_reader.returncode != 0
assert mock_writer.returncode != 0
Expand Down

0 comments on commit 4172684

Please sign in to comment.