From 3c7a63b4400bcfc35e6a42fb5cbd802fcbdbf344 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Mon, 5 Aug 2024 16:30:23 -0400 Subject: [PATCH 01/10] rm global cache --- .../streaming/full_node_streaming_manager.go | 102 +++++++++++------- 1 file changed, 63 insertions(+), 39 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 1d035ed6b3..e712d63fff 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -29,9 +29,12 @@ type FullNodeStreamingManagerImpl struct { ticker *time.Ticker done chan bool - // map of clob pair id to stream updates. - streamUpdateCache map[uint32][]clobtypes.StreamUpdate - numUpdatesInCache uint32 + // list of stream updates. + streamUpdateCache []clobtypes.StreamUpdate + // list of subscription ids for each stream update. + streamUpdateSubscriptionCache [][]uint32 + clobPairIdToSubscriptionIdMapping map[uint32][]uint32 + numUpdatesInCache uint32 maxUpdatesInCache uint32 maxSubscriptionChannelSize uint32 @@ -66,10 +69,12 @@ func NewFullNodeStreamingManager( orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), nextSubscriptionId: 0, - ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), - done: make(chan bool), - streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate), - numUpdatesInCache: 0, + ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), + done: make(chan bool), + streamUpdateCache: make([]clobtypes.StreamUpdate, 0), + streamUpdateSubscriptionCache: make([][]uint32, 0), + clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32), + numUpdatesInCache: 0, maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, @@ -134,6 +139,18 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( messageSender: messageSender, updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), } + for _, clobPairId := range clobPairIds { + // if clobPairId exists in the map, append the subscription id to the slice + // otherwise, create a new slice with the subscription id + if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; ok { + sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append( + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + sm.nextSubscriptionId, + ) + } else { + sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{sm.nextSubscriptionId} + } + } sm.logger.Info( fmt.Sprintf( @@ -295,27 +312,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal each per-clob pair message to v1 updates. - updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + updatesByClobPairId := make([]clobtypes.StreamUpdate, 0) + clobPairIds := make([]uint32, 0) for clobPairId, update := range updates { v1updates, err := streaming_util.GetOffchainUpdatesV1(update) if err != nil { panic(err) } - updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ - { - UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ - OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: v1updates, - Snapshot: false, - }, + streamUpdate := clobtypes.StreamUpdate{ + UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ + OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ + Updates: v1updates, + Snapshot: false, }, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), }, + BlockHeight: blockHeight, + ExecMode: uint32(execMode), } + updatesByClobPairId = append(updatesByClobPairId, streamUpdate) + clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates))) + sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(updates))) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -333,7 +351,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( ) // Group fills by clob pair id. - updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) + updatesByClobPairId := make([]clobtypes.StreamUpdate, 0) + clobPairIds := make([]uint32, 0) for _, orderbookFill := range orderbookFills { // If this is a deleveraging fill, fetch the clob pair id from the deleveraged // perpetual id. @@ -346,9 +365,6 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( } else { clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId } - if _, ok := updatesByClobPairId[clobPairId]; !ok { - updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{} - } streamUpdate := clobtypes.StreamUpdate{ UpdateMessage: &clobtypes.StreamUpdate_OrderFill{ OrderFill: &orderbookFill, @@ -356,14 +372,16 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) + updatesByClobPairId = append(updatesByClobPairId, streamUpdate) + clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills))) + sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(orderbookFills))) } func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( - updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + updates []clobtypes.StreamUpdate, + clobPairIds []uint32, numUpdatesToAdd uint32, ) { sm.Lock() @@ -374,8 +392,9 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( float32(numUpdatesToAdd), ) - for clobPairId, streamUpdates := range updatesByClobPairId { - sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...) + sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) + for _, clobPairId := range clobPairIds { + sm.streamUpdateSubscriptionCache = append(sm.streamUpdateSubscriptionCache, sm.clobPairIdToSubscriptionIdMapping[clobPairId]) } sm.numUpdatesInCache += numUpdatesToAdd @@ -398,8 +417,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() { sm.FlushStreamUpdatesWithLock() } -// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers. -// Note this method requires the lock and assumes that the lock has already been +// FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs, +// and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been // acquired by the caller. func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { defer metrics.ModuleMeasureSince( @@ -408,24 +427,28 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { time.Now(), ) - // Non-blocking send updates through subscriber's buffered channel. - // If the buffer is full, drop the subscription. + // Map to collect updates for each subscription. + subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate) idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0) - for _, clobPairId := range subscription.clobPairIds { - if update, ok := sm.streamUpdateCache[clobPairId]; ok { - streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...) - } + + // Collect updates for each subscription. + for i, update := range sm.streamUpdateCache { + subscriptionIds := sm.streamUpdateSubscriptionCache[i] + for _, id := range subscriptionIds { + subscriptionUpdates[id] = append(subscriptionUpdates[id], update) } + } - if len(streamUpdatesForSubscription) > 0 { + // Non-blocking send updates through subscriber's buffered channel. + // If the buffer is full, drop the subscription. + for id, updates := range subscriptionUpdates { + if subscription, ok := sm.orderbookSubscriptions[id]; ok { metrics.IncrCounter( metrics.GrpcAddToSubscriptionChannelCount, 1, ) select { - case subscription.updatesChannel <- streamUpdatesForSubscription: + case subscription.updatesChannel <- updates: default: idsToRemove = append(idsToRemove, id) } @@ -433,6 +456,7 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { } clear(sm.streamUpdateCache) + clear(sm.streamUpdateSubscriptionCache) sm.numUpdatesInCache = 0 for _, id := range idsToRemove { From af79f20e054f10ccdbd36be36d7cf53bc19a06e0 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Mon, 5 Aug 2024 16:42:00 -0400 Subject: [PATCH 02/10] dev5 --- .github/workflows/protocol-build-and-push.yml | 180 ------------------ 1 file changed, 180 deletions(-) diff --git a/.github/workflows/protocol-build-and-push.yml b/.github/workflows/protocol-build-and-push.yml index fc26dd6e69..119c084695 100644 --- a/.github/workflows/protocol-build-and-push.yml +++ b/.github/workflows/protocol-build-and-push.yml @@ -8,150 +8,6 @@ on: # yamllint disable-line rule:truthy - 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x jobs: - build-and-push-dev: - runs-on: ubuntu-latest - defaults: - run: - working-directory: ./protocol - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - fetch-depth: '0' # without this, ignite fails. - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV }} - aws-region: us-east-2 - - - name: Login to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v1 - - - name: Build, Tag, and Push the Image to Amazon ECR - id: build-image - env: - ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - ECR_REPOSITORY: dev-validator - run: | - make localnet-build-amd64 - commit_hash=$(git rev-parse --short=7 HEAD) - docker build \ - --platform amd64 \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ - -f testing/testnet-dev/Dockerfile . - docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags - - build-and-push-dev2: - runs-on: ubuntu-latest - defaults: - run: - working-directory: ./protocol - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - fetch-depth: '0' # without this, ignite fails. - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV2 }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV2 }} - aws-region: us-east-2 - - - name: Login to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v1 - - - name: Build, Tag, and Push the Image to Amazon ECR - id: build-image - env: - ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - ECR_REPOSITORY: dev2-validator - run: | - make localnet-build-amd64 - commit_hash=$(git rev-parse --short=7 HEAD) - docker build \ - --platform amd64 \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ - -f testing/testnet-dev/Dockerfile . - docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags - - build-and-push-dev3: - runs-on: ubuntu-latest - defaults: - run: - working-directory: ./protocol - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - fetch-depth: '0' # without this, ignite fails. - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV3 }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV3 }} - aws-region: us-east-2 - - - name: Login to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v1 - - - name: Build, Tag, and Push the Image to Amazon ECR - id: build-image - env: - ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - ECR_REPOSITORY: dev3-validator - run: | - make localnet-build-amd64 - commit_hash=$(git rev-parse --short=7 HEAD) - docker build \ - --platform amd64 \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ - -f testing/testnet-dev/Dockerfile . - docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags - - build-and-push-dev4: - runs-on: ubuntu-latest - defaults: - run: - working-directory: ./protocol - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - fetch-depth: '0' # without this, ignite fails. - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV4 }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV4 }} - aws-region: us-east-2 - - - name: Login to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v1 - - - name: Build, Tag, and Push the Image to Amazon ECR - id: build-image - env: - ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - ECR_REPOSITORY: dev4-validator - run: | - make localnet-build-amd64 - commit_hash=$(git rev-parse --short=7 HEAD) - docker build \ - --platform amd64 \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ - -f testing/testnet-dev/Dockerfile . - docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags - build-and-push-dev5: runs-on: ubuntu-latest defaults: @@ -187,39 +43,3 @@ jobs: -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ -f testing/testnet-dev/Dockerfile . docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags - - build-and-push-staging: - runs-on: ubuntu-latest - defaults: - run: - working-directory: ./protocol - steps: - - name: Checkout - uses: actions/checkout@v3 - with: - fetch-depth: '0' # without this, ignite fails. - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v1 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_STAGING }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_STAGING }} - aws-region: us-east-2 - - - name: Login to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v1 - - - name: Build, Tag, and Push the Image to Amazon ECR - id: build-image - env: - ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - ECR_REPOSITORY: staging-validator - run: | - make localnet-build-amd64 - commit_hash=$(git rev-parse --short=7 HEAD) - docker build \ - --platform amd64 \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ - -f testing/testnet-staging/Dockerfile . - docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags From 96cb02276a8bc3f7e2125c3a2347da9fce706949 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Mon, 5 Aug 2024 16:48:15 -0400 Subject: [PATCH 03/10] add branch --- .github/workflows/protocol-build-and-push.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/protocol-build-and-push.yml b/.github/workflows/protocol-build-and-push.yml index 119c084695..919654e7ae 100644 --- a/.github/workflows/protocol-build-and-push.yml +++ b/.github/workflows/protocol-build-and-push.yml @@ -4,6 +4,7 @@ on: # yamllint disable-line rule:truthy push: branches: - main + - 'wl/rm_global_cache' - 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x - 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x From 57e54fae6669488f706d46eca232de6dbf1b316d Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Aug 2024 11:28:59 -0400 Subject: [PATCH 04/10] dev2 --- .github/workflows/protocol-build-and-push.yml | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/.github/workflows/protocol-build-and-push.yml b/.github/workflows/protocol-build-and-push.yml index 919654e7ae..2c2c5bdef8 100644 --- a/.github/workflows/protocol-build-and-push.yml +++ b/.github/workflows/protocol-build-and-push.yml @@ -9,6 +9,42 @@ on: # yamllint disable-line rule:truthy - 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x jobs: + build-and-push-dev2: + runs-on: ubuntu-latest + defaults: + run: + working-directory: ./protocol + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: '0' # without this, ignite fails. + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV2 }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV2 }} + aws-region: us-east-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, Tag, and Push the Image to Amazon ECR + id: build-image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: dev2-validator + run: | + make localnet-build-amd64 + commit_hash=$(git rev-parse --short=7 HEAD) + docker build \ + --platform amd64 \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ + -f testing/testnet-dev/Dockerfile . + docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags + build-and-push-dev5: runs-on: ubuntu-latest defaults: From ce156190b9d7629f8df7abc87d33f89e52402def Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Aug 2024 12:14:35 -0400 Subject: [PATCH 05/10] lint --- .../streaming/full_node_streaming_manager.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index e712d63fff..f44ccfbdd0 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -142,14 +142,13 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( for _, clobPairId := range clobPairIds { // if clobPairId exists in the map, append the subscription id to the slice // otherwise, create a new slice with the subscription id - if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; ok { - sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append( - sm.clobPairIdToSubscriptionIdMapping[clobPairId], - sm.nextSubscriptionId, - ) - } else { - sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{sm.nextSubscriptionId} + if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; !ok { + sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{} } + sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append( + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + sm.nextSubscriptionId, + ) } sm.logger.Info( @@ -394,7 +393,10 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( sm.streamUpdateCache = append(sm.streamUpdateCache, updates...) for _, clobPairId := range clobPairIds { - sm.streamUpdateSubscriptionCache = append(sm.streamUpdateSubscriptionCache, sm.clobPairIdToSubscriptionIdMapping[clobPairId]) + sm.streamUpdateSubscriptionCache = append( + sm.streamUpdateSubscriptionCache, + sm.clobPairIdToSubscriptionIdMapping[clobPairId], + ) } sm.numUpdatesInCache += numUpdatesToAdd From e6fcbf521055ab696df4f94095f72977f588f8a7 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Aug 2024 12:15:03 -0400 Subject: [PATCH 06/10] revert ghwf --- .github/workflows/protocol-build-and-push.yml | 145 +++++++++++++++++- 1 file changed, 144 insertions(+), 1 deletion(-) diff --git a/.github/workflows/protocol-build-and-push.yml b/.github/workflows/protocol-build-and-push.yml index 2c2c5bdef8..fc26dd6e69 100644 --- a/.github/workflows/protocol-build-and-push.yml +++ b/.github/workflows/protocol-build-and-push.yml @@ -4,11 +4,46 @@ on: # yamllint disable-line rule:truthy push: branches: - main - - 'wl/rm_global_cache' - 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x - 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x jobs: + build-and-push-dev: + runs-on: ubuntu-latest + defaults: + run: + working-directory: ./protocol + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: '0' # without this, ignite fails. + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV }} + aws-region: us-east-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, Tag, and Push the Image to Amazon ECR + id: build-image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: dev-validator + run: | + make localnet-build-amd64 + commit_hash=$(git rev-parse --short=7 HEAD) + docker build \ + --platform amd64 \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ + -f testing/testnet-dev/Dockerfile . + docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags + build-and-push-dev2: runs-on: ubuntu-latest defaults: @@ -45,6 +80,78 @@ jobs: -f testing/testnet-dev/Dockerfile . docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags + build-and-push-dev3: + runs-on: ubuntu-latest + defaults: + run: + working-directory: ./protocol + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: '0' # without this, ignite fails. + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV3 }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV3 }} + aws-region: us-east-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, Tag, and Push the Image to Amazon ECR + id: build-image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: dev3-validator + run: | + make localnet-build-amd64 + commit_hash=$(git rev-parse --short=7 HEAD) + docker build \ + --platform amd64 \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ + -f testing/testnet-dev/Dockerfile . + docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags + + build-and-push-dev4: + runs-on: ubuntu-latest + defaults: + run: + working-directory: ./protocol + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: '0' # without this, ignite fails. + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV4 }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV4 }} + aws-region: us-east-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, Tag, and Push the Image to Amazon ECR + id: build-image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: dev4-validator + run: | + make localnet-build-amd64 + commit_hash=$(git rev-parse --short=7 HEAD) + docker build \ + --platform amd64 \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ + -f testing/testnet-dev/Dockerfile . + docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags + build-and-push-dev5: runs-on: ubuntu-latest defaults: @@ -80,3 +187,39 @@ jobs: -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ -f testing/testnet-dev/Dockerfile . docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags + + build-and-push-staging: + runs-on: ubuntu-latest + defaults: + run: + working-directory: ./protocol + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: '0' # without this, ignite fails. + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_STAGING }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_STAGING }} + aws-region: us-east-2 + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + + - name: Build, Tag, and Push the Image to Amazon ECR + id: build-image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: staging-validator + run: | + make localnet-build-amd64 + commit_hash=$(git rev-parse --short=7 HEAD) + docker build \ + --platform amd64 \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \ + -f testing/testnet-staging/Dockerfile . + docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags From 8d333b8bed0429016c09e3c3893720515789e762 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Aug 2024 14:23:32 -0400 Subject: [PATCH 07/10] address cmts --- .../streaming/full_node_streaming_manager.go | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index f44ccfbdd0..57ea542e38 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -32,9 +32,9 @@ type FullNodeStreamingManagerImpl struct { // list of stream updates. streamUpdateCache []clobtypes.StreamUpdate // list of subscription ids for each stream update. - streamUpdateSubscriptionCache [][]uint32 + streamUpdateSubscriptionCache [][]uint32 + // map from clob pair id to subscription ids. clobPairIdToSubscriptionIdMapping map[uint32][]uint32 - numUpdatesInCache uint32 maxUpdatesInCache uint32 maxSubscriptionChannelSize uint32 @@ -74,7 +74,6 @@ func NewFullNodeStreamingManager( streamUpdateCache: make([]clobtypes.StreamUpdate, 0), streamUpdateSubscriptionCache: make([][]uint32, 0), clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32), - numUpdatesInCache: 0, maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, @@ -106,7 +105,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { metrics.SetGauge( metrics.GrpcStreamNumUpdatesBuffered, - float32(sm.numUpdatesInCache), + float32(len(sm.streamUpdateCache)), ) metrics.SetGauge( metrics.GrpcStreamSubscriberCount, @@ -210,6 +209,22 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription( } close(subscription.updatesChannel) delete(sm.orderbookSubscriptions, subscriptionIdToRemove) + + // Iterate over the clobPairIdToSubscriptionIdMapping to remove the subscriptionIdToRemove + for pairId, subscriptionIds := range sm.clobPairIdToSubscriptionIdMapping { + for i, id := range subscriptionIds { + if id == subscriptionIdToRemove { + // Remove the subscription ID from the slice + sm.clobPairIdToSubscriptionIdMapping[pairId] = append(subscriptionIds[:i], subscriptionIds[i+1:]...) + break + } + } + // If the list is empty after removal, delete the key from the map + if len(sm.clobPairIdToSubscriptionIdMapping[pairId]) == 0 { + delete(sm.clobPairIdToSubscriptionIdMapping, pairId) + } + } + sm.logger.Info( fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove), ) @@ -311,7 +326,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( } // Unmarshal each per-clob pair message to v1 updates. - updatesByClobPairId := make([]clobtypes.StreamUpdate, 0) + streamUpdates := make([]clobtypes.StreamUpdate, 0) clobPairIds := make([]uint32, 0) for clobPairId, update := range updates { v1updates, err := streaming_util.GetOffchainUpdatesV1(update) @@ -328,11 +343,11 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - updatesByClobPairId = append(updatesByClobPairId, streamUpdate) + streamUpdates = append(streamUpdates, streamUpdate) clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(updates))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates))) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -350,7 +365,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( ) // Group fills by clob pair id. - updatesByClobPairId := make([]clobtypes.StreamUpdate, 0) + streamUpdates := make([]clobtypes.StreamUpdate, 0) clobPairIds := make([]uint32, 0) for _, orderbookFill := range orderbookFills { // If this is a deleveraging fill, fetch the clob pair id from the deleveraged @@ -371,11 +386,11 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates( BlockHeight: blockHeight, ExecMode: uint32(execMode), } - updatesByClobPairId = append(updatesByClobPairId, streamUpdate) + streamUpdates = append(streamUpdates, streamUpdate) clobPairIds = append(clobPairIds, clobPairId) } - sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(orderbookFills))) + sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills))) } func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( @@ -398,17 +413,15 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache( sm.clobPairIdToSubscriptionIdMapping[clobPairId], ) } - sm.numUpdatesInCache += numUpdatesToAdd // Remove all subscriptions and wipe the buffer if buffer overflows. - if sm.numUpdatesInCache > sm.maxUpdatesInCache { + if len(sm.streamUpdateCache) > int(sm.maxUpdatesInCache) { sm.logger.Error("Streaming buffer full capacity. Dropping messages and all subscriptions. " + "Disconnect all clients and increase buffer size via the grpc-stream-buffer-size flag.") for id := range sm.orderbookSubscriptions { sm.removeSubscription(id) } clear(sm.streamUpdateCache) - sm.numUpdatesInCache = 0 } sm.EmitMetrics() } @@ -459,7 +472,6 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { clear(sm.streamUpdateCache) clear(sm.streamUpdateSubscriptionCache) - sm.numUpdatesInCache = 0 for _, id := range idsToRemove { sm.logger.Error( From 40a26d574071126be78e533768d2fc53833b5c72 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Tue, 6 Aug 2024 19:12:47 -0400 Subject: [PATCH 08/10] gh wf --- .github/workflows/protocol-build-and-push.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/protocol-build-and-push.yml b/.github/workflows/protocol-build-and-push.yml index fc26dd6e69..56795fefe7 100644 --- a/.github/workflows/protocol-build-and-push.yml +++ b/.github/workflows/protocol-build-and-push.yml @@ -3,6 +3,7 @@ name: Protocol Build & Push Image to AWS ECR on: # yamllint disable-line rule:truthy push: branches: + - 'wl/rm_global_cache' - main - 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x - 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x From 3e2325fa1d8b46d5af760f3b5123f0b3d203def3 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 7 Aug 2024 10:18:19 -0400 Subject: [PATCH 09/10] add todo --- protocol/streaming/full_node_streaming_manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 57ea542e38..4c526a200b 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -29,6 +29,9 @@ type FullNodeStreamingManagerImpl struct { ticker *time.Ticker done chan bool + // TODO: Consolidate the streamUpdateCache and streamUpdateSubscriptionCache into a single + // struct to avoid the need to maintain two separate slices for the same data. + // list of stream updates. streamUpdateCache []clobtypes.StreamUpdate // list of subscription ids for each stream update. From c238336418505ad5cd160cb4e60b6e50b59e5ea0 Mon Sep 17 00:00:00 2001 From: Will Liu Date: Wed, 7 Aug 2024 10:18:38 -0400 Subject: [PATCH 10/10] revert ghwf --- .github/workflows/protocol-build-and-push.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/protocol-build-and-push.yml b/.github/workflows/protocol-build-and-push.yml index 56795fefe7..fc26dd6e69 100644 --- a/.github/workflows/protocol-build-and-push.yml +++ b/.github/workflows/protocol-build-and-push.yml @@ -3,7 +3,6 @@ name: Protocol Build & Push Image to AWS ECR on: # yamllint disable-line rule:truthy push: branches: - - 'wl/rm_global_cache' - main - 'release/protocol/v[0-9]+.[0-9]+.x' # e.g. release/protocol/v0.1.x - 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x