
No way to observe the AppendRowsResponse row_errors #836

Open
film42 opened this issue Oct 11, 2024 · 2 comments · May be fixed by #838
Labels
api: bigquerystorage Issues related to the googleapis/python-bigquery-storage API.

Comments


film42 commented Oct 11, 2024


Environment details

$ pip show google-cloud-bigquery-storage
Name: google-cloud-bigquery-storage
Version: 2.26.0
Summary: Google Cloud Bigquery Storage API client library
Home-page: https://github.com/googleapis/python-bigquery-storage
Author: Google LLC
Author-email: [email protected]
License: Apache 2.0
Location: /Users/film42/.pyenv/versions/3.10.9/envs/banzai-sept-2/lib/python3.10/site-packages
Requires: google-api-core, google-auth, proto-plus, protobuf
Required-by: 

Steps to reproduce

  1. Use a STRING protobuf field to append a row to a DATETIME BigQuery column, using the value '2024-10-11T00:17:35.479490+00:00'.

Please see previous issue #717 where @bhavitsharma ran into the same issue as I did.

Code example

datetime_string = dt.datetime.now(dt.timezone.utc).isoformat()

p = MyProto()
p.created_at = datetime_string

# perform a normal append ...

try:
  future = append_rows_stream.send(request)
  future.result()
except InvalidArgument as e:
  pass
  # How are you supposed to get the AppendRowsResponse object?
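(Not the focus of this issue, but for completeness: the rejection appears to come from the trailing "+00:00" offset, since BigQuery DATETIME values carry no timezone. A minimal sketch of producing a value without the offset, assuming the offset is indeed what the backend rejects:)

```python
import datetime as dt

# BigQuery DATETIME has no timezone component, so isoformat() on an aware
# datetime produces a trailing "+00:00". Dropping the tzinfo yields a plain
# "YYYY-MM-DDTHH:MM:SS.ffffff" string instead.
now = dt.datetime.now(dt.timezone.utc)
datetime_string = now.replace(tzinfo=None).isoformat()

print(datetime_string)
```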

The docs say that errors must be read from the response, which includes an index, message, and error type for each row: https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.AppendRowsResponse. But there is no way to get access to the underlying response, because on error this library captures only the code and message and throws away the rest:

future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
    exc = exceptions.from_grpc_status(
        response.error.code, response.error.message
    )
    future.set_exception(exc)
else:
    future.set_result(response)

If you crack open the source of this library and modify it to print response.row_errors, you get:

print(response.row_errors[499])

index: 499
code: FIELDS_ERROR
message: "Invalid date time value \'2024-10-11T00:17:35.479490+00:00\' on field \'MyProto.created_at\'"

Stack trace

Without access to the response to inspect each row, this is the only thing you see:

400 Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. Entity: projects/my_project/datasets/my_dataset/tables/my_table/streams/my_stream_id

In other words... nothing helpful.

I think it would be great if we could have something like:

try:
  append_rows_stream.send(request).result()
except InvalidArgument as e:
  # Provide access to the response on the InvalidArgument error...
  print(e.response.row_errors[499].message)

OR

try:
  future = append_rows_stream.send(request)
  future.result()
except InvalidArgument as e:
  # Allow this custom AppendRowsFuture object to retain access to the underlying response
  print(future.response().row_errors[499].message)

film42 commented Oct 11, 2024

Looks like a simple change is required:

https://github.com/googleapis/python-api-core/blob/8c533819b7e212aa2f1d695a7ce08629f4fb2daf/google/api_core/exceptions.py#L140-L147

The base exception already supports a response kwarg, so I think we'd only need:

future: AppendRowsFuture = self._futures_queue.get_nowait()
if response.error.code:
    exc = exceptions.from_grpc_status(
        response.error.code, response.error.message, response=response  # <-- just add response here.
    )
    future.set_exception(exc)
else:
    future.set_result(response)
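For illustration, here is a minimal, dependency-free sketch of the pattern the patch relies on: attach the response to the raised exception so callers can recover row_errors from the caught error. FakeRowError, FakeResponse, and this local InvalidArgument are hypothetical stand-ins for the real protobuf messages and google.api_core exception types.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class FakeRowError:  # stand-in for a RowError protobuf message
    index: int
    code: str
    message: str

@dataclass
class FakeResponse:  # stand-in for AppendRowsResponse
    row_errors: List[FakeRowError] = field(default_factory=list)

class InvalidArgument(Exception):
    """Stand-in for the google.api_core exception hierarchy, whose base
    class accepts an optional ``response`` keyword argument."""
    def __init__(self, message: str, response: Optional[FakeResponse] = None):
        super().__init__(message)
        self.response = response

def fail_append(response: FakeResponse) -> None:
    # Mirrors the patched _on_response: the response rides along on the error.
    raise InvalidArgument("Errors found while processing rows.", response=response)

resp = FakeResponse(
    row_errors=[FakeRowError(499, "FIELDS_ERROR", "Invalid date time value")]
)
try:
    fail_append(resp)
except InvalidArgument as e:
    # The caller can now inspect per-row failures, not just the summary message.
    for err in e.response.row_errors:
        print(err.index, err.code, err.message)
```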

film42 added a commit to film42/python-bigquery-storage that referenced this issue Oct 11, 2024
Should an AppendRowsRequest fail, you need to inspect the response to
see what went wrong. Currently this library only raises an exception with
the code and message, throwing the actual response away. This patch adds
the response to any exception raised. This is fine because the base gRPC
error has a response kwarg that this library wasn't using.

Now you can catch the error and call `e.response.row_errors` to see the
underlying row errors.

Fixes: googleapis#836
@film42 film42 linked a pull request Oct 11, 2024 that will close this issue

film42 commented Oct 23, 2024

Btw... for folks looking for a workaround while this is reviewed by Google, I'm using this monkey patch at the moment.

from google.protobuf import descriptor_pb2, descriptor_pool, message_factory
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer

from google.api_core.exceptions import InvalidArgument


# HACK: I have to monkey patch AppendRowsStream until the following issue is closed
# and the response is accessible from an append rows request exception.
#
# GH: https://github.com/googleapis/python-bigquery-storage/issues/836
#
def _monkey_patch_writer_append_rows_stream(w: writer.AppendRowsStream):
    from google.api_core import exceptions
    from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions

    # Taken from my patch here: https://github.com/googleapis/python-bigquery-storage/pull/838
    def _on_response(self, response: types.AppendRowsResponse):
        """Process a response from a consumer callback."""
        # If the stream has closed, but somehow we still got a response message
        # back, discard it. The response futures queue has been drained, with
        # an exception reported.
        if self._closed:
            raise bqstorage_exceptions.StreamClosedError(
                f"Stream closed before receiving response: {response}"
            )
        # Since we have 1 response per request, if we get here from a response
        # callback, the queue should never be empty.
        future: writer.AppendRowsFuture = self._futures_queue.get_nowait()
        if response.error.code:
            exc = exceptions.from_grpc_status(
                response.error.code, response.error.message, response=response
            )
            future.set_exception(exc)
        else:
            future.set_result(response)

    w._on_response = _on_response.__get__(w, type(w))



# ...
# Later on, while making a new append rows stream...

append_rows_stream = writer.AppendRowsStream(write_client, request_template)

# Apply monkey patch to instance.
_monkey_patch_writer_append_rows_stream(append_rows_stream)

@whuffman36 whuffman36 removed their assignment Nov 18, 2024