Include size in pkgselect responses #1744

Merged: 15 commits, Sep 24, 2020
32 changes: 26 additions & 6 deletions lambdas/pkgselect/index.py
@@ -56,10 +56,16 @@ def file_list_to_folder(df: pd.DataFrame) -> dict:
lambda).
"""
try:
folder = pd.Series(df.logical_key.dropna().str.extract('([^/]+/?).*')[0].unique())
prefixes = folder[folder.str.endswith('/')].sort_values().tolist()
objects = folder[~folder.str.endswith('/')].sort_values().tolist()
except AttributeError:
groups = df.groupby(df.logical_key.str.extract('([^/]+/?).*')[0], dropna=True)
folder = groups.agg(
    size=('size', 'sum'),
    physical_key=('physical_key', 'first')
)
folder.reset_index(inplace=True)
folder.rename(columns={0: 'logical_key'}, inplace=True)
prefixes = folder[folder.logical_key.str.contains('/')].to_dict(orient='records')
objects = folder[~folder.logical_key.str.contains('/')].to_dict(orient='records')

Member: Does this automatically skip undefined values?

Contributor Author: Yes.
Member: What happens with keys containing successive //, both for the regex in the groupby and here?

Contributor Author: // is illegal in logical keys, isn't it?
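
A minimal sketch (not part of the PR) probing both review questions above against a toy DataFrame; it assumes pandas >= 1.1 for groupby(dropna=...). It shows that dropna=True silently drops rows whose logical_key is null, and that a key like 'bar//odd.txt' still folds into the 'bar/' prefix because the trailing .* absorbs the extra slash:

import pandas as pd

df = pd.DataFrame({
    'logical_key': ['foo.csv', 'bar/file1.txt', 'bar//odd.txt', None],
    'size': [10, 20, 30, 40],
    'physical_key': ['pk0', 'pk1', 'pk2', 'pk3'],
})

# First path component, keeping the trailing slash for prefixes.
first = df.logical_key.str.extract(r'([^/]+/?).*')[0]
print(first.tolist())  # ['foo.csv', 'bar/', 'bar/', nan]

# The null row never reaches agg(); 'bar//odd.txt' is summed under 'bar/'.
folder = df.groupby(first, dropna=True).agg(
    size=('size', 'sum'),
    physical_key=('physical_key', 'first'),
).reset_index().rename(columns={0: 'logical_key'})
print(folder)
#   logical_key  size physical_key
# 0        bar/    50          pk1
# 1     foo.csv    10          pk0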

except AttributeError as err:
# Pandas will raise an attribute error if the DataFrame has
# no rows with a non-null logical_key. We expect that case if
# either: (1) the package is empty (has zero package entries)
@@ -172,7 +178,8 @@ def lambda_handler(request):
# Call s3 select to fetch only logical keys matching the
# desired prefix (folder path)
prefix_length = len(prefix) if prefix is not None else 0
sql_stmt = f"SELECT SUBSTRING(s.logical_key, {prefix_length + 1}) AS logical_key FROM s3object s"
sql_stmt = f"SELECT SUBSTRING(s.logical_key, {prefix_length + 1}) AS logical_key"
sql_stmt += ", s.\"size\", s.physical_keys[0] as physical_key FROM s3object s"
if prefix:
sql_stmt += f" WHERE SUBSTRING(s.logical_key, 1, {prefix_length}) = '{sql_escape(prefix)}'"
result = query_manifest_content(
@@ -185,10 +192,23 @@
df = pd.read_json(result, lines=True)
response_data = file_list_to_folder(df)

# Fetch package-level or directory-level metadata
if prefix:
sql_stmt = f"SELECT s.meta FROM s3object s WHERE s.logical_key = '{sql_escape(prefix)}'"
else:
sql_stmt = "SELECT s.* FROM s3object s WHERE s.logical_key is NULL"
result = query_manifest_content(
s3_client,
bucket=bucket,
key=key,
sql_stmt=sql_stmt
)

ret_val = make_json_response(
200,
{
'contents': response_data
'contents': response_data,
'meta': json.load(result) if result else {}
}
)
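
For concreteness, a sketch of the response body a top-level folder view would now produce, assuming the four-entry manifest (100 bytes per file) that the tests below construct; the values are inferred from the aggregation rules above, not taken from a real run:

{
    "contents": {
        "prefixes": [
            {"logical_key": "bar/", "size": 300, "physical_key": "bar/file1.txt?versionid=1234"}
        ],
        "objects": [
            {"logical_key": "foo.csv", "size": 100, "physical_key": "foo.csv?versionid=1234"}
        ]
    },
    "meta": {}
}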

54 changes: 37 additions & 17 deletions lambdas/pkgselect/test/test_pkgselect.py
@@ -36,9 +36,15 @@ def setUp(self):
"bar/baz/file3.txt",
"bar/baz/file4.txt"
]
jsonl = ""
entries = []
for key in logical_keys:
jsonl += "{\"logical_key\": \"%s\"}\n" % key
entry = dict(
logical_key=key,
physical_key=f"{key}?versionid=1234",
size=100
)
entries.append(json.dumps(entry))
jsonl = "\n".join(entries)
print(jsonl)
streambytes = jsonl.encode()
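
With the new fixture, each manifest line now looks like the following (illustrative output of the json.dumps call above):

{"logical_key": "foo.csv", "physical_key": "foo.csv?versionid=1234", "size": 100}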

@@ -165,8 +171,8 @@ def test_browse_top_level(self):
folder = file_list_to_folder(df)
assert len(folder['prefixes']) == 1
assert len(folder['objects']) == 1
assert 'foo.csv' in folder['objects']
assert 'bar/' in folder['prefixes']
assert folder['objects'][0]['logical_key'] == 'foo.csv'
assert folder['prefixes'][0]['logical_key'] == 'bar/'

def test_browse_subfolder(self):
"""
@@ -176,16 +182,22 @@
prefix = "bar/"
df = pd.read_json(buffer_s3response(self.s3response), lines=True)
assert isinstance(df, pd.DataFrame)

filtered_df = df[df['logical_key'].str.startswith(prefix)]
stripped = filtered_df['logical_key'].str.slice(start=len(prefix))
folder = file_list_to_folder(stripped.to_frame('logical_key'))
print(folder)
stripped_df = stripped.to_frame('logical_key')
s3_df = pd.concat(
[stripped_df['logical_key'], filtered_df['size'], filtered_df['physical_key']],
axis=1,
keys=['logical_key', 'size', 'physical_key']
)

folder = file_list_to_folder(s3_df)
assert len(folder['prefixes']) == 1
assert len(folder['objects']) == 2
assert "file1.txt" in folder['objects']
assert "file2.txt" in folder['objects']
assert "baz/" in folder['prefixes']
object_keys = [obj['logical_key'] for obj in folder['objects']]
assert "file1.txt" in object_keys
assert "file2.txt" in object_keys
assert folder['prefixes'][0]['logical_key'] == "baz/"
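
As a side note on the frame construction above (and the identical one in the next test), the same three-column DataFrame could also be built without pd.concat; a sketch, not what the test does:

s3_df = filtered_df.assign(logical_key=stripped)[['logical_key', 'size', 'physical_key']]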

def test_browse_subsubfolder(self):
"""
@@ -197,13 +209,20 @@
assert isinstance(df, pd.DataFrame)
filtered_df = df[df['logical_key'].str.startswith(prefix)]
stripped = filtered_df['logical_key'].str.slice(start=len(prefix))
folder = file_list_to_folder(stripped.to_frame('logical_key'))
stripped_df = stripped.to_frame('logical_key')
s3_df = pd.concat(
[stripped_df['logical_key'], filtered_df['size'], filtered_df['physical_key']],
axis=1,
keys=['logical_key', 'size', 'physical_key']
)
folder = file_list_to_folder(s3_df)
assert "objects" in folder
assert "prefixes" in folder
assert not folder['prefixes']
assert len(folder['objects']) == 2
assert "file3.txt" in folder['objects']
assert "file4.txt" in folder['objects']
object_keys = [obj['logical_key'] for obj in folder['objects']]
assert "file3.txt" in object_keys
assert "file4.txt" in object_keys

def test_folder_view(self):
"""
@@ -245,8 +264,8 @@ def test_folder_view(self):
folder = json.loads(read_body(response))['contents']
assert len(folder['prefixes']) == 1
assert len(folder['objects']) == 1
assert 'foo.csv' in folder['objects']
assert 'bar/' in folder['prefixes']
assert folder['objects'][0]['logical_key'] == 'foo.csv'
assert folder['prefixes'][0]['logical_key'] == 'bar/'
client_patch.stop()

def test_detail_view(self):
@@ -381,10 +400,11 @@ def test_anon_access(self):
print(response)
assert response['statusCode'] == 200
folder = json.loads(read_body(response))['contents']
print(folder)
assert len(folder['prefixes']) == 1
assert len(folder['objects']) == 1
assert 'foo.csv' in folder['objects']
assert 'bar/' in folder['prefixes']
assert folder['objects'][0]['logical_key'] == 'foo.csv'
assert folder['prefixes'][0]['logical_key'] == 'bar/'
s3_stubber.deactivate()
client_patch.stop()
env_patcher.stop()
4 changes: 3 additions & 1 deletion lambdas/shared/t4_lambda_shared/utils.py
@@ -101,10 +101,12 @@ def buffer_s3response(s3response):
response = io.StringIO()
end_event_received = False
stats = None
found_records = False
for event in s3response['Payload']:
if 'Records' in event:
records = event['Records']['Payload'].decode()
response.write(records)
found_records = True
elif 'Progress' in event:
print(event['Progress']['Details'])
elif 'Stats' in event:
@@ -116,7 +118,7 @@
if not end_event_received:
raise IncompleteResultException("Error: Received an incomplete response from S3 Select.")
response.seek(0)
return response
return response if found_records else None
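
A minimal sketch of the new contract, using a hand-built payload rather than a real boto3 S3 Select event stream (it assumes the elided branch sets end_event_received when an 'End' event arrives):

from t4_lambda_shared.utils import buffer_s3response

# No Records events: the function now returns None instead of an empty buffer.
empty = {'Payload': [{'End': {}}]}
assert buffer_s3response(empty) is None

# With Records, behavior is unchanged: a seekable buffer of the decoded payload.
hit = {'Payload': [
    {'Records': {'Payload': b'{"logical_key": "foo.csv"}\n'}},
    {'End': {}},
]}
assert buffer_s3response(hit).read() == '{"logical_key": "foo.csv"}\n'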


def query_manifest_content(