Skip to content

Commit

Permalink
Merge pull request #51 from Ilhasoft/update/v6.5.35
Browse files Browse the repository at this point in the history
Update/v6.5.35
  • Loading branch information
jcbalmeida authored Oct 4, 2021
2 parents e9e046d + 2336890 commit 250d745
Show file tree
Hide file tree
Showing 197 changed files with 1,570 additions and 1,571 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
v6.5.35
----------
* Tweak mailroom startup to show warning if no distinct readonly DB configured

v6.5.34
----------
* Switch to readonly database for asset loading

v6.5.33
----------
* Add support for READONLY_DB config setting that opens a new readonly database connection
* Finish the runtime.Runtime refactor

v6.5.32
----------
* Refactor more code to use runtime.Runtime instead of passing db instances and using the global config
* Update to latest goflow with doc fixes

v6.5.31
----------
* Recalculate dynamic groups after closing and reopening tickets
Expand Down
12 changes: 6 additions & 6 deletions cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ package main
import (
"os"
"os/signal"
"runtime"
goruntime "runtime"
"syscall"

"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/logrus_sentry"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/runtime"

_ "github.com/nyaruka/mailroom/core/handlers"
_ "github.com/nyaruka/mailroom/core/hooks"
_ "github.com/nyaruka/mailroom/core/ivr/twiml"
_ "github.com/nyaruka/mailroom/core/ivr/vonage"
_ "github.com/nyaruka/mailroom/core/tasks/broadcasts"
_ "github.com/nyaruka/mailroom/core/tasks/campaigns"
_ "github.com/nyaruka/mailroom/core/tasks/contacts"
Expand All @@ -26,6 +24,8 @@ import (
_ "github.com/nyaruka/mailroom/core/tasks/starts"
_ "github.com/nyaruka/mailroom/core/tasks/stats"
_ "github.com/nyaruka/mailroom/core/tasks/timeouts"
_ "github.com/nyaruka/mailroom/services/ivr/twiml"
_ "github.com/nyaruka/mailroom/services/ivr/vonage"
_ "github.com/nyaruka/mailroom/services/tickets/intern"
_ "github.com/nyaruka/mailroom/services/tickets/mailgun"
_ "github.com/nyaruka/mailroom/services/tickets/rocketchat"
Expand All @@ -49,7 +49,7 @@ import (
var version = "Dev"

func main() {
config := config.Mailroom
config := runtime.NewDefaultConfig()
loader := ezconf.NewLoader(
config,
"mailroom", "Mailroom - flow event handler for RapidPro",
Expand Down Expand Up @@ -114,7 +114,7 @@ func handleSignals(mr *mailroom.Mailroom) {
switch sig {
case syscall.SIGQUIT:
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
stacklen := goruntime.Stack(buf, true)
logrus.WithField("comp", "main").WithField("signal", sig).Info("received quit signal, dumping stack")
logrus.Printf("\n%s", buf[:stacklen])
case syscall.SIGINT, syscall.SIGTERM:
Expand Down
62 changes: 31 additions & 31 deletions core/goflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,83 +7,83 @@ import (
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
"github.com/nyaruka/goflow/services/webhooks"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/runtime"

"github.com/shopspring/decimal"
)

var eng, simulator flows.Engine
var engInit, simulatorInit sync.Once

var emailFactory engine.EmailServiceFactory
var classificationFactory engine.ClassificationServiceFactory
var ticketFactory engine.TicketServiceFactory
var airtimeFactory engine.AirtimeServiceFactory
var emailFactory func(*runtime.Config) engine.EmailServiceFactory
var classificationFactory func(*runtime.Config) engine.ClassificationServiceFactory
var ticketFactory func(*runtime.Config) engine.TicketServiceFactory
var airtimeFactory func(*runtime.Config) engine.AirtimeServiceFactory

// RegisterEmailServiceFactory can be used by outside callers to register a email factory
// for use by the engine
func RegisterEmailServiceFactory(factory engine.EmailServiceFactory) {
emailFactory = factory
func RegisterEmailServiceFactory(f func(*runtime.Config) engine.EmailServiceFactory) {
emailFactory = f
}

// RegisterClassificationServiceFactory can be used by outside callers to register a classification factory
// for use by the engine
func RegisterClassificationServiceFactory(factory engine.ClassificationServiceFactory) {
classificationFactory = factory
func RegisterClassificationServiceFactory(f func(*runtime.Config) engine.ClassificationServiceFactory) {
classificationFactory = f
}

// RegisterTicketServiceFactory can be used by outside callers to register a ticket service factory
// for use by the engine
func RegisterTicketServiceFactory(factory engine.TicketServiceFactory) {
ticketFactory = factory
func RegisterTicketServiceFactory(f func(*runtime.Config) engine.TicketServiceFactory) {
ticketFactory = f
}

// RegisterAirtimeServiceFactory can be used by outside callers to register a airtime factory
// for use by the engine
func RegisterAirtimeServiceFactory(factory engine.AirtimeServiceFactory) {
airtimeFactory = factory
func RegisterAirtimeServiceFactory(f func(*runtime.Config) engine.AirtimeServiceFactory) {
airtimeFactory = f
}

// Engine returns the global engine instance for use with real sessions
func Engine(cfg *config.Config) flows.Engine {
func Engine(c *runtime.Config) flows.Engine {
engInit.Do(func() {
webhookHeaders := map[string]string{
"User-Agent": "RapidProMailroom/" + cfg.Version,
"User-Agent": "RapidProMailroom/" + c.Version,
"X-Mailroom-Mode": "normal",
}

httpClient, httpRetries, httpAccess := HTTP(cfg)
httpClient, httpRetries, httpAccess := HTTP(c)

eng = engine.NewBuilder().
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, httpRetries, httpAccess, webhookHeaders, cfg.WebhooksMaxBodyBytes)).
WithClassificationServiceFactory(classificationFactory).
WithEmailServiceFactory(emailFactory).
WithTicketServiceFactory(ticketFactory).
WithAirtimeServiceFactory(airtimeFactory).
WithMaxStepsPerSprint(cfg.MaxStepsPerSprint).
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, httpRetries, httpAccess, webhookHeaders, c.WebhooksMaxBodyBytes)).
WithClassificationServiceFactory(classificationFactory(c)).
WithEmailServiceFactory(emailFactory(c)).
WithTicketServiceFactory(ticketFactory(c)).
WithAirtimeServiceFactory(airtimeFactory(c)).
WithMaxStepsPerSprint(c.MaxStepsPerSprint).
Build()
})

return eng
}

// Simulator returns the global engine instance for use with simulated sessions
func Simulator(cfg *config.Config) flows.Engine {
func Simulator(c *runtime.Config) flows.Engine {
simulatorInit.Do(func() {
webhookHeaders := map[string]string{
"User-Agent": "RapidProMailroom/" + cfg.Version,
"User-Agent": "RapidProMailroom/" + c.Version,
"X-Mailroom-Mode": "simulation",
}

httpClient, _, httpAccess := HTTP(cfg) // don't do retries in simulator
httpClient, _, httpAccess := HTTP(c) // don't do retries in simulator

simulator = engine.NewBuilder().
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, nil, httpAccess, webhookHeaders, cfg.WebhooksMaxBodyBytes)).
WithClassificationServiceFactory(classificationFactory). // simulated sessions do real classification
WithEmailServiceFactory(simulatorEmailServiceFactory). // but faked emails
WithTicketServiceFactory(simulatorTicketServiceFactory). // and faked tickets
WithAirtimeServiceFactory(simulatorAirtimeServiceFactory). // and faked airtime transfers
WithMaxStepsPerSprint(cfg.MaxStepsPerSprint).
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, nil, httpAccess, webhookHeaders, c.WebhooksMaxBodyBytes)).
WithClassificationServiceFactory(classificationFactory(c)). // simulated sessions do real classification
WithEmailServiceFactory(simulatorEmailServiceFactory). // but faked emails
WithTicketServiceFactory(simulatorTicketServiceFactory). // and faked tickets
WithAirtimeServiceFactory(simulatorAirtimeServiceFactory). // and faked airtime transfers
WithMaxStepsPerSprint(c.MaxStepsPerSprint).
Build()
})

Expand Down
5 changes: 4 additions & 1 deletion core/goflow/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
)

func TestEngineWebhook(t *testing.T) {
// this is only here because this is the first test run.. should find a better way to ensure DB is in correct state for first test that needs it
testsuite.Reset(testsuite.ResetDB)

_, rt, _, _ := testsuite.Get()

svc, err := goflow.Engine(rt.Config).Services().Webhook(nil)
Expand Down Expand Up @@ -68,7 +71,7 @@ func TestSimulatorTicket(t *testing.T) {
svc, err := goflow.Simulator(rt.Config).Services().Ticket(nil, flows.NewTicketer(ticketer))
assert.NoError(t, err)

oa, err := models.GetOrgAssets(ctx, db, testdata.Org1.ID)
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

ticket, err := svc.Open(nil, oa.SessionAssets().Topics().FindByName("General"), "Where are my cookies?", nil, nil)
Expand Down
8 changes: 4 additions & 4 deletions core/goflow/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/definition"
"github.com/nyaruka/goflow/flows/definition/migrations"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/runtime"

"github.com/Masterminds/semver"
)
Expand All @@ -22,7 +22,7 @@ func SpecVersion() *semver.Version {
}

// ReadFlow reads a flow from the given JSON definition, migrating it if necessary
func ReadFlow(cfg *config.Config, data json.RawMessage) (flows.Flow, error) {
func ReadFlow(cfg *runtime.Config, data json.RawMessage) (flows.Flow, error) {
return definition.ReadFlow(data, MigrationConfig(cfg))
}

Expand All @@ -32,12 +32,12 @@ func CloneDefinition(data json.RawMessage, depMapping map[uuids.UUID]uuids.UUID)
}

// MigrateDefinition migrates the given flow definition to the specified version
func MigrateDefinition(cfg *config.Config, data json.RawMessage, toVersion *semver.Version) (json.RawMessage, error) {
func MigrateDefinition(cfg *runtime.Config, data json.RawMessage, toVersion *semver.Version) (json.RawMessage, error) {
return migrations.MigrateToVersion(data, toVersion, MigrationConfig(cfg))
}

// MigrationConfig returns the migration configuration for flows
func MigrationConfig(cfg *config.Config) *migrations.Config {
func MigrationConfig(cfg *runtime.Config) *migrations.Config {
migConfInit.Do(func() {
migConf = &migrations.Config{BaseMediaURL: "https://" + cfg.AttachmentDomain}
})
Expand Down
4 changes: 2 additions & 2 deletions core/goflow/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/runtime"
)

var httpInit sync.Once
Expand All @@ -17,7 +17,7 @@ var httpRetries *httpx.RetryConfig
var httpAccess *httpx.AccessConfig

// HTTP returns the configuration objects for HTTP calls from the engine and its services
func HTTP(cfg *config.Config) (*http.Client, *httpx.RetryConfig, *httpx.AccessConfig) {
func HTTP(cfg *runtime.Config) (*http.Client, *httpx.RetryConfig, *httpx.AccessConfig) {
httpInit.Do(func() {
// customize the default golang transport
t := http.DefaultTransport.(*http.Transport).Clone()
Expand Down
4 changes: 2 additions & 2 deletions core/goflow/modifiers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

func TestReadModifiers(t *testing.T) {
ctx, _, db, _ := testsuite.Get()
ctx, rt, _, _ := testsuite.Get()

oa, err := models.GetOrgAssets(ctx, db, testdata.Org1.ID)
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
assert.NoError(t, err)

// can read empty list
Expand Down
4 changes: 2 additions & 2 deletions core/handlers/airtime_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/hooks"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/shopspring/decimal"
"github.com/sirupsen/logrus"
Expand All @@ -20,7 +20,7 @@ func init() {
}

// handleAirtimeTransferred is called for each airtime transferred event
func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
func handleAirtimeTransferred(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.AirtimeTransferredEvent)

status := models.AirtimeTransferStatusSuccess
Expand Down
6 changes: 3 additions & 3 deletions core/handlers/airtime_transferred_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ var transactionRejectedResponse = `{
}`

func TestAirtimeTransferred(t *testing.T) {
_, _, db, _ := testsuite.Get()
ctx, rt, db, _ := testsuite.Get()

defer testsuite.Reset()
defer testsuite.Reset(testsuite.ResetAll)
defer httpx.SetRequestor(httpx.DefaultRequestor)

httpx.SetRequestor(httpx.NewMockRequestor(map[string][]httpx.MockResponse{
Expand Down Expand Up @@ -332,5 +332,5 @@ func TestAirtimeTransferred(t *testing.T) {
},
}

handlers.RunTestCases(t, tcs)
handlers.RunTestCases(t, ctx, rt, tcs)
}
28 changes: 10 additions & 18 deletions core/handlers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"

Expand Down Expand Up @@ -57,11 +56,6 @@ func NewActionUUID() flows.ActionUUID {
return flows.ActionUUID(uuids.New())
}

func TestMain(m *testing.M) {
testsuite.Reset()
os.Exit(m.Run())
}

// createTestFlow creates a flow that starts with a split by contact id
// and then routes the contact to a node where all the actions in the
// test case are present.
Expand Down Expand Up @@ -158,12 +152,10 @@ func createTestFlow(t *testing.T, uuid assets.FlowUUID, tc TestCase) flows.Flow
return flow
}

func RunTestCases(t *testing.T, tcs []TestCase) {
ctx, rt, db, _ := testsuite.Get()

func RunTestCases(t *testing.T, ctx context.Context, rt *runtime.Runtime, tcs []TestCase) {
models.FlushCache()

oa, err := models.GetOrgAssets(ctx, db, models.OrgID(1))
oa, err := models.GetOrgAssets(ctx, rt, models.OrgID(1))
assert.NoError(t, err)

// reuse id from one of our real flows
Expand All @@ -180,7 +172,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) {
flowDef, err := json.Marshal(testFlow)
require.NoError(t, err)

oa, err = oa.CloneForSimulation(ctx, db, map[assets.FlowUUID]json.RawMessage{flowUUID: flowDef}, nil)
oa, err = oa.CloneForSimulation(ctx, rt, map[assets.FlowUUID]json.RawMessage{flowUUID: flowDef}, nil)
assert.NoError(t, err)

flow, err := oa.Flow(flowUUID)
Expand Down Expand Up @@ -214,7 +206,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) {
// create scenes for our contacts
scenes := make([]*models.Scene, 0, len(tc.Modifiers))
for contact, mods := range tc.Modifiers {
contacts, err := models.LoadContacts(ctx, db, oa, []models.ContactID{contact.ID})
contacts, err := models.LoadContacts(ctx, rt.DB, oa, []models.ContactID{contact.ID})
assert.NoError(t, err)

contact := contacts[0]
Expand All @@ -238,24 +230,24 @@ func RunTestCases(t *testing.T, tcs []TestCase) {

}

tx, err := db.BeginTxx(ctx, nil)
tx, err := rt.DB.BeginTxx(ctx, nil)
assert.NoError(t, err)

for _, scene := range scenes {
err := models.HandleEvents(ctx, tx, rt.RP, oa, scene, results[scene.ContactID()].Events)
err := models.HandleEvents(ctx, rt, tx, oa, scene, results[scene.ContactID()].Events)
assert.NoError(t, err)
}

err = models.ApplyEventPreCommitHooks(ctx, tx, rt.RP, oa, scenes)
err = models.ApplyEventPreCommitHooks(ctx, rt, tx, oa, scenes)
assert.NoError(t, err)

err = tx.Commit()
assert.NoError(t, err)

tx, err = db.BeginTxx(ctx, nil)
tx, err = rt.DB.BeginTxx(ctx, nil)
assert.NoError(t, err)

err = models.ApplyEventPostCommitHooks(ctx, tx, rt.RP, oa, scenes)
err = models.ApplyEventPostCommitHooks(ctx, rt, tx, oa, scenes)
assert.NoError(t, err)

err = tx.Commit()
Expand All @@ -265,7 +257,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) {

// now check our assertions
for j, a := range tc.SQLAssertions {
testsuite.AssertQuery(t, db, a.SQL, a.Args...).Returns(a.Count, "%d:%d: mismatch in expected count for query: %s", i, j, a.SQL)
testsuite.AssertQuery(t, rt.DB, a.SQL, a.Args...).Returns(a.Count, "%d:%d: mismatch in expected count for query: %s", i, j, a.SQL)
}

for j, a := range tc.Assertions {
Expand Down
Loading

0 comments on commit 250d745

Please sign in to comment.