User-based ObjectStore #4314

Closed
wants to merge 36 commits
Changes from 1 commit

Commits (36)
0b169c7
Replaced most of the direct calls to S3 with their equivalence from C…
VJalili Feb 13, 2017
ea71efb
Replaced most of the direct calls to S3 with their equivalence from C…
VJalili Feb 13, 2017
143f84f
A reformatting.
VJalili Feb 15, 2017
7ec4306
Initial commit of OpenID Connect Provider.
VJalili Apr 27, 2017
714a071
Initial commit of OpenID Connect Provider.
VJalili Apr 28, 2017
747ad3f
Added a TODO comment.
VJalili May 2, 2017
f0bda14
Updates to both oidc and s3.
VJalili May 5, 2017
b801a84
Updates to both oidc and s3.
VJalili May 22, 2017
e2e4566
Renamed S3ObjectStore to CloudObjectStore
VJalili May 22, 2017
da87d16
Renamed S3 to Cloud.
VJalili May 22, 2017
b1e80fa
Drafted a user-based ObjectStore structure.
VJalili May 23, 2017
8b16182
1. Updated a comment.
VJalili May 24, 2017
b385f3d
1. Added user to create function call in job.
VJalili May 25, 2017
d764ec7
- Propagated 'User' from HDA through Dataset to ObjectStore, for some…
VJalili May 26, 2017
5980ee3
- Added user object to all the classes which need to access a user-ba…
VJalili Jun 1, 2017
198b4ca
change the design 'who passes user to objectstore':
VJalili Jun 2, 2017
1d4bc21
change the design 'who passes user to objectstore':
VJalili Jun 2, 2017
77b5d7a
Introduced PluggedMedia;
VJalili Jun 15, 2017
520cea5
- Removed OIDC idp code from this branch.
VJalili Jun 15, 2017
9699790
- Removed the remaining OIDC idp code from this branch.
VJalili Jun 15, 2017
6e8e671
Propogated user and pluggedMedia info when creating a new dataset as …
VJalili Jun 19, 2017
004ea3f
1. Propogated User and PluggedMedia through a dataset delete/purge pr…
VJalili Jun 20, 2017
634bc55
Fixed some bugs occurring at a dataset deletion and download.
VJalili Jun 21, 2017
66ec187
1. make user and pluggedMedia as optional parameters so that other in…
VJalili Jul 13, 2017
2774824
some clean-up and separating OIDC-idp from this branch.
VJalili Jul 13, 2017
bf08b33
some clean-up and separating OIDC-idp from this branch.
VJalili Jul 13, 2017
8f31f76
some clean-up and separating OIDC-idp from this branch.
VJalili Jul 13, 2017
140a194
Merge branch 'master' into UserBasedObjectStore
VJalili Jul 13, 2017
9176836
Merge remote-tracking branch 'remotes/main/dev' into UserBasedObjectS…
VJalili Jul 13, 2017
32f806e
Fixed db migration number.
VJalili Jul 13, 2017
5f1cf27
Added missing commits for galaxy/jobs
VJalili Jul 13, 2017
337b3a0
Fixed a bug with galaxy/jobs; and renamed a confusing dataset argumen…
VJalili Jul 13, 2017
398bc78
Reverted the change on info.txt
VJalili Jul 14, 2017
a9f2165
Refactoring `pluggedMedia` to `plugged_media`
VJalili Jul 14, 2017
c4f3c96
Fixed a bug happended after refactoring.
VJalili Jul 14, 2017
a10b564
Reverted the formating fixes in mappings.py
VJalili Jul 17, 2017
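
Taken together, the commits above thread the owning user and a selected plugged media through the ObjectStore calls. The sketch below condenses that calling convention as it appears in the lib/galaxy/jobs/__init__.py hunks further down; the keyword arguments `user` and `pluggedMedia` and the temporary first-match media selection come from the diff, while the standalone function and its name are illustrative only, not code from the branch:

    # Illustrative sketch only: the user/pluggedMedia-aware ObjectStore calling
    # pattern this branch introduces, condensed from the job working-directory
    # code shown in the diff below.
    def create_job_working_directory(app, job):
        # Temporary first-match selection of the user's plugged media
        # (mirrors the "TEMP BLOCK" in the diff).
        plugged_media = None
        for pm in job.user.pluggedMedia:
            plugged_media = pm
            break
        app.object_store.create(
            job, user=job.user, pluggedMedia=plugged_media,
            base_dir='job_work', dir_only=True, obj_dir=True)
        return app.object_store.get_filename(
            job, user=job.user, pluggedMedia=plugged_media,
            base_dir='job_work', dir_only=True, obj_dir=True)
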
Merge remote-tracking branch 'remotes/main/dev' into UserBasedObjectStore

# Conflicts:
#	doc/source/lib/galaxy.objectstore.rst
#	doc/source/slideshow/architecture/images/objectstore.plantuml.svg
#	lib/galaxy/jobs/__init__.py
VJalili committed Jul 13, 2017
commit 9176836cdb3d3ef9cad429c7fa281c426a9d34e9
42 changes: 5 additions & 37 deletions doc/source/lib/galaxy.objectstore.rst
@@ -9,40 +9,8 @@ galaxy\.objectstore package
Submodules
----------

galaxy.objectstore.pulsar module
--------------------------------

.. automodule:: galaxy.objectstore.pulsar
:members:
:undoc-members:
:show-inheritance:

galaxy.objectstore.rods module
------------------------------

.. automodule:: galaxy.objectstore.rods
:members:
:undoc-members:
:show-inheritance:

galaxy.objectstore.cloud module
----------------------------

.. automodule:: galaxy.objectstore.cloud
:members:
:undoc-members:
:show-inheritance:

galaxy.objectstore.s3_multipart_upload module
---------------------------------------------

.. automodule:: galaxy.objectstore.s3_multipart_upload
:members:
:undoc-members:
:show-inheritance:

galaxy.objectstore.azure_blob module
---------------------------------------------
galaxy\.objectstore\.azure\_blob module
---------------------------------------

.. automodule:: galaxy.objectstore.azure_blob
:members:
@@ -65,10 +33,10 @@ galaxy\.objectstore\.rods module
:undoc-members:
:show-inheritance:

galaxy\.objectstore\.s3 module
------------------------------
galaxy.objectstore.cloud module
----------------------------

.. automodule:: galaxy.objectstore.s3
.. automodule:: galaxy.objectstore.cloud
:members:
:undoc-members:
:show-inheritance:
171 changes: 1 addition & 170 deletions doc/source/slideshow/architecture/images/objectstore.plantuml.svg
(Binary/SVG image diff not rendered in this view.)
83 changes: 40 additions & 43 deletions lib/galaxy/jobs/__init__.py
@@ -902,16 +902,10 @@ def get_special( ):
def _create_working_directory( self ):
job = self.get_job()
try:
# TEMP BLOCK --- START
pluggedMedia = None
for pM in job.user.pluggedMedia:
pluggedMedia = pM
break
# TEMP BLOCK --- END
self.app.object_store.create(
job, user=job.user, pluggedMedia=pluggedMedia, base_dir='job_work', dir_only=True, obj_dir=True)
job, base_dir='job_work', dir_only=True, obj_dir=True )
self.working_directory = self.app.object_store.get_filename(
job, user=job.user, pluggedMedia=pluggedMedia, base_dir='job_work', dir_only=True, obj_dir=True)
job, base_dir='job_work', dir_only=True, obj_dir=True )

# The tool execution is given a working directory beneath the
# "job" working directory.
@@ -1005,13 +999,11 @@ def fail( self, message, exception=False, stdout="", stderr="", exit_code=None )
dataset.blurb = 'tool error'
dataset.info = message
dataset.set_size()
dataset.dataset.set_total_size( user=job.user, pluggedMedia=dataset.pluggedMedia )
dataset.dataset.set_total_size()
dataset.mark_unhidden()
if dataset.ext == 'auto':
dataset.extension = 'data'
# Update (non-library) job output datasets through the object store
if dataset not in job.output_library_datasets:
self.app.object_store.update_from_file( dataset.dataset, user=job.user, pluggedMedia=dataset.pluggedMedia, create=True )
self.__update_output(job, dataset)
# Pause any dependent jobs (and those jobs' outputs)
for dep_job_assoc in dataset.dependent_jobs:
self.pause( dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state." )
@@ -1222,19 +1214,20 @@ def finish(
# should this also be checking library associations? - can a library item be added from a history before the job has ended? -
# lets not allow this to occur
# need to update all associated output hdas, i.e. history was shared with job running
#TODO: the following object called `dataset`, should be renamed to `hda`.
for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations:
trynum = 0
while trynum < self.app.config.retry_job_output_collection:
try:
# Attempt to short circuit NFS attribute caching
os.stat( dataset.dataset.get_file_name( job.user ) )
os.chown( dataset.dataset.get_file_name( job.user ), os.getuid(), -1 )
trynum = self.app.config.retry_job_output_collection
except ( OSError, ObjectNotFound ) as e:
trynum += 1
log.warning( 'Error accessing %s, will retry: %s', dataset.dataset.get_file_name( job.user ), e )
time.sleep( 2 )
purged = dataset.dataset.purged
if not purged and dataset.dataset.external_filename is None:
trynum = 0
while trynum < self.app.config.retry_job_output_collection:
try:
# Attempt to short circuit NFS attribute caching
os.stat( dataset.dataset.file_name )
os.chown( dataset.dataset.file_name, os.getuid(), -1 )
trynum = self.app.config.retry_job_output_collection
except ( OSError, ObjectNotFound ) as e:
trynum += 1
log.warning( 'Error accessing %s, will retry: %s', dataset.dataset.file_name, e )
time.sleep( 2 )
if getattr( dataset, "hidden_beneath_collection_instance", None ):
dataset.visible = False
dataset.blurb = 'done'
@@ -1252,8 +1245,20 @@ def finish(
dataset.dataset.uuid = context['uuid']
# Update (non-library) job output datasets through the object store
if dataset not in job.output_library_datasets:
self.app.object_store.update_from_file(dataset.dataset, user=job.user, pluggedMedia=dataset.pluggedMedia, create=True)
self._collect_extra_files(dataset.dataset, self.working_directory)
self.app.object_store.update_from_file(dataset.dataset, create=True)
self.__update_output(job, dataset)
if not purged:
self._collect_extra_files(dataset.dataset, self.working_directory)
# Handle composite datatypes of auto_primary_file type
if dataset.datatype.composite_type == 'auto_primary_file' and not dataset.has_data():
try:
with NamedTemporaryFile() as temp_fh:
temp_fh.write( dataset.datatype.generate_primary_file( dataset ) )
temp_fh.flush()
self.app.object_store.update_from_file( dataset.dataset, file_name=temp_fh.name, create=True )
dataset.set_size()
except Exception as e:
log.warning( 'Unable to generate primary composite file automatically for %s: %s', dataset.dataset.id, e )
if job.states.ERROR == final_job_state:
dataset.blurb = "error"
dataset.mark_unhidden()
@@ -1270,7 +1275,7 @@ def finish(
if retry_internally and not self.external_output_metadata.external_metadata_set_successfully(dataset, self.sa_session ):
# If Galaxy was expected to sniff type and didn't - do so.
if dataset.ext == "_sniff_":
extension = sniff.handle_uploaded_dataset_file( dataset.dataset.get_file_name( job.user ), self.app.datatypes_registry )
extension = sniff.handle_uploaded_dataset_file( dataset.dataset.file_name, self.app.datatypes_registry )
dataset.extension = extension

# call datatype.set_meta directly for the initial set_meta call during dataset creation
@@ -1306,7 +1311,7 @@ def path_rewriter( path ):
else:
dataset.set_peek( line_count=context['line_count'] )
except:
if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte( user=job.user, pluggedMedia=dataset.pluggedMedia ) ) or self.tool.is_multi_byte:
if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
dataset.set_peek( is_multi_byte=True )
else:
dataset.set_peek()
@@ -1401,8 +1406,9 @@ def path_rewriter( path ):
collected_bytes = 0
# Once datasets are collected, set the total dataset size (includes extra files)
for dataset_assoc in job.output_datasets:
dataset_assoc.dataset.dataset.set_total_size( user=job.user, pluggedMedia=dataset.pluggedMedia )
collected_bytes += dataset_assoc.dataset.dataset.get_total_size( user=job.user, pluggedMedia=dataset.pluggedMedia )
if not dataset_assoc.dataset.dataset.purged:
dataset_assoc.dataset.dataset.set_total_size()
collected_bytes += dataset_assoc.dataset.dataset.get_total_size()

if job.user:
job.user.adjust_total_disk_usage(collected_bytes)
@@ -1441,7 +1447,7 @@ def check_tool_output( self, stdout, stderr, tool_exit_code, job ):

def cleanup( self, delete_files=True ):
# At least one of these tool cleanup actions (job import), is needed
# for the tool to work properly, that is why one might want to run
# for thetool to work properly, that is why one might want to run
# cleanup but not delete files.
try:
if delete_files:
@@ -1451,15 +1457,7 @@ def cleanup( self, delete_files=True ):
galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
galaxy.tools.imp_exp.JobImportHistoryArchiveWrapper( self.app, self.job_id ).cleanup_after_job()
if delete_files:
job = self.get_job()
# TEMP BLOCK --- START
pluggedMedia = None
for pM in job.user.pluggedMedia:
pluggedMedia = pM
break
# TEMP BLOCK --- END
self.app.object_store.delete(job, user=job.user, pluggedMedia=pluggedMedia,
base_dir='job_work', entire_dir=True, dir_only=True, obj_dir=True)
self.app.object_store.delete(self.get_job(), base_dir='job_work', entire_dir=True, dir_only=True, obj_dir=True)
except:
log.exception( "Unable to cleanup job %d", self.job_id )

@@ -1569,7 +1567,6 @@ def get_input_paths( self, job=None ):

def get_output_fnames( self ):
if self.output_paths is None:
job = self.get_job()
self.compute_outputs()
return self.output_paths

@@ -1595,14 +1592,14 @@ def compute_outputs( self ):
for da in job.output_datasets + job.output_library_datasets:
da_false_path = dataset_path_rewriter.rewrite_dataset_path( da.dataset, 'output' )
mutable = da.dataset.dataset.external_filename is None
dataset_path = DatasetPath( da.dataset.dataset.id, da.dataset.get_file_name(), false_path=da_false_path, mutable=mutable )
dataset_path = DatasetPath( da.dataset.dataset.id, da.dataset.file_name, false_path=da_false_path, mutable=mutable )
results.append( ( da.name, da.dataset, dataset_path ) )

self.output_paths = [t[2] for t in results]
self.output_hdas_and_paths = dict([(t[0], t[1:]) for t in results])
if special:
false_path = dataset_path_rewriter.rewrite_dataset_path( special.dataset, 'output' )
dsp = DatasetPath( special.dataset.id, special.dataset.get_file_name( job.user ), false_path )
dsp = DatasetPath( special.dataset.id, special.dataset.file_name, false_path )
self.output_paths.append( dsp )
return self.output_paths
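
A pattern worth noting in the finish() hunks above is the output-collection retry loop: before reading a job's output, Galaxy repeatedly stats and chowns the file, up to app.config.retry_job_output_collection times, to defeat NFS attribute caching. A condensed sketch of that loop follows; the retry counter and two-second sleep come from the diff, while the standalone helper and its name are illustrative, and the Galaxy-specific ObjectNotFound exception is omitted for simplicity:

    import logging
    import os
    import time

    log = logging.getLogger(__name__)

    def wait_for_output_file(path, retries):
        # Illustrative helper, not code from the branch: retry stat/chown to
        # force the NFS client to refresh cached attributes before giving up.
        trynum = 0
        while trynum < retries:
            try:
                os.stat(path)
                os.chown(path, os.getuid(), -1)
                return True
            except OSError as e:
                trynum += 1
                log.warning('Error accessing %s, will retry: %s', path, e)
                time.sleep(2)
        return False
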

You are viewing a condensed version of this merge commit.