Skip to content

Commit

Permalink
Close (almost) all file handles
Browse files Browse the repository at this point in the history
What remains are the tests and file serving in the WSGI app.
The WSGI app usage is probably fine, and it is also not a problem for us
currently.
  • Loading branch information
mvdbeek committed Jun 18, 2024
1 parent 12f1349 commit c7635b6
Show file tree
Hide file tree
Showing 20 changed files with 94 additions and 85 deletions.
9 changes: 6 additions & 3 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,8 @@ def from_dict(cls, action_dict):
return RemoteCopyAction(source=action_dict["source"])

def write_to_path(self, path):
copy_to_path(open(self.path, "rb"), path)
with open(self.path, "rb") as f:
copy_to_path(f, path)

def write_from_path(self, pulsar_path):
destination = self.path
Expand Down Expand Up @@ -534,7 +535,8 @@ def write_to_path(self, path):
object_store_id=object_store_ref["object_store_id"],
)
filename = self.object_store.get_filename(dataset_object)
copy_to_path(open(filename, 'rb'), path)
with open(filename, "rb") as f:
copy_to_path(f, path)

def write_from_path(self, pulsar_path):
raise NotImplementedError("Writing raw files to object store not supported at this time.")
Expand Down Expand Up @@ -660,7 +662,8 @@ def from_dict(cls, action_dict):
return MessageAction(contents=action_dict["contents"])

def write_to_path(self, path):
open(path, "w").write(self.contents)
with open(path, "w") as f:
f.write(self.contents)


DICTIFIABLE_ACTION_CLASSES = [
Expand Down
5 changes: 1 addition & 4 deletions pulsar/client/staging/up.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,8 @@ def _read(path):
Utility method to quickly read small files (config files and tool
wrappers) into memory as bytes.
"""
input = open(path, encoding="utf-8")
try:
with open(path, encoding="utf-8") as input:
return input.read()
finally:
input.close()


__all__ = ['submit_job']
6 changes: 5 additions & 1 deletion pulsar/client/transport/curl.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ def __init__(self, timeout=None, **kwrgs):

def execute(self, url, method=None, data=None, input_path=None, output_path=None):
buf = _open_output(output_path)
input_fh = None
try:
c = _new_curl_object_for_url(url)
c.setopt(c.WRITEFUNCTION, buf.write)
if method:
c.setopt(c.CUSTOMREQUEST, method)
if input_path:
input_fh = open(input_path, "rb")
c.setopt(c.UPLOAD, 1)
c.setopt(c.READFUNCTION, open(input_path, 'rb').read)
c.setopt(c.READFUNCTION, input_fh.read)
filesize = os.path.getsize(input_path)
c.setopt(c.INFILESIZE, filesize)
if data:
Expand All @@ -61,6 +63,8 @@ def execute(self, url, method=None, data=None, input_path=None, output_path=None
return buf.getvalue()
finally:
buf.close()
if input_fh:
input_fh.close()


def post_file(url, path):
Expand Down
7 changes: 4 additions & 3 deletions pulsar/client/transport/poster.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
def post_file(url, path):
__ensure_poster()
try:
datagen, headers = poster.encode.multipart_encode({"file": open(path, "rb")})
request = Request(url, datagen, headers)
return urlopen(request).read()
with open(path, "rb") as fh:
datagen, headers = poster.encode.multipart_encode({"file": fh})
request = Request(url, datagen, headers)
return urlopen(request).read()
except Exception:
log.exception("Problem with poster post of [%s]" % path)
raise
Expand Down
9 changes: 5 additions & 4 deletions pulsar/client/transport/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ def post_file(url, path):
raise ImportError(REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE)

__ensure_requests()
m = requests_toolbelt.MultipartEncoder(
fields={'file': ('filename', open(path, 'rb'))}
)
requests.post(url, data=m, headers={'Content-Type': m.content_type})
with open(path, "rb") as f:
m = requests_toolbelt.MultipartEncoder(
fields={'file': ('filename', f)}
)
requests.post(url, data=m, headers={'Content-Type': m.content_type})


def get_file(url, path):
Expand Down
25 changes: 12 additions & 13 deletions pulsar/client/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,16 @@ def copy_to_path(object, path):
"""
Copy file-like object to path.
"""
output = open(path, 'wb')
_copy_and_close(object, output)
with open(path, 'wb') as output:
_copy(object, output)


def _copy_and_close(object, output):
try:
while True:
buffer = object.read(BUFFER_SIZE)
if not buffer:
break
output.write(buffer)
finally:
output.close()
def _copy(object, output):
while True:
buffer = object.read(BUFFER_SIZE)
if not buffer:
break
output.write(buffer)


# Variant of base64 compat layer inspired by BSD code from Bcfg2
Expand Down Expand Up @@ -283,10 +280,12 @@ def __contains__(self, item):
return exists(self.__path(item))

def __setitem__(self, key, value):
open(self.__path(key), 'w').write(json.dumps(value))
with open(self.__path(key), "w") as f:
f.write(json.dumps(value))

def __getitem__(self, key):
return json.loads(open(self.__path(key)).read())
with open(self.__path(key)) as f:
return json.loads(f.read())

def __delitem__(self, key):
try:
Expand Down
3 changes: 2 additions & 1 deletion pulsar/manager_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def build_managers(app, conf):

def _populate_manager_descriptions_from_ini(manager_descriptions, job_managers_config):
config = configparser.ConfigParser()
config.readfp(open(job_managers_config))
with open(job_managers_config) as config_fh:
config.read_file(config_fh)
for section in config.sections():
if not section.startswith(MANAGER_PREFIX):
continue
Expand Down
20 changes: 7 additions & 13 deletions pulsar/managers/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ def _check_execution(self, job_id, tool_id, command_line):
for file in self._list_dir(tool_files_dir):
if os.path.isdir(join(tool_files_dir, file)):
continue
contents = open(join(tool_files_dir, file)).read()
with open(join(tool_files_dir, file)) as fh:
contents = fh.read()
log.debug("job_id: {} - checking tool file {}".format(job_id, file))
authorization.authorize_tool_file(basename(file), contents)
config_files_dir = job_directory.configs_directory()
Expand Down Expand Up @@ -255,28 +256,21 @@ def calculate_path(self, remote_path, input_type):

def read_file(self, name, size=-1, default=None):
path = self._job_file(name)
job_file = None
try:
job_file = open(path, 'rb')
return job_file.read(size)
with open(path, 'rb') as job_file:
return job_file.read(size)
except Exception:
if default is not None:
return default
else:
raise
finally:
if job_file:
job_file.close()

def write_file(self, name, contents):
path = self._job_file(name)
job_file = open(path, 'wb')
try:
if isinstance(contents, str):
contents = contents.encode("UTF-8")
if isinstance(contents, str):
contents = contents.encode("UTF-8")
with open(path, "wb") as job_file:
job_file.write(contents)
finally:
job_file.close()
return path

def remove_file(self, name):
Expand Down
4 changes: 3 additions & 1 deletion pulsar/managers/queued_condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio
setup_params=setup_params
)
log_path = self.__condor_user_log(job_id)
open(log_path, 'w') # Touch log file
with open(log_path, "w"):
# Touch log file
pass

submit_params.update(self.submission_params)
build_submit_params = dict(
Expand Down
3 changes: 2 additions & 1 deletion pulsar/managers/queued_external_drmaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def launch(self, job_id, command_line, submit_params={}, dependencies_descriptio
submit_params=submit_params,
setup_params=setup_params,
)
print(open(attributes['remoteCommand']).read())
with open(attributes["remoteCommand"]) as fh:
print(fh.read())
job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes))
user = submit_params.get('user', None)
log.info("Submit as user %s" % user)
Expand Down
6 changes: 4 additions & 2 deletions pulsar/managers/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def touch_outputs(self, job_id, touch_outputs):
job_directory = self._proxied_manager.job_directory(job_id)
for name in touch_outputs:
path = job_directory.calculate_path(name, 'output')
job_directory.open_file(path, mode='a')
with contextlib.closing(job_directory.open_file(path, mode='a')):
pass

def preprocess_and_launch(self, job_id, launch_config):
self._persist_launch_config(job_id, launch_config)
Expand Down Expand Up @@ -320,7 +321,8 @@ def activate_job(self, job_id, active_status=ACTIVE_STATUS_LAUNCHED):
if self._active_job_directory(active_status):
path = self._active_job_file(job_id, active_status=active_status)
try:
open(path, "w").close()
with open(path, "w"):
pass
except Exception:
log.warn(ACTIVATE_FAILED_MESSAGE % job_id)

Expand Down
12 changes: 8 additions & 4 deletions pulsar/scripts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ def _handle_server_ini(args, directory):
server_config = SERVER_CONFIG_TEMPLATE.safe_substitute(
**config_dict
)
open(ini_file, "w").write(server_config)
with open(ini_file, "w") as fh:
fh.write(server_config)


def _handle_app_yaml(args, directory):
Expand All @@ -312,7 +313,8 @@ def _handle_app_yaml(args, directory):
contents += 'conda_auto_install: {}\n'.format(auto_conda)
if not IS_WINDOWS and args.libdrmaa_path:
contents += 'manager:\n type: queued_drmaa\n'
open(yaml_file, "w").write(contents)
with open(yaml_file, "w") as fh:
fh.write(contents)


def _handle_local_env(args, directory, dependencies):
Expand All @@ -327,7 +329,8 @@ def _handle_local_env(args, directory, dependencies):
local_env_contents = LOCAL_ENV_TEMPLATE.safe_substitute(
libdrmaa_line=libdrmaa_line,
)
open(local_env_file, "w").write(local_env_contents)
with open(local_env_file, "w") as fh:
fh.write(local_env_contents)


def _handle_supervisor(args, mode, directory, dependencies):
Expand All @@ -339,7 +342,8 @@ def _handle_supervisor(args, mode, directory, dependencies):
mode=mode,
)
conf_path = os.path.join(directory, "supervisor.conf")
open(conf_path, "w").write(config)
with open(conf_path, "w") as fh:
fh.write(config)
dependencies.append("supervisor")


Expand Down
3 changes: 2 additions & 1 deletion pulsar/scripts/drmaa_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ def main(argv=None):
arg_parser = ArgumentParser(description=DESCRIPTION)
arg_parser.add_argument("--job_attributes", required=True)
args = arg_parser.parse_args(argv)
job_attributes = load(open(args.job_attributes))
with open(args.job_attributes) as fh:
job_attributes = load(fh)
session = DrmaaSessionFactory().get()
external_id = session.run_job(**job_attributes)
print(external_id)
Expand Down
3 changes: 2 additions & 1 deletion pulsar/scripts/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def _run_client_for_job(args):
result_status = waiter.wait()
pulsar_outputs = PulsarOutputs.from_status_response(result_status)
if args.result_json:
open(args.result_json, "w").write(json_dumps(result_status))
with open(args.result_json, "w") as fh:
fh.write(json_dumps(result_status))
finish_args = dict(
client=client,
job_completed_normally=True,
Expand Down
3 changes: 2 additions & 1 deletion pulsar/scripts/submit_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def _load_job_config(args):
base64_job_config = args.base64
job_config = from_base64_json(base64_job_config)
else:
job_config = json.load(open(args.file))
with open(args.file) as fh:
job_config = json.load(fh)
return job_config


Expand Down
3 changes: 2 additions & 1 deletion pulsar/tools/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def authorize_tool_file(self, name, contents):
tool = self.tool
tool_dir = tool.get_tool_dir()
tool_dir_file = join(tool_dir, name)
allowed_contents = open(tool_dir_file).read()
with open(tool_dir_file) as fh:
allowed_contents = fh.read()
if contents != allowed_contents:
self.__unauthorized("Attempt to write tool file with contents differing from Pulsar copy of tool file.")

Expand Down
3 changes: 2 additions & 1 deletion pulsar/tools/toolbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ def validate_config(self, job_directory, name, path):
config_validator = self.config_validators.get(name, None)
valid = True
if config_validator:
contents = open(path, encoding="UTF-8").read()
with open(path, encoding="UTF-8") as fh:
contents = fh.read()
valid = config_validator.validate(job_directory, contents)
return valid

Expand Down
23 changes: 10 additions & 13 deletions pulsar/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,25 @@ def copy_to_path(object, path):
"""
Copy file-like object to path.
"""
output = open(path, 'wb')
_copy_and_close(object, output)
with open(path, 'wb') as output:
_copy(object, output)


def _copy_and_close(object, output):
try:
while True:
buffer = object.read(BUFFER_SIZE)
if not buffer:
break
output.write(buffer)
finally:
output.close()
def _copy(object, output):
while True:
buffer = object.read(BUFFER_SIZE)
if not buffer:
break
output.write(buffer)


def copy_to_temp(object):
"""
Copy file-like object to temp file and return
path.
"""
temp_file = NamedTemporaryFile(delete=False)
_copy_and_close(object, temp_file)
with NamedTemporaryFile(delete=False) as temp_file:
_copy(object, temp_file)
return temp_file.name


Expand Down
Loading

0 comments on commit c7635b6

Please sign in to comment.