Commit d64fb53: BigQuery add default location to client (#5678)
* Add read-only 'Client.location' property.

Settable via new 'location' argument to ctor.

* Use 'Client.location' as default for 'Client._get_query_results'.

* Use 'Client.location' as default for 'Client.get_job'.

* Use 'Client.location' as default for 'Client.cancel_job'.

* Use 'Client.location' as default for 'Client.load_table_from_uri'.

* Use 'Client.location' as default for 'Client.load_table_from_file'.

* Use 'Client.location' as default for 'Client.load_table_from_dataframe'.

* Use 'Client.location' as default for 'Client.copy_table'.

* Use 'Client.location' as default for 'Client.extract_table'.

* Use 'Client.location' as default for 'Client.query'.

* Use 'Client.location' as default for 'create_dataset'.

Closes #5148.
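
For example, a minimal sketch of the new behavior (against a google-cloud-bigquery version that includes this change; the 'EU' location and the query text are arbitrary):

    from google.cloud import bigquery

    # Jobs created through this client default to the 'EU' location
    # unless a per-call 'location' argument says otherwise.
    client = bigquery.Client(location='EU')

    # Runs in 'EU': query() copies client.location into the job reference.
    job = client.query('SELECT 1')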
tseaver authored Jul 25, 2018
1 parent 2631c7f commit d64fb53
Showing 2 changed files with 578 additions and 77 deletions.
89 changes: 82 additions & 7 deletions bigquery/google/cloud/bigquery/client.py
@@ -107,6 +107,8 @@ class Client(ClientWithProject):
             current object.
             This parameter should be considered private, and could change in
             the future.
+        location (str):
+            (Optional) Default location for jobs / datasets / tables.
 
     Raises:
         google.auth.exceptions.DefaultCredentialsError:
@@ -118,10 +120,17 @@ class Client(ClientWithProject):
              'https://www.googleapis.com/auth/cloud-platform')
     """The scopes required for authenticating as a BigQuery consumer."""
 
-    def __init__(self, project=None, credentials=None, _http=None):
+    def __init__(
+            self, project=None, credentials=None, _http=None, location=None):
         super(Client, self).__init__(
             project=project, credentials=credentials, _http=_http)
         self._connection = Connection(self)
+        self._location = location
+
+    @property
+    def location(self):
+        """Default location for jobs / datasets / tables."""
+        return self._location
 
     def get_service_account_email(self, project=None):
         """Get the email address of the project's BigQuery service account
@@ -286,8 +295,14 @@ def create_dataset(self, dataset):
         """
         path = '/projects/%s/datasets' % (dataset.project,)
+
+        data = dataset.to_api_repr()
+        if data.get('location') is None and self.location is not None:
+            data['location'] = self.location
+
         api_response = self._connection.api_request(
-            method='POST', path=path, data=dataset.to_api_repr())
+            method='POST', path=path, data=data)
+
         return Dataset.from_api_repr(api_response)
 
     def create_table(self, table):
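
In create_dataset, the dataset's own location always wins; the client default is copied into the request body only when the dataset has none. A sketch of the observable behavior (both dataset IDs are hypothetical):

    client = bigquery.Client(location='EU')

    # No location on the dataset: the request is sent with 'EU'.
    plain = bigquery.Dataset(client.dataset('dataset_without_location'))
    created = client.create_dataset(plain)

    # The dataset specifies its own location: the client default is ignored.
    explicit = bigquery.Dataset(client.dataset('dataset_in_us'))
    explicit.location = 'US'
    created_us = client.create_dataset(explicit)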
@@ -548,6 +563,9 @@ def _get_query_results(
         if timeout_ms is not None:
             extra_params['timeoutMs'] = timeout_ms
 
+        if location is None:
+            location = self.location
+
         if location is not None:
             extra_params['location'] = location
 
@@ -613,6 +631,10 @@ def get_job(
 
         if project is None:
             project = self.project
+
+        if location is None:
+            location = self.location
 
         if location is not None:
             extra_params['location'] = location
 
@@ -652,6 +674,10 @@ def cancel_job(
 
         if project is None:
             project = self.project
+
+        if location is None:
+            location = self.location
 
         if location is not None:
             extra_params['location'] = location
 
@@ -737,8 +763,12 @@ def list_jobs(
             extra_params=extra_params)
 
     def load_table_from_uri(
-            self, source_uris, destination, job_id=None, job_id_prefix=None,
-            location=None, project=None, job_config=None,
+            self, source_uris, destination,
+            job_id=None,
+            job_id_prefix=None,
+            location=None,
+            project=None,
+            job_config=None,
             retry=DEFAULT_RETRY):
         """Starts a job for loading data into a table from CloudStorage.
@@ -773,14 +803,22 @@ def load_table_from_uri(
             google.cloud.bigquery.job.LoadJob: A new load job.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
+
         if project is None:
             project = self.project
+
+        if location is None:
+            location = self.location
+
         job_ref = job._JobReference(job_id, project=project, location=location)
+
         if isinstance(source_uris, six.string_types):
             source_uris = [source_uris]
+
         load_job = job.LoadJob(
             job_ref, source_uris, destination, self, job_config)
         load_job._begin(retry=retry)
+
         return load_job
 
     def load_table_from_file(
@@ -831,14 +869,22 @@ def load_table_from_file(
             mode.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
+
         if project is None:
             project = self.project
+
+        if location is None:
+            location = self.location
+
         job_ref = job._JobReference(job_id, project=project, location=location)
         load_job = job.LoadJob(job_ref, None, destination, self, job_config)
         job_resource = load_job._build_resource()
+
         if rewind:
             file_obj.seek(0, os.SEEK_SET)
+
         _check_mode(file_obj)
+
         try:
             if size is None or size >= _MAX_MULTIPART_SIZE:
                 response = self._do_resumable_upload(
@@ -848,6 +894,7 @@ def load_table_from_file(
                     file_obj, job_resource, size, num_retries)
         except resumable_media.InvalidResponse as exc:
             raise exceptions.from_http_response(exc.response)
+
         return self.job_from_resource(response.json())
 
     def load_table_from_dataframe(self, dataframe, destination,
@@ -901,10 +948,19 @@ def load_table_from_dataframe(self, dataframe, destination,
             job_config = job.LoadJobConfig()
         job_config.source_format = job.SourceFormat.PARQUET
 
+        if location is None:
+            location = self.location
+
         return self.load_table_from_file(
-            buffer, destination, num_retries=num_retries, rewind=True,
-            job_id=job_id, job_id_prefix=job_id_prefix, location=location,
-            project=project, job_config=job_config)
+            buffer, destination,
+            num_retries=num_retries,
+            rewind=True,
+            job_id=job_id,
+            job_id_prefix=job_id_prefix,
+            location=location,
+            project=project,
+            job_config=job_config,
+        )
 
     def _do_resumable_upload(self, stream, metadata, num_retries):
         """Perform a resumable upload.
@@ -1050,16 +1106,23 @@ def copy_table(
             google.cloud.bigquery.job.CopyJob: A new copy job instance.
         """
         job_id = _make_job_id(job_id, job_id_prefix)
+
         if project is None:
             project = self.project
+
+        if location is None:
+            location = self.location
+
         job_ref = job._JobReference(job_id, project=project, location=location)
+
         if not isinstance(sources, collections.Sequence):
             sources = [sources]
+
         copy_job = job.CopyJob(
             job_ref, sources, destination, client=self,
             job_config=job_config)
         copy_job._begin(retry=retry)
+
         return copy_job
 
def extract_table(
Expand Down Expand Up @@ -1103,8 +1166,13 @@ def extract_table(
google.cloud.bigquery.job.ExtractJob: A new extract job instance.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if project is None:
project = self.project

if location is None:
location = self.location

job_ref = job._JobReference(job_id, project=project, location=location)

if isinstance(destination_uris, six.string_types):
@@ -1114,6 +1182,7 @@ def extract_table(
             job_ref, source, destination_uris, client=self,
             job_config=job_config)
         extract_job._begin(retry=retry)
+
         return extract_job
 
def query(
Expand Down Expand Up @@ -1149,12 +1218,18 @@ def query(
google.cloud.bigquery.job.QueryJob: A new query job instance.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if project is None:
project = self.project

if location is None:
location = self.location

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(
job_ref, query, client=self, job_config=job_config)
query_job._begin(retry=retry)

return query_job

def insert_rows(self, table, rows, selected_fields=None, **kwargs):
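Every method touched above resolves location the same way: an explicit 'location' argument wins, and Client.location is consulted only when the argument is omitted. A sketch of that precedence (assuming jobs expose the resolved value via their 'location' property):

    from google.cloud import bigquery

    client = bigquery.Client(location='asia-northeast1')

    # An explicit argument overrides the client default ...
    job_us = client.query('SELECT 1', location='US')
    assert job_us.location == 'US'

    # ... while an omitted argument falls back to it.
    job_default = client.query('SELECT 1')
    assert job_default.location == 'asia-northeast1'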
