diff --git a/AppDB/datastore_server.py b/AppDB/datastore_server.py index a966a2d939..249756d660 100644 --- a/AppDB/datastore_server.py +++ b/AppDB/datastore_server.py @@ -226,8 +226,8 @@ def get_kind_key(self, prefix, key_path): # are of set size i.e. 2 > 0003 but 0002 < 0003 key_id = str(e.id()).zfill(ID_KEY_LENGTH) path.append("{0}:{1}".format(e.type(), key_id)) - encoded_path = '!'.join(path) - encoded_path += '!' + encoded_path = dbconstants.KIND_SEPARATOR.join(path) + encoded_path += dbconstants.KIND_SEPARATOR return prefix + self._NAMESPACE_SEPARATOR + encoded_path @@ -249,8 +249,8 @@ def _encode_path(pb): elif e.has_id(): key_id = str(e.id()).zfill(ID_KEY_LENGTH) path.append("{0}:{1}".format(e.type(), key_id)) - val = '!'.join(path) - val += '!' + val = dbconstants.KIND_SEPARATOR.join(path) + val += dbconstants.KIND_SEPARATOR return val if isinstance(pb, entity_pb.PropertyValue) and pb.has_uservalue(): @@ -485,11 +485,17 @@ def kind_row_generator(entities): logging.debug("Root entity is: {0}".\ format(self.get_root_key_from_entity_key(str(ii[0])))) logging.debug("Transaction hash is: {0}".format(str(txn_hash))) - txn_id = txn_hash[self.get_root_key_from_entity_key(str(ii[0]))] - row_values[str(ii[0])] = \ - {dbconstants.APP_ENTITY_SCHEMA[0]:str(ii[1]), #ent - dbconstants.APP_ENTITY_SCHEMA[1]:str(txn_id)} #txnid - + try: + txn_id = txn_hash[self.get_root_key_from_entity_key(str(ii[0]))] + row_values[str(ii[0])] = \ + {dbconstants.APP_ENTITY_SCHEMA[0]:str(ii[1]), #ent + dbconstants.APP_ENTITY_SCHEMA[1]:str(txn_id)} #txnid + except KeyError, key_error: + logging.error("Key we are trying to get the root: {0}".\ + format(str(ii[0]))) + logging.error("Root key we got: {0}".\ + format(self.get_root_key_from_entity_key(str(ii[0])))) + raise key_error for prefix, group in itertools.groupby(entities, lambda x: x[0]): kind_group_rows = tuple(kind_row_generator(group)) new_kind_keys = [str(ii[0]) for ii in kind_group_rows] @@ -983,6 +989,13 @@ def dynamic_put(self, app_id, put_request, put_response): for root_key in txn_hash: self.zookeeper.notify_failed_transaction(app_id, txn_hash[root_key]) raise zkte + except dbconstants.AppScaleDBConnectionError, dbce: + logging.exception("Connection issue with datastore for app id {0}, " \ + "info {1}".format(app_id, str(dbce))) + for root_key in txn_hash: + self.zookeeper.notify_failed_transaction(app_id, txn_hash[root_key]) + raise dbce + def get_root_key_from_entity_key(self, entity_key): """ Extract the root key from an entity key. We @@ -997,8 +1010,8 @@ def get_root_key_from_entity_key(self, entity_key): TypeError: If the type is not supported. """ if isinstance(entity_key, str): - tokens = entity_key.split('!') - return tokens[0] + '!' + tokens = entity_key.split(dbconstants.KIND_SEPARATOR) + return tokens[0] + dbconstants.KIND_SEPARATOR elif isinstance(entity_key, entity_pb.Reference): app_id = entity_key.app() path = entity_key.path() @@ -1048,12 +1061,16 @@ def acquire_locks_for_nontrans(self, app_id, entities, retries=0): except ZKTransactionException, zkte: logging.warning("Concurrent transaction exception for app id {0} with " \ "info {1}".format(app_id, str(zkte))) - for key in txn_hash: - self.zookeeper.notify_failed_transaction(app_id, txn_hash[key]) if retries > 0: + logging.warning("Trying again to acquire lock" \ + "info {1} with retry #{2}".format(app_id, str(zkte), retries)) time.sleep(self.LOCK_RETRY_TIME) - return self.acquire_locks_for_nontrans(app_id, entities, retries-1) + return dict(self.acquire_locks_for_nontrans(app_id, entities, + retries-1).items() + txn_hash.items()) + for key in txn_hash: + self.zookeeper.notify_failed_transaction(app_id, txn_hash[key]) raise zkte + return txn_hash def get_root_key(self, app_id, ns, ancestor_list): @@ -1074,8 +1091,8 @@ def get_root_key(self, app_id, ns, ancestor_list): # Make sure ids are ordered lexigraphically by making sure they # are of set size i.e. 2 > 0003 but 0002 < 0003. key_id = str(first_ent.id()).zfill(ID_KEY_LENGTH) - return "{0}{1}{2}:{3}!".format(prefix, self._NAMESPACE_SEPARATOR, - first_ent.type(), key_id) + return "{0}{1}{2}:{3}{4}".format(prefix, self._NAMESPACE_SEPARATOR, + first_ent.type(), key_id, dbconstants.KIND_SEPARATOR) def is_instance_wrapper(self, obj, expected_type): """ A wrapper for isinstance for mocking purposes. @@ -1138,6 +1155,7 @@ def acquire_locks_for_trans(self, entities, txnid): for root_key in txn_hash: self.zookeeper.notify_failed_transaction(app_id, txn_hash[root_key]) raise zkte + return txn_hash def release_locks_for_nontrans(self, app_id, entities, txn_hash): @@ -1884,9 +1902,10 @@ def reverse_path(self, key): Returns: A string key which can be used on the kind table. """ - tokens = key.split('!') + tokens = key.split(dbconstants.KIND_SEPARATOR) tokens.reverse() - key = '!'.join(tokens)[1:] + '!' + key = dbconstants.KIND_SEPARATOR.join(tokens)[1:] + \ + dbconstants.KIND_SEPARATOR return key def kind_query_range(self, query, filter_info, order_info): @@ -1908,9 +1927,11 @@ def kind_query_range(self, query, filter_info, order_info): end_inclusive = self._ENABLE_INCLUSIVITY start_inclusive = self._ENABLE_INCLUSIVITY prefix = self.get_table_prefix(query) - startrow = prefix + self._SEPARATOR + query.kind() + '!' + \ + startrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + \ str(ancestor_filter) - endrow = prefix + self._SEPARATOR + query.kind() + '!' + \ + endrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + \ str(ancestor_filter) + \ self._TERM_STRING if '__key__' not in filter_info: @@ -1920,18 +1941,24 @@ def kind_query_range(self, query, filter_info, order_info): op = key_filter[0] __key__ = str(key_filter[1]) if op and op == datastore_pb.Query_Filter.EQUAL: - startrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__ - endrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__ + startrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + __key__ + endrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + __key__ elif op and op == datastore_pb.Query_Filter.GREATER_THAN: start_inclusive = self._DISABLE_INCLUSIVITY - startrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__ + startrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + __key__ elif op and op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL: - startrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__ + startrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + __key__ elif op and op == datastore_pb.Query_Filter.LESS_THAN: - endrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__ + endrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + __key__ end_inclusive = self._DISABLE_INCLUSIVITY elif op and op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL: - endrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__ + endrow = prefix + self._SEPARATOR + query.kind() + \ + dbconstants.KIND_SEPARATOR + __key__ return startrow, endrow, start_inclusive, end_inclusive def namespace_query(self): @@ -2594,14 +2621,16 @@ def get_range_composite_query(self, query, filter_info): if filter_info[prop.name()][-1][0] == datastore_pb.Query_Filter.EQUAL: equality_value += value oper = filter_info[prop.name()][-1][0] - elif filter_info[prop.name()][-1][0] == datastore_pb.Query_Filter.EXISTS: + elif filter_info[prop.name()][-1][0] == \ + datastore_pb.Query_Filter.EXISTS: # We currently do not handle projection queries. pass else: - # Figure out what operator we will use. Default to greater than or equal to - # if there are no filter operators that tell us otherwise. - # The last filter that is not an EXISTS filter is what operator we will use, - # as dictated by the composite index definition. + # Figure out what operator we will use. Default to greater than or + # equal to if there are no filter operators that tell us + # otherwise. The last filter that is not an EXISTS filter is + # what operator we will use, as dictated by the composite index + # definition. oper = filter_info[prop.name()][-1][0] index_value += str(value) diff --git a/AppDB/dbconstants.py b/AppDB/dbconstants.py index 0073374c5d..0a81703925 100644 --- a/AppDB/dbconstants.py +++ b/AppDB/dbconstants.py @@ -7,6 +7,7 @@ ERROR_DEFAULT = "DB_ERROR:" NONEXISTANT_TRANSACTION = "0" KEY_DELIMITER = '\x00' +KIND_SEPARATOR = '\x01' # Table names USERS_TABLE = "USERS__" diff --git a/AppDB/groomer.py b/AppDB/groomer.py index 40e0831ad9..8b970996b9 100644 --- a/AppDB/groomer.py +++ b/AppDB/groomer.py @@ -145,8 +145,8 @@ def get_root_key_from_entity_key(key): Returns: The root key extracted from the row key. """ - tokens = key.split('!') - return tokens[0] + '!' + tokens = key.split(dbconstants.KIND_SEPARATOR) + return tokens[0] + dbconstants.KIND_SEPARATOR @staticmethod def get_prefix_from_entity_key(entity_key): diff --git a/AppDB/zkappscale/zktransaction_stub.py b/AppDB/zkappscale/zktransaction_stub.py index cac82e4527..42361d9fed 100644 --- a/AppDB/zkappscale/zktransaction_stub.py +++ b/AppDB/zkappscale/zktransaction_stub.py @@ -66,7 +66,7 @@ def __init__(self, host = DEFAULT_HOST, startgc = True): self.gccv = threading.Condition() self.__rollbackFunc = None - def getTransactionID(self, app_id): + def get_transaction_id(self, app_id, is_xg=False): """ Get new transaction ID for transaction. This function only create transaction ID, and you must lock @@ -76,7 +76,7 @@ def getTransactionID(self, app_id): """ return random.randint(0,100000000) - def checkTransaction(self, app_id, txid): + def check_transaction(self, app_id, txid): """ Get status of specified transaction. Returns: True - If transaction is alive. @@ -84,7 +84,7 @@ def checkTransaction(self, app_id, txid): """ return True - def acquireLock(self, app_id, txid, entity_key = GLOBAL_LOCK_KEY): + def acquire_lock(self, app_id, txid, entity_key = GLOBAL_LOCK_KEY): """ Acquire lock for transaction. You must call getTransactionID() first to obtain transaction ID. @@ -93,7 +93,7 @@ def acquireLock(self, app_id, txid, entity_key = GLOBAL_LOCK_KEY): """ return True - def getUpdatedKeyList(self, app_id, txid): + def get_update_key_list(self, app_id, txid): """ Get the list of updated key. This method just return updated key list which is registered using @@ -102,7 +102,7 @@ def getUpdatedKeyList(self, app_id, txid): you can use this method for rollback. """ return [] - def releaseLock(self, app_id, txid, key = None): + def release_lock(self, app_id, txid, key = None): """ Release acquired lock. You must call acquireLock() first. @@ -112,14 +112,14 @@ def releaseLock(self, app_id, txid, key = None): """ return True - def isBlacklisted(self, app_id, txid, entity_key = None): + def is_blacklisted(self, app_id, txid, entity_key = None): """ This validate transaction id with black list. The PB server logic should use getValidTransactionID(). """ return False - def getValidTransactionID(self, app_id, target_txid, entity_key): + def get_valid_transaction_id(self, app_id, target_txid, entity_key): """ This returns valid transaction id for the entity key. If the specified transaction id is black-listed, @@ -128,14 +128,14 @@ def getValidTransactionID(self, app_id, target_txid, entity_key): """ return long(target_txid) - def registUpdatedKey(self, app_id, current_txid, target_txid, entity_key): + def register_updated_key(self, app_id, current_txid, target_txid, entity_key): """ Regist valid transaction id for entity. target_txid must be the latest valid transaction id for the entity. """ return - def notifyFailedTransaction(self, app_id, txid): + def notify_failed_transaction(self, app_id, txid): """ Notify failed transaction id. This method will add the transaction id into black list. @@ -143,23 +143,28 @@ def notifyFailedTransaction(self, app_id, txid): """ return + def close(self): + """ Stub function for closing all ZooKeeper connections. """ + return + - def setRollbackFunction(self, func): - """ Set rollback function for gc. - + def increment_and_get_counter(self, _, value): + """ Stub function for incrementing a path by a given value. + + Args: + path: A str (ignored) + value: An int, the amount to increase by. + Returns: + A tuple: (old_value, old_value + value). + """ + fake_value = int(time.time() * 1000) + return (fake_value, fake_value + value) - The function must have following 4 arguments. - - Application ID - - Transaction ID - - Root key - - Updated key array + def get_datastore_groomer_lock(self): + """ Stub implementation for getting the lock for the datastore groomer. - You could not use acquireLock() and releaseLock() - in rollback function because the transaction is already timeout. - The specified transaction already have lock, so you don't have to call - acquireLock() in rollback. - You should raise any Exception if rollback is failed. + Returns: + Always returns True. """ - self.__rollbackFunc = func - + return True