diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ce2f54ec..0254a40c 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -446,7 +446,8 @@ def from_dict(cls, action_dict): return RemoteCopyAction(source=action_dict["source"]) def write_to_path(self, path): - copy_to_path(open(self.path, "rb"), path) + with open(self.path, "rb") as f: + copy_to_path(f, path) def write_from_path(self, pulsar_path): destination = self.path @@ -534,7 +535,8 @@ def write_to_path(self, path): object_store_id=object_store_ref["object_store_id"], ) filename = self.object_store.get_filename(dataset_object) - copy_to_path(open(filename, 'rb'), path) + with open(filename, "rb") as f: + copy_to_path(f, path) def write_from_path(self, pulsar_path): raise NotImplementedError("Writing raw files to object store not supported at this time.") @@ -660,7 +662,8 @@ def from_dict(cls, action_dict): return MessageAction(contents=action_dict["contents"]) def write_to_path(self, path): - open(path, "w").write(self.contents) + with open(path, "w") as f: + f.write(self.contents) DICTIFIABLE_ACTION_CLASSES = [ diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 9f08bef8..656ce360 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -579,11 +579,8 @@ def _read(path): Utility method to quickly read small files (config files and tool wrappers) into memory as bytes. """ - input = open(path, encoding="utf-8") - try: + with open(path, encoding="utf-8") as input: return input.read() - finally: - input.close() __all__ = ['submit_job'] diff --git a/pulsar/client/transport/curl.py b/pulsar/client/transport/curl.py index 242e63ec..10d66520 100644 --- a/pulsar/client/transport/curl.py +++ b/pulsar/client/transport/curl.py @@ -33,14 +33,16 @@ def __init__(self, timeout=None, **kwrgs): def execute(self, url, method=None, data=None, input_path=None, output_path=None): buf = _open_output(output_path) + input_fh = None try: c = _new_curl_object_for_url(url) c.setopt(c.WRITEFUNCTION, buf.write) if method: c.setopt(c.CUSTOMREQUEST, method) if input_path: + input_fh = open(input_path, "rb") c.setopt(c.UPLOAD, 1) - c.setopt(c.READFUNCTION, open(input_path, 'rb').read) + c.setopt(c.READFUNCTION, input_fh.read) filesize = os.path.getsize(input_path) c.setopt(c.INFILESIZE, filesize) if data: @@ -61,6 +63,8 @@ def execute(self, url, method=None, data=None, input_path=None, output_path=None return buf.getvalue() finally: buf.close() + if input_fh: + input_fh.close() def post_file(url, path): diff --git a/pulsar/client/transport/poster.py b/pulsar/client/transport/poster.py index a1e2488b..cfc22acc 100644 --- a/pulsar/client/transport/poster.py +++ b/pulsar/client/transport/poster.py @@ -26,9 +26,10 @@ def post_file(url, path): __ensure_poster() try: - datagen, headers = poster.encode.multipart_encode({"file": open(path, "rb")}) - request = Request(url, datagen, headers) - return urlopen(request).read() + with open(path, "rb") as fh: + datagen, headers = poster.encode.multipart_encode({"file": fh}) + request = Request(url, datagen, headers) + return urlopen(request).read() except Exception: log.exception("Problem with poster post of [%s]" % path) raise diff --git a/pulsar/client/transport/requests.py b/pulsar/client/transport/requests.py index 1a75b21a..92dcf036 100644 --- a/pulsar/client/transport/requests.py +++ b/pulsar/client/transport/requests.py @@ -21,10 +21,11 @@ def post_file(url, path): raise ImportError(REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE) __ensure_requests() - m = requests_toolbelt.MultipartEncoder( - fields={'file': ('filename', open(path, 'rb'))} - ) - requests.post(url, data=m, headers={'Content-Type': m.content_type}) + with open(path, "rb") as f: + m = requests_toolbelt.MultipartEncoder( + fields={'file': ('filename', f)} + ) + requests.post(url, data=m, headers={'Content-Type': m.content_type}) def get_file(url, path): diff --git a/pulsar/client/util.py b/pulsar/client/util.py index e10531ed..6bc1d988 100644 --- a/pulsar/client/util.py +++ b/pulsar/client/util.py @@ -40,19 +40,16 @@ def copy_to_path(object, path): """ Copy file-like object to path. """ - output = open(path, 'wb') - _copy_and_close(object, output) + with open(path, 'wb') as output: + _copy(object, output) -def _copy_and_close(object, output): - try: - while True: - buffer = object.read(BUFFER_SIZE) - if not buffer: - break - output.write(buffer) - finally: - output.close() +def _copy(object, output): + while True: + buffer = object.read(BUFFER_SIZE) + if not buffer: + break + output.write(buffer) # Variant of base64 compat layer inspired by BSD code from Bcfg2 @@ -283,10 +280,12 @@ def __contains__(self, item): return exists(self.__path(item)) def __setitem__(self, key, value): - open(self.__path(key), 'w').write(json.dumps(value)) + with open(self.__path(key), "w") as f: + f.write(json.dumps(value)) def __getitem__(self, key): - return json.loads(open(self.__path(key)).read()) + with open(self.__path(key)) as f: + return json.loads(f.read()) def __delitem__(self, key): try: diff --git a/pulsar/manager_factory.py b/pulsar/manager_factory.py index 82950b26..46e75c78 100644 --- a/pulsar/manager_factory.py +++ b/pulsar/manager_factory.py @@ -52,7 +52,8 @@ def build_managers(app, conf): def _populate_manager_descriptions_from_ini(manager_descriptions, job_managers_config): config = configparser.ConfigParser() - config.readfp(open(job_managers_config)) + with open(job_managers_config) as config_fh: + config.read_file(config_fh) for section in config.sections(): if not section.startswith(MANAGER_PREFIX): continue diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 5c89eb66..53938f5a 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -186,7 +186,8 @@ def _check_execution(self, job_id, tool_id, command_line): for file in self._list_dir(tool_files_dir): if os.path.isdir(join(tool_files_dir, file)): continue - contents = open(join(tool_files_dir, file)).read() + with open(join(tool_files_dir, file)) as fh: + contents = fh.read() log.debug("job_id: {} - checking tool file {}".format(job_id, file)) authorization.authorize_tool_file(basename(file), contents) config_files_dir = job_directory.configs_directory() @@ -255,28 +256,21 @@ def calculate_path(self, remote_path, input_type): def read_file(self, name, size=-1, default=None): path = self._job_file(name) - job_file = None try: - job_file = open(path, 'rb') - return job_file.read(size) + with open(path, 'rb') as job_file: + return job_file.read(size) except Exception: if default is not None: return default else: raise - finally: - if job_file: - job_file.close() def write_file(self, name, contents): path = self._job_file(name) - job_file = open(path, 'wb') - try: - if isinstance(contents, str): - contents = contents.encode("UTF-8") + if isinstance(contents, str): + contents = contents.encode("UTF-8") + with open(path, "wb") as job_file: job_file.write(contents) - finally: - job_file.close() return path def remove_file(self, name): diff --git a/pulsar/managers/queued_condor.py b/pulsar/managers/queued_condor.py index 62e3677c..2153094d 100644 --- a/pulsar/managers/queued_condor.py +++ b/pulsar/managers/queued_condor.py @@ -41,7 +41,9 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio setup_params=setup_params ) log_path = self.__condor_user_log(job_id) - open(log_path, 'w') # Touch log file + with open(log_path, "w"): + # Touch log file + pass submit_params.update(self.submission_params) build_submit_params = dict( diff --git a/pulsar/managers/queued_external_drmaa.py b/pulsar/managers/queued_external_drmaa.py index 4eb97192..114d74e8 100644 --- a/pulsar/managers/queued_external_drmaa.py +++ b/pulsar/managers/queued_external_drmaa.py @@ -44,7 +44,8 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio submit_params=submit_params, setup_params=setup_params, ) - print(open(attributes['remoteCommand']).read()) + with open(attributes["remoteCommand"]) as fh: + print(fh.read()) job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes)) user = submit_params.get('user', None) log.info("Submit as user %s" % user) diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index 2f32ce1c..178af279 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -88,7 +88,8 @@ def touch_outputs(self, job_id, touch_outputs): job_directory = self._proxied_manager.job_directory(job_id) for name in touch_outputs: path = job_directory.calculate_path(name, 'output') - job_directory.open_file(path, mode='a') + with contextlib.closing(job_directory.open_file(path, mode='a')): + pass def preprocess_and_launch(self, job_id, launch_config): self._persist_launch_config(job_id, launch_config) @@ -320,7 +321,8 @@ def activate_job(self, job_id, active_status=ACTIVE_STATUS_LAUNCHED): if self._active_job_directory(active_status): path = self._active_job_file(job_id, active_status=active_status) try: - open(path, "w").close() + with open(path, "w"): + pass except Exception: log.warn(ACTIVATE_FAILED_MESSAGE % job_id) diff --git a/pulsar/scripts/config.py b/pulsar/scripts/config.py index bfebe18a..abd683a6 100644 --- a/pulsar/scripts/config.py +++ b/pulsar/scripts/config.py @@ -295,7 +295,8 @@ def _handle_server_ini(args, directory): server_config = SERVER_CONFIG_TEMPLATE.safe_substitute( **config_dict ) - open(ini_file, "w").write(server_config) + with open(ini_file, "w") as fh: + fh.write(server_config) def _handle_app_yaml(args, directory): @@ -312,7 +313,8 @@ def _handle_app_yaml(args, directory): contents += 'conda_auto_install: {}\n'.format(auto_conda) if not IS_WINDOWS and args.libdrmaa_path: contents += 'manager:\n type: queued_drmaa\n' - open(yaml_file, "w").write(contents) + with open(yaml_file, "w") as fh: + fh.write(contents) def _handle_local_env(args, directory, dependencies): @@ -327,7 +329,8 @@ def _handle_local_env(args, directory, dependencies): local_env_contents = LOCAL_ENV_TEMPLATE.safe_substitute( libdrmaa_line=libdrmaa_line, ) - open(local_env_file, "w").write(local_env_contents) + with open(local_env_file, "w") as fh: + fh.write(local_env_contents) def _handle_supervisor(args, mode, directory, dependencies): @@ -339,7 +342,8 @@ def _handle_supervisor(args, mode, directory, dependencies): mode=mode, ) conf_path = os.path.join(directory, "supervisor.conf") - open(conf_path, "w").write(config) + with open(conf_path, "w") as fh: + fh.write(config) dependencies.append("supervisor") diff --git a/pulsar/scripts/drmaa_launch.py b/pulsar/scripts/drmaa_launch.py index cb746272..95f0ed49 100644 --- a/pulsar/scripts/drmaa_launch.py +++ b/pulsar/scripts/drmaa_launch.py @@ -10,7 +10,8 @@ def main(argv=None): arg_parser = ArgumentParser(description=DESCRIPTION) arg_parser.add_argument("--job_attributes", required=True) args = arg_parser.parse_args(argv) - job_attributes = load(open(args.job_attributes)) + with open(args.job_attributes) as fh: + job_attributes = load(fh) session = DrmaaSessionFactory().get() external_id = session.run_job(**job_attributes) print(external_id) diff --git a/pulsar/scripts/run.py b/pulsar/scripts/run.py index 9d27076d..dc2dcfc8 100644 --- a/pulsar/scripts/run.py +++ b/pulsar/scripts/run.py @@ -97,7 +97,8 @@ def _run_client_for_job(args): result_status = waiter.wait() pulsar_outputs = PulsarOutputs.from_status_response(result_status) if args.result_json: - open(args.result_json, "w").write(json_dumps(result_status)) + with open(args.result_json, "w") as fh: + fh.write(json_dumps(result_status)) finish_args = dict( client=client, job_completed_normally=True, diff --git a/pulsar/scripts/submit_util.py b/pulsar/scripts/submit_util.py index c00a2c32..d2d3580f 100644 --- a/pulsar/scripts/submit_util.py +++ b/pulsar/scripts/submit_util.py @@ -77,7 +77,8 @@ def _load_job_config(args): base64_job_config = args.base64 job_config = from_base64_json(base64_job_config) else: - job_config = json.load(open(args.file)) + with open(args.file) as fh: + job_config = json.load(fh) return job_config diff --git a/pulsar/tools/authorization.py b/pulsar/tools/authorization.py index 32412c31..1db7c8d6 100644 --- a/pulsar/tools/authorization.py +++ b/pulsar/tools/authorization.py @@ -43,7 +43,8 @@ def authorize_tool_file(self, name, contents): tool = self.tool tool_dir = tool.get_tool_dir() tool_dir_file = join(tool_dir, name) - allowed_contents = open(tool_dir_file).read() + with open(tool_dir_file) as fh: + allowed_contents = fh.read() if contents != allowed_contents: self.__unauthorized("Attempt to write tool file with contents differing from Pulsar copy of tool file.") diff --git a/pulsar/tools/toolbox.py b/pulsar/tools/toolbox.py index 97fbe5b9..610a2929 100644 --- a/pulsar/tools/toolbox.py +++ b/pulsar/tools/toolbox.py @@ -68,7 +68,8 @@ def validate_config(self, job_directory, name, path): config_validator = self.config_validators.get(name, None) valid = True if config_validator: - contents = open(path, encoding="UTF-8").read() + with open(path, encoding="UTF-8") as fh: + contents = fh.read() valid = config_validator.validate(job_directory, contents) return valid diff --git a/pulsar/util/__init__.py b/pulsar/util/__init__.py index fde70666..a34f6a9f 100644 --- a/pulsar/util/__init__.py +++ b/pulsar/util/__init__.py @@ -9,19 +9,16 @@ def copy_to_path(object, path): """ Copy file-like object to path. """ - output = open(path, 'wb') - _copy_and_close(object, output) + with open(path, 'wb') as output: + _copy(object, output) -def _copy_and_close(object, output): - try: - while True: - buffer = object.read(BUFFER_SIZE) - if not buffer: - break - output.write(buffer) - finally: - output.close() +def _copy(object, output): + while True: + buffer = object.read(BUFFER_SIZE) + if not buffer: + break + output.write(buffer) def copy_to_temp(object): @@ -29,8 +26,8 @@ def copy_to_temp(object): Copy file-like object to temp file and return path. """ - temp_file = NamedTemporaryFile(delete=False) - _copy_and_close(object, temp_file) + with NamedTemporaryFile(delete=False) as temp_file: + _copy(object, temp_file) return temp_file.name diff --git a/pulsar/util/pastescript/serve.py b/pulsar/util/pastescript/serve.py index 5eee811e..8706f996 100644 --- a/pulsar/util/pastescript/serve.py +++ b/pulsar/util/pastescript/serve.py @@ -592,20 +592,20 @@ def command(self): # Ensure the log file is writeable if self.options.log_file: try: - writeable_log_file = open(self.options.log_file, 'a') + with open(self.options.log_file, 'a'): + pass except OSError as ioe: msg = 'Error: Unable to write to log file: %s' % ioe raise BadCommand(msg) - writeable_log_file.close() # Ensure the pid file is writeable if self.options.pid_file: try: - writeable_pid_file = open(self.options.pid_file, 'a') + with open(self.options.pid_file, 'a'): + pass except OSError as ioe: msg = 'Error: Unable to write to pid file: %s' % ioe raise BadCommand(msg) - writeable_pid_file.close() if getattr(self.options, 'daemon', False): try: @@ -721,9 +721,8 @@ def record_pid(self, pid_file): pid = os.getpid() if self.verbose > 1: print(f'Writing PID {pid} to {pid_file}') - f = open(pid_file, 'w') - f.write(str(pid)) - f.close() + with open(pid_file, 'w') as f: + f.write(str(pid)) atexit.register(_remove_pid_file, pid, pid_file, self.verbose) def stop_daemon(self): @@ -913,9 +912,8 @@ def live_pidfile(pidfile): def read_pidfile(filename): if os.path.exists(filename): try: - f = open(filename) - content = f.read() - f.close() + with open(filename) as f: + content = f.read() return int(content.strip()) except (ValueError, OSError): return None @@ -931,9 +929,8 @@ def _remove_pid_file(written_pid, filename, verbosity): return if not os.path.exists(filename): return - f = open(filename) - content = f.read().strip() - f.close() + with open(filename) as f: + content = f.read().strip() try: pid_in_file = int(content) except ValueError: diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py index 4d6b0756..b8b48555 100644 --- a/pulsar/web/routes.py +++ b/pulsar/web/routes.py @@ -233,10 +233,11 @@ def __init__(self, id): def _handle_upload(file_cache, path, body, cache_token=None): - source = body if cache_token: cached_file = file_cache.destination(cache_token) - source = open(cached_file, 'rb') - log.info("Copying cached file {} to {}".format(cached_file, path)) - copy_to_path(source, path) + with open(cached_file, 'rb') as source: + log.info("Copying cached file {} to {}".format(cached_file, path)) + copy_to_path(source, path) + else: + copy_to_path(body, path) return {"path": path}