Fix bug when querying datanode with 'first' filter
paulineribeyre committed May 23, 2024
1 parent 9864735 commit a558897
Showing 3 changed files with 93 additions and 14 deletions.
12 changes: 8 additions & 4 deletions bin/setup_psqlgraph.py
@@ -6,11 +6,13 @@
 from psqlgraph import create_all, Node, Edge


-def try_drop_test_data(user, database, root_user="postgres", host=""):
+def try_drop_test_data(user, password, database, root_user="postgres", host=""):
     print("Dropping old test data")

     engine = create_engine(
-        "postgres://{user}@{host}/postgres".format(user=root_user, host=host)
+        "postgres://{user}:{pwd}@{host}/postgres".format(
+            user=root_user, pwd=password, host=host
+        )
     )

     conn = engine.connect()
@@ -40,10 +40,12 @@ def setup_database(
     print("Setting up test database")

     if not no_drop:
-        try_drop_test_data(user, database, host=host)
+        try_drop_test_data(user, password, database, host=host)

     engine = create_engine(
-        "postgres://{user}@{host}/postgres".format(user=root_user, host=host)
+        "postgres://{user}:{pwd}@{host}/postgres".format(
+            user=root_user, pwd=password, host=host
+        )
     )
     conn = engine.connect()
     conn.execute("commit")
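One caveat with embedding credentials in a URL this way: a password containing reserved characters such as "@" or ":" will break URL parsing. A minimal, hypothetical hardening sketch (not part of this commit; `make_engine` is an illustrative name) that URL-escapes the password with `urllib.parse.quote_plus`:

    from urllib.parse import quote_plus

    from sqlalchemy import create_engine

    def make_engine(user, password, host, dbname="postgres"):
        # Escape the password so reserved characters like "@" or ":" cannot
        # be misread as URL delimiters by the driver.
        return create_engine(
            "postgres://{user}:{pwd}@{host}/{db}".format(
                user=user, pwd=quote_plus(password), host=host, db=dbname
            )
        )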
23 changes: 13 additions & 10 deletions peregrine/resources/submission/graphql/node.py
@@ -464,7 +464,7 @@ def query_with_args(classes, args, info):
     of_types = [
         psqlgraph.Node.get_subclass(label) for label in set(args.get("of_type", []))
     ]
-    rv = []
+    all_items = []
     for cls in classes:
         if not of_types or cls in of_types:
             q = get_authorized_query(cls)
@@ -473,14 +473,10 @@
                 q.entity()._props["project_id"].astext == args["project_id"]
             )

-            rv.extend(apply_query_args(q, args, info).all())
-    # apply_arg_limit() applied the limit to individual query results, but we
-    # are concatenating several query results so we need to apply it again
-    limit = args.get("first", DEFAULT_LIMIT)
-    if limit > 0:
-        return rv[:limit]
-    else:
-        return rv
+            q = apply_query_args(q, args, info)
+            all_items.extend(q.all())
+
+    return all_items


 def query_node_with_args(args, info):
@@ -1065,7 +1061,14 @@ def instantiate_graphene(t):


 def resolve_datanode(self, info, **args):
-    """The root query for the :class:`DataNode` node interface.
+    """
+    The root query for the :class:`DataNode` node interface.
+
+    NOTE: A limitation of `datanode` is that the `first` and `offset` filters are not applied
+    properly. Example: If there are 4 file nodes, a `datanode` query with limit=10 returns up
+    to 4*10 = 40 items. `query_with_args()` concatenates the results of all file node queries
+    _after_ the filters are applied by `apply_query_args()`. Applying limits/offsets at the end
+    of `query_with_args()` causes inconsistent and incomplete results.

     :returns:
         A list of graphene object classes.
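To make the documented limitation concrete, here is a small self-contained sketch (a hypothetical helper, not code from this commit) that mimics how `query_with_args()` applies the `first` limit to each node class's query before concatenating the results:

    # Mimics query_with_args(): the "first" limit is applied to each class's
    # query by apply_query_args(), and the per-class results are concatenated.
    def simulate_query_with_args(per_class_results, first):
        all_items = []
        for results in per_class_results:  # one list per file node class
            all_items.extend(results[:first])  # limit applied per class
        return all_items

    # 4 file node classes with 25 records each, queried with first=10:
    classes = [[f"class{i}_item{j}" for j in range(25)] for i in range(4)]
    page = simulate_query_with_args(classes, first=10)
    assert len(page) == 40  # up to 4 * 10 items returned, not 10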
72 changes: 72 additions & 0 deletions tests/graphql/test_graphql.py
@@ -2,6 +2,7 @@
 import json
 import os
 import random
+import uuid

 import pytest
 from flask import g
@@ -1655,6 +1656,77 @@ def test_datanode(graphql_client, client, submitter, pg_driver_clean, cgci_blgsp
     assert j1 == j2


+def test_datanode_query_all(
+    graphql_client, client, submitter, pg_driver_clean, cgci_blgsp
+):
+    """
+    Regression test for a bug where querying all datanode objects does not return all objects
+    because "limit" and "offset" are not applied correctly.
+    Mitigated by datanode returning the first <limit> items for each file node.
+    """
+    post_example_entities_together(client, pg_driver_clean, submitter)
+    utils.put_entity_from_file(client, "read_group.json", submitter)
+
+    # submit 20 SubmittedUnalignedReads and 25 SubmittedAlignedReads records
+    n_type_1 = 20
+    n_type_2 = 25
+    files_type_1 = {}
+    files_type_2 = {}
+    for _ in range(n_type_1):
+        unique_id = str(uuid.uuid4())
+        files_type_1[unique_id] = models.SubmittedUnalignedReads(
+            f"sub_id_{unique_id}", project_id="CGCI-BLGSP", object_id=unique_id
+        )
+    for _ in range(n_type_2):
+        unique_id = str(uuid.uuid4())
+        files_type_2[unique_id] = models.SubmittedAlignedReads(
+            f"sub_id_{unique_id}", project_id="CGCI-BLGSP", object_id=unique_id
+        )
+
+    with pg_driver_clean.session_scope() as s:
+        rg = pg_driver_clean.nodes(models.ReadGroup).one()
+        rg.submitted_unaligned_reads_files = files_type_1.values()
+        rg.submitted_aligned_reads_files = files_type_2.values()
+        s.merge(rg)
+
+    def check_results(results):
+        print("Datanode query result:", results)
+        assert len(results) == n_type_1 + n_type_2
+
+        sur_res = [e for e in results if e["type"] == "submitted_unaligned_reads"]
+        assert len(sur_res) == n_type_1
+        assert files_type_1.keys() == set(e["object_id"] for e in sur_res)
+
+        sar_res = [e for e in results if e["type"] == "submitted_aligned_reads"]
+        assert len(sar_res) == n_type_2
+        assert files_type_2.keys() == set(e["object_id"] for e in sar_res)
+
+    # query all the `datanode` records using `limit` and `offset`
+    chunk_size = 10
+    offset = 0
+    results = []
+    while True:
+        query_txt = "{datanode (first: %s, offset: %s) {object_id type}}" % (
+            chunk_size,
+            offset,
+        )
+        resp = graphql_client(query_txt).json
+        data = resp.get("data", {}).get("datanode", [])
+        if not len(data):
+            break
+        # "chunk_size * 2" because datanode returns the first <limit> items for each file node
+        assert len(data) <= chunk_size * 2
+        results = results + data
+        offset += chunk_size
+    check_results(results)
+
+    # query all the `datanode` records using `limit = 0` (query all records at once)
+    query_txt = "{datanode (first: 0) {object_id type}}"
+    resp = graphql_client(query_txt).json
+    results = resp.get("data", {}).get("datanode", [])
+    check_results(results)
+
+
 def test_boolean_filter(client, submitter, pg_driver_clean, cgci_blgsp):
     post_example_entities_together(client, pg_driver_clean, submitter)

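A practical note for callers, following from the changes above: when `of_type` narrows the query to a single node class, `query_with_args()` runs only one per-class query, so `first` caps the result size exactly. A hedged usage sketch in the style of the test (assuming the `datanode` field exposes the `of_type` argument that `query_with_args()` reads):

    # With a single of_type, only one class query runs, so "first" behaves
    # like ordinary pagination instead of a per-class cap.
    query_txt = (
        '{datanode (of_type: ["submitted_unaligned_reads"], first: 10, offset: 0) '
        "{object_id type}}"
    )
    resp = graphql_client(query_txt).json
    data = resp.get("data", {}).get("datanode", [])
    assert len(data) <= 10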