From f4f6aabdd5495e2a0832441d7cca7b2fb28d7886 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Dastidar Date: Sat, 23 Apr 2022 10:06:51 +0530 Subject: [PATCH 1/2] fix: fixed stuck token metadata updates Signed-off-by: Soumya Ghosh Dastidar --- cmd/metadata/service/token.go | 32 ++++++++++++++++++++++++++++++++ go.sum | 4 ---- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/cmd/metadata/service/token.go b/cmd/metadata/service/token.go index 2aa3f8a..a4ee5b5 100644 --- a/cmd/metadata/service/token.go +++ b/cmd/metadata/service/token.go @@ -149,6 +149,9 @@ func (s *TokenService) manager(ctx context.Context) { func (s *TokenService) saver(ctx context.Context) { defer s.wg.Done() + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + tokens := make([]*models.TokenMetadata, 0) for { select { @@ -167,6 +170,35 @@ func (s *TokenService) saver(ctx context.Context) { continue } + for i := range tokens { + s.queue.Delete(tokens[i].ID) + + if s.prom != nil { + switch tokens[i].Status { + case models.StatusApplied, models.StatusFailed: + s.prom.DecGaugeValue("metadata_new", map[string]string{ + "network": s.network, + "type": "token", + }) + s.prom.IncrementCounter("metadata_counter", map[string]string{ + "network": s.network, + "type": "token", + "status": tokens[i].Status.String(), + }) + } + } + } + tokens = nil + + case <-ticker.C: + if len(tokens) == 0 { + continue + } + if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { + log.Err(err).Msg("UpdateTokenMetadata") + continue + } + for i := range tokens { s.queue.Delete(tokens[i].ID) diff --git a/go.sum b/go.sum index 9088120..b5d2e98 100644 --- a/go.sum +++ b/go.sum @@ -89,8 +89,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= -github.com/dipdup-net/go-lib v0.1.56 h1:Jr0+lDCaOc9Y9XfKoi1F22peVaB1mZe5XsIJKg5mUHk= -github.com/dipdup-net/go-lib v0.1.56/go.mod h1:ZOAyo2sqNLCCVdvHvPbiWfgImvmrMW7/sdQiSsGrD44= github.com/dipdup-net/go-lib v0.2.0 h1:RGreU/ydxeaU5Eyad1g1NF44uMpgqVR6kgmY+2Imvg0= github.com/dipdup-net/go-lib v0.2.0/go.mod h1:ZOAyo2sqNLCCVdvHvPbiWfgImvmrMW7/sdQiSsGrD44= github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c= @@ -215,7 +213,6 @@ github.com/ipfs/go-ipfs-files v0.0.9/go.mod h1:aFv2uQ/qxWpL/6lidWvnSQmaVqCrf0TBG github.com/ipfs/go-ipfs-files v0.1.1 h1:/MbEowmpLo9PJTEQk16m9rKzUHjeP4KRU9nWJyJO324= github.com/ipfs/go-ipfs-files v0.1.1/go.mod h1:8xkIrMWH+Y5P7HvJ4Yc5XWwIW2e52dyXUiC0tZyjDbM= github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= -github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -237,7 +234,6 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= From 1c548ae496f2ac482bda047f3b0d2eb3bf91d4d0 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Dastidar Date: Mon, 25 Apr 2022 19:40:31 +0530 Subject: [PATCH 2/2] refactor: moved common code to separate func Signed-off-by: Soumya Ghosh Dastidar --- cmd/metadata/service/token.go | 66 +++++++++++++++-------------------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/cmd/metadata/service/token.go b/cmd/metadata/service/token.go index a4ee5b5..9222fea 100644 --- a/cmd/metadata/service/token.go +++ b/cmd/metadata/service/token.go @@ -165,63 +165,53 @@ func (s *TokenService) saver(ctx context.Context) { continue } - if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { + if err := s.bulkSaveTokens(ctx, tokens); err != nil { log.Err(err).Msg("UpdateTokenMetadata") continue } - for i := range tokens { - s.queue.Delete(tokens[i].ID) - - if s.prom != nil { - switch tokens[i].Status { - case models.StatusApplied, models.StatusFailed: - s.prom.DecGaugeValue("metadata_new", map[string]string{ - "network": s.network, - "type": "token", - }) - s.prom.IncrementCounter("metadata_counter", map[string]string{ - "network": s.network, - "type": "token", - "status": tokens[i].Status.String(), - }) - } - } - } tokens = nil case <-ticker.C: if len(tokens) == 0 { continue } - if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { + if err := s.bulkSaveTokens(ctx, tokens); err != nil { log.Err(err).Msg("UpdateTokenMetadata") continue } - for i := range tokens { - s.queue.Delete(tokens[i].ID) - - if s.prom != nil { - switch tokens[i].Status { - case models.StatusApplied, models.StatusFailed: - s.prom.DecGaugeValue("metadata_new", map[string]string{ - "network": s.network, - "type": "token", - }) - s.prom.IncrementCounter("metadata_counter", map[string]string{ - "network": s.network, - "type": "token", - "status": tokens[i].Status.String(), - }) - } - } - } tokens = nil } } } +func (s *TokenService) bulkSaveTokens(ctx context.Context, tokens []*models.TokenMetadata) error { + if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { + return err + } + + for i := range tokens { + s.queue.Delete(tokens[i].ID) + + if s.prom != nil { + switch tokens[i].Status { + case models.StatusApplied, models.StatusFailed: + s.prom.DecGaugeValue("metadata_new", map[string]string{ + "network": s.network, + "type": "token", + }) + s.prom.IncrementCounter("metadata_counter", map[string]string{ + "network": s.network, + "type": "token", + "status": tokens[i].Status.String(), + }) + } + } + } + return nil +} + func (s *TokenService) worker(ctx context.Context) { defer s.wg.Done()