From 0ec96cc794c4c71517e76c004ac17d67e8807d7e Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 24 Mar 2015 13:45:06 -0700 Subject: [PATCH] Making unit tests pass after regression3 test fixes. - Adding test for branch miss in credentials._get_signed_query_params - Making storage.batch._unpack_batch_response work correctly in Python2 and Python3 (parser expects str in both) NOTE: Yet again %s caused issues between Py2 and Py3 (as in the PR #126 in oauth2client). --- gcloud/storage/batch.py | 20 +++++++++++++------- gcloud/storage/test_batch.py | 28 ++++++++++++++++++++++++++++ gcloud/test_credentials.py | 26 ++++++++++++++++++++++---- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/gcloud/storage/batch.py b/gcloud/storage/batch.py index 6caa5e7ec7a64..5f875d69564bb 100644 --- a/gcloud/storage/batch.py +++ b/gcloud/storage/batch.py @@ -170,16 +170,22 @@ def __exit__(self, exc_type, exc_val, exc_tb): def _unpack_batch_response(response, content): """Convert response, content -> [(status, reason, payload)].""" parser = Parser() - if isinstance(content, six.binary_type): - content = content.decode('utf-8') - faux_message = ''.join([ - 'Content-Type: ', - response['content-type'], - '\nMIME-Version: 1.0\n\n', + if not isinstance(content, six.binary_type): + content = content.encode('utf-8') + content_type = response['content-type'] + if not isinstance(content_type, six.binary_type): + content_type = content_type.encode('utf-8') + faux_message = b''.join([ + b'Content-Type: ', + content_type, + b'\nMIME-Version: 1.0\n\n', content, ]) - message = parser.parsestr(faux_message) + if six.PY2: + message = parser.parsestr(faux_message) + else: # pragma: NO COVER Python3 + message = parser.parsestr(faux_message.decode('utf-8')) if not isinstance(message._payload, list): raise ValueError('Bad response: not multi-part') diff --git a/gcloud/storage/test_batch.py b/gcloud/storage/test_batch.py index 2f7569f45f83d..6fc030cf1f3ce 100644 --- a/gcloud/storage/test_batch.py +++ b/gcloud/storage/test_batch.py @@ -347,6 +347,34 @@ def test_as_context_mgr_w_error(self): self.assertEqual(len(batch._responses), 0) +class Test__unpack_batch_response(unittest2.TestCase): + + def _callFUT(self, response, content): + from gcloud.storage.batch import _unpack_batch_response + return _unpack_batch_response(response, content) + + def test_bytes(self): + RESPONSE = {'content-type': b'multipart/mixed; boundary="DEADBEEF="'} + CONTENT = _THREE_PART_MIME_RESPONSE.encode('utf-8') + result = list(self._callFUT(RESPONSE, CONTENT)) + self.assertEqual(len(result), 3) + self.assertEqual(result[0], ('200', 'OK', {u'bar': 2, u'foo': 1})) + self.assertEqual(result[1], ('200', 'OK', {u'foo': 1, u'bar': 3})) + self.assertEqual(result[2], ('204', 'No Content', '')) + + def test_unicode(self): + import six + RESPONSE = {'content-type': u'multipart/mixed; boundary="DEADBEEF="'} + CONTENT = _THREE_PART_MIME_RESPONSE + if isinstance(CONTENT, six.binary_type): # pragma: NO COVER Python3 + CONTENT = CONTENT.decode('utf-8') + result = list(self._callFUT(RESPONSE, CONTENT)) + self.assertEqual(len(result), 3) + self.assertEqual(result[0], ('200', 'OK', {u'bar': 2, u'foo': 1})) + self.assertEqual(result[1], ('200', 'OK', {u'foo': 1, u'bar': 3})) + self.assertEqual(result[2], ('204', 'No Content', '')) + + _THREE_PART_MIME_RESPONSE = """\ --DEADBEEF= Content-Type: application/http diff --git a/gcloud/test_credentials.py b/gcloud/test_credentials.py index bb9c223c8408a..6e743cd83269b 100644 --- a/gcloud/test_credentials.py +++ b/gcloud/test_credentials.py @@ -176,11 +176,13 @@ def _get_pem_key(credentials): SIGNATURE_STRING = 'dummy_signature' with _Monkey(MUT, RSA=rsa, PKCS1_v1_5=pkcs_v1_5, SHA256=sha256, _get_pem_key=_get_pem_key): - self.assertRaises(NameError, self._callFUT, + self.assertRaises(UnboundLocalError, self._callFUT, BAD_CREDENTIALS, EXPIRATION, SIGNATURE_STRING) - def _run_test_with_credentials(self, credentials, account_name): + def _run_test_with_credentials(self, credentials, account_name, + signature_string=None): import base64 + import six from gcloud._testing import _Monkey from gcloud import credentials as MUT @@ -190,7 +192,7 @@ def _run_test_with_credentials(self, credentials, account_name): sha256 = _SHA256() EXPIRATION = '100' - SIGNATURE_STRING = b'dummy_signature' + SIGNATURE_STRING = signature_string or b'dummy_signature' with _Monkey(MUT, crypt=crypt, RSA=rsa, PKCS1_v1_5=pkcs_v1_5, SHA256=sha256): result = self._callFUT(credentials, EXPIRATION, SIGNATURE_STRING) @@ -199,7 +201,12 @@ def _run_test_with_credentials(self, credentials, account_name): self.assertEqual(crypt._private_key_text, base64.b64encode(b'dummy_private_key_text')) self.assertEqual(crypt._private_key_password, 'notasecret') - self.assertEqual(sha256._signature_string, SIGNATURE_STRING) + # sha256._signature_string is always bytes. + if isinstance(SIGNATURE_STRING, six.binary_type): + self.assertEqual(sha256._signature_string, SIGNATURE_STRING) + else: + self.assertEqual(sha256._signature_string, + SIGNATURE_STRING.encode('utf-8')) SIGNED = base64.b64encode(b'DEADBEEF') expected_query = { 'Expires': EXPIRATION, @@ -217,6 +224,17 @@ def test_signed_jwt_for_p12(self): ACCOUNT_NAME, b'dummy_private_key_text', scopes) self._run_test_with_credentials(credentials, ACCOUNT_NAME) + def test_signature_non_bytes(self): + from oauth2client import client + + scopes = [] + ACCOUNT_NAME = 'dummy_service_account_name' + SIGNATURE_STRING = u'dummy_signature' + credentials = client.SignedJwtAssertionCredentials( + ACCOUNT_NAME, b'dummy_private_key_text', scopes) + self._run_test_with_credentials(credentials, ACCOUNT_NAME, + signature_string=SIGNATURE_STRING) + def test_service_account_via_json_key(self): from oauth2client import service_account from gcloud._testing import _Monkey