Initial read_gbq implementation (WIP) #1
Conversation
dask_bigquery/core.py
Outdated
logging.warning(
    "Materializing view in order to read into dask. This may be expensive."
)
query = f"SELECT * FROM `{full_id(table_ref)}`"
this is just a shortcut we use:
def full_id(table):
    return f"{table.project}.{table.dataset_id}.{table.table_id}"
your call whether to inline it or add the helper.
also, same comment re: this view behavior; not sure whether it'd be safer here to just raise instead of materializing
dask_bigquery/core.py
Outdated
@contextmanager
def bigquery_client(project_id="dask-bigquery", with_storage_api=False):
maybe link to googleapis/google-cloud-python#9457 and/or googleapis/gapic-generator-python#575 for context (no pun intended)
also re: project_id, you probably ought to make it default to None and infer it from the global context; I don't remember off the top of my head where you grab that from
Do you mean add them as a comment?
yeah, just bc eventually there will probably be one upstream that could be used but right now there's not
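A minimal sketch of the project-inference idea above, assuming google.auth supplies the ambient project (this is not the PR's code, and the with_storage_api handling is omitted):

from contextlib import contextmanager

import google.auth
from google.cloud import bigquery

@contextmanager
def bigquery_client(project_id=None):
    if project_id is None:
        # google.auth.default() returns (credentials, project) for the active
        # environment (gcloud config, GOOGLE_APPLICATION_CREDENTIALS, ...)
        _, project_id = google.auth.default()
    client = bigquery.Client(project=project_id)
    try:
        yield client
    finally:
        client.close()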
dask_bigquery/core.py
Outdated
@contextmanager
def bigquery_client(project_id="dask-bigquery", with_storage_api=False):
    # Ignore google auth credentials warning
probably delete this filterwarnings (although the warning is super annoying 🙃)
if (partition_field is not None) and fields and (partition_field not in fields):
    fields = (partition_field, *fields)

# These read tasks seem to cause deadlocks (or at least long stuck workers out of touch with
this annotate is maybe a bad idea, would be nice to have @jrbourbeau or someone weigh in; note that we observed this behavior with now-fairly old dask and bigquery_storage/pyarrow versions so I have no idea if it's still relevant
dask_bigquery/core.py
Outdated
"Materializing view in order to read into dask. This may be expensive." | ||
) | ||
query = f"SELECT * FROM `{full_id(table_ref)}`" | ||
table_ref, _, _ = execute_query(query) |
@bnaul what about the execute_query function, is this also a shortcut?
ah yes, it's just a wrapper for bigquery.Client.query() that saves the result to a temporary table. But it needs a temporary dataset to store things in, which not everyone would have configured, so again maybe it's better to just raise for VIEWs instead.
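A hypothetical sketch of what such a wrapper might look like (the helper's actual signature and the temporary dataset name here are assumptions, not the PR's code):

from google.cloud import bigquery

def execute_query(client, query, tmp_dataset="my_tmp_dataset"):
    # Write the query result into a destination table inside a scratch dataset,
    # then return a reference to that table so it can be read like any other table.
    config = bigquery.QueryJobConfig(
        destination=f"{client.project}.{tmp_dataset}.query_result",
        write_disposition="WRITE_TRUNCATE",
    )
    job = client.query(query, job_config=config)
    job.result()  # block until the query finishes
    return client.get_table(config.destination)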
Oh I see. If we remove this part, I'm assuming it'll raise an error on its own when we have a "VIEW" case, or is there a need to do a custom raise like:
if table_ref.table_type == "VIEW":
    raise TypeError("Table type VIEW not supported")
dask_bigquery/core.py
Outdated
"Specified `partition_field` without `partitions`; reading full table." | ||
) | ||
partitions = pd.read_gbq( | ||
f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}", |
This will do a complete scan of the table. Maybe consider using something like:
return [p for p in bq_client.list_partitions(f'{dataset_id}.{table_id}') if p != '__NULL__']
@shaayohn thanks for the comment, I got a bit confused by the return. Do you mean replacing this:
partitions = pd.read_gbq(
    f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}",
    project_id=project_id,
)[partition_field].tolist()
with
partitions = [p for p in bq_client.list_partitions(f'{dataset_id}.{table_id}') if p != '__NULL__']
Apologies, that's exactly what I meant! Silly copypasta :)
# TODO generalize to ranges (as opposed to discrete values)

partitions = sorted(partitions)
delayed_kwargs["divisions"] = (*partitions, partitions[-1])
@bnaul I noticed in the example I ran that this line causes the last partition to contain only 1 element, but that element could have fit into the previous-to-last partition. What is the reason you separate the last partition?
not sure why it's not working correctly for you, but the idea is that you need n+1 divisions for n partitions. Seems to work OK here:
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed

@delayed
def make_df(d):
    return pd.DataFrame({"date": d, "x": np.random.random(10)}).set_index("date")

dates = pd.date_range("2020-01-01", "2020-01-08")
ddf = dd.from_delayed([make_df(d) for d in dates], divisions=[*dates, dates[-1]])
ddf
Out[61]:
Dask DataFrame Structure:
                  x
npartitions=8
2020-01-01  float64
2020-01-02      ...
...             ...
2020-01-08      ...
2020-01-08      ...
Dask Name: from-delayed, 16 tasks
ddf.map_partitions(len).compute()
Out[62]:
0 10
1 10
2 10
3 10
4 10
5 10
6 10
7 10
dtype: int64
I wonder if it's related to how the data is originally partitioned. For example, when I read one of the tables of the covid public dataset that I copied to "my_project", I see this:
from dask_bigquery import read_gbq

ddf = read_gbq(
    project_id="my_project",
    dataset_id="covid19_public_forecasts",
    table_id="county_14d",
)
ddf.map_partitions(len).compute()
Notice the last two partitions...
0 3164
1 3164
2 3164
3 3164
4 3164
5 3164
6 3164
7 3164
8 3164
9 3164
10 3164
11 3164
12 3164
13 3164
14 3164
15 3164
16 3164
17 3164
18 3164
19 3164
20 3164
21 3164
22 3164
23 3164
24 3164
25 3164
26 3164
27 3164
28 3164
29 3164
30 3164
31 3164
32 3164
33 3164
34 3164
35 3164
36 3164
37 3164
38 3164
39 3164
40 3164
41 3163
42 1
dtype: int64
Added some tests, but I'm having issues with the linting. It's because of a flake8 warning: it's complaining about this bare except. I wasn't sure what kind of error to put, since I just need to pass, but any suggestions are welcome.
dask-bigquery/dask_bigquery/tests/test_core.py, lines 53 to 54 in 7bdd66a
Some comments!
dask_bigquery/tests/test_core.py
Outdated
try:
    # delete data set if exists
    bq_client = bigquery.Client()
    bq_client.delete_dataset(
        dataset="dask-bigquery.dataset_test",
        delete_contents=True,
    )
    bq_client.close()
except:  # if dataset doesn't exist, continue; is that a ValueError?
    pass
# create data
df = push_data()
This seems like the kind of stuff that might be good as a fixture.
@pytest.fixture
def dataset():
    bq = bigquery.Client()
    ...
    push_data()
    yield "dask-bigquery.dataset_test"
    bq.delete_dataset(...)

def test_read_gbq(client, dataset):
    project_id, dataset_id = dataset.split(".")
    ddf = read_gbq(dataset_id=dataset_id, project_id=project_id, ...)
I'll take a look and see if we can use fixtures, but maybe not with the delete part. The purpose of the try/except here is to delete the dataset if it exists before pushing the data. If I delete the dataset right after pushing it, then I won't be able to read it.
Maybe I'm missing something, but from this snippet I will be yielding the name/location of the dataset and then deleting it; by the time I get to read_gbq, the data would be removed and there would be nothing to read.
That being said, there might be a way of still using a fixture of some form.
The test runs during the yield. So this fixture says "set up a dataset, then give the path of that dataset to the test, let the test do its thing, and then once the test is done clean up the dataset".
If you're concerned that the dataset might exist beforehand, then you could add the cleanup to the part of the fixture before the yield as well.
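A sketch of that combined approach (push_data is the helper already used in these tests; delete_dataset's not_found_ok flag makes the bare except unnecessary):

import pytest
from google.cloud import bigquery

@pytest.fixture
def dataset():
    bq_client = bigquery.Client()
    # clean up any leftover dataset from a previous run, then push fresh data
    bq_client.delete_dataset(
        "dask-bigquery.dataset_test", delete_contents=True, not_found_ok=True
    )
    push_data()
    yield "dask-bigquery.dataset_test"
    # tear down once the test using this fixture has finished
    bq_client.delete_dataset(
        "dask-bigquery.dataset_test", delete_contents=True, not_found_ok=True
    )
    bq_client.close()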
assert ddf.columns.tolist() == ["name", "number", "idx"]
assert len(ddf) == 10
assert ddf.npartitions == 2
Can we verify that the data is actually the same as the data created in push_data instead? Pushing the fixture idea a little further:
@pytest.fixture
def df():
    ...

@pytest.fixture
def dataset(df):
    ...

def test_read_gbq(client, dataset, df):
    ddf = read_gbq(...)
    assert_eq(ddf, df)
Maybe there are sorting things that get in the way (is GBQ ordered?). If so then, as you did before:
assert_eq(ddf.set_index("idx"), df.set_index("idx"))
In general we want to use assert_eq if possible. It runs lots of cleanliness checks on the Dask collection, graph, metadata, and so on.
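For illustration, a tiny self-contained example of assert_eq comparing a Dask DataFrame against the pandas frame it came from (toy data, not the PR's fixtures):

import pandas as pd
import dask.dataframe as dd
from dask.dataframe.utils import assert_eq

df = pd.DataFrame({"idx": range(4), "x": [10, 20, 30, 40]})
ddf = dd.from_pandas(df, npartitions=2)

# assert_eq accepts a Dask collection on one side and a pandas object on the
# other, and checks metadata and graph sanity in addition to the values
assert_eq(ddf.set_index("idx"), df.set_index("idx"))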
Yes, I get some order issues when reading back from GBQ, mainly because when I read back, the default index goes from 0 to chunksize-1, where chunksize was chosen when I pushed the pandas dataframe. This was part of the reason I had "idx" as an extra column.
But thanks for pointing out assert_eq, I forgot we had that. Although, in order for that line to work I had to do a compute on the Dask dataframe. I'm assuming this is because I'm comparing a Dask dataframe with a pandas one?
assert_eq(ddf.set_index("idx").compute(), df.set_index("idx"))
assert_eq should handle comparing Dask and pandas DataFrames. What error do you get without the compute()?
I get this:
____________________________________________________________________ test_read_gbq _____________________________________________________________________
df = name number idx
0 betty 71 0
1 fred 36 1
2 wilma 75 2
3 betty 13 3
4 ... 4
5 fred 74 5
6 wilma 69 6
7 fred 31 7
8 barney 31 8
9 betty 97 9
dataset = 'dask-bigquery.dataset_test.table_test', client = <Client: 'tcp://127.0.0.1:55212' processes=2 threads=2, memory=32.00 GiB>
def test_read_gbq(df, dataset, client):
"""Test simple read of data pushed to BigQuery using pandas-gbq"""
project_id, dataset_id, table_id = dataset.split(".")
ddf = read_gbq(
project_id=project_id, dataset_id=dataset_id, table_id=table_id
)
assert ddf.columns.tolist() == ["name", "number", "idx"]
assert len(ddf) == 10
assert ddf.npartitions == 2
#breakpoint()
> assert assert_eq(ddf.set_index("idx"), df.set_index("idx"))
test_core.py:67:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../../mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/dataframe/utils.py:541: in assert_eq
assert_sane_keynames(a)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
ddf = Dask DataFrame Structure:
name number
npartitions=2
0 object int64
4 ... ...
9 ... ...
Dask Name: sort_index, 22 tasks
def assert_sane_keynames(ddf):
if not hasattr(ddf, "dask"):
return
for k in ddf.dask.keys():
while isinstance(k, tuple):
k = k[0]
assert isinstance(k, (str, bytes))
assert len(k) < 100
assert " " not in k
> assert k.split("-")[0].isidentifier()
E AssertionError
../../../../../mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/dataframe/utils.py:621: AssertionError
Oh, interesting! Do you have any additional information in the traceback? I'm wondering what k is. You could try adding --pdb to the end of your pytest command, which will drop you into a pdb session at the point the test raises an error. You can then do pp k to see what k is.
Also xref dask/dask#8061
There is no additional information in the traceback, but pp k returns
'dataset_test.table_test--46e9ff0148164adf1b543e44137043cd'
This is coming from the
delayed_kwargs = dict(prefix=f"{dataset_id}.{table_id}-")
line earlier in this function. Ultimately we'll want to move away from delayed and construct the Dask graph ourselves, so for now I think it's okay to drop the prefix= here and use
delayed_kwargs = {}
instead. That should allow you to also drop the compute() call in assert_eq.
        timeout=read_timeout,
    )
    for row_filter in row_filters
]
This is great for now, but at some point we may want to use raw task graphs. They're a bit cleaner in a few ways. Delayed is more designed for user code. If we have the time we prefer to use raw graphs in dev code.
For example, in some cases I wouldn't be surprised if each Delayed task produces a single TaskGroup, rather than having all of the tasks in a single TaskGroup. Sure, this will compute just fine, but other features (like the task group visualization, or coiled telemetry) may be sad.
@jrbourbeau and I gave using HighLevelGraphs a try, and we realized that this will require modifying the structure of the _read_rows_arrow function, since as it is now, the inputs don't match the format required by DataFrameIOLayer:
https://github.com/dask/dask/blob/95fb60a31a87c6b94b01ed75ab6533fa04d51f19/dask/layers.py#L1159-L1166
We might want to move this to a separate PR.
dask_bigquery/core.py
Outdated
with bigquery_client(project_id, with_storage_api=True) as (
    bq_client,
    bqs_client,
), dask.annotate(priority=1):
We would definitely prefer to not have this annotation if possible. Data generation tasks should be *de-*prioritized if anything
dask_bigquery/core.py
Outdated
@dask.delayed
def _read_rows_arrow(
If we use delayed (see comment below), then this name will be the one that shows up in the task stream, progress bars, etc. We may want to make it more clearly GBQ-related, like bigquery_read.
dask_bigquery/tests/test_core.py
Outdated
try:
    # delete data set if exists
    bq_client = bigquery.Client()
    bq_client.delete_dataset(
        dataset="dask-bigquery.dataset_test",
        delete_contents=True,
    )
    bq_client.close()
except:  # if dataset doesn't exist, continue; is that a ValueError?
    pass
If an error is raised when we call delete_dataset, then bq_client.close() won't ever be called. Let's put bq_client.close() in a finally: block to ensure it's called, even if an exception is raised. Something like:
try:
    # delete data set if exists
    bq_client = bigquery.Client()
    bq_client.delete_dataset(
        dataset="dask-bigquery.dataset_test",
        delete_contents=True,
    )
except:  # if dataset doesn't exist, continue
    pass
finally:
    bq_client.close()
Also, just to confirm, we can't use bigquery.Client() as a context manager, right?
I was just double-checking this; it might exist now. I haven't checked it, but will give it a try: googleapis/python-bigquery#540
I had the feeling that this wasn't possible based on the code in the gist, but it seems most of that was drafted before the PR I mentioned was merged. It seems only the storage API doesn't have a context manager or a close method.
So just for the test we could use the bigquery context manager. Will give it a try now.
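For reference, a minimal sketch of what that could look like with a google-cloud-bigquery release that includes googleapis/python-bigquery#540 (the not_found_ok flag would also sidestep the bare except discussed earlier):

from google.cloud import bigquery

with bigquery.Client() as bq_client:
    # the context manager closes the client on exit, even if an error is raised
    bq_client.delete_dataset(
        "dask-bigquery.dataset_test",
        delete_contents=True,
        not_found_ok=True,
    )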
#4 is looking great, should this PR be closed in favor of that one?
This PR is an implementation of read_gbq, based on this gist by @bnaul and the comments on dask/dask#3121.
There are a couple of things that I'm not sure about, like where the functions full_id and execute_query come from.
There are currently no tests. Do we have a suggestion on how to go about testing and dealing with credentials?
Currently, this implementation will read into a single partition unless the data on BigQuery is partitioned, in which case it reads that number of partitions.
One thing I noticed is that in some cases the last partition has only 1 element. For example, when I read a copy of the public dataset I have on my project (as in the example above), I get 43 partitions, of which 41 have 3164 elements, and the last two have 3163 and 1 element respectively.
Any input or comments are appreciated.