Skip to content

Commit

Permalink
feat: add Object ACL RPCs to emulator (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan authored Aug 17, 2021
1 parent 4ca0865 commit 5ca5ab9
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 0 deletions.
71 changes: 71 additions & 0 deletions emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,77 @@ def object_get(bucket_name, object_name):
return blob.rest_media(flask.request)


# === OBJECT ACCESS CONTROL === #


@gcs.route("/b/<bucket_name>/o/<path:object_name>/acl")
@retry_test(method="storage.object_acl.list")
def object_acl_list(bucket_name, object_name):
blob = db.get_object(flask.request, bucket_name, object_name, False, None)
response = {"kind": "storage#objectAccessControls", "items": []}
for acl in blob.metadata.acl:
acl_rest = json_format.MessageToDict(acl)
acl_rest["kind"] = "storage#objectAccessControl"
response["items"].append(acl_rest)
fields = flask.request.args.get("fields", None)
return testbench.common.filter_response_rest(response, None, fields)


@gcs.route("/b/<bucket_name>/o/<path:object_name>/acl", methods=["POST"])
@retry_test(method="storage.object_acl.insert")
def object_acl_insert(bucket_name, object_name):
blob = db.get_object(flask.request, bucket_name, object_name, False, None)
acl = blob.insert_acl(flask.request, None)
response = json_format.MessageToDict(acl)
response["kind"] = "storage#objectAccessControl"
fields = flask.request.args.get("fields", None)
return testbench.common.filter_response_rest(response, None, fields)


@gcs.route("/b/<bucket_name>/o/<path:object_name>/acl/<entity>")
@retry_test(method="storage.object_acl.get")
def object_acl_get(bucket_name, object_name, entity):
blob = db.get_object(flask.request, bucket_name, object_name, False, None)
acl = blob.get_acl(entity, None)
response = json_format.MessageToDict(acl)
response["kind"] = "storage#objectAccessControl"
fields = flask.request.args.get("fields", None)
return testbench.common.filter_response_rest(response, None, fields)


@gcs.route("/b/<bucket_name>/o/<path:object_name>/acl/<entity>", methods=["PUT"])
@retry_test(method="storage.object_acl.update")
def object_acl_update(bucket_name, object_name, entity):
blob = db.get_object(flask.request, bucket_name, object_name, False, None)
acl = blob.update_acl(flask.request, entity, None)
response = json_format.MessageToDict(acl)
response["kind"] = "storage#objectAccessControl"
fields = flask.request.args.get("fields", None)
return testbench.common.filter_response_rest(response, None, fields)


@gcs.route(
"/b/<bucket_name>/o/<path:object_name>/acl/<entity>", methods=["PATCH", "POST"]
)
@retry_test(method="storage.object_acl.patch")
def object_acl_patch(bucket_name, object_name, entity):
testbench.common.enforce_patch_override(flask.request)
blob = db.get_object(flask.request, bucket_name, object_name, False, None)
acl = blob.patch_acl(flask.request, entity, None)
response = json_format.MessageToDict(acl)
response["kind"] = "storage#objectAccessControl"
fields = flask.request.args.get("fields", None)
return testbench.common.filter_response_rest(response, None, fields)


@gcs.route("/b/<bucket_name>/o/<path:object_name>/acl/<entity>", methods=["DELETE"])
@retry_test(method="storage.object_acl.delete")
def object_acl_delete(bucket_name, object_name, entity):
blob = db.get_object(flask.request, bucket_name, object_name, False, None)
blob.delete_acl(entity, None)
return ""


# === SERVER === #

# Define the WSGI application to handle HMAC key requests
Expand Down
80 changes: 80 additions & 0 deletions tests/test_emulator_object_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,86 @@ def test_object_crud(self):
response = self.client.get("/storage/v1/b/bucket-name/o/fox.txt")
self.assertEqual(response.status_code, 404)

def test_object_acl_crud(self):
response = self.client.post(
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
)
self.assertEqual(response.status_code, 200)

# Use the XML API to insert an object, as the JSON API is not yet ready.
payload = "How vexingly quick daft zebras jump!"
response = self.client.put(
"/bucket-name/zebra",
content_type="text/plain",
data=payload,
)
self.assertEqual(response.status_code, 200)

insert_data = {"entity": "allAuthenticatedUsers", "role": "READER"}
response = self.client.post(
"/storage/v1/b/bucket-name/o/zebra/acl", data=json.dumps(insert_data)
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
insert_rest = json.loads(response.data)
self.assertEqual(insert_rest, insert_rest | insert_data)

response = self.client.get(
"/storage/v1/b/bucket-name/o/zebra/acl/allAuthenticatedUsers"
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
get_rest = json.loads(response.data)
self.assertEqual(get_rest, insert_rest)

response = self.client.patch(
"/storage/v1/b/bucket-name/o/zebra/acl/allAuthenticatedUsers",
data=json.dumps({"role": "OWNER"}),
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
patch_rest = json.loads(response.data)
self.assertEqual(patch_rest.get("role", None), "OWNER")

update_data = patch_rest.copy()
update_data["role"] = "READER"
response = self.client.put(
"/storage/v1/b/bucket-name/o/zebra/acl/allAuthenticatedUsers",
data=json.dumps(update_data),
)
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
update_rest = json.loads(response.data)
self.assertEqual(update_rest.get("role", None), "READER")

response = self.client.get("/storage/v1/b/bucket-name/o/zebra/acl")
self.assertEqual(response.status_code, 200)
self.assertTrue(
response.headers.get("content-type").startswith("application/json")
)
list_rest = json.loads(response.data)
self.assertIn(
"allAuthenticatedUsers", [a.get("entity") for a in list_rest.get("items")]
)

response = self.client.delete(
"/storage/v1/b/bucket-name/o/zebra/acl/allAuthenticatedUsers"
)
self.assertEqual(response.status_code, 200)
# After delete, get should fail
response = self.client.get(
"/storage/v1/b/bucket-name/o/zebra/allAuthenticatedUsers"
)
self.assertEqual(response.status_code, 404)


if __name__ == "__main__":
unittest.main()

0 comments on commit 5ca5ab9

Please sign in to comment.