Skip to content

Commit

Permalink
Add support for arc direction to v3 node (#1455)
Browse files Browse the repository at this point in the history
Adds base query for in-arcs

Still TODO for Node:
- SingleProp filtering
- Decorators
- BracketProps
- Filters
- Multiple hops
  • Loading branch information
n-h-diaz authored Nov 14, 2024
1 parent c28010b commit 13c3cdd
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 37 deletions.
14 changes: 12 additions & 2 deletions internal/server/spanner/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

v3 "github.com/datacommonsorg/mixer/internal/proto/v3"
"github.com/datacommonsorg/mixer/internal/server/datasource"
v2 "github.com/datacommonsorg/mixer/internal/server/v2"
)

// SpannerDataSource represents a data source that interacts with Spanner.
Expand All @@ -38,8 +39,17 @@ func (sds *SpannerDataSource) Type() datasource.DataSourceType {

// Node retrieves node data from Spanner.
func (sds *SpannerDataSource) Node(ctx context.Context, req *v3.NodeRequest) (*v3.NodeResponse, error) {
// TODO: Support additional Node functionality (properties, pagination, etc).
edges, err := sds.client.GetNodeEdgesByID(ctx, req.Nodes)
arcs, err := v2.ParseProperty(req.GetProperty())
if err != nil {
return nil, err
}
if len(arcs) == 0 {
return &v3.NodeResponse{}, nil
}
if len(arcs) > 1 {
return nil, fmt.Errorf("multiple arcs in node request")
}
edges, err := sds.client.GetNodeEdgesByID(ctx, req.Nodes, arcs[0])
if err != nil {
return nil, fmt.Errorf("error getting node edges: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/server/spanner/golden/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestNode(t *testing.T) {
goldenFile := "node.json"

req := &v3.NodeRequest{
Nodes: []string{"Aadhaar", "Monthly_Average_RetailPrice_Electricity_Residential"},
Nodes: []string{"Aadhaar", "Monthly_Average_RetailPrice_Electricity_Residential"},
Property: "->*",
}

got, err := ds.Node(ctx, req)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
{
"FireIncidentTypeEnum": [
{
"SubjectID": "FireIncidentTypeEnum",
"Predicate": "typeOf",
"ObjectID": "ComplexFire",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema",
"Name": "ComplexFire",
"Types": [
"FireIncidentTypeEnum"
]
},
{
"SubjectID": "FireIncidentTypeEnum",
"Predicate": "typeOf",
"ObjectID": "PrescribedFire",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema",
"Name": "PrescribedFire",
"Types": [
"EmissionSourceEnum",
"FireCauseEnum",
"FireCauseSpecificEnum",
"FireIncidentTypeEnum"
]
},
{
"SubjectID": "FireIncidentTypeEnum",
"Predicate": "typeOf",
"ObjectID": "Wildfire",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema",
"Name": "Wildfire",
"Types": [
"EmissionSourceEnum",
"FireIncidentTypeEnum"
]
},
{
"SubjectID": "FireIncidentTypeEnum",
"Predicate": "typeOf",
"ObjectID": "WildlandFireUse",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema",
"Name": "WildlandFireUse",
"Types": [
"FireIncidentTypeEnum"
]
}
],
"FoodTypeEnum": [
{
"SubjectID": "FoodTypeEnum",
"Predicate": "typeOf",
"ObjectID": "Meal",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema",
"Name": "Meal",
"Types": [
"FoodTypeEnum"
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@
"ObjectID": "consumingSector",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars",
"Name": "",
"Types": []
"Name": "consumingSector",
"Types": [
"Property"
]
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
Expand All @@ -142,8 +144,10 @@
"ObjectID": "eia/g/ELEC.1012",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars",
"Name": "",
"Types": []
"Name": "Residential",
"Types": [
"StatVarGroup"
]
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
Expand Down
66 changes: 43 additions & 23 deletions internal/server/spanner/golden/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime"
"testing"

v2 "github.com/datacommonsorg/mixer/internal/server/v2"
"github.com/datacommonsorg/mixer/internal/server/v2/shared"
"github.com/datacommonsorg/mixer/test"
"github.com/google/go-cmp/cmp"
Expand All @@ -35,37 +36,56 @@ func TestGetNodeEdgesByID(t *testing.T) {
ctx := context.Background()
_, filename, _, _ := runtime.Caller(0)
goldenDir := path.Join(path.Dir(filename), "query")
goldenFile := "get_node_edges_by_id.json"

ids := []string{"Aadhaar", "Monthly_Average_RetailPrice_Electricity_Residential"}
for _, c := range []struct {
ids []string
arc *v2.Arc
goldenFile string
}{
{
ids: []string{"Aadhaar", "Monthly_Average_RetailPrice_Electricity_Residential"},
arc: &v2.Arc{
Out: true,
SingleProp: "*",
},
goldenFile: "get_node_edges_by_subject_id.json",
},
{
ids: []string{"FireIncidentTypeEnum", "FoodTypeEnum"},
arc: &v2.Arc{
Out: false,
SingleProp: "*",
},
goldenFile: "get_node_edges_by_object_id.json",
},
} {
actual, err := client.GetNodeEdgesByID(ctx, c.ids, c.arc)
if err != nil {
t.Fatalf("GetNodeEdgesByID error (%v): %v", c.goldenFile, err)
}

actual, err := client.GetNodeEdgesByID(ctx, ids)
if err != nil {
t.Fatalf("GetNodeEdgesByID error (%v): %v", goldenFile, err)
}
got, err := test.StructToJSON(actual)
if err != nil {
t.Fatalf("StructToJSON error (%v): %v", c.goldenFile, err)
}

got, err := test.StructToJSON(actual)
if err != nil {
t.Fatalf("StructToJSON error (%v): %v", goldenFile, err)
}
if test.GenerateGolden {
err = test.WriteGolden(got, goldenDir, c.goldenFile)
if err != nil {
t.Fatalf("WriteGolden error (%v): %v", c.goldenFile, err)
}
return
}

if test.GenerateGolden {
err = test.WriteGolden(got, goldenDir, goldenFile)
want, err := test.ReadGolden(goldenDir, c.goldenFile)
if err != nil {
t.Fatalf("WriteGolden error (%v): %v", goldenFile, err)
t.Fatalf("ReadGolden error (%v): %v", c.goldenFile, err)
}
return
}

want, err := test.ReadGolden(goldenDir, goldenFile)
if err != nil {
t.Fatalf("ReadGolden error (%v): %v", goldenFile, err)
}

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("%v payload mismatch (-want +got):\n%s", goldenFile, diff)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("%v payload mismatch (-want +got):\n%s", c.goldenFile, diff)
}
}

}
func TestGetObservations(t *testing.T) {
client := test.NewSpannerClient()
Expand Down
39 changes: 32 additions & 7 deletions internal/server/spanner/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"cloud.google.com/go/spanner"
v2 "github.com/datacommonsorg/mixer/internal/server/v2"
"github.com/datacommonsorg/mixer/internal/server/v2/shared"
"google.golang.org/api/iterator"
)
Expand All @@ -42,6 +43,7 @@ var (
// SQL / GQL statements executed by the SpannerClient
var statements = struct {
getEdgesBySubjectID string
getEdgesByObjectID string
getObsByVariableAndEntity string
getObsByVariableEntityAndDate string
getLatestObsByVariableAndEntity string
Expand All @@ -58,17 +60,29 @@ var statements = struct {
FROM
Edge edge
LEFT JOIN
graph_table( DCGraph match -[e:Edge
GRAPH_TABLE( DCGraph MATCH -[e:Edge
WHERE
e.subject_id IN UNNEST(@ids)
AND e.object_value IS NULL]->(n:Node) return n.subject_id,
AND e.object_value IS NULL]->(n:Node) RETURN n.subject_id,
n.name,
n.types) object
ON
edge.object_id = object.subject_id
WHERE
edge.subject_id IN UNNEST(@ids)
`,
getEdgesByObjectID: `
GRAPH DCGraph MATCH (n:Node)-[e:Edge
WHERE
e.object_id IN UNNEST(@ids)
AND e.subject_id != e.object_id]-> return e.object_id AS subject_id,
e.predicate,
n.subject_id AS object_id,
'' as object_value,
COALESCE(e.provenance, '') AS provenance,
COALESCE(n.name, '') AS name,
COALESCE(n.types, []) AS types
`,
getObsByVariableAndEntity: fmt.Sprintf(`
SELECT %s
FROM StatVarObservation
Expand Down Expand Up @@ -122,16 +136,27 @@ var statements = struct {
getSelectColumns(ObsColumns, "t1.")),
}

// GetNodeEdgesByID retrieves node edges from Spanner given a list of IDs and returns a map.
func (sc *SpannerClient) GetNodeEdgesByID(ctx context.Context, ids []string) (map[string][]*Edge, error) {
// GetNodeEdgesByID retrieves node edges from Spanner given a list of IDs and an arc (whose direction selects out- or in-edges) and returns a map.
func (sc *SpannerClient) GetNodeEdgesByID(ctx context.Context, ids []string, arc *v2.Arc) (map[string][]*Edge, error) {
// TODO: Support additional Node functionality (properties, pagination, etc).
edges := make(map[string][]*Edge)
if len(ids) == 0 {
return edges, nil
}

stmt := spanner.Statement{
SQL: statements.getEdgesBySubjectID,
Params: map[string]interface{}{"ids": ids},
var stmt spanner.Statement

switch arc.Out {
case true:
stmt = spanner.Statement{
SQL: statements.getEdgesBySubjectID,
Params: map[string]interface{}{"ids": ids},
}
case false:
stmt = spanner.Statement{
SQL: statements.getEdgesByObjectID,
Params: map[string]interface{}{"ids": ids},
}
}

err := sc.queryAndCollect(
Expand Down

0 comments on commit 13c3cdd

Please sign in to comment.