v3 pandas connector broken with latest google-cloud-bigquery-storage #1110

Closed
tswast opened this issue Jan 13, 2022 · 2 comments
Assignees: tswast
Labels: api: bigquery (Issues related to the googleapis/python-bigquery API) · priority: p1 (Important issue which blocks shipping the next release; will be fixed prior to the next release) · type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns)

tswast (Contributor) commented on Jan 13, 2022

tests/unit/job/test_query_pandas.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f477f994ac0>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
_ test_to_dataframe_bqstorage_preserve_order[SELECT name, age FROM table ORDER BY other_column;] _

query = 'SELECT name, age FROM table ORDER BY other_column;'
table_read_options_kwarg = {'read_options': arrow_serialization_options {
  buffer_compression: LZ4_FRAME
}
}

    @pytest.mark.parametrize(
        "query",
        (
            "select name, age from table order by other_column;",
            "Select name, age From table Order By other_column;",
            "SELECT name, age FROM table ORDER BY other_column;",
            "select name, age from table order\nby other_column;",
            "Select name, age From table Order\nBy other_column;",
            "SELECT name, age FROM table ORDER\nBY other_column;",
            "SelecT name, age froM table OrdeR \n\t BY other_column;",
        ),
    )
    def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
        from google.cloud.bigquery.job import QueryJob as target_class

        job_resource = _make_job_resource(
            project_id="test-project", job_type="query", ended=True
        )
        job_resource["configuration"]["query"]["query"] = query
        job_resource["status"] = {"state": "DONE"}
        query_resource = {
            "jobComplete": True,
            "jobReference": {"projectId": "test-project", "jobId": "test-job"},
            "schema": {
                "fields": [
                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
                ]
            },
            "totalRows": "4",
        }
        stream_id = "projects/1/locations/2/sessions/3/streams/4"
        name_array = pyarrow.array(
            ["John", "Paul", "George", "Ringo"], type=pyarrow.string()
        )
        age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64())
        arrow_schema = pyarrow.schema(
            [
                pyarrow.field("name", pyarrow.string(), True),
                pyarrow.field("age", pyarrow.int64(), True),
            ]
        )
        record_batch = pyarrow.RecordBatch.from_arrays(
            [name_array, age_array], schema=arrow_schema
        )
        connection = make_connection(query_resource)
        client = _make_client(connection=connection)
        job = target_class.from_api_repr(job_resource, client)
        bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        session = bigquery_storage.types.ReadSession()
        session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        session.streams = [bigquery_storage.types.ReadStream(name=stream_id)]
        bqstorage_client.create_read_session.return_value = session
        bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        page = bigquery_storage.types.ReadRowsResponse()
        if BQ_STORAGE_VERSIONS.is_read_session_optional:
            page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        page.arrow_record_batch.serialized_record_batch = (
            record_batch.serialize().to_pybytes()
        )
        bqstorage_base_client.read_rows.return_value = [page]
        reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream(
            [page], bqstorage_base_client, stream_id, 0, {}
        )
        bqstorage_client.read_rows.return_value = reader

>       dataframe = job.to_dataframe(bqstorage_client=bqstorage_client)

tests/unit/job/test_query_pandas.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f477f221f10>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
_ test_to_dataframe_bqstorage_preserve_order[select name, age from table order\nby other_column;] _

query = 'select name, age from table order\nby other_column;'
table_read_options_kwarg = {'read_options': arrow_serialization_options {
  buffer_compression: LZ4_FRAME
}
}

    @pytest.mark.parametrize(
        "query",
        (
            "select name, age from table order by other_column;",
            "Select name, age From table Order By other_column;",
            "SELECT name, age FROM table ORDER BY other_column;",
            "select name, age from table order\nby other_column;",
            "Select name, age From table Order\nBy other_column;",
            "SELECT name, age FROM table ORDER\nBY other_column;",
            "SelecT name, age froM table OrdeR \n\t BY other_column;",
        ),
    )
    def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
        from google.cloud.bigquery.job import QueryJob as target_class

        job_resource = _make_job_resource(
            project_id="test-project", job_type="query", ended=True
        )
        job_resource["configuration"]["query"]["query"] = query
        job_resource["status"] = {"state": "DONE"}
        query_resource = {
            "jobComplete": True,
            "jobReference": {"projectId": "test-project", "jobId": "test-job"},
            "schema": {
                "fields": [
                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
                ]
            },
            "totalRows": "4",
        }
        stream_id = "projects/1/locations/2/sessions/3/streams/4"
        name_array = pyarrow.array(
            ["John", "Paul", "George", "Ringo"], type=pyarrow.string()
        )
        age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64())
        arrow_schema = pyarrow.schema(
            [
                pyarrow.field("name", pyarrow.string(), True),
                pyarrow.field("age", pyarrow.int64(), True),
            ]
        )
        record_batch = pyarrow.RecordBatch.from_arrays(
            [name_array, age_array], schema=arrow_schema
        )
        connection = make_connection(query_resource)
        client = _make_client(connection=connection)
        job = target_class.from_api_repr(job_resource, client)
        bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        session = bigquery_storage.types.ReadSession()
        session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        session.streams = [bigquery_storage.types.ReadStream(name=stream_id)]
        bqstorage_client.create_read_session.return_value = session
        bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        page = bigquery_storage.types.ReadRowsResponse()
        if BQ_STORAGE_VERSIONS.is_read_session_optional:
            page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        page.arrow_record_batch.serialized_record_batch = (
            record_batch.serialize().to_pybytes()
        )
        bqstorage_base_client.read_rows.return_value = [page]
        reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream(
            [page], bqstorage_base_client, stream_id, 0, {}
        )
        bqstorage_client.read_rows.return_value = reader

>       dataframe = job.to_dataframe(bqstorage_client=bqstorage_client)

tests/unit/job/test_query_pandas.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f4767ef8e50>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
_ test_to_dataframe_bqstorage_preserve_order[Select name, age From table Order\nBy other_column;] _

query = 'Select name, age From table Order\nBy other_column;'
table_read_options_kwarg = {'read_options': arrow_serialization_options {
  buffer_compression: LZ4_FRAME
}
}

    @pytest.mark.parametrize(
        "query",
        (
            "select name, age from table order by other_column;",
            "Select name, age From table Order By other_column;",
            "SELECT name, age FROM table ORDER BY other_column;",
            "select name, age from table order\nby other_column;",
            "Select name, age From table Order\nBy other_column;",
            "SELECT name, age FROM table ORDER\nBY other_column;",
            "SelecT name, age froM table OrdeR \n\t BY other_column;",
        ),
    )
    def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
        from google.cloud.bigquery.job import QueryJob as target_class

        job_resource = _make_job_resource(
            project_id="test-project", job_type="query", ended=True
        )
        job_resource["configuration"]["query"]["query"] = query
        job_resource["status"] = {"state": "DONE"}
        query_resource = {
            "jobComplete": True,
            "jobReference": {"projectId": "test-project", "jobId": "test-job"},
            "schema": {
                "fields": [
                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
                ]
            },
            "totalRows": "4",
        }
        stream_id = "projects/1/locations/2/sessions/3/streams/4"
        name_array = pyarrow.array(
            ["John", "Paul", "George", "Ringo"], type=pyarrow.string()
        )
        age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64())
        arrow_schema = pyarrow.schema(
            [
                pyarrow.field("name", pyarrow.string(), True),
                pyarrow.field("age", pyarrow.int64(), True),
            ]
        )
        record_batch = pyarrow.RecordBatch.from_arrays(
            [name_array, age_array], schema=arrow_schema
        )
        connection = make_connection(query_resource)
        client = _make_client(connection=connection)
        job = target_class.from_api_repr(job_resource, client)
        bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        session = bigquery_storage.types.ReadSession()
        session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        session.streams = [bigquery_storage.types.ReadStream(name=stream_id)]
        bqstorage_client.create_read_session.return_value = session
        bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        page = bigquery_storage.types.ReadRowsResponse()
        if BQ_STORAGE_VERSIONS.is_read_session_optional:
            page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        page.arrow_record_batch.serialized_record_batch = (
            record_batch.serialize().to_pybytes()
        )
        bqstorage_base_client.read_rows.return_value = [page]
        reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream(
            [page], bqstorage_base_client, stream_id, 0, {}
        )
        bqstorage_client.read_rows.return_value = reader

>       dataframe = job.to_dataframe(bqstorage_client=bqstorage_client)

tests/unit/job/test_query_pandas.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f477ef80a60>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
_ test_to_dataframe_bqstorage_preserve_order[SELECT name, age FROM table ORDER\nBY other_column;] _

query = 'SELECT name, age FROM table ORDER\nBY other_column;'
table_read_options_kwarg = {'read_options': arrow_serialization_options {
  buffer_compression: LZ4_FRAME
}
}

    @pytest.mark.parametrize(
        "query",
        (
            "select name, age from table order by other_column;",
            "Select name, age From table Order By other_column;",
            "SELECT name, age FROM table ORDER BY other_column;",
            "select name, age from table order\nby other_column;",
            "Select name, age From table Order\nBy other_column;",
            "SELECT name, age FROM table ORDER\nBY other_column;",
            "SelecT name, age froM table OrdeR \n\t BY other_column;",
        ),
    )
    def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
        from google.cloud.bigquery.job import QueryJob as target_class

        job_resource = _make_job_resource(
            project_id="test-project", job_type="query", ended=True
        )
        job_resource["configuration"]["query"]["query"] = query
        job_resource["status"] = {"state": "DONE"}
        query_resource = {
            "jobComplete": True,
            "jobReference": {"projectId": "test-project", "jobId": "test-job"},
            "schema": {
                "fields": [
                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
                ]
            },
            "totalRows": "4",
        }
        stream_id = "projects/1/locations/2/sessions/3/streams/4"
        name_array = pyarrow.array(
            ["John", "Paul", "George", "Ringo"], type=pyarrow.string()
        )
        age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64())
        arrow_schema = pyarrow.schema(
            [
                pyarrow.field("name", pyarrow.string(), True),
                pyarrow.field("age", pyarrow.int64(), True),
            ]
        )
        record_batch = pyarrow.RecordBatch.from_arrays(
            [name_array, age_array], schema=arrow_schema
        )
        connection = make_connection(query_resource)
        client = _make_client(connection=connection)
        job = target_class.from_api_repr(job_resource, client)
        bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        session = bigquery_storage.types.ReadSession()
        session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        session.streams = [bigquery_storage.types.ReadStream(name=stream_id)]
        bqstorage_client.create_read_session.return_value = session
        bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        page = bigquery_storage.types.ReadRowsResponse()
        if BQ_STORAGE_VERSIONS.is_read_session_optional:
            page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        page.arrow_record_batch.serialized_record_batch = (
            record_batch.serialize().to_pybytes()
        )
        bqstorage_base_client.read_rows.return_value = [page]
        reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream(
            [page], bqstorage_base_client, stream_id, 0, {}
        )
        bqstorage_client.read_rows.return_value = reader

>       dataframe = job.to_dataframe(bqstorage_client=bqstorage_client)

tests/unit/job/test_query_pandas.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f477ef9e070>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
_ test_to_dataframe_bqstorage_preserve_order[SelecT name, age froM table OrdeR \n\t BY other_column;] _

query = 'SelecT name, age froM table OrdeR \n\t BY other_column;'
table_read_options_kwarg = {'read_options': arrow_serialization_options {
  buffer_compression: LZ4_FRAME
}
}

    @pytest.mark.parametrize(
        "query",
        (
            "select name, age from table order by other_column;",
            "Select name, age From table Order By other_column;",
            "SELECT name, age FROM table ORDER BY other_column;",
            "select name, age from table order\nby other_column;",
            "Select name, age From table Order\nBy other_column;",
            "SELECT name, age FROM table ORDER\nBY other_column;",
            "SelecT name, age froM table OrdeR \n\t BY other_column;",
        ),
    )
    def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
        from google.cloud.bigquery.job import QueryJob as target_class

        job_resource = _make_job_resource(
            project_id="test-project", job_type="query", ended=True
        )
        job_resource["configuration"]["query"]["query"] = query
        job_resource["status"] = {"state": "DONE"}
        query_resource = {
            "jobComplete": True,
            "jobReference": {"projectId": "test-project", "jobId": "test-job"},
            "schema": {
                "fields": [
                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
                ]
            },
            "totalRows": "4",
        }
        stream_id = "projects/1/locations/2/sessions/3/streams/4"
        name_array = pyarrow.array(
            ["John", "Paul", "George", "Ringo"], type=pyarrow.string()
        )
        age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64())
        arrow_schema = pyarrow.schema(
            [
                pyarrow.field("name", pyarrow.string(), True),
                pyarrow.field("age", pyarrow.int64(), True),
            ]
        )
        record_batch = pyarrow.RecordBatch.from_arrays(
            [name_array, age_array], schema=arrow_schema
        )
        connection = make_connection(query_resource)
        client = _make_client(connection=connection)
        job = target_class.from_api_repr(job_resource, client)
        bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        session = bigquery_storage.types.ReadSession()
        session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        session.streams = [bigquery_storage.types.ReadStream(name=stream_id)]
        bqstorage_client.create_read_session.return_value = session
        bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        page = bigquery_storage.types.ReadRowsResponse()
        if BQ_STORAGE_VERSIONS.is_read_session_optional:
            page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        page.arrow_record_batch.serialized_record_batch = (
            record_batch.serialize().to_pybytes()
        )
        bqstorage_base_client.read_rows.return_value = [page]
        reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream(
            [page], bqstorage_base_client, stream_id, 0, {}
        )
        bqstorage_client.read_rows.return_value = reader

>       dataframe = job.to_dataframe(bqstorage_client=bqstorage_client)

tests/unit/job/test_query_pandas.py:159:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f477fb50340>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
_________________________ test_to_dataframe_bqstorage __________________________

table_read_options_kwarg = {'read_options': arrow_serialization_options {
  buffer_compression: LZ4_FRAME
}
}

    def test_to_dataframe_bqstorage(table_read_options_kwarg):
        from google.cloud.bigquery.job import QueryJob as target_class

        resource = _make_job_resource(job_type="query", ended=True)
        query_resource = {
            "jobComplete": True,
            "jobReference": resource["jobReference"],
            "totalRows": "4",
            "schema": {
                "fields": [
                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
                ]
            },
        }
        stream_id = "projects/1/locations/2/sessions/3/streams/4"
        name_array = pyarrow.array(
            ["John", "Paul", "George", "Ringo"], type=pyarrow.string()
        )
        age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64())
        arrow_schema = pyarrow.schema(
            [
                pyarrow.field("name", pyarrow.string(), True),
                pyarrow.field("age", pyarrow.int64(), True),
            ]
        )
        record_batch = pyarrow.RecordBatch.from_arrays(
            [name_array, age_array], schema=arrow_schema
        )
        connection = make_connection(query_resource)
        client = _make_client(connection=connection)
        job = target_class.from_api_repr(resource, client)
        bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        session = bigquery_storage.types.ReadSession()
        session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        session.streams = [bigquery_storage.types.ReadStream(name=stream_id)]
        bqstorage_client.create_read_session.return_value = session
        bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
        page = bigquery_storage.types.ReadRowsResponse()
        if BQ_STORAGE_VERSIONS.is_read_session_optional:
            page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
        page.arrow_record_batch.serialized_record_batch = (
            record_batch.serialize().to_pybytes()
        )
        bqstorage_base_client.read_rows.return_value = [page]
        reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream(
            [page], bqstorage_base_client, stream_id, 0, {}
        )
        bqstorage_client.read_rows.return_value = reader

>       dataframe = job.to_dataframe(bqstorage_client=bqstorage_client)

tests/unit/job/test_query_pandas.py:557:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
google/cloud/bigquery/job/query.py:1636: in to_dataframe
    return query_result.to_dataframe(
google/cloud/bigquery/table.py:1940: in to_dataframe
    record_batch = self.to_arrow(
google/cloud/bigquery/table.py:1743: in to_arrow
    for record_batch in self.to_arrow_iterable(
google/cloud/bigquery/table.py:1613: in _to_page_iterable
    yield from result_pages
google/cloud/bigquery/_pandas_helpers.py:888: in _download_table_bqstorage
    future.result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:437: in result
    return self.__get_result()
/usr/local/lib/python3.8/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/usr/local/lib/python3.8/concurrent/futures/thread.py:57: in run
    result = self.fn(*self.args, **self.kwargs)
google/cloud/bigquery/_pandas_helpers.py:766: in _download_table_bqstorage_stream
    for page in rowstream.pages:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:338: in pages
    for message in self._reader:
.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:148: in __iter__
    self._reconnect()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.cloud.bigquery_storage_v1.reader.ReadRowsStream object at 0x7f477f935b50>

    def _reconnect(self):
        """Reconnect to the ReadRows stream using the most recent offset."""
        while True:
            try:
>               self._wrapped = self._client.read_rows(
                    read_stream=self._name,
                    offset=self._offset,
                    **self._read_rows_kwargs
                )
E               AttributeError: 'list' object has no attribute 'read_rows'

.nox/prerelease_deps-3-8/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/reader.py:178: AttributeError
=============================== warnings summary ===============================
tests/unit/test__pandas_helpers.py::test_bq_to_arrow_array_w_geography_type_shapely_data
tests/unit/test__pandas_helpers.py::test_dataframe_to_bq_schema_geography
tests/unit/test_table.py::TestRowIterator::test_rowiterator_to_geodataframe_delegation
  /tmpfs/src/github/python-bigquery/.nox/prerelease_deps-3-8/lib/python3.8/site-packages/pandas/core/dtypes/cast.py:122: ShapelyDeprecationWarning: The array interface is deprecated and will no longer work in Shapely 2.0. Convert the '.coords' to a numpy array instead.
    arr = construct_1d_object_array_from_listlike(values)

-- Docs: https://docs.pytest.org/en/stable/warnings.html
=========================== short test summary info ============================
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[select name, age from table order by other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[Select name, age From table Order By other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[SELECT name, age FROM table ORDER BY other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[select name, age from table order\nby other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[Select name, age From table Order\nBy other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[SELECT name, age FROM table ORDER\nBY other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage_preserve_order[SelecT name, age froM table OrdeR \n\t BY other_column;]
FAILED tests/unit/job/test_query_pandas.py::test_to_dataframe_bqstorage - Att...

https://source.cloud.google.com/results/invocations/58c407a7-f2cc-44ac-b640-5b0c36f38c94/targets/cloud-devrel%2Fclient-libraries%2Fpython%2Fgoogleapis%2Fpython-bigquery%2Fpresubmit%2Fprerelease-deps-3.8/log

TBD whether this is just a problem with how we are mocking out the client or a true failure in the connector.
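If it does turn out to be a mocking problem, one plausible reading of the traceback is that the prerelease reader no longer accepts a pre-built iterable of responses as its first constructor argument, so the list of fake pages the test passes positionally ends up bound to the client parameter, and _reconnect() then calls read_rows on a plain list. The sketch below illustrates that hypothesis with stand-in classes; the names and signatures are illustrative assumptions, not the library's actual code.

# Hypothetical sketch of the suspected signature mismatch (assumed, not the
# actual google-cloud-bigquery-storage source).

class OldReadRowsStream:
    # Shape the test was written against: a pre-built iterable of responses
    # is passed in, and the client is only used for reconnects.
    def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
        self._wrapped = wrapped
        self._client = client
        self._name = name
        self._offset = offset
        self._read_rows_kwargs = read_rows_kwargs


class NewReadRowsStream:
    # Assumed newer shape: no pre-built iterable; the stream is (re)opened
    # lazily through the client.
    def __init__(self, client, name, offset, read_rows_kwargs):
        self._client = client
        self._name = name
        self._offset = offset
        self._read_rows_kwargs = read_rows_kwargs

    def _reconnect(self):
        # With the test's positional call, `client` received the list of
        # fake pages, so this line raises
        # AttributeError: 'list' object has no attribute 'read_rows'.
        self._wrapped = self._client.read_rows(
            read_stream=self._name,
            offset=self._offset,
            **self._read_rows_kwargs,
        )


# The test's unchanged call site,
#   ReadRowsStream([page], bqstorage_base_client, stream_id, 0, {}),
# would bind [page] to `client` under the assumed newer signature, which
# matches the AttributeError in the logs above.

Under that reading, the fix belongs in how the tests construct the reader rather than in the connector itself; if the constructor has not actually changed, the same traceback would instead point at a real regression in the v3 download path.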

product-auto-label bot added the api: bigquery label on Jan 13, 2022
tswast added the priority: p1 and type: bug labels on Jan 13, 2022
tswast self-assigned this on Jan 13, 2022
tswast (Contributor, Author) commented on Jan 13, 2022

Tested locally. All tests pass on the main branch. Failures only happen on the v3 branch. More investigation required.

tswast changed the title from "pandas connector broken with latest google-cloud-bigquery-storage" to "v3 pandas connector broken with latest google-cloud-bigquery-storage" on Jan 13, 2022
yoshi-automation added and then removed the "🚨 This issue needs some love." label on Jan 18, 2022
tswast (Contributor, Author) commented on Jan 19, 2022

Fixed by #1113

tswast closed this as completed on Jan 19, 2022