Skip to content

Commit

Permalink
Merge pull request #748 from nyaruka/std-errorr-part-2
Browse files Browse the repository at this point in the history
Std Error part 2
  • Loading branch information
rowanseymour authored Jun 3, 2024
2 parents c6be0bc + 2859b4f commit 910c15d
Show file tree
Hide file tree
Showing 25 changed files with 82 additions and 82 deletions.
38 changes: 19 additions & 19 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
Expand Down Expand Up @@ -33,7 +34,6 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
)

// the name for our message queue
Expand Down Expand Up @@ -359,15 +359,15 @@ func (b *backend) DeleteMsgByExternalID(ctx context.Context, channel courier.Cha
var msgID courier.MsgID
var contactID ContactID
if err := row.Scan(&msgID, &contactID); err != nil && err != sql.ErrNoRows {
return errors.Wrap(err, "error querying deleted msg")
return fmt.Errorf("error querying deleted msg: %w", err)
}

if msgID != courier.NilMsgID && contactID != NilContactID {
rc := b.redisPool.Get()
defer rc.Close()

if err := queueMsgDeleted(rc, ch, msgID, contactID); err != nil {
return errors.Wrap(err, "error queuing message deleted task")
return fmt.Errorf("error queuing message deleted task: %w", err)
}
}

Expand Down Expand Up @@ -416,7 +416,7 @@ func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.MsgOut, error
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
queue.MarkComplete(rc, msgQueueName, token)
return nil, errors.Wrapf(err, "unable to unmarshal message: %s", string(msgJSON))
return nil, fmt.Errorf("unable to unmarshal message: %s: %w", string(msgJSON), err)
}

// populate the channel on our db msg
Expand Down Expand Up @@ -536,7 +536,7 @@ func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUp
if oldURN != urns.NilURN && newURN != urns.NilURN {
err := b.updateContactURN(ctx, status)
if err != nil {
return errors.Wrap(err, "error updating contact URN")
return fmt.Errorf("error updating contact URN: %w", err)
}
}

Expand Down Expand Up @@ -575,7 +575,7 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
// retrieve channel
channel, err := b.GetChannel(ctx, courier.AnyChannelType, status.ChannelUUID())
if err != nil {
return errors.Wrap(err, "error retrieving channel")
return fmt.Errorf("error retrieving channel: %w", err)
}
dbChannel := channel.(*Channel)
tx, err := b.db.BeginTxx(ctx, nil)
Expand All @@ -585,7 +585,7 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
// retrieve the old URN
oldContactURN, err := getContactURNByIdentity(tx, dbChannel.OrgID(), old)
if err != nil {
return errors.Wrap(err, "error retrieving old contact URN")
return fmt.Errorf("error retrieving old contact URN: %w", err)
}
// retrieve the new URN
newContactURN, err := getContactURNByIdentity(tx, dbChannel.OrgID(), new)
Expand All @@ -598,11 +598,11 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
err = fullyUpdateContactURN(tx, oldContactURN)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "error updating old contact URN")
return fmt.Errorf("error updating old contact URN: %w", err)
}
return tx.Commit()
}
return errors.Wrap(err, "error retrieving new contact URN")
return fmt.Errorf("error retrieving new contact URN: %w", err)
}

// only update the new URN if it doesn't have an associated contact
Expand All @@ -616,12 +616,12 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
err = fullyUpdateContactURN(tx, newContactURN)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "error updating new contact URN")
return fmt.Errorf("error updating new contact URN: %w", err)
}
err = fullyUpdateContactURN(tx, oldContactURN)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "error updating old contact URN")
return fmt.Errorf("error updating old contact URN: %w", err)
}
return tx.Commit()
}
Expand Down Expand Up @@ -663,7 +663,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten

storageURL, err := b.attachmentStorage.Put(ctx, path, contentType, data)
if err != nil {
return "", errors.Wrapf(err, "error saving attachment to storage (bytes=%d)", len(data))
return "", fmt.Errorf("error saving attachment to storage (bytes=%d): %w", len(data), err)
}

return storageURL, nil
Expand All @@ -673,7 +673,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten
func (b *backend) ResolveMedia(ctx context.Context, mediaUrl string) (courier.Media, error) {
u, err := url.Parse(mediaUrl)
if err != nil {
return nil, errors.Wrapf(err, "error parsing media URL")
return nil, fmt.Errorf("error parsing media URL: %w", err)
}

mediaUUID := uuidRegex.FindString(u.Path)
Expand All @@ -692,15 +692,15 @@ func (b *backend) ResolveMedia(ctx context.Context, mediaUrl string) (courier.Me
var media *Media
mediaJSON, err := b.mediaCache.Get(rc, mediaUUID)
if err != nil {
return nil, errors.Wrap(err, "error looking up cached media")
return nil, fmt.Errorf("error looking up cached media: %w", err)
}
if mediaJSON != "" {
jsonx.MustUnmarshal([]byte(mediaJSON), &media)
} else {
// lookup media in our database
media, err = lookupMediaFromUUID(ctx, b.db, uuids.UUID(mediaUUID))
if err != nil {
return nil, errors.Wrap(err, "error looking up media")
return nil, fmt.Errorf("error looking up media: %w", err)
}

// cache it for future requests
Expand Down Expand Up @@ -756,11 +756,11 @@ func (b *backend) Heartbeat() error {

active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
if err != nil {
return errors.Wrapf(err, "error getting active queues")
return fmt.Errorf("error getting active queues: %w", err)
}
throttled, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:throttled", msgQueueName), "0", "-1"))
if err != nil {
return errors.Wrapf(err, "error getting throttled queues")
return fmt.Errorf("error getting throttled queues: %w", err)
}
queues := append(active, throttled...)

Expand All @@ -770,14 +770,14 @@ func (b *backend) Heartbeat() error {
q := fmt.Sprintf("%s/1", queue)
count, err := redis.Int(rc.Do("ZCARD", q))
if err != nil {
return errors.Wrapf(err, "error getting size of priority queue: %s", q)
return fmt.Errorf("error getting size of priority queue: %s: %w", q, err)
}
prioritySize += count

q = fmt.Sprintf("%s/0", queue)
count, err = redis.Int(rc.Do("ZCARD", q))
if err != nil {
return errors.Wrapf(err, "error getting size of bulk queue: %s", q)
return fmt.Errorf("error getting size of bulk queue: %s: %w", q, err)
}
bulkSize += count
}
Expand Down
16 changes: 8 additions & 8 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"log/slog"
"strconv"
"time"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
)

// used by unit tests to slow down urn operations to test races
Expand Down Expand Up @@ -110,23 +110,23 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
err := b.db.GetContext(ctx, contact, lookupContactFromURNSQL, urn.Identity(), org)
if err != nil && err != sql.ErrNoRows {
log.Error("error looking up contact by URN", "error", err)
return nil, errors.Wrap(err, "error looking up contact by URN")
return nil, fmt.Errorf("error looking up contact by URN: %w", err)
}

// we found it, return it
if err != sql.ErrNoRows {
tx, err := b.db.BeginTxx(ctx, nil)
if err != nil {
log.Error("error beginning transaction", "error", err)
return nil, errors.Wrap(err, "error beginning transaction")
return nil, fmt.Errorf("error beginning transaction: %w", err)
}

// update contact's URNs so this URN has priority
err = setDefaultURN(tx, channel, contact, urn, authTokens)
if err != nil {
log.Error("error updating default URN for contact", "error", err)
tx.Rollback()
return nil, errors.Wrap(err, "error setting default URN for contact")
return nil, fmt.Errorf("error setting default URN for contact: %w", err)
}
return contact, tx.Commit()
}
Expand Down Expand Up @@ -174,13 +174,13 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// insert it
tx, err := b.db.BeginTxx(ctx, nil)
if err != nil {
return nil, errors.Wrap(err, "error beginning transaction")
return nil, fmt.Errorf("error beginning transaction: %w", err)
}

err = insertContact(tx, contact)
if err != nil {
tx.Rollback()
return nil, errors.Wrap(err, "error inserting contact")
return nil, fmt.Errorf("error inserting contact: %w", err)
}

// used for unit testing contact races
Expand All @@ -199,7 +199,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// if this was a duplicate URN, start over with a contact lookup
return contactForURN(ctx, b, org, channel, urn, authTokens, name, clog)
}
return nil, errors.Wrap(err, "error getting URN for contact")
return nil, fmt.Errorf("error getting URN for contact: %w", err)
}

// we stole the URN from another contact, roll back and start over
Expand All @@ -211,7 +211,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// all is well, we created the new contact, commit and move forward
err = tx.Commit()
if err != nil {
return nil, errors.Wrap(err, "error commiting transaction")
return nil, fmt.Errorf("error commiting transaction: %w", err)
}

// store this URN on our contact
Expand Down
9 changes: 4 additions & 5 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
)

// MsgDirection is the direction of a message
Expand Down Expand Up @@ -204,7 +203,7 @@ func writeMsg(ctx context.Context, b *backend, msg courier.MsgIn, clog *courier.
attData, err := base64.StdEncoding.DecodeString(attURL[5:])
if err != nil {
clog.Error(courier.ErrorAttachmentNotDecodable())
return errors.Wrap(err, "unable to decode attachment data")
return fmt.Errorf("unable to decode attachment data: %w", err)
}

var contentType, extension string
Expand Down Expand Up @@ -257,7 +256,7 @@ func writeMsgToDB(ctx context.Context, b *backend, m *Msg, clog *courier.Channel

// our db is down, write to the spool, we will write/queue this later
if err != nil {
return errors.Wrap(err, "error getting contact for message")
return fmt.Errorf("error getting contact for message: %w", err)
}

// set our contact and urn id
Expand All @@ -266,14 +265,14 @@ func writeMsgToDB(ctx context.Context, b *backend, m *Msg, clog *courier.Channel

rows, err := b.db.NamedQueryContext(ctx, sqlInsertMsg, m)
if err != nil {
return errors.Wrap(err, "error inserting message")
return fmt.Errorf("error inserting message: %w", err)
}
defer rows.Close()

rows.Next()
err = rows.Scan(&m.ID_)
if err != nil {
return errors.Wrap(err, "error scanning for inserted message id")
return fmt.Errorf("error scanning for inserted message id: %w", err)
}

// queue this up to be handled by RapidPro
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rapidpro
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/syncx"
"github.com/nyaruka/gocommon/urns"
"github.com/pkg/errors"
)

// StatusUpdate represents a status update on a message
Expand Down Expand Up @@ -244,7 +244,7 @@ func (b *backend) writeStatusUpdatesToDB(ctx context.Context, statuses []*Status

err := dbutil.BulkQuery(ctx, b.db, sqlUpdateMsgByID, resolved)
if err != nil {
return nil, errors.Wrap(err, "error updating status")
return nil, fmt.Errorf("error updating status: %w", err)
}

return unresolved, nil
Expand Down Expand Up @@ -313,7 +313,7 @@ func (b *backend) resolveStatusUpdateMsgIDs(ctx context.Context, statuses []*Sta

for rows.Next() {
if err := rows.Scan(&msgID, &channelID, &externalID); err != nil {
return errors.Wrap(err, "error scanning rows")
return fmt.Errorf("error scanning rows: %w", err)
}

// find the status with this channel ID and external ID and update its msg ID
Expand Down
13 changes: 7 additions & 6 deletions backends/rapidpro/urn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/courier/utils"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
)

// ContactURNID represents a contact urn's id
Expand Down Expand Up @@ -178,14 +177,14 @@ func getOrCreateContactURN(db *sqlx.Tx, channel *Channel, contactID ContactID, u
}
err := db.Get(contactURN, sqlSelectURNByIdentity, channel.OrgID(), urn.Identity())
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrap(err, "error looking up URN by identity")
return nil, fmt.Errorf("error looking up URN by identity: %w", err)
}

// we didn't find it, let's insert it
if err == sql.ErrNoRows {
err = insertContactURN(db, contactURN)
if err != nil {
return nil, errors.Wrap(err, "error inserting URN")
return nil, fmt.Errorf("error inserting URN: %w", err)
}
}

Expand All @@ -201,7 +200,7 @@ func getOrCreateContactURN(db *sqlx.Tx, channel *Channel, contactID ContactID, u
contactURN.Display = display
err = updateContactURN(db, contactURN)
if err != nil {
return nil, errors.Wrap(err, "error updating URN")
return nil, fmt.Errorf("error updating URN: %w", err)
}
}

Expand All @@ -211,8 +210,10 @@ func getOrCreateContactURN(db *sqlx.Tx, channel *Channel, contactID ContactID, u

err = updateContactURN(db, contactURN)
}

return contactURN, errors.Wrap(err, "error updating URN auth")
if err != nil {
return contactURN, fmt.Errorf("error updating URN auth: %w", err)
}
return contactURN, nil
}

const sqlInsertURN = `
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ require (
github.com/nyaruka/ezconf v0.3.0
github.com/nyaruka/gocommon v1.55.1
github.com/nyaruka/null/v3 v3.0.0
github.com/nyaruka/redisx v0.8.0
github.com/nyaruka/redisx v0.8.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/samber/slog-multi v1.0.2
github.com/samber/slog-sentry v1.2.2
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ github.com/nyaruka/null/v3 v3.0.0 h1:JvOiNuKmRBFHxzZFt4sWii+ewmMkCQ1vO7X0clTNn6E
github.com/nyaruka/null/v3 v3.0.0/go.mod h1:Sus286RmC8P0VihFuQDDQPib/xJQ7++TsaPLdRuwgVc=
github.com/nyaruka/phonenumbers v1.3.5 h1:WZLbQn61j2E1OFnvpUTYbK/6hViUgl6tppJ55/E2iQM=
github.com/nyaruka/phonenumbers v1.3.5/go.mod h1:Ut+eFwikULbmCenH6InMKL9csUNLyxHuBLyfkpum11s=
github.com/nyaruka/redisx v0.8.0 h1:5QSDLfHBRFjrlTD9VaxfBbq72rb0S++Dr2ui9q/q9kY=
github.com/nyaruka/redisx v0.8.0/go.mod h1:mpTqnydmHfzeQ8V55AwS/cVrkxzHu4yGvwClEeprvbA=
github.com/nyaruka/redisx v0.8.1 h1:d9Hc8nfSKTSEU+bx+YrB13d6bzAgiiHygk4jg/Q4nb4=
github.com/nyaruka/redisx v0.8.1/go.mod h1:2TUmkDvprPInnmInR5AEbCm0zRRewkvSDVLsO+Do6iI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
Expand Down
Loading

0 comments on commit 910c15d

Please sign in to comment.