From d64fb53a41a5a6d831b2b162c34943e29b29e74c Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 25 Jul 2018 14:22:49 -0400 Subject: [PATCH] BigQuery add default location to client (#5678) * Add read-only 'Client.location' property. Settable via new 'location' argument to ctor. * Use 'Client.location' as default for 'Client._get_query_results'. * Use 'Client.location' as default for 'Client.get_job'. * Use 'Client.location' as default for 'Client.cancel_job'. * Use 'Client.location' as default for 'Client.load_table_from_uri'. * Use 'Client.location' as default for 'Client.load_table_from_file'. * Use 'Client.location' as default for 'Client.load_table_from_dataframe'. * Use 'Client.location' as default for 'Client.copy_table'. * Use 'Client.location' as default for 'Client.extract_table'. * Use 'Client.location' as default for 'Client.query'. * Use 'Client.location' as default default for 'create_dataset'. Closes #5148. --- bigquery/google/cloud/bigquery/client.py | 89 +++- bigquery/tests/unit/test_client.py | 566 ++++++++++++++++++++--- 2 files changed, 578 insertions(+), 77 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 39f85fe35e37..4d7063e260ee 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -107,6 +107,8 @@ class Client(ClientWithProject): current object. This parameter should be considered private, and could change in the future. + location str: + (Optional) Default location for jobs / datasets / tables. Raises: google.auth.exceptions.DefaultCredentialsError: @@ -118,10 +120,17 @@ class Client(ClientWithProject): 'https://www.googleapis.com/auth/cloud-platform') """The scopes required for authenticating as a BigQuery consumer.""" - def __init__(self, project=None, credentials=None, _http=None): + def __init__( + self, project=None, credentials=None, _http=None, location=None): super(Client, self).__init__( project=project, credentials=credentials, _http=_http) self._connection = Connection(self) + self._location = location + + @property + def location(self): + """Default location for jobs / datasets / tables.""" + return self._location def get_service_account_email(self, project=None): """Get the email address of the project's BigQuery service account @@ -286,8 +295,14 @@ def create_dataset(self, dataset): """ path = '/projects/%s/datasets' % (dataset.project,) + + data = dataset.to_api_repr() + if data.get('location') is None and self.location is not None: + data['location'] = self.location + api_response = self._connection.api_request( - method='POST', path=path, data=dataset.to_api_repr()) + method='POST', path=path, data=data) + return Dataset.from_api_repr(api_response) def create_table(self, table): @@ -548,6 +563,9 @@ def _get_query_results( if timeout_ms is not None: extra_params['timeoutMs'] = timeout_ms + if location is None: + location = self.location + if location is not None: extra_params['location'] = location @@ -613,6 +631,10 @@ def get_job( if project is None: project = self.project + + if location is None: + location = self.location + if location is not None: extra_params['location'] = location @@ -652,6 +674,10 @@ def cancel_job( if project is None: project = self.project + + if location is None: + location = self.location + if location is not None: extra_params['location'] = location @@ -737,8 +763,12 @@ def list_jobs( extra_params=extra_params) def load_table_from_uri( - self, source_uris, destination, job_id=None, job_id_prefix=None, - 
location=None, project=None, job_config=None, + self, source_uris, destination, + job_id=None, + job_id_prefix=None, + location=None, + project=None, + job_config=None, retry=DEFAULT_RETRY): """Starts a job for loading data into a table from CloudStorage. @@ -773,14 +803,22 @@ def load_table_from_uri( google.cloud.bigquery.job.LoadJob: A new load job. """ job_id = _make_job_id(job_id, job_id_prefix) + if project is None: project = self.project + + if location is None: + location = self.location + job_ref = job._JobReference(job_id, project=project, location=location) + if isinstance(source_uris, six.string_types): source_uris = [source_uris] + load_job = job.LoadJob( job_ref, source_uris, destination, self, job_config) load_job._begin(retry=retry) + return load_job def load_table_from_file( @@ -831,14 +869,22 @@ def load_table_from_file( mode. """ job_id = _make_job_id(job_id, job_id_prefix) + if project is None: project = self.project + + if location is None: + location = self.location + job_ref = job._JobReference(job_id, project=project, location=location) load_job = job.LoadJob(job_ref, None, destination, self, job_config) job_resource = load_job._build_resource() + if rewind: file_obj.seek(0, os.SEEK_SET) + _check_mode(file_obj) + try: if size is None or size >= _MAX_MULTIPART_SIZE: response = self._do_resumable_upload( @@ -848,6 +894,7 @@ def load_table_from_file( file_obj, job_resource, size, num_retries) except resumable_media.InvalidResponse as exc: raise exceptions.from_http_response(exc.response) + return self.job_from_resource(response.json()) def load_table_from_dataframe(self, dataframe, destination, @@ -901,10 +948,19 @@ def load_table_from_dataframe(self, dataframe, destination, job_config = job.LoadJobConfig() job_config.source_format = job.SourceFormat.PARQUET + if location is None: + location = self.location + return self.load_table_from_file( - buffer, destination, num_retries=num_retries, rewind=True, - job_id=job_id, job_id_prefix=job_id_prefix, location=location, - project=project, job_config=job_config) + buffer, destination, + num_retries=num_retries, + rewind=True, + job_id=job_id, + job_id_prefix=job_id_prefix, + location=location, + project=project, + job_config=job_config, + ) def _do_resumable_upload(self, stream, metadata, num_retries): """Perform a resumable upload. @@ -1050,16 +1106,23 @@ def copy_table( google.cloud.bigquery.job.CopyJob: A new copy job instance. """ job_id = _make_job_id(job_id, job_id_prefix) + if project is None: project = self.project + + if location is None: + location = self.location + job_ref = job._JobReference(job_id, project=project, location=location) if not isinstance(sources, collections.Sequence): sources = [sources] + copy_job = job.CopyJob( job_ref, sources, destination, client=self, job_config=job_config) copy_job._begin(retry=retry) + return copy_job def extract_table( @@ -1103,8 +1166,13 @@ def extract_table( google.cloud.bigquery.job.ExtractJob: A new extract job instance. """ job_id = _make_job_id(job_id, job_id_prefix) + if project is None: project = self.project + + if location is None: + location = self.location + job_ref = job._JobReference(job_id, project=project, location=location) if isinstance(destination_uris, six.string_types): @@ -1114,6 +1182,7 @@ def extract_table( job_ref, source, destination_uris, client=self, job_config=job_config) extract_job._begin(retry=retry) + return extract_job def query( @@ -1149,12 +1218,18 @@ def query( google.cloud.bigquery.job.QueryJob: A new query job instance. 
""" job_id = _make_job_id(job_id, job_id_prefix) + if project is None: project = self.project + + if location is None: + location = self.location + job_ref = job._JobReference(job_id, project=project, location=location) query_job = job.QueryJob( job_ref, query, client=self, job_config=job_config) query_job._begin(retry=retry) + return query_job def insert_rows(self, table, rows, selected_fields=None, **kwargs): diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 20059c451769..47f116d258ed 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -59,6 +59,7 @@ class TestClient(unittest.TestCase): TABLE_ID = 'TABLE_ID' TABLE_REF = DatasetReference(PROJECT, DS_ID).table(TABLE_ID) KMS_KEY_NAME = 'projects/1/locations/global/keyRings/1/cryptoKeys/1' + LOCATION = 'us-central' @staticmethod def _get_target_class(): @@ -69,7 +70,7 @@ def _get_target_class(): def _make_one(self, *args, **kw): return self._get_target_class()(*args, **kw) - def test_ctor(self): + def test_ctor_defaults(self): from google.cloud.bigquery._http import Connection creds = _make_credentials() @@ -79,6 +80,20 @@ def test_ctor(self): self.assertIsInstance(client._connection, Connection) self.assertIs(client._connection.credentials, creds) self.assertIs(client._connection.http, http) + self.assertIsNone(client.location) + + def test_ctor_w_location(self): + from google.cloud.bigquery._http import Connection + + creds = _make_credentials() + http = object() + location = 'us-central' + client = self._make_one(project=self.PROJECT, credentials=creds, + _http=http, location=location) + self.assertIsInstance(client._connection, Connection) + self.assertIs(client._connection.credentials, creds) + self.assertIs(client._connection.http, http) + self.assertEqual(client.location, location) def test__get_query_results_miss_w_explicit_project_and_timeout(self): from google.cloud.exceptions import NotFound @@ -89,13 +104,32 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self): with self.assertRaises(NotFound): client._get_query_results( - 'nothere', None, project='other-project', location='US', + 'nothere', None, + project='other-project', + location=self.LOCATION, timeout_ms=500) conn.api_request.assert_called_once_with( method='GET', path='/projects/other-project/queries/nothere', - query_params={'maxResults': 0, 'timeoutMs': 500, 'location': 'US'}) + query_params={ + 'maxResults': 0, 'timeoutMs': 500, 'location': self.LOCATION}, + ) + + def test__get_query_results_miss_w_client_location(self): + from google.cloud.exceptions import NotFound + + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds, location=self.LOCATION) + conn = client._connection = _make_connection() + + with self.assertRaises(NotFound): + client._get_query_results('nothere', None) + + conn.api_request.assert_called_once_with( + method='GET', + path='/projects/PROJECT/queries/nothere', + query_params={'maxResults': 0, 'location': self.LOCATION}) def test__get_query_results_hit(self): job_id = 'query_job' @@ -401,27 +435,37 @@ def test_create_dataset_minimal(self): PATH = 'projects/%s/datasets' % self.PROJECT RESOURCE = { - 'datasetReference': - {'projectId': self.PROJECT, 'datasetId': self.DS_ID}, + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, 'etag': "etag", 'id': "%s:%s" % (self.PROJECT, self.DS_ID), } creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = 
client._connection = _make_connection(RESOURCE) - ds = client.create_dataset(Dataset(client.dataset(self.DS_ID))) + + ds_ref = client.dataset(self.DS_ID) + before = Dataset(ds_ref) + + after = client.create_dataset(before) + + self.assertEqual(after.dataset_id, self.DS_ID) + self.assertEqual(after.project, self.PROJECT) + self.assertEqual(after.etag, RESOURCE['etag']) + self.assertEqual(after.full_dataset_id, RESOURCE['id']) + conn.api_request.assert_called_once_with( method='POST', path='/%s' % PATH, data={ - 'datasetReference': - {'projectId': self.PROJECT, 'datasetId': self.DS_ID}, + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, 'labels': {}, }) - self.assertEqual(ds.dataset_id, self.DS_ID) - self.assertEqual(ds.project, self.PROJECT) - self.assertEqual(ds.etag, RESOURCE['etag']) - self.assertEqual(ds.full_dataset_id, RESOURCE['id']) def test_create_dataset_w_attrs(self): from google.cloud.bigquery.dataset import Dataset, AccessEntry @@ -438,8 +482,10 @@ def test_create_dataset_w_attrs(self): 'tableId': 'northern-hemisphere', } RESOURCE = { - 'datasetReference': - {'projectId': self.PROJECT, 'datasetId': self.DS_ID}, + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, 'etag': "etag", 'id': "%s:%s" % (self.PROJECT, self.DS_ID), 'description': DESCRIPTION, @@ -449,45 +495,56 @@ def test_create_dataset_w_attrs(self): 'labels': LABELS, 'access': [ {'role': 'OWNER', 'userByEmail': USER_EMAIL}, - {'view': VIEW}], + {'view': VIEW}, + ], } creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = _make_connection(RESOURCE) - entries = [AccessEntry('OWNER', 'userByEmail', USER_EMAIL), - AccessEntry(None, 'view', VIEW)] - ds_arg = Dataset(client.dataset(self.DS_ID)) - ds_arg.access_entries = entries - ds_arg.description = DESCRIPTION - ds_arg.friendly_name = FRIENDLY_NAME - ds_arg.default_table_expiration_ms = 3600 - ds_arg.location = LOCATION - ds_arg.labels = LABELS - ds = client.create_dataset(ds_arg) + entries = [ + AccessEntry('OWNER', 'userByEmail', USER_EMAIL), + AccessEntry(None, 'view', VIEW), + ] + + ds_ref = client.dataset(self.DS_ID) + before = Dataset(ds_ref) + before.access_entries = entries + before.description = DESCRIPTION + before.friendly_name = FRIENDLY_NAME + before.default_table_expiration_ms = 3600 + before.location = LOCATION + before.labels = LABELS + + after = client.create_dataset(before) + + self.assertEqual(after.dataset_id, self.DS_ID) + self.assertEqual(after.project, self.PROJECT) + self.assertEqual(after.etag, RESOURCE['etag']) + self.assertEqual(after.full_dataset_id, RESOURCE['id']) + self.assertEqual(after.description, DESCRIPTION) + self.assertEqual(after.friendly_name, FRIENDLY_NAME) + self.assertEqual(after.location, LOCATION) + self.assertEqual(after.default_table_expiration_ms, 3600) + self.assertEqual(after.labels, LABELS) + conn.api_request.assert_called_once_with( method='POST', path='/%s' % PATH, data={ - 'datasetReference': - {'projectId': self.PROJECT, 'datasetId': self.DS_ID}, + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, 'description': DESCRIPTION, 'friendlyName': FRIENDLY_NAME, 'location': LOCATION, 'defaultTableExpirationMs': '3600', 'access': [ {'role': 'OWNER', 'userByEmail': USER_EMAIL}, - {'view': VIEW}], + {'view': VIEW}, + ], 'labels': LABELS, }) - self.assertEqual(ds.dataset_id, self.DS_ID) - self.assertEqual(ds.project, self.PROJECT) - self.assertEqual(ds.etag, 
RESOURCE['etag']) - self.assertEqual(ds.full_dataset_id, RESOURCE['id']) - self.assertEqual(ds.description, DESCRIPTION) - self.assertEqual(ds.friendly_name, FRIENDLY_NAME) - self.assertEqual(ds.location, LOCATION) - self.assertEqual(ds.default_table_expiration_ms, 3600) - self.assertEqual(ds.labels, LABELS) def test_create_dataset_w_custom_property(self): # The library should handle sending properties to the API that are not @@ -503,25 +560,112 @@ def test_create_dataset_w_custom_property(self): creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = _make_connection(resource) - dataset = Dataset(client.dataset(self.DS_ID)) - dataset._properties['newAlphaProperty'] = 'unreleased property' - dataset = client.create_dataset(dataset) + ds_ref = client.dataset(self.DS_ID) + before = Dataset(ds_ref) + before._properties['newAlphaProperty'] = 'unreleased property' + + after = client.create_dataset(before) + + self.assertEqual(after.dataset_id, self.DS_ID) + self.assertEqual(after.project, self.PROJECT) + self.assertEqual( + after._properties['newAlphaProperty'], 'unreleased property') + conn.api_request.assert_called_once_with( method='POST', path=path, data={ - 'datasetReference': - {'projectId': self.PROJECT, 'datasetId': self.DS_ID}, + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, 'newAlphaProperty': 'unreleased property', 'labels': {}, } ) - self.assertEqual(dataset.dataset_id, self.DS_ID) - self.assertEqual(dataset.project, self.PROJECT) - self.assertEqual( - dataset._properties['newAlphaProperty'], 'unreleased property') + def test_create_dataset_w_client_location_wo_dataset_location(self): + from google.cloud.bigquery.dataset import Dataset + + PATH = 'projects/%s/datasets' % self.PROJECT + RESOURCE = { + 'datasetReference': + {'projectId': self.PROJECT, 'datasetId': self.DS_ID}, + 'etag': "etag", + 'id': "%s:%s" % (self.PROJECT, self.DS_ID), + 'location': self.LOCATION, + } + creds = _make_credentials() + client = self._make_one( + project=self.PROJECT, credentials=creds, location=self.LOCATION) + conn = client._connection = _make_connection(RESOURCE) + + ds_ref = client.dataset(self.DS_ID) + before = Dataset(ds_ref) + + after = client.create_dataset(before) + + self.assertEqual(after.dataset_id, self.DS_ID) + self.assertEqual(after.project, self.PROJECT) + self.assertEqual(after.etag, RESOURCE['etag']) + self.assertEqual(after.full_dataset_id, RESOURCE['id']) + self.assertEqual(after.location, self.LOCATION) + + conn.api_request.assert_called_once_with( + method='POST', + path='/%s' % PATH, + data={ + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, + 'labels': {}, + 'location': self.LOCATION, + }) + + def test_create_dataset_w_client_location_w_dataset_location(self): + from google.cloud.bigquery.dataset import Dataset + + PATH = 'projects/%s/datasets' % self.PROJECT + OTHER_LOCATION = 'EU' + RESOURCE = { + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, + 'etag': "etag", + 'id': "%s:%s" % (self.PROJECT, self.DS_ID), + 'location': OTHER_LOCATION, + } + creds = _make_credentials() + client = self._make_one( + project=self.PROJECT, credentials=creds, location=self.LOCATION) + conn = client._connection = _make_connection(RESOURCE) + + ds_ref = client.dataset(self.DS_ID) + before = Dataset(ds_ref) + before.location = OTHER_LOCATION + + after = client.create_dataset(before) + + self.assertEqual(after.dataset_id, 
self.DS_ID) + self.assertEqual(after.project, self.PROJECT) + self.assertEqual(after.etag, RESOURCE['etag']) + self.assertEqual(after.full_dataset_id, RESOURCE['id']) + self.assertEqual(after.location, OTHER_LOCATION) + + conn.api_request.assert_called_once_with( + method='POST', + path='/%s' % PATH, + data={ + 'datasetReference': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + }, + 'labels': {}, + 'location': OTHER_LOCATION, + }) def test_create_table_w_day_partition(self): from google.cloud.bigquery.table import Table @@ -1429,12 +1573,36 @@ def test_get_job_miss_w_explict_project(self): conn = client._connection = _make_connection() with self.assertRaises(NotFound): - client.get_job(JOB_ID, project=OTHER_PROJECT, location='EU') + client.get_job( + JOB_ID, project=OTHER_PROJECT, location=self.LOCATION) conn.api_request.assert_called_once_with( method='GET', path='/projects/OTHER_PROJECT/jobs/NONESUCH', - query_params={'projection': 'full', 'location': 'EU'}) + query_params={ + 'projection': 'full', + 'location': self.LOCATION, + }) + + def test_get_job_miss_w_client_location(self): + from google.cloud.exceptions import NotFound + + OTHER_PROJECT = 'OTHER_PROJECT' + JOB_ID = 'NONESUCH' + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds, location=self.LOCATION) + conn = client._connection = _make_connection() + + with self.assertRaises(NotFound): + client.get_job(JOB_ID, project=OTHER_PROJECT) + + conn.api_request.assert_called_once_with( + method='GET', + path='/projects/OTHER_PROJECT/jobs/NONESUCH', + query_params={ + 'projection': 'full', + 'location': self.LOCATION, + }) def test_get_job_hit(self): from google.cloud.bigquery.job import CreateDisposition @@ -1480,7 +1648,8 @@ def test_get_job_hit(self): conn.api_request.assert_called_once_with( method='GET', path='/projects/PROJECT/jobs/query_job', - query_params={'projection': 'full'}) + query_params={'projection': 'full'}, + ) def test_cancel_job_miss_w_explict_project(self): from google.cloud.exceptions import NotFound @@ -1492,12 +1661,36 @@ def test_cancel_job_miss_w_explict_project(self): conn = client._connection = _make_connection() with self.assertRaises(NotFound): - client.cancel_job(JOB_ID, project=OTHER_PROJECT, location='EU') + client.cancel_job( + JOB_ID, project=OTHER_PROJECT, location=self.LOCATION) conn.api_request.assert_called_once_with( method='POST', path='/projects/OTHER_PROJECT/jobs/NONESUCH/cancel', - query_params={'projection': 'full', 'location': 'EU'}) + query_params={ + 'projection': 'full', + 'location': self.LOCATION, + }) + + def test_cancel_job_miss_w_client_location(self): + from google.cloud.exceptions import NotFound + + OTHER_PROJECT = 'OTHER_PROJECT' + JOB_ID = 'NONESUCH' + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds, location=self.LOCATION) + conn = client._connection = _make_connection() + + with self.assertRaises(NotFound): + client.cancel_job(JOB_ID, project=OTHER_PROJECT) + + conn.api_request.assert_called_once_with( + method='POST', + path='/projects/OTHER_PROJECT/jobs/NONESUCH/cancel', + query_params={ + 'projection': 'full', + 'location': self.LOCATION, + }) def test_cancel_job_hit(self): from google.cloud.bigquery.job import QueryJob @@ -1842,7 +2035,7 @@ def test_load_table_from_uri_w_explicit_project(self): resource = { 'jobReference': { 'projectId': 'other-project', - 'location': 'US', + 'location': self.LOCATION, 'jobId': job_id, }, 'configuration': { @@ -1865,7 +2058,47 @@ def 
test_load_table_from_uri_w_explicit_project(self): client.load_table_from_uri( source_uri, destination, job_id=job_id, project='other-project', - location='US') + location=self.LOCATION) + + # Check that load_table_from_uri actually starts the job. + conn.api_request.assert_called_once_with( + method='POST', + path='/projects/other-project/jobs', + data=resource) + + def test_load_table_from_uri_w_client_location(self): + job_id = 'this-is-a-job-id' + destination_id = 'destination_table' + source_uri = 'gs://example/source.csv' + resource = { + 'jobReference': { + 'projectId': 'other-project', + 'location': self.LOCATION, + 'jobId': job_id, + }, + 'configuration': { + 'load': { + 'sourceUris': [source_uri], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + 'tableId': destination_id, + }, + }, + }, + } + creds = _make_credentials() + http = object() + client = self._make_one( + project=self.PROJECT, credentials=creds, _http=http, + location=self.LOCATION) + conn = client._connection = _make_connection(resource) + destination = client.dataset(self.DS_ID).table(destination_id) + + client.load_table_from_uri( + source_uri, destination, + job_id=job_id, + project='other-project') # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( @@ -2082,7 +2315,7 @@ def test_copy_table_w_explicit_project(self): resource = { 'jobReference': { 'projectId': 'other-project', - 'location': 'US', + 'location': self.LOCATION, 'jobId': job_id, }, 'configuration': { @@ -2113,13 +2346,61 @@ def test_copy_table_w_explicit_project(self): client.copy_table( source, destination, job_id=job_id, project='other-project', - location='US') + location=self.LOCATION) # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( method='POST', path='/projects/other-project/jobs', - data=resource) + data=resource, + ) + + def test_copy_table_w_client_location(self): + job_id = 'this-is-a-job-id' + source_id = 'source_table' + destination_id = 'destination_table' + resource = { + 'jobReference': { + 'projectId': 'other-project', + 'location': self.LOCATION, + 'jobId': job_id, + }, + 'configuration': { + 'copy': { + 'sourceTables': [ + { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + 'tableId': source_id, + }, + ], + 'destinationTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + 'tableId': destination_id, + }, + }, + }, + } + creds = _make_credentials() + http = object() + client = self._make_one( + project=self.PROJECT, credentials=creds, _http=http, + location=self.LOCATION) + conn = client._connection = _make_connection(resource) + dataset = client.dataset(self.DS_ID) + source = dataset.table(source_id) + destination = dataset.table(destination_id) + + client.copy_table( + source, destination, job_id=job_id, project='other-project') + + # Check that copy_table actually starts the job. 
+ conn.api_request.assert_called_once_with( + method='POST', + path='/projects/other-project/jobs', + data=resource, + ) def test_extract_table(self): from google.cloud.bigquery.job import ExtractJob @@ -2173,7 +2454,7 @@ def test_extract_table_w_explicit_project(self): resource = { 'jobReference': { 'projectId': 'other-project', - 'location': 'US', + 'location': self.LOCATION, 'jobId': job_id, }, 'configuration': { @@ -2197,13 +2478,54 @@ def test_extract_table_w_explicit_project(self): client.extract_table( source, destination, job_id=job_id, project='other-project', - location='US') + location=self.LOCATION) # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( method='POST', path='/projects/other-project/jobs', - data=resource) + data=resource, + ) + + def test_extract_table_w_client_location(self): + job_id = 'job_id' + source_id = 'source_table' + destination = 'gs://bucket_name/object_name' + resource = { + 'jobReference': { + 'projectId': 'other-project', + 'location': self.LOCATION, + 'jobId': job_id, + }, + 'configuration': { + 'extract': { + 'sourceTable': { + 'projectId': self.PROJECT, + 'datasetId': self.DS_ID, + 'tableId': source_id, + }, + 'destinationUris': [destination], + }, + }, + } + creds = _make_credentials() + http = object() + client = self._make_one( + project=self.PROJECT, credentials=creds, _http=http, + location=self.LOCATION) + conn = client._connection = _make_connection(resource) + dataset = client.dataset(self.DS_ID) + source = dataset.table(source_id) + + client.extract_table( + source, destination, job_id=job_id, project='other-project') + + # Check that extract_table actually starts the job. + conn.api_request.assert_called_once_with( + method='POST', + path='/projects/other-project/jobs', + data=resource, + ) def test_extract_table_generated_job_id(self): from google.cloud.bigquery.job import ExtractJob @@ -2357,7 +2679,7 @@ def test_query_w_explicit_project(self): resource = { 'jobReference': { 'projectId': 'other-project', - 'location': 'US', + 'location': self.LOCATION, 'jobId': job_id, }, 'configuration': { @@ -2374,13 +2696,48 @@ def test_query_w_explicit_project(self): conn = client._connection = _make_connection(resource) client.query( - query, job_id=job_id, project='other-project', location='US') + query, job_id=job_id, project='other-project', + location=self.LOCATION) # Check that query actually starts the job. conn.api_request.assert_called_once_with( method='POST', path='/projects/other-project/jobs', - data=resource) + data=resource, + ) + + def test_query_w_client_location(self): + job_id = 'some-job-id' + query = 'select count(*) from persons' + resource = { + 'jobReference': { + 'projectId': 'other-project', + 'location': self.LOCATION, + 'jobId': job_id, + }, + 'configuration': { + 'query': { + 'query': query, + 'useLegacySql': False, + }, + }, + } + creds = _make_credentials() + http = object() + client = self._make_one( + project=self.PROJECT, credentials=creds, _http=http, + location=self.LOCATION) + conn = client._connection = _make_connection(resource) + + client.query( + query, job_id=job_id, project='other-project') + + # Check that query actually starts the job. 
+ conn.api_request.assert_called_once_with( + method='POST', + path='/projects/other-project/jobs', + data=resource, + ) def test_query_w_udf_resources(self): from google.cloud.bigquery.job import QueryJob @@ -3300,14 +3657,16 @@ class TestClientUpload(object): TABLE_REF = DatasetReference( 'project_id', 'test_dataset').table('test_table') + LOCATION = 'us-central' + @staticmethod - def _make_client(transport=None): + def _make_client(transport=None, location=None): from google.cloud.bigquery import _http from google.cloud.bigquery import client cl = client.Client(project='project_id', credentials=_make_credentials(), - _http=transport) + _http=transport, location=location) cl._connection = mock.create_autospec(_http.Connection, instance=True) return cl @@ -3393,11 +3752,33 @@ def test_load_table_from_file_w_explicit_project(self): with do_upload_patch as do_upload: client.load_table_from_file( file_obj, self.TABLE_REF, job_id='job_id', - project='other-project', location='US', + project='other-project', location=self.LOCATION, job_config=self._make_config()) expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) - expected_resource['jobReference']['location'] = 'US' + expected_resource['jobReference']['location'] = self.LOCATION + expected_resource['jobReference']['projectId'] = 'other-project' + do_upload.assert_called_once_with( + file_obj, + expected_resource, + _DEFAULT_NUM_RETRIES) + + def test_load_table_from_file_w_client_location(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + + client = self._make_client(location=self.LOCATION) + file_obj = self._make_file_obj() + + do_upload_patch = self._make_do_upload_patch( + client, '_do_resumable_upload', self.EXPECTED_CONFIGURATION) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, self.TABLE_REF, job_id='job_id', + project='other-project', + job_config=self._make_config()) + + expected_resource = copy.deepcopy(self.EXPECTED_CONFIGURATION) + expected_resource['jobReference']['location'] = self.LOCATION expected_resource['jobReference']['projectId'] = 'other-project' do_upload.assert_called_once_with( file_obj, @@ -3573,6 +3954,43 @@ def test_load_table_from_dataframe(self): sent_config = load_table_from_file.mock_calls[0][2]['job_config'] assert sent_config.source_format == job.SourceFormat.PARQUET + @unittest.skipIf(pandas is None, 'Requires `pandas`') + @unittest.skipIf(pyarrow is None, 'Requires `pyarrow`') + def test_load_table_from_dataframe_w_client_location(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + + client = self._make_client(location=self.LOCATION) + records = [ + {'name': 'Monty', 'age': 100}, + {'name': 'Python', 'age': 60}, + ] + dataframe = pandas.DataFrame(records) + + load_patch = mock.patch( + 'google.cloud.bigquery.client.Client.load_table_from_file', + autospec=True) + with load_patch as load_table_from_file: + client.load_table_from_dataframe(dataframe, self.TABLE_REF) + + load_table_from_file.assert_called_once_with( + client, mock.ANY, self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, job_id=None, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + ) + + sent_file = load_table_from_file.mock_calls[0][1][1] + sent_bytes = sent_file.getvalue() + assert isinstance(sent_bytes, bytes) + assert len(sent_bytes) > 0 + + sent_config = load_table_from_file.mock_calls[0][2]['job_config'] + assert sent_config.source_format == 
job.SourceFormat.PARQUET + @unittest.skipIf(pandas is None, 'Requires `pandas`') @unittest.skipIf(pyarrow is None, 'Requires `pyarrow`') def test_load_table_from_dataframe_w_custom_job_config(self): @@ -3592,12 +4010,20 @@ def test_load_table_from_dataframe_w_custom_job_config(self): autospec=True) with load_patch as load_table_from_file: client.load_table_from_dataframe( - dataframe, self.TABLE_REF, job_config=job_config) + dataframe, self.TABLE_REF, + job_config=job_config, + location=self.LOCATION) load_table_from_file.assert_called_once_with( - client, mock.ANY, self.TABLE_REF, num_retries=_DEFAULT_NUM_RETRIES, - rewind=True, job_id=None, job_id_prefix=None, location=None, - project=None, job_config=mock.ANY) + client, mock.ANY, self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + job_id=None, + job_id_prefix=None, + location=self.LOCATION, + project=None, + job_config=mock.ANY, + ) sent_config = load_table_from_file.mock_calls[0][2]['job_config'] assert sent_config is job_config
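
For readers reviewing this change, the snippet below is a minimal usage sketch (not part of the patch) of the new client-level default location. The dataset ID, query text, and 'US'/'EU' values are placeholders, and it assumes default application credentials are available.

    from google.cloud import bigquery

    # The new 'location' ctor argument is exposed as the read-only
    # Client.location property and is used whenever a per-call
    # 'location' argument is not given.
    client = bigquery.Client(location='US')
    print(client.location)   # 'US'

    # create_dataset(): the client default is added to the request body
    # when the Dataset object itself has no location set.
    dataset = client.create_dataset(
        bigquery.Dataset(client.dataset('my_dataset')))

    # Job-creating methods (query, load_table_from_uri, copy_table,
    # extract_table, get_job, cancel_job, ...) attach the default to the
    # job reference; an explicit argument still wins.
    us_job = client.query('SELECT 1')                  # runs in 'US'
    eu_job = client.query('SELECT 1', location='EU')   # explicit override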