Skip to content

Commit

Permalink
Chore: Add sendgrid webhook receiver to trigger workflow
Browse files Browse the repository at this point in the history
Signed-off-by: Daishan Peng <[email protected]>
  • Loading branch information
StrongMonkey committed Jan 8, 2025
1 parent 1a2e386 commit 4b99147
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 113 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/pterm/pterm v0.12.79
github.com/robfig/cron/v3 v3.0.1
github.com/rs/cors v1.11.1
github.com/sendgrid/sendgrid-go v3.16.0+incompatible
github.com/spf13/cobra v1.8.1
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sendgrid/sendgrid-go v3.16.0+incompatible h1:i8eE6IMkiCy7vusSdacHHSBUpXyTcTXy/Rl9N9aZ/Qw=
github.com/sendgrid/sendgrid-go v3.16.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4=
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/authz/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var staticRules = map[string][]string{
"GET /api/app-oauth/refresh/{id}",
"GET /api/app-oauth/callback/{id}",
"GET /api/app-oauth/get-token",

"POST /webhook/sendgrid",
},
AuthenticatedGroup: {
"/api/oauth/redirect/{service}",
Expand Down
40 changes: 40 additions & 0 deletions pkg/api/handlers/sendgrid/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package sendgrid

import (
"fmt"
"net/http"

"github.com/obot-platform/obot/apiclient/types"
"github.com/obot-platform/obot/pkg/api"
"github.com/obot-platform/obot/pkg/emailtrigger"
"github.com/sendgrid/sendgrid-go/helpers/inbound"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type InboundWebhookHandler struct {
emailTrigger *emailtrigger.EmailHandler
}

func NewInboundWebhookHandler(c kclient.Client, hostname string) *InboundWebhookHandler {
emailTrigger := emailtrigger.EmailTrigger(c, hostname)
return &InboundWebhookHandler{emailTrigger: emailTrigger}
}

func (h *InboundWebhookHandler) InboundWebhookHandler(req api.Context) error {
if req.Request.Method != http.MethodPost {
return types.NewErrHttp(http.StatusMethodNotAllowed, "Invalid request method")
}

inboundEmail, err := inbound.Parse(req.Request)
if err != nil {
return types.NewErrHttp(http.StatusBadRequest, fmt.Sprintf("Failed to parse inbound email: %v", err))
}

subject := inboundEmail.Headers["Subject"]
if err := h.emailTrigger.Handler(req.Context(), inboundEmail.Envelope.From, inboundEmail.Envelope.To, subject, []byte(inboundEmail.TextBody)); err != nil {
return types.NewErrHttp(http.StatusInternalServerError, fmt.Sprintf("Failed to handle inbound email: %v", err))
}

req.WriteHeader(http.StatusOK)
return nil
}
6 changes: 6 additions & 0 deletions pkg/api/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/obot-platform/obot/pkg/api/handlers"
"github.com/obot-platform/obot/pkg/api/handlers/sendgrid"
"github.com/obot-platform/obot/pkg/services"
"github.com/obot-platform/obot/ui"
)
Expand Down Expand Up @@ -31,6 +32,8 @@ func Router(services *services.Services) (http.Handler, error) {
version := handlers.NewVersionHandler(services.EmailServerName, services.SupportDocker)
tables := handlers.NewTableHandler(services.GPTClient)

sendgridWebhookHandler := sendgrid.NewInboundWebhookHandler(services.StorageClient, services.EmailServerName)

// Version
mux.HandleFunc("GET /api/version", version.GetVersion)

Expand Down Expand Up @@ -256,6 +259,9 @@ func Router(services *services.Services) (http.Handler, error) {
mux.HandleFunc("POST /api/webhooks/{id}/remove-token", webhooks.RemoveToken)
mux.HandleFunc("POST /api/webhooks/{namespace}/{id}", webhooks.Execute)

// Webhook for third party integration to trigger workflow
mux.HandleFunc("POST /webhook/sendgrid", sendgridWebhookHandler.InboundWebhookHandler)

// Email Receivers
mux.HandleFunc("POST /api/email-receivers", emailreceiver.Create)
mux.HandleFunc("GET /api/email-receivers", emailreceiver.List)
Expand Down
132 changes: 132 additions & 0 deletions pkg/emailtrigger/emailtrigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package emailtrigger

import (
"context"
"encoding/json"
"fmt"
"net/mail"
"path"
"strings"

"github.com/obot-platform/obot/logger"
"github.com/obot-platform/obot/pkg/alias"
v1 "github.com/obot-platform/obot/pkg/storage/apis/obot.obot.ai/v1"
"github.com/obot-platform/obot/pkg/system"
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var log = logger.Package()

type EmailHandler struct {
c kclient.Client
hostname string
}

func EmailTrigger(c kclient.Client, hostname string) *EmailHandler {
return &EmailHandler{
c: c,
hostname: hostname,
}
}

func (h *EmailHandler) Handler(ctx context.Context, from string, to []string, subject string, data []byte) error {
for _, to := range to {
toAddr, err := mail.ParseAddress(to)
if err != nil {
return fmt.Errorf("parse to address: %w", err)
}

name, host, ok := strings.Cut(toAddr.Address, "@")
if !ok {
return fmt.Errorf("invalid to address: %s", toAddr.Address)
}

if host != h.hostname {
log.Infof("Skipping mail for %s: not for this host", toAddr.Address)
continue
}

name, ns, _ := strings.Cut(name, "+")
if ns == "" {
ns = system.DefaultNamespace
}

var emailReceiver v1.EmailReceiver
if err = alias.Get(ctx, h.c, &emailReceiver, ns, name); apierror.IsNotFound(err) {
log.Infof("Skipping mail for %s: no receiver found", toAddr.Address)
continue
} else if err != nil {
return fmt.Errorf("get email receiver: %w", err)
}

if !matches(from, emailReceiver) {
log.Infof("Skipping mail for %s: sender not allowed", toAddr.Address)
continue
}

if err = h.dispatchEmail(ctx, emailReceiver, string(data), from, to, subject); err != nil {
return fmt.Errorf("dispatch email: %w", err)
}
}

return nil
}

func (h *EmailHandler) dispatchEmail(ctx context.Context, email v1.EmailReceiver, body string, from, to, subject string) error {
var input struct {
Type string `json:"type"`
From string `json:"from"`
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}

input.Type = "email"
input.From = from
input.To = to
input.Subject = subject
input.Body = body

inputJSON, err := json.Marshal(input)
if err != nil {
return fmt.Errorf("marshal input: %w", err)
}

var workflow v1.Workflow
if err = alias.Get(ctx, h.c, &workflow, email.Namespace, email.Spec.Workflow); err != nil {
return err
}

return h.c.Create(ctx, &v1.WorkflowExecution{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WorkflowExecutionPrefix,
Namespace: workflow.Namespace,
},
Spec: v1.WorkflowExecutionSpec{
WorkflowName: workflow.Name,
EmailReceiverName: email.Name,
ThreadName: workflow.Spec.ThreadName,
Input: string(inputJSON),
},
})
}

func matches(address string, email v1.EmailReceiver) bool {
if len(email.Spec.AllowedSenders) == 0 {
return true
}

for _, allowedSender := range email.Spec.AllowedSenders {
if allowedSender == address {
return true
}
matched, _ := path.Match(allowedSender, address)
if matched {
return true
}
}

return false
}
123 changes: 10 additions & 113 deletions pkg/smtp/smtp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,39 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net"
"net/mail"
"path"
"strings"

"github.com/mhale/smtpd"
"github.com/obot-platform/obot/logger"
"github.com/obot-platform/obot/pkg/alias"
v1 "github.com/obot-platform/obot/pkg/storage/apis/obot.obot.ai/v1"
"github.com/obot-platform/obot/pkg/system"
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/obot-platform/obot/pkg/emailtrigger"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var log = logger.Package()

type Server struct {
s smtpd.Server
c kclient.Client
ctx context.Context
hostname string
s smtpd.Server
ctx context.Context
emailTrigger *emailtrigger.EmailHandler
}

func Start(ctx context.Context, c kclient.Client, hostname string) {
emailTrigger := emailtrigger.EmailTrigger(c, hostname)
s := Server{
s: smtpd.Server{
Addr: ":2525",
},
c: c,
ctx: ctx,
hostname: hostname,
ctx: ctx,
emailTrigger: emailTrigger,
}
s.s.Handler = s.handler
s.s.Handler = s.Handler
go func() {
err := s.s.ListenAndServe()
select {
Expand All @@ -54,7 +47,7 @@ func Start(ctx context.Context, c kclient.Client, hostname string) {
}()
}

func (s *Server) handler(_ net.Addr, from string, to []string, data []byte) error {
func (s *Server) Handler(_ net.Addr, from string, to []string, data []byte) error {
log.Infof("New mail received from %s for %s: length=%d", from, to, len(data))

message, err := mail.ReadMessage(bytes.NewReader(data))
Expand All @@ -72,46 +65,7 @@ func (s *Server) handler(_ net.Addr, from string, to []string, data []byte) erro
return fmt.Errorf("parse from address: %w", err)
}

for _, to := range to {
toAddr, err := mail.ParseAddress(to)
if err != nil {
return fmt.Errorf("parse to address: %w", err)
}

name, host, ok := strings.Cut(toAddr.Address, "@")
if !ok {
return fmt.Errorf("invalid to address: %s", toAddr.Address)
}

if host != s.hostname {
log.Infof("Skipping mail for %s: not for this host", toAddr.Address)
continue
}

name, ns, _ := strings.Cut(name, "+")
if ns == "" {
ns = system.DefaultNamespace
}

var emailReceiver v1.EmailReceiver
if err = alias.Get(s.ctx, s.c, &emailReceiver, ns, name); apierror.IsNotFound(err) {
log.Infof("Skipping mail for %s: no receiver found", toAddr.Address)
continue
} else if err != nil {
return fmt.Errorf("get email receiver: %w", err)
}

if !matches(fromAddress.Address, emailReceiver) {
log.Infof("Skipping mail for %s: sender not allowed", toAddr.Address)
continue
}

if err = s.dispatchEmail(emailReceiver, body, message, from, to); err != nil {
return fmt.Errorf("dispatch email: %w", err)
}
}

return err
return s.emailTrigger.Handler(s.ctx, fromAddress.Address, to, message.Header.Get("Subject"), []byte(body))
}

func getBody(message *mail.Message) (string, error) {
Expand Down Expand Up @@ -170,60 +124,3 @@ func getBody(message *mail.Message) (string, error) {

return "", fmt.Errorf("failed to find text/plain body: %s", mediaType)
}

func (s *Server) dispatchEmail(email v1.EmailReceiver, body string, message *mail.Message, from, to string) error {
var input struct {
Type string `json:"type"`
From string `json:"from"`
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}

input.Type = "email"
input.From = from
input.To = to
input.Subject = message.Header.Get("Subject")
input.Body = body

inputJSON, err := json.Marshal(input)
if err != nil {
return fmt.Errorf("marshal input: %w", err)
}

var workflow v1.Workflow
if err = alias.Get(s.ctx, s.c, &workflow, email.Namespace, email.Spec.Workflow); err != nil {
return err
}

return s.c.Create(s.ctx, &v1.WorkflowExecution{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WorkflowExecutionPrefix,
Namespace: workflow.Namespace,
},
Spec: v1.WorkflowExecutionSpec{
WorkflowName: workflow.Name,
EmailReceiverName: email.Name,
ThreadName: workflow.Spec.ThreadName,
Input: string(inputJSON),
},
})
}

func matches(address string, email v1.EmailReceiver) bool {
if len(email.Spec.AllowedSenders) == 0 {
return true
}

for _, allowedSender := range email.Spec.AllowedSenders {
if allowedSender == address {
return true
}
matched, _ := path.Match(allowedSender, address)
if matched {
return true
}
}

return false
}

0 comments on commit 4b99147

Please sign in to comment.