Skip to content

Commit

Permalink
- Refactor and move template compilation from runner to models.Campai…
Browse files Browse the repository at this point in the history
…gn to support adhoc template funcs

- Add support for {{ Track "https://url.com" }} in templates to register and track links
  • Loading branch information
knadh committed Oct 31, 2018
1 parent 1ae2905 commit 81953d6
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 57 deletions.
27 changes: 12 additions & 15 deletions campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ func handlePreviewCampaign(c echo.Context) error {
id, _ = strconv.Atoi(c.Param("id"))
body = c.FormValue("body")

camp models.Campaign
camp = &models.Campaign{}
)

if id < 1 {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid ID.")
}

err := app.Queries.GetCampaignForPreview.Get(&camp, id)
err := app.Queries.GetCampaignForPreview.Get(camp, id)
if err != nil {
if err == sql.ErrNoRows {
return echo.NewHTTPError(http.StatusBadRequest, "Campaign not found.")
Expand Down Expand Up @@ -130,25 +130,23 @@ func handlePreviewCampaign(c echo.Context) error {
}

// Compile the template.
if body == "" {
body = camp.Body
if body != "" {
camp.Body = body
}
tpl, err := runner.CompileMessageTemplate(camp.TemplateBody, body)
if err != nil {

if err := camp.CompileTemplate(app.Runner.TemplateFuncs(camp)); err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error compiling template: %v", err))
}

// Render the message body.
var out = bytes.Buffer{}
if err := tpl.ExecuteTemplate(&out,
runner.BaseTPL,
runner.Message{Campaign: &camp, Subscriber: &sub, UnsubscribeURL: "#dummy"}); err != nil {
m := app.Runner.NewMessage(camp, &sub)
if err := m.Render(); err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
fmt.Sprintf("Error executing template: %v", err))
fmt.Sprintf("Error rendering message: %v", err))
}

return c.HTML(http.StatusOK, out.String())
return c.HTML(http.StatusOK, string(m.Body))
}

// handleCreateCampaign handles campaign creation.
Expand Down Expand Up @@ -479,14 +477,13 @@ func handleTestCampaign(c echo.Context) error {

// sendTestMessage takes a campaign and a subsriber and sends out a sample campain message.
func sendTestMessage(sub *models.Subscriber, camp *models.Campaign, app *App) error {
tpl, err := runner.CompileMessageTemplate(camp.TemplateBody, camp.Body)
if err != nil {
if err := camp.CompileTemplate(app.Runner.TemplateFuncs(camp)); err != nil {
return fmt.Errorf("Error compiling template: %v", err)
}

// Render the message body.
var out = bytes.Buffer{}
if err := tpl.ExecuteTemplate(&out,
if err := camp.Tpl.ExecuteTemplate(&out,
runner.BaseTPL,
runner.Message{Campaign: camp, Subscriber: sub, UnsubscribeURL: "#dummy"}); err != nil {
return fmt.Errorf("Error executing template: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func registerHandlers(e *echo.Echo) {
// Subscriber facing views.
e.GET("/unsubscribe/:campUUID/:subUUID", handleUnsubscribePage)
e.POST("/unsubscribe/:campUUID/:subUUID", handleUnsubscribePage)
e.GET("/link/:linkUUID/:campUUID/:subUUID", handleLinkRedirect)

// Static views.
e.GET("/lists", handleIndexPage)
Expand Down Expand Up @@ -223,6 +224,8 @@ func main() {

// url.com/unsubscribe/{campaign_uuid}/{subscriber_uuid}
UnsubscribeURL: fmt.Sprintf("%s/unsubscribe/%%s/%%s", app.Constants.RootURL),
// url.com/link/{campaign_uuid}/{subscriber_uuid}/{link_uuid}
LinkTrackURL: fmt.Sprintf("%s/link/%%s/%%s/%%s", app.Constants.RootURL),
}, newRunnerDB(q), logger)
app.Runner = r

Expand Down
41 changes: 41 additions & 0 deletions models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"html/template"
"regexp"

"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"
Expand Down Expand Up @@ -37,6 +38,20 @@ const (
UserTypeUser = "user"
UserStatusEnabled = "enabled"
UserStatusDisabled = "disabled"

// BaseTpl is the name of the base template.
BaseTpl = "base"

// ContentTpl is the name of the compiled message.
ContentTpl = "content"
)

// Regular expression for matching {{ Track "http://link.com" }} in the template
// and substituting it with {{ Track "http://link.com" .Campaign.UUID .Subscriber.UUID }}
// before compilation. This string gimmick is to make linking easier for users.
var (
regexpLinkTag = regexp.MustCompile(`{{(\s+)?Track\s+?"(.+?)"(\s+)?}}`)
regexpLinkTagReplace = `{{ Track "$2" .Campaign.UUID .Subscriber.UUID }}`
)

// Base holds common fields shared across models.
Expand Down Expand Up @@ -187,3 +202,29 @@ func (s SubscriberAttribs) Scan(src interface{}) error {
}
return fmt.Errorf("Could not not decode type %T -> %T", src, s)
}

// CompileTemplate compiles a campaign body template into its base
// template and sets the resultant template to Campaign.Tpl
func (c *Campaign) CompileTemplate(f template.FuncMap) error {
// Compile the base template.
t := regexpLinkTag.ReplaceAllString(c.TemplateBody, regexpLinkTagReplace)
baseTPL, err := template.New(BaseTpl).Funcs(f).Parse(t)
if err != nil {
return fmt.Errorf("error compiling base template: %v", err)
}

// Compile the campaign message.
t = regexpLinkTag.ReplaceAllString(c.Body, regexpLinkTagReplace)
msgTpl, err := template.New(ContentTpl).Funcs(f).Parse(t)
if err != nil {
return fmt.Errorf("error compiling message: %v", err)
}

out, err := baseTPL.AddParseTree(ContentTpl, msgTpl.Tree)
if err != nil {
return fmt.Errorf("error inserting child template: %v", err)
}

c.Tpl = out
return nil
}
3 changes: 3 additions & 0 deletions queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Queries struct {
SetDefaultTemplate *sqlx.Stmt `query:"set-default-template"`
DeleteTemplate *sqlx.Stmt `query:"delete-template"`

CreateLink *sqlx.Stmt `query:"create-link"`
RegisterLinkClick *sqlx.Stmt `query:"register-link-click"`

// GetStats *sqlx.Stmt `query:"get-stats"`
}

Expand Down
18 changes: 17 additions & 1 deletion queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ sub AS (
WHERE uuid = $2 RETURNING id
)
UPDATE subscriber_lists SET status = 'unsubscribed' WHERE
subscriber_id = (SELECT id FROM sub) AND
subscriber_id = (SELECT id FROM sub) AND status != 'unsubscribed' AND
-- If $3 is false, unsubscribe from the campaign's lists, otherwise all lists.
CASE WHEN $3 IS FALSE THEN list_id = ANY(SELECT list_id FROM lists) ELSE list_id != 0 END;

Expand Down Expand Up @@ -349,6 +349,22 @@ SELECT * FROM media ORDER BY created_at DESC;
-- name: delete-media
DELETE FROM media WHERE id=$1 RETURNING filename;

-- links
-- name: create-link
INSERT INTO links (uuid, url) VALUES($1, $2) ON CONFLICT (url) DO UPDATE SET url=EXCLUDED.url RETURNING uuid;

-- name: register-link-click
WITH link AS (
SELECT url, links.id AS link_id, campaigns.id as campaign_id, subscribers.id AS subscriber_id FROM links
LEFT JOIN campaigns ON (campaigns.uuid = $2)
LEFT JOIN subscribers ON (subscribers.uuid = $3)
WHERE links.uuid = $1
)
INSERT INTO link_clicks (campaign_id, subscriber_id, link_id)
VALUES((SELECT campaign_id FROM link), (SELECT subscriber_id FROM link), (SELECT link_id FROM link))
RETURNING (SELECT url FROM link);


-- -- name: get-stats
-- WITH lists AS (
-- SELECT type, COUNT(id) AS num FROM lists GROUP BY type
Expand Down
112 changes: 78 additions & 34 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"html/template"
"log"
"sync"
"time"

"github.com/knadh/listmonk/messenger"
Expand All @@ -30,6 +31,7 @@ type DataSource interface {
PauseCampaign(campID int) error
CancelCampaign(campID int) error
FinishCampaign(campID int) error
CreateLink(url string) (string, error)
}

// Runner handles the scheduling, processing, and queuing of campaigns
Expand All @@ -43,7 +45,13 @@ type Runner struct {
// Campaigns that are currently running.
camps map[int]*models.Campaign

msgQueue chan Message
// Links generated using Track() are cached here so as to not query
// the database for the link UUID for every message sent. This has to
// be locked as it may be used externally when previewing campaigns.
links map[string]string
linksMutex sync.RWMutex

msgQueue chan *Message
subFetchQueue chan *models.Campaign
}

Expand All @@ -52,14 +60,14 @@ type Message struct {
Campaign *models.Campaign
Subscriber *models.Subscriber
UnsubscribeURL string

body []byte
to string
Body []byte
to string
}

// Config has parameters for configuring the runner.
type Config struct {
Concurrency int
LinkTrackURL string
UnsubscribeURL string
}

Expand All @@ -70,14 +78,26 @@ func New(cfg Config, src DataSource, l *log.Logger) *Runner {
messengers: make(map[string]messenger.Messenger),
src: src,
camps: make(map[int]*models.Campaign, 0),
links: make(map[string]string, 0),
logger: l,
subFetchQueue: make(chan *models.Campaign, 100),
msgQueue: make(chan Message, cfg.Concurrency),
msgQueue: make(chan *Message, cfg.Concurrency),
}

return &r
}

// NewMessage creates and returns a Message that is made available
// to message templates while they're compiled.
func (r *Runner) NewMessage(c *models.Campaign, s *models.Subscriber) *Message {
return &Message{
to: s.Email,
Campaign: c,
Subscriber: s,
UnsubscribeURL: fmt.Sprintf(r.cfg.UnsubscribeURL, c.UUID, s.UUID),
}
}

// AddMessenger adds a Messenger messaging backend to the runner process.
func (r *Runner) AddMessenger(msg messenger.Messenger) error {
id := msg.Name()
Expand Down Expand Up @@ -160,39 +180,47 @@ func (r *Runner) Run(tick time.Duration) {
// SpawnWorkers spawns workers goroutines that push out messages.
func (r *Runner) SpawnWorkers() {
for i := 0; i < r.cfg.Concurrency; i++ {
go func(ch chan Message) {
go func(ch chan *Message) {
for {
select {
case m := <-ch:
r.messengers[m.Campaign.MessengerID].Push(
m.Campaign.FromEmail,
m.Subscriber.Email,
m.Campaign.Subject,
m.body)
m.Body)
}
}
}(r.msgQueue)
}
}

// TemplateFuncs returns the template functions to be applied into
// compiled campaign templates.
func (r *Runner) TemplateFuncs(c *models.Campaign) template.FuncMap {
return template.FuncMap{
"Track": func(url, campUUID, subUUID string) string {
return r.trackLink(url, campUUID, subUUID)
},
}
}

// addCampaign adds a campaign to the process queue.
func (r *Runner) addCampaign(c *models.Campaign) error {
var tplErr error

c.Tpl, tplErr = CompileMessageTemplate(c.TemplateBody, c.Body)
if tplErr != nil {
return tplErr
}

// Validate messenger.
if _, ok := r.messengers[c.MessengerID]; !ok {
r.src.CancelCampaign(c.ID)
return fmt.Errorf("unknown messenger %s on campaign %s", c.MessengerID, c.Name)
}

// Load the template.
if err := c.CompileTemplate(r.TemplateFuncs(c)); err != nil {
return err
}

// Add the campaign to the active map.
r.camps[c.ID] = c

return nil
}

Expand Down Expand Up @@ -225,17 +253,14 @@ func (r *Runner) nextSubscribers(c *models.Campaign, batchSize int) (bool, error

// Push messages.
for _, s := range subs {
to, body, err := r.makeMessage(c, s)
if err != nil {
r.logger.Printf("error preparing message (%s) (%s): %v", c.Name, s.Email, err)
m := r.NewMessage(c, s)
if err := m.Render(); err != nil {
r.logger.Printf("error rendering message (%s) (%s): %v", c.Name, s.Email, err)
continue
}

// Send the message.
r.msgQueue <- Message{Campaign: c,
Subscriber: s,
to: to,
body: body}
r.msgQueue <- m
}

return true, nil
Expand Down Expand Up @@ -263,21 +288,40 @@ func (r *Runner) processExhaustedCampaign(c *models.Campaign) error {
return nil
}

// makeMessage prepares a campaign message for a subscriber and returns
// the 'to' address and the body.
func (r *Runner) makeMessage(c *models.Campaign, s *models.Subscriber) (string, []byte, error) {
// Render the message body.
var (
out = bytes.Buffer{}
tplMsg = Message{Campaign: c,
Subscriber: s,
UnsubscribeURL: fmt.Sprintf(r.cfg.UnsubscribeURL, c.UUID, s.UUID)}
)
if err := c.Tpl.ExecuteTemplate(&out, BaseTPL, tplMsg); err != nil {
return "", nil, err
// Render takes a Message, executes its pre-compiled Campaign.Tpl
// and applies the resultant bytes to Message.body to be used in messages.
func (m *Message) Render() error {
out := bytes.Buffer{}
if err := m.Campaign.Tpl.ExecuteTemplate(&out, models.BaseTpl, m); err != nil {
return err
}
m.Body = out.Bytes()
return nil
}

// trackLink register a URL and return its UUID to be used in message templates
// for tracking links.
func (r *Runner) trackLink(url, campUUID, subUUID string) string {
r.linksMutex.RLock()
if uu, ok := r.links[url]; ok {
return uu
}
r.linksMutex.RUnlock()

// Register link.
uu, err := r.src.CreateLink(url)
if err != nil {
r.logger.Printf("error registering tracking for link '%s': %v", url, err)

// If the registration fails, fail over to the original URL.
return url
}

r.linksMutex.Lock()
r.links[url] = uu
r.linksMutex.Unlock()

return s.Email, out.Bytes(), nil
return fmt.Sprintf(r.cfg.LinkTrackURL, uu, campUUID, subUUID)
}

// CompileMessageTemplate takes a base template body string and a child (message) template
Expand Down
Loading

0 comments on commit 81953d6

Please sign in to comment.