Initial read_gbq support #4

Conversation
Thank you @jrbourbeau for the tweaks, I took this for a spin and things work. I read a copy of the table:

```python
ddf = read_gbq(
    project_id="dask-bigquery",
    dataset_id="covid19_public_forecasts",
    table_id="county_14d",
)

%%time
ddf.compute()
```

I also experimented with:

```python
partitions = ["Teton", "Loudoun"]
ddf = read_gbq(
    project_id="dask-bigquery",
    dataset_id="covid19_public_forecasts",
    table_id="county_14d",
    partition_field="county_name",
    partitions=partitions,
    fields=[],
)

%%time
ddf.compute()
```

which has only two partitions (one per county) and results in a dataframe that, when computed, has the entries passed in `partitions` as its index. One last thing I tested was using ...
dask_bigquery/core.py (outdated)

```python
bq_storage_client = None
bq_client = bigquery.Client(project_id)
```
Starting with version 2.11.0 of `bigquery` we can use `bigquery.Client` as a context manager. Given that there have been many `bigquery` releases since then (the latest release is 2.26.0), I think it's safe to use 2.11.0 as a minimum supported version.
From what I understand, we now have a context manager for the bigquery client but not for the bigquery_storage client. Would you suggest using at least the one we have? Something like:
```python
bq_storage_client = None
with bigquery.Client(project_id) as bq_client:
    try:
        if with_storage_api:
            bq_storage_client = bigquery_storage.BigQueryReadClient(
                credentials=bq_client._credentials
            )
            yield bq_client, bq_storage_client
        else:
            yield bq_client
```
Yep, that's the right direction. Though since `bq_client.close()` is all that was being called in the `finally` block before, we can remove the `try`/`finally` blocks, since `bq_client.close()` will be called when we exit the scope of the `bq_client` context manager.
Since we have to include the closing of the storage client, maybe we should keep the `try`/`finally`, but in the `finally` call `bigquery_storage.transport.channel.close()`.
dask_bigquery/core.py (outdated)

```python
    else:
        yield bq_client
finally:
    bq_client.close()
```
Do we also need to close `bq_storage_client`?
From what I understand the bq storage client doesn't have a `.close()` method. But there is a workaround explained in this comment googleapis/gapic-generator-python#575 (comment), and there are some active discussions about this here: googleapis/gapic-generator-python#987

Probably we can now get away with doing `bqs.transport.channel.close()` as recommended here: dask/dask#3121 (comment)
Thanks for investigating. What you proposed sounds good
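To make the direction agreed on above concrete, here is a minimal sketch of what the combined change could look like, assuming the `bigquery_clients` name suggested later in this thread; the exact attribute for closing the storage client's channel is an assumption (recent releases expose it as `transport.grpc_channel`, while the linked comments refer to `transport.channel`):

```python
from contextlib import contextmanager

from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_clients(project_id=None):
    # bigquery.Client (>= 2.11.0) is itself a context manager, so no explicit
    # bq_client.close() is needed; the storage client has no close() method,
    # so its gRPC channel is closed in the finally block instead.
    with bigquery.Client(project_id) as bq_client:
        bq_storage_client = bigquery_storage.BigQueryReadClient(
            credentials=bq_client._credentials
        )
        try:
            yield bq_client, bq_storage_client
        finally:
            bq_storage_client.transport.grpc_channel.close()
```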
dask_bigquery/core.py (outdated)

```python
@contextmanager
def bigquery_client(project_id=None, with_storage_api=False):
```
It appears we only ever use this context manager with `with_storage_api=True`. If this is the case (I could be missing something), I'll suggest we remove `with_storage_api` as an option and just always `yield` both a `bigquery.Client` and a `bigquery_storage.BigQueryReadClient`.
From what I see, every time we use the created context manager we use it with `with_storage_api=True`, which enables the use of the storage API. My understanding is that we want to use this based on these two comments: dask/dask#3121 (comment) and dask/dask#3121 (comment)
That makes sense. I'm suggesting that since we never use `with_storage_api=False`, we remove it as an option from our custom `bigquery_client` context manager. We can always add it back in the future if needed, but right now it's just an unnecessary keyword argument (since we always call it with `with_storage_api=True`).
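For illustration, this is roughly how `read_gbq` would consume the simplified context manager once `with_storage_api` is dropped (function body elided; `bigquery_clients` is the name floated in this thread, not necessarily the final API):

```python
def read_gbq(project_id, dataset_id, table_id, **kwargs):
    # both clients are always needed, so the context manager just yields both
    with bigquery_clients(project_id) as (bq_client, bqs_client):
        table_ref = bq_client.get_table(f"{dataset_id}.{table_id}")
        ...
```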
```python
def make_create_read_session_request(row_filter=""):
    return bigquery_storage.types.CreateReadSessionRequest(
        max_stream_count=100,  # 0 -> use as many streams as BQ Storage will provide
        parent=f"projects/{project_id}",
```
Should `projects` be hard coded here?

Nvm, looking at the docstring for `CreateReadSessionRequest`, it appears the answer is "yes, `projects` should be hardcoded".
You could consider using the `BigQueryReadClient.common_project_path` class method.
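For reference, a quick sketch of what that would look like (the project id here is just the one used in the examples above):

```python
from google.cloud import bigquery_storage

# builds "projects/dask-bigquery" without hard-coding the "projects/" prefix
parent = bigquery_storage.BigQueryReadClient.common_project_path("dask-bigquery")
```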
```python
session = bqs_client.create_read_session(
    make_create_read_session_request(row_filter=row_filter)
)
```
Does `session` have a `close` method, or some other cleanup method, we should call?
Not as far as I know, or from what I see in the source code of the class: https://github.com/googleapis/python-bigquery-storage/blob/caffb629eb102c86a430003bed5095b379e502d5/google/cloud/bigquery_storage_v1/types/storage.py#L37
dask_bigquery/core.py (outdated)

```python
partition_field: to specify filters of form "WHERE {partition_field} = ..."
partitions: all values to select of `partition_field`
```
Specifying the index and a specific value of the index in this way seems unusual to me (at least compared to other Dask I/O functions). @bnaul I'm wondering if this API will be familiar to BigQuery users, or if these options were useful for a specific use case when this prototype was initially developed?
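For context, a hypothetical sketch of what these two options amount to under the hood (the helper name is mine, not from this PR): each requested partition value becomes one server-side row filter of the form described in the docstring.

```python
def partition_row_filters(partition_field, partitions):
    # e.g. partition_field="county_name", partitions=["Teton", "Loudoun"]
    # -> ['county_name = "Teton"', 'county_name = "Loudoun"'], one per output partition
    return [f'{partition_field} = "{value}"' for value in partitions]
```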
dask_bigquery/core.py (outdated)

```python
def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    read_kwargs: int,
```
@jrbourbeau I was going over the docs and realized that this still shows as an `int`. Shouldn't it go as a keyword argument at the end, `read_kwargs: dict = None`? And I wonder where the `*` should go, right before it? Like:

```python
def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    stream_name: str,
    *,
    read_kwargs: dict = None,
)
```

If this is correct I can modify it. We should probably also add a test that this works, although I'm not sure what's the easiest way to test these kwargs, any ideas?
Since `bigquery_read` is only ever called internally in `read_gbq`, I don't think it matters whether or not `read_kwargs` is a positional or keyword argument to `bigquery_read`. Though you bring up a good point that the type annotation is now incorrect and should be updated to `dict` instead of `int`.
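So the minimal fix discussed here is just the annotation, keeping the parameter order from the diff above (whether `read_kwargs` becomes keyword-only is left open; where `stream_name` sits is my assumption):

```python
def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    read_kwargs: dict,
    stream_name: str,
):
    ...
```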
```python
read_session=bigquery_storage.types.ReadSession(
    data_format=bigquery_storage.types.DataFormat.ARROW,
    read_options=bigquery_storage.types.ReadSession.TableReadOptions(
        row_restriction=row_filter,
```
@jrbourbeau I'm not quite sure if this works if we don't use the default; should we remove this, since we removed the partition field options? We seem to always leave it as `""`.

Here is some documentation to review: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1beta2/types.html#google.cloud.bigquery_storage_v1beta2.types.ReadSession.TableReadOptions
`row_filter` is quite useful, as it will perform the filtering server-side and can avoid a lot of extraneous IO. I don't see any reason to remove it, but it should probably be made into a more generic `TableReadOptions` object like the one you linked to, so that it can be used for column selection as well.

One other small note: the doc you linked is for the beta API; v1 was released since our original implementation: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1/types.html#google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions. There are a couple of other references to the beta docs throughout as well.
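A minimal sketch of the more generic v1 `TableReadOptions` being suggested, combining server-side row filtering with column selection (the field values are illustrative, loosely based on the example table used earlier):

```python
from google.cloud import bigquery_storage

read_options = bigquery_storage.types.ReadSession.TableReadOptions(
    # server-side filtering avoids shipping rows we would discard client-side
    row_restriction='county_name = "Teton"',
    # column selection avoids reading columns we do not need
    selected_fields=["county_name", "forecast_date", "new_confirmed"],
)
```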
Thanks for the updates @ncclementi! I left a few small comments, but after those are addressed I think this PR is at a good place to merge in as a checkpoint. @bnaul @tswast have left some nice comments that we can spin out into separate issues and address in follow-up PRs.
```python
    read_kwargs={"timeout": 1e-12},
)

with pytest.raises(Exception, match="504 Deadline Exceeded"):
```
Nice use of `match=`! I'm curious if we can catch a more specific exception here (e.g. `ValueError`)? What type of error is raised when a timeout occurs?
This was the traceback of the error I got; it's an actual `Exception`. Do you think we can write this better?
```
distributed.worker - WARNING - Compute Failed
Function: subgraph_callable-4f693566-2e95-419b-8783-b36b04b3
args: ('projects/dask-bigquery/locations/us/sessions/CAISDFRjM3NzSmxoZm1BMxoCanEaAmpk/streams/GgJqcRoCamQoAg')
kwargs: {}
Exception: Exception('504 Deadline Exceeded')
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
/var/folders/1y/ydztfpnd11b6qmvbb8_x56jh0000gn/T/ipykernel_51820/2349744101.py in <module>
----> 1 test_kwargs.compute()
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
284 dask.base.compute
285 """
--> 286 (result,) = compute(self, traverse=False, **kwargs)
287 return result
288
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
566 postcomputes.append(x.__dask_postcompute__())
567
--> 568 results = schedule(dsk, keys, **kwargs)
569 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
570
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2669 should_rejoin = False
2670 try:
-> 2671 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2672 finally:
2673 for f in futures.values():
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1946 else:
1947 local_worker = None
-> 1948 return self.sync(
1949 self._gather,
1950 futures,
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
843 return future
844 else:
--> 845 return sync(
846 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
847 )
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
323 if error[0]:
324 typ, exc, tb = error[0]
--> 325 raise exc.with_traceback(tb)
326 else:
327 return result[0]
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in f()
306 if callback_timeout is not None:
307 future = asyncio.wait_for(future, callback_timeout)
--> 308 result[0] = yield future
309 except Exception:
310 error[0] = sys.exc_info()
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1811 exc = CancelledError(key)
1812 else:
-> 1813 raise exception.with_traceback(traceback)
1814 raise exc
1815 if errors == "skip":
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/optimization.py in __call__()
967 if not len(args) == len(self.inkeys):
968 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
970
971 def __reduce__(self):
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/core.py in get()
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/core.py in _execute_task()
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/Documents/git/dask-bigquery/dask_bigquery/core.py in bigquery_read()
65 pyarrow.py_buffer(session.arrow_schema.serialized_schema)
66 )
---> 67 shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
68 # NOTE: BQ Storage API can return empty streams
69 if len(shards) == 0:
~/Documents/git/dask-bigquery/dask_bigquery/core.py in _stream_to_dfs()
35 schema,
36 ).to_pandas()
---> 37 for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
38 ]
39
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/client.py in read_rows()
122 """
123 gapic_client = super(BigQueryReadClient, self)
--> 124 stream = gapic_client.read_rows(
125 read_stream=name,
126 offset=offset,
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py in read_rows()
596
597 # Send the request.
--> 598 response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
599
600 # Done; return the response.
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__()
143 kwargs["metadata"] = metadata
144
--> 145 return wrapped_func(*args, **kwargs)
146
147
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func()
284 self._initial, self._maximum, multiplier=self._multiplier
285 )
--> 286 return retry_target(
287 target,
288 self._predicate,
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/retry.py in retry_target()
187 for sleep in sleep_generator:
188 try:
--> 189 return target()
190
191 # pylint: disable=broad-except
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/timeout.py in func_with_timeout()
100 """Wrapped function that adds timeout."""
101 kwargs["timeout"] = self._timeout
--> 102 return func(*args, **kwargs)
103
104 return func_with_timeout
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable()
162 )
163 except grpc.RpcError as exc:
--> 164 six.raise_from(exceptions.from_grpc_error(exc), exc)
165
166 return error_remapped_callable
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/six.py in raise_from()
Exception: 504 Deadline Exceeded
```
Ah, I see. Since an `Exception` is being raised, I don't think we can do any better than what you've already got here.
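For completeness, the test under discussion looks roughly like the sketch below; the test name and the top-level `read_gbq` import are my assumptions, while the table and the `match=` string come from the snippets above.

```python
import pytest

from dask_bigquery import read_gbq


def test_read_gbq_deadline_exceeded():
    # an absurdly small timeout forces the BQ Storage read to fail fast
    ddf = read_gbq(
        project_id="dask-bigquery",
        dataset_id="covid19_public_forecasts",
        table_id="county_14d",
        read_kwargs={"timeout": 1e-12},
    )
    # the error surfaces as a plain Exception("504 Deadline Exceeded"),
    # so we match on the message rather than a more specific type
    with pytest.raises(Exception, match="504 Deadline Exceeded"):
        ddf.compute()
```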
```python
    See googleapis/google-cloud-python#9457
    and googleapis/gapic-generator-python#575 for reference.
    """
    with bigquery.Client(project_id) as bq_client:
```
Doesn't have to be this PR, but it would be really helpful if we could attribute these requests to Dask/Dask-BigQuery. #6
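Not something this PR has to do, but as a hedged sketch of what that attribution could look like: the BigQuery client accepts a `client_info` carrying a custom user agent (the agent string and version below are made up):

```python
from google.api_core.client_info import ClientInfo
from google.cloud import bigquery

# hypothetical user agent so BigQuery requests can be attributed to dask-bigquery
client_info = ClientInfo(user_agent="dask-bigquery/0.0.1")
bq_client = bigquery.Client(project="dask-bigquery", client_info=client_info)
```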
dask_bigquery/core.py (outdated)

```python
@contextmanager
def bigquery_client(project_id=None):
```
Now that this always returns both BQ and BQS, should it be called `bigquery_clients`? 🙂
Thanks for all your work here @ncclementi!
Also, thank you @bnaul @tswast for reviewing -- your feedback is much appreciated
Attempt to implement HLG for `read_gbq` when `partition_field=None`. Still having problems due to the format of the function `bigquery_arrow_read`, but I'm not sure how to solve it. [FIXED] 3070ae3 and b43daf6

Currently, this implementation fails when running `test_read_gbq` with the following error:

I also tried specifying `partition_field=None` when creating the layer, but this still gives the same error.
cc: @jrbourbeau @bnaul @mrocklin for visibility
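For reference, a rough sketch (my own assumptions about names and arguments, not the code in this branch) of what building the `read_gbq` graph by hand with a `HighLevelGraph` can look like, with one task and one output partition per BigQuery Storage stream:

```python
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph


def streams_to_dataframe(session, meta, project_id, timeout, read_kwargs):
    # hypothetical helper: session comes from bqs_client.create_read_session,
    # meta is an empty pandas DataFrame with the table's schema
    output_name = "read-gbq-" + tokenize(project_id, timeout, read_kwargs)
    layer = {
        (output_name, i): (
            bigquery_read,  # the partition-reading function defined in core.py
            make_create_read_session_request,
            project_id,
            timeout,
            read_kwargs,
            stream.name,
        )
        for i, stream in enumerate(session.streams)
    }
    graph = HighLevelGraph.from_collections(output_name, layer, dependencies=[])
    divisions = tuple([None] * (len(layer) + 1))
    return new_dd_object(graph, output_name, meta, divisions)
```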