From 3493072c0edb217dec9834323aaf9655e63c74c1 Mon Sep 17 00:00:00 2001 From: Vivek Singh Chauhan Date: Tue, 20 Dec 2022 14:21:52 -0700 Subject: [PATCH 1/2] APIGOV-24414 - Changes to process 410 response from harvester - Updated sample watch client --- pkg/agent/poller/poller.go | 2 +- pkg/harvester/harvesterclient.go | 26 ++++++++++- pkg/watchmanager/manager.go | 2 +- samples/watchclient/pkg/client/client.go | 59 +++++++++++++++++++----- 4 files changed, 74 insertions(+), 15 deletions(-) diff --git a/pkg/agent/poller/poller.go b/pkg/agent/poller/poller.go index 368b2a30b..354ba454b 100644 --- a/pkg/agent/poller/poller.go +++ b/pkg/agent/poller/poller.go @@ -61,7 +61,7 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e return } - if m.sequence.GetSequence() == 0 { + if m.sequence.GetSequence() < 0 { m.onHarvesterErr() go func() { m.Stop() diff --git a/pkg/harvester/harvesterclient.go b/pkg/harvester/harvesterclient.go index 9393e0f8f..f1bd4e717 100644 --- a/pkg/harvester/harvesterclient.go +++ b/pkg/harvester/harvesterclient.go @@ -23,6 +23,13 @@ const ( defaultEventPageSize = 100 ) +type errSeqGone struct { +} + +func (e *errSeqGone) Error() string { + return "sequence purged" +} + // Harvest is an interface for retrieving harvester events type Harvest interface { EventCatchUp(link string, events chan *proto.Event) error @@ -132,10 +139,22 @@ func (h *Client) ReceiveSyncEvents(topicSelfLink string, sequenceID int64, event return lastID, err } - if res.Code != http.StatusOK { + if res.Code != http.StatusOK && res.Code != http.StatusGone { return lastID, fmt.Errorf("expected a 200 response but received %d", res.Code) } + // requested sequence is purged get the current max sequence + if lastID == 0 && res.Code == http.StatusGone { + maxSeqId, ok := res.Headers["X-Axway-Max-Sequence-Id"] + if ok && len(maxSeqId) > 0 { + lastID, err = strconv.ParseInt(maxSeqId[0], 10, 64) + if err != nil { + return lastID, err + } + return lastID, &errSeqGone{} + } + } + pagedEvents := make([]*resourceEntryExternalEvent, 0) err = json.Unmarshal(res.Body, &pagedEvents) if err != nil { @@ -189,6 +208,11 @@ func (h *Client) EventCatchUp(link string, events chan *proto.Event) error { var err error lastSequenceID, err := h.ReceiveSyncEvents(link, sequenceID, events) if err != nil { + if _, ok := err.(*errSeqGone); ok { + // Set the max sequence returned from 410 to sequence provider as processed + h.Cfg.SequenceProvider.SetSequence(lastSequenceID) + return nil + } return err } diff --git a/pkg/watchmanager/manager.go b/pkg/watchmanager/manager.go index f6b250739..c07c4205b 100644 --- a/pkg/watchmanager/manager.go +++ b/pkg/watchmanager/manager.go @@ -148,7 +148,7 @@ func (m *watchManager) RegisterWatch(link string, events chan *proto.Event, erro subscriptionID, _ := uuid.NewUUID() subID := subscriptionID.String() - if m.options.sequence != nil && m.options.sequence.GetSequence() == 0 { + if m.options.sequence != nil && m.options.sequence.GetSequence() < 0 { err := fmt.Errorf("do not have a sequence id, stopping watch manager") m.logger.Error(err.Error()) m.CloseWatch(subID) diff --git a/samples/watchclient/pkg/client/client.go b/samples/watchclient/pkg/client/client.go index 056989db5..b6c4f15ee 100644 --- a/samples/watchclient/pkg/client/client.go +++ b/samples/watchclient/pkg/client/client.go @@ -1,7 +1,10 @@ package client import ( + "crypto/tls" "encoding/json" + "fmt" + "time" "github.com/Axway/agent-sdk/pkg/apic/auth" "github.com/Axway/agent-sdk/pkg/cache" @@ -22,14 +25,15 @@ type WatchClient struct { } type sequenceManager struct { - seqCache cache.Cache - watchTopicName string + seqCache cache.Cache } func (s *sequenceManager) GetSequence() int64 { cachedSeqID, err := s.seqCache.Get("watchSequenceID") if err == nil { - if seqID, ok := cachedSeqID.(float64); ok { + if seqID, ok := cachedSeqID.(int64); ok { + return seqID + } else if seqID, ok := cachedSeqID.(float64); ok { return int64(seqID) } } @@ -38,6 +42,7 @@ func (s *sequenceManager) GetSequence() int64 { func (s *sequenceManager) SetSequence(sequenceID int64) { s.seqCache.Set("watchSequenceID", sequenceID) + s.seqCache.Save("sample.sequence") } // Todo - To be updated after cache persistence story @@ -45,43 +50,73 @@ func getSequenceManager() *sequenceManager { seqCache := cache.New() err := seqCache.Load("sample.sequence") if err != nil { - seqCache.Set("watchSequenceID", int64(0)) + seqCache.Set("watchSequenceID", int64(1)) seqCache.Save("sample.sequence") } return &sequenceManager{seqCache: seqCache} } +func defaultTLSConfig() *tls.Config { + return &tls.Config{ + MinVersion: tls.VersionTLS12, + CipherSuites: []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256, + }, + } +} + // NewWatchClient creates a WatchClient func NewWatchClient(config *Config, logger logrus.FieldLogger) (*WatchClient, error) { entry := logger.WithField("package", "client") var watchOptions []wm.Option - watchOptions = append(watchOptions, wm.WithLogger(entry)) + watchOptions = []wm.Option{ + wm.WithLogger(entry), + } + if config.Insecure { watchOptions = append(watchOptions, wm.WithTLSConfig(nil)) + } else { + watchOptions = append(watchOptions, wm.WithTLSConfig(defaultTLSConfig())) } - ta := auth.NewTokenAuth(config.Auth, config.TenantID) ccfg := &corecfg.CentralConfiguration{ - URL: config.Host, - ClientTimeout: 15, + URL: fmt.Sprintf("https://%s:%d", config.Host, config.Port), + ClientTimeout: 30 * time.Second, ProxyURL: "", TenantID: config.TenantID, TLS: corecfg.NewTLSConfig(), GRPCCfg: corecfg.GRPCConfig{ Enabled: true, Insecure: config.Insecure, + Host: config.Host, + Port: int(config.Port), + }, + Auth: &corecfg.AuthConfiguration{ + URL: config.Auth.URL, + PrivateKey: config.Auth.PrivateKey, + PublicKey: config.Auth.PublicKey, + KeyPwd: config.Auth.KeyPassword, + Realm: "Broker", + ClientID: config.Auth.ClientID, + Timeout: config.Auth.Timeout, }, } - + ta := auth.NewPlatformTokenGetterWithCentralConfig(ccfg) hCfg := harvester.NewConfig(ccfg, ta, getSequenceManager()) hClient := harvester.NewClient(hCfg) watchOptions = append(watchOptions, wm.WithHarvester(hClient, getSequenceManager())) - cfg := &wm.Config{ - Host: config.Host, - Port: config.Port, + Host: ccfg.GRPCCfg.Host, + Port: uint32(ccfg.GRPCCfg.Port), TenantID: config.TenantID, TokenGetter: ta.GetToken, } From e69ea66f0555d3ec60d373ab1139942253e862b3 Mon Sep 17 00:00:00 2001 From: Vivek Singh Chauhan Date: Tue, 3 Jan 2023 14:10:39 -0700 Subject: [PATCH 2/2] APIGOV-24414 - handle purged sequence error for poll mode --- pkg/agent/poller/poller.go | 20 +++++++++++++++----- pkg/harvester/harvesterclient.go | 9 +++++---- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/agent/poller/poller.go b/pkg/agent/poller/poller.go index 354ba454b..56743be26 100644 --- a/pkg/agent/poller/poller.go +++ b/pkg/agent/poller/poller.go @@ -101,19 +101,29 @@ func (m *pollExecutor) sync(topicSelfLink string, eventChan chan *proto.Event) e } } -func (m *pollExecutor) tick(topicSelfLink string, eventChan chan *proto.Event) error { +func (m *pollExecutor) tick(topicSelfLink string, eventChan chan *proto.Event) (ret error) { sequence := m.sequence.GetSequence() logger := m.logger.WithField("sequence-id", sequence) logger.Debug("retrieving harvester events") - if _, err := m.harvester.ReceiveSyncEvents(topicSelfLink, sequence, eventChan); err != nil { + defer func() { + if ret == nil { + m.timer.Reset(m.interval) + } + }() + + if lastSeqID, err := m.harvester.ReceiveSyncEvents(topicSelfLink, sequence, eventChan); err != nil { + if _, ok := err.(*harvester.ErrSeqGone); ok { + m.sequence.SetSequence(lastSeqID) + return + } + logger.WithError(err).Error("harvester returned an error when syncing events") m.onHarvesterErr() - return err + ret = err } - m.timer.Reset(m.interval) - return nil + return } func (m *pollExecutor) onHarvesterErr() { diff --git a/pkg/harvester/harvesterclient.go b/pkg/harvester/harvesterclient.go index f1bd4e717..d35ed3c84 100644 --- a/pkg/harvester/harvesterclient.go +++ b/pkg/harvester/harvesterclient.go @@ -23,10 +23,11 @@ const ( defaultEventPageSize = 100 ) -type errSeqGone struct { +// ErrSeqGone - error for purged sequence +type ErrSeqGone struct { } -func (e *errSeqGone) Error() string { +func (e *ErrSeqGone) Error() string { return "sequence purged" } @@ -151,7 +152,7 @@ func (h *Client) ReceiveSyncEvents(topicSelfLink string, sequenceID int64, event if err != nil { return lastID, err } - return lastID, &errSeqGone{} + return lastID, &ErrSeqGone{} } } @@ -208,7 +209,7 @@ func (h *Client) EventCatchUp(link string, events chan *proto.Event) error { var err error lastSequenceID, err := h.ReceiveSyncEvents(link, sequenceID, events) if err != nil { - if _, ok := err.(*errSeqGone); ok { + if _, ok := err.(*ErrSeqGone); ok { // Set the max sequence returned from 410 to sequence provider as processed h.Cfg.SequenceProvider.SetSequence(lastSequenceID) return nil