Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIGOV-24414 - Updated harvester client to manage purged sequences #606

Merged
merged 2 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/agent/poller/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (m *pollExecutor) RegisterWatch(eventChan chan *proto.Event, errChan chan e
return
}

if m.sequence.GetSequence() == 0 {
if m.sequence.GetSequence() < 0 {
jcollins-axway marked this conversation as resolved.
Show resolved Hide resolved
m.onHarvesterErr()
go func() {
m.Stop()
Expand Down
26 changes: 25 additions & 1 deletion pkg/harvester/harvesterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ const (
defaultEventPageSize = 100
)

type errSeqGone struct {
}

func (e *errSeqGone) Error() string {
return "sequence purged"
}

// Harvest is an interface for retrieving harvester events
type Harvest interface {
EventCatchUp(link string, events chan *proto.Event) error
Expand Down Expand Up @@ -132,10 +139,22 @@ func (h *Client) ReceiveSyncEvents(topicSelfLink string, sequenceID int64, event
return lastID, err
}

if res.Code != http.StatusOK {
if res.Code != http.StatusOK && res.Code != http.StatusGone {
return lastID, fmt.Errorf("expected a 200 response but received %d", res.Code)
}

// requested sequence is purged get the current max sequence
if lastID == 0 && res.Code == http.StatusGone {
maxSeqId, ok := res.Headers["X-Axway-Max-Sequence-Id"]
if ok && len(maxSeqId) > 0 {
lastID, err = strconv.ParseInt(maxSeqId[0], 10, 64)
if err != nil {
return lastID, err
}
return lastID, &errSeqGone{}
}
}

pagedEvents := make([]*resourceEntryExternalEvent, 0)
err = json.Unmarshal(res.Body, &pagedEvents)
if err != nil {
Expand Down Expand Up @@ -189,6 +208,11 @@ func (h *Client) EventCatchUp(link string, events chan *proto.Event) error {
var err error
lastSequenceID, err := h.ReceiveSyncEvents(link, sequenceID, events)
if err != nil {
if _, ok := err.(*errSeqGone); ok {
// Set the max sequence returned from 410 to sequence provider as processed
h.Cfg.SequenceProvider.SetSequence(lastSequenceID)
return nil
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/watchmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (m *watchManager) RegisterWatch(link string, events chan *proto.Event, erro
subscriptionID, _ := uuid.NewUUID()
subID := subscriptionID.String()

if m.options.sequence != nil && m.options.sequence.GetSequence() == 0 {
if m.options.sequence != nil && m.options.sequence.GetSequence() < 0 {
err := fmt.Errorf("do not have a sequence id, stopping watch manager")
m.logger.Error(err.Error())
m.CloseWatch(subID)
Expand Down
59 changes: 47 additions & 12 deletions samples/watchclient/pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package client

import (
"crypto/tls"
"encoding/json"
"fmt"
"time"

"github.com/Axway/agent-sdk/pkg/apic/auth"
"github.com/Axway/agent-sdk/pkg/cache"
Expand All @@ -22,14 +25,15 @@ type WatchClient struct {
}

type sequenceManager struct {
seqCache cache.Cache
watchTopicName string
seqCache cache.Cache
}

func (s *sequenceManager) GetSequence() int64 {
cachedSeqID, err := s.seqCache.Get("watchSequenceID")
if err == nil {
if seqID, ok := cachedSeqID.(float64); ok {
if seqID, ok := cachedSeqID.(int64); ok {
return seqID
} else if seqID, ok := cachedSeqID.(float64); ok {
return int64(seqID)
}
}
Expand All @@ -38,50 +42,81 @@ func (s *sequenceManager) GetSequence() int64 {

func (s *sequenceManager) SetSequence(sequenceID int64) {
s.seqCache.Set("watchSequenceID", sequenceID)
s.seqCache.Save("sample.sequence")
}

// Todo - To be updated after cache persistence story
func getSequenceManager() *sequenceManager {
seqCache := cache.New()
err := seqCache.Load("sample.sequence")
if err != nil {
seqCache.Set("watchSequenceID", int64(0))
seqCache.Set("watchSequenceID", int64(1))
seqCache.Save("sample.sequence")
}

return &sequenceManager{seqCache: seqCache}
}

func defaultTLSConfig() *tls.Config {
return &tls.Config{
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{
tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256,
},
}
}

// NewWatchClient creates a WatchClient
func NewWatchClient(config *Config, logger logrus.FieldLogger) (*WatchClient, error) {
entry := logger.WithField("package", "client")

var watchOptions []wm.Option
watchOptions = append(watchOptions, wm.WithLogger(entry))
watchOptions = []wm.Option{
wm.WithLogger(entry),
}

if config.Insecure {
watchOptions = append(watchOptions, wm.WithTLSConfig(nil))
} else {
watchOptions = append(watchOptions, wm.WithTLSConfig(defaultTLSConfig()))
}
ta := auth.NewTokenAuth(config.Auth, config.TenantID)

ccfg := &corecfg.CentralConfiguration{
URL: config.Host,
ClientTimeout: 15,
URL: fmt.Sprintf("https://%s:%d", config.Host, config.Port),
ClientTimeout: 30 * time.Second,
ProxyURL: "",
TenantID: config.TenantID,
TLS: corecfg.NewTLSConfig(),
GRPCCfg: corecfg.GRPCConfig{
Enabled: true,
Insecure: config.Insecure,
Host: config.Host,
Port: int(config.Port),
},
Auth: &corecfg.AuthConfiguration{
URL: config.Auth.URL,
PrivateKey: config.Auth.PrivateKey,
PublicKey: config.Auth.PublicKey,
KeyPwd: config.Auth.KeyPassword,
Realm: "Broker",
ClientID: config.Auth.ClientID,
Timeout: config.Auth.Timeout,
},
}

ta := auth.NewPlatformTokenGetterWithCentralConfig(ccfg)
hCfg := harvester.NewConfig(ccfg, ta, getSequenceManager())
hClient := harvester.NewClient(hCfg)
watchOptions = append(watchOptions, wm.WithHarvester(hClient, getSequenceManager()))

cfg := &wm.Config{
Host: config.Host,
Port: config.Port,
Host: ccfg.GRPCCfg.Host,
Port: uint32(ccfg.GRPCCfg.Port),
TenantID: config.TenantID,
TokenGetter: ta.GetToken,
}
Expand Down