Skip to content

Commit

Permalink
Merge pull request #18 from milanbhagwat/master
Browse files Browse the repository at this point in the history
Adding support for Synch/Asynch Processing Mode for Trigger
  • Loading branch information
vijaynalawade authored Jan 9, 2021
2 parents 3ac935b + da3a20b commit 6c144bf
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pulsar/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type Settings struct {
AthenzAuthentication map[string]string `md:"athenzAuth"`
JWT string `md:"jwt"`
AllowInsecure bool `md:"allowInsecure"`
ConnectionTimeout int8 `md:connTimeout`
OperationTimeout int8 `md:opTimeout`
ConnectionTimeout int `md:"connTimeout"`
OperationTimeout int `md:"opTimeout"`
}

type PulsarConnection struct {
Expand Down
6 changes: 3 additions & 3 deletions pulsar/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ module github.com/project-flogo/messaging-contrib/pulsar

go 1.12

require (
require (
github.com/apache/pulsar-client-go v0.3.0
github.com/apache/pulsar/pulsar-function-go v0.0.0-20200819065839-6cae4afac3c3
github.com/project-flogo/core v1.2.0
github.com/apache/pulsar/pulsar-function-go v0.0.0-20200819065839-6cae4afac3c3
github.com/project-flogo/core v1.2.1-0.20210105194409-2dabde0292c7
)
36 changes: 26 additions & 10 deletions pulsar/trigger/subscriber/descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
"name": "topic",
"type": "string"
}

],
"handler": {
"settings": [
Expand All @@ -38,35 +37,52 @@
"name": "subscriptionName",
"type": "string",
"required": true,
"value":""
"value": ""
},
{
"name": "subscriptionType",
"type": "string",
"required": true,
"allowed":["Exclusive","Shared","Failover","KeyShared"],
"value":"Shared"
"allowed": [
"Exclusive",
"Shared",
"Failover",
"KeyShared"
],
"value": "Shared"
},
{
"name": "processingMode",
"type": "string",
"required": true,
"allowed": [
"Sync",
"Async"
],
"value": "Sync"
},
{
"name": "initialPosition",
"type": "string",
"required": true,
"allowed":["Latest","Earliest"],
"value":"Latest"
"allowed": [
"Latest",
"Earliest"
],
"value": "Latest"
},
{
"name": "dlqTopic",
"type": "string",
"required": false,
"value":""
"value": ""
},
{
"name": "dlqMaxDeliveries",
"type": "integer",
"required": false,
"value":3
"value": 3
}

]
}
}
}
1 change: 1 addition & 0 deletions pulsar/trigger/subscriber/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type HandlerSettings struct {
Topic string `md:"topic,required"`
Subscription string `md:"subscriptionName,required"`
SubscriptionType string `md:"subscriptionType"`
ProcessingMode string `md:"processingMode"`
InitialPosition string `md:"initialPosition"`
DLQMaxDeliveries int `md:"dlqMaxDeliveries"`
DLQTopic string `md:"dlqTopic"`
Expand Down
53 changes: 46 additions & 7 deletions pulsar/trigger/subscriber/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package subscriber
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/engine"
"github.com/project-flogo/core/support/log"
"github.com/project-flogo/core/trigger"
)

const (
ProcessingModeAsync = "Async"
)

var triggerMd = trigger.NewMetadata(&Settings{}, &HandlerSettings{}, &Output{})

func init() {
Expand All @@ -24,9 +30,12 @@ type Trigger struct {
logger log.Logger
}
type Handler struct {
handler trigger.Handler
consumer pulsar.Consumer
done chan bool
handler trigger.Handler
consumer pulsar.Consumer
done chan bool
asyncMode bool
maxMsgCount, currentMsgCount int
wg sync.WaitGroup
}

type Factory struct {
Expand Down Expand Up @@ -99,12 +108,24 @@ func (t *Trigger) Initialize(ctx trigger.InitContext) error {
if err != nil {
return err
}
t.handlers = append(t.handlers, &Handler{handler: handler, consumer: consumer, done: make(chan bool)})
tHandler := &Handler{handler: handler, consumer: consumer, done: make(chan bool)}
tHandler.asyncMode = s.ProcessingMode == ProcessingModeAsync
tHandler.maxMsgCount = getMaxMessageCount()
tHandler.wg = sync.WaitGroup{}
t.handlers = append(t.handlers, tHandler)
}

return nil
}

func getMaxMessageCount() int {
if engine.GetRunnerType() == engine.ValueRunnerTypePooled {
return engine.GetRunnerWorkers()
}
// For DIRECT mode
return 200
}

// Start implements util.Managed.Start
func (t *Trigger) Start() error {
t.logger.Info("Starting Trigger")
Expand Down Expand Up @@ -157,14 +178,33 @@ func (handler *Handler) consume() {
}
// Handle messages concurrently on separate goroutine
// go handler.handleMessage(msg)
handler.handleMessage(msg)
if handler.asyncMode {
handler.wg.Add(1)
handler.currentMsgCount++
go handler.handleMessage(msg)
if handler.currentMsgCount >= handler.maxMsgCount {
handler.handler.Logger().Infof("Total messages received are equal or more than maximum threshold [%d]. Blocking message handler.", handler.maxMsgCount)
handler.wg.Wait()
// reset count
handler.currentMsgCount = 0
handler.handler.Logger().Info("All received messages are processed. Unblocking message handler.")
}
} else {
handler.handleMessage(msg)
}
case <-handler.done:
return
}
}
}

func (handler *Handler) handleMessage(msg pulsar.ConsumerMessage) {
defer func() {
if handler.asyncMode {
handler.wg.Done()
handler.currentMsgCount--
}
}()
handler.handler.Logger().Debugf("Message received - %s", msg.ID())
out := &Output{}
if handler.handler.Settings()["format"] != nil &&
Expand All @@ -175,9 +215,8 @@ func (handler *Handler) handleMessage(msg pulsar.ConsumerMessage) {
handler.handler.Logger().Errorf("Pulsar consumer, configured to receive JSON formatted messages, was unable to parse message: [%v]", msg.Payload())
handler.consumer.Nack(msg)
return
} else {
out.Payload = obj
}
out.Payload = obj
} else {
out.Payload = string(msg.Payload())
}
Expand Down

0 comments on commit 6c144bf

Please sign in to comment.