From 0655bd3d968c7189ac37bbde56bde776b19c1a27 Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Fri, 16 Aug 2024 12:17:36 +0000 Subject: [PATCH 1/6] Quick and dirty changes --- gcs/object.py | 2 ++ testbench/common.py | 21 +++++++++++++++++++++ testbench/database.py | 1 + testbench/rest_server.py | 5 ++++- testbench_run.py | 2 +- 5 files changed, 29 insertions(+), 2 deletions(-) diff --git a/gcs/object.py b/gcs/object.py index de0ff152..3ef16b6b 100644 --- a/gcs/object.py +++ b/gcs/object.py @@ -399,6 +399,8 @@ def rest_media(self, request, delay=time.sleep): content_range = "bytes %d-%d/%d" % (begin, end - 1, length) instructions = testbench.common.extract_instruction(request, None) + print("came inside rest_media") + print(instructions) if instructions is None: def streamer(): diff --git a/testbench/common.py b/testbench/common.py index b19dfa57..48a920d7 100644 --- a/testbench/common.py +++ b/testbench/common.py @@ -23,6 +23,7 @@ import re import socket import struct +import time import types from functools import wraps @@ -46,6 +47,10 @@ retry_return_broken_stream_after_bytes = re.compile( r"return-broken-stream-after-([0-9]+)K$" ) +retry_stall_beginning = re.compile( + r"stall-at-beginning-([0-9]+)K$" +) + content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)") # === STR === # @@ -777,11 +782,14 @@ def handle_retry_test_instruction(database, request, socket_closer, method): test_id, method, transport="HTTP" ): return __get_default_response_fn + next_instruction = database.peek_next_instruction(test_id, method) + print("handle_retry_test: (next_instructions) ", next_instruction) error_code_matches = testbench.common.retry_return_error_code.match( next_instruction ) if error_code_matches: + print("handle_retry_test: error code matches") database.dequeue_next_instruction(test_id, method) items = list(error_code_matches.groups()) error_code = items[0] @@ -795,6 +803,7 @@ def handle_retry_test_instruction(database, request, socket_closer, method): next_instruction ) if retry_connection_matches: + print("handle_retry_test: return connection matches") items = list(retry_connection_matches.groups()) if items[0] == "reset-connection": database.dequeue_next_instruction(test_id, method) @@ -814,10 +823,22 @@ def handle_retry_test_instruction(database, request, socket_closer, method): "Injected 'connection reset by peer' fault", 500 ) elif items[0] == "broken-stream": + print("broken stream") return __get_streamer_response_fn(database, method, socket_closer, test_id) broken_stream_after_bytes = ( testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction) ) + + retry_stall_beginning_matches = testbench.common.retry_stall_beginning.match( + next_instruction + ) + if retry_stall_beginning_matches and method == "storage.objects.get": + media = request.args.get("alt", None) + if media == None or media != "json": + print("handle_retry_test: retry_stall_beginning matches") + database.dequeue_next_instruction(test_id, method) + time.sleep(10) + if broken_stream_after_bytes and method == "storage.objects.get": items = list(broken_stream_after_bytes.groups()) after_bytes = int(items[0]) * 1024 diff --git a/testbench/database.py b/testbench/database.py index 368e0692..3c6ef4ac 100644 --- a/testbench/database.py +++ b/testbench/database.py @@ -438,6 +438,7 @@ def __validate_injected_failure_description(self, failure): testbench.common.retry_return_error_after_bytes, testbench.common.retry_return_short_response, testbench.common.retry_return_broken_stream_after_bytes, + testbench.common.retry_stall_beginning, ]: if expr.match(failure) is not None: return diff --git a/testbench/rest_server.py b/testbench/rest_server.py index 9490f24f..1e10412d 100644 --- a/testbench/rest_server.py +++ b/testbench/rest_server.py @@ -37,7 +37,7 @@ # === DEFAULT ENTRY FOR REST SERVER === # root = flask.Flask(__name__) -root.debug = False +root.debug = True root.register_error_handler(Exception, testbench.error.RestException.handler) @@ -133,6 +133,7 @@ def list_retry_tests(): def create_retry_test(): payload = json.loads(flask.request.data) test_instruction_set = payload.get("instructions", None) + print("under create retry test: ", test_instruction_set) if not test_instruction_set: return flask.Response( "instructions is not defined", status=400, content_type="text/plain" @@ -575,6 +576,7 @@ def object_get(bucket_name, object_name): preconditions=testbench.common.make_json_preconditions(flask.request), context=None, ) + print("under object_get") media = flask.request.args.get("alt", None) if media is None or media == "json": projection = testbench.common.extract_projection(flask.request, "noAcl", None) @@ -587,6 +589,7 @@ def object_get(bucket_name, object_name): testbench.csek.validation( flask.request, blob.metadata.customer_encryption.key_sha256_bytes, False, None ) + print("just before rest_media call") return blob.rest_media(flask.request) diff --git a/testbench_run.py b/testbench_run.py index fbec5800..6d7e9ade 100644 --- a/testbench_run.py +++ b/testbench_run.py @@ -27,7 +27,7 @@ def start_server(): - if len(sys.argv) == 4: + if len(sys.argv) >= 4: sock_host = sys.argv[1] sock_port = int(sys.argv[2]) num_of_threads = int(sys.argv[3]) From b533a9f5b53907d18d9db4dcb63bfd14d047e41f Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Tue, 20 Aug 2024 14:22:23 +0000 Subject: [PATCH 2/6] review comments --- README.md | 1 + gcs/object.py | 2 - testbench/common.py | 46 +++++++++++----- testbench/database.py | 2 +- testbench/rest_server.py | 5 +- testbench_run.py | 2 +- tests/test_testbench_retry.py | 99 +++++++++++++++++++++++++++++++++++ 7 files changed, 136 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 019cbd88..b718e9ec 100644 --- a/README.md +++ b/README.md @@ -249,3 +249,4 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91 | return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes
[GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes | return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data
[GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data | return-reset-connection | [HTTP] Testbench will fail with a reset connection
[GRPC] Testbench will fail the RPC with `UNAVAILABLE` +| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data
[GRPC] Not supported \ No newline at end of file diff --git a/gcs/object.py b/gcs/object.py index 3ef16b6b..de0ff152 100644 --- a/gcs/object.py +++ b/gcs/object.py @@ -399,8 +399,6 @@ def rest_media(self, request, delay=time.sleep): content_range = "bytes %d-%d/%d" % (begin, end - 1, length) instructions = testbench.common.extract_instruction(request, None) - print("came inside rest_media") - print(instructions) if instructions is None: def streamer(): diff --git a/testbench/common.py b/testbench/common.py index 48a920d7..10fd4863 100644 --- a/testbench/common.py +++ b/testbench/common.py @@ -47,8 +47,8 @@ retry_return_broken_stream_after_bytes = re.compile( r"return-broken-stream-after-([0-9]+)K$" ) -retry_stall_beginning = re.compile( - r"stall-at-beginning-([0-9]+)K$" +retry_stall_after_bytes = re.compile( + r"stall-for-([0-9]+)s-after-([0-9]+)K$" ) content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)") @@ -723,6 +723,29 @@ def streamer(): return response_handler +def __get_stream_and_stall_fn( + database, method, test_id, limit=4, stall_time_sec=10, chunk_size=4 +): + def response_handler(data): + def streamer(): + d = _extract_data(data) + bytes_yield = 0 + instruction_dequed = False + for r in range(0, len(d), chunk_size): + if bytes_yield >= limit and not instruction_dequed: + time.sleep(stall_time_sec) + database.dequeue_next_instruction(test_id, method) + instruction_dequed = True + chunk_end = min(r + chunk_size, len(d)) + chunk_downloaded = chunk_end - r + bytes_yield += chunk_downloaded + yield d[r:chunk_end] + + return flask.Response(streamer(), headers=_extract_headers(data)) + + return response_handler + + def __get_default_response_fn(data): return data @@ -784,12 +807,10 @@ def handle_retry_test_instruction(database, request, socket_closer, method): return __get_default_response_fn next_instruction = database.peek_next_instruction(test_id, method) - print("handle_retry_test: (next_instructions) ", next_instruction) error_code_matches = testbench.common.retry_return_error_code.match( next_instruction ) if error_code_matches: - print("handle_retry_test: error code matches") database.dequeue_next_instruction(test_id, method) items = list(error_code_matches.groups()) error_code = items[0] @@ -803,7 +824,6 @@ def handle_retry_test_instruction(database, request, socket_closer, method): next_instruction ) if retry_connection_matches: - print("handle_retry_test: return connection matches") items = list(retry_connection_matches.groups()) if items[0] == "reset-connection": database.dequeue_next_instruction(test_id, method) @@ -823,21 +843,21 @@ def handle_retry_test_instruction(database, request, socket_closer, method): "Injected 'connection reset by peer' fault", 500 ) elif items[0] == "broken-stream": - print("broken stream") return __get_streamer_response_fn(database, method, socket_closer, test_id) broken_stream_after_bytes = ( testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction) ) - retry_stall_beginning_matches = testbench.common.retry_stall_beginning.match( + retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match( next_instruction ) - if retry_stall_beginning_matches and method == "storage.objects.get": - media = request.args.get("alt", None) - if media == None or media != "json": - print("handle_retry_test: retry_stall_beginning matches") - database.dequeue_next_instruction(test_id, method) - time.sleep(10) + if retry_stall_after_bytes_matches: + items = list(retry_stall_after_bytes_matches.groups()) + stall_time = int(items[0]) + after_bytes = int(items[1]) * 1024 + return __get_stream_and_stall_fn( + database, method, test_id, limit=after_bytes, stall_time_sec=stall_time + ) if broken_stream_after_bytes and method == "storage.objects.get": items = list(broken_stream_after_bytes.groups()) diff --git a/testbench/database.py b/testbench/database.py index 3c6ef4ac..f17260dd 100644 --- a/testbench/database.py +++ b/testbench/database.py @@ -438,7 +438,7 @@ def __validate_injected_failure_description(self, failure): testbench.common.retry_return_error_after_bytes, testbench.common.retry_return_short_response, testbench.common.retry_return_broken_stream_after_bytes, - testbench.common.retry_stall_beginning, + testbench.common.retry_stall_after_bytes, ]: if expr.match(failure) is not None: return diff --git a/testbench/rest_server.py b/testbench/rest_server.py index 1e10412d..9490f24f 100644 --- a/testbench/rest_server.py +++ b/testbench/rest_server.py @@ -37,7 +37,7 @@ # === DEFAULT ENTRY FOR REST SERVER === # root = flask.Flask(__name__) -root.debug = True +root.debug = False root.register_error_handler(Exception, testbench.error.RestException.handler) @@ -133,7 +133,6 @@ def list_retry_tests(): def create_retry_test(): payload = json.loads(flask.request.data) test_instruction_set = payload.get("instructions", None) - print("under create retry test: ", test_instruction_set) if not test_instruction_set: return flask.Response( "instructions is not defined", status=400, content_type="text/plain" @@ -576,7 +575,6 @@ def object_get(bucket_name, object_name): preconditions=testbench.common.make_json_preconditions(flask.request), context=None, ) - print("under object_get") media = flask.request.args.get("alt", None) if media is None or media == "json": projection = testbench.common.extract_projection(flask.request, "noAcl", None) @@ -589,7 +587,6 @@ def object_get(bucket_name, object_name): testbench.csek.validation( flask.request, blob.metadata.customer_encryption.key_sha256_bytes, False, None ) - print("just before rest_media call") return blob.rest_media(flask.request) diff --git a/testbench_run.py b/testbench_run.py index 6d7e9ade..fbec5800 100644 --- a/testbench_run.py +++ b/testbench_run.py @@ -27,7 +27,7 @@ def start_server(): - if len(sys.argv) >= 4: + if len(sys.argv) == 4: sock_host = sys.argv[1] sock_port = int(sys.argv[2]) num_of_threads = int(sys.argv[3]) diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index 28dcbb84..f3a1b793 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -21,6 +21,7 @@ import re import unittest import unittest.mock +import time import crc32c from grpc import StatusCode @@ -439,6 +440,104 @@ def test_retry_test_return_broken_stream_after_bytes(self): _ = len(response.data) self.assertIn("broken stream", ex.exception.msg) + def test_list_retry_stall_test(self): + response = self.client.post( + "/retry_test", + data=json.dumps({ + "instructions": { + "storage.buckets.list": ["stall-for-1s-after-0K"] + } + }), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + start_time = time.perf_counter() + list_response = self.client.get( + "/storage/v1/b", + query_string={"project": "test-project-unused"}, + headers={"x-retry-test-id": create_rest.get("id")}, + ) + self.assertEqual(len(list_response.get_data()), 40) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + + + def test_read_retry_test_stall_after_bytes(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + # Use the XML API to inject a larger object and smaller object. + media = self._create_block(UPLOAD_QUANTUM) + blob_larger = self.client.put( + "/bucket-name/256k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_larger.status_code, 200) + + media = self._create_block(128) + blob_smaller = self.client.put( + "/bucket-name/128.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_smaller.status_code, 200) + + # Setup a stall for reading back the object. + response = self.client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.get": ["stall-for-1s-after-128K"] + } + } + ), + ) + + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + start_time = time.perf_counter() + # The 128-bytes file is too small to trigger the "stall-for-1s-after-128K" fault injection. + response = self.client.get( + "/storage/v1/b/bucket-name/o/128.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 200, msg=response.data) + self.assertEqual(len(response.get_data()), 128) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + # This will take less the injected delay (1s). + self.assertLess(elapsed_time, 1) + + start_time = time.perf_counter() + # The 256KiB file triggers the "stall-for-1s-after-128K" and will + # take more than injected delay (1s). + response = self.client.get( + "/storage/v1/b/bucket-name/o/256k.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(len(response.get_data()), UPLOAD_QUANTUM) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + self.assertIn("x-goog-generation", response.headers) + def test_retry_test_return_error_after_bytes(self): response = self.client.post( "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) From 316501245fc380bb395e7560d18c61ddbaafa355 Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Tue, 20 Aug 2024 16:35:11 +0000 Subject: [PATCH 3/6] Fixing minor comments --- testbench/common.py | 5 +---- tests/test_testbench_retry.py | 11 +++-------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/testbench/common.py b/testbench/common.py index 10fd4863..cc080479 100644 --- a/testbench/common.py +++ b/testbench/common.py @@ -47,9 +47,7 @@ retry_return_broken_stream_after_bytes = re.compile( r"return-broken-stream-after-([0-9]+)K$" ) -retry_stall_after_bytes = re.compile( - r"stall-for-([0-9]+)s-after-([0-9]+)K$" -) +retry_stall_after_bytes = re.compile(r"stall-for-([0-9]+)s-after-([0-9]+)K$") content_range_split = re.compile(r"bytes (\*|[0-9]+-[0-9]+|[0-9]+-\*)\/(\*|[0-9]+)") @@ -805,7 +803,6 @@ def handle_retry_test_instruction(database, request, socket_closer, method): test_id, method, transport="HTTP" ): return __get_default_response_fn - next_instruction = database.peek_next_instruction(test_id, method) error_code_matches = testbench.common.retry_return_error_code.match( next_instruction diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index f3a1b793..503d2d5a 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -19,9 +19,9 @@ import json import os import re +import time import unittest import unittest.mock -import time import crc32c from grpc import StatusCode @@ -444,9 +444,7 @@ def test_list_retry_stall_test(self): response = self.client.post( "/retry_test", data=json.dumps({ - "instructions": { - "storage.buckets.list": ["stall-for-1s-after-0K"] - } + "instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]} }), ) self.assertEqual(response.status_code, 200) @@ -495,9 +493,7 @@ def test_read_retry_test_stall_after_bytes(self): "/retry_test", data=json.dumps( { - "instructions": { - "storage.objects.get": ["stall-for-1s-after-128K"] - } + "instructions": {"storage.objects.get": ["stall-for-1s-after-128K"]} } ), ) @@ -536,7 +532,6 @@ def test_read_retry_test_stall_after_bytes(self): end_time = time.perf_counter() elapsed_time = end_time - start_time self.assertGreater(elapsed_time, 1) - self.assertIn("x-goog-generation", response.headers) def test_retry_test_return_error_after_bytes(self): response = self.client.post( From 31029bb27ed43c4196a4a8cf4ffbfe79bfb6d89e Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Tue, 20 Aug 2024 18:21:08 +0000 Subject: [PATCH 4/6] Formatting changes --- tests/test_testbench_retry.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index 503d2d5a..88e4ebfe 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -444,8 +444,8 @@ def test_list_retry_stall_test(self): response = self.client.post( "/retry_test", data=json.dumps({ - "instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]} - }), + "instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]}} + ), ) self.assertEqual(response.status_code, 200) self.assertTrue( @@ -492,9 +492,7 @@ def test_read_retry_test_stall_after_bytes(self): response = self.client.post( "/retry_test", data=json.dumps( - { - "instructions": {"storage.objects.get": ["stall-for-1s-after-128K"]} - } + {"instructions": {"storage.objects.get": ["stall-for-1s-after-128K"]}} ), ) From 81b82e98133f897aba99c80d644e42a613053cbf Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Tue, 20 Aug 2024 18:24:19 +0000 Subject: [PATCH 5/6] minor formatting changes --- tests/test_testbench_retry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index 88e4ebfe..6e54a4a1 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -443,8 +443,8 @@ def test_retry_test_return_broken_stream_after_bytes(self): def test_list_retry_stall_test(self): response = self.client.post( "/retry_test", - data=json.dumps({ - "instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]}} + data=json.dumps( + {"instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]}} ), ) self.assertEqual(response.status_code, 200) From e53b8f401681ed65b540a006d3f6d90c38b01f8a Mon Sep 17 00:00:00 2001 From: Prince Kumar Date: Tue, 20 Aug 2024 18:39:12 +0000 Subject: [PATCH 6/6] minor formatting change --- tests/test_testbench_retry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index 6e54a4a1..a5b5f8a1 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -465,7 +465,6 @@ def test_list_retry_stall_test(self): elapsed_time = end_time - start_time self.assertGreater(elapsed_time, 1) - def test_read_retry_test_stall_after_bytes(self): response = self.client.post( "/storage/v1/b", data=json.dumps({"name": "bucket-name"})