Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve flows and groups to ids for elastic queries #614

Merged
merged 5 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/handlers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func createTestFlow(t *testing.T, uuid assets.FlowUUID, tc TestCase) flows.Flow
definition.NewLocalization(),
nodes,
nil,
nil,
)
require.NoError(t, err)

Expand Down
9 changes: 6 additions & 3 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ func (c *Contact) UpdatePreferredURN(ctx context.Context, db Queryer, oa *OrgAss
// FlowContact converts our mailroom contact into a flow contact for use in the engine
func (c *Contact) FlowContact(oa *OrgAssets) (*flows.Contact, error) {
// convert our groups to a list of references
groups := make([]*assets.GroupReference, len(c.groups))
for i, g := range c.groups {
groups[i] = assets.NewGroupReference(g.UUID(), g.Name())
groups := make([]*assets.GroupReference, 0, len(c.groups))
for _, g := range c.groups {
// exclude the db-trigger based status groups for now
if g.Type() == GroupTypeManual || g.Type() == GroupTypeSmart {
groups = append(groups, assets.NewGroupReference(g.UUID(), g.Name()))
}
}

// convert our tickets to flow tickets
Expand Down
32 changes: 24 additions & 8 deletions core/models/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/sirupsen/logrus"
)

// GroupID is our type for group ids
type GroupID int

// GroupStatus is the current status of the passed in group
type GroupStatus string

Expand All @@ -24,16 +27,23 @@ const (
GroupStatusReady = GroupStatus("R")
)

// GroupID is our type for group ids
type GroupID int
// GroupType is the the type of a group
type GroupType string

const (
GroupTypeManual = GroupType("M")
GroupTypeSmart = GroupType("Q")
)

// Group is our mailroom type for contact groups
type Group struct {
g struct {
ID GroupID `json:"id"`
UUID assets.GroupUUID `json:"uuid"`
Name string `json:"name"`
Query string `json:"query"`
ID GroupID `json:"id"`
UUID assets.GroupUUID `json:"uuid"`
Name string `json:"name"`
Query string `json:"query"`
Status GroupStatus `json:"status"`
Type GroupType `json:"group_type"`
}
}

Expand All @@ -49,6 +59,12 @@ func (g *Group) Name() string { return g.g.Name }
// Query returns the query string (if any) for this group
func (g *Group) Query() string { return g.g.Query }

// Status returns the status of this group
func (g *Group) Status() GroupStatus { return g.g.Status }

// Type returns the type of this group
func (g *Group) Type() GroupType { return g.g.Type }

// LoadGroups loads the groups for the passed in org
func LoadGroups(ctx context.Context, db Queryer, orgID OrgID) ([]assets.Group, error) {
start := time.Now()
Expand Down Expand Up @@ -77,9 +93,9 @@ func LoadGroups(ctx context.Context, db Queryer, orgID OrgID) ([]assets.Group, e

const selectGroupsSQL = `
SELECT ROW_TO_JSON(r) FROM (
SELECT id, uuid, name, query
SELECT id, uuid, name, query, status, group_type
FROM contacts_contactgroup
WHERE org_id = $1 AND group_type IN ('M', 'Q') AND is_active = TRUE
WHERE org_id = $1 AND is_active = TRUE
ORDER BY name ASC
) r;`

Expand Down
7 changes: 6 additions & 1 deletion core/models/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ func TestLoadGroups(t *testing.T) {
name string
query string
}{
{testdata.ActiveGroup.ID, testdata.ActiveGroup.UUID, "Active", ""},
{testdata.ArchivedGroup.ID, testdata.ArchivedGroup.UUID, "Archived", ""},
{testdata.BlockedGroup.ID, testdata.BlockedGroup.UUID, "Blocked", ""},
{testdata.DoctorsGroup.ID, testdata.DoctorsGroup.UUID, "Doctors", ""},
{testdata.OpenTicketsGroup.ID, testdata.OpenTicketsGroup.UUID, "Open Tickets", "tickets > 0"},
{testdata.StoppedGroup.ID, testdata.StoppedGroup.UUID, "Stopped", ""},
{testdata.TestersGroup.ID, testdata.TestersGroup.UUID, "Testers", ""},
}

assert.Equal(t, 3, len(groups))
assert.Equal(t, 7, len(groups))

for i, tc := range tcs {
group := groups[i].(*models.Group)
assert.Equal(t, tc.uuid, group.UUID())
Expand Down
36 changes: 21 additions & 15 deletions core/models/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,31 @@ import (
"github.com/sirupsen/logrus"
)

// AssetMapper maps resolved assets in queries to how we identify them in ES which in the case
// of flows and groups is their ids. We can do this by just type cracking them to their models.
type AssetMapper struct{}

func (m *AssetMapper) Flow(f assets.Flow) int64 {
return int64(f.(*Flow).ID())
}

func (m *AssetMapper) Group(g assets.Group) int64 {
return int64(g.(*Group).ID())
}

var assetMapper = &AssetMapper{}

// BuildElasticQuery turns the passed in contact ql query into an elastic query
func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
func BuildElasticQuery(oa *OrgAssets, group *Group, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
// filter by org and active contacts
eq := elastic.NewBoolQuery().Must(
elastic.NewTermQuery("org_id", oa.OrgID()),
elastic.NewTermQuery("is_active", true),
)

// our group if present
if group != "" {
eq = eq.Must(elastic.NewTermQuery("groups", group))
if group != nil {
eq = eq.Must(elastic.NewTermQuery("group_ids", group.ID()))
}

// our status is present
Expand All @@ -45,15 +59,15 @@ func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStat

// and by our query if present
if query != nil {
q := es.ToElasticQuery(oa.Env(), query)
q := es.ToElasticQuery(oa.Env(), assetMapper, query)
eq = eq.Must(q)
}

return eq
}

// GetContactIDsForQueryPage returns a page of contact ids for the given query and sort
func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group assets.GroupUUID, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group *Group, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
env := oa.Env()
start := time.Now()
var parsed *contactql.ContactQuery
Expand Down Expand Up @@ -97,15 +111,7 @@ func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *
return nil, nil, 0, err
}

logrus.WithFields(logrus.Fields{
"org_id": oa.OrgID(),
"parsed": parsed,
"group_uuid": group,
"query": query,
"elapsed": time.Since(start),
"page_count": len(ids),
"total_count": results.Hits.TotalHits,
}).Debug("paged contact query complete")
logrus.WithFields(logrus.Fields{"org_id": oa.OrgID(), "query": query, "elapsed": time.Since(start), "page_count": len(ids), "total_count": results.Hits.TotalHits}).Debug("paged contact query complete")

return parsed, ids, results.Hits.TotalHits.Value, nil
}
Expand All @@ -126,7 +132,7 @@ func GetContactIDsForQuery(ctx context.Context, client *elastic.Client, oa *OrgA
}

routing := strconv.FormatInt(int64(oa.OrgID()), 10)
eq := BuildElasticQuery(oa, "", ContactStatusActive, nil, parsed)
eq := BuildElasticQuery(oa, nil, ContactStatusActive, nil, parsed)
ids := make([]ContactID, 0, 100)

// if limit provided that can be done with regular search, do that
Expand Down
16 changes: 9 additions & 7 deletions core/models/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/test"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
Expand Down Expand Up @@ -32,7 +31,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
require.NoError(t, err)

tcs := []struct {
Group assets.GroupUUID
Group *testdata.Group
ExcludeIDs []models.ContactID
Query string
Sort string
Expand All @@ -43,7 +42,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedError string
}{
{
Group: testdata.ActiveGroup.UUID,
Group: testdata.ActiveGroup,
Query: "george",
ExpectedESRequest: `{
"_source": false,
Expand All @@ -63,7 +62,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
},
{
"term": {
"groups": "b97f69f7-5edf-45c7-9fda-d37066eae91d"
"group_ids": 1
}
},
{
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedTotal: 1,
},
{
Group: testdata.BlockedGroup.UUID,
Group: testdata.BlockedGroup,
ExcludeIDs: []models.ContactID{testdata.Bob.ID, testdata.Cathy.ID},
Query: "age > 32",
Sort: "-age",
Expand All @@ -139,7 +138,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
},
{
"term": {
"groups": "14f6ea01-456b-4417-b0b8-35e942f549f1"
"group_ids": 2
}
},
{
Expand Down Expand Up @@ -229,6 +228,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedTotal: 1,
},
{
Group: testdata.ActiveGroup,
Query: "goats > 2", // no such contact field
ExpectedError: "error parsing query: goats > 2: can't resolve 'goats' to attribute, scheme or field",
},
Expand All @@ -237,7 +237,9 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
for i, tc := range tcs {
es.NextResponse = tc.MockedESResponse

_, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, tc.Group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50)
group := oa.GroupByID(tc.Group.ID)

_, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50)

if tc.ExpectedError != "" {
assert.EqualError(t, err, tc.ExpectedError)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/lib/pq v1.10.4
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.17.1
github.com/nyaruka/goflow v0.154.0
github.com/nyaruka/goflow v0.156.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY
github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw=
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
github.com/nyaruka/goflow v0.154.0 h1:tcUVs+sDFyjWdLvyk1kf2SOkQwSGInMbzuG+trE7ZNc=
github.com/nyaruka/goflow v0.154.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/goflow v0.156.1 h1:bRVNuuMkbbmkKsphyLI9+F57kVGBfFfu2rjLZ+0er/U=
github.com/nyaruka/goflow v0.156.1/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
25 changes: 21 additions & 4 deletions web/contact/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ func init() {
//
// {
// "org_id": 1,
// "group_id": 234,
// "group_uuid": "985a83fe-2e9f-478d-a3ec-fa602d5e7ddd",
// "query": "age > 10",
// "sort": "-age"
// }
//
type searchRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
GroupUUID assets.GroupUUID `json:"group_uuid" validate:"required"`
GroupID models.GroupID `json:"group_id"`
GroupUUID assets.GroupUUID `json:"group_uuid"`
ExcludeIDs []models.ContactID `json:"exclude_ids"`
Query string `json:"query"`
PageSize int `json:"page_size"`
Expand Down Expand Up @@ -78,9 +80,15 @@ func handleSearch(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

var group *models.Group
if request.GroupID != 0 {
group = oa.GroupByID(request.GroupID)
} else if request.GroupUUID != "" {
group = oa.GroupByUUID(request.GroupUUID)
}

// perform our search
parsed, hits, total, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa,
request.GroupUUID, request.ExcludeIDs, request.Query, request.Sort, request.Offset, request.PageSize)
parsed, hits, total, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa, group, request.ExcludeIDs, request.Query, request.Sort, request.Offset, request.PageSize)

if err != nil {
isQueryError, qerr := contactql.IsQueryError(err)
Expand Down Expand Up @@ -117,13 +125,15 @@ func handleSearch(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
// {
// "org_id": 1,
// "query": "age > 10",
// "group_id": 234,
// "group_uuid": "123123-123-123-"
// }
//
type parseRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
Query string `json:"query" validate:"required"`
ParseOnly bool `json:"parse_only"`
GroupID models.GroupID `json:"group_id"`
GroupUUID assets.GroupUUID `json:"group_uuid"`
}

Expand Down Expand Up @@ -158,6 +168,13 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

var group *models.Group
if request.GroupID != 0 {
group = oa.GroupByID(request.GroupID)
} else if request.GroupUUID != "" {
group = oa.GroupByUUID(request.GroupUUID)
}

env := oa.Env()
var resolver contactql.Resolver
if !request.ParseOnly {
Expand All @@ -179,7 +196,7 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)

var elasticSource interface{}
if !request.ParseOnly {
eq := models.BuildElasticQuery(oa, request.GroupUUID, models.NilContactStatus, nil, parsed)
eq := models.BuildElasticQuery(oa, group, models.NilContactStatus, nil, parsed)
elasticSource, err = eq.Source()
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "error getting elastic source")
Expand Down
2 changes: 1 addition & 1 deletion web/contact/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestSearch(t *testing.T) {
},
{
"term": {
"groups": "b97f69f7-5edf-45c7-9fda-d37066eae91d"
"group_ids": 1
}
},
{
Expand Down
Loading