Skip to content

Commit

Permalink
APIGOV-24414 - Updated harvester client to manage purged sequences (#606
Browse files Browse the repository at this point in the history
)

* APIGOV-24414 - Changes to process 410 response from harvester
- Updated sample watch client

* APIGOV-24414 - handle purged sequence error for poll mode
  • Loading branch information
vivekschauhan authored Jan 4, 2023
1 parent b18cf1e commit 56a28ef
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 20 deletions.
22 changes: 16 additions & 6 deletions pkg/agent/poller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e
return
}

if m.sequence.GetSequence() == 0 {
if m.sequence.GetSequence() < 0 {
m.onHarvesterErr()
go func() {
m.Stop()
Expand Down Expand Up @@ -101,19 +101,29 @@ func (m *pollExecutor) sync(topicSelfLink string, eventChan chan *proto.Event) e
}
}

func (m *pollExecutor) tick(topicSelfLink string, eventChan chan *proto.Event) error {
func (m *pollExecutor) tick(topicSelfLink string, eventChan chan *proto.Event) (ret error) {
sequence := m.sequence.GetSequence()
logger := m.logger.WithField("sequence-id", sequence)
logger.Debug("retrieving harvester events")

if _, err := m.harvester.ReceiveSyncEvents(topicSelfLink, sequence, eventChan); err != nil {
defer func() {
if ret == nil {
m.timer.Reset(m.interval)
}
}()

if lastSeqID, err := m.harvester.ReceiveSyncEvents(topicSelfLink, sequence, eventChan); err != nil {
if _, ok := err.(*harvester.ErrSeqGone); ok {
m.sequence.SetSequence(lastSeqID)
return
}

logger.WithError(err).Error("harvester returned an error when syncing events")
m.onHarvesterErr()
return err
ret = err
}

m.timer.Reset(m.interval)
return nil
return
}

func (m *pollExecutor) onHarvesterErr() {
Expand Down
27 changes: 26 additions & 1 deletion pkg/harvester/harvesterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ const (
defaultEventPageSize = 100
)

// ErrSeqGone - error for purged sequence
type ErrSeqGone struct {
}

func (e *ErrSeqGone) Error() string {
return "sequence purged"
}

// Harvest is an interface for retrieving harvester events
type Harvest interface {
EventCatchUp(link string, events chan *proto.Event) error
Expand Down Expand Up @@ -132,10 +140,22 @@ func (h *Client) ReceiveSyncEvents(topicSelfLink string, sequenceID int64, event
return lastID, err
}

if res.Code != http.StatusOK {
if res.Code != http.StatusOK && res.Code != http.StatusGone {
return lastID, fmt.Errorf("expected a 200 response but received %d", res.Code)
}

// requested sequence is purged get the current max sequence
if lastID == 0 && res.Code == http.StatusGone {
maxSeqId, ok := res.Headers["X-Axway-Max-Sequence-Id"]
if ok && len(maxSeqId) > 0 {
lastID, err = strconv.ParseInt(maxSeqId[0], 10, 64)
if err != nil {
return lastID, err
}
return lastID, &ErrSeqGone{}
}
}

pagedEvents := make([]*resourceEntryExternalEvent, 0)
err = json.Unmarshal(res.Body, &pagedEvents)
if err != nil {
Expand Down Expand Up @@ -189,6 +209,11 @@ func (h *Client) EventCatchUp(link string, events chan *proto.Event) error {
var err error
lastSequenceID, err := h.ReceiveSyncEvents(link, sequenceID, events)
if err != nil {
if _, ok := err.(*ErrSeqGone); ok {
// Set the max sequence returned from 410 to sequence provider as processed
h.Cfg.SequenceProvider.SetSequence(lastSequenceID)
return nil
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/watchmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (m *watchManager) RegisterWatch(link string, events chan *proto.Event, erro
subscriptionID, _ := uuid.NewUUID()
subID := subscriptionID.String()

if m.options.sequence != nil && m.options.sequence.GetSequence() == 0 {
if m.options.sequence != nil && m.options.sequence.GetSequence() < 0 {
err := fmt.Errorf("do not have a sequence id, stopping watch manager")
m.logger.Error(err.Error())
m.CloseWatch(subID)
Expand Down
59 changes: 47 additions & 12 deletions samples/watchclient/pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package client

import (
"crypto/tls"
"encoding/json"
"fmt"
"time"

"github.com/Axway/agent-sdk/pkg/apic/auth"
"github.com/Axway/agent-sdk/pkg/cache"
Expand All @@ -22,14 +25,15 @@ type WatchClient struct {
}

type sequenceManager struct {
seqCache cache.Cache
watchTopicName string
seqCache cache.Cache
}

func (s *sequenceManager) GetSequence() int64 {
cachedSeqID, err := s.seqCache.Get("watchSequenceID")
if err == nil {
if seqID, ok := cachedSeqID.(float64); ok {
if seqID, ok := cachedSeqID.(int64); ok {
return seqID
} else if seqID, ok := cachedSeqID.(float64); ok {
return int64(seqID)
}
}
Expand All @@ -38,50 +42,81 @@ func (s *sequenceManager) GetSequence() int64 {

func (s *sequenceManager) SetSequence(sequenceID int64) {
s.seqCache.Set("watchSequenceID", sequenceID)
s.seqCache.Save("sample.sequence")
}

// Todo - To be updated after cache persistence story
func getSequenceManager() *sequenceManager {
seqCache := cache.New()
err := seqCache.Load("sample.sequence")
if err != nil {
seqCache.Set("watchSequenceID", int64(0))
seqCache.Set("watchSequenceID", int64(1))
seqCache.Save("sample.sequence")
}

return &sequenceManager{seqCache: seqCache}
}

func defaultTLSConfig() *tls.Config {
return &tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
},
}
}

// NewWatchClient creates a WatchClient
func NewWatchClient(config *Config, logger logrus.FieldLogger) (*WatchClient, error) {
entry := logger.WithField("package", "client")

var watchOptions []wm.Option
watchOptions = append(watchOptions, wm.WithLogger(entry))
watchOptions = []wm.Option{
wm.WithLogger(entry),
}

if config.Insecure {
watchOptions = append(watchOptions, wm.WithTLSConfig(nil))
} else {
watchOptions = append(watchOptions, wm.WithTLSConfig(defaultTLSConfig()))
}
ta := auth.NewTokenAuth(config.Auth, config.TenantID)

ccfg := &corecfg.CentralConfiguration{
URL: config.Host,
ClientTimeout: 15,
URL: fmt.Sprintf("https://%s:%d", config.Host, config.Port),
ClientTimeout: 30 * time.Second,
ProxyURL: "",
TenantID: config.TenantID,
TLS: corecfg.NewTLSConfig(),
GRPCCfg: corecfg.GRPCConfig{
Enabled: true,
Insecure: config.Insecure,
Host: config.Host,
Port: int(config.Port),
},
Auth: &corecfg.AuthConfiguration{
URL: config.Auth.URL,
PrivateKey: config.Auth.PrivateKey,
PublicKey: config.Auth.PublicKey,
KeyPwd: config.Auth.KeyPassword,
Realm: "Broker",
ClientID: config.Auth.ClientID,
Timeout: config.Auth.Timeout,
},
}

ta := auth.NewPlatformTokenGetterWithCentralConfig(ccfg)
hCfg := harvester.NewConfig(ccfg, ta, getSequenceManager())
hClient := harvester.NewClient(hCfg)
watchOptions = append(watchOptions, wm.WithHarvester(hClient, getSequenceManager()))

cfg := &wm.Config{
Host: config.Host,
Port: config.Port,
Host: ccfg.GRPCCfg.Host,
Port: uint32(ccfg.GRPCCfg.Port),
TenantID: config.TenantID,
TokenGetter: ta.GetToken,
}
Expand Down

0 comments on commit 56a28ef

Please sign in to comment.