Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add create job method #9882

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
Expand All @@ -76,7 +77,6 @@
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator


# Chunk size in bytes: 1024 * 1024 B = 1 MB.
_DEFAULT_CHUNKSIZE = 1024 * 1024
# 5 MB expressed in bytes.
# NOTE(review): presumably the size limit for multipart uploads -- confirm against usage.
_MAX_MULTIPART_SIZE = 5 * 1024 ** 2
# NOTE(review): presumably the default retry count for uploads -- confirm against usage.
_DEFAULT_NUM_RETRIES = 6
Expand Down Expand Up @@ -1294,9 +1294,77 @@ def job_from_resource(self, resource):
return job.QueryJob.from_api_repr(resource, self)
return job.UnknownJob.from_api_repr(resource, self)

def create_job(self, job_config, retry=DEFAULT_RETRY):
    """Create a new job from a job configuration resource.

    The configuration is dispatched on its top-level key (``"load"``,
    ``"copy"``, ``"extract"`` or ``"query"``, checked in that order) to
    the matching client method.

    Arguments:
        job_config (dict):
            Job configuration in its API representation. It is not
            modified by this method.

    Keyword Arguments:
        retry (google.api_core.retry.Retry):
            (Optional) How to retry the RPC.

    Returns:
        Union[ \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob \
        ]:
            A new job instance.

    Raises:
        TypeError:
            If ``job_config`` contains none of the supported job type keys.
    """

    if "load" in job_config:
        load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
            job_config
        )
        destination = TableReference.from_api_repr(
            job_config["load"]["destinationTable"]
        )
        source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
        return self.load_table_from_uri(
            source_uris, destination, job_config=load_job_config, retry=retry
        )
    elif "copy" in job_config:
        copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
            job_config
        )
        copy_resource = job_config["copy"]
        destination = TableReference.from_api_repr(
            copy_resource["destinationTable"]
        )
        # A copy configuration may list several sources under "sourceTables"
        # or a single one under "sourceTable"; normalise to a list.
        source_configs = copy_resource.get("sourceTables")
        if source_configs is None:
            source_configs = [copy_resource["sourceTable"]]
        sources = [
            TableReference.from_api_repr(source_config)
            for source_config in source_configs
        ]
        return self.copy_table(
            sources, destination, job_config=copy_job_config, retry=retry
        )
    elif "extract" in job_config:
        extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
            job_config
        )
        source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
        destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
        return self.extract_table(
            source, destination_uris, job_config=extract_job_config, retry=retry
        )
    elif "query" in job_config:
        # "destinationTable" is stripped before the QueryJobConfig is built.
        # Do it on shallow copies so the caller's dict is left untouched, and
        # use pop() with a default so a query without a destination table
        # does not raise KeyError.
        # NOTE(review): confirm whether the destination should be forwarded
        # to the query job instead of being discarded.
        query_resource = dict(job_config["query"])
        query_resource.pop("destinationTable", None)
        sanitized_config = dict(job_config)
        sanitized_config["query"] = query_resource
        query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
            sanitized_config
        )
        query = _get_sub_prop(sanitized_config, ["query", "query"])
        return self.query(query, job_config=query_job_config, retry=retry)
    else:
        raise TypeError("Invalid job configuration received.")

def get_job(
self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None
):

"""Fetch a job for the project associated with this client.

See
Expand Down
108 changes: 108 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2766,6 +2766,114 @@ def test_delete_table_w_not_found_ok_true(self):

conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None)

def _create_job_helper(self, job_config, client_method):
    """Call ``create_job`` and assert the patched client method ran once.

    Arguments:
        job_config (dict): configuration passed to ``Client.create_job``.
        client_method (str):
            dotted path of the client method to patch (with ``autospec``)
            and check for exactly one call.
    """
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )
    client._connection = make_connection()

    # Job config parsing is replaced by a mock so only the dispatch is exercised.
    config_patcher = mock.patch(
        "google.cloud.bigquery.job._JobConfig.from_api_repr",
        return_value=mock.Mock(),
    )
    method_patcher = mock.patch(client_method, autospec=True)

    with method_patcher as patched_method, config_patcher:
        client.create_job(job_config=job_config)
        patched_method.assert_called_once()

def test_create_job_load_config(self):
    """A ``load`` configuration results in one ``load_table_from_uri`` call."""
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    load_section = {
        "destinationTable": destination_table,
        "sourceUris": ["gs://test_bucket/src_object*"],
    }

    self._create_job_helper(
        {"load": load_section},
        "google.cloud.bigquery.client.Client.load_table_from_uri",
    )

def test_create_job_copy_config(self):
    """A ``copy`` configuration with ``sourceTables`` results in one ``copy_table`` call."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    copy_section = {
        "sourceTables": [source_table],
        "destinationTable": destination_table,
    }

    self._create_job_helper(
        {"copy": copy_section},
        "google.cloud.bigquery.client.Client.copy_table",
    )

def test_create_job_copy_config_w_single_source(self):
    """A ``copy`` configuration with a lone ``sourceTable`` results in one ``copy_table`` call."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    copy_section = {
        "sourceTable": source_table,
        "destinationTable": destination_table,
    }

    self._create_job_helper(
        {"copy": copy_section},
        "google.cloud.bigquery.client.Client.copy_table",
    )

def test_create_job_extract_config(self):
    """An ``extract`` configuration results in one ``extract_table`` call."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    extract_section = {
        "sourceTable": source_table,
        "destinationUris": ["gs://test_bucket/dst_object*"],
    }

    self._create_job_helper(
        {"extract": extract_section},
        "google.cloud.bigquery.client.Client.extract_table",
    )

def test_create_job_query_config(self):
    """A ``query`` configuration results in one ``Client.query`` call."""
    query_section = {
        "query": "query",
        "destinationTable": {"tableId": "table_id"},
    }

    self._create_job_helper(
        {"query": query_section},
        "google.cloud.bigquery.client.Client.query",
    )

def test_create_job_w_invalid_job_config(self):
    """``create_job`` raises ``TypeError`` for a configuration with no known job type."""
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )
    bad_config = {"unknown": {}}

    with self.assertRaises(TypeError) as raised:
        client.create_job(job_config=bad_config)

    message = raised.exception.args[0]
    self.assertIn("Invalid job configuration", message)

def test_job_from_resource_unknown_type(self):
from google.cloud.bigquery.job import UnknownJob

Expand Down