-
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
125 additions
and
265 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,76 +1,87 @@ | ||
""" | ||
:: Moiety_Formula = C24 H27 Au Cl N P, C H2 Cl2, Cl | ||
""" | ||
# ---------------------------------------------------------------------------- | ||
# "THE BEER-WARE LICENSE" (Revision 42): | ||
# [email protected]> wrote this file. As long as you retain | ||
# this notice you can do whatever you want with this stuff. If we meet some day, | ||
# and you think this stuff is worth it, you can buy me a beer in return. | ||
# Dr. Daniel Kratzert | ||
# ---------------------------------------------------------------------------- | ||
|
||
import os | ||
import subprocess | ||
import sys | ||
import threading | ||
import time | ||
from contextlib import suppress | ||
from pathlib import Path | ||
from subprocess import TimeoutExpired | ||
from time import sleep | ||
from typing import Union | ||
|
||
from PyQt5.QtCore import QThread | ||
from PyQt5 import QtCore | ||
from PyQt5.QtCore import QProcess, QTimer, QTime | ||
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QLabel, \ | ||
QPlainTextEdit | ||
|
||
|
||
class Platon(QThread): | ||
def __init__(self, cif: Path, timeout: int = 300, cmdoption='-u'): | ||
""" | ||
Option -u is for checkcif | ||
""" | ||
super().__init__() | ||
self.cmdoption = cmdoption | ||
self.timeout = timeout | ||
self.cif_path = cif | ||
self.chkfile = Path(self.cif_path.with_suffix('.chk')) | ||
self.platon_output = '' | ||
class PlatonRunner(QtCore.QObject): | ||
finished = QtCore.pyqtSignal(bool) | ||
formula = QtCore.pyqtSignal(str) | ||
tick = QtCore.pyqtSignal(str) | ||
|
||
def __init__(self, parent, output_widget: QPlainTextEdit, log_widget: QPlainTextEdit, cif_file: Path): | ||
super().__init__(parent) | ||
self.cif_file = cif_file.resolve().absolute() | ||
self.process = None | ||
self.is_stopped = False | ||
self._origdir = None | ||
self.output_widget = output_widget | ||
self.log_widget = log_widget | ||
self.formula_moiety = '' | ||
self.Z = '' | ||
self.chk_file_text = '' | ||
|
||
def run_process(self): | ||
self._origdir = os.curdir | ||
#os.chdir(self.cif_file.parent) | ||
self.formula_moiety = '' | ||
self.Z = '' | ||
self.process = QProcess() | ||
self.output_widget.clear() | ||
threading.Thread(target=self._monitor_output_log).start() | ||
# self.process.readyReadStandardOutput.connect(self.on_ready_read) | ||
self.process.finished.connect(self._onfinished) | ||
self.process.setWorkingDirectory(str(self.cif_file.parent)) | ||
self.cif_file.with_suffix('.chk').unlink(missing_ok=True) | ||
self.process.start(self.platon_exe, ["-U", str(self.cif_file.name)]) | ||
|
||
def _onfinished(self): | ||
self._on_ready_read() | ||
#os.chdir(self._origdir) | ||
self._parse_chk_file() | ||
self.output_widget.setPlainText(self.chk_file_text) | ||
self.finished.emit(True) | ||
self.delete_orphaned_files() | ||
self.plat: Union[subprocess.CompletedProcess, bool] = True | ||
|
||
def kill(self): | ||
if sys.platform.startswith('win'): | ||
with suppress(FileNotFoundError): | ||
subprocess.run(["taskkill", "/f", "/im", "platon.exe"], shell=False) | ||
if sys.platform[:5] in ('linux', 'darwi'): | ||
with suppress(FileNotFoundError): | ||
subprocess.run(["killall", "platon"], shell=False) | ||
def _on_ready_read(self): | ||
output = self.process.readAllStandardOutput().data().decode() | ||
self.log_widget.appendPlainText(output) | ||
|
||
def delete_orphaned_files(self): | ||
# delete orphaned files: | ||
for ext in ['.ckf', '.fcf', '.def', '.lis', '.sar', '.ckf', | ||
'.sum', '.hkp', '.pjn', '.bin', '.spf']: | ||
def _monitor_output_log(self): | ||
while not self.is_stopped: | ||
self.tick.emit('#') | ||
time.sleep(1) | ||
try: | ||
file = self.cif_path.resolve().with_suffix(ext) | ||
if file.stat().st_size < 100: | ||
file.unlink() | ||
if file.suffix in ['.sar', '.spf', '.ckf']: | ||
file.unlink() | ||
log_file = self.cif_file.with_suffix('.chk').read_text('latin1', errors='ignore') | ||
if 'Unresolved or to be Checked Issue' in log_file: | ||
self._stop_program() | ||
except FileNotFoundError: | ||
# print('##') | ||
pass | ||
break | ||
|
||
def parse_chk_file(self): | ||
""" | ||
""" | ||
def _stop_program(self): | ||
self.is_stopped = True | ||
#if self.process and self.process.state() == QProcess.Running: | ||
self.process.terminate() | ||
self.finished.emit(True) | ||
|
||
def _parse_chk_file(self): | ||
try: | ||
self.chk_file_text = self.chkfile.read_text(encoding='ascii', errors='ignore') | ||
self.chk_file_text = self.cif_file.with_suffix('.chk').read_text(encoding='latin1', errors='ignore') | ||
except FileNotFoundError as e: | ||
print('CHK file not found:', e) | ||
self.chk_file_text = '' | ||
for num, line in enumerate(self.chk_file_text.splitlines(keepends=False)): | ||
if line.startswith('# MoietyFormula'): | ||
self.formula_moiety = ' '.join(line.split(' ')[2:]) | ||
self.formula.emit(self.formula_moiety) | ||
if line.startswith('# Z'): | ||
self.Z = line[19:24].strip(' ') | ||
|
||
|
@@ -85,56 +96,69 @@ def platon_exe(self): | |
else: | ||
return 'platon' | ||
|
||
def run(self): | ||
""" | ||
Runs the platon thread. | ||
""" | ||
curdir = Path(os.curdir).resolve() | ||
with suppress(FileNotFoundError): | ||
self.cif_path.with_suffix('.vrf').unlink() | ||
with suppress(FileNotFoundError): | ||
self.chkfile.unlink() | ||
os.chdir(str(self.cif_path.absolute().parent)) | ||
try: | ||
print('running local platon on', self.cif_path.name) | ||
self.plat = subprocess.run([self.platon_exe, self.cmdoption, self.cif_path.name], | ||
startupinfo=self.hide_status_window(), | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
shell=False, | ||
env=os.environ, | ||
timeout=self.timeout) | ||
except TimeoutExpired: | ||
print('PLATON timeout!') | ||
self.platon_output = 'PLATON timeout!' | ||
pass | ||
except Exception as e: | ||
print('Could not run local platon:' + str(e)) | ||
self.platon_output = str(e) | ||
if self.plat and hasattr(self.plat, 'stdout'): | ||
self.platon_output = self.plat.stdout.decode('ascii') | ||
# self.delete_orphaned_files() | ||
os.chdir(curdir) | ||
|
||
@staticmethod | ||
def hide_status_window(): | ||
try: | ||
# This is only available on windows: | ||
si = subprocess.STARTUPINFO() | ||
si.dwFlags = 1 | ||
si.wShowWindow = 0 | ||
except AttributeError: | ||
si = None | ||
return si | ||
def kill(self): | ||
if sys.platform.startswith('win'): | ||
with suppress(FileNotFoundError): | ||
subprocess.run(["taskkill", "/f", "/im", "platon.exe"], shell=False) | ||
if sys.platform[:5] in ('linux', 'darwi'): | ||
with suppress(FileNotFoundError): | ||
subprocess.run(["killall", "platon"], shell=False) | ||
|
||
def delete_orphaned_files(self): | ||
# delete orphaned files: | ||
for ext in ['.ckf', '.fcf', '.def', '.lis', '.sar', '.ckf', | ||
'.sum', '.hkp', '.pjn', '.bin', '.spf']: | ||
try: | ||
file = self.cif_file.resolve().with_suffix(ext) | ||
if file.stat().st_size < 100: | ||
file.unlink(missing_ok=True) | ||
if file.suffix in ['.sar', '.spf', '.ckf']: | ||
file.unlink(missing_ok=True) | ||
except FileNotFoundError: | ||
pass | ||
|
||
|
||
def __repr__(self): | ||
return 'Platon:\n{}'.format(self.formula_moiety) | ||
class ProcessWidget(QWidget): | ||
def __init__(self): | ||
super().__init__() | ||
layout = QVBoxLayout() | ||
self.text_widget = QPlainTextEdit() | ||
self.log_widget = QPlainTextEdit() | ||
layout.addWidget(self.log_widget) | ||
layout.addWidget(self.text_widget) | ||
self.button = QPushButton("Run QProcess") | ||
layout.addWidget(self.button) | ||
self.time_label = QLabel() | ||
layout.addWidget(self.time_label) | ||
self.setLayout(layout) | ||
self.runner = PlatonRunner(parent=self, output_widget=self.text_widget, log_widget=self.log_widget, | ||
cif_file=Path("tests/examples/work/cu_BruecknerJK_153F40_0m.cif")) | ||
self.button.clicked.connect(lambda x: self.button.setDisabled(True)) | ||
self.button.clicked.connect(lambda x: self.runner.run_process()) | ||
self.button.clicked.connect(lambda x: self.log_widget.setPlainText('Running Platon')) | ||
self.runner.finished.connect(lambda x: self.button.setEnabled(True)) | ||
|
||
# Only to show that the main thread works continuously: | ||
self.timer = QTimer(self) | ||
self.timer.timeout.connect(self.update_time) | ||
self.timer.start(1000) | ||
|
||
def update_time(self): | ||
current_time = QTime.currentTime() | ||
time_text = current_time.toString("hh:mm:ss") | ||
self.time_label.setText(f"Current Time: {time_text}") | ||
|
||
|
||
if __name__ == '__main__': | ||
fname = Path('test-data/DK_zucker2_0m.cif') | ||
p = Platon(fname) | ||
p.start() | ||
sleep(15) | ||
p.exit() | ||
p.kill() | ||
app = QApplication(sys.argv) | ||
window = QMainWindow() | ||
window.setWindowTitle("QProcess Example") | ||
window.setMinimumWidth(800) | ||
window.setMinimumHeight(600) | ||
|
||
process_widget = ProcessWidget() | ||
window.setCentralWidget(process_widget) | ||
|
||
window.show() | ||
|
||
sys.exit(app.exec_()) |
Oops, something went wrong.