From efdf1c653770f7c03c17e31e3c2f279bb685637b Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Mon, 15 Mar 2021 09:52:04 -0500
Subject: [PATCH] refactor: split pandas system tests to new module (#548)

Follow-up to https://github.com/googleapis/python-bigquery/pull/448

Towards #366
---
 tests/system/conftest.py    |  39 ++
 tests/system/helpers.py     |  94 ++++
 tests/system/test_client.py | 953 ++----
 tests/system/test_pandas.py | 801 ++++++++++++++++++++++++++++++
 4 files changed, 969 insertions(+), 918 deletions(-)
 create mode 100644 tests/system/conftest.py
 create mode 100644 tests/system/helpers.py
 create mode 100644 tests/system/test_pandas.py

diff --git a/tests/system/conftest.py b/tests/system/conftest.py
new file mode 100644
index 000000000..4b5fcb543
--- /dev/null
+++ b/tests/system/conftest.py
@@ -0,0 +1,39 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from . import helpers
+
+
+@pytest.fixture(scope="session")
+def bigquery_client():
+    from google.cloud import bigquery
+
+    return bigquery.Client()
+
+
+@pytest.fixture(scope="session")
+def bqstorage_client(bigquery_client):
+    from google.cloud import bigquery_storage
+
+    return bigquery_storage.BigQueryReadClient(credentials=bigquery_client._credentials)
+
+
+@pytest.fixture
+def dataset_id(bigquery_client):
+    dataset_id = f"bqsystem_{helpers.temp_suffix()}"
+    bigquery_client.create_dataset(dataset_id)
+    yield dataset_id
+    bigquery_client.delete_dataset(dataset_id, delete_contents=True)
diff --git a/tests/system/helpers.py b/tests/system/helpers.py
new file mode 100644
index 000000000..76e609345
--- /dev/null
+++ b/tests/system/helpers.py
@@ -0,0 +1,94 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +import datetime +import decimal +import uuid + +import google.api_core.exceptions +import test_utils.retry + +from google.cloud._helpers import UTC + + +_naive = datetime.datetime(2016, 12, 5, 12, 41, 9) +_naive_microseconds = datetime.datetime(2016, 12, 5, 12, 41, 9, 250000) +_stamp = "%s %s" % (_naive.date().isoformat(), _naive.time().isoformat()) +_stamp_microseconds = _stamp + ".250000" +_zoned = _naive.replace(tzinfo=UTC) +_zoned_microseconds = _naive_microseconds.replace(tzinfo=UTC) +_numeric = decimal.Decimal("123456789.123456789") + + +# Examples of most data types to test with query() and DB-API. +STANDARD_SQL_EXAMPLES = [ + ("SELECT 1", 1), + ("SELECT 1.3", 1.3), + ("SELECT TRUE", True), + ('SELECT "ABC"', "ABC"), + ('SELECT CAST("foo" AS BYTES)', b"foo"), + ('SELECT TIMESTAMP "%s"' % (_stamp,), _zoned), + ('SELECT TIMESTAMP "%s"' % (_stamp_microseconds,), _zoned_microseconds,), + ('SELECT DATETIME(TIMESTAMP "%s")' % (_stamp,), _naive), + ('SELECT DATETIME(TIMESTAMP "%s")' % (_stamp_microseconds,), _naive_microseconds,), + ('SELECT DATE(TIMESTAMP "%s")' % (_stamp,), _naive.date()), + ('SELECT TIME(TIMESTAMP "%s")' % (_stamp,), _naive.time()), + ('SELECT NUMERIC "%s"' % (_numeric,), _numeric), + ("SELECT (1, 2)", {"_field_1": 1, "_field_2": 2}), + ( + "SELECT ((1, 2), (3, 4), 5)", + { + "_field_1": {"_field_1": 1, "_field_2": 2}, + "_field_2": {"_field_1": 3, "_field_2": 4}, + "_field_3": 5, + }, + ), + ("SELECT [1, 2, 3]", [1, 2, 3]), + ( + "SELECT ([1, 2], 3, [4, 5])", + {"_field_1": [1, 2], "_field_2": 3, "_field_3": [4, 5]}, + ), + ( + "SELECT [(1, 2, 3), (4, 5, 6)]", + [ + {"_field_1": 1, "_field_2": 2, "_field_3": 3}, + {"_field_1": 4, "_field_2": 5, "_field_3": 6}, + ], + ), + ( + "SELECT [([1, 2, 3], 4), ([5, 6], 7)]", + [{"_field_1": [1, 2, 3], "_field_2": 4}, {"_field_1": [5, 6], "_field_2": 7}], + ), + ("SELECT ARRAY(SELECT STRUCT([1, 2]))", [{"_field_1": [1, 2]}]), + ("SELECT ST_GeogPoint(1, 2)", "POINT(1 2)"), +] + + +def temp_suffix(): + now = datetime.datetime.now() + return f"{now.strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}" + + +def _rate_limit_exceeded(forbidden): + """Predicate: pass only exceptions with 'rateLimitExceeded' as reason.""" + return any(error["reason"] == "rateLimitExceeded" for error in forbidden._errors) + + +# We need to wait to stay within the rate limits. +# The alternative outcome is a 403 Forbidden response from upstream, which +# they return instead of the more appropriate 429. +# See https://cloud.google.com/bigquery/quota-policy +retry_403 = test_utils.retry.RetryErrors( + google.api_core.exceptions.Forbidden, error_predicate=_rate_limit_exceeded, +) diff --git a/tests/system/test_client.py b/tests/system/test_client.py index ed48b0bfe..133f609a6 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -13,7 +13,6 @@ # limitations under the License. import base64 -import collections import concurrent.futures import csv import datetime @@ -29,9 +28,11 @@ import psutil import pytest -import pytz import pkg_resources +from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT +from . 
import helpers + try: from google.cloud import bigquery_storage except ImportError: # pragma: NO COVER @@ -42,10 +43,6 @@ except ImportError: # pragma: NO COVER fastavro = None -try: - import pandas -except ImportError: # pragma: NO COVER - pandas = None try: import pyarrow import pyarrow.types @@ -56,7 +53,6 @@ from google.api_core.exceptions import BadRequest from google.api_core.exceptions import ClientError from google.api_core.exceptions import Conflict -from google.api_core.exceptions import Forbidden from google.api_core.exceptions import GoogleAPICallError from google.api_core.exceptions import NotFound from google.api_core.exceptions import InternalServerError @@ -65,7 +61,6 @@ from google.api_core.iam import Policy from google.cloud import bigquery from google.cloud import bigquery_v2 -from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.table import Table @@ -121,14 +116,8 @@ (TooManyRequests, InternalServerError, ServiceUnavailable) ) -PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0") PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0") -if pandas: - PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version -else: - PANDAS_INSTALLED_VERSION = None - if pyarrow: PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version else: @@ -154,18 +143,6 @@ def _load_json_schema(filename="schema.json"): return _parse_schema_resource(json.load(schema_file)) -def _rate_limit_exceeded(forbidden): - """Predicate: pass only exceptions with 'rateLimitExceeded' as reason.""" - return any(error["reason"] == "rateLimitExceeded" for error in forbidden._errors) - - -# We need to wait to stay within the rate limits. -# The alternative outcome is a 403 Forbidden response from upstream, which -# they return instead of the more appropriate 429. -# See https://cloud.google.com/bigquery/quota-policy -retry_403 = RetryErrors(Forbidden, error_predicate=_rate_limit_exceeded) - - class Config(object): """Run-time configuration to be modified at set-up. 
@@ -262,7 +239,7 @@ def test_get_dataset(self): dataset_arg = Dataset(dataset_ref) dataset_arg.friendly_name = "Friendly" dataset_arg.description = "Description" - dataset = retry_403(client.create_dataset)(dataset_arg) + dataset = helpers.retry_403(client.create_dataset)(dataset_arg) self.to_delete.append(dataset) dataset_ref = bigquery.DatasetReference(project, dataset_id) @@ -345,7 +322,7 @@ def test_create_table(self): table_arg = Table(dataset.table(table_id), schema=SCHEMA) self.assertFalse(_table_exists(table_arg)) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) @@ -380,7 +357,7 @@ def test_create_table_with_policy(self): table_arg = Table(dataset.table(table_id), schema=schema) self.assertFalse(_table_exists(table_arg)) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) @@ -416,7 +393,7 @@ def test_create_table_w_time_partitioning_w_clustering_fields(self): table_arg.time_partitioning = TimePartitioning(field="transaction_time") table_arg.clustering_fields = ["user_email", "store_code"] - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) @@ -430,7 +407,7 @@ def test_delete_dataset_with_string(self): dataset_id = _make_dataset_id("delete_table_true_with_string") project = Config.CLIENT.project dataset_ref = bigquery.DatasetReference(project, dataset_id) - retry_403(Config.CLIENT.create_dataset)(Dataset(dataset_ref)) + helpers.retry_403(Config.CLIENT.create_dataset)(Dataset(dataset_ref)) self.assertTrue(_dataset_exists(dataset_ref)) Config.CLIENT.delete_dataset(dataset_id) self.assertFalse(_dataset_exists(dataset_ref)) @@ -439,11 +416,11 @@ def test_delete_dataset_delete_contents_true(self): dataset_id = _make_dataset_id("delete_table_true_with_content") project = Config.CLIENT.project dataset_ref = bigquery.DatasetReference(project, dataset_id) - dataset = retry_403(Config.CLIENT.create_dataset)(Dataset(dataset_ref)) + dataset = helpers.retry_403(Config.CLIENT.create_dataset)(Dataset(dataset_ref)) table_id = "test_table" table_arg = Table(dataset.table(table_id), schema=SCHEMA) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) Config.CLIENT.delete_dataset(dataset, delete_contents=True) self.assertFalse(_table_exists(table)) @@ -455,7 +432,7 @@ def test_delete_dataset_delete_contents_false(self): table_id = "test_table" table_arg = Table(dataset.table(table_id), schema=SCHEMA) - retry_403(Config.CLIENT.create_table)(table_arg) + helpers.retry_403(Config.CLIENT.create_table)(table_arg) with self.assertRaises(exceptions.BadRequest): Config.CLIENT.delete_dataset(dataset) @@ -504,7 +481,7 @@ def test_list_tables(self): ] for table_name in tables_to_create: table = Table(dataset.table(table_name), schema=SCHEMA) - created_table = retry_403(Config.CLIENT.create_table)(table) + created_table = helpers.retry_403(Config.CLIENT.create_table)(table) self.to_delete.insert(0, created_table) # Retrieve the tables. 
@@ -534,7 +511,7 @@ def test_update_table(self): TABLE_NAME = "test_table" table_arg = Table(dataset.table(TABLE_NAME), schema=SCHEMA) self.assertFalse(_table_exists(table_arg)) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) self.assertIsNone(table.friendly_name) @@ -574,7 +551,7 @@ def test_update_table_schema(self): TABLE_NAME = "test_table" table_arg = Table(dataset.table(TABLE_NAME), schema=SCHEMA) self.assertFalse(_table_exists(table_arg)) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) voter = bigquery.SchemaField("voter", "BOOLEAN", mode="NULLABLE") @@ -674,7 +651,7 @@ def test_insert_rows_then_dump_table(self): ] table_arg = Table(dataset.table(TABLE_ID), schema=schema) self.assertFalse(_table_exists(table_arg)) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) @@ -732,413 +709,6 @@ def test_load_table_from_local_avro_file_then_dump_table(self): sorted(row_tuples, key=by_wavelength), sorted(ROWS, key=by_wavelength) ) - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_automatic_schema(self): - """Test that a DataFrame with dtypes that map well to BigQuery types - can be uploaded without specifying a schema. - - https://github.com/googleapis/google-cloud-python/issues/9044 - """ - df_data = collections.OrderedDict( - [ - ("bool_col", pandas.Series([True, False, True], dtype="bool")), - ( - "ts_col", - pandas.Series( - [ - datetime.datetime(2010, 1, 2, 3, 44, 50), - datetime.datetime(2011, 2, 3, 14, 50, 59), - datetime.datetime(2012, 3, 14, 15, 16), - ], - dtype="datetime64[ns]", - ).dt.tz_localize(pytz.utc), - ), - ( - "dt_col", - pandas.Series( - [ - datetime.datetime(2010, 1, 2, 3, 44, 50), - datetime.datetime(2011, 2, 3, 14, 50, 59), - datetime.datetime(2012, 3, 14, 15, 16), - ], - dtype="datetime64[ns]", - ), - ), - ("float32_col", pandas.Series([1.0, 2.0, 3.0], dtype="float32")), - ("float64_col", pandas.Series([4.0, 5.0, 6.0], dtype="float64")), - ("int8_col", pandas.Series([-12, -11, -10], dtype="int8")), - ("int16_col", pandas.Series([-9, -8, -7], dtype="int16")), - ("int32_col", pandas.Series([-6, -5, -4], dtype="int32")), - ("int64_col", pandas.Series([-3, -2, -1], dtype="int64")), - ("uint8_col", pandas.Series([0, 1, 2], dtype="uint8")), - ("uint16_col", pandas.Series([3, 4, 5], dtype="uint16")), - ("uint32_col", pandas.Series([6, 7, 8], dtype="uint32")), - ] - ) - dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_automatic_schema".format( - Config.CLIENT.project, dataset_id - ) - - load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id) - load_job.result() - - table = Config.CLIENT.get_table(table_id) - self.assertEqual( - tuple(table.schema), - ( - bigquery.SchemaField("bool_col", "BOOLEAN"), - bigquery.SchemaField("ts_col", "TIMESTAMP"), - # BigQuery does not support uploading DATETIME values from - # Parquet files. 
See: - # https://github.com/googleapis/google-cloud-python/issues/9996 - bigquery.SchemaField("dt_col", "TIMESTAMP"), - bigquery.SchemaField("float32_col", "FLOAT"), - bigquery.SchemaField("float64_col", "FLOAT"), - bigquery.SchemaField("int8_col", "INTEGER"), - bigquery.SchemaField("int16_col", "INTEGER"), - bigquery.SchemaField("int32_col", "INTEGER"), - bigquery.SchemaField("int64_col", "INTEGER"), - bigquery.SchemaField("uint8_col", "INTEGER"), - bigquery.SchemaField("uint16_col", "INTEGER"), - bigquery.SchemaField("uint32_col", "INTEGER"), - ), - ) - self.assertEqual(table.num_rows, 3) - - @unittest.skipIf( - pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION, - "Only `pandas version >=1.0.0` is supported", - ) - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_nullable_int64_datatype(self): - """Test that a DataFrame containing column with None-type values and int64 datatype - can be uploaded if a BigQuery schema is specified. - - https://github.com/googleapis/python-bigquery/issues/22 - """ - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format( - Config.CLIENT.project, dataset_id - ) - table_schema = (bigquery.SchemaField("x", "INTEGER", mode="NULLABLE"),) - table = retry_403(Config.CLIENT.create_table)( - Table(table_id, schema=table_schema) - ) - self.to_delete.insert(0, table) - - df_data = collections.OrderedDict( - [("x", pandas.Series([1, 2, None, 4], dtype="Int64"))] - ) - dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) - load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id) - load_job.result() - table = Config.CLIENT.get_table(table_id) - self.assertEqual(tuple(table.schema), (bigquery.SchemaField("x", "INTEGER"),)) - self.assertEqual(table.num_rows, 4) - - @unittest.skipIf( - pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION, - "Only `pandas version >=1.0.0` is supported", - ) - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(self): - """Test that a DataFrame containing column with None-type values and int64 datatype - can be uploaded without specifying a schema. - - https://github.com/googleapis/python-bigquery/issues/22 - """ - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format( - Config.CLIENT.project, dataset_id - ) - df_data = collections.OrderedDict( - [("x", pandas.Series([1, 2, None, 4], dtype="Int64"))] - ) - dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) - load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id) - load_job.result() - table = Config.CLIENT.get_table(table_id) - self.assertEqual(tuple(table.schema), (bigquery.SchemaField("x", "INTEGER"),)) - self.assertEqual(table.num_rows, 4) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_nulls(self): - """Test that a DataFrame with null columns can be uploaded if a - BigQuery schema is specified. - - See: https://github.com/googleapis/google-cloud-python/issues/7370 - """ - # Schema with all scalar types. 
- scalars_schema = ( - bigquery.SchemaField("bool_col", "BOOLEAN"), - bigquery.SchemaField("bytes_col", "BYTES"), - bigquery.SchemaField("date_col", "DATE"), - bigquery.SchemaField("dt_col", "DATETIME"), - bigquery.SchemaField("float_col", "FLOAT"), - bigquery.SchemaField("geo_col", "GEOGRAPHY"), - bigquery.SchemaField("int_col", "INTEGER"), - bigquery.SchemaField("num_col", "NUMERIC"), - bigquery.SchemaField("str_col", "STRING"), - bigquery.SchemaField("time_col", "TIME"), - bigquery.SchemaField("ts_col", "TIMESTAMP"), - ) - if _BIGNUMERIC_SUPPORT: - scalars_schema += (bigquery.SchemaField("bignum_col", "BIGNUMERIC"),) - - table_schema = scalars_schema + ( - # TODO: Array columns can't be read due to NULLABLE versus REPEATED - # mode mismatch. See: - # https://issuetracker.google.com/133415569#comment3 - # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), - # TODO: Support writing StructArrays to Parquet. See: - # https://jira.apache.org/jira/browse/ARROW-2587 - # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), - ) - num_rows = 100 - nulls = [None] * num_rows - df_data = [ - ("bool_col", nulls), - ("bytes_col", nulls), - ("date_col", nulls), - ("dt_col", nulls), - ("float_col", nulls), - ("geo_col", nulls), - ("int_col", nulls), - ("num_col", nulls), - ("str_col", nulls), - ("time_col", nulls), - ("ts_col", nulls), - ] - if _BIGNUMERIC_SUPPORT: - df_data.append(("bignum_col", nulls)) - df_data = collections.OrderedDict(df_data) - dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_nulls".format( - Config.CLIENT.project, dataset_id - ) - - # Create the table before loading so that schema mismatch errors are - # identified. - table = retry_403(Config.CLIENT.create_table)( - Table(table_id, schema=table_schema) - ) - self.to_delete.insert(0, table) - - job_config = bigquery.LoadJobConfig(schema=table_schema) - load_job = Config.CLIENT.load_table_from_dataframe( - dataframe, table_id, job_config=job_config - ) - load_job.result() - - table = Config.CLIENT.get_table(table) - self.assertEqual(tuple(table.schema), table_schema) - self.assertEqual(table.num_rows, num_rows) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_required(self): - """Test that a DataFrame with required columns can be uploaded if a - BigQuery schema is specified. - - See: https://github.com/googleapis/google-cloud-python/issues/8093 - """ - table_schema = ( - bigquery.SchemaField("name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"), - ) - - records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}] - dataframe = pandas.DataFrame(records, columns=["name", "age"]) - job_config = bigquery.LoadJobConfig(schema=table_schema) - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_required".format( - Config.CLIENT.project, dataset_id - ) - - # Create the table before loading so that schema mismatch errors are - # identified. 
- table = retry_403(Config.CLIENT.create_table)( - Table(table_id, schema=table_schema) - ) - self.to_delete.insert(0, table) - - job_config = bigquery.LoadJobConfig(schema=table_schema) - load_job = Config.CLIENT.load_table_from_dataframe( - dataframe, table_id, job_config=job_config - ) - load_job.result() - - table = Config.CLIENT.get_table(table) - self.assertEqual(tuple(table.schema), table_schema) - self.assertEqual(table.num_rows, 2) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_explicit_schema(self): - # Schema with all scalar types. - # TODO: Uploading DATETIME columns currently fails, thus that field type - # is temporarily removed from the test. - # See: - # https://github.com/googleapis/python-bigquery/issues/61 - # https://issuetracker.google.com/issues/151765076 - scalars_schema = ( - bigquery.SchemaField("bool_col", "BOOLEAN"), - bigquery.SchemaField("bytes_col", "BYTES"), - bigquery.SchemaField("date_col", "DATE"), - # bigquery.SchemaField("dt_col", "DATETIME"), - bigquery.SchemaField("float_col", "FLOAT"), - bigquery.SchemaField("geo_col", "GEOGRAPHY"), - bigquery.SchemaField("int_col", "INTEGER"), - bigquery.SchemaField("num_col", "NUMERIC"), - bigquery.SchemaField("str_col", "STRING"), - bigquery.SchemaField("time_col", "TIME"), - bigquery.SchemaField("ts_col", "TIMESTAMP"), - ) - if _BIGNUMERIC_SUPPORT: - scalars_schema += (bigquery.SchemaField("bignum_col", "BIGNUMERIC"),) - - table_schema = scalars_schema + ( - # TODO: Array columns can't be read due to NULLABLE versus REPEATED - # mode mismatch. See: - # https://issuetracker.google.com/133415569#comment3 - # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), - # TODO: Support writing StructArrays to Parquet. 
See: - # https://jira.apache.org/jira/browse/ARROW-2587 - # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), - ) - - df_data = [ - ("bool_col", [True, None, False]), - ("bytes_col", [b"abc", None, b"def"]), - ("date_col", [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)]), - # ( - # "dt_col", - # [ - # datetime.datetime(1, 1, 1, 0, 0, 0), - # None, - # datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), - # ], - # ), - ("float_col", [float("-inf"), float("nan"), float("inf")]), - ( - "geo_col", - [ - "POINT(30 10)", - None, - "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", - ], - ), - ("int_col", [-9223372036854775808, None, 9223372036854775807]), - ( - "num_col", - [ - decimal.Decimal("-99999999999999999999999999999.999999999"), - None, - decimal.Decimal("99999999999999999999999999999.999999999"), - ], - ), - ("str_col", [u"abc", None, u"def"]), - ( - "time_col", - [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)], - ), - ( - "ts_col", - [ - datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), - None, - datetime.datetime( - 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc - ), - ], - ), - ] - if _BIGNUMERIC_SUPPORT: - df_data.append( - ( - "bignum_col", - [ - decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)), - None, - decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)), - ], - ) - ) - df_data = collections.OrderedDict(df_data) - dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema".format( - Config.CLIENT.project, dataset_id - ) - - job_config = bigquery.LoadJobConfig(schema=table_schema) - load_job = Config.CLIENT.load_table_from_dataframe( - dataframe, table_id, job_config=job_config - ) - load_job.result() - - table = Config.CLIENT.get_table(table_id) - self.assertEqual(tuple(table.schema), table_schema) - self.assertEqual(table.num_rows, 3) - - @unittest.skipIf( - pyarrow is None or PYARROW_INSTALLED_VERSION < PYARROW_MINIMUM_VERSION, - "Only `pyarrow version >=0.17.0` is supported", - ) - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_load_table_from_dataframe_w_struct_datatype(self): - """Test that a DataFrame with struct datatype can be uploaded if a - BigQuery schema is specified. 
- - https://github.com/googleapis/python-bigquery/issues/21 - """ - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_struct_datatype".format( - Config.CLIENT.project, dataset_id - ) - table_schema = [ - bigquery.SchemaField( - "bar", - "RECORD", - fields=[ - bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"), - ], - mode="REQUIRED", - ), - ] - table = retry_403(Config.CLIENT.create_table)( - Table(table_id, schema=table_schema) - ) - self.to_delete.insert(0, table) - - df_data = [{"id": 1, "age": 21}, {"id": 2, "age": 22}, {"id": 2, "age": 23}] - dataframe = pandas.DataFrame(data={"bar": df_data}, columns=["bar"]) - - load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id) - load_job.result() - - table = Config.CLIENT.get_table(table_id) - self.assertEqual(table.schema, table_schema) - self.assertEqual(table.num_rows, 3) - def test_load_table_from_json_basic_use(self): table_schema = ( bigquery.SchemaField("name", "STRING", mode="REQUIRED"), @@ -1160,7 +730,7 @@ def test_load_table_from_json_basic_use(self): # Create the table before loading so that schema mismatch errors are # identified. - table = retry_403(Config.CLIENT.create_table)( + table = helpers.retry_403(Config.CLIENT.create_table)( Table(table_id, schema=table_schema) ) self.to_delete.insert(0, table) @@ -1175,149 +745,6 @@ def test_load_table_from_json_basic_use(self): self.assertEqual(tuple(table.schema), table_schema) self.assertEqual(table.num_rows, 2) - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self): - from google.cloud.bigquery.job import SourceFormat - - table_schema = ( - bigquery.SchemaField("bool_col", "BOOLEAN"), - bigquery.SchemaField("bytes_col", "BYTES"), - bigquery.SchemaField("date_col", "DATE"), - bigquery.SchemaField("dt_col", "DATETIME"), - bigquery.SchemaField("float_col", "FLOAT"), - bigquery.SchemaField("geo_col", "GEOGRAPHY"), - bigquery.SchemaField("int_col", "INTEGER"), - bigquery.SchemaField("num_col", "NUMERIC"), - bigquery.SchemaField("bignum_col", "BIGNUMERIC"), - bigquery.SchemaField("str_col", "STRING"), - bigquery.SchemaField("time_col", "TIME"), - bigquery.SchemaField("ts_col", "TIMESTAMP"), - ) - df_data = collections.OrderedDict( - [ - ("bool_col", [True, None, False]), - ("bytes_col", ["abc", None, "def"]), - ( - "date_col", - [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)], - ), - ( - "dt_col", - [ - datetime.datetime(1, 1, 1, 0, 0, 0), - None, - datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), - ], - ), - ("float_col", [float("-inf"), float("nan"), float("inf")]), - ( - "geo_col", - [ - "POINT(30 10)", - None, - "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", - ], - ), - ("int_col", [-9223372036854775808, None, 9223372036854775807]), - ( - "num_col", - [ - decimal.Decimal("-99999999999999999999999999999.999999999"), - None, - decimal.Decimal("99999999999999999999999999999.999999999"), - ], - ), - ( - "bignum_col", - [ - decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)), - None, - decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)), - ], - ), - ("str_col", [u"abc", None, u"def"]), - ( - "time_col", - [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)], - ), - ( - "ts_col", - [ - datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), - None, - datetime.datetime( - 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc - 
), - ], - ), - ] - ) - dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format( - Config.CLIENT.project, dataset_id - ) - - job_config = bigquery.LoadJobConfig( - schema=table_schema, source_format=SourceFormat.CSV - ) - load_job = Config.CLIENT.load_table_from_dataframe( - dataframe, table_id, job_config=job_config - ) - load_job.result() - - table = Config.CLIENT.get_table(table_id) - self.assertEqual(tuple(table.schema), table_schema) - self.assertEqual(table.num_rows, 3) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_load_table_from_dataframe_w_explicit_schema_source_format_csv_floats(self): - from google.cloud.bigquery.job import SourceFormat - - table_schema = (bigquery.SchemaField("float_col", "FLOAT"),) - df_data = collections.OrderedDict( - [ - ( - "float_col", - [ - 0.14285714285714285, - 0.51428571485748, - 0.87128748, - 1.807960649, - 2.0679610649, - 2.4406779661016949, - 3.7148514257, - 3.8571428571428572, - 1.51251252e40, - ], - ), - ] - ) - dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) - - dataset_id = _make_dataset_id("bq_load_test") - self.temp_dataset(dataset_id) - table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format( - Config.CLIENT.project, dataset_id - ) - - job_config = bigquery.LoadJobConfig( - schema=table_schema, source_format=SourceFormat.CSV - ) - load_job = Config.CLIENT.load_table_from_dataframe( - dataframe, table_id, job_config=job_config - ) - load_job.result() - - table = Config.CLIENT.get_table(table_id) - rows = self._fetch_single_page(table) - floats = [r.values()[0] for r in rows] - self.assertEqual(tuple(table.schema), table_schema) - self.assertEqual(table.num_rows, 9) - self.assertEqual(floats, df_data["float_col"]) - def test_load_table_from_json_schema_autodetect(self): json_rows = [ {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False}, @@ -1339,7 +766,7 @@ def test_load_table_from_json_schema_autodetect(self): bigquery.SchemaField("is_awesome", "BOOLEAN", mode="NULLABLE"), ) # create the table before loading so that the column order is predictable - table = retry_403(Config.CLIENT.create_table)( + table = helpers.retry_403(Config.CLIENT.create_table)( Table(table_id, schema=table_schema) ) self.to_delete.insert(0, table) @@ -1374,7 +801,7 @@ def test_load_avro_from_uri_then_dump_table(self): dataset = self.temp_dataset(_make_dataset_id("bq_load_test")) table_arg = dataset.table(table_name) - table = retry_403(Config.CLIENT.create_table)(Table(table_arg)) + table = helpers.retry_403(Config.CLIENT.create_table)(Table(table_arg)) self.to_delete.insert(0, table) config = bigquery.LoadJobConfig() @@ -1405,7 +832,7 @@ def test_load_table_from_uri_then_dump_table(self): dataset = self.temp_dataset(_make_dataset_id("load_gcs_then_dump")) table_arg = Table(dataset.table(TABLE_ID), schema=SCHEMA) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) config = bigquery.LoadJobConfig() @@ -1623,7 +1050,7 @@ def test_get_set_iam_policy(self): table_ref = Table(dataset.table(table_id)) self.assertFalse(_table_exists(table_ref)) - table = retry_403(Config.CLIENT.create_table)(table_ref) + table = helpers.retry_403(Config.CLIENT.create_table)(table_ref) self.to_delete.insert(0, table) 
self.assertTrue(_table_exists(table)) @@ -1648,7 +1075,7 @@ def test_test_iam_permissions(self): table_ref = Table(dataset.table(table_id)) self.assertFalse(_table_exists(table_ref)) - table = retry_403(Config.CLIENT.create_table)(table_ref) + table = helpers.retry_403(Config.CLIENT.create_table)(table_ref) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) @@ -1672,7 +1099,7 @@ def test_job_cancel(self): dataset = self.temp_dataset(DATASET_ID) table_arg = Table(dataset.table(TABLE_NAME), schema=SCHEMA) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) job = Config.CLIENT.query(QUERY, job_id_prefix=JOB_ID_PREFIX) @@ -1743,75 +1170,12 @@ def test_query_w_legacy_sql_types(self): self.assertEqual(len(rows[0]), 1) self.assertEqual(rows[0][0], example["expected"]) - def _generate_standard_sql_types_examples(self): - naive = datetime.datetime(2016, 12, 5, 12, 41, 9) - naive_microseconds = datetime.datetime(2016, 12, 5, 12, 41, 9, 250000) - stamp = "%s %s" % (naive.date().isoformat(), naive.time().isoformat()) - stamp_microseconds = stamp + ".250000" - zoned = naive.replace(tzinfo=UTC) - zoned_microseconds = naive_microseconds.replace(tzinfo=UTC) - numeric = decimal.Decimal("123456789.123456789") - return [ - {"sql": "SELECT 1", "expected": 1}, - {"sql": "SELECT 1.3", "expected": 1.3}, - {"sql": "SELECT TRUE", "expected": True}, - {"sql": 'SELECT "ABC"', "expected": "ABC"}, - {"sql": 'SELECT CAST("foo" AS BYTES)', "expected": b"foo"}, - {"sql": 'SELECT TIMESTAMP "%s"' % (stamp,), "expected": zoned}, - { - "sql": 'SELECT TIMESTAMP "%s"' % (stamp_microseconds,), - "expected": zoned_microseconds, - }, - {"sql": 'SELECT DATETIME(TIMESTAMP "%s")' % (stamp,), "expected": naive}, - { - "sql": 'SELECT DATETIME(TIMESTAMP "%s")' % (stamp_microseconds,), - "expected": naive_microseconds, - }, - {"sql": 'SELECT DATE(TIMESTAMP "%s")' % (stamp,), "expected": naive.date()}, - {"sql": 'SELECT TIME(TIMESTAMP "%s")' % (stamp,), "expected": naive.time()}, - {"sql": 'SELECT NUMERIC "%s"' % (numeric,), "expected": numeric}, - {"sql": "SELECT (1, 2)", "expected": {"_field_1": 1, "_field_2": 2}}, - { - "sql": "SELECT ((1, 2), (3, 4), 5)", - "expected": { - "_field_1": {"_field_1": 1, "_field_2": 2}, - "_field_2": {"_field_1": 3, "_field_2": 4}, - "_field_3": 5, - }, - }, - {"sql": "SELECT [1, 2, 3]", "expected": [1, 2, 3]}, - { - "sql": "SELECT ([1, 2], 3, [4, 5])", - "expected": {"_field_1": [1, 2], "_field_2": 3, "_field_3": [4, 5]}, - }, - { - "sql": "SELECT [(1, 2, 3), (4, 5, 6)]", - "expected": [ - {"_field_1": 1, "_field_2": 2, "_field_3": 3}, - {"_field_1": 4, "_field_2": 5, "_field_3": 6}, - ], - }, - { - "sql": "SELECT [([1, 2, 3], 4), ([5, 6], 7)]", - "expected": [ - {u"_field_1": [1, 2, 3], u"_field_2": 4}, - {u"_field_1": [5, 6], u"_field_2": 7}, - ], - }, - { - "sql": "SELECT ARRAY(SELECT STRUCT([1, 2]))", - "expected": [{u"_field_1": [1, 2]}], - }, - {"sql": "SELECT ST_GeogPoint(1, 2)", "expected": "POINT(1 2)"}, - ] - def test_query_w_standard_sql_types(self): - examples = self._generate_standard_sql_types_examples() - for example in examples: - rows = list(Config.CLIENT.query(example["sql"])) + for sql, expected in helpers.STANDARD_SQL_EXAMPLES: + rows = list(Config.CLIENT.query(sql)) self.assertEqual(len(rows), 1) self.assertEqual(len(rows[0]), 1) - self.assertEqual(rows[0][0], example["expected"]) + self.assertEqual(rows[0][0], expected) def test_query_w_failed_query(self): 
from google.api_core.exceptions import BadRequest @@ -1950,13 +1314,12 @@ def test_query_statistics(self): self.assertGreater(len(plan), stages_with_inputs) def test_dbapi_w_standard_sql_types(self): - examples = self._generate_standard_sql_types_examples() - for example in examples: - Config.CURSOR.execute(example["sql"]) + for sql, expected in helpers.STANDARD_SQL_EXAMPLES: + Config.CURSOR.execute(sql) self.assertEqual(Config.CURSOR.rowcount, 1) row = Config.CURSOR.fetchone() self.assertEqual(len(row), 1) - self.assertEqual(row[0], example["expected"]) + self.assertEqual(row[0], expected) row = Config.CURSOR.fetchone() self.assertIsNone(row) @@ -2107,7 +1470,7 @@ def _load_table_for_dml(self, rows, dataset_id, table_id): greeting = bigquery.SchemaField("greeting", "STRING", mode="NULLABLE") table_ref = dataset.table(table_id) table_arg = Table(table_ref, schema=[greeting]) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) with _NamedTemporaryFile() as temp: @@ -2480,152 +1843,6 @@ def test_query_iter(self): row_tuples = [r.values() for r in query_job] self.assertEqual(row_tuples, [(1,)]) - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_query_results_to_dataframe(self): - QUERY = """ - SELECT id, author, time_ts, dead - FROM `bigquery-public-data.hacker_news.comments` - LIMIT 10 - """ - - df = Config.CLIENT.query(QUERY).result().to_dataframe() - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 10) # verify the number of rows - column_names = ["id", "author", "time_ts", "dead"] - self.assertEqual(list(df), column_names) # verify the column names - exp_datatypes = { - "id": int, - "author": str, - "time_ts": pandas.Timestamp, - "dead": bool, - } - for index, row in df.iterrows(): - for col in column_names: - # all the schema fields are nullable, so None is acceptable - if not row[col] is None: - self.assertIsInstance(row[col], exp_datatypes[col]) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf( - bigquery_storage is None, "Requires `google-cloud-bigquery-storage`" - ) - def test_query_results_to_dataframe_w_bqstorage(self): - query = """ - SELECT id, author, time_ts, dead - FROM `bigquery-public-data.hacker_news.comments` - LIMIT 10 - """ - - bqstorage_client = bigquery_storage.BigQueryReadClient( - credentials=Config.CLIENT._credentials - ) - - df = Config.CLIENT.query(query).result().to_dataframe(bqstorage_client) - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 10) # verify the number of rows - column_names = ["id", "author", "time_ts", "dead"] - self.assertEqual(list(df), column_names) - exp_datatypes = { - "id": int, - "author": str, - "time_ts": pandas.Timestamp, - "dead": bool, - } - for index, row in df.iterrows(): - for col in column_names: - # all the schema fields are nullable, so None is acceptable - if not row[col] is None: - self.assertIsInstance(row[col], exp_datatypes[col]) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_insert_rows_from_dataframe(self): - SF = bigquery.SchemaField - schema = [ - SF("float_col", "FLOAT", mode="REQUIRED"), - SF("int_col", "INTEGER", mode="REQUIRED"), - SF("bool_col", "BOOLEAN", mode="REQUIRED"), - SF("string_col", "STRING", mode="NULLABLE"), - ] - - dataframe = pandas.DataFrame( - [ - { - "float_col": 1.11, - "bool_col": True, - "string_col": "my string", - "int_col": 10, - }, - { - "float_col": 2.22, - "bool_col": 
False, - "string_col": "another string", - "int_col": 20, - }, - { - "float_col": 3.33, - "bool_col": False, - "string_col": "another string", - "int_col": 30, - }, - { - "float_col": 4.44, - "bool_col": True, - "string_col": "another string", - "int_col": 40, - }, - { - "float_col": 5.55, - "bool_col": False, - "string_col": "another string", - "int_col": 50, - }, - { - "float_col": 6.66, - "bool_col": True, - # Include a NaN value, because pandas often uses NaN as a - # NULL value indicator. - "string_col": float("NaN"), - "int_col": 60, - }, - ] - ) - - table_id = "test_table" - dataset = self.temp_dataset(_make_dataset_id("issue_7553")) - table_arg = Table(dataset.table(table_id), schema=schema) - table = retry_403(Config.CLIENT.create_table)(table_arg) - self.to_delete.insert(0, table) - - chunk_errors = Config.CLIENT.insert_rows_from_dataframe( - table, dataframe, chunk_size=3 - ) - for errors in chunk_errors: - assert not errors - - # Use query to fetch rows instead of listing directly from the table so - # that we get values from the streaming buffer. - rows = list( - Config.CLIENT.query( - "SELECT * FROM `{}.{}.{}`".format( - table.project, table.dataset_id, table.table_id - ) - ) - ) - - sorted_rows = sorted(rows, key=operator.attrgetter("int_col")) - row_tuples = [r.values() for r in sorted_rows] - expected = [ - tuple(None if col != col else col for col in data_row) - for data_row in dataframe.itertuples(index=False) - ] - - assert len(row_tuples) == len(expected) - - for row, expected_row in zip(row_tuples, expected): - self.assertCountEqual(row, expected_row) # column order does not matter - def test_insert_rows_nested_nested(self): # See #2951 SF = bigquery.SchemaField @@ -2656,7 +1873,7 @@ def test_insert_rows_nested_nested(self): table_id = "test_table" dataset = self.temp_dataset(_make_dataset_id("issue_2951")) table_arg = Table(dataset.table(table_id), schema=schema) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) Config.CLIENT.insert_rows(table, to_insert) @@ -2696,7 +1913,7 @@ def test_insert_rows_nested_nested_dictionary(self): table_id = "test_table" dataset = self.temp_dataset(_make_dataset_id("issue_2951")) table_arg = Table(dataset.table(table_id), schema=schema) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) Config.CLIENT.insert_rows(table, to_insert) @@ -2740,8 +1957,8 @@ def test_create_routine(self): str(routine.reference) ) - routine = retry_403(Config.CLIENT.create_routine)(routine) - query_job = retry_403(Config.CLIENT.query)(query_string) + routine = helpers.retry_403(Config.CLIENT.create_routine)(routine) + query_job = helpers.retry_403(Config.CLIENT.query)(query_string) rows = list(query_job.result()) assert len(rows) == 1 @@ -2752,7 +1969,7 @@ def test_create_table_rows_fetch_nested_schema(self): dataset = self.temp_dataset(_make_dataset_id("create_table_nested_schema")) schema = _load_json_schema() table_arg = Table(dataset.table(table_name), schema=schema) - table = retry_403(Config.CLIENT.create_table)(table_arg) + table = helpers.retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) self.assertTrue(_table_exists(table)) self.assertEqual(table.table_id, table_name) @@ -2872,85 +2089,6 @@ def test_nested_table_to_arrow(self): self.assertTrue(pyarrow.types.is_list(record_col[1].type)) 
self.assertTrue(pyarrow.types.is_int64(record_col[1].type.value_type)) - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_nested_table_to_dataframe(self): - from google.cloud.bigquery.job import SourceFormat - from google.cloud.bigquery.job import WriteDisposition - - SF = bigquery.SchemaField - schema = [ - SF("string_col", "STRING", mode="NULLABLE"), - SF( - "record_col", - "RECORD", - mode="NULLABLE", - fields=[ - SF("nested_string", "STRING", mode="NULLABLE"), - SF("nested_repeated", "INTEGER", mode="REPEATED"), - SF( - "nested_record", - "RECORD", - mode="NULLABLE", - fields=[SF("nested_nested_string", "STRING", mode="NULLABLE")], - ), - ], - ), - SF("bigfloat_col", "FLOAT", mode="NULLABLE"), - SF("smallfloat_col", "FLOAT", mode="NULLABLE"), - ] - record = { - "nested_string": "another string value", - "nested_repeated": [0, 1, 2], - "nested_record": {"nested_nested_string": "some deep insight"}, - } - to_insert = [ - { - "string_col": "Some value", - "record_col": record, - "bigfloat_col": 3.14, - "smallfloat_col": 2.72, - } - ] - rows = [json.dumps(row) for row in to_insert] - body = io.BytesIO("{}\n".format("\n".join(rows)).encode("ascii")) - table_id = "test_table" - dataset = self.temp_dataset(_make_dataset_id("nested_df")) - table = dataset.table(table_id) - self.to_delete.insert(0, table) - job_config = bigquery.LoadJobConfig() - job_config.write_disposition = WriteDisposition.WRITE_TRUNCATE - job_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON - job_config.schema = schema - # Load a table using a local JSON file from memory. - Config.CLIENT.load_table_from_file(body, table, job_config=job_config).result() - - df = Config.CLIENT.list_rows(table, selected_fields=schema).to_dataframe( - dtypes={"smallfloat_col": "float16"} - ) - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 1) # verify the number of rows - exp_columns = ["string_col", "record_col", "bigfloat_col", "smallfloat_col"] - self.assertEqual(list(df), exp_columns) # verify the column names - row = df.iloc[0] - # verify the row content - self.assertEqual(row["string_col"], "Some value") - expected_keys = tuple(sorted(record.keys())) - row_keys = tuple(sorted(row["record_col"].keys())) - self.assertEqual(row_keys, expected_keys) - # Can't compare numpy arrays, which pyarrow encodes the embedded - # repeated column to, so convert to list. 
- self.assertEqual(list(row["record_col"]["nested_repeated"]), [0, 1, 2]) - # verify that nested data can be accessed with indices/keys - self.assertEqual(row["record_col"]["nested_repeated"][0], 0) - self.assertEqual( - row["record_col"]["nested_record"]["nested_nested_string"], - "some deep insight", - ) - # verify dtypes - self.assertEqual(df.dtypes["bigfloat_col"].name, "float64") - self.assertEqual(df.dtypes["smallfloat_col"].name, "float16") - def test_list_rows_empty_table(self): from google.cloud.bigquery.table import RowIterator @@ -2999,34 +2137,13 @@ def test_list_rows_page_size(self): page = next(pages) self.assertEqual(page.num_items, num_last_page) - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf( - bigquery_storage is None, "Requires `google-cloud-bigquery-storage`" - ) - def test_list_rows_max_results_w_bqstorage(self): - table_ref = DatasetReference("bigquery-public-data", "utility_us").table( - "country_code_iso" - ) - bqstorage_client = bigquery_storage.BigQueryReadClient( - credentials=Config.CLIENT._credentials - ) - - row_iterator = Config.CLIENT.list_rows( - table_ref, - selected_fields=[bigquery.SchemaField("country_name", "STRING")], - max_results=100, - ) - dataframe = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) - - self.assertEqual(len(dataframe.index), 100) - def temp_dataset(self, dataset_id, location=None): project = Config.CLIENT.project dataset_ref = bigquery.DatasetReference(project, dataset_id) dataset = Dataset(dataset_ref) if location: dataset.location = location - dataset = retry_403(Config.CLIENT.create_dataset)(dataset) + dataset = helpers.retry_403(Config.CLIENT.create_dataset)(dataset) self.to_delete.append(dataset) return dataset diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py new file mode 100644 index 000000000..1164e36da --- /dev/null +++ b/tests/system/test_pandas.py @@ -0,0 +1,801 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""System tests for pandas connector.""" + +import collections +import datetime +import decimal +import json +import io +import operator + +import pkg_resources +import pytest +import pytz + +from google.cloud import bigquery +from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT +from . import helpers + + +bigquery_storage = pytest.importorskip( + "google.cloud.bigquery_storage", minversion="2.0.0" +) +pandas = pytest.importorskip("pandas", minversion="0.23.0") +pyarrow = pytest.importorskip("pyarrow", minversion="1.0.0") + + +PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version +PANDAS_INT64_VERSION = pkg_resources.parse_version("1.0.0") + + +def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_id): + """Test that a DataFrame with dtypes that map well to BigQuery types + can be uploaded without specifying a schema. 
+ + https://github.com/googleapis/google-cloud-python/issues/9044 + """ + df_data = collections.OrderedDict( + [ + ("bool_col", pandas.Series([True, False, True], dtype="bool")), + ( + "ts_col", + pandas.Series( + [ + datetime.datetime(2010, 1, 2, 3, 44, 50), + datetime.datetime(2011, 2, 3, 14, 50, 59), + datetime.datetime(2012, 3, 14, 15, 16), + ], + dtype="datetime64[ns]", + ).dt.tz_localize(pytz.utc), + ), + ( + "dt_col", + pandas.Series( + [ + datetime.datetime(2010, 1, 2, 3, 44, 50), + datetime.datetime(2011, 2, 3, 14, 50, 59), + datetime.datetime(2012, 3, 14, 15, 16), + ], + dtype="datetime64[ns]", + ), + ), + ("float32_col", pandas.Series([1.0, 2.0, 3.0], dtype="float32")), + ("float64_col", pandas.Series([4.0, 5.0, 6.0], dtype="float64")), + ("int8_col", pandas.Series([-12, -11, -10], dtype="int8")), + ("int16_col", pandas.Series([-9, -8, -7], dtype="int16")), + ("int32_col", pandas.Series([-6, -5, -4], dtype="int32")), + ("int64_col", pandas.Series([-3, -2, -1], dtype="int64")), + ("uint8_col", pandas.Series([0, 1, 2], dtype="uint8")), + ("uint16_col", pandas.Series([3, 4, 5], dtype="uint16")), + ("uint32_col", pandas.Series([6, 7, 8], dtype="uint32")), + ] + ) + dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) + + table_id = "{}.{}.load_table_from_dataframe_w_automatic_schema".format( + bigquery_client.project, dataset_id + ) + + load_job = bigquery_client.load_table_from_dataframe(dataframe, table_id) + load_job.result() + + table = bigquery_client.get_table(table_id) + assert tuple(table.schema) == ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + # BigQuery does not support uploading DATETIME values from + # Parquet files. See: + # https://github.com/googleapis/google-cloud-python/issues/9996 + bigquery.SchemaField("dt_col", "TIMESTAMP"), + bigquery.SchemaField("float32_col", "FLOAT"), + bigquery.SchemaField("float64_col", "FLOAT"), + bigquery.SchemaField("int8_col", "INTEGER"), + bigquery.SchemaField("int16_col", "INTEGER"), + bigquery.SchemaField("int32_col", "INTEGER"), + bigquery.SchemaField("int64_col", "INTEGER"), + bigquery.SchemaField("uint8_col", "INTEGER"), + bigquery.SchemaField("uint16_col", "INTEGER"), + bigquery.SchemaField("uint32_col", "INTEGER"), + ) + assert table.num_rows == 3 + + +@pytest.mark.skipif( + PANDAS_INSTALLED_VERSION < PANDAS_INT64_VERSION, + reason="Only `pandas version >=1.0.0` is supported", +) +def test_load_table_from_dataframe_w_nullable_int64_datatype( + bigquery_client, dataset_id +): + """Test that a DataFrame containing column with None-type values and int64 datatype + can be uploaded if a BigQuery schema is specified. 
+ + https://github.com/googleapis/python-bigquery/issues/22 + """ + table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format( + bigquery_client.project, dataset_id + ) + table_schema = (bigquery.SchemaField("x", "INTEGER", mode="NULLABLE"),) + table = helpers.retry_403(bigquery_client.create_table)( + bigquery.Table(table_id, schema=table_schema) + ) + + df_data = collections.OrderedDict( + [("x", pandas.Series([1, 2, None, 4], dtype="Int64"))] + ) + dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) + load_job = bigquery_client.load_table_from_dataframe(dataframe, table_id) + load_job.result() + table = bigquery_client.get_table(table_id) + assert tuple(table.schema) == (bigquery.SchemaField("x", "INTEGER"),) + assert table.num_rows == 4 + + +@pytest.mark.skipif( + PANDAS_INSTALLED_VERSION < PANDAS_INT64_VERSION, + reason="Only `pandas version >=1.0.0` is supported", +) +def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema( + bigquery_client, dataset_id +): + """Test that a DataFrame containing column with None-type values and int64 datatype + can be uploaded without specifying a schema. + + https://github.com/googleapis/python-bigquery/issues/22 + """ + + table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format( + bigquery_client.project, dataset_id + ) + df_data = collections.OrderedDict( + [("x", pandas.Series([1, 2, None, 4], dtype="Int64"))] + ) + dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) + load_job = bigquery_client.load_table_from_dataframe(dataframe, table_id) + load_job.result() + table = bigquery_client.get_table(table_id) + assert tuple(table.schema) == (bigquery.SchemaField("x", "INTEGER"),) + assert table.num_rows == 4 + + +def test_load_table_from_dataframe_w_nulls(bigquery_client, dataset_id): + """Test that a DataFrame with null columns can be uploaded if a + BigQuery schema is specified. + + See: https://github.com/googleapis/google-cloud-python/issues/7370 + """ + # Schema with all scalar types. + scalars_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + if _BIGNUMERIC_SUPPORT: + scalars_schema += (bigquery.SchemaField("bignum_col", "BIGNUMERIC"),) + + table_schema = scalars_schema + ( + # TODO: Array columns can't be read due to NULLABLE versus REPEATED + # mode mismatch. See: + # https://issuetracker.google.com/133415569#comment3 + # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), + # TODO: Support writing StructArrays to Parquet. 
See: + # https://jira.apache.org/jira/browse/ARROW-2587 + # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), + ) + num_rows = 100 + nulls = [None] * num_rows + df_data = [ + ("bool_col", nulls), + ("bytes_col", nulls), + ("date_col", nulls), + ("dt_col", nulls), + ("float_col", nulls), + ("geo_col", nulls), + ("int_col", nulls), + ("num_col", nulls), + ("str_col", nulls), + ("time_col", nulls), + ("ts_col", nulls), + ] + if _BIGNUMERIC_SUPPORT: + df_data.append(("bignum_col", nulls)) + df_data = collections.OrderedDict(df_data) + dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) + + table_id = "{}.{}.load_table_from_dataframe_w_nulls".format( + bigquery_client.project, dataset_id + ) + + # Create the table before loading so that schema mismatch errors are + # identified. + table = helpers.retry_403(bigquery_client.create_table)( + bigquery.Table(table_id, schema=table_schema) + ) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = bigquery_client.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = bigquery_client.get_table(table) + assert tuple(table.schema) == table_schema + assert table.num_rows == num_rows + + +def test_load_table_from_dataframe_w_required(bigquery_client, dataset_id): + """Test that a DataFrame with required columns can be uploaded if a + BigQuery schema is specified. + + See: https://github.com/googleapis/google-cloud-python/issues/8093 + """ + table_schema = ( + bigquery.SchemaField("name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"), + ) + + records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}] + dataframe = pandas.DataFrame(records, columns=["name", "age"]) + job_config = bigquery.LoadJobConfig(schema=table_schema) + table_id = "{}.{}.load_table_from_dataframe_w_required".format( + bigquery_client.project, dataset_id + ) + + # Create the table before loading so that schema mismatch errors are + # identified. + table = helpers.retry_403(bigquery_client.create_table)( + bigquery.Table(table_id, schema=table_schema) + ) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = bigquery_client.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = bigquery_client.get_table(table) + assert tuple(table.schema) == table_schema + assert table.num_rows == 2 + + +def test_load_table_from_dataframe_w_explicit_schema(bigquery_client, dataset_id): + # Schema with all scalar types. + # TODO: Uploading DATETIME columns currently fails, thus that field type + # is temporarily removed from the test. 
+ # See: + # https://github.com/googleapis/python-bigquery/issues/61 + # https://issuetracker.google.com/issues/151765076 + scalars_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + # bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + if _BIGNUMERIC_SUPPORT: + scalars_schema += (bigquery.SchemaField("bignum_col", "BIGNUMERIC"),) + + table_schema = scalars_schema + ( + # TODO: Array columns can't be read due to NULLABLE versus REPEATED + # mode mismatch. See: + # https://issuetracker.google.com/133415569#comment3 + # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"), + # TODO: Support writing StructArrays to Parquet. See: + # https://jira.apache.org/jira/browse/ARROW-2587 + # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema), + ) + + df_data = [ + ("bool_col", [True, None, False]), + ("bytes_col", [b"abc", None, b"def"]), + ("date_col", [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)]), + # ( + # "dt_col", + # [ + # datetime.datetime(1, 1, 1, 0, 0, 0), + # None, + # datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + # ], + # ), + ("float_col", [float("-inf"), float("nan"), float("inf")]), + ( + "geo_col", + ["POINT(30 10)", None, "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"], + ), + ("int_col", [-9223372036854775808, None, 9223372036854775807]), + ( + "num_col", + [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + ], + ), + ("str_col", ["abc", None, "def"]), + ( + "time_col", + [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)], + ), + ( + "ts_col", + [ + datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + None, + datetime.datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc), + ], + ), + ] + if _BIGNUMERIC_SUPPORT: + df_data.append( + ( + "bignum_col", + [ + decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)), + None, + decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)), + ], + ) + ) + df_data = collections.OrderedDict(df_data) + dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) + + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema".format( + bigquery_client.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = bigquery_client.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = bigquery_client.get_table(table_id) + assert tuple(table.schema) == table_schema + assert table.num_rows == 3 + + +def test_load_table_from_dataframe_w_struct_datatype(bigquery_client, dataset_id): + """Test that a DataFrame with struct datatype can be uploaded if a + BigQuery schema is specified. 
+ + https://github.com/googleapis/python-bigquery/issues/21 + """ + table_id = "{}.{}.load_table_from_dataframe_w_struct_datatype".format( + bigquery_client.project, dataset_id + ) + table_schema = [ + bigquery.SchemaField( + "bar", + "RECORD", + fields=[ + bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"), + ], + mode="REQUIRED", + ), + ] + table = helpers.retry_403(bigquery_client.create_table)( + bigquery.Table(table_id, schema=table_schema) + ) + + df_data = [{"id": 1, "age": 21}, {"id": 2, "age": 22}, {"id": 2, "age": 23}] + dataframe = pandas.DataFrame(data={"bar": df_data}, columns=["bar"]) + + load_job = bigquery_client.load_table_from_dataframe(dataframe, table_id) + load_job.result() + + table = bigquery_client.get_table(table_id) + assert table.schema == table_schema + assert table.num_rows == 3 + + +def test_load_table_from_dataframe_w_explicit_schema_source_format_csv( + bigquery_client, dataset_id +): + from google.cloud.bigquery.job import SourceFormat + + table_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("bignum_col", "BIGNUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + df_data = collections.OrderedDict( + [ + ("bool_col", [True, None, False]), + ("bytes_col", ["abc", None, "def"]), + ("date_col", [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],), + ( + "dt_col", + [ + datetime.datetime(1, 1, 1, 0, 0, 0), + None, + datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + ], + ), + ("float_col", [float("-inf"), float("nan"), float("inf")]), + ( + "geo_col", + [ + "POINT(30 10)", + None, + "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", + ], + ), + ("int_col", [-9223372036854775808, None, 9223372036854775807]), + ( + "num_col", + [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + ], + ), + ( + "bignum_col", + [ + decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)), + None, + decimal.Decimal("{d38}.{d38}".format(d38="9" * 38)), + ], + ), + ("str_col", ["abc", None, "def"]), + ( + "time_col", + [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)], + ), + ( + "ts_col", + [ + datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + None, + datetime.datetime( + 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc + ), + ], + ), + ] + ) + dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) + + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format( + bigquery_client.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig( + schema=table_schema, source_format=SourceFormat.CSV + ) + load_job = bigquery_client.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = bigquery_client.get_table(table_id) + assert tuple(table.schema) == table_schema + assert table.num_rows == 3 + + +def test_load_table_from_dataframe_w_explicit_schema_source_format_csv_floats( + bigquery_client, dataset_id +): + from google.cloud.bigquery.job import SourceFormat + + 
table_schema = (bigquery.SchemaField("float_col", "FLOAT"),) + df_data = collections.OrderedDict( + [ + ( + "float_col", + [ + 0.14285714285714285, + 0.51428571485748, + 0.87128748, + 1.807960649, + 2.0679610649, + 2.4406779661016949, + 3.7148514257, + 3.8571428571428572, + 1.51251252e40, + ], + ), + ] + ) + dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) + + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format( + bigquery_client.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig( + schema=table_schema, source_format=SourceFormat.CSV + ) + load_job = bigquery_client.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = bigquery_client.get_table(table_id) + rows = bigquery_client.list_rows(table_id) + floats = [r.values()[0] for r in rows] + assert tuple(table.schema) == table_schema + assert table.num_rows == 9 + assert floats == df_data["float_col"] + + +def test_query_results_to_dataframe(bigquery_client): + QUERY = """ + SELECT id, author, time_ts, dead + FROM `bigquery-public-data.hacker_news.comments` + LIMIT 10 + """ + + df = bigquery_client.query(QUERY).result().to_dataframe() + + assert isinstance(df, pandas.DataFrame) + assert len(df) == 10 # verify the number of rows + column_names = ["id", "author", "time_ts", "dead"] + assert list(df) == column_names # verify the column names + exp_datatypes = { + "id": int, + "author": str, + "time_ts": pandas.Timestamp, + "dead": bool, + } + for _, row in df.iterrows(): + for col in column_names: + # all the schema fields are nullable, so None is acceptable + if not row[col] is None: + assert isinstance(row[col], exp_datatypes[col]) + + +def test_query_results_to_dataframe_w_bqstorage(bigquery_client): + query = """ + SELECT id, author, time_ts, dead + FROM `bigquery-public-data.hacker_news.comments` + LIMIT 10 + """ + + bqstorage_client = bigquery_storage.BigQueryReadClient( + credentials=bigquery_client._credentials + ) + + df = bigquery_client.query(query).result().to_dataframe(bqstorage_client) + + assert isinstance(df, pandas.DataFrame) + assert len(df) == 10 # verify the number of rows + column_names = ["id", "author", "time_ts", "dead"] + assert list(df) == column_names + exp_datatypes = { + "id": int, + "author": str, + "time_ts": pandas.Timestamp, + "dead": bool, + } + for index, row in df.iterrows(): + for col in column_names: + # all the schema fields are nullable, so None is acceptable + if not row[col] is None: + assert isinstance(row[col], exp_datatypes[col]) + + +def test_insert_rows_from_dataframe(bigquery_client, dataset_id): + SF = bigquery.SchemaField + schema = [ + SF("float_col", "FLOAT", mode="REQUIRED"), + SF("int_col", "INTEGER", mode="REQUIRED"), + SF("bool_col", "BOOLEAN", mode="REQUIRED"), + SF("string_col", "STRING", mode="NULLABLE"), + ] + + dataframe = pandas.DataFrame( + [ + { + "float_col": 1.11, + "bool_col": True, + "string_col": "my string", + "int_col": 10, + }, + { + "float_col": 2.22, + "bool_col": False, + "string_col": "another string", + "int_col": 20, + }, + { + "float_col": 3.33, + "bool_col": False, + "string_col": "another string", + "int_col": 30, + }, + { + "float_col": 4.44, + "bool_col": True, + "string_col": "another string", + "int_col": 40, + }, + { + "float_col": 5.55, + "bool_col": False, + "string_col": "another string", + "int_col": 50, + }, + { + "float_col": 6.66, + "bool_col": True, + # Include a NaN value, because pandas often uses NaN as a + # NULL value indicator. 
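+                # NaN is the only value in this frame that compares unequal to
+                # itself, which the "None if col != col else col" normalization
+                # further down in this test relies on.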
+ "string_col": float("NaN"), + "int_col": 60, + }, + ] + ) + + table_id = f"{bigquery_client.project}.{dataset_id}.test_insert_rows_from_dataframe" + table_arg = bigquery.Table(table_id, schema=schema) + table = helpers.retry_403(bigquery_client.create_table)(table_arg) + + chunk_errors = bigquery_client.insert_rows_from_dataframe( + table, dataframe, chunk_size=3 + ) + for errors in chunk_errors: + assert not errors + + # Use query to fetch rows instead of listing directly from the table so + # that we get values from the streaming buffer. + rows = list( + bigquery_client.query( + "SELECT * FROM `{}.{}.{}`".format( + table.project, table.dataset_id, table.table_id + ) + ) + ) + + sorted_rows = sorted(rows, key=operator.attrgetter("int_col")) + row_tuples = [r.values() for r in sorted_rows] + expected = [ + # Pandas often represents NULL values as NaN. Convert to None for + # easier comparison. + tuple(None if col != col else col for col in data_row) + for data_row in dataframe.itertuples(index=False) + ] + + assert len(row_tuples) == len(expected) + + for row, expected_row in zip(row_tuples, expected): + assert ( + # Use Counter to verify the same number of values in each, because + # column order does not matter. + collections.Counter(row) + == collections.Counter(expected_row) + ) + + +def test_nested_table_to_dataframe(bigquery_client, dataset_id): + from google.cloud.bigquery.job import SourceFormat + from google.cloud.bigquery.job import WriteDisposition + + SF = bigquery.SchemaField + schema = [ + SF("string_col", "STRING", mode="NULLABLE"), + SF( + "record_col", + "RECORD", + mode="NULLABLE", + fields=[ + SF("nested_string", "STRING", mode="NULLABLE"), + SF("nested_repeated", "INTEGER", mode="REPEATED"), + SF( + "nested_record", + "RECORD", + mode="NULLABLE", + fields=[SF("nested_nested_string", "STRING", mode="NULLABLE")], + ), + ], + ), + SF("bigfloat_col", "FLOAT", mode="NULLABLE"), + SF("smallfloat_col", "FLOAT", mode="NULLABLE"), + ] + record = { + "nested_string": "another string value", + "nested_repeated": [0, 1, 2], + "nested_record": {"nested_nested_string": "some deep insight"}, + } + to_insert = [ + { + "string_col": "Some value", + "record_col": record, + "bigfloat_col": 3.14, + "smallfloat_col": 2.72, + } + ] + rows = [json.dumps(row) for row in to_insert] + body = io.BytesIO("{}\n".format("\n".join(rows)).encode("ascii")) + table_id = f"{bigquery_client.project}.{dataset_id}.test_nested_table_to_dataframe" + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = WriteDisposition.WRITE_TRUNCATE + job_config.source_format = SourceFormat.NEWLINE_DELIMITED_JSON + job_config.schema = schema + # Load a table using a local JSON file from memory. + bigquery_client.load_table_from_file(body, table_id, job_config=job_config).result() + + df = bigquery_client.list_rows(table_id, selected_fields=schema).to_dataframe( + dtypes={"smallfloat_col": "float16"} + ) + + assert isinstance(df, pandas.DataFrame) + assert len(df) == 1 # verify the number of rows + exp_columns = ["string_col", "record_col", "bigfloat_col", "smallfloat_col"] + assert list(df) == exp_columns # verify the column names + row = df.iloc[0] + # verify the row content + assert row["string_col"] == "Some value" + expected_keys = tuple(sorted(record.keys())) + row_keys = tuple(sorted(row["record_col"].keys())) + assert row_keys == expected_keys + # Can't compare numpy arrays, which pyarrow encodes the embedded + # repeated column to, so convert to list. 
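+    # (Asserting against the raw numpy array would produce an elementwise
+    # boolean array, whose truth value is ambiguous in an assert statement.)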
+ assert list(row["record_col"]["nested_repeated"]) == [0, 1, 2] + # verify that nested data can be accessed with indices/keys + assert row["record_col"]["nested_repeated"][0] == 0 + assert ( + row["record_col"]["nested_record"]["nested_nested_string"] + == "some deep insight" + ) + # verify dtypes + assert df.dtypes["bigfloat_col"].name == "float64" + assert df.dtypes["smallfloat_col"].name == "float16" + + +def test_list_rows_max_results_w_bqstorage(bigquery_client): + table_ref = bigquery.DatasetReference("bigquery-public-data", "utility_us").table( + "country_code_iso" + ) + bqstorage_client = bigquery_storage.BigQueryReadClient( + credentials=bigquery_client._credentials + ) + + row_iterator = bigquery_client.list_rows( + table_ref, + selected_fields=[bigquery.SchemaField("country_name", "STRING")], + max_results=100, + ) + with pytest.warns( + UserWarning, match="Cannot use bqstorage_client if max_results is set" + ): + dataframe = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + + assert len(dataframe.index) == 100
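+
+
+# A minimal illustrative sketch, not collected by pytest (leading underscore):
+# it combines the load_table_from_dataframe and list_rows().to_dataframe()
+# patterns exercised above into a single round trip. The table name suffix is
+# an assumption; the sample records mirror test_load_table_from_dataframe_w_required.
+def _round_trip_sketch(bigquery_client, dataset_id):
+    table_id = "{}.{}.round_trip_sketch".format(bigquery_client.project, dataset_id)
+    schema = (
+        bigquery.SchemaField("name", "STRING"),
+        bigquery.SchemaField("age", "INTEGER"),
+    )
+    dataframe = pandas.DataFrame(
+        [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}],
+        columns=["name", "age"],
+    )
+
+    # Load with an explicit schema so the column types are not inferred.
+    job_config = bigquery.LoadJobConfig(schema=schema)
+    bigquery_client.load_table_from_dataframe(
+        dataframe, table_id, job_config=job_config
+    ).result()
+
+    # Reading back via list_rows() avoids running a query; columns come back
+    # in table schema order.
+    round_tripped = bigquery_client.list_rows(table_id).to_dataframe()
+    assert list(round_tripped.columns) == ["name", "age"]
+    assert len(round_tripped.index) == 2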