Merge pull request #2 from HDFGroup/master
Update master with changes from HDFGroup/hsds
loichuder authored Mar 18, 2020
2 parents bcebcb5 + 59ba21d commit 8a1878b
Showing 7 changed files with 70 additions and 41 deletions.
33 changes: 19 additions & 14 deletions hsds/async_lib.py
@@ -12,7 +12,7 @@

import time
from aiohttp.client_exceptions import ClientError
from aiohttp.web_exceptions import HTTPNotFound
from aiohttp.web_exceptions import HTTPNotFound, HTTPInternalServerError
from util.idUtil import isValidUuid, isSchema2Id, getS3Key, isS3ObjKey, getObjId, isValidChunkId, getCollectionForId
from util.chunkUtil import getDatasetId
from util.storUtil import getStorKeys, putStorJSONObj, deleteStorObj
@@ -24,7 +24,7 @@
# Note: only works with schema v2 domains!

def scanRootCallback(app, s3keys):
log.debug(f"scanRootCallback, {len(s3keys)} items")
log.debug(f"scanRoot - callback, {len(s3keys)} items")
if isinstance(s3keys, list):
log.error("got list result for s3keys callback")
raise ValueError("unexpected callback format")
@@ -48,10 +48,10 @@ def scanRootCallback(app, s3keys):
obj_size = item["Size"]
if "LastModified" in item:
lastModified = item["LastModified"]
log.debug(f"{objid}: {etag} {obj_size} {lastModified}")
log.debug(f"scanRoot - got key {objid}: {etag} {obj_size} {lastModified}")

if lastModified > results["lastModified"]:
log.debug(f"changing lastModified from: {results['lastModified']} to {lastModified}")
log.debug(f"scanRoot: changing lastModified from: {results['lastModified']} to {lastModified}")
results["lastModified"] = lastModified
is_chunk = False
if isValidChunkId(objid):
@@ -85,7 +85,7 @@ def scanRootCallback(app, s3keys):
elif getCollectionForId(objid) == "datatypes":
results["num_datatypes"] += 1
else:
log.error(f"Unexpected collection type for id: {objid}")
log.error(f"scanRoot - Unexpected collection type for id: {objid}")


async def scanRoot(app, rootid, update=False, bucket=None):
@@ -130,13 +130,13 @@ async def scanRoot(app, rootid, update=False, bucket=None):

await getStorKeys(app, prefix=root_prefix, include_stats=True, bucket=bucket, callback=scanRootCallback)

log.info(f"scan complete for rootid: {rootid}")
log.info(f"scanRoot - scan complete for rootid: {rootid}")
results["scan_complete"] = time.time()

if update:
# write .info object back to S3
info_key = root_prefix + ".info.json"
log.info(f"updating info key: {info_key}")
log.info(f"scanRoot - updating info key: {info_key}")
await putStorJSONObj(app, info_key, results, bucket=bucket)
return results

@@ -159,7 +159,7 @@ async def objDeleteCallback(app, s3keys):
log.error(f"Unexpected key {s3key} for prefix: {prefix}")
raise ValueError("invalid s3key for objDeleteCallback")
full_key = prefix + s3key[prefix_len:]
log.info(f"objDeleteCallback got key: {full_key}")
log.info(f"removeKeys - objDeleteCallback deleting key: {full_key}")
await deleteStorObj(app, full_key)


@@ -174,26 +174,31 @@ async def removeKeys(app, objid):
log.warn("ignoring non-schema2 id")
raise KeyError("Invalid key")
s3key = getS3Key(objid)
log.debug(f"got s3key: {s3key}")
log.debug(f"removeKeys - got s3key: {s3key}")
expected_suffixes = (".dataset.json", ".group.json")
s3prefix = None

for suffix in expected_suffixes:
if s3key.endswith(suffix):
s3prefix = s3key[:-len(suffix)]
if not s3prefix:
log.error("unexpected s3key for delete_set")
log.error("removeKeys - unexpected s3key for delete_set")
raise KeyError("unexpected key suffix")
log.info(f"delete for {objid} searching for s3prefix: {s3prefix}")
log.info(f"removeKeys - delete for {objid} searching for s3prefix: {s3prefix}")
if app["objDelete_prefix"]:
log.error("objDelete_prefix is already set - impropere use of non-reentrant call?")
log.error("removeKeys - objDelete_prefix is already set - improper use of non-reentrant call?")
# just continue and reset
app["objDelete_prefix"] = s3prefix
try:
await getStorKeys(app, prefix=s3prefix, include_stats=False, callback=objDeleteCallback)
except ClientError as ce:
log.error(f"getS3Keys faiiled: {ce}")
log.error(f"removeKeys - getS3Keys faiiled: {ce}")
except HTTPNotFound:
log.warn(f"HTTPNotFound error for getStorKeys with prefix: {s3prefix}")
log.warn(f"removeKeys - HTTPNotFound error for getStorKeys with prefix: {s3prefix}")
except HTTPInternalServerError:
log.error(f"removeKeys - HTTPInternalServerError for getStorKeys with prefix: {s3prefix}")
except Exception as e:
log.error(f"removeKeys - Unexpected Exception for getStorKeys with prefix: {s3prefix}: {e}")

# reset the prefix
app["objDelete_prefix"] = None
1 change: 0 additions & 1 deletion hsds/chunk_dn.py
@@ -223,7 +223,6 @@ async def GET_Chunk(request):
log.debug(f"GET_Chunk - using s3path: {s3path}")
elif "bucket" in params:
bucket = params["bucket"]

if "s3offset" in params:
try:
s3offset = int(params["s3offset"])
10 changes: 6 additions & 4 deletions hsds/chunk_sn.py
@@ -649,11 +649,13 @@ async def getChunkInfoMap(app, dset_id, dset_json, chunk_ids, bucket=None):
log.error("Unexpected chunk_index")
raise HTTPInternalServerError()
extent = item_size
s3offset = layout["offset"]
for i in range(rank):
index = chunk_index[i]
s3offset = layout["offset"] + extent * chunk_dims[i] * index
extent *= dims[i]
log.debug("setting chunk_info_map to s3offset: {s3offset} s3size: {s3size} for chunk_id: {chunk_id}")
dim = rank - i - 1
index = chunk_index[dim]
s3offset += index * chunk_dims[dim] * extent
extent *= dims[dim]
log.debug(f"setting chunk_info_map to s3offset: {s3offset} s3size: {s3size} for chunk_id: {chunk_id}")
if s3offset > layout["offset"] + layout["size"]:
log.warn(f"range get of s3offset: {s3offset} s3size: {s3size} extends beyond end of contingous dataset for chunk_id: {chunk_id}")
chunkinfo_map[chunk_id] = {"s3path": s3path, "s3offset": s3offset, "s3size": chunk_size}
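The corrected loop accumulates the chunk's byte offset by walking dimensions from the fastest-varying (last) axis to the slowest, scaling each chunk index by that axis's byte stride, whereas the old code overwrote s3offset on every pass. A self-contained sketch of the same row-major arithmetic; the helper name and the example numbers are made up for illustration.

```python
def contiguous_chunk_offset(chunk_index, chunk_dims, dims, item_size, base_offset=0):
    # Byte offset of a chunk's first element inside a C-ordered (row-major)
    # contiguous dataset, mirroring the accumulation added above.
    rank = len(dims)
    extent = item_size            # byte stride of one step along the current dim
    offset = base_offset
    for i in range(rank):
        dim = rank - i - 1        # last (fastest-varying) dimension first
        offset += chunk_index[dim] * chunk_dims[dim] * extent
        extent *= dims[dim]
    return offset

# Dataset dims (40, 80), chunk dims (10, 80), 8-byte items: chunk (2, 0)
# starts at row 20, i.e. 20 * 80 * 8 = 12800 bytes past the layout offset.
assert contiguous_chunk_offset([2, 0], [10, 80], [40, 80], 8) == 12800
```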
15 changes: 14 additions & 1 deletion hsds/datanode.py
@@ -29,6 +29,8 @@
from chunk_dn import PUT_Chunk, GET_Chunk, POST_Chunk, DELETE_Chunk
from datanode_lib import s3syncCheck
from async_lib import scanRoot, removeKeys
from aiohttp.web_exceptions import HTTPNotFound, HTTPInternalServerError, HTTPForbidden, HTTPBadRequest



async def init(loop):
@@ -108,7 +108,18 @@ async def bucketScan(app):
for root_id in root_ids:
bucket = root_ids[root_id]
log.info(f"bucketScan for: {root_id} bucket: {bucket}")
await scanRoot(app, root_id, update=True, bucket=bucket)
try:
await scanRoot(app, root_id, update=True, bucket=bucket)
except HTTPNotFound as nfe:
log.warn(f"bucketScan - HTTPNotFound error scanning {root_id}: {nfe}")
except HTTPForbidden as fe:
log.warn(f"bucketScan - HTTPForbidden error scanning {root_id}: {fe}")
except HTTPBadRequest as bre:
log.error(f"bucketScan - HTTPBadRequest error scanning {root_id}: {bre}")
except HTTPInternalServerError as ise:
log.error(f"bucketScan - HTTPInternalServer error scanning {root_id}: {ise}")
except Exception as e:
log.error(f"bucketScan - Unexpected exception scanning {root_id}: {e}")

log.info(f"bucketScan - sleep: {async_sleep_time}")
await asyncio.sleep(async_sleep_time)
10 changes: 5 additions & 5 deletions hsds/datanode_lib.py
@@ -320,11 +320,11 @@ async def get_chunk(app, chunk_id, dset_json, bucket=None, s3path=None, s3offset
s3key = None

if s3path:
if not s3path.startswith("s3://"):
# TBD - verify these at dataset creation time?
log.error(f"unexpected s3path for getChunk: {s3path}")
raise HTTPInternalServerError()
path = s3path[5:]
if s3path.startswith("s3://"):
# trim off the s3:// if found
path = s3path[5:]
else:
path = s3path
index = path.find('/') # split bucket and key
if index < 1:
log.error(f"s3path is invalid: {s3path}")
38 changes: 23 additions & 15 deletions hsds/util/chunkUtil.py
@@ -222,25 +222,33 @@ def getContiguousLayout(shape_json, item_size, chunk_min=1000*1000, chunk_max=4*
return None
if shape_json["class"] == 'H5S_SCALAR':
return (1,) # just enough to store one item
chunk_avg = (chunk_min + chunk_max) // 2
dims = shape_json["dims"]
rank = len(dims)
layout = [0,] * rank
nsize = item_size
unit_chunk = False
for i in range(rank):
dim = rank - i - 1
extent = dims[dim]
nsize *= extent
if unit_chunk:
layout[dim] = 1
elif nsize <= chunk_min:
layout[dim] = extent
elif nsize <= chunk_max:
layout[dim] = (chunk_min * extent) // nsize
unit_chunk = True
layout = [1,] * rank
if rank == 1:
# just divy up the dimension with whatever works best
nsize *= dims[0]
if nsize < chunk_max:
layout[0] = dims[0]
else:
layout[dim] = (chunk_max * extent) // nsize
unit_chunk = True
layout[0] = chunk_avg // item_size
else:
unit_chunk = False
for i in range(rank):
dim = rank - i - 1
extent = dims[dim]
nsize *= extent
if unit_chunk:
layout[dim] = 1
else:
layout[dim] = extent
if nsize > chunk_max:
if i>0:
# make everything after first dimension 1
layout[dim] = 1
unit_chunk = 1
return layout


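The rewritten getContiguousLayout handles rank 1 by splitting the single dimension into roughly chunk_avg-sized pieces, and for higher ranks keeps trailing dimensions whole (so each chunk maps to one contiguous byte range), collapsing slower dimensions to 1 once the running size passes chunk_max. A rough usage sketch with made-up shapes and explicit chunk_min/chunk_max; the expected outputs follow from the logic above and assume the hsds package layout used by the repo's own imports.

```python
from util.chunkUtil import getContiguousLayout

# 1-D: 40 MB of data is split into ~chunk_avg-sized pieces
# (chunk_avg = (1_000_000 + 4_000_000) // 2 = 2_500_000 bytes -> 625_000 four-byte items).
shape_1d = {"class": "H5S_SIMPLE", "dims": [10_000_000]}
print(getContiguousLayout(shape_1d, 4, chunk_min=1_000_000, chunk_max=4_000_000))
# expected: [625000]

# 3-D: the two trailing dimensions stay whole; the slowest dimension is set
# to 1 because 100*200*300*4 bytes exceeds chunk_max. Each chunk is then
# 200*300*4 = 240 KB, which may be smaller than chunk_min (see the relaxed
# unit-test assertion below).
shape_3d = {"class": "H5S_SIMPLE", "dims": [100, 200, 300]}
print(getContiguousLayout(shape_3d, 4, chunk_min=1_000_000, chunk_max=4_000_000))
# expected: [1, 200, 300]
```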
4 changes: 3 additions & 1 deletion tests/unit/chunkUtilTest.py
@@ -196,6 +196,7 @@ def testGetContiguiousLayout(self):
space_bytes = extent*typesize
if space_bytes > chunk_min:
self.assertTrue(chunk_bytes >= chunk_min)

self.assertTrue(chunk_bytes <= chunk_max)

for extent in (1, 10, 100):
@@ -226,7 +227,8 @@ def testGetContiguiousLayout(self):
chunk_bytes = layout[0]*layout[1]*layout[2]*typesize
space_bytes = dims[0]*dims[1]*dims[2]*typesize
if space_bytes > chunk_min:
self.assertTrue(chunk_bytes >= chunk_min)
# chunk size maybe less than chunk_min in this case
# self.assertTrue(chunk_bytes >= chunk_min)
self.assertEqual(layout[0], 1)
self.assertTrue(chunk_bytes <= chunk_max)
