Skip to content

Commit

Permalink
Merge pull request #844 from nlake44/kind-delimiter
Browse files Browse the repository at this point in the history
Kind delimiter
  • Loading branch information
shatterednirvana committed Dec 31, 2013
2 parents 985cda5 + f88c203 commit a8e5e3f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 57 deletions.
91 changes: 60 additions & 31 deletions AppDB/datastore_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ def get_kind_key(self, prefix, key_path):
# are of set size i.e. 2 > 0003 but 0002 < 0003
key_id = str(e.id()).zfill(ID_KEY_LENGTH)
path.append("{0}:{1}".format(e.type(), key_id))
encoded_path = '!'.join(path)
encoded_path += '!'
encoded_path = dbconstants.KIND_SEPARATOR.join(path)
encoded_path += dbconstants.KIND_SEPARATOR

return prefix + self._NAMESPACE_SEPARATOR + encoded_path

Expand All @@ -249,8 +249,8 @@ def _encode_path(pb):
elif e.has_id():
key_id = str(e.id()).zfill(ID_KEY_LENGTH)
path.append("{0}:{1}".format(e.type(), key_id))
val = '!'.join(path)
val += '!'
val = dbconstants.KIND_SEPARATOR.join(path)
val += dbconstants.KIND_SEPARATOR
return val

if isinstance(pb, entity_pb.PropertyValue) and pb.has_uservalue():
Expand Down Expand Up @@ -485,11 +485,17 @@ def kind_row_generator(entities):
logging.debug("Root entity is: {0}".\
format(self.get_root_key_from_entity_key(str(ii[0]))))
logging.debug("Transaction hash is: {0}".format(str(txn_hash)))
txn_id = txn_hash[self.get_root_key_from_entity_key(str(ii[0]))]
row_values[str(ii[0])] = \
{dbconstants.APP_ENTITY_SCHEMA[0]:str(ii[1]), #ent
dbconstants.APP_ENTITY_SCHEMA[1]:str(txn_id)} #txnid

try:
txn_id = txn_hash[self.get_root_key_from_entity_key(str(ii[0]))]
row_values[str(ii[0])] = \
{dbconstants.APP_ENTITY_SCHEMA[0]:str(ii[1]), #ent
dbconstants.APP_ENTITY_SCHEMA[1]:str(txn_id)} #txnid
except KeyError, key_error:
logging.error("Key we are trying to get the root: {0}".\
format(str(ii[0])))
logging.error("Root key we got: {0}".\
format(self.get_root_key_from_entity_key(str(ii[0]))))
raise key_error
for prefix, group in itertools.groupby(entities, lambda x: x[0]):
kind_group_rows = tuple(kind_row_generator(group))
new_kind_keys = [str(ii[0]) for ii in kind_group_rows]
Expand Down Expand Up @@ -983,6 +989,13 @@ def dynamic_put(self, app_id, put_request, put_response):
for root_key in txn_hash:
self.zookeeper.notify_failed_transaction(app_id, txn_hash[root_key])
raise zkte
except dbconstants.AppScaleDBConnectionError, dbce:
logging.exception("Connection issue with datastore for app id {0}, " \
"info {1}".format(app_id, str(dbce)))
for root_key in txn_hash:
self.zookeeper.notify_failed_transaction(app_id, txn_hash[root_key])
raise dbce


def get_root_key_from_entity_key(self, entity_key):
""" Extract the root key from an entity key. We
Expand All @@ -997,8 +1010,8 @@ def get_root_key_from_entity_key(self, entity_key):
TypeError: If the type is not supported.
"""
if isinstance(entity_key, str):
tokens = entity_key.split('!')
return tokens[0] + '!'
tokens = entity_key.split(dbconstants.KIND_SEPARATOR)
return tokens[0] + dbconstants.KIND_SEPARATOR
elif isinstance(entity_key, entity_pb.Reference):
app_id = entity_key.app()
path = entity_key.path()
Expand Down Expand Up @@ -1048,12 +1061,16 @@ def acquire_locks_for_nontrans(self, app_id, entities, retries=0):
except ZKTransactionException, zkte:
logging.warning("Concurrent transaction exception for app id {0} with " \
"info {1}".format(app_id, str(zkte)))
for key in txn_hash:
self.zookeeper.notify_failed_transaction(app_id, txn_hash[key])
if retries > 0:
logging.warning("Trying again to acquire lock" \
"info {1} with retry #{2}".format(app_id, str(zkte), retries))
time.sleep(self.LOCK_RETRY_TIME)
return self.acquire_locks_for_nontrans(app_id, entities, retries-1)
return dict(self.acquire_locks_for_nontrans(app_id, entities,
retries-1).items() + txn_hash.items())
for key in txn_hash:
self.zookeeper.notify_failed_transaction(app_id, txn_hash[key])
raise zkte

return txn_hash

def get_root_key(self, app_id, ns, ancestor_list):
Expand All @@ -1074,8 +1091,8 @@ def get_root_key(self, app_id, ns, ancestor_list):
# Make sure ids are ordered lexigraphically by making sure they
# are of set size i.e. 2 > 0003 but 0002 < 0003.
key_id = str(first_ent.id()).zfill(ID_KEY_LENGTH)
return "{0}{1}{2}:{3}!".format(prefix, self._NAMESPACE_SEPARATOR,
first_ent.type(), key_id)
return "{0}{1}{2}:{3}{4}".format(prefix, self._NAMESPACE_SEPARATOR,
first_ent.type(), key_id, dbconstants.KIND_SEPARATOR)

def is_instance_wrapper(self, obj, expected_type):
""" A wrapper for isinstance for mocking purposes.
Expand Down Expand Up @@ -1138,6 +1155,7 @@ def acquire_locks_for_trans(self, entities, txnid):
for root_key in txn_hash:
self.zookeeper.notify_failed_transaction(app_id, txn_hash[root_key])
raise zkte

return txn_hash

def release_locks_for_nontrans(self, app_id, entities, txn_hash):
Expand Down Expand Up @@ -1884,9 +1902,10 @@ def reverse_path(self, key):
Returns:
A string key which can be used on the kind table.
"""
tokens = key.split('!')
tokens = key.split(dbconstants.KIND_SEPARATOR)
tokens.reverse()
key = '!'.join(tokens)[1:] + '!'
key = dbconstants.KIND_SEPARATOR.join(tokens)[1:] + \
dbconstants.KIND_SEPARATOR
return key

def kind_query_range(self, query, filter_info, order_info):
Expand All @@ -1908,9 +1927,11 @@ def kind_query_range(self, query, filter_info, order_info):
end_inclusive = self._ENABLE_INCLUSIVITY
start_inclusive = self._ENABLE_INCLUSIVITY
prefix = self.get_table_prefix(query)
startrow = prefix + self._SEPARATOR + query.kind() + '!' + \
startrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + \
str(ancestor_filter)
endrow = prefix + self._SEPARATOR + query.kind() + '!' + \
endrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + \
str(ancestor_filter) + \
self._TERM_STRING
if '__key__' not in filter_info:
Expand All @@ -1920,18 +1941,24 @@ def kind_query_range(self, query, filter_info, order_info):
op = key_filter[0]
__key__ = str(key_filter[1])
if op and op == datastore_pb.Query_Filter.EQUAL:
startrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__
endrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__
startrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + __key__
endrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + __key__
elif op and op == datastore_pb.Query_Filter.GREATER_THAN:
start_inclusive = self._DISABLE_INCLUSIVITY
startrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__
startrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + __key__
elif op and op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL:
startrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__
startrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + __key__
elif op and op == datastore_pb.Query_Filter.LESS_THAN:
endrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__
endrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + __key__
end_inclusive = self._DISABLE_INCLUSIVITY
elif op and op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL:
endrow = prefix + self._SEPARATOR + query.kind() + "!" + __key__
endrow = prefix + self._SEPARATOR + query.kind() + \
dbconstants.KIND_SEPARATOR + __key__
return startrow, endrow, start_inclusive, end_inclusive

def namespace_query(self):
Expand Down Expand Up @@ -2594,14 +2621,16 @@ def get_range_composite_query(self, query, filter_info):
if filter_info[prop.name()][-1][0] == datastore_pb.Query_Filter.EQUAL:
equality_value += value
oper = filter_info[prop.name()][-1][0]
elif filter_info[prop.name()][-1][0] == datastore_pb.Query_Filter.EXISTS:
elif filter_info[prop.name()][-1][0] == \
datastore_pb.Query_Filter.EXISTS:
# We currently do not handle projection queries.
pass
else:
# Figure out what operator we will use. Default to greater than or equal to
# if there are no filter operators that tell us otherwise.
# The last filter that is not an EXISTS filter is what operator we will use,
# as dictated by the composite index definition.
# Figure out what operator we will use. Default to greater than or
# equal to if there are no filter operators that tell us
# otherwise. The last filter that is not an EXISTS filter is
# what operator we will use, as dictated by the composite index
# definition.
oper = filter_info[prop.name()][-1][0]

index_value += str(value)
Expand Down
1 change: 1 addition & 0 deletions AppDB/dbconstants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ERROR_DEFAULT = "DB_ERROR:"
NONEXISTANT_TRANSACTION = "0"
KEY_DELIMITER = '\x00'
KIND_SEPARATOR = '\x01'

# Table names
USERS_TABLE = "USERS__"
Expand Down
4 changes: 2 additions & 2 deletions AppDB/groomer.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ def get_root_key_from_entity_key(key):
Returns:
The root key extracted from the row key.
"""
tokens = key.split('!')
return tokens[0] + '!'
tokens = key.split(dbconstants.KIND_SEPARATOR)
return tokens[0] + dbconstants.KIND_SEPARATOR

@staticmethod
def get_prefix_from_entity_key(entity_key):
Expand Down
53 changes: 29 additions & 24 deletions AppDB/zkappscale/zktransaction_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self, host = DEFAULT_HOST, startgc = True):
self.gccv = threading.Condition()
self.__rollbackFunc = None

def getTransactionID(self, app_id):
def get_transaction_id(self, app_id, is_xg=False):
""" Get new transaction ID for transaction.
This function only create transaction ID, and you must lock
Expand All @@ -76,15 +76,15 @@ def getTransactionID(self, app_id):
"""
return random.randint(0,100000000)

def checkTransaction(self, app_id, txid):
def check_transaction(self, app_id, txid):
""" Get status of specified transaction.
Returns: True - If transaction is alive.
Raises: ZKTransactionException - If transaction is timeout or not exist.
"""
return True

def acquireLock(self, app_id, txid, entity_key = GLOBAL_LOCK_KEY):
def acquire_lock(self, app_id, txid, entity_key = GLOBAL_LOCK_KEY):
""" Acquire lock for transaction.
You must call getTransactionID() first to obtain transaction ID.
Expand All @@ -93,7 +93,7 @@ def acquireLock(self, app_id, txid, entity_key = GLOBAL_LOCK_KEY):
"""
return True

def getUpdatedKeyList(self, app_id, txid):
def get_update_key_list(self, app_id, txid):
""" Get the list of updated key.
This method just return updated key list which is registered using
Expand All @@ -102,7 +102,7 @@ def getUpdatedKeyList(self, app_id, txid):
you can use this method for rollback.
"""
return []
def releaseLock(self, app_id, txid, key = None):
def release_lock(self, app_id, txid, key = None):
""" Release acquired lock.
You must call acquireLock() first.
Expand All @@ -112,14 +112,14 @@ def releaseLock(self, app_id, txid, key = None):
"""
return True

def isBlacklisted(self, app_id, txid, entity_key = None):
def is_blacklisted(self, app_id, txid, entity_key = None):
""" This validate transaction id with black list.
The PB server logic should use getValidTransactionID().
"""
return False

def getValidTransactionID(self, app_id, target_txid, entity_key):
def get_valid_transaction_id(self, app_id, target_txid, entity_key):
""" This returns valid transaction id for the entity key.
If the specified transaction id is black-listed,
Expand All @@ -128,38 +128,43 @@ def getValidTransactionID(self, app_id, target_txid, entity_key):
"""
return long(target_txid)

def registUpdatedKey(self, app_id, current_txid, target_txid, entity_key):
def register_updated_key(self, app_id, current_txid, target_txid, entity_key):
""" Regist valid transaction id for entity.
target_txid must be the latest valid transaction id for the entity.
"""
return

def notifyFailedTransaction(self, app_id, txid):
def notify_failed_transaction(self, app_id, txid):
""" Notify failed transaction id.
This method will add the transaction id into black list.
After this call, the transaction becomes invalid.
"""
return

def close(self):
""" Stub function for closing all ZooKeeper connections. """
return


def setRollbackFunction(self, func):
""" Set rollback function for gc.
<Obsolete>
def increment_and_get_counter(self, _, value):
""" Stub function for incrementing a path by a given value.
Args:
path: A str (ignored)
value: An int, the amount to increase by.
Returns:
A tuple: (old_value, old_value + value).
"""
fake_value = int(time.time() * 1000)
return (fake_value, fake_value + value)

The function must have following 4 arguments.
- Application ID
- Transaction ID
- Root key
- Updated key array
def get_datastore_groomer_lock(self):
""" Stub implementation for getting the lock for the datastore groomer.
You could not use acquireLock() and releaseLock()
in rollback function because the transaction is already timeout.
The specified transaction already have lock, so you don't have to call
acquireLock() in rollback.
You should raise any Exception if rollback is failed.
Returns:
Always returns True.
"""
self.__rollbackFunc = func

return True

0 comments on commit a8e5e3f

Please sign in to comment.