Skip to content

Commit

Permalink
Merge pull request #144 from tseaver/coverage-storage
Browse files Browse the repository at this point in the history
Coverage for gcloud.storage
  • Loading branch information
silvolu committed Sep 23, 2014
2 parents b477587 + 117bca9 commit a6fa3f6
Show file tree
Hide file tree
Showing 13 changed files with 2,890 additions and 22 deletions.
6 changes: 3 additions & 3 deletions gcloud/storage/acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __str__(self):
else:
return '{self.type}-{self.identifier}'.format(self=self)

def __repr__(self):
def __repr__(self): #pragma NO COVER
return '<ACL Entity: {self} ({roles})>'.format(
self=self, roles=', '.join(self.roles))

Expand Down Expand Up @@ -223,7 +223,7 @@ def entity_from_dict(self, entity_dict):
entity = self.entity(type=type, identifier=identifier)

if not isinstance(entity, ACL.Entity):
raise ValueError('Invalid dictionary: %s' % acl_dict)
raise ValueError('Invalid dictionary: %s' % entity_dict)

return entity.grant(role)

Expand Down Expand Up @@ -352,7 +352,7 @@ def get_entities(self):

return self.entities.values()

def save(self):
def save(self): #pragma NO COVER
"""A method to be overridden by subclasses.
:raises: NotImplementedError
Expand Down
6 changes: 3 additions & 3 deletions gcloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def from_dict(cls, bucket_dict, connection=None):
return cls(connection=connection, name=bucket_dict['name'],
metadata=bucket_dict)

def __repr__(self):
def __repr__(self): #pragma NO COVER
return '<Bucket: %s>' % self.name

def __iter__(self):
Expand Down Expand Up @@ -122,7 +122,7 @@ def new_key(self, key):
# Support Python 2 and 3.
try:
string_type = basestring
except NameError:
except NameError: #pragma NO COVER PY3k
string_type = str

if isinstance(key, string_type):
Expand Down Expand Up @@ -194,7 +194,7 @@ def delete_keys(self, keys):
for key in keys:
self.delete_key(key)

def copy_key(self):
def copy_key(self): #pragma NO COVER
raise NotImplementedError

def upload_file(self, filename, key=None):
Expand Down
4 changes: 2 additions & 2 deletions gcloud/storage/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,15 +412,15 @@ def new_bucket(self, bucket):
# Support Python 2 and 3.
try:
string_type = basestring
except NameError:
except NameError: #pragma NO COVER PY3k
string_type = str

if isinstance(bucket, string_type):
return Bucket(connection=self, name=bucket)

raise TypeError('Invalid bucket: %s' % bucket)

def generate_signed_url(self, resource, expiration, method='GET', content_md5=None, content_type=None):
def generate_signed_url(self, resource, expiration, method='GET', content_md5=None, content_type=None): #pragma NO COVER UGH
"""Generate a signed URL to provide query-string authentication to a resource.
:type resource: string
Expand Down
2 changes: 1 addition & 1 deletion gcloud/storage/demo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
PROJECT = 'gcloud-storage-demo'


def get_connection():
def get_connection(): #pragma NO COVER
return storage.get_connection(PROJECT, CLIENT_EMAIL, PRIVATE_KEY_PATH)
2 changes: 1 addition & 1 deletion gcloud/storage/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def reset(self):
self.page_number = 0
self.next_page_token = None

def get_items_from_response(self, response):
def get_items_from_response(self, response): #pragma NO COVER
"""Factory method called while iterating. This should be overriden.
This method should be overridden by a subclass.
Expand Down
6 changes: 3 additions & 3 deletions gcloud/storage/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def from_dict(cls, key_dict, bucket=None):

return cls(bucket=bucket, name=key_dict['name'], metadata=key_dict)

def __repr__(self):
def __repr__(self): #pragma NO COVER
if self.bucket:
bucket_name = self.bucket.name
else:
Expand Down Expand Up @@ -97,7 +97,7 @@ def public_url(self):
return '{storage_base_url}/{self.bucket.name}/{self.name}'.format(
storage_base_url='http://commondatastorage.googleapis.com', self=self)

def generate_signed_url(self, expiration, method='GET'):
def generate_signed_url(self, expiration, method='GET'): #pragma NO COVER UGH
"""Generates a signed URL for this key.
If you have a key that you want to allow access to
Expand Down Expand Up @@ -153,7 +153,7 @@ def get_contents_to_file(self, fh):
for chunk in KeyDataIterator(self):
try:
fh.write(chunk)
except IOError, e:
except IOError, e: #pragma NO COVER
if e.errno == errno.ENOSPC:
raise Exception('No space left on device.')

Expand Down
69 changes: 69 additions & 0 deletions gcloud/storage/test___init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import unittest2


class Test_get_connection(unittest2.TestCase):

def _callFUT(self, *args, **kw):
from gcloud.storage import get_connection
return get_connection(*args, **kw)

def test_it(self):
from tempfile import NamedTemporaryFile
from gcloud import credentials
from gcloud.storage import SCOPE
from gcloud.storage.connection import Connection
from gcloud.test_credentials import _Client
from gcloud.test_credentials import _Monkey
PROJECT = 'project'
CLIENT_EMAIL = '[email protected]'
PRIVATE_KEY = 'SEEkR1t'
client = _Client()
with _Monkey(credentials, client=client):
with NamedTemporaryFile() as f:
f.write(PRIVATE_KEY)
f.flush()
found = self._callFUT(PROJECT, CLIENT_EMAIL, f.name)
self.assertTrue(isinstance(found, Connection))
self.assertEqual(found.project, PROJECT)
self.assertTrue(found._credentials is client._signed)
self.assertEqual(client._called_with,
{'service_account_name': CLIENT_EMAIL,
'private_key': PRIVATE_KEY,
'scope': SCOPE,
})


class Test_get_bucket(unittest2.TestCase):

def _callFUT(self, *args, **kw):
from gcloud.storage import get_bucket
return get_bucket(*args, **kw)

def test_it(self):
from tempfile import NamedTemporaryFile
from gcloud import storage
from gcloud.test_credentials import _Monkey
bucket = object()
class _Connection(object):
def get_bucket(self, bucket_name):
self._called_With = bucket_name
return bucket
connection = _Connection()
_called_With = []
def get_connection(*args, **kw):
_called_With.append((args, kw))
return connection
BUCKET = 'bucket'
PROJECT = 'project'
CLIENT_EMAIL = '[email protected]'
PRIVATE_KEY = 'SEEkR1t'
with _Monkey(storage, get_connection=get_connection):
with NamedTemporaryFile() as f:
f.write(PRIVATE_KEY)
f.flush()
found = self._callFUT(BUCKET, PROJECT, CLIENT_EMAIL, f.name)
self.assertTrue(found is bucket)
self.assertEqual(_called_With,
[((PROJECT, CLIENT_EMAIL, f.name), {})])
self.assertEqual(connection._called_With, BUCKET)

Loading

0 comments on commit a6fa3f6

Please sign in to comment.