Skip to content

Commit

Permalink
Merge branch 'bounce'
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed Aug 14, 2021
2 parents 185d511 + d41b697 commit 1be8c7d
Show file tree
Hide file tree
Showing 49 changed files with 2,890 additions and 309 deletions.
251 changes: 251 additions & 0 deletions cmd/bounce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

"github.com/knadh/listmonk/internal/subimporter"
"github.com/knadh/listmonk/models"
"github.com/labstack/echo"
"github.com/lib/pq"
)

type bouncesWrap struct {
Results []models.Bounce `json:"results"`

Total int `json:"total"`
PerPage int `json:"per_page"`
Page int `json:"page"`
}

// handleGetBounces handles retrieval of bounce records.
func handleGetBounces(c echo.Context) error {
var (
app = c.Get("app").(*App)
pg = getPagination(c.QueryParams(), 50)
out bouncesWrap

id, _ = strconv.Atoi(c.Param("id"))
campID, _ = strconv.Atoi(c.QueryParam("campaign_id"))
source = c.FormValue("source")
orderBy = c.FormValue("order_by")
order = c.FormValue("order")
)

// Fetch one list.
single := false
if id > 0 {
single = true
}

// Sort params.
if !strSliceContains(orderBy, bounceQuerySortFields) {
orderBy = "created_at"
}
if order != sortAsc && order != sortDesc {
order = sortDesc
}

stmt := fmt.Sprintf(app.queries.QueryBounces, orderBy, order)
if err := db.Select(&out.Results, stmt, id, campID, 0, source, pg.Offset, pg.Limit); err != nil {
app.log.Printf("error fetching bounces: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
app.i18n.Ts("globals.messages.errorFetching",
"name", "{globals.terms.bounce}", "error", pqErrMsg(err)))
}
if len(out.Results) == 0 {
out.Results = []models.Bounce{}
return c.JSON(http.StatusOK, okResp{out})
}

if single {
return c.JSON(http.StatusOK, okResp{out.Results[0]})
}

// Meta.
out.Total = out.Results[0].Total
out.Page = pg.Page
out.PerPage = pg.PerPage

return c.JSON(http.StatusOK, okResp{out})
}

// handleGetSubscriberBounces retrieves a subscriber's bounce records.
func handleGetSubscriberBounces(c echo.Context) error {
var (
app = c.Get("app").(*App)
subID = c.Param("id")
)

id, _ := strconv.ParseInt(subID, 10, 64)
if id < 1 {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidID"))
}

out := []models.Bounce{}
stmt := fmt.Sprintf(app.queries.QueryBounces, "created_at", "ASC")
if err := db.Select(&out, stmt, 0, 0, subID, "", 0, 1000); err != nil {
app.log.Printf("error fetching bounces: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
app.i18n.Ts("globals.messages.errorFetching",
"name", "{globals.terms.bounce}", "error", pqErrMsg(err)))
}

return c.JSON(http.StatusOK, okResp{out})
}

// handleDeleteBounces handles bounce deletion, either a single one (ID in the URI), or a list.
func handleDeleteBounces(c echo.Context) error {
var (
app = c.Get("app").(*App)
pID = c.Param("id")
all, _ = strconv.ParseBool(c.QueryParam("all"))
IDs = pq.Int64Array{}
)

// Is it an /:id call?
if pID != "" {
id, _ := strconv.ParseInt(pID, 10, 64)
if id < 1 {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidID"))
}
IDs = append(IDs, id)
} else if !all {
// Multiple IDs.
i, err := parseStringIDs(c.Request().URL.Query()["id"])
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest,
app.i18n.Ts("globals.messages.invalidID", "error", err.Error()))
}

if len(i) == 0 {
return echo.NewHTTPError(http.StatusBadRequest,
app.i18n.Ts("globals.messages.invalidID"))
}
IDs = i
}

if _, err := app.queries.DeleteBounces.Exec(IDs); err != nil {
app.log.Printf("error deleting bounces: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError,
app.i18n.Ts("globals.messages.errorDeleting",
"name", "{globals.terms.bounce}", "error", pqErrMsg(err)))
}

return c.JSON(http.StatusOK, okResp{true})
}

// handleBounceWebhook renders the HTML preview of a template.
func handleBounceWebhook(c echo.Context) error {
var (
app = c.Get("app").(*App)
service = c.Param("service")

bounces []models.Bounce
)

// Read the request body instead of using using c.Bind() to read to save the entire raw request as meta.
rawReq, err := ioutil.ReadAll(c.Request().Body)
if err != nil {
app.log.Printf("error reading ses notification body: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.Ts("globals.messages.internalError"))
}

switch true {
// Native internal webhook.
case service == "":
var b models.Bounce
if err := json.Unmarshal(rawReq, &b); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.Ts("globals.messages.invalidData"))
}

if err := validateBounceFields(b, app); err != nil {
return err
}

b.Email = strings.ToLower(b.Email)

if len(b.Meta) == 0 {
b.Meta = json.RawMessage("{}")
}

if b.CreatedAt.Year() == 0 {
b.CreatedAt = time.Now()
}

bounces = append(bounces, b)

// Amazon SES.
case service == "ses" && app.constants.BounceSESEnabled:
switch c.Request().Header.Get("X-Amz-Sns-Message-Type") {
// SNS webhook registration confirmation. Only after these are processed will the endpoint
// start getting bounce notifications.
case "SubscriptionConfirmation", "UnsubscribeConfirmation":
if err := app.bounce.SES.ProcessSubscription(rawReq); err != nil {
app.log.Printf("error processing SNS (SES) subscription: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidData"))
}
break

// Bounce notification.
case "Notification":
b, err := app.bounce.SES.ProcessBounce(rawReq)
if err != nil {
app.log.Printf("error processing SES notification: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidData"))
}
bounces = append(bounces, b)

default:
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidData"))
}

// SendGrid.
case service == "sendgrid" && app.constants.BounceSendgridEnabled:
var (
sig = c.Request().Header.Get("X-Twilio-Email-Event-Webhook-Signature")
ts = c.Request().Header.Get("X-Twilio-Email-Event-Webhook-Timestamp")
)

// Sendgrid sends multiple bounces.
bs, err := app.bounce.Sendgrid.ProcessBounce(sig, ts, rawReq)
if err != nil {
app.log.Printf("error processing sendgrid notification: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidData"))
}
bounces = append(bounces, bs...)

default:
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.Ts("bounces.unknownService"))
}

// Record bounces if any.
for _, b := range bounces {
if err := app.bounce.Record(b); err != nil {
app.log.Printf("error recording bounce: %v", err)
}
}

return c.JSON(http.StatusOK, okResp{true})
}

func validateBounceFields(b models.Bounce, app *App) error {
if b.Email == "" && b.SubscriberUUID == "" {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidData"))
}

if b.Email != "" && !subimporter.IsEmail(b.Email) {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidEmail"))
}

if b.SubscriberUUID != "" && !reUUID.MatchString(b.SubscriberUUID) {
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("globals.messages.invalidUUID"))
}

return nil
}
1 change: 1 addition & 0 deletions cmd/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
regexFullTextQuery = regexp.MustCompile(`\s+`)

campaignQuerySortFields = []string{"name", "status", "created_at", "updated_at"}
bounceQuerySortFields = []string{"email", "campaign_name", "source", "created_at"}
)

// handleGetCampaigns handles retrieval of campaigns.
Expand Down
14 changes: 14 additions & 0 deletions cmd/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func registerHTTPHandlers(e *echo.Echo, app *App) {

g.GET("/api/subscribers/:id", handleGetSubscriber)
g.GET("/api/subscribers/:id/export", handleExportSubscriberData)
g.GET("/api/subscribers/:id/bounces", handleGetSubscriberBounces)
g.DELETE("/api/subscribers/:id/bounces", handleDeleteSubscriberBounces)
g.POST("/api/subscribers", handleCreateSubscriber)
g.PUT("/api/subscribers/:id", handleUpdateSubscriber)
g.POST("/api/subscribers/:id/optin", handleSubscriberSendOptin)
Expand All @@ -72,6 +74,10 @@ func registerHTTPHandlers(e *echo.Echo, app *App) {
g.DELETE("/api/subscribers/:id", handleDeleteSubscribers)
g.DELETE("/api/subscribers", handleDeleteSubscribers)

g.GET("/api/bounces", handleGetBounces)
g.DELETE("/api/bounces", handleDeleteBounces)
g.DELETE("/api/bounces/:id", handleDeleteBounces)

// Subscriber operations based on arbitrary SQL queries.
// These aren't very REST-like.
g.POST("/api/subscribers/query/delete", handleDeleteSubscribersByQuery)
Expand Down Expand Up @@ -132,6 +138,14 @@ func registerHTTPHandlers(e *echo.Echo, app *App) {
g.GET("/settings", handleIndexPage)
g.GET("/settings/logs", handleIndexPage)

if app.constants.BounceWebhooksEnabled {
// Private authenticated bounce endpoint.
g.POST("/webhooks/bounce", handleBounceWebhook)

// Public bounce endpoints for webservices like SES.
e.POST("/webhooks/service/:service", handleBounceWebhook)
}

// Public subscriber facing views.
e.GET("/subscription/form", handleSubscriptionFormPage)
e.POST("/subscription/form", handleSubscriptionForm)
Expand Down
52 changes: 50 additions & 2 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/knadh/koanf/providers/confmap"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/providers/posflag"
"github.com/knadh/listmonk/internal/bounce"
"github.com/knadh/listmonk/internal/bounce/mailbox"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/manager"
"github.com/knadh/listmonk/internal/media"
Expand Down Expand Up @@ -65,6 +67,10 @@ type constants struct {
OptinURL string
MessageURL string
MediaProvider string

BounceWebhooksEnabled bool
BounceSESEnabled bool
BounceSendgridEnabled bool
}

func initFlags() {
Expand Down Expand Up @@ -296,6 +302,10 @@ func initConstants() *constants {

// url.com/campaign/{campaign_uuid}/{subscriber_uuid}/px.png
c.ViewTrackURL = fmt.Sprintf("%s/campaign/%%s/%%s/px.png", c.RootURL)

c.BounceWebhooksEnabled = ko.Bool("bounce.webhooks_enabled")
c.BounceSESEnabled = ko.Bool("bounce.ses_enabled")
c.BounceSendgridEnabled = ko.Bool("bounce.sendgrid_enabled")
return &c
}

Expand Down Expand Up @@ -344,8 +354,7 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
SlidingWindow: ko.Bool("app.message_sliding_window"),
SlidingWindowDuration: ko.Duration("app.message_sliding_window_duration"),
SlidingWindowRate: ko.Int("app.message_sliding_window_rate"),
}, newManagerDB(q), campNotifCB, app.i18n, lo)

}, newManagerStore(q), campNotifCB, app.i18n, lo)
}

// initImporter initializes the bulk subscriber importer.
Expand Down Expand Up @@ -495,6 +504,45 @@ func initNotifTemplates(path string, fs stuffbin.FileSystem, i *i18n.I18n, cs *c
return tpl
}

// initBounceManager initializes the bounce manager that scans mailboxes and listens to webhooks
// for incoming bounce events.
func initBounceManager(app *App) *bounce.Manager {
opt := bounce.Opt{
BounceCount: ko.MustInt("bounce.count"),
BounceAction: ko.MustString("bounce.action"),
WebhooksEnabled: ko.Bool("bounce.webhooks_enabled"),
SESEnabled: ko.Bool("bounce.ses_enabled"),
SendgridEnabled: ko.Bool("bounce.sendgrid_enabled"),
SendgridKey: ko.String("bounce.sendgrid_key"),
}

// For now, only one mailbox is supported.
for _, b := range ko.Slices("bounce.mailboxes") {
if !b.Bool("enabled") {
continue
}

var boxOpt mailbox.Opt
if err := b.UnmarshalWithConf("", &boxOpt, koanf.UnmarshalConf{Tag: "json"}); err != nil {
lo.Fatalf("error reading bounce mailbox config: %v", err)
}

opt.MailboxType = b.String("type")
opt.MailboxEnabled = true
opt.Mailbox = boxOpt
break
}

b, err := bounce.New(opt, &bounce.Queries{
RecordQuery: app.queries.RecordBounce,
}, app.log)
if err != nil {
lo.Fatalf("error initializing bounce manager: %v", err)
}

return b
}

// initHTTPServer sets up and runs the app's main HTTP server and blocks forever.
func initHTTPServer(app *App) *echo.Echo {
// Initialize the HTTP server.
Expand Down
3 changes: 1 addition & 2 deletions cmd/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ func handleUpdateList(c echo.Context) error {
return handleGetLists(c)
}

// handleDeleteLists handles deletion deletion,
// either a single one (ID in the URI), or a list.
// handleDeleteLists handles list deletion, either a single one (ID in the URI), or a list.
func handleDeleteLists(c echo.Context) error {
var (
app = c.Get("app").(*App)
Expand Down
Loading

0 comments on commit 1be8c7d

Please sign in to comment.