Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌚 Contact creation endpoint #310

Merged
merged 20 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 106 additions & 44 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/utils/dates"
"github.com/nyaruka/goflow/utils/uuids"
"github.com/nyaruka/null"
"github.com/olivere/elastic"
Expand Down Expand Up @@ -592,7 +593,7 @@ func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []u
// create the contacts that are missing
for _, u := range us {
if urnMap[u] == NilContactID {
id, err := CreateContact(ctx, db, org, u)
contact, _, err := CreateContact(ctx, db, org, NilUserID, "", envs.NilLanguage, []urns.URN{u})
if err != nil {
return nil, errors.Wrapf(err, "error while creating contact")
}
Expand All @@ -601,7 +602,7 @@ func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []u
if !found {
return nil, errors.Wrapf(err, "unable to find original URN from identity")
}
urnMap[original] = ContactID(id)
urnMap[original] = contact.ID()
}
}
}
Expand All @@ -611,8 +612,82 @@ func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []u
}

// CreateContact creates a new contact for the passed in org with the passed in URNs
func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.URN) (ContactID, error) {
// we have a URN, first try to look up the URN
func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) {
Copy link
Contributor Author

@rowanseymour rowanseymour Jul 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a bit of repetition but feels worth it to keep the create vs get_or_create pathways separate since there are many little ways in which they end up different

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this looks good and nice job dealing with all the conflicts elegantly.

contactID, err := insertContactAndURNs(ctx, db, org, userID, name, language, urnz)
if err != nil {
if isUniqueViolationError(err) {
return nil, nil, errors.New("URNs in use by other contacts")
}
return nil, nil, err
}

// load a full contact so that we can calculate dynamic groups
contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID})
if err != nil {
return nil, nil, errors.Wrapf(err, "error loading new contact")
}
contact := contacts[0]

flowContact, err := contact.FlowContact(org)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating flow contact")
}

err = CalculateDynamicGroups(ctx, db, org, flowContact)
if err != nil {
return nil, nil, errors.Wrapf(err, "error calculating dynamic groups")
}

return contact, flowContact, nil
}

// GetOrCreateContact creates a new contact for the passed in org with the passed in URNs
func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.URN) (*Contact, *flows.Contact, error) {
created := true

contactID, err := insertContactAndURNs(ctx, db, org, UserID(1), "", envs.NilLanguage, []urns.URN{urn})
if err != nil {
if isUniqueViolationError(err) {
// if this was a duplicate URN, we should be able to fetch this contact instead
err := db.GetContext(ctx, &contactID, `SELECT contact_id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2`, org.OrgID(), urn.Identity())
if err != nil {
return nil, nil, errors.Wrapf(err, "unable to load contact")
}
created = false
} else {
return nil, nil, err
}
}

// load a full contact so that we can calculate dynamic groups
contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID})
if err != nil {
return nil, nil, errors.Wrapf(err, "error loading new contact")
}
contact := contacts[0]

flowContact, err := contact.FlowContact(org)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating flow contact")
}

// calculate dynamic groups if contact was created
if created {
err := CalculateDynamicGroups(ctx, db, org, flowContact)
if err != nil {
return nil, nil, errors.Wrapf(err, "error calculating dynamic groups")
}
}

return contact, flowContact, nil
}

// tries to create a new contact for the passed in org with the passed in URNs
func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
if userID == NilUserID {
userID = UserID(1)
}

tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return NilContactID, errors.Wrapf(err, "unable to start transaction")
Expand All @@ -623,36 +698,35 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.UR
err = tx.GetContext(ctx, &contactID,
`INSERT INTO
contacts_contact
(org_id, is_active, is_blocked, is_stopped, uuid, created_on, modified_on, created_by_id, modified_by_id, name)
(org_id, is_active, is_blocked, is_stopped, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id)
VALUES
($1, TRUE, FALSE, FALSE, $2, NOW(), NOW(), 1, 1, '')
($1, TRUE, FALSE, FALSE, $2, $3, $4, $5, $5, $6, $6)
RETURNING id`,
org.OrgID(), uuids.New(),
org.OrgID(), uuids.New(), name, string(language), dates.Now(), userID,
)

if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error inserting new contact")
}

// handler for when we insert the URN or commit, we try to look the contact up instead
handleURNError := func(err error) (ContactID, error) {
if pqErr, ok := err.(*pq.Error); ok {
// if this was a duplicate URN, we should be able to select this contact instead
if pqErr.Code.Name() == "unique_violation" {
ids, err := ContactIDsFromURNs(ctx, db, org, []urns.URN{urn})
if err != nil || len(ids) == 0 {
return NilContactID, errors.Wrapf(err, "unable to load contact for urn: %s", urn)
}
return ids[urn], nil
}
var urnsToAttach []URNID

// now try to insert the URNs
for _, urn := range urnz {
// look for a URN with this identity that already exists but doesn't have a contact so could be attached
var orphanURNID URNID
err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, org.OrgID(), urn.Identity())
if err != nil && err != sql.ErrNoRows {
return NilContactID, err
}
if orphanURNID > 0 {
urnsToAttach = append(urnsToAttach, orphanURNID)
continue
}
return NilContactID, errors.Wrapf(err, "error creating new contact")
}

// now try to insert our URN if we have one
if urn != urns.NilURN {
_, err := tx.Exec(
_, err := tx.ExecContext(
ctx,
`INSERT INTO
contacts_contacturn
(org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id)
Expand All @@ -663,36 +737,24 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.UR

if err != nil {
tx.Rollback()
return handleURNError(err)
return NilContactID, err
}
}

// load a full contact so that we can calculate dynamic groups
contacts, err := LoadContacts(ctx, tx, org, []ContactID{contactID})
if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error loading new contact")
}

flowContact, err := contacts[0].FlowContact(org)
if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error creating flow contact")
}

// now calculate dynamic groups
err = CalculateDynamicGroups(ctx, tx, org, flowContact)
if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error calculating dynamic groups")
// attach URNs
if len(urnsToAttach) > 0 {
_, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $3 WHERE org_id = $1 AND id = ANY($2)`, org.OrgID(), pq.Array(urnsToAttach), contactID)
if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error attaching existing URNs to new contact")
}
}

// try to commit
err = tx.Commit()

if err != nil {
tx.Rollback()
return handleURNError(err)
return NilContactID, err
}

return contactID, nil
Expand Down
32 changes: 18 additions & 14 deletions models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestContactsFromURN(t *testing.T) {
}
}

func TestCreateContact(t *testing.T) {
func TestGetOrCreateContact(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
testsuite.Reset()
Expand All @@ -302,27 +302,16 @@ func TestCreateContact(t *testing.T) {
{Org1, urns.URN(CathyURN.String() + "?foo=bar"), CathyID},
{Org1, urns.URN("telegram:12345678"), ContactID(maxContactID + 3)},
{Org1, urns.URN("telegram:12345678"), ContactID(maxContactID + 3)},
{Org1, urns.NilURN, ContactID(maxContactID + 5)},
}

org, err := GetOrgAssets(ctx, db, Org1)
assert.NoError(t, err)

for i, tc := range tcs {
id, err := CreateContact(ctx, db, org, tc.URN)
contact, _, err := GetOrCreateContact(ctx, db, org, tc.URN)
assert.NoError(t, err, "%d: error creating contact", i)
assert.Equal(t, tc.ContactID, id, "%d: mismatch in contact id", i)
assert.Equal(t, tc.ContactID, contact.ID(), "%d: mismatch in contact id", i)
}

// stop kathy
err = StopContact(ctx, db, Org1, CathyID)
assert.NoError(t, err)

// verify she's only in the stopped group
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactgroup_contacts WHERE contact_id = $1`, []interface{}{CathyID}, 1)

// verify she's stopped
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contact WHERE id = $1 AND is_stopped = TRUE AND is_active = TRUE and is_blocked = FALSE`, []interface{}{CathyID}, 1)
}

func TestUpdateContactModifiedBy(t *testing.T) {
Expand All @@ -347,6 +336,21 @@ func TestUpdateContactModifiedBy(t *testing.T) {

}

func TestStopContact(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()

// stop kathy
err := StopContact(ctx, db, Org1, CathyID)
assert.NoError(t, err)

// verify she's only in the stopped group
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactgroup_contacts WHERE contact_id = $1`, []interface{}{CathyID}, 1)

// verify she's stopped
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contact WHERE id = $1 AND is_stopped = TRUE AND is_active = TRUE and is_blocked = FALSE`, []interface{}{CathyID}, 1)
}

func TestUpdateContactStatus(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
Expand Down
8 changes: 8 additions & 0 deletions models/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"gopkg.in/go-playground/validator.v9"
Expand Down Expand Up @@ -164,3 +165,10 @@ func BulkSQL(ctx context.Context, label string, tx Queryer, sql string, vs []int

return nil
}

func isUniqueViolationError(err error) bool {
if pqErr, ok := err.(*pq.Error); ok {
return pqErr.Code.Name() == "unique_violation"
}
return false
}
14 changes: 7 additions & 7 deletions tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"encoding/json"
"time"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/contactql"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/runner"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/olivere/elastic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -84,11 +84,11 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela

// if we are meant to create a new contact, do so
if start.CreateContact() {
newID, err := models.CreateContact(ctx, db, oa, urns.NilURN)
contact, _, err := models.CreateContact(ctx, db, oa, models.NilUserID, "", envs.NilLanguage, nil)
if err != nil {
return errors.Wrapf(err, "error creating new contact")
}
contactIDs[newID] = true
contactIDs[contact.ID()] = true
}

// now add all the ids for our groups
Expand Down
Loading