User-based ObjectStore #4314

Closed
wants to merge 36 commits into from

Changes from all commits (36 commits)
0b169c7
Replaced most of the direct calls to S3 with their equivalence from C…
VJalili Feb 13, 2017
ea71efb
Replaced most of the direct calls to S3 with their equivalence from C…
VJalili Feb 13, 2017
143f84f
A reformatting.
VJalili Feb 15, 2017
7ec4306
Initial commit of OpenID Connect Provider.
VJalili Apr 27, 2017
714a071
Initial commit of OpenID Connect Provider.
VJalili Apr 28, 2017
747ad3f
Added a TODO comment.
VJalili May 2, 2017
f0bda14
Updates to both oidc and s3.
VJalili May 5, 2017
b801a84
Updates to both oidc and s3.
VJalili May 22, 2017
e2e4566
Renamed S3ObjectStore to CloudObjectStore
VJalili May 22, 2017
da87d16
Renamed S3 to Cloud.
VJalili May 22, 2017
b1e80fa
Drafted a user-based ObjectStore structure.
VJalili May 23, 2017
8b16182
1. Updated a comment.
VJalili May 24, 2017
b385f3d
1. Added user to create function call in job.
VJalili May 25, 2017
d764ec7
- Propagated 'User' from HDA through Dataset to ObjectStore, for some…
VJalili May 26, 2017
5980ee3
- Added user object to all the classes which need to access a user-ba…
VJalili Jun 1, 2017
198b4ca
change the design 'who passes user to objectstore':
VJalili Jun 2, 2017
1d4bc21
change the design 'who passes user to objectstore':
VJalili Jun 2, 2017
77b5d7a
Introduced PluggedMedia;
VJalili Jun 15, 2017
520cea5
- Removed OIDC idp code from this branch.
VJalili Jun 15, 2017
9699790
- Removed the remaining OIDC idp code from this branch.
VJalili Jun 15, 2017
6e8e671
Propagated user and pluggedMedia info when creating a new dataset as …
VJalili Jun 19, 2017
004ea3f
1. Propagated User and PluggedMedia through a dataset delete/purge pr…
VJalili Jun 20, 2017
634bc55
Fixed some bugs occurring at a dataset deletion and download.
VJalili Jun 21, 2017
66ec187
1. make user and pluggedMedia as optional parameters so that other in…
VJalili Jul 13, 2017
2774824
some clean-up and separating OIDC-idp from this branch.
VJalili Jul 13, 2017
bf08b33
some clean-up and separating OIDC-idp from this branch.
VJalili Jul 13, 2017
8f31f76
some clean-up and separating OIDC-idp from this branch.
VJalili Jul 13, 2017
140a194
Merge branch 'master' into UserBasedObjectStore
VJalili Jul 13, 2017
9176836
Merge remote-tracking branch 'remotes/main/dev' into UserBasedObjectS…
VJalili Jul 13, 2017
32f806e
Fixed db migration number.
VJalili Jul 13, 2017
5f1cf27
Added missing commits for galaxy/jobs
VJalili Jul 13, 2017
337b3a0
Fixed a bug with galaxy/jobs; and renamed a confusing dataset argumen…
VJalili Jul 13, 2017
398bc78
Reverted the change on info.txt
VJalili Jul 14, 2017
a9f2165
Refactoring `pluggedMedia` to `plugged_media`
VJalili Jul 14, 2017
c4f3c96
Fixed a bug that happened after refactoring.
VJalili Jul 14, 2017
a10b564
Reverted the formatting fixes in mappings.py
VJalili Jul 17, 2017
6 changes: 3 additions & 3 deletions doc/source/lib/galaxy.objectstore.rst
@@ -33,10 +33,10 @@ galaxy\.objectstore\.rods module
:undoc-members:
:show-inheritance:

galaxy\.objectstore\.s3 module
------------------------------
galaxy.objectstore.cloud module
----------------------------

.. automodule:: galaxy.objectstore.s3
.. automodule:: galaxy.objectstore.cloud
:members:
:undoc-members:
:show-inheritance:
@@ -19,7 +19,7 @@ abstract class NestedObjectStore

ObjectStore <|-- DiskObjectStore
ObjectStore <|-- NestedObjectStore
ObjectStore <|-- S3ObjectStore
ObjectStore <|-- CloudObjectStore
DiskObjectStore <|-- IRODSObjectStore
NestedObjectStore <|-- DistributedObjectStore
NestedObjectStore <|-- HierarchicalObjectStore
36 changes: 25 additions & 11 deletions lib/galaxy/jobs/__init__.py
@@ -902,10 +902,16 @@ def _create_working_directory( self ):
def _create_working_directory( self ):
job = self.get_job()
try:
# TEMP BLOCK --- START
plugged_media = None
for pM in job.user.plugged_media:
plugged_media = pM
break
# TEMP BLOCK --- END
self.app.object_store.create(
job, base_dir='job_work', dir_only=True, obj_dir=True )
job, user=job.user, plugged_media=plugged_media, base_dir='job_work', dir_only=True, obj_dir=True)
self.working_directory = self.app.object_store.get_filename(
job, base_dir='job_work', dir_only=True, obj_dir=True )
job, user=job.user, plugged_media=plugged_media, base_dir='job_work', dir_only=True, obj_dir=True)

# The tool execution is given a working directory beneath the
# "job" working directory.
@@ -999,7 +1005,7 @@ def fail( self, message, exception=False, stdout="", stderr="", exit_code=None )
dataset.blurb = 'tool error'
dataset.info = message
dataset.set_size()
dataset.dataset.set_total_size()
dataset.dataset.set_total_size( user=job.user, plugged_media=dataset.plugged_media )
dataset.mark_unhidden()
if dataset.ext == 'auto':
dataset.extension = 'data'
@@ -1245,7 +1251,7 @@ def finish(
dataset.dataset.uuid = context['uuid']
# Update (non-library) job output datasets through the object store
if dataset not in job.output_library_datasets:
self.app.object_store.update_from_file(dataset.dataset, create=True)
self.app.object_store.update_from_file(dataset.dataset, user=job.user, plugged_media=dataset.plugged_media, create=True)
self.__update_output(job, dataset)
if not purged:
self._collect_extra_files(dataset.dataset, self.working_directory)
@@ -1311,7 +1317,7 @@ def path_rewriter( path ):
else:
dataset.set_peek( line_count=context['line_count'] )
except:
if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte( user=job.user, plugged_media=dataset.plugged_media ) ) or self.tool.is_multi_byte:
dataset.set_peek( is_multi_byte=True )
else:
dataset.set_peek()
@@ -1407,8 +1413,8 @@ def path_rewriter( path ):
# Once datasets are collected, set the total dataset size (includes extra files)
for dataset_assoc in job.output_datasets:
if not dataset_assoc.dataset.dataset.purged:
dataset_assoc.dataset.dataset.set_total_size()
collected_bytes += dataset_assoc.dataset.dataset.get_total_size()
dataset_assoc.dataset.dataset.set_total_size( user=job.user, plugged_media=dataset.plugged_media )
collected_bytes += dataset_assoc.dataset.dataset.get_total_size( user=job.user, plugged_media=dataset.plugged_media )

if job.user:
job.user.adjust_total_disk_usage(collected_bytes)
@@ -1457,7 +1463,7 @@ def cleanup( self, delete_files=True ):
galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
galaxy.tools.imp_exp.JobImportHistoryArchiveWrapper( self.app, self.job_id ).cleanup_after_job()
if delete_files:
self.app.object_store.delete(self.get_job(), base_dir='job_work', entire_dir=True, dir_only=True, obj_dir=True)
job = self.get_job()
# TEMP BLOCK --- START
plugged_media = None
for pM in job.user.plugged_media:
plugged_media = pM
break
# TEMP BLOCK --- END
self.app.object_store.delete(job, user=job.user, plugged_media=plugged_media,
base_dir='job_work', entire_dir=True, dir_only=True, obj_dir=True)
except:
log.exception( "Unable to cleanup job %d", self.job_id )

@@ -1723,18 +1737,18 @@ def user( self ):
else:
return 'anonymous@unknown'

def __update_output(self, job, dataset, clean_only=False):
def __update_output(self, job, hda, clean_only=False):
"""Handle writing outputs to the object store.

This should be called regardless of whether the job was failed or not so
that writing of partial results happens and so that the object store is
cleaned up if the dataset has been purged.
"""
dataset = dataset.dataset
dataset = hda.dataset
if dataset not in job.output_library_datasets:
purged = dataset.purged
if not purged and not clean_only:
self.app.object_store.update_from_file(dataset, create=True)
self.app.object_store.update_from_file( dataset, user=job.user, plugged_media=hda.plugged_media, create=True)
else:
# If the dataset is purged and Galaxy is configured to write directly
# to the object store from jobs - be sure that file is cleaned up. This
111 changes: 73 additions & 38 deletions lib/galaxy/model/__init__.py
@@ -369,6 +369,15 @@ def expand_user_properties( user, in_string ):
return Template( in_string ).safe_substitute( environment )


class PluggedMedia( object ):
def __init__( self, user_id, type, access_key, secret_key, path=None ):
self.user_id = user_id
self.type = type
self.path = path
self.access_key = access_key
self.secret_key = secret_key


class PasswordResetToken( object ):
def __init__( self, user, token=None):
if token:
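PluggedMedia above is a plain model class; a hedged construction sketch follows, where the type string, key values, and the sa_session persistence calls are illustrative assumptions rather than code from this hunk:

# Sketch only: constructor arguments come from PluggedMedia.__init__ above; values are placeholders.
pm = PluggedMedia(user_id=user.id, type="cloud",
                  access_key="ACCESS-KEY", secret_key="SECRET-KEY", path=None)
sa_session.add(pm)   # persisted through the mapper added in lib/galaxy/model/mapping.py
sa_session.flush()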
@@ -1762,12 +1771,11 @@ def __init__( self, id=None, state=None, external_filename=None, extra_files_pat
def in_ready_state( self ):
return self.state in self.ready_states

def get_file_name( self ):
def get_file_name( self, user=None, plugged_media=None ):
if not self.external_filename:
assert self.id is not None, "ID must be set before filename used (commit the object)"
assert self.object_store is not None, "Object Store has not been initialized for dataset %s" % self.id
filename = self.object_store.get_filename( self )
return filename
return self.object_store.get_filename(self, user=user, plugged_media=plugged_media )
else:
filename = self.external_filename
# Make filename absolute
@@ -1796,16 +1804,16 @@ def set_extra_files_path( self, extra_files_path ):
self.external_extra_files_path = extra_files_path
extra_files_path = property( get_extra_files_path, set_extra_files_path)

def _calculate_size( self ):
def _calculate_size( self, user, plugged_media ):
if self.external_filename:
try:
return os.path.getsize(self.external_filename)
except OSError:
return 0
else:
return self.object_store.size(self)
return self.object_store.size( self, user=user, plugged_media=plugged_media )

def get_size( self, nice_size=False ):
def get_size( self, user, plugged_media, nice_size=False ):
"""Returns the size of the data on disk"""
if self.file_size:
if nice_size:
@@ -1814,64 +1822,65 @@ def get_size( self, nice_size=False ):
return self.file_size
else:
if nice_size:
return galaxy.util.nice_size( self._calculate_size() )
return galaxy.util.nice_size( self._calculate_size( user=user, plugged_media=plugged_media ) )
else:
return self._calculate_size()
return self._calculate_size( user=user, plugged_media=plugged_media )

def set_size( self ):
"""Returns the size of the data on disk"""
def set_size( self, user, plugged_media ):
"""Sets the size of the data on disk"""
if not self.file_size:
self.file_size = self._calculate_size()
self.file_size = self._calculate_size( user=user, plugged_media=plugged_media )

def get_total_size( self ):
def get_total_size( self, user, plugged_media ):
if self.total_size is not None:
return self.total_size
# for backwards compatibility, set if unset
self.set_total_size()
self.set_total_size( user=user, plugged_media=plugged_media )
db_session = object_session( self )
db_session.flush()
return self.total_size

def set_total_size( self ):
def set_total_size( self, user, plugged_media ):
if self.file_size is None:
self.set_size()
self.set_size( user=user, plugged_media=plugged_media )
self.total_size = self.file_size or 0
if self.object_store.exists(self, extra_dir=self._extra_files_path or "dataset_%d_files" % self.id, dir_only=True):
if self.object_store.exists(self, user=user, plugged_media=plugged_media, extra_dir=self._extra_files_path or "dataset_%d_files" % self.id, dir_only=True):
for root, dirs, files in os.walk( self.extra_files_path ):
self.total_size += sum( [ os.path.getsize( os.path.join( root, file ) ) for file in files if os.path.exists( os.path.join( root, file ) ) ] )

def has_data( self ):
def has_data( self, user, plugged_media ):
"""Detects whether there is any data"""
return self.get_size() > 0
return self.get_size( user=user, plugged_media=plugged_media ) > 0

def mark_deleted( self, include_children=True ):
self.deleted = True

def is_multi_byte( self ):
if not self.has_data():
def is_multi_byte( self, user, plugged_media ):
if not self.has_data( user, plugged_media ):
return False
try:
return is_multi_byte( codecs.open( self.file_name, 'r', 'utf-8' ).read( 100 ) )
return is_multi_byte( codecs.open( self.get_file_name( user=user, plugged_media=plugged_media ), 'r', 'utf-8' ).read( 100 ) )
except UnicodeDecodeError:
return False
# FIXME: sqlalchemy will replace this

def _delete(self):
# TODO: is this function ever called ? I don't see the call.
def _delete( self, user, plugged_media ):
"""Remove the file that corresponds to this data"""
self.object_store.delete(self)
self.object_store.delete( self, user=user, plugged_media=plugged_media )

@property
def user_can_purge( self ):
return self.purged is False \
and not bool( self.library_associations ) \
and len( self.history_associations ) == len( self.purged_history_associations )

def full_delete( self ):
def full_delete( self, user, plugged_media ):
"""Remove the file and extra files, marks deleted and purged"""
# os.unlink( self.file_name )
self.object_store.delete(self)
if self.object_store.exists(self, extra_dir=self._extra_files_path or "dataset_%d_files" % self.id, dir_only=True):
self.object_store.delete(self, entire_dir=True, extra_dir=self._extra_files_path or "dataset_%d_files" % self.id, dir_only=True)
self.object_store.delete( self, user=user, plugged_media=plugged_media )
if self.object_store.exists( self, user=user, plugged_media=plugged_media, extra_dir=self._extra_files_path or "dataset_%d_files" % self.id, dir_only=True ):
self.object_store.delete( self, user=user, plugged_media=plugged_media, entire_dir=True, extra_dir=self._extra_files_path or "dataset_%d_files" % self.id, dir_only=True )
# if os.path.exists( self.extra_files_path ):
# shutil.rmtree( self.extra_files_path )
# TODO: purge metadata files
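The Dataset helpers above now require the owning user and plugged media; a short usage sketch of the new signatures, with dataset, user, and plugged_media assumed to be in scope:

dataset.set_size(user=user, plugged_media=plugged_media)        # caches file_size
dataset.set_total_size(user=user, plugged_media=plugged_media)  # adds extra-files sizes
if dataset.has_data(user, plugged_media):
    print(dataset.get_size(user, plugged_media, nice_size=True))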
@@ -1960,8 +1969,8 @@ def set_dataset_state( self, state ):
object_session( self ).flush() # flush here, because hda.flush() won't flush the Dataset object
state = property( get_dataset_state, set_dataset_state )

def get_file_name( self ):
return self.dataset.get_file_name()
def get_file_name( self, user=None ):
return self.dataset.get_file_name( user )

def set_file_name(self, filename):
return self.dataset.set_file_name( filename )
@@ -2016,25 +2025,25 @@ def change_datatype( self, new_ext ):
self.clear_associated_files()
_get_datatypes_registry().change_datatype( self, new_ext )

def get_size( self, nice_size=False ):
def get_size( self, user, nice_size=False ):
"""Returns the size of the data on disk"""
if nice_size:
return galaxy.util.nice_size( self.dataset.get_size() )
return self.dataset.get_size()
return galaxy.util.nice_size( self.dataset.get_size( user ) )
return self.dataset.get_size( user )

def set_size( self ):
def set_size( self, user ):
"""Returns the size of the data on disk"""
return self.dataset.set_size()
return self.dataset.set_size( user )

def get_total_size( self ):
return self.dataset.get_total_size()
def get_total_size( self, user ):
return self.dataset.get_total_size( user )

def set_total_size( self ):
return self.dataset.set_total_size()

def has_data( self ):
def has_data( self, user ):
"""Detects whether there is any data"""
return self.dataset.has_data()
return self.dataset.has_data( user )

def get_raw_data( self ):
"""Returns the full data. To stream it open the file_name and read/write as needed"""
@@ -2359,6 +2368,7 @@ class HistoryDatasetAssociation( DatasetInstance, Dictifiable, UsesAnnotations,
"""

def __init__( self,
plugged_media,
hid=None,
history=None,
copied_from_history_dataset_association=None,
@@ -2376,6 +2386,7 @@ def __init__( self,
self.history = history
self.copied_from_history_dataset_association = copied_from_history_dataset_association
self.copied_from_library_dataset_dataset_association = copied_from_library_dataset_dataset_association
self.plugged_media = plugged_media

def copy( self, copy_children=False, parent_id=None ):
"""
@@ -2607,6 +2618,30 @@ def copy_tags_from( self, target_user, source_hda ):
new_tag_assoc.user = target_user
self.tags.append( new_tag_assoc )

def get_file_name( self ):
return self.dataset.get_file_name( self.history.user, self.plugged_media )

def get_size( self, nice_size=False ):
"""Returns the size of the data on disk"""
if nice_size:
return galaxy.util.nice_size( self.dataset.get_size( self.history.user, self.plugged_media ) )
return self.dataset.get_size( self.history.user, self.plugged_media )

def set_size( self ):
"""Returns the size of the data on disk"""
return self.dataset.set_size( self.history.user, self.plugged_media )

def get_total_size( self ):
return self.dataset.get_total_size( self.history.user, self.plugged_media )

def has_data( self ):
"""Detects whether there is any data"""
return self.dataset.has_data( self.history.user, self.plugged_media )

def set_file_name(self, filename):
return self.dataset.set_file_name( filename )
file_name = property(get_file_name, set_file_name)


class HistoryDatasetAssociationDisplayAtAuthorization( object ):
def __init__( self, hda=None, user=None, site=None ):
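With the overrides above, existing call sites can keep using the association-level accessors while the owning user and plugged media are resolved from the HDA itself; a hedged sketch, assuming hda is a HistoryDatasetAssociation whose history has a user:

path = hda.file_name                 # Dataset.get_file_name(history.user, plugged_media)
size = hda.get_size(nice_size=True)  # Dataset.get_size(history.user, plugged_media, nice_size=True)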
22 changes: 20 additions & 2 deletions lib/galaxy/model/mapping.py
@@ -62,6 +62,15 @@
Column( "active", Boolean, index=True, default=True, nullable=False ),
Column( "activation_token", TrimmedString( 64 ), nullable=True, index=True ) )

model.PluggedMedia.table = Table(
"plugged_media", metadata,
Column( "id", Integer, primary_key=True ),
Column( "user_id", Integer, ForeignKey( "galaxy_user.id" ), index=True ),
Column( "type", TEXT, nullable=False ),
Column( "path", TEXT, nullable=False ),
Column( "secret_key", TEXT ),
Column( "access_key", TEXT ) )

model.UserAddress.table = Table(
"user_address", metadata,
Column( "id", Integer, primary_key=True),
@@ -145,7 +154,9 @@
Column( "hid", Integer ),
Column( "purged", Boolean, index=True, default=False ),
Column( "hidden_beneath_collection_instance_id",
ForeignKey( "history_dataset_collection_association.id" ), nullable=True ) )
ForeignKey( "history_dataset_collection_association.id" ), nullable=True ),
Column( "plugged_media_id", Integer, ForeignKey( "plugged_media.id" ) ),
Column( "dataset_path_on_media", TEXT ) )

model.Dataset.table = Table(
"dataset", metadata,
@@ -1624,7 +1635,9 @@ def simple_mapping( model, **kwds ):
model.HistoryDatasetCollectionAssociation.table.c.id ) ),
uselist=False,
backref="hidden_dataset_instances"),
_metadata=deferred(model.HistoryDatasetAssociation.table.c._metadata)
_metadata=deferred( model.HistoryDatasetAssociation.table.c._metadata ),
plugged_media=relation( model.PluggedMedia,
primaryjoin=( model.HistoryDatasetAssociation.table.c.plugged_media_id == model.PluggedMedia.table.c.id ) )
)

simple_mapping( model.Dataset,
@@ -1765,6 +1778,11 @@ def simple_mapping( model, **kwds ):
api_keys=relation( model.APIKeys,
backref="user",
order_by=desc( model.APIKeys.table.c.create_time ) ),
plugged_media=relation( model.PluggedMedia )
) )

mapper( model.PluggedMedia, model.PluggedMedia.table, properties=dict(
user=relation( model.User )
) )
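With the two mappers above, a user's plugged media are reachable through the new relation; a hedged query sketch, assuming a live sa_session and an existing user_id:

# Sketch only: enumerate a user's plugged media through the relation added above.
user = sa_session.query(model.User).get(user_id)
for pm in user.plugged_media:
    print(pm.id, pm.type, pm.path)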

mapper( model.PasswordResetToken, model.PasswordResetToken.table,