From c441619f805156cc60fa0f9a3086773948b1fa62 Mon Sep 17 00:00:00 2001 From: Polt Date: Tue, 8 Oct 2024 13:23:47 +0200 Subject: [PATCH] multitemp rrho fix --- src/censo/ensembleopt/screening.py | 296 +++++++++++++++-------------- src/censo/qm_processor.py | 196 ++++++++++--------- 2 files changed, 260 insertions(+), 232 deletions(-) diff --git a/src/censo/ensembleopt/screening.py b/src/censo/ensembleopt/screening.py index 628fbc6..7dc7ce3 100644 --- a/src/censo/ensembleopt/screening.py +++ b/src/censo/ensembleopt/screening.py @@ -2,6 +2,7 @@ Screening is basically just an extension of part0 (Prescreening). Additionally to part0 it is also possible to calculate gsolv implicitly and include the RRHO contribution. """ + import os from functools import reduce from math import exp @@ -10,8 +11,7 @@ from ..datastructure import MoleculeData from ..logging import setup_logger from ..parallel import execute -from ..params import (AU2KCAL, GFNOPTIONS, GRIDOPTIONS, PLENGTH, PROGS, - SOLV_MODS) +from ..params import AU2KCAL, GFNOPTIONS, GRIDOPTIONS, PLENGTH, PROGS, SOLV_MODS from ..utilities import format_data, h1, print from .prescreening import Prescreening @@ -27,36 +27,15 @@ class Screening(Prescreening): # __gsolv_mods = reduce(lambda x, y: x + y, GSOLV_MODS.values()) _options = { - "threshold": { - "default": 3.5 - }, - "func": { - "default": "r2scan-3c" - }, - "basis": { - "default": "def2-TZVP" - }, - "prog": { - "default": "orca", - "options": PROGS - }, - "sm": { - "default": "smd", - "options": __solv_mods - }, - "gfnv": { - "default": "gfn2", - "options": GFNOPTIONS - }, - "run": { - "default": True - }, - "implicit": { - "default": True - }, - "template": { - "default": False - }, + "threshold": {"default": 3.5}, + "func": {"default": "r2scan-3c"}, + "basis": {"default": "def2-TZVP"}, + "prog": {"default": "orca", "options": PROGS}, + "sm": {"default": "smd", "options": __solv_mods}, + "gfnv": {"default": "gfn2", "options": GFNOPTIONS}, + "run": {"default": True}, + "implicit": {"default": True}, + "template": {"default": False}, } _settings = {} @@ -104,29 +83,35 @@ def optimize(self, ncores: int, cut: bool = True) -> None: # sort conformers list self.ensemble.conformers.sort( - key=lambda conf: conf.results[self._name]["gtot"]) + key=lambda conf: conf.results[self._name]["gtot"] + ) if cut and len(self.ensemble.conformers) > 1: # calculate fuzzyness of threshold (adds 1 kcal/mol at max to the threshold) - fuzzy = (1 / AU2KCAL) * (1 - exp(-AU2KCAL * stdev([ - conf.results[self._name]["xtb_rrho"]["energy"] - for conf in self.ensemble.conformers - ]))) - threshold += fuzzy - print( - f"Updated fuzzy threshold: {threshold * AU2KCAL:.2f} kcal/mol." + fuzzy = (1 / AU2KCAL) * ( + 1 + - exp( + -AU2KCAL + * stdev( + [ + conf.results[self._name]["xtb_rrho"]["energy"] + for conf in self.ensemble.conformers + ] + ) + ) ) + threshold += fuzzy + print(f"Updated fuzzy threshold: {threshold * AU2KCAL:.2f} kcal/mol.") # update the conformer list in ensemble (remove confs if below threshold) - for confname in self.ensemble.update_conformers( - self.grrho, threshold): + for confname in self.ensemble.update_conformers(self.grrho, threshold): print(f"No longer considering {confname}.") # calculate boltzmann weights from gtot values calculated here # trying to get temperature from instructions, set it to room temperature if that fails for some reason self.ensemble.calc_boltzmannweights( - self.get_general_settings().get("temperature", 298.15), - self._name) + self.get_general_settings().get("temperature", 298.15), self._name + ) # if no conformers are filtered basically nothing happens @@ -139,8 +124,10 @@ def gsolv(self, conf: MoleculeData) -> float: """ # If solvation contributions should be included and the solvation free enthalpy # should not be included in the single-point energy the 'gsolv' job should've been run - if not self.get_general_settings( - )["gas-phase"] and not self.get_settings()["implicit"]: + if ( + not self.get_general_settings()["gas-phase"] + and not self.get_settings()["implicit"] + ): return conf.results[self._name]["gsolv"]["energy_solv"] # Otherwise, return just the single-point energy else: @@ -158,8 +145,7 @@ def grrho(self, conf: MoleculeData) -> float: """ # Gtot = E(DFT) + Gsolv + Grrho # NOTE: grrho should only be called if evaluate_rrho is True - return self.gsolv(conf) + conf.results[ - self._name]["xtb_rrho"]["energy"] + return self.gsolv(conf) + conf.results[self._name]["xtb_rrho"]["energy"] def write_results(self) -> None: """ @@ -201,12 +187,15 @@ def write_results(self) -> None: # variables for printmap # minimal xtb single-point energy (taken from prescreening) - if (all("prescreening" in conf.results.keys() - for conf in self.ensemble.conformers) - and not self.get_general_settings()["gas-phase"]): + if ( + all( + "prescreening" in conf.results.keys() + for conf in self.ensemble.conformers + ) + and not self.get_general_settings()["gas-phase"] + ): xtb_energies = { - id(conf): - conf.results["prescreening"]["xtb_gsolv"]["energy_xtb_gas"] + id(conf): conf.results["prescreening"]["xtb_gsolv"]["energy_xtb_gas"] for conf in self.ensemble.conformers } xtbmin = min(xtb_energies.values()) @@ -217,40 +206,47 @@ def write_results(self) -> None: gsolvmin = min(self.gsolv(conf) for conf in self.ensemble.conformers) # collect all dft single point energies - dft_energies = ({ - id(conf): conf.results[self._name]["sp"]["energy"] - for conf in self.ensemble.conformers - } if not all("gsolv" in conf.results[self._name].keys() - for conf in self.ensemble.conformers) else { - id(conf): - conf.results[self._name]["gsolv"]["energy_gas"] - for conf in self.ensemble.conformers - }) + dft_energies = ( + { + id(conf): conf.results[self._name]["sp"]["energy"] + for conf in self.ensemble.conformers + } + if not all( + "gsolv" in conf.results[self._name].keys() + for conf in self.ensemble.conformers + ) + else { + id(conf): conf.results[self._name]["gsolv"]["energy_gas"] + for conf in self.ensemble.conformers + } + ) # determines what to print for each conformer in each column printmap = { - "CONF#": - lambda conf: conf.name, - "E (xTB)": - lambda conf: f"{xtb_energies[id(conf)]:.6f}" - if xtb_energies is not None else "---", - "ΔE (xTB)": - lambda conf: f"{(xtb_energies[id(conf)] - xtbmin) * AU2KCAL:.2f}" - if xtb_energies is not None else "---", - "E (DFT)": - lambda conf: f"{dft_energies[id(conf)]:.6f}", - "ΔGsolv": - lambda conf: f"{self.gsolv(conf) - dft_energies[id(conf)]:.6f}" - if "xtb_gsolv" in conf.results[self._name].keys() or "gsolv" in - conf.results[self._name].keys() else "---", - "Gtot": - lambda conf: f"{self.gsolv(conf):.6f}", - "ΔGtot": - lambda conf: f"{(self.gsolv(conf) - gsolvmin) * AU2KCAL:.2f}", + "CONF#": lambda conf: conf.name, + "E (xTB)": lambda conf: ( + f"{xtb_energies[id(conf)]:.6f}" if xtb_energies is not None else "---" + ), + "ΔE (xTB)": lambda conf: ( + f"{(xtb_energies[id(conf)] - xtbmin) * AU2KCAL:.2f}" + if xtb_energies is not None + else "---" + ), + "E (DFT)": lambda conf: f"{dft_energies[id(conf)]:.6f}", + "ΔGsolv": lambda conf: ( + f"{self.gsolv(conf) - dft_energies[id(conf)]:.6f}" + if "xtb_gsolv" in conf.results[self._name].keys() + or "gsolv" in conf.results[self._name].keys() + else "---" + ), + "Gtot": lambda conf: f"{self.gsolv(conf):.6f}", + "ΔGtot": lambda conf: f"{(self.gsolv(conf) - gsolvmin) * AU2KCAL:.2f}", } - rows = [[printmap[header](conf) for header in headers] - for conf in self.ensemble.conformers] + rows = [ + [printmap[header](conf) for header in headers] + for conf in self.ensemble.conformers + ] lines = format_data(headers, rows, units=units) @@ -262,11 +258,10 @@ def write_results(self) -> None: # write everything to a file filename = f"{self._part_no}_{self._name.upper()}.out" - logger.debug( - f"Writing to {os.path.join(self.ensemble.workdir, filename)}.") - with open(os.path.join(self.ensemble.workdir, filename), - "w", - newline=None) as outfile: + logger.debug(f"Writing to {os.path.join(self.ensemble.workdir, filename)}.") + with open( + os.path.join(self.ensemble.workdir, filename), "w", newline=None + ) as outfile: outfile.writelines(lines) def write_results2(self) -> None: @@ -313,66 +308,77 @@ def write_results2(self) -> None: ] # minimal xtb energy from single-point (and mRRHO) - if (all("prescreening" in conf.results.keys() - for conf in self.ensemble.conformers) - and not self.get_general_settings()["gas-phase"]): + if ( + all( + "prescreening" in conf.results.keys() + for conf in self.ensemble.conformers + ) + and not self.get_general_settings()["gas-phase"] + ): gxtb = { - id(conf): - conf.results["prescreening"]["xtb_gsolv"]["energy_xtb_gas"] + id(conf): conf.results["prescreening"]["xtb_gsolv"]["energy_xtb_gas"] for conf in self.ensemble.conformers } if self.get_general_settings()["evaluate_rrho"]: for conf in self.ensemble.conformers: - gxtb[id(conf)] += conf.results[self._name]["xtb_rrho"][ - "gibbs"][self.get_general_settings()["temperature"]] + gxtb[id(conf)] += conf.results[self._name]["xtb_rrho"]["gibbs"][ + self.get_general_settings()["temperature"] + ] gxtbmin = min(gxtb.values()) else: gxtb = None # minimal gtot from E(DFT), Gsolv and GmRRHO - gtotmin = min(conf.results[self._name]["gtot"] - for conf in self.ensemble.conformers) + gtotmin = min( + conf.results[self._name]["gtot"] for conf in self.ensemble.conformers + ) # collect all dft single point energies - dft_energies = ({ - id(conf): conf.results[self._name]["sp"]["energy"] - for conf in self.ensemble.conformers - } if not all("gsolv" in conf.results[self._name].keys() - for conf in self.ensemble.conformers) else { - id(conf): - conf.results[self._name]["gsolv"]["energy_gas"] - for conf in self.ensemble.conformers - }) + dft_energies = ( + { + id(conf): conf.results[self._name]["sp"]["energy"] + for conf in self.ensemble.conformers + } + if not all( + "gsolv" in conf.results[self._name].keys() + for conf in self.ensemble.conformers + ) + else { + id(conf): conf.results[self._name]["gsolv"]["energy_gas"] + for conf in self.ensemble.conformers + } + ) printmap = { - "CONF#": - lambda conf: conf.name, - "G (xTB)": - lambda conf: f"{gxtb[id(conf)]:.6f}" - if gxtb is not None else "---", - "ΔG (xTB)": - lambda conf: f"{(gxtb[id(conf)] - gxtbmin) * AU2KCAL:.2f}" - if gxtb is not None else "---", - "E (DFT)": - lambda conf: f"{dft_energies[id(conf)]:.6f}", - "ΔGsolv": - lambda conf: f"{self.gsolv(conf) - dft_energies[id(conf)]:.6f}" - if not self.get_settings().get("implicit", False) else "---", - "GmRRHO": - lambda conf: - f"{conf.results[self._name]['xtb_rrho']['gibbs'][self.get_general_settings()['temperature']]:.6f}" - if self.get_general_settings()["evaluate_rrho"] else "---", - "Gtot": - lambda conf: f"{conf.results[self._name]['gtot']:.6f}", - "ΔGtot": - lambda conf: - f"{(conf.results[self._name]['gtot'] - gtotmin) * AU2KCAL:.2f}", - "Boltzmann weight": - lambda conf: f"{conf.results[self._name]['bmw'] * 100:.2f}", + "CONF#": lambda conf: conf.name, + "G (xTB)": lambda conf: ( + f"{gxtb[id(conf)]:.6f}" if gxtb is not None else "---" + ), + "ΔG (xTB)": lambda conf: ( + f"{(gxtb[id(conf)] - gxtbmin) * AU2KCAL:.2f}" + if gxtb is not None + else "---" + ), + "E (DFT)": lambda conf: f"{dft_energies[id(conf)]:.6f}", + "ΔGsolv": lambda conf: ( + f"{self.gsolv(conf) - dft_energies[id(conf)]:.6f}" + if not self.get_settings().get("implicit", False) + else "---" + ), + "GmRRHO": lambda conf: ( + f"{conf.results[self._name]['xtb_rrho']['gibbs'][self.get_general_settings()['temperature']]:.6f}" + if self.get_general_settings()["evaluate_rrho"] + else "---" + ), + "Gtot": lambda conf: f"{conf.results[self._name]['gtot']:.6f}", + "ΔGtot": lambda conf: f"{(conf.results[self._name]['gtot'] - gtotmin) * AU2KCAL:.2f}", + "Boltzmann weight": lambda conf: f"{conf.results[self._name]['bmw'] * 100:.2f}", } - rows = [[printmap[header](conf) for header in headers] - for conf in self.ensemble.conformers] + rows = [ + [printmap[header](conf) for header in headers] + for conf in self.ensemble.conformers + ] lines = format_data(headers, rows, units=units) @@ -385,17 +391,21 @@ def write_results2(self) -> None: ) # calculate averaged free enthalpy - avG = sum([ - conf.results[self._name]["bmw"] * conf.results[self._name]["gtot"] - for conf in self.ensemble.conformers - ]) + avG = sum( + [ + conf.results[self._name]["bmw"] * conf.results[self._name]["gtot"] + for conf in self.ensemble.conformers + ] + ) # calculate averaged free energy - avE = sum([ - conf.results[self._name]["bmw"] * - conf.results[self._name]["sp"]["energy"] - for conf in self.ensemble.conformers - ]) + avE = sum( + [ + conf.results[self._name]["bmw"] + * conf.results[self._name]["sp"]["energy"] + for conf in self.ensemble.conformers + ] + ) # append the lines for the free energy/enthalpy lines.append( @@ -409,9 +419,9 @@ def write_results2(self) -> None: # append lines to already existing file filename = f"{self._part_no}_{self._name.upper()}.out" - with open(os.path.join(self.ensemble.workdir, filename), - "a", - newline=None) as outfile: + with open( + os.path.join(self.ensemble.workdir, filename), "a", newline=None + ) as outfile: outfile.writelines(lines) # Additionally, write the results to a json file diff --git a/src/censo/qm_processor.py b/src/censo/qm_processor.py index d3cc3cb..bef7f2c 100644 --- a/src/censo/qm_processor.py +++ b/src/censo/qm_processor.py @@ -3,6 +3,7 @@ Additionally contains functions which should be present irrespective of the QM code. (xTB always available) """ + import json import os import signal @@ -27,7 +28,8 @@ def handle_sigterm(signum, frame, sub): logger.critical( - f"{f'worker{os.getpid()}:':{WARNLEN}}Received SIGTERM. Terminating.") + f"{f'worker{os.getpid()}:':{WARNLEN}}Received SIGTERM. Terminating." + ) sub.send_signal(signal.SIGTERM) @@ -57,7 +59,7 @@ class QmProc: ], "xtb_rrho": [ "gfnv", - ] + ], } @classmethod @@ -72,8 +74,7 @@ def print_paths(cls) -> None: lines.append("\n" + "".ljust(PLENGTH, "-") + "\n") # Append the title of the section to the output, centered. - lines.append("PATHS of external QM programs".center(PLENGTH, " ") + - "\n") + lines.append("PATHS of external QM programs".center(PLENGTH, " ") + "\n") # Append a separator line to the output. lines.append("".ljust(PLENGTH, "-") + "\n") @@ -108,14 +109,14 @@ def run(self, job: ParallelJob) -> ParallelJob: Returns: job (ParallelJob): job with results """ - logger.debug( - f"{f'worker{os.getpid()}:':{WARNLEN}}Running on {job.omp} cores.") + logger.debug(f"{f'worker{os.getpid()}:':{WARNLEN}}Running on {job.omp} cores.") # jobtype is basically an ordered (!!!) (important e.g. if sp is required before the next step) # list containing the types of computations to do if not all(t in self._jobtypes.keys() for t in job.jobtype): raise RuntimeError( f"At least one jobtype of {job.jobtype} is not available for {self.__class__.__name__}.\nAvailable " - + f"jobtypes are: {list(self._jobtypes.keys())}") + + f"jobtypes are: {list(self._jobtypes.keys())}" + ) # run all the computations for j in job.jobtype: @@ -142,21 +143,24 @@ def run(self, job: ParallelJob) -> ParallelJob: # if a calculation failed all following calculations will not be executed if not job.meta[j]["success"]: - for j2 in job.jobtype[job.jobtype.index(j) + 1:]: + for j2 in job.jobtype[job.jobtype.index(j) + 1 :]: job.results[j2] = None job.meta[j2]["success"] = False job.meta[j2]["error"] = "Previous calculation failed" break job.meta["total_time"] = sum( - job.meta[j]["time"] for j in job.jobtype - if job.meta[j]["error"] != "Previous calculation failed") + job.meta[j]["time"] + for j in job.jobtype + if job.meta[j]["error"] != "Previous calculation failed" + ) # returns modified job object with result dict e.g.: {"sp": ..., "gsolv": ..., etc.} return job - def _make_call(self, prog: str, call: list, outputpath: str, - jobdir: str) -> tuple[int, str]: + def _make_call( + self, prog: str, call: list, outputpath: str, jobdir: str + ) -> tuple[int, str]: """ Make a call to an external program and write output into outputfile. @@ -183,8 +187,7 @@ def _make_call(self, prog: str, call: list, outputpath: str, # call external program and write output into outputfile with open(outputpath, "w", newline=None) as outputfile: - logger.debug( - f"{f'worker{os.getpid()}:':{WARNLEN}}Running {call}...") + logger.debug(f"{f'worker{os.getpid()}:':{WARNLEN}}Running {call}...") # create subprocess for external program sub = subprocess.Popen( @@ -204,8 +207,8 @@ def _make_call(self, prog: str, call: list, outputpath: str, # make sure to send SIGTERM to subprocess if program is quit signal.signal( - signal.SIGTERM, - lambda signum, frame: handle_sigterm(signum, frame, sub)) + signal.SIGTERM, lambda signum, frame: handle_sigterm(signum, frame, sub) + ) # wait for process to finish _, errors = sub.communicate() @@ -330,17 +333,18 @@ def _xtb_sp( # NOTE on solvents_dict (or rather censo_solvents.json): # [0] is the normal name of the solvent, if it is available, [1] is the replacement if not (job.prepinfo["general"].get("gas-phase", False) or no_solv): - call.extend([ - "--" + job.prepinfo["general"]["sm_rrho"], - job.prepinfo["xtb_sp"]["solvent_key_xtb"], - "reference", - "-I", - xcontrolname, - ]) + call.extend( + [ + "--" + job.prepinfo["general"]["sm_rrho"], + job.prepinfo["xtb_sp"]["solvent_key_xtb"], + "reference", + "-I", + xcontrolname, + ] + ) # set gbsa grid - with open(os.path.join(jobdir, xcontrolname), "w", - newline=None) as xcout: + with open(os.path.join(jobdir, xcontrolname), "w", newline=None) as xcout: xcout.write("$gbsa\n") xcout.write(" gbsagrid=tight\n") xcout.write("$end\n") @@ -353,13 +357,11 @@ def _xtb_sp( if returncode != 0: meta["success"] = False meta["error"] = "unknown_error" - logger.warning( - f"Job for {job.conf.name} failed. Stderr output:\n{errors}") + logger.warning(f"Job for {job.conf.name} failed. Stderr output:\n{errors}") return result, meta # read energy from outputfile - with open(outputpath, "r", encoding=CODING, - newline=None) as outputfile: + with open(outputpath, "r", encoding=CODING, newline=None) as outputfile: for line in outputfile.readlines(): if "| TOTAL ENERGY" in line: result["energy"] = float(line.split()[3]) @@ -370,8 +372,8 @@ def _xtb_sp( return result, meta def _xtb_gsolv( - self, job: ParallelJob, - jobdir: str) -> tuple[dict[str, float | None], dict[str, any]]: + self, job: ParallelJob, jobdir: str + ) -> tuple[dict[str, float | None], dict[str, any]]: """ Calculate additive GBSA or ALPB solvation using GFNn-xTB or GFN-FF. @@ -427,11 +429,9 @@ def _xtb_gsolv( return result, meta # TODO - break this down - def _xtb_rrho(self, - job: ParallelJob, - jobdir: str, - filename: str = "xtb_rrho" - ) -> tuple[dict[str, any], dict[str, any]]: + def _xtb_rrho( + self, job: ParallelJob, jobdir: str, filename: str = "xtb_rrho" + ) -> tuple[dict[str, any], dict[str, any]]: """ Calculates the mRRHO contribution to the free enthalpy of a conformer with GFNn-xTB/GFN-FF. @@ -504,11 +504,9 @@ def _xtb_rrho(self, trange.append(job.prepinfo["general"]["temperature"]) # Write trange to the xcontrol file - xcout.write(f" temp=" - f"{','.join([str(i) for i in trange])}\n") + xcout.write(f" temp=" f"{','.join([str(i) for i in trange])}\n") else: - xcout.write( - f" temp={job.prepinfo['general']['temperature']}\n") + xcout.write(f" temp={job.prepinfo['general']['temperature']}\n") xcout.write(f" sthr={job.prepinfo['general']['sthr']}\n") @@ -541,8 +539,7 @@ def _xtb_rrho(self, dohess = "--ohess" # generate coord file for xtb - with open(os.path.join(jobdir, f"{filename}.coord"), "w", - newline=None) as file: + with open(os.path.join(jobdir, f"{filename}.coord"), "w", newline=None) as file: file.writelines(job.conf.tocoord()) call = [ @@ -562,20 +559,24 @@ def _xtb_rrho(self, # add solvent to xtb call if necessary if not job.prepinfo["general"]["gas-phase"]: - call.extend([ - "--" + job.prepinfo["general"]["sm_rrho"], - job.prepinfo["xtb_rrho"]["solvent_key_xtb"], - ]) + call.extend( + [ + "--" + job.prepinfo["general"]["sm_rrho"], + job.prepinfo["xtb_rrho"]["solvent_key_xtb"], + ] + ) # if rmsd bias is used (should be defined in censo workdir (cwd)) TODO if job.prepinfo["general"]["rmsdbias"]: # move one dir up to get to cwd (FIXME) cwd = os.path.join(os.path.split(self.workdir)[::-1][1:][::-1]) - call.extend([ - "--bias-input", - os.path.join(cwd, "rmsdpot.xyz"), - ]) + call.extend( + [ + "--bias-input", + os.path.join(cwd, "rmsdpot.xyz"), + ] + ) # call xtb returncode, errors = self._make_call("xtb", call, outputpath, jobdir) @@ -584,13 +585,11 @@ def _xtb_rrho(self, if returncode != 0: meta["success"] = False meta["error"] = "unknown_error" - logger.warning( - f"Job for {job.conf.name} failed. Stderr output:\n{errors}") + logger.warning(f"Job for {job.conf.name} failed. Stderr output:\n{errors}") return result, meta # read output and store lines - with open(outputpath, "r", encoding=CODING, - newline=None) as outputfile: + with open(outputpath, "r", encoding=CODING, newline=None) as outputfile: lines = outputfile.readlines() if job.prepinfo["general"]["multitemp"]: @@ -610,7 +609,7 @@ def _xtb_rrho(self, # Get Gibbs energy and enthalpy for line in lines: if "T/K" in line: - for line2 in lines[lines.index(line) + 2:]: + for line2 in lines[lines.index(line) + 2 :]: if "----------------------------------" in line2: break @@ -622,32 +621,39 @@ def _xtb_rrho(self, rotS = {} # Get rotational entropy - entropy_lines = ((line, lines[i + 1]) - for i, line in enumerate(lines) if "VIB" in line) + entropy_lines = ( + (line, lines[i + 1]) for i, line in enumerate(lines) if "VIB" in line + ) for line in entropy_lines: T = float(line[0].split()[0]) rotS[T] = float(line[1].split()[4]) # Extract symmetry result["linear"] = next( - ({ - "true": True, - "false": False - }[line.split()[2]] for line in lines if ": linear? " in line), + ( + {"true": True, "false": False}[line.split()[2]] + for line in lines + if ": linear? " in line + ), None, ) # Extract rmsd result["rmsd"] = next( - (float(line.split()[3]) for line in lines - if "final rmsd / " in line and job.prepinfo["general"]["bhess"]), + ( + float(line.split()[3]) + for line in lines + if "final rmsd / " in line and job.prepinfo["general"]["bhess"] + ), None, ) # check if xtb calculated the temperature range correctly if job.prepinfo["general"]["multitemp"] and not ( - len(trange) == len(gt) and len(trange) == len(ht) - and len(trange) == len(rotS)): + len(trange) == len(gt) + and len(trange) == len(ht) + and len(trange) == len(rotS) + ): meta["success"] = False meta["error"] = "what went wrong in xtb_rrho" return result, meta @@ -655,24 +661,30 @@ def _xtb_rrho(self, result["gibbs"] = gt result["enthalpy"] = ht result["entropy"] = rotS + else: + result["gibbs"] = {} + result["enthalpy"] = {} + result["entropy"] = {} # xtb_enso.json is generated by xtb by using the '--enso' argument *only* when using --bhess or --ohess # (when a hessian is calculated) # contains output from xtb in json format to be more easily digestible by CENSO with open( - os.path.join(jobdir, "xtb_enso.json"), - "r", - encoding=CODING, - newline=None, + os.path.join(jobdir, "xtb_enso.json"), + "r", + encoding=CODING, + newline=None, ) as f: data = json.load(f) # read number of imaginary frequencies and print warning if "number of imags" in data: if data["number of imags"] > 0: - logger.warning(f"Found {data['number of imags']} significant" - f" imaginary frequencies for " - f"{job.conf.name}.") + logger.warning( + f"Found {data['number of imags']} significant" + f" imaginary frequencies for " + f"{job.conf.name}." + ) # get gibbs energy if "G(T)" in data: @@ -680,23 +692,28 @@ def _xtb_rrho(self, if job.prepinfo["general"]["temperature"] == 0.0: result["energy"] = data.get("ZPVE", 0.0) - if job.prepinfo["general"]["multitemp"]: - result["gibbs"][job.prepinfo["general"] - ["temperature"]] = data.get("ZPVE", 0.0) - result["enthalpy"][job.prepinfo["general"] - ["temperature"]] = data.get( - "ZPVE", 0.0) - result["entropy"][job.prepinfo["general"][ - "temperature"]] = None # set this to None for predictability + result["gibbs"][job.prepinfo["general"]["temperature"]] = data.get( + "ZPVE", 0.0 + ) + result["enthalpy"][job.prepinfo["general"]["temperature"]] = data.get( + "ZPVE", 0.0 + ) + if not job.prepinfo["general"]["multitemp"]: + result["entropy"][ + job.prepinfo["general"]["temperature"] + ] = None # set this to None for predictability else: result["energy"] = data.get("G(T)", 0.0) - if job.prepinfo["general"]["multitemp"]: - result["gibbs"][job.prepinfo["general"] - ["temperature"]] = data.get("G(T)", 0.0) - result["enthalpy"][job.prepinfo["general"][ - "temperature"]] = None # set this to None for predictability - result["entropy"][job.prepinfo["general"][ - "temperature"]] = None # set this to None for predictability + result["gibbs"][job.prepinfo["general"]["temperature"]] = data.get( + "G(T)", 0.0 + ) + if not job.prepinfo["general"]["multitemp"]: + result["enthalpy"][ + job.prepinfo["general"]["temperature"] + ] = None # set this to None for predictability + result["entropy"][ + job.prepinfo["general"]["temperature"] + ] = None # set this to None for predictability # only determine symmetry if all the needed information is there if "point group" and "linear" in data.keys(): @@ -708,8 +725,9 @@ def _xtb_rrho(self, result["linear"] = False # calculate symnum - result["symnum"] = self._get_sym_num(sym=result["symmetry"], - linear=result["linear"]) + result["symnum"] = self._get_sym_num( + sym=result["symmetry"], linear=result["linear"] + ) else: meta["success"] = False meta["error"] = "Could not read xtb_enso.json"