fix!: to_gbq loads uint8 columns to BigQuery INT64 instead of STRING #814

Merged · 6 commits · Sep 23, 2024
Changes from 5 commits
1 change: 1 addition & 0 deletions noxfile.py
@@ -51,6 +51,7 @@
UNIT_TEST_EXTRAS = [
    "bqstorage",
    "tqdm",
+    "geopandas",
]
UNIT_TEST_EXTRAS_BY_PYTHON = {
    "3.9": [],
2 changes: 1 addition & 1 deletion owlbot.py
@@ -32,7 +32,7 @@
    # Use a middle version of Python to test when no extras are installed.
    "3.9": []
}
-extras = ["tqdm"]
+extras = ["tqdm", "geopandas"]
templated_files = common.py_library(
    unit_test_python_versions=["3.8", "3.9", "3.10", "3.11", "3.12"],
    system_test_python_versions=["3.8", "3.9", "3.10", "3.11", "3.12"],
3 changes: 3 additions & 0 deletions pandas_gbq/core/__init__.py
@@ -0,0 +1,3 @@
# Copyright (c) 2024 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
70 changes: 70 additions & 0 deletions pandas_gbq/core/pandas.py
@@ -0,0 +1,70 @@
# Copyright (c) 2019 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import itertools

import pandas


def list_columns_and_indexes(dataframe, index=True):
    """Return all index and column names with dtypes.

    Returns:
        Sequence[Tuple[str, dtype]]:
            Returns a sorted list of indexes and column names with
            corresponding dtypes. If an index is missing a name or has the
            same name as a column, the index is omitted.
    """
    column_names = frozenset(dataframe.columns)
    columns_and_indexes = []
    if index:
        if isinstance(dataframe.index, pandas.MultiIndex):
            for name in dataframe.index.names:
                if name and name not in column_names:
                    values = dataframe.index.get_level_values(name)
                    columns_and_indexes.append((name, values.dtype))
        else:
            if dataframe.index.name and dataframe.index.name not in column_names:
                columns_and_indexes.append(
                    (dataframe.index.name, dataframe.index.dtype)
                )

    columns_and_indexes += zip(dataframe.columns, dataframe.dtypes)
    return columns_and_indexes


def first_valid(series):
    first_valid_index = series.first_valid_index()
    if first_valid_index is not None:
        return series.at[first_valid_index]


def first_array_valid(series):
    """Return the first "meaningful" element from the array series.

    Here, "meaningful" means the first non-None element in one of the arrays
    that can be used for type detection.
    """
    first_valid_index = series.first_valid_index()
    if first_valid_index is None:
        return None

    valid_array = series.at[first_valid_index]
    valid_item = next((item for item in valid_array if not pandas.isna(item)), None)

    if valid_item is not None:
        return valid_item

    # Valid item is None because all items in the "valid" array are invalid. Try
    # to find a true valid array manually.
    for array in itertools.islice(series, first_valid_index + 1, None):
        try:
            array_iter = iter(array)
        except TypeError:
            continue  # Not an array, apparently, e.g. None, thus skip.
        valid_item = next((item for item in array_iter if not pandas.isna(item)), None)
        if valid_item is not None:
            break

    return valid_item
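
These helpers feed dtype detection for the new schema logic. A minimal usage sketch on hypothetical data (not part of the diff): list_columns_and_indexes pairs names with dtypes, including a named index, and first_array_valid keeps scanning past arrays whose items are all NaN.

import pandas

# Hypothetical data for illustration only.
df = pandas.DataFrame({"num": [1, 2]}, index=pandas.Index(["a", "b"], name="key"))
print(list_columns_and_indexes(df))  # [('key', dtype('O')), ('num', dtype('int64'))]

# The first non-null array holds only NaN, so the scan continues and
# returns 2.5 from the following array.
series = pandas.Series([None, [float("nan")], [float("nan"), 2.5]])
assert first_array_valid(series) == 2.5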
12 changes: 10 additions & 2 deletions pandas_gbq/gbq.py
@@ -25,6 +25,7 @@
from pandas_gbq.features import FEATURES
import pandas_gbq.query
import pandas_gbq.schema
+import pandas_gbq.schema.pandas_to_bigquery
import pandas_gbq.timestamp

try:
@@ -1219,9 +1220,16 @@ def _generate_bq_schema(df, default_type="STRING"):
    be overridden: https://github.com/pydata/pandas-gbq/issues/218, this
    method can be removed after there is time to migrate away from this
    method."""
-    from pandas_gbq import schema
+    fields = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
Collaborator Author commented:
Maybe should have un-deprecated generate_bq_schema for use in bigframes.

+        df,
+        default_type=default_type,
+    )
+    fields_json = []
+
+    for field in fields:
+        fields_json.append(field.to_api_repr())
+
-    return schema.generate_bq_schema(df, default_type=default_type)
+    return {"fields": fields_json}


class _Table(GbqConnector):
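For reference, a rough sketch of what the rewritten _generate_bq_schema returns; the exact type names come from dataframe_to_bigquery_fields, and the printed shape shown in the comment is illustrative:

import pandas

# Illustrative only: a uint8 column should now resolve to an integer
# BigQuery type rather than falling back to STRING.
df = pandas.DataFrame(
    {
        "flags": pandas.Series([1, 2], dtype="uint8"),
        "label": ["a", "b"],
    }
)
# Expected shape: {"fields": [{"name": "flags", "type": "INTEGER", ...},
#                             {"name": "label", "type": "STRING", ...}]}
print(_generate_bq_schema(df))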
10 changes: 5 additions & 5 deletions pandas_gbq/load.py
@@ -15,6 +15,8 @@

from pandas_gbq import exceptions
import pandas_gbq.schema
+import pandas_gbq.schema.bigquery
+import pandas_gbq.schema.pandas_to_bigquery


def encode_chunk(dataframe):
@@ -214,11 +216,9 @@ def load_csv_from_file(
    This method is needed for writing with google-cloud-bigquery versions that
    don't implement load_table_from_dataframe with the CSV serialization format.
    """
-    if schema is None:
-        schema = pandas_gbq.schema.generate_bq_schema(dataframe)
-
-    schema = pandas_gbq.schema.remove_policy_tags(schema)
-    bq_schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
+    bq_schema = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
+        dataframe, schema
+    )

    def load_chunk(chunk, job_config):
        try:
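The CSV path now derives the schema in a single call instead of the old generate / remove-policy-tags / convert chain. A hedged sketch of that call, assuming the second argument takes the user-supplied schema (or None to infer everything from dtypes):

import pandas
import pandas_gbq.schema.pandas_to_bigquery

# Sketch: derive google-cloud-bigquery SchemaField objects directly
# from the dataframe, with no user-supplied schema.
df = pandas.DataFrame({"id": [1, 2], "label": ["a", "b"]})
fields = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(df, None)
for field in fields:
    print(field.name, field.field_type)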
31 changes: 0 additions & 31 deletions pandas_gbq/schema.py → pandas_gbq/schema/__init__.py
@@ -92,37 +92,6 @@ def schema_is_subset(schema_remote, schema_local):
    return all(field in fields_remote for field in fields_local)


-def generate_bq_schema(dataframe, default_type="STRING"):
-    """Given a passed dataframe, generate the associated Google BigQuery schema.
-
-    Arguments:
-        dataframe (pandas.DataFrame): D
-        default_type : string
-            The default big query type in case the type of the column
-            does not exist in the schema.
-    """
-
-    # If you update this mapping, also update the table at
-    # `docs/source/writing.rst`.
-    type_mapping = {
-        "i": "INTEGER",
-        "b": "BOOLEAN",
-        "f": "FLOAT",
-        "O": "STRING",
-        "S": "STRING",
-        "U": "STRING",
-        "M": "TIMESTAMP",
-    }
-
-    fields = []
-    for column_name, dtype in dataframe.dtypes.items():
-        fields.append(
-            {"name": column_name, "type": type_mapping.get(dtype.kind, default_type)}
-        )
-
-    return {"fields": fields}
-
-
def update_schema(schema_old, schema_new):
    """
    Given an old BigQuery schema, update it with a new one.
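The removed mapping is the root cause of the bug in the PR title: it keyed on dtype.kind, and unsigned integer dtypes report kind "u", which was absent from the table, so uint8 columns fell through to the STRING default. A small demonstration:

import pandas

# uint8 reports dtype.kind "u"; the old table had no "u" entry, so
# type_mapping.get("u", "STRING") returned the STRING default.
df = pandas.DataFrame({"flags": pandas.Series([1, 2], dtype="uint8")})
print(df.dtypes["flags"].kind)  # -> u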
44 changes: 44 additions & 0 deletions pandas_gbq/schema/bigquery.py
@@ -0,0 +1,44 @@
# Copyright (c) 2019 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import collections

import google.cloud.bigquery


def to_schema_fields(schema):
    """Coerce `schema` to a list of schema field instances.

    Args:
        schema(Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            Table schema to convert. If some items are passed as mappings,
            their content must be compatible with
            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.

    Returns:
        Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]

    Raises:
        ValueError: If any item in the sequence is not a
            :class:`~google.cloud.bigquery.schema.SchemaField` instance or a
            compatible mapping representation of the field.
    """
    for field in schema:
        if not isinstance(
            field, (google.cloud.bigquery.SchemaField, collections.abc.Mapping)
        ):
            raise ValueError(
                "Schema items must either be fields or compatible "
                "mapping representations."
            )

    return [
        field
        if isinstance(field, google.cloud.bigquery.SchemaField)
        else google.cloud.bigquery.SchemaField.from_api_repr(field)
        for field in schema
    ]
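
A brief usage sketch (field names are hypothetical): mappings compatible with SchemaField.from_api_repr are coerced, while existing SchemaField instances pass through unchanged.

import google.cloud.bigquery

mixed = [
    google.cloud.bigquery.SchemaField("id", "INTEGER"),
    {"name": "payload", "type": "STRING", "mode": "NULLABLE"},
]
fields = to_schema_fields(mixed)
assert all(isinstance(f, google.cloud.bigquery.SchemaField) for f in fields)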