diff --git a/CondCore/Utilities/python/CondDBFW/command_line.py b/CondCore/Utilities/python/CondDBFW/command_line.py index 73074e65e6fd8..4462fbb7285b0 100755 --- a/CondCore/Utilities/python/CondDBFW/command_line.py +++ b/CondCore/Utilities/python/CondDBFW/command_line.py @@ -5,8 +5,6 @@ Works by taking the main keyword (first command given to the script), passing that to the function that will deal with that action, along with the following arguments as parameters for that function. """ -from __future__ import print_function -from __future__ import absolute_import from . import querying import argparse @@ -112,7 +110,7 @@ def copy_tag(arguments): # set end_of_validity to -1 because sqlite doesn't support long ints source_tag.end_of_validity = -1 source_tag.name = arguments.dest_tag - source_tag.modification_time = datetime.datetime.now() + source_tag.modification_time = datetime.datetime.utcnow() # create new iovs new_iovs = [] @@ -154,7 +152,7 @@ def copy_global_tag(arguments): tags = source_connection.tag(name=tags) # copy global tag first - global_tag.insertion_time = datetime.datetime.now() + global_tag.insertion_time = datetime.datetime.utcnow() global_tag.validity = -1 dest_connection.write_and_commit(global_tag) diff --git a/CondCore/Utilities/python/CondDBFW/data_formats.py b/CondCore/Utilities/python/CondDBFW/data_formats.py index e21a369c00911..4e5daadb36144 100644 --- a/CondCore/Utilities/python/CondDBFW/data_formats.py +++ b/CondCore/Utilities/python/CondDBFW/data_formats.py @@ -6,7 +6,6 @@ Note: may also contain a decorator that can wrap a class around a function that contains a script (future development). """ -from __future__ import absolute_import from .data_sources import json_data_node, json_list, json_dict, json_basic @@ -30,7 +29,7 @@ def to_datatables(script): def new_script(self, connection): try: data = script(self, connection) - if(isinstance(data, list)): + if(type(data) == list): data = _json_data_node.make(data) return to_datatables(data) except (KeyError, TypeError) as e: @@ -64,19 +63,19 @@ def _to_array_of_dicts(data): headers = data.get("headers").data() data_list = data.get("data").data() def unicode_to_str(string): - return str(string) if isinstance(string, unicode) else string - headers = map(unicode_to_str, headers) + return str(string) if type(string) == str else string + headers = list(map(unicode_to_str, headers)) def row_to_dict(row): - row = map(unicode_to_str, row) - return dict(zip(headers, row)) - array_of_dicts = map(row_to_dict, data_list) + row = list(map(unicode_to_str, row)) + return dict(list(zip(headers, row))) + array_of_dicts = list(map(row_to_dict, data_list)) return json_data_node.make(array_of_dicts) def _to_datatables(data): - headers = map(str, data.get(0).data().keys()) + headers = list(map(str, list(data.get(0).data().keys()))) new_data = [] for n in range(0, len(data.data())): - new_data.append(map(lambda entry : str(entry) if isinstance(entry, unicode) else entry, data.get(n).data().values())) + new_data.append([str(entry) if type(entry) == str else entry for entry in list(data.get(n).data().values())]) return json_data_node.make({ "headers" : headers, "data" : new_data diff --git a/CondCore/Utilities/python/CondDBFW/data_sources.py b/CondCore/Utilities/python/CondDBFW/data_sources.py index 2737308574dcc..1ad43a4d00255 100644 --- a/CondCore/Utilities/python/CondDBFW/data_sources.py +++ b/CondCore/Utilities/python/CondDBFW/data_sources.py @@ -3,8 +3,6 @@ This file contains the base DataSource class, and all sub classes that 
implement their own methods for parsing data. """ -from __future__ import print_function -from __future__ import absolute_import import json @@ -107,7 +105,7 @@ def __init__(self, sqlite_file_name): sql_query = "select %s from %s" % (column_string, table) results = cursor.execute(sql_query).fetchall() for n in range(0, len(results)): - results[n] = dict(zip(table_to_columns[table], map(str, results[n]))) + results[n] = dict(list(zip(table_to_columns[table], list(map(str, results[n]))))) table_to_data[str(table)] = results self._data = json_data_node.make(table_to_data) else: @@ -129,9 +127,9 @@ def __init__(self, data=None): # be created in code that shouldn't be doing it. @staticmethod def make(data): - if isinstance(data, list): + if type(data) == list: return json_list(data) - elif isinstance(data, dict): + elif type(data) == dict: return json_dict(data) else: return json_basic(data) @@ -159,12 +157,12 @@ def find(self, type_name): # traverse json_data_node structure, and find all lists # if this node in the structure is a list, return all sub lists lists = [] - if isinstance(self._data, type_name): + if type(self._data) == type_name: lists.append(self._data) - if isinstance(self._data, list): + if type(self._data) == list: for item in self._data: lists += json_data_node.make(item).find(type_name) - elif isinstance(self._data, dict): + elif type(self._data) == dict: for key in self._data: lists += json_data_node.make(self._data[key]).find(type_name) return lists @@ -198,7 +196,7 @@ def add_child(self, data): def __iter__(self): return self - def next(self): + def __next__(self): if self.iterator_index > len(self._data)-1: self.reset() raise StopIteration @@ -227,13 +225,13 @@ def indices(self, *indices): def get_members(self, member_name): # assume self.data() is a list - if not(type(member_name) in [str, unicode]): + if not(type(member_name) in [str, str]): raise TypeError("Value given for member name must be a string.") type_of_first_item = self.data()[0].__class__ for item in self.data(): if item.__class__ != type_of_first_item: return None - return json_data_node.make(map(lambda item : getattr(item, member_name), self.data())) + return json_data_node.make([getattr(item, member_name) for item in self.data()]) # format methods @@ -245,7 +243,7 @@ def as_dicts(self, convert_timestamps=False): if self.get(0).data().__class__.__name__ in ["GlobalTag", "GlobalTagMap", "Tag", "IOV", "Payload"]: # copy data - new_data = map(lambda item : item.as_dicts(convert_timestamps=convert_timestamps), [item for item in self.data()]) + new_data = [item.as_dicts(convert_timestamps=convert_timestamps) for item in [item for item in self.data()]] return new_data else: print("Data in json_list was not the correct type.") @@ -281,7 +279,7 @@ def as_table(self, fit=["all"], columns=None, hide=None, col_width=None, row_num table_name = None data = self.data() # gets headers stored in first dictionary - headers = data[0].keys() + headers = list(data[0].keys()) if columns != None: headers = columns @@ -298,7 +296,7 @@ def as_table(self, fit=["all"], columns=None, hide=None, col_width=None, row_num if col_width == None: import subprocess - table_width = int(0.95*int(subprocess.check_output(["stty", "size"]).split(" ")[1])) + table_width = int(0.95*int(subprocess.check_output([b'stty', b'size']).split(b' ')[1])) col_width = int(table_width/len(headers)) if hide != None: @@ -335,7 +333,7 @@ def cell(content, header, col_width, fit): for column in fit: if not(column in headers): - print("'%s' is not a valid column." 
% column) + print(("'%s' is not a valid column." % column)) return column_to_width[column] = max_width_of_column(column, data) diff --git a/CondCore/Utilities/python/CondDBFW/models.py b/CondCore/Utilities/python/CondDBFW/models.py index ca8f783445122..ee9c2773da8fb 100644 --- a/CondCore/Utilities/python/CondDBFW/models.py +++ b/CondCore/Utilities/python/CondDBFW/models.py @@ -8,8 +8,6 @@ so these will not work in a normal context outside the framework. """ -from __future__ import print_function -from __future__ import absolute_import import json import datetime @@ -24,7 +22,7 @@ exit() from . import data_sources, data_formats -import urllib, urllib2, base64 +import urllib.request, urllib.parse, urllib.error, urllib.request, urllib.error, urllib.parse, base64 from copy import deepcopy # get utility functions @@ -50,8 +48,8 @@ def session_independent_object(object, schema=None): return new_object def session_independent(objects): - if isinstance(objects, list): - return map(session_independent_object, objects) + if type(objects) == list: + return list(map(session_independent_object, objects)) else: # assume objects is a single object (not a list) return session_independent_object(objects) @@ -153,9 +151,9 @@ def apply(self): def apply_filter(orm_query, orm_class, attribute, value): filter_attribute = getattr(orm_class, attribute) - if isinstance(value, list): + if type(value) == list: orm_query = orm_query.filter(filter_attribute.in_(value)) - elif isinstance(value, data_sources.json_list): + elif type(value) == data_sources.json_list: orm_query = orm_query.filter(filter_attribute.in_(value.data())) elif type(value) in [Range, Radius]: @@ -163,7 +161,7 @@ def apply_filter(orm_query, orm_class, attribute, value): plus = value.get_end() orm_query = orm_query.filter(and_(filter_attribute >= minus, filter_attribute <= plus)) - elif isinstance(value, RegExp): + elif type(value) == RegExp: # Relies on being a SingletonThreadPool @@ -181,7 +179,7 @@ def apply_filter(orm_query, orm_class, attribute, value): return orm_query def apply_filters(orm_query, orm_class, **filters): - for (key, value) in filters.items(): + for (key, value) in list(filters.items()): if not(key in ["amount"]): orm_query = apply_filter(orm_query, orm_class, key, value) return orm_query @@ -189,8 +187,11 @@ def apply_filters(orm_query, orm_class, **filters): def generate(map_blobs=False, class_name=None): Base = declarative_base() + schema = {"schema" : "CMS_CONDITIONS"} + fk_schema_prefix = ("%s." 
% schema["schema"]) if schema else "" class GlobalTag(Base): + __table_args__ = schema __tablename__ = 'GLOBAL_TAG' headers = ["name", "validity", "description", "release", "insertion_time", "snapshot_time", "scenario", "workflow", "type"] @@ -246,7 +247,7 @@ def all(self, **kwargs): """ query = self.session.query(GlobalTag) query = apply_filters(query, self.__class__, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None query_result = query.order_by(GlobalTag.name).limit(amount).all() gts = data_sources.json_data_node.make(query_result) return gts @@ -258,10 +259,10 @@ def tags(self, **kwargs): kwargs["global_tag_name"] = self.name all_tags = self.session.query(GlobalTagMap.global_tag_name, GlobalTagMap.record, GlobalTagMap.label, GlobalTagMap.tag_name) all_tags = apply_filters(all_tags, GlobalTagMap, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None all_tags = all_tags.order_by(GlobalTagMap.tag_name).limit(amount).all() column_names = ["global_tag_name", "record", "label", "tag_name"] - all_tags = map(lambda row : dict(zip(column_names, map(to_timestamp, row))), all_tags) + all_tags = [dict(list(zip(column_names, list(map(to_timestamp, row))))) for row in all_tags] all_tags = data_formats._dicts_to_orm_objects(GlobalTagMap, all_tags) return data_sources.json_data_node.make(all_tags) @@ -281,7 +282,7 @@ def iovs(self, **kwargs): tag_names = self.tags().get_members("tag_name").data() iovs_all_tags = self.session.query(IOV).filter(IOV.tag_name.in_(tag_names)) iovs_all_tags = apply_filters(iovs_all_tags, IOV, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None iovs_all_tags = iovs_all_tags.limit(amount).subquery() # now, join Global Tag Map table onto IOVs @@ -293,7 +294,7 @@ def iovs(self, **kwargs): iovs_gt_tags = iovs_gt_tags.order_by(iovs_all_tags.c.since).all() column_names = ["tag_name", "since", "payload_hash", "insertion_time"] - all_iovs = map(lambda row : dict(zip(column_names, row)), iovs_gt_tags) + all_iovs = [dict(list(zip(column_names, row))) for row in iovs_gt_tags] all_iovs = data_formats._dicts_to_orm_objects(IOV, all_iovs) return data_sources.json_data_node.make(all_iovs) @@ -335,14 +336,15 @@ def diff(self, gt): return data_sources.json_data_node.make(table) class GlobalTagMap(Base): + __table_args__ = schema __tablename__ = 'GLOBAL_TAG_MAP' headers = ["global_tag_name", "record", "label", "tag_name"] - global_tag_name = Column(String(100), ForeignKey('GLOBAL_TAG.name'), primary_key=True, nullable=False) - record = Column(String(100), ForeignKey('RECORDS.record'), primary_key=True, nullable=False) + global_tag_name = Column(String(100), ForeignKey(fk_schema_prefix + 'GLOBAL_TAG.name'), primary_key=True, nullable=False) + record = Column(String(100), ForeignKey(fk_schema_prefix + 'RECORDS.record'), primary_key=True, nullable=False) label = Column(String(100), primary_key=True, nullable=False) - tag_name = Column(String(100), ForeignKey('TAG.name'), nullable=False) + tag_name = Column(String(100), ForeignKey(fk_schema_prefix + 'TAG.name'), nullable=False) def __init__(self, dictionary={}, convert_timestamps=True): # assign each entry in a kwargs @@ -372,11 +374,12 @@ def as_dicts(self, convert_timestamps=False): class GlobalTagMapRequest(Base): + __table_args__ = schema __tablename__ = 
'GLOBAL_TAG_MAP_REQUEST' queue = Column(String(100), primary_key=True, nullable=False) - tag = Column(String(100), ForeignKey('TAG.name'), primary_key=True, nullable=False) - record = Column(String(100), ForeignKey('RECORDS.record'), primary_key=True, nullable=False) + tag = Column(String(100), ForeignKey(fk_schema_prefix + 'TAG.name'), primary_key=True, nullable=False) + record = Column(String(100), ForeignKey(fk_schema_prefix + 'RECORDS.record'), primary_key=True, nullable=False) label = Column(String(100), primary_key=True, nullable=False) status = Column(String(1), nullable=False) description = Column(String(4000), nullable=False) @@ -420,13 +423,14 @@ def to_array(self): return [self.queue, self.tag, self.record, self.label, status_full_name(self.status), to_timestamp(self.time_submitted), to_timestamp(self.last_edited)] class IOV(Base): + __table_args__ = schema __tablename__ = 'IOV' headers = ["tag_name", "since", "payload_hash", "insertion_time"] - tag_name = Column(String(4000), ForeignKey('TAG.name'), primary_key=True, nullable=False) + tag_name = Column(String(4000), ForeignKey(fk_schema_prefix + 'TAG.name'), primary_key=True, nullable=False) since = Column(Integer, primary_key=True, nullable=False) - payload_hash = Column(String(40), ForeignKey('PAYLOAD.hash'), nullable=False) + payload_hash = Column(String(40), ForeignKey(fk_schema_prefix + 'PAYLOAD.hash'), nullable=False) insertion_time = Column(DateTime, primary_key=True, nullable=False) def __init__(self, dictionary={}, convert_timestamps=True): @@ -463,12 +467,13 @@ def all(self, **kwargs): """ query = self.session.query(IOV) query = apply_filters(query, IOV, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None query_result = query.order_by(IOV.tag_name).order_by(IOV.since).limit(amount).all() return data_sources.json_data_node.make(query_result) class Payload(Base): + __table_args__ = schema __tablename__ = 'PAYLOAD' headers = ["hash", "object_type", "version", "insertion_time"] @@ -536,8 +541,8 @@ def parent_tags(self, **kwargs): query = self.session.query(IOV.tag_name) query = apply_filters(query, IOV, **kwargs) query_result = query.all() - tag_names = map(lambda entry : entry[0], query_result) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + tag_names = [entry[0] for entry in query_result] + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None tags = self.session.query(Tag).filter(Tag.name.in_(tag_names)).order_by(Tag.name).limit(amount).all() return data_sources.json_data_node.make(tags) @@ -547,12 +552,13 @@ def all(self, **kwargs): """ query = self.session.query(Payload) query = apply_filters(query, Payload, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None query_result = query.order_by(Payload.hash).limit(amount).all() return data_sources.json_data_node.make(query_result) class Record(Base): + __table_args__ = schema __tablename__ = 'RECORDS' headers = ["record", "object", "type"] @@ -583,16 +589,17 @@ def all(self, **kwargs): """ query = self.session.query(Record) query = apply_filters(query, Record, kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None query_result = query.order_by(Record.record).limit(amount).all() return data_sources.json_data_node.make(query_result) class Tag(Base): + 
__table_args__ = schema __tablename__ = 'TAG' headers = ["name", "time_type", "object_type", "synchronization", "end_of_validity",\ - "description", "last_validated_time", "insertion_time", "modification_time"] + "description", "last_validated_time", "insertion_time", "modification_time", "protection_code"] name = Column(String(4000), primary_key=True, nullable=False) time_type = Column(String(4000), nullable=False) @@ -603,6 +610,7 @@ class Tag(Base): last_validated_time = Column(BigInteger, nullable=False) insertion_time = Column(DateTime, nullable=False) modification_time = Column(DateTime, nullable=False) + protection_code = Column(Integer, nullable=False) record = None label = None @@ -656,8 +664,8 @@ def parent_global_tags(self, **kwargs): query = apply_filters(query, GlobalTagMap, **kwargs) query_result = query.all() if len(query_result) != 0: - global_tag_names = map(lambda entry : entry[0], query_result) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + global_tag_names = [entry[0] for entry in query_result] + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None global_tags = self.session.query(GlobalTag).filter(GlobalTag.name.in_(global_tag_names)).order_by(GlobalTag.name).limit(amount).all() else: global_tags = None @@ -669,7 +677,7 @@ def all(self, **kwargs): """ query = self.session.query(Tag) query = apply_filters(query, Tag, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None query_result = query.order_by(Tag.name).limit(amount).all() return data_sources.json_data_node.make(query_result) @@ -680,7 +688,7 @@ def iovs(self, **kwargs): # filter_params contains a list of columns to filter the iovs by iovs_query = self.session.query(IOV).filter(IOV.tag_name == self.name) iovs_query = apply_filters(iovs_query, IOV, **kwargs) - amount = kwargs["amount"] if "amount" in kwargs.keys() else None + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None iovs = iovs_query.order_by(IOV.since).limit(amount).all() return data_sources.json_data_node.make(iovs) @@ -738,8 +746,8 @@ def diff(self, tag, short=False): raise TypeError("Tag given must be a CondDBFW Tag object.") # get lists of iovs - iovs1 = dict(map(lambda iov : (iov.since, iov.payload_hash), self.iovs().data())) - iovs2 = dict(map(lambda iov : (iov.since, iov.payload_hash), tag.iovs().data())) + iovs1 = dict([(iov.since, iov.payload_hash) for iov in self.iovs().data()]) + iovs2 = dict([(iov.since, iov.payload_hash) for iov in tag.iovs().data()]) iovs = [(x, iovs1.get(x), iovs2.get(x)) for x in sorted(set(iovs1) | set(iovs2))] iovs.append(("Infinity", 1, 2)) @@ -851,7 +859,7 @@ def merge_into(self, tag, range_object): else: # otherwise, iterate down from n to find the last sqlite iov, # and assign that hash - for i in reversed(range(0,n)): + for i in reversed(list(range(0,n))): if new_iovs_list[i].source == "sqlite": print("change %s to %s at since %d" % (iov.payload_hash, new_iovs_list[i].payload_hash, iov.since)) iov.payload_hash = new_iovs_list[i].payload_hash @@ -867,7 +875,7 @@ def merge_into(self, tag, range_object): new_iov_list_copied = sorted(new_iov_list_copied, key=lambda iov : iov.since) - now = datetime.datetime.now() + now = datetime.datetime.utcnow() new_iovs = [] for iov in new_iov_list_copied: @@ -881,8 +889,47 @@ def merge_into(self, tag, range_object): return new_tag #sqlite.write_and_commit(new_iovs) + + class TagAuthorization(Base): + __table_args__ = schema + 
__tablename__ = 'TAG_AUTHORIZATION' + + headers = ["tag_name", "access_type", "credential", "credential_type"] + + tag_name = Column(String(100), ForeignKey(fk_schema_prefix + 'TAG.name'), primary_key=True, nullable=False) + access_type = Column(Integer, nullable=False) + credential = Column(String(100), primary_key=True, nullable=False) + credential_type = Column(Integer, nullable=False) + + def as_dicts(self): + """ + Returns dictionary form of this Tag Authorization. + """ + return { + "tag_name" : self.tag_name, + "access_type" : self.access_type, + "credential" : self.credential, + "credential_type" : self.credential_type + } + + def __repr__(self): + return '<TagAuthorization object tag_name: %s, access_type: %s, credential: %s, credential_type: %s>' % (self.tag_name, self.access_type, self.credential, self.credential_type) + + def to_array(self): + return [self.tag_name, self.access_type, self.credential, self.credential_type] + + def all(self, **kwargs): + """ + Returns `amount` TagAuthorization entries ordered by tag name. + """ + query = self.session.query(TagAuthorization) + query = apply_filters(query, TagAuthorization, kwargs) + amount = kwargs["amount"] if "amount" in list(kwargs.keys()) else None + query_result = query.order_by(TagAuthorization.tag_name).limit(amount).all() + return data_sources.json_data_node.make(query_result) + classes = {"globaltag" : GlobalTag, "iov" : IOV, "globaltagmap" : GlobalTagMap,\ - "payload" : Payload, "tag" : Tag, "Base" : Base} + "payload" : Payload, "tag" : Tag, "TagAuthorization": TagAuthorization, "Base" : Base} if class_name == None: return classes diff --git a/CondCore/Utilities/python/CondDBFW/querying.py b/CondCore/Utilities/python/CondDBFW/querying.py index d7d3528a8e7db..1de2aa3eaddd1 100644 --- a/CondCore/Utilities/python/CondDBFW/querying.py +++ b/CondCore/Utilities/python/CondDBFW/querying.py @@ -6,8 +6,6 @@ connection class can also take a pre-constructed engine - useful for web services. """ -from __future__ import print_function -from __future__ import absolute_import import sqlalchemy from sqlalchemy import create_engine, text, or_ @@ -23,436 +21,439 @@ import sys class connection(object): - engine = None - connection = None - session = None - connection_data = None - netrc_authenticators = None - secrets = None - """ + engine = None + connection = None + session = None + connection_data = None + netrc_authenticators = None + secrets = None + """ Given a connection string, parses the connection string and connects. """ - def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False): - - self._pooling = pooling - - # add querying utility properties - # these must belong to the connection since the way in which their values are handled - # depends on the database being connected to. 
- self.range = models.Range - self.radius = models.Radius - self.regexp = models.RegExp - self.regexp.connection_object = self - - if type(connection_data) in [str, unicode]: - # if we've been given a connection string, process it - self.connection_data = new_connection_dictionary(connection_data, secrets=secrets, mode=mode) - self.schema = self.connection_data.get("schema") if self.connection_data.get("schema") != None else "" - - self.range.database_type = self.connection_data["host"] - self.radius.database_type = self.connection_data["host"] - self.regexp.database_type = self.connection_data["host"] - else: - self.connection_data = connection_data - # assume we have an engine - # we need to take the string representation so we know which type of db we're aiming at - engine_string = str(connection_data) - db_type = None - if "oracle" in engine_string: - db_type = "oracle" - elif "frontier" in engine_string: - db_type = "frontier" - elif "sqlite" in engine_string: - db_type = "sqlite" - - self.range.database_type = db_type - self.radius.database_type = db_type - self.regexp.database_type = db_type - - from . import models as ms - self.models = ms.generate(map_blobs) - #self.base = self.models["Base"] - - def setup(self): - """ - Setup engine with given credentials from netrc file, and make a session maker. - """ - - if isinstance(self.connection_data, dict): - self.engine = engine_from_dictionary(self.connection_data, pooling=self._pooling) - else: - # we've been given an engine by the user - # use it as the engine - self.engine = self.connection_data - - self.sessionmaker = sessionmaker(bind=self.engine) - self.session = self.sessionmaker() - self.factory = factory(self) - - # assign correct schema for database name to each model - tmp_models_dict = {} - for key in self.models: - if self.models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\ - and str(self.models[key].__name__) != "Base": - - if isinstance(self.connection_data, dict): - # we can only extract the secrets and schema individuall - # if we were given a dictionary... if we were given an engine - # we can't do this without parsing the connection string from the engine - # - a wide range of which it will be difficult to support! - self.models[key].__table__.schema = self.connection_data["schema"] - self.models[key].secrets = self.connection_data["secrets"] - - self.models[key].session = self.session - # isn't used anywhere - comment it out for now - #self.models[key].authentication = self.netrc_authenticators - self.models[key].connection = self - tmp_models_dict[key.lower()] = self.models[key] - tmp_models_dict[key.lower()].empty = False - - self.models = tmp_models_dict - - return self - - @staticmethod - def _get_CMS_frontier_connection_string(database): - try: - import subprocess - return subprocess.Popen(['cmsGetFnConnect', 'frontier://%s' % database], stdout = subprocess.PIPE).communicate()[0].strip() - except: - raise Exception("Frontier connections can only be constructed when inside a CMSSW environment.") - - @staticmethod - def _cms_frontier_string(database, schema="cms_conditions"): - """ - Get database string for frontier. - """ - import urllib - return 'oracle+frontier://@%s/%s' % (urllib.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema) - - @staticmethod - def _cms_oracle_string(user, pwd, db_name): - """ - Get database string for oracle. 
- """ - return 'oracle://%s:%s@%s' % (user, pwd, db_name) - - @staticmethod - def build_oracle_url(user, pwd, db_name): - """ - Build the connection url, and get credentials from self.secrets dictionary. - """ - - database_url = connection._cms_oracle_string(user, pwd, db_name) - - try: - url = sqlalchemy.engine.url.make_url(database_url) - if url.password is None: - url.password = pwd - except sqlalchemy.exc.ArgumentError: - url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name) - return url - - @staticmethod - def build_frontier_url(db_name, schema): - database_url = connection._cms_frontier_string(db_name, schema) - - try: - url = sqlalchemy.engine.url.make_url(database_url) - except sqlalchemy.exc.ArgumentError: - """ - Is this needed for a use case? - """ - url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name) - return url - - # currently just commits and closes the current session (ends transaction, closes connection) - # may do other things later - def tear_down(self): - try: - self.session.commit() - self.close_session() - except: - return "Couldn't tear down connection on engine %s." % str(self.engine) - - def close_session(self): - self.session.close() - return True - - def hard_close(self): - self.engine.dispose() - return True - - # get model based on given model name - def model(self, model_name): - if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta: - model_name = model_name.__name__ - model_name = model_name.replace("_", "") - return self.models[model_name] - - # model should be the class the developer wants to be instantiated - # pk_to_value maps primary keys to values - def object(self, model, pk_to_value): - if self.session == None: - return None - model_data = self.session.query(model) - for pk in pk_to_value: - model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk]) - return model_data.first() - - def global_tag(self, **pkargs): - return self.factory.object("globaltag", **pkargs) - - def global_tag_map(self, **pkargs): - return self.factory.object("globaltagmap", **pkargs) - - """def global_tag_map_request(self, **pkargs): + def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False): + + self._pooling = pooling + + # add querying utility properties + # these must belong to the connection since the way in which their values are handled + # depends on the database being connected to. + self.range = models.Range + self.radius = models.Radius + self.regexp = models.RegExp + self.regexp.connection_object = self + + if type(connection_data) in [str, str]: + # if we've been given a connection string, process it + self.connection_data = new_connection_dictionary(connection_data, secrets=secrets, mode=mode) + self.schema = self.connection_data.get("schema") if self.connection_data.get("schema") != None else "" + + self.range.database_type = self.connection_data["host"] + self.radius.database_type = self.connection_data["host"] + self.regexp.database_type = self.connection_data["host"] + else: + self.connection_data = connection_data + # assume we have an engine + # we need to take the string representation so we know which type of db we're aiming at + engine_string = str(connection_data) + db_type = None + if "oracle" in engine_string: + db_type = "oracle" + elif "frontier" in engine_string: + db_type = "frontier" + elif "sqlite" in engine_string: + db_type = "sqlite" + + self.range.database_type = db_type + self.radius.database_type = db_type + self.regexp.database_type = db_type + + from . 
import models as ms + self.models = ms.generate(map_blobs) + #self.base = self.models["Base"] + + def setup(self): + """ + Setup engine with given credentials from netrc file, and make a session maker. + """ + + if type(self.connection_data) == dict: + self.engine = engine_from_dictionary(self.connection_data, pooling=self._pooling) + else: + # we've been given an engine by the user + # use it as the engine + self.engine = self.connection_data + + self.sessionmaker = sessionmaker(bind=self.engine) + self.session = self.sessionmaker() + self.factory = factory(self) + + # assign correct schema for database name to each model + tmp_models_dict = {} + for key in self.models: + if self.models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\ + and str(self.models[key].__name__) != "Base": + + if type(self.connection_data) == dict: + # we can only extract the secrets and schema individuall + # if we were given a dictionary... if we were given an engine + # we can't do this without parsing the connection string from the engine + # - a wide range of which it will be difficult to support! + self.models[key].__table__.schema = self.connection_data["schema"] + self.models[key].secrets = self.connection_data["secrets"] + + self.models[key].session = self.session + # isn't used anywhere - comment it out for now + #self.models[key].authentication = self.netrc_authenticators + self.models[key].connection = self + tmp_models_dict[key.lower()] = self.models[key] + tmp_models_dict[key.lower()].empty = False + + self.models = tmp_models_dict + + return self + + @staticmethod + def _get_CMS_frontier_connection_string(database): + try: + import subprocess + return subprocess.Popen(['cmsGetFnConnect', 'frontier://%s' % database], stdout = subprocess.PIPE).communicate()[0].strip() + except: + raise Exception("Frontier connections can only be constructed when inside a CMSSW environment.") + + @staticmethod + def _cms_frontier_string(database, schema="cms_conditions"): + """ + Get database string for frontier. + """ + import urllib.request, urllib.parse, urllib.error + return 'oracle+frontier://@%s/%s' % (urllib.parse.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema) + + @staticmethod + def _cms_oracle_string(user, pwd, db_name): + """ + Get database string for oracle. + """ + return 'oracle://%s:%s@%s' % (user, pwd, db_name) + + @staticmethod + def build_oracle_url(user, pwd, db_name): + """ + Build the connection url, and get credentials from self.secrets dictionary. + """ + + database_url = connection._cms_oracle_string(user, pwd, db_name) + + try: + url = sqlalchemy.engine.url.make_url(database_url) + if url.password is None: + url.password = pwd + except sqlalchemy.exc.ArgumentError: + url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name) + return url + + @staticmethod + def build_frontier_url(db_name, schema): + database_url = connection._cms_frontier_string(db_name, schema) + + try: + url = sqlalchemy.engine.url.make_url(database_url) + except sqlalchemy.exc.ArgumentError: + """ + Is this needed for a use case? + """ + url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name) + return url + + # currently just commits and closes the current session (ends transaction, closes connection) + # may do other things later + def tear_down(self): + try: + self.session.commit() + self.close_session() + except: + return "Couldn't tear down connection on engine %s." 
% str(self.engine) + + def close_session(self): + self.session.close() + return True + + def hard_close(self): + self.engine.dispose() + return True + + # get model based on given model name + def model(self, model_name): + if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta: + model_name = model_name.__name__ + model_name = model_name.replace("_", "") + return self.models[model_name] + + # model should be the class the developer wants to be instantiated + # pk_to_value maps primary keys to values + def object(self, model, pk_to_value): + if self.session == None: + return None + model_data = self.session.query(model) + for pk in pk_to_value: + model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk]) + return model_data.first() + + def global_tag(self, **pkargs): + return self.factory.object("globaltag", **pkargs) + + def global_tag_map(self, **pkargs): + return self.factory.object("globaltagmap", **pkargs) + + """def global_tag_map_request(self, **pkargs): return self.factory.object("globaltagmaprequest", **pkargs)""" - def tag(self, **pkargs): - return self.factory.object("tag", **pkargs) + def tag(self, **pkargs): + return self.factory.object("tag", **pkargs) + + def tag_authorization(self, **pkargs): + return self.factory.object("tagauthorization", **pkargs) - def iov(self, **pkargs): - return self.factory.object("iov", **pkargs) + def iov(self, **pkargs): + return self.factory.object("iov", **pkargs) - def payload(self, **pkargs): - return self.factory.object("payload", **pkargs) + def payload(self, **pkargs): + return self.factory.object("payload", **pkargs) - """def record(self, **pkargs): + """def record(self, **pkargs): return self.factory.object("record", **pkargs)""" - # adds %% at the beginning and end so LIKE in SQL searches all of the string - def _oracle_match_format(self, string): - return "%%%s%%" % string - - # returns dictionary mapping object type to a list of all objects found in the search - def search_everything(self, string, amount=10): - string = self._oracle_match_format(string) - - gt = self.model("globaltag") - global_tags = self.session.query(gt).filter(or_( - gt.name.ilike(string), - gt.description.ilike(string), - gt.release.ilike(string) - )).limit(amount) - tag = self.model("tag") - tags = self.session.query(tag).filter(or_( - tag.name.ilike(string), - tag.object_type.ilike(string), - tag.description.ilike(string)) - ).limit(amount) - iov = self.model("iov") - iovs = self.session.query(iov).filter(or_( - iov.tag_name.ilike(string), - iov.since.ilike(string), - iov.payload_hash.ilike(string), - iov.insertion_time.ilike(string) - )).limit(amount) - payload = self.model("payload") - payloads = self.session.query(payload).filter(or_( - payload.hash.ilike(string), - payload.object_type.ilike(string), - payload.insertion_time.ilike(string) - )).limit(amount) - - return json_data_node.make({ - "global_tags" : global_tags.all(), - "tags" : tags.all(), - "iovs" : iovs.all(), - "payloads" : payloads.all() - }) - - def write(self, object): - new_object = models.session_independent_object(object, schema=self.schema) - self.session.add(new_object) - return new_object - - def commit(self): - try: - self.session.commit() - except: - traceback.print_exc() - self.session.rollback() - - def write_and_commit(self, object): - if isinstance(object, list): - for item in object: - self.write_and_commit(item) - else: - # should be changed to deal with errors - add them to exception handling if they appear - self.write(object) - self.commit() - - 
def rollback(self): - try: - self.session.rollback() - except: - traceback.print_exc() - print("Session couldn't be rolled back.") + # adds %% at the beginning and end so LIKE in SQL searches all of the string + def _oracle_match_format(self, string): + return "%%%s%%" % string + + # returns dictionary mapping object type to a list of all objects found in the search + def search_everything(self, string, amount=10): + string = self._oracle_match_format(string) + + gt = self.model("globaltag") + global_tags = self.session.query(gt).filter(or_( + gt.name.ilike(string), + gt.description.ilike(string), + gt.release.ilike(string) + )).limit(amount) + tag = self.model("tag") + tags = self.session.query(tag).filter(or_( + tag.name.ilike(string), + tag.object_type.ilike(string), + tag.description.ilike(string)) + ).limit(amount) + iov = self.model("iov") + iovs = self.session.query(iov).filter(or_( + iov.tag_name.ilike(string), + iov.since.ilike(string), + iov.payload_hash.ilike(string), + iov.insertion_time.ilike(string) + )).limit(amount) + payload = self.model("payload") + payloads = self.session.query(payload).filter(or_( + payload.hash.ilike(string), + payload.object_type.ilike(string), + payload.insertion_time.ilike(string) + )).limit(amount) + + return json_data_node.make({ + "global_tags" : global_tags.all(), + "tags" : tags.all(), + "iovs" : iovs.all(), + "payloads" : payloads.all() + }) + + def write(self, object): + new_object = models.session_independent_object(object, schema=self.schema) + self.session.add(new_object) + return new_object + + def commit(self): + try: + self.session.commit() + except: + traceback.print_exc() + self.session.rollback() + + def write_and_commit(self, object): + if type(object) == list: + for item in object: + self.write_and_commit(item) + else: + # should be changed to deal with errors - add them to exception handling if they appear + self.write(object) + self.commit() + + def rollback(self): + try: + self.session.rollback() + except: + traceback.print_exc() + print("Session couldn't be rolled back.") class factory(): - """ - Contains methods for creating objects. 
- """ - def __init__(self, connection): - self.connection = connection - - # class_name is the class name of the model to be used - # pkargs is a dictionary of keyword arguments used as primary key values - # this dictionary will be used to populate the object of type name class_name - def object(self, class_name, **pkargs): - from .data_sources import json_list - from .models import apply_filters - # get the class that self.connection holds from the class name - model = self.connection.model(class_name) - - if self.connection.session == None: - return None - - # query for the ORM object, and return the appropriate object (None, CondDBFW object, or json_list) - model_data = self.connection.session.query(model) - if len(pkargs.items()) != 0: - # apply the filters defined in **kwargs - model_data = apply_filters(model_data, model, **pkargs) - amount = pkargs["amount"] if "amount" in pkargs.keys() else None - model_data = model_data.limit(amount) - if model_data.count() > 1: - # if we have multiple objects, return a json_list - return json_list(model_data.all()) - elif model_data.count() == 1: - # if we have a single object, return that object - return model_data.first() - else: - # if we have no objects returned, return None - return None - else: - # no column arguments were given, so return an empty object - new_object = model() - new_object.empty = True - return new_object + """ + Contains methods for creating objects. + """ + def __init__(self, connection): + self.connection = connection + + # class_name is the class name of the model to be used + # pkargs is a dictionary of keyword arguments used as primary key values + # this dictionary will be used to populate the object of type name class_name + def object(self, class_name, **pkargs): + from .data_sources import json_list + from .models import apply_filters + # get the class that self.connection holds from the class name + model = self.connection.model(class_name) + + if self.connection.session == None: + return None + + # query for the ORM object, and return the appropriate object (None, CondDBFW object, or json_list) + model_data = self.connection.session.query(model) + if len(list(pkargs.items())) != 0: + # apply the filters defined in **kwargs + model_data = apply_filters(model_data, model, **pkargs) + amount = pkargs["amount"] if "amount" in list(pkargs.keys()) else None + model_data = model_data.limit(amount) + if model_data.count() > 1: + # if we have multiple objects, return a json_list + return json_list(model_data.all()) + elif model_data.count() == 1: + # if we have a single object, return that object + return model_data.first() + else: + # if we have no objects returned, return None + return None + else: + # no column arguments were given, so return an empty object + new_object = model() + new_object.empty = True + return new_object def _get_netrc_data(netrc_file, key): - """ - Returns a dictionary {login : ..., account : ..., password : ...} - """ - try: - headers = ["login", "account", "password"] - authenticator_tuple = netrc.netrc(netrc_file).authenticators(key) - if authenticator_tuple == None: - raise Exception("netrc file must contain key '%s'." 
% key) - except: - raise Exception("Couldn't get credentials from netrc file.") - return dict(zip(headers, authenticator_tuple)) + """ + Returns a dictionary {login : ..., account : ..., password : ...} + """ + try: + headers = ["login", "account", "password"] + authenticator_tuple = netrc.netrc(netrc_file).authenticators(key) + if authenticator_tuple == None: + raise Exception("netrc file must contain key '%s'." % key) + except: + raise Exception("Couldn't get credentials from netrc file.") + return dict(list(zip(headers, authenticator_tuple))) def new_connection_dictionary(connection_data, secrets=None, mode="r"): - """ - Function used to construct connection data dictionaries - internal to framework. - """ - frontier_str_length = len("frontier://") - sqlite_str_length = len("sqlite://") - #sqlite_file_str_length = len("sqlite_file://") - oracle_str_length = len("oracle://") - - if type(connection_data) in [str, unicode] and connection_data[0:frontier_str_length] == "frontier://": - """ - frontier://database_name/schema - """ - db_name = connection_data[frontier_str_length:].split("/")[0] - schema = connection_data[frontier_str_length:].split("/")[1] - connection_data = {} - connection_data["database_name"] = db_name - connection_data["schema"] = schema - connection_data["host"] = "frontier" - connection_data["secrets"] = None - elif type(connection_data) in [str, unicode] and connection_data[0:sqlite_str_length] == "sqlite://": - """ - sqlite://database_file_name - """ - # for now, just support "sqlite://" format for sqlite connection strings - db_name = connection_data[sqlite_str_length:] - schema = "" - connection_data = {} - connection_data["database_name"] = os.path.abspath(db_name) - connection_data["schema"] = schema - connection_data["host"] = "sqlite" - connection_data["secrets"] = None - elif type(connection_data) in [str, unicode] and connection_data[0:oracle_str_length] == "oracle://": - """ - oracle://account:password@database_name - or - oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) - """ - new_connection_string = connection_data[oracle_str_length:] - - if ":" in new_connection_string: - # the user has given a password - usually in the case of the db upload service - database_name = new_connection_string[new_connection_string.index("@")+1:] - schema_name = new_connection_string[0:new_connection_string.index(":")] - # set username based on connection string - username = new_connection_string[0:new_connection_string.index(":")] - password = new_connection_string[new_connection_string.index(":")+1:new_connection_string.index("@")] - else: - mode_to_netrc_key_suffix = {"r" : "read", "w" : "write"} - database_name = new_connection_string[0:new_connection_string.index("/")] - schema_name = new_connection_string[new_connection_string.index("/")+1:] - if secrets == None: - username = str(raw_input("Enter the username you want to connect to the schema '%s' with: " % (schema_name))) - password = str(raw_input("Enter the password for the user '%s' in database '%s': " % (username, database_name))) - else: - if isinstance(secrets, str): - netrc_key = "%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode]) - netrc_data = _get_netrc_data(secrets, key=netrc_key) - # take the username from the netrc entry corresponding to the mode the database is opened in - # eg, if the user has given mode="read", the database_name/schema_name/read entry will be taken - username = netrc_data["login"] - password = 
netrc_data["password"] - elif isinstance(secrets, dict): - username = secrets["user"] - password = secrets["password"] - else: - raise Exception("Invalid type given for secrets. Either an str or a dict must be given.") - - #print("Connected to database %s, schema %s, with username %s." % (database_name, schema_name, username)) - - connection_data = {} - connection_data["database_name"] = database_name - connection_data["schema"] = schema_name - connection_data["password"] = password - connection_data["host"] = "oracle" - connection_data["secrets"] = {"login" : username, "password" : password} - - return connection_data + """ + Function used to construct connection data dictionaries - internal to framework. + """ + frontier_str_length = len("frontier://") + sqlite_str_length = len("sqlite://") + #sqlite_file_str_length = len("sqlite_file://") + oracle_str_length = len("oracle://") + + if type(connection_data) in [str, str] and connection_data[0:frontier_str_length] == "frontier://": + """ + frontier://database_name/schema + """ + db_name = connection_data[frontier_str_length:].split("/")[0] + schema = connection_data[frontier_str_length:].split("/")[1] + connection_data = {} + connection_data["database_name"] = db_name + connection_data["schema"] = schema + connection_data["host"] = "frontier" + connection_data["secrets"] = None + elif type(connection_data) in [str, str] and connection_data[0:sqlite_str_length] == "sqlite://": + """ + sqlite://database_file_name + """ + # for now, just support "sqlite://" format for sqlite connection strings + db_name = connection_data[sqlite_str_length:] + schema = "" + connection_data = {} + connection_data["database_name"] = os.path.abspath(db_name) + connection_data["schema"] = schema + connection_data["host"] = "sqlite" + connection_data["secrets"] = None + elif type(connection_data) in [str, str] and connection_data[0:oracle_str_length] == "oracle://": + """ + oracle://account:password@database_name + or + oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) + """ + new_connection_string = connection_data[oracle_str_length:] + + if ":" in new_connection_string: + # the user has given a password - usually in the case of the db upload service + database_name = new_connection_string[new_connection_string.index("@")+1:] + schema_name = new_connection_string[0:new_connection_string.index(":")] + # set username based on connection string + username = new_connection_string[0:new_connection_string.index(":")] + password = new_connection_string[new_connection_string.index(":")+1:new_connection_string.index("@")] + else: + mode_to_netrc_key_suffix = {"r" : "read", "w" : "write"} + database_name = new_connection_string[0:new_connection_string.index("/")] + schema_name = new_connection_string[new_connection_string.index("/")+1:] + if secrets == None: + username = str(input("Enter the username you want to connect to the schema '%s' with: " % (schema_name))) + password = str(input("Enter the password for the user '%s' in database '%s': " % (username, database_name))) + else: + if type(secrets) == str: + netrc_key = "%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode]) + netrc_data = _get_netrc_data(secrets, key=netrc_key) + # take the username from the netrc entry corresponding to the mode the database is opened in + # eg, if the user has given mode="read", the database_name/schema_name/read entry will be taken + username = netrc_data["login"] + password = netrc_data["password"] + elif 
type(secrets) == dict: + username = secrets["user"] + password = secrets["password"] + else: + raise Exception("Invalid type given for secrets. Either an str or a dict must be given.") + + #print("Connected to database %s, schema %s, with username %s." % (database_name, schema_name, username)) + + connection_data = {} + connection_data["database_name"] = database_name + connection_data["schema"] = schema_name + connection_data["password"] = password + connection_data["host"] = "oracle" + connection_data["secrets"] = {"login" : username, "password" : password} + + return connection_data def engine_from_dictionary(dictionary, pooling=True): - if dictionary["host"] != "sqlite": - if dictionary["host"] != "frontier": - # probably oracle - # if not frontier, we have to authenticate - user = dictionary["secrets"]["login"] - pwd = dictionary["secrets"]["password"] - # set max label length for oracle - if pooling: - return create_engine(connection.build_oracle_url(user, pwd, dictionary["database_name"]), label_length=6) - else: - return create_engine(connection.build_oracle_url(user, pwd, dictionary["database_name"]), label_length=6, poolclass=NullPool) - else: - # if frontier, no need to authenticate - # set max label length for frontier - if pooling: - return create_engine(connection.build_frontier_url(dictionary["database_name"], dictionary["schema"]), label_length=6) - else: - return create_engine(connection.build_frontier_url(dictionary["database_name"], dictionary["schema"]), label_length=6, poolclass=NullPool) - else: - # if host is sqlite, making the url is easy - no authentication - return create_engine("sqlite:///%s" % dictionary["database_name"]) + if dictionary["host"] != "sqlite": + if dictionary["host"] != "frontier": + # probably oracle + # if not frontier, we have to authenticate + user = dictionary["secrets"]["login"] + pwd = dictionary["secrets"]["password"] + # set max label length for oracle + if pooling: + return create_engine(connection.build_oracle_url(user, pwd, dictionary["database_name"]), label_length=6) + else: + return create_engine(connection.build_oracle_url(user, pwd, dictionary["database_name"]), label_length=6, poolclass=NullPool) + else: + # if frontier, no need to authenticate + # set max label length for frontier + if pooling: + return create_engine(connection.build_frontier_url(dictionary["database_name"], dictionary["schema"]), label_length=6) + else: + return create_engine(connection.build_frontier_url(dictionary["database_name"], dictionary["schema"]), label_length=6, poolclass=NullPool) + else: + # if host is sqlite, making the url is easy - no authentication + return create_engine("sqlite:///%s" % dictionary["database_name"]) def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True): - """ - Utility method for user - set up a connection object. - """ - con = connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling) - con = con.setup() - return con + """ + Utility method for user - set up a connection object. 
+ """ + con = connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling) + con = con.setup() + return con \ No newline at end of file diff --git a/CondCore/Utilities/python/CondDBFW/shell.py b/CondCore/Utilities/python/CondDBFW/shell.py index e039f63ad8f0b..a19704dae6176 100644 --- a/CondCore/Utilities/python/CondDBFW/shell.py +++ b/CondCore/Utilities/python/CondDBFW/shell.py @@ -3,8 +3,6 @@ Contains classes for shell part of framework - basically a collection of classes that are designed to be invoked on the command line. """ -from __future__ import print_function -from __future__ import absolute_import from . import querying import sys diff --git a/CondCore/Utilities/python/CondDBFW/uploadConditions.py b/CondCore/Utilities/python/CondDBFW/uploadConditions.py index 5e9b0de06ff2c..17977f42b5e8e 100755 --- a/CondCore/Utilities/python/CondDBFW/uploadConditions.py +++ b/CondCore/Utilities/python/CondDBFW/uploadConditions.py @@ -1,8 +1,11 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 """ - +Primary Author: Joshua Dawes - CERN, CMS - The University of Manchester +Debugging, Integration and Maintenance: +Andres Cardenas - CERN, CMS - Universidad San Francisco + Upload script wrapper - controls the automatic update system. Note: the name of the file follows a different convention to the others because it should be the same as the current upload script name. @@ -15,10 +18,17 @@ 4. Invoke the CondDBFW.uploads module with the arguments given to this script. """ -from __future__ import print_function -import pycurl -from StringIO import StringIO +__version__ = 1 + +#import pycurl +import requests +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +try: + from StringIO import StringIO +except: + pass import traceback import sys import os @@ -28,146 +38,18 @@ import netrc import shutil import getpass +import errno +import sqlite3 -def get_version_info(url): - """ - Queries the server-side for the commit hash it is currently using. - Note: this is the commit hash used by /data/services/common/CondDBFW on the server-side. - """ - request = pycurl.Curl() - request.setopt(request.CONNECTTIMEOUT, 60) - user_agent = "User-Agent: ConditionWebServices/1.0 python/%d.%d.%d PycURL/%s" % (sys.version_info[ :3 ] + (pycurl.version_info()[1],)) - request.setopt(request.USERAGENT, user_agent) - # we don't need to verify who signed the certificate or who the host is - request.setopt(request.SSL_VERIFYPEER, 0) - request.setopt(request.SSL_VERIFYHOST, 0) - response_buffer = StringIO() - request.setopt(request.WRITEFUNCTION, response_buffer.write) - request.setopt(request.URL, url + "conddbfw_version/") - request.perform() - return json.loads(response_buffer.getvalue()) - -def get_local_commit_hash(): - """ - Gets the commit hash used by the local repository CondDBFW/.git/. - """ - directory = os.path.abspath("CondDBFW") - - # get the commit hash of the code in `directory` - # by reading the .commit_hash file - try: - commit_hash_file_handle = open(os.path.join(directory, ".commit_hash"), "r") - commit_hash = commit_hash_file_handle.read().strip() - - # validate length of the commit hash - if len(commit_hash) != 40: - print("Commit hash found is not valid. 
Must be 40 characters long.") - exit() - - #commit_hash = run_in_shell("git --git-dir=%s rev-parse HEAD" % (os.path.join(directory, ".git")), shell=True).strip() - - return commit_hash - except Exception: - return None - -def get_directory_to_pull_to(default_directory, commit_hash): - """ - Finds out which directory we can safely use - either CondDBFW/ or a temporary directory. - """ - # try to write a file (and then delete it) - try: - handle = open(os.path.join(default_directory, "test_file"), "w") - handle.write("test") - handle.close() - os.remove(os.path.join(default_directory, "test_file")) - sys.path.insert(0, default_directory) - return default_directory - except IOError as io: - # cannot write to default directory, so set up a directory in /tmp/ - new_path = os.path.join("tmp", commit_hash[0:10]) - if not(os.path.exists(new_path)): - os.mkdir(new_path) - sys.path.insert(0, new_path) - return new_path - else: - # for now, fail - exit("Can't find anywhere to pull the new code base to.") horizontal_rule = "="*60 -def pull_code_from_git(target_directory, repository_url, hash): - """ - Pulls CondDBFW from the git repository specified by the upload server. - """ - # make directory - target = os.path.abspath(target_directory) - sys.path.append(target) - conddbfw_directory = os.path.join(target, "CondDBFW") - git_directory = os.path.join(conddbfw_directory, ".git") - if not(os.path.exists(conddbfw_directory)): - os.mkdir(conddbfw_directory) - else: - # if the directory exists, it may contain things - prompt the user - force_pull = str(raw_input("CondDBFW directory isn't empty - empty it, and update to new version? [y/n] ")) - if force_pull == "y": - # empty directory and delete it - run_in_shell("rm -rf CondDBFW", shell=True) - # remake the directory - it will be empty - os.mkdir(conddbfw_directory) - - print("Pulling code back from repository...") - print(horizontal_rule) - - run_in_shell("git --git-dir=%s clone %s CondDBFW" % (git_directory, repository_url), shell=True) - # --force makes sure we ignore any conflicts that - # could occur and overwrite everything in the checkout - run_in_shell("cd %s && git checkout --force -b version_used %s" % (conddbfw_directory, hash), shell=True) - - # write the hash to a file in the CondDBFW directory so we can delete the git repository - hash_file_handle = open(os.path.join(conddbfw_directory, ".commit_hash"), "w") - hash_file_handle.write(hash) - hash_file_handle.close() - - # can now delete .git directory - shutil.rmtree(git_directory) - - print(horizontal_rule) - print("Creating local log directories (if required)...") - if not(os.path.exists(os.path.join(target, "upload_logs"))): - os.mkdir(os.path.join(target, "upload_logs")) - if not(os.path.exists(os.path.join(target, "server_side_logs"))): - os.mkdir(os.path.join(target, "server_side_logs")) - print("Finished with log directories.") - print("Update of CondDBFW complete.") - - print(horizontal_rule) - - return True - -def run_in_shell(*popenargs, **kwargs): - """ - Runs string-based commands in the shell and returns the result. 
- """ - out = subprocess.PIPE if kwargs.get("stdout") == None else kwargs.get("stdout") - new_kwargs = kwargs - if new_kwargs.get("stdout"): - del new_kwargs["stdout"] - process = subprocess.Popen(*popenargs, stdout=out, **new_kwargs) - stdout = process.communicate()[0] - returnCode = process.returncode - cmd = kwargs.get('args') - if cmd is None: - cmd = popenargs[0] - if returnCode: - raise subprocess.CalledProcessError(returnCode, cmd) - return stdout - def run_upload(**parameters): """ Imports CondDBFW.uploads and runs the upload with the upload metadata obtained. """ try: - import CondDBFW.uploads as uploads + import CondCore.Utilities.CondDBFW.uploads as uploads except Exception as e: traceback.print_exc() exit("CondDBFW or one of its dependencies could not be imported.\n"\ @@ -176,6 +58,177 @@ def run_upload(**parameters): uploader = uploads.uploader(**parameters) result = uploader.upload() +def getInput(default, prompt = ''): + '''Like raw_input() but with a default and automatic strip(). + ''' + + answer = raw_input(prompt) + if answer: + return answer.strip() + + return default.strip() + + +def getInputWorkflow(prompt = ''): + '''Like getInput() but tailored to get target workflows (synchronization options). + ''' + + while True: + workflow = getInput(defaultWorkflow, prompt) + + if workflow in frozenset(['offline', 'hlt', 'express', 'prompt', 'pcl']): + return workflow + + print('Please specify one of the allowed workflows. See above for the explanation on each of them.') + + +def getInputChoose(optionsList, default, prompt = ''): + '''Makes the user choose from a list of options. + ''' + + while True: + index = getInput(default, prompt) + + try: + return optionsList[int(index)] + except ValueError: + print('Please specify an index of the list (i.e. integer).') + except IndexError: + print('The index you provided is not in the given list.') + + +def getInputRepeat(prompt = ''): + '''Like raw_input() but repeats if nothing is provided and automatic strip(). + ''' + + while True: + answer = raw_input(prompt) + if answer: + return answer.strip() + + print('You need to provide a value.') + +def runWizard(basename, dataFilename, metadataFilename): + while True: + print('''\nWizard for metadata for %s + +I will ask you some questions to fill the metadata file. For some of the questions there are defaults between square brackets (i.e. []), leave empty (i.e. hit Enter) to use them.''' % basename) + + # Try to get the available inputTags + try: + dataConnection = sqlite3.connect(dataFilename) + dataCursor = dataConnection.cursor() + dataCursor.execute('select name from sqlite_master where type == "table"') + tables = set(zip(*dataCursor.fetchall())[0]) + + # only conddb V2 supported... + if 'TAG' in tables: + dataCursor.execute('select NAME from TAG') + # In any other case, do not try to get the inputTags + else: + raise Exception() + + inputTags = dataCursor.fetchall() + if len(inputTags) == 0: + raise Exception() + inputTags = list(zip(*inputTags))[0] + + except Exception: + inputTags = [] + + if len(inputTags) == 0: + print('\nI could not find any input tag in your data file, but you can still specify one manually.') + + inputTag = getInputRepeat( + '\nWhich is the input tag (i.e. the tag to be read from the SQLite data file)?\ne.g. 
BeamSpotObject_ByRun\ninputTag: ') + + else: + print('\nI found the following input tags in your SQLite data file:') + for (index, inputTag) in enumerate(inputTags): + print(' %s) %s' % (index, inputTag)) + + inputTag = getInputChoose(inputTags, '0', + '\nWhich is the input tag (i.e. the tag to be read from the SQLite data file)?\ne.g. 0 (you select the first in the list)\ninputTag [0]: ') + + databases = { + 'oraprod': 'oracle://cms_orcon_prod/CMS_CONDITIONS', + 'prod': 'oracle://cms_orcon_prod/CMS_CONDITIONS', + 'oradev': 'oracle://cms_orcoff_prep/CMS_CONDITIONS', + 'prep': 'oracle://cms_orcoff_prep/CMS_CONDITIONS', + } + + destinationDatabase = '' + ntry = 0 + print('\nWhich is the destination database where the tags should be exported?') + print('\n%s) %s' % ('oraprod', databases['oraprod'])) + print('\n%s) %s' % ('oradev', databases['oradev'])) + + while ( destinationDatabase not in databases.values() ): + if ntry==0: + inputMessage = \ + '\nPossible choices: oraprod or oradev \ndestinationDatabase: ' + elif ntry==1: + inputMessage = \ + '\nPlease choose one of the two valid destinations: oraprod or oradev \ndestinationDatabase: ' + else: + raise Exception('No valid destination chosen. Bailing out...') + + databaseInput = getInputRepeat(inputMessage).lower() + if databaseInput in databases.keys(): + destinationDatabase = databases[databaseInput] + ntry += 1 + + while True: + since = getInput('', + '\nWhich is the given since? (if not specified, the one from the SQLite data file will be taken -- note that even if specified, still this may not be the final since, depending on the synchronization options you select later: if the synchronization target is not offline, and the since you give is smaller than the next possible one (i.e. you give a run number earlier than the one which will be started/processed next in prompt/hlt/express), the DropBox will move the since ahead to go to the first safe run instead of the value you gave)\ne.g. 1234\nsince []: ') + if not since: + since = None + break + else: + try: + since = int(since) + break + except ValueError: + print('The since value has to be an integer or empty (null).') + + userText = getInput('', + '\nWrite any comments/text you may want to describe your request\ne.g. Muon alignment scenario for...\nuserText []: ') + + destinationTags = {} + while True: + destinationTag = getInput('', + '\nWhich is the next destination tag to be added (leave empty to stop)?\ne.g. BeamSpotObjects_PCL_byRun_v0_offline\ndestinationTag []: ') + if not destinationTag: + if len(destinationTags) == 0: + print('There must be at least one destination tag.') + continue + break + + if destinationTag in destinationTags: + print( + 'You already added this destination tag. Overwriting the previous one with this new one.') + + destinationTags[destinationTag] = { + } + + metadata = { + 'destinationDatabase': destinationDatabase, + 'destinationTags': destinationTags, + 'inputTag': inputTag, + 'since': since, + 'userText': userText, + } + + metadata = json.dumps(metadata, sort_keys=True, indent=4) + print('\nThis is the generated metadata:\n%s' % metadata) + + if getInput('n', + '\nIs it fine (i.e. 
save in %s and *upload* the conditions if this is the latest file)?\nAnswer [n]: ' % metadataFilename).lower() == 'y':
+            break
+
+    print('Saving generated metadata in %s...' % metadataFilename)
+    with open(metadataFilename, 'w') as metadataFile:
+        metadataFile.write(metadata)
+
 def parse_arguments():
     # read in command line arguments, and build metadata dictionary from them
     parser = argparse.ArgumentParser(prog="cmsDbUpload client", description="CMS Conditions Upload Script in CondDBFW.")
@@ -210,13 +263,19 @@ def parse_arguments():
     parser.add_argument("--review-options", required=False, action="store_true")
+    parser.add_argument("--replay-file", required=False)
+
     command_line_data = parser.parse_args()
+    if command_line_data.replay_file:
+        dictionary = json.loads("".join(open(command_line_data.replay_file, "r").readlines()))
+        command_line_data.tier0_response = dictionary["tier0_response"]
+
     # default is the production server, which can point to either database anyway
     server_alias_to_url = {
         "prep" : "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/",
-        "prod" : "https://cms-conddb.cern.ch/cmsDbCondUpload/",
-        None : "https://cms-conddb.cern.ch/cmsDbCondUpload/"
+        "dev" : "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/",
+        "prod" : "https://cms-conddb.cern.ch/cmsDbCondUpload/"
     }

     # if prep, prod or None were given, convert to URLs in dictionary server_alias_to_url
@@ -224,6 +283,17 @@
     if command_line_data.server in server_alias_to_url.keys():
         command_line_data.server = server_alias_to_url[command_line_data.server]

+    # resolve destination databases
+    database_alias_to_connection = {
+        "prep": "oracle://cms_orcoff_prep/CMS_CONDITIONS",
+        "dev": "oracle://cms_orcoff_prep/CMS_CONDITIONS",
+        "prod": "oracle://cms_orcon_adg/CMS_CONDITIONS"
+    }
+
+    if command_line_data.destinationDatabase in database_alias_to_connection.keys():
+        command_line_data.destinationDatabase = database_alias_to_connection[command_line_data.destinationDatabase]
+
+
     # use netrc to get username and password
     try:
         netrc_file = command_line_data.netrc
@@ -264,23 +334,63 @@
     since these override the options set in the metadata file.
""" + + # Hash to use, entirely from command line if command_line_data.hashToUse != None: command_line_data.userText = "" metadata_dictionary = command_line_data.__dict__ elif command_line_data.metadataFile == None: - command_line_data.userText = command_line_data.userText\ - if command_line_data.userText != None\ - else str(raw_input("Tag's description [can be empty]:")) - metadata_dictionary = command_line_data.__dict__ - else: + if command_line_data.sourceDB != None and (command_line_data.inputTag == None or command_line_data.destinationTag == None or command_line_data.destinationDatabase == None): + basepath = command_line_data.sourceDB.rsplit('.db', 1)[0].rsplit('.txt', 1)[0] + basename = os.path.basename(basepath) + dataFilename = '%s.db' % basepath + metadataFilename = '%s.txt' % basepath + # Data file + try: + with open(dataFilename, 'rb') as dataFile: + pass + except IOError as e: + errMsg = 'Impossible to open SQLite data file %s' %dataFilename + print( errMsg ) + ret['status'] = -3 + ret['error'] = errMsg + return ret + + # Metadata file + + try: + with open(metadataFilename, 'rb') as metadataFile: + pass + except IOError as e: + if e.errno != errno.ENOENT: + errMsg = 'Impossible to open file %s (for other reason than not existing)' %metadataFilename + ret = {} + ret['status'] = -4 + ret['error'] = errMsg + exit (ret) + + if getInput('y', '\nIt looks like the metadata file %s does not exist and not enough parameters were received in the command line. Do you want me to create it and help you fill it?\nAnswer [y]: ' % metadataFilename).lower() != 'y': + errMsg = 'Metadata file %s does not exist' %metadataFilename + ret = {} + ret['status'] = -5 + ret['error'] = errMsg + exit(ret) + # Wizard + runWizard(basename, dataFilename, metadataFilename) + command_line_data.metadataFile = metadataFilename + else: + command_line_data.userText = command_line_data.userText\ + if command_line_data.userText != None\ + else str(raw_input("Tag's description [can be empty]:")) + metadata_dictionary = command_line_data.__dict__ + + if command_line_data.metadataFile != None: metadata_dictionary = json.loads("".join(open(os.path.abspath(command_line_data.metadataFile), "r").readlines())) metadata_dictionary["username"] = username metadata_dictionary["password"] = password metadata_dictionary["userText"] = metadata_dictionary.get("userText")\ if metadata_dictionary.get("userText") != None\ else str(raw_input("Tag's description [can be empty]:")) - # set the server to use to be the default one - metadata_dictionary["server"] = server_alias_to_url[None] # go through command line options and, if they are set, overwrite entries for (option_name, option_value) in command_line_data.__dict__.items(): @@ -314,47 +424,51 @@ def parse_arguments(): if raw_input("\nDo you want to continue? 
[y/n] ") != "y": exit() + if metadata_dictionary["server"] == None: + if metadata_dictionary["destinationDatabase"] == "oracle://cms_orcoff_prep/CMS_CONDITIONS": + metadata_dictionary["server"] = server_alias_to_url["prep"] + else: + metadata_dictionary["server"] = server_alias_to_url["prod"] + return metadata_dictionary +def get_version(url): + return requests.get(url + "script_version/", verify=False) + + if __name__ == "__main__": upload_metadata = parse_arguments() # upload_metadata should be used to decide the service url final_service_url = upload_metadata["server"] + try: + response = get_version(final_service_url) + server_version = response.json() + except Exception as e: + print(horizontal_rule) + print(e) + print("Could not connect to server at %s"%final_service_url) + print("If you specified a server please check it is correct. If that is not the issue please contact the AlcaDB team.") + print(horizontal_rule) + exit(1) - conddbfw_version = get_version_info(final_service_url) - local_version = get_local_commit_hash() + if server_version["version"] != __version__: + print(horizontal_rule) + print("Local upload script is different than server version. Please run the following command to get the latest script.") + print("curl --insecure -o uploadConditions.py %sget_upload_script/ && chmod +x uploadConditions.py;"%final_service_url) + print(horizontal_rule) + exit(1) - """ - Todo - case where we don't have write permission in the current directory (local_version == None and hashes don't match) - """ - # target_directory is only used if we don't find a version of CondDBFW locally, - # but is set here so we can access it later if we need to delete a temporary directory - target_directory = "" - # check if we have a persistent local version of CondDBFW - if local_version != None: - if conddbfw_version["hash"] == local_version: - # no update is required, pass for now - print("No change of version of CondDBFW is required - performing the upload.") - # add CondDBFW to the system paths (local_version != None, so we know it's in this directory) - sys.path.append(os.path.abspath(os.getcwd())) - elif conddbfw_version["hash"] != local_version: - # this is the case where CondDBFW is in the directory working_dir/CondDBFW, but there is an update available - # CondDBFW isn't in this directory, and the local commit hash doesn't match the latest one on the server - print("The server uses a different version of CondDBFW - changing to commit '%s' of CondDBFW." 
% conddbfw_version["hash"]) - shell_response = pull_code_from_git(os.getcwd(), conddbfw_version["repo"], conddbfw_version["hash"]) - else: - # no CondDBFW version - we should pull the code and start from scratch - # we can't look for temporary versions of it in /tmp/, since we can't guess the hash used to make the directory name - print("No CondDBFW version found locally - pulling one.") - target_directory = get_directory_to_pull_to(os.getcwd(), conddbfw_version["hash"]) - shell_response = pull_code_from_git(target_directory, conddbfw_version["repo"], conddbfw_version["hash"]) - - import CondDBFW.data_sources as data_sources + import CondCore.Utilities.CondDBFW.data_sources as data_sources upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB") - + + try: + os.mkdir('upload_logs') + except OSError as e: + pass + # make new dictionary, and copy over everything except "metadata_source" upload_metadata_argument = {} for (key, value) in upload_metadata.items(): @@ -362,15 +476,15 @@ def parse_arguments(): upload_metadata_argument[key] = value upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument) - - # pass dictionary as arguments to match keywords - the constructor has a **kwargs parameter to deal with stray arguments - run_upload(**upload_metadata) - - # if the directory was temporary, delete it - if "tmp" in target_directory: + try: + # pass dictionary as arguments to match keywords - the constructor has a **kwargs parameter to deal with stray arguments + run_upload(**upload_metadata) + print(horizontal_rule) + print("Process completed without issues. Please check logs for further details.") + print(horizontal_rule) + except SystemExit as e: + print(horizontal_rule) + print("Process exited abnormally. Please check logs for details.") print(horizontal_rule) - print("Removing directory %s..." % target_directory) - try: - run_in_shell("rm -rf %s" % target_directory, shell=True) - except Exception as e: - print("Couldn't delete the directory %s - try to manually delete it." % target_directory) \ No newline at end of file + exit(1) + exit(0) diff --git a/CondCore/Utilities/python/CondDBFW/uploads.py b/CondCore/Utilities/python/CondDBFW/uploads.py index 2bc8927badfd3..953492d625128 100755 --- a/CondCore/Utilities/python/CondDBFW/uploads.py +++ b/CondCore/Utilities/python/CondDBFW/uploads.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 """ Joshua Dawes - CERN, CMS - The University of Manchester @@ -6,14 +6,12 @@ This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data. """ -from __future__ import print_function -from __future__ import absolute_import import os import json import base64 from datetime import datetime -from urllib import urlencode +from urllib.parse import urlencode import math import sys import traceback @@ -42,7 +40,7 @@ def log(file_handle, message): """ Very simple logging function, used by output class. """ - file_handle.write("[%s] %s\n" % (to_timestamp(datetime.now()), message)) + file_handle.write("[%s] %s\n" % (to_timestamp(datetime.utcnow()), message)) def new_log_file_id(): """ @@ -59,38 +57,23 @@ def new_log_file_id(): return new_id class output(): - INFO = 0 - ERROR = 1 - WARNING = 2 - VERBOSE = 3 - DEBUG = 4 - """ Used to control output to the console and to the client-side log. 
""" - def __init__(self, log_handle=None, verbose=False, debug=False): + def __init__(self, log_handle=None, verbose=False): # first time writing progress bar, don't need to go back along the line self.current_output_length = 0 self._verbose = verbose self._log_handle = log_handle - self._debug = debug - self.labels = ["INFO", "ERROR", "WARNING", "VERBOSE", "DEBUG"] - def write(self, message="", level=INFO): + def write(self, message="", ignore_verbose=False): """ Write to the console and to the log file held by self. """ - message = "[%s] %s: %s"%(datetime.now(), self.labels[level], message) - if self._verbose: - if level == output.DEBUG and self._debug: - print(message) - elif level < output.DEBUG: - print(message) - elif self._debug: - if level == output.DEBUG: - print(message) - elif level <= output.ERROR: + if ignore_verbose: + print(message) + elif self._verbose: print(message) if self._log_handle != None: log(self._log_handle, message) @@ -123,7 +106,7 @@ def __init__(self, metadata_source=None, debug=False, verbose=False, testing=Fal self._handle = open(self.upload_log_file_name, "a") # set up client-side logging object - self._outputter = output(verbose=verbose, log_handle=self._handle, debug = self._debug) + self._outputter = output(verbose=verbose, log_handle=self._handle) self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL) # expect a CondDBFW data_source object for metadata_source @@ -139,7 +122,7 @@ def __init__(self, metadata_source=None, debug=False, verbose=False, testing=Fal if self.metadata_source.get("destinationTags") == None: self.exit_upload("No destination Tag was given.") else: - if isinstance(self.metadata_source.get("destinationTags"), dict) and self.metadata_source.get("destinationTags").keys()[0] == None: + if type(self.metadata_source.get("destinationTags")) == dict and list(self.metadata_source.get("destinationTags").keys())[0] == None: self.exit_upload("No destination Tag was given.") # make sure a destination database was given @@ -171,7 +154,7 @@ def __init__(self, metadata_source=None, debug=False, verbose=False, testing=Fal self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name) sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) - self._outputter.write("Getting Tag and IOVs from SQLite database.", output.VERBOSE) + self._outputter.write("Getting Tag and IOVs from SQLite database.") # query for Tag, check for existence, then convert to dictionary tag = sqlite_con.tag(name=self.input_tag) @@ -184,7 +167,7 @@ def __init__(self, metadata_source=None, debug=False, verbose=False, testing=Fal if iovs == None: self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." 
% self.input_tag) iovs = iovs.as_dicts(convert_timestamps=True) - iovs = [iovs] if not isinstance(iovs, list) else iovs + iovs = [iovs] if type(iovs) != list else iovs """ Finally, get the list of all Payload hashes of IOVs, @@ -210,7 +193,7 @@ def __init__(self, metadata_source=None, debug=False, verbose=False, testing=Fal # set empty dictionary to contain Tag and IOV data from command line result_dictionary = {} - now = to_timestamp(datetime.now()) + now = to_timestamp(datetime.utcnow()) # tag dictionary will be taken from the server # this does not require any authentication tag = self.get_tag_dictionary() @@ -244,6 +227,17 @@ def __init__(self, metadata_source=None, debug=False, verbose=False, testing=Fal # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing self.data_to_send["since"] = self.data_to_send["since"] << 32 + """ + TODO - Settle on a single destination tag format. + """ + # look for deprecated metadata entries - give warnings + # Note - we only really support this format + try: + if type(result_dictionary["destinationTags"]) == dict: + self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.") + except Exception as e: + self._outputter.write("ERROR: %s" % str(e)) + @check_response(check="json") def get_tag_dictionary(self): url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]} @@ -257,21 +251,21 @@ def check_response_for_error_key(self, response_dict, exit_if_error=True): If it is a dictionary, and one of its keys is "error", the server returned an error """ # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback - if isinstance(response_dict, dict) and "error" in response_dict.keys(): + if type(response_dict) == dict and "error" in list(response_dict.keys()): splitter_string = "\n%s\n" % ("-"*50) - self._outputter.write("\nERROR: %s" % splitter_string, output.ERROR) - self._outputter.write(response_dict["error"], output.ERROR) + self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True) + self._outputter.write(response_dict["error"], ignore_verbose=True) # if the user has given the --debug flag, show the traceback as well if self._debug: # suggest to the user to email this to db upload experts - self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, output.DEBUG) + self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True) if response_dict.get("traceback") != None: - self._outputter.write(response_dict["traceback"], output.DEBUG) + self._outputter.write(response_dict["traceback"], ignore_verbose=True) else: - self._outputter.write("No traceback was returned from the server.", output.DEBUG) + self._outputter.write("No traceback was returned from the server.", ignore_verbose=True) else: - self._outputter.write("Use the --debug option to show the traceback of this error.", output.INFO) + self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True) # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state ("")) # if an error has occurred on the server side, a log will have been written @@ -282,9 +276,9 @@ def check_response_for_error_key(self, response_dict, exit_if_error=True): return False else: exit() - elif not("error" in response_dict.keys()) and "log_data" in 
response_dict.keys(): + elif not("error" in list(response_dict.keys())) and "log_data" in list(response_dict.keys()): # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back - self._log_data = response_dict["log_data"][2:-1] + self._log_data = response_dict["log_data"] return True def write_server_side_log(self, log_data): @@ -314,18 +308,12 @@ def write_server_side_log(self, log_data): # tell the user where the log files are # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module) - if self._SERVICE_URL.startswith("https://cms-conddb-dev.cern.ch/cmsDbCondUpload"): - logUrl = "https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prep/%s"%self.upload_session_id - else: - logUrl = "https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prod/%s"%self.upload_session_id - - print("[%s] INFO: Server log found at %s." % (datetime.now(), logUrl)) if server_log_file_name != None: - print("[%s] INFO: Local copy of server log file at '%s'." % (datetime.now(), server_log_file_name)) + print("Log file from server written to '%s'." % server_log_file_name) else: print("No server log file could be written locally.") - print("[%s] INFO: Local copy of client log file at '%s'." % (datetime.now(), self.upload_log_file_name)) + print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name) def exit_upload(self, message=None): """ @@ -368,14 +356,14 @@ def upload(self): return False self.upload_session_id = upload_session_data["id"] - self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id, output.DEBUG) + self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id) self.server_side_log_file = upload_session_data["log_file"] except errors.NoMoreRetriesException as no_more_retries: return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.") except Exception as e: # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set. - self._outputter.write(traceback.format_exc(), output.ERROR) + self._outputter.write(traceback.format_exc(), ignore_verbose=True) if not(self._verbose): self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.") @@ -403,7 +391,7 @@ def upload(self): return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.") except Exception as e: # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set. 
- self._outputter.write(traceback.format_exc(), output.ERROR) + self._outputter.write(traceback.format_exc(), ignore_verbose=True) if not(self._verbose): self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.") @@ -429,10 +417,10 @@ def upload(self): # finally, check hashes_not_found with hashes not found locally - if there is an intersection, we stop the upload # because if a hash is not found and is not on the server, there is no data to upload - all_hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"]) + all_hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]] hashes_not_found = check_hashes_response["hashes_not_found"] hashes_found = list(set(all_hashes) - set(hashes_not_found)) - self._outputter.write("Checking for IOVs that have no Payload locally or on the server.", output.VERBOSE) + self._outputter.write("Checking for IOVs that have no Payload locally or on the server.") # check if any hashes not found on the server is used in the local SQLite database for hash_not_found in hashes_not_found: if hash_not_found in self.hashes_with_no_local_payload: @@ -440,19 +428,16 @@ def upload(self): for hash_found in hashes_found: if hash_found in self.hashes_with_no_local_payload: - self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found, output.VERBOSE) - - self._outputter.write("Found %i Payloads in remote server" % len(hashes_found), output.INFO) - self._outputter.write("Found %i Payloads not in remote server" % len(hashes_not_found), output.INFO) + self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found) - self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.", output.VERBOSE) + self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.") except errors.NoMoreRetriesException as no_more_retries: # for now, just write the log if we get a NoMoreRetriesException return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.") except Exception as e: # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set. - self._outputter.write(traceback.format_exc(), output.ERROR) + self._outputter.write(traceback.format_exc(), ignore_verbose=True) if not(self._verbose): self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.") @@ -477,16 +462,10 @@ def upload(self): # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator send_metadata_response = self.send_metadata(self.upload_session_id) - no_error = self.check_response_for_error_key(send_metadata_response) if not(no_error) and self._testing: return False - try: - self._outputter.write(send_metadata_response["summary"], output.INFO) - except KeyError: - pass - # we have to call this explicitly here since check_response_for_error_key only writes the log file # if an error has occurred, whereas it should always be written here self.write_server_side_log(self._log_data) @@ -495,7 +474,7 @@ def upload(self): return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.") except Exception as e: # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set. 
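
# ---------------------------------------------------------------------------
# Editor's note (illustrative sketch, not part of the patch): the hash
# reconciliation in upload() is plain set arithmetic. Every IOV references a
# payload hash; the server reports which of those hashes it does not have yet,
# and the upload can only proceed if every hash missing on the server has a
# payload BLOB in the local SQLite file. Assuming hypothetical input lists,
# the decision reduces to:

def payloads_to_send(all_hashes, hashes_not_found_on_server, hashes_with_no_local_payload):
    """Return the payload hashes that must be uploaded, or raise if an IOV cannot be satisfied."""
    missing_everywhere = set(hashes_not_found_on_server) & set(hashes_with_no_local_payload)
    if missing_everywhere:
        # an IOV points to a payload that exists neither locally nor on the server
        raise RuntimeError("No payload found locally or remotely for hashes: %s" % sorted(missing_everywhere))
    # hashes already on the server need no upload; only the remainder is sent
    return set(hashes_not_found_on_server)
# ---------------------------------------------------------------------------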
- self._outputter.write(traceback.format_exc(), output.ERROR) + self._outputter.write(traceback.format_exc(), ignore_verbose=True) if not(self._verbose): self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.") @@ -517,7 +496,7 @@ def get_upload_session_id(self): Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests, as long as the upload session is still open. """ - self._outputter.write("Getting upload session.", output.VERBOSE) + self._outputter.write("Getting upload session.") # send password in the body so it can be encrypted over https # username and password are taken from the netrc file @@ -525,11 +504,11 @@ def get_upload_session_id(self): # this method's end result is obtaining a token. body_data = base64.b64encode(json.dumps( { - "destinationTag" : self.data_to_send["destinationTags"].keys()[0], + "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0], "username_or_token" : self.data_to_send["username"], "password" : self.data_to_send["password"] } - )) + ).encode('UTF-8')) url_data = {"database" : self.data_to_send["destinationDatabase"]} @@ -557,11 +536,15 @@ def get_fcsr_from_server(self, upload_session_id): Note: we do this in a separate function we so we can do the decoding check for json data with check_response. """ # tiny amount of client-side logic here - all of the work is done on the server + # tier0_response uses get() so if the key isn't present, we default to None + # tier0_response is for replaying uploads from the old upload service, with knowledge of the tier0 response + # when those uploads happened. url_data = { "database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id, - "destinationTag" : self.data_to_send["destinationTags"].keys()[0], - "sourceTagSync" : self.data_to_send["fcsr_filter"] + "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0], + "sourceTagSync" : self.data_to_send["fcsr_filter"], + "tier0_response" : self.data_to_send.get("tier0_response") } query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data) result = query.send() @@ -626,7 +609,7 @@ def filter_iovs_by_fcsr(self, upload_session_id): self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"] # modify insertion_time of iovs - new_time = to_timestamp(datetime.now()) + new_time = to_timestamp(datetime.utcnow()) for (i, iov) in enumerate(self.data_to_send["iovs"]): self.data_to_send["iovs"][i]["insertion_time"] = new_time @@ -634,9 +617,8 @@ def get_all_hashes(self): """ Get all the hashes from the dictionary of IOVs we have from the SQLite file. """ - self._outputter.write("\tGetting list of all hashes found in SQLite database.", output.DEBUG) - hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"]) - self._outputter.write("Found %i local Payload(s) referenced in IOVs"%len(hashes), output.INFO) + self._outputter.write("\tGetting list of all hashes found in SQLite database.") + hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]] return hashes @check_response(check="json") @@ -644,7 +626,7 @@ def get_hashes_to_send(self, upload_session_id): """ Get the hashes of the payloads we want to send that the server doesn't have yet. 
""" - self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.", output.DEBUG) + self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.") post_data = json.dumps(self.get_all_hashes()) url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id} query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data) @@ -658,21 +640,22 @@ def send_payloads(self, hashes, upload_session_id): # if we have no hashes, we can't send anything # but don't exit since it might mean all the Payloads were already on the server if len(hashes) == 0: - self._outputter.write("No payloads to send - moving to IOV upload.") + self._outputter.write("No hashes to send - moving to metadata upload.") return True else: self._outputter.write("Sending payloads of hashes not found:") # construct connection string for local SQLite database file - database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if isinstance(self.sqlite_file_name, str) else self.sqlite_file_name + database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if type(self.sqlite_file_name) == str else self.sqlite_file_name # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW) self._outputter.write("\tConnecting to input SQLite database.") con = querying.connect(database, map_blobs=True) # query for the Payloads self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.") - payloads = con.payload(hash=hashes) + byte_hashes = [bytes(h, 'utf-8') for h in hashes] + payloads = con.payload(hash=byte_hashes) # if we get a single Payload back, put it in a list and turn it into a json_list - if payloads.__class__ != data_sources.json_list: + if payloads and payloads.__class__ != data_sources.json_list: payloads = data_sources.json_data_node.make([payloads]) # close the session with the SQLite database file - we won't use it again @@ -713,7 +696,7 @@ def send_blob(self, payload, upload_session_id): url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id} # construct the data to send in the body and header of the HTTPs request - for key in payload.keys(): + for key in list(payload.keys()): # skip blob if key != "data": if key == "insertion_time": @@ -730,7 +713,7 @@ def send_blob(self, payload, upload_session_id): return request_response except Exception as e: # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out - if isinstance(e, errors.NoMoreRetriesException): + if type(e) == errors.NoMoreRetriesException: self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"]) self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"]) return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()}) @@ -744,16 +727,16 @@ def send_metadata(self, upload_session_id): # set user text if it's empty if self.data_to_send["userText"] in ["", None]: - self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % self.data_to_send["destinationTags"].keys()[0] + self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." 
% list(self.data_to_send["destinationTags"].keys())[0] self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\ - % self.upload_session_id, output.VERBOSE) + % self.upload_session_id) # sent the HTTPs request to the server - url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id} + url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id, "tier0_response" : self.data_to_send.get("tier0_response")} request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send)) response = request.send() - self._outputter.write("Response received - conditions upload process complete.", output.VERBOSE) + self._outputter.write("Response received - conditions upload process complete.") return response if __name__ == "__main__": @@ -777,7 +760,7 @@ def send_metadata(self, upload_session_id): # make new dictionary, and copy over everything except "metadata_source" upload_metadata_argument = {} - for (key, value) in upload_metadata.items(): + for (key, value) in list(upload_metadata.items()): if key != "metadata_source": upload_metadata_argument[key] = value diff --git a/CondCore/Utilities/python/CondDBFW/url_query.py b/CondCore/Utilities/python/CondDBFW/url_query.py index 4639eb261800b..e6fab9c9ba55d 100644 --- a/CondCore/Utilities/python/CondDBFW/url_query.py +++ b/CondCore/Utilities/python/CondDBFW/url_query.py @@ -6,20 +6,35 @@ Provides methods for performing/closing the request, as well as getting the request response. Note: user agent string from current version of cmsDbUpload """ -from __future__ import print_function -from __future__ import absolute_import -import pycurl -from StringIO import StringIO -from urllib import urlencode +#import pycurl +import requests +from io import StringIO +from urllib.parse import urlencode import traceback import sys import json from .errors import * from time import sleep +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + class url_query(): + def __init__(self, url=None, url_data=None, body=None): + self._url = url + self._url_data = url_data + self._body = body + + def send(self): + if self._body: + return requests.post(self._url, data=self._body, params=self._url_data, verify=False).text + else: + return requests.get(self._url, params=self._url_data, verify=False).text + +"""class url_query(): + def __init__(self, url=None, url_data=None, body=None, response_stream=None, timeout=60): if not(url): return None @@ -38,15 +53,15 @@ def __init__(self, url=None, url_data=None, body=None, response_stream=None, tim self._response = StringIO() if body: - if isinstance(body, dict): + if type(body) == dict: body = urlencode(body) - elif isinstance(body, list): + elif type(body) == list: body = json.dumps(body) self._r.setopt(self._r.POSTFIELDS, body) if url_data: - if isinstance(url_data, dict): + if type(url_data) == dict: url_data = urlencode(url_data) else: exit("URL data '%s' for request to URL '%s' was not valid - should be a dictionary." 
% (str(url_data), url)) @@ -54,7 +69,7 @@ def __init__(self, url=None, url_data=None, body=None, response_stream=None, tim # set the URL with url parameters if they were given self._r.setopt(self._r.URL, url + (("?%s" % url_data) if url_data else "")) - if response_stream and not isinstance(response_stream, StringIO): + if response_stream and type(response_stream) != StringIO: response_stream = StringIO() # copy reference to instance variable self._response = response_stream @@ -79,7 +94,7 @@ def send(self): attempt += 1 # this catches exceptions that occur with the actual http request # not exceptions sent back from server side - if isinstance(e, pycurl.error) and e[0] in [7, 52]: + if type(e) == pycurl.error and e[0] in [7, 52]: # wait two seconds to retry print("Request failed - waiting 3 seconds to retry.") sleep(3) @@ -89,4 +104,4 @@ def send(self): print("Unforesoon error occurred when sending data to server.") traceback.print_exc() if attempt == max_retries: - raise NoMoreRetriesException(max_retries) \ No newline at end of file + raise NoMoreRetriesException(max_retries)""" \ No newline at end of file diff --git a/CondCore/Utilities/python/CondDBFW/utils.py b/CondCore/Utilities/python/CondDBFW/utils.py index 053dacb0c15d2..277995e824b50 100644 --- a/CondCore/Utilities/python/CondDBFW/utils.py +++ b/CondCore/Utilities/python/CondDBFW/utils.py @@ -7,13 +7,13 @@ def to_timestamp(obj): """ Takes a datetime object and outputs a timestamp string with the format Y-m-d H:m:S.f """ - return obj.strftime('%Y-%m-%d %H:%M:%S.%f') if isinstance(obj, datetime.datetime) else obj + return obj.strftime('%d-%b-%y %I:%M:%S.%f %p') if isinstance(obj, datetime.datetime) else obj def to_datetime(date_string): """ Takes a date string with the format Y-m-d H:m:S.f and gives back a datetime.datetime object """ - return datetime.datetime.strptime(date_string.replace(",", "."), "%Y-%m-%d %H:%M:%S.%f") + return datetime.datetime.strptime(date_string.replace(",", "."), "%d-%b-%y %I:%M:%S.%f %p") def friendly_since(time_type, since): """
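
# ---------------------------------------------------------------------------
# Editor's note (illustrative sketch, not part of the patch): the utils.py hunk
# above switches to_timestamp()/to_datetime() to the '%d-%b-%y %I:%M:%S.%f %p'
# format. A quick round-trip shows what the new strings look like; note that
# %b renders month names according to the current locale.

import datetime

FORMAT = '%d-%b-%y %I:%M:%S.%f %p'

when = datetime.datetime(2017, 6, 5, 14, 30, 15, 123456)
stamp = when.strftime(FORMAT)            # e.g. '05-Jun-17 02:30:15.123456 PM'
assert datetime.datetime.strptime(stamp, FORMAT) == when
# ---------------------------------------------------------------------------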