Restructure FNS global cache to be a list #2036

Merged
merged 10 commits on Aug 7, 2024
Changes from 2 commits
180 changes: 0 additions & 180 deletions .github/workflows/protocol-build-and-push.yml
@@ -8,150 +8,6 @@ on: # yamllint disable-line rule:truthy
- 'release/protocol/v[0-9]+.x' # e.g. release/protocol/v1.x

jobs:
build-and-push-dev:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./protocol
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: '0' # without this, ignite fails.

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV }}
aws-region: us-east-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: Build, Tag, and Push the Image to Amazon ECR
id: build-image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: dev-validator
run: |
make localnet-build-amd64
commit_hash=$(git rev-parse --short=7 HEAD)
docker build \
--platform amd64 \
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-dev/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags

build-and-push-dev2:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./protocol
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: '0' # without this, ignite fails.

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV2 }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV2 }}
aws-region: us-east-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: Build, Tag, and Push the Image to Amazon ECR
id: build-image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: dev2-validator
run: |
make localnet-build-amd64
commit_hash=$(git rev-parse --short=7 HEAD)
docker build \
--platform amd64 \
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-dev/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags

build-and-push-dev3:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./protocol
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: '0' # without this, ignite fails.

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV3 }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV3 }}
aws-region: us-east-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: Build, Tag, and Push the Image to Amazon ECR
id: build-image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: dev3-validator
run: |
make localnet-build-amd64
commit_hash=$(git rev-parse --short=7 HEAD)
docker build \
--platform amd64 \
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-dev/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags

build-and-push-dev4:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./protocol
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: '0' # without this, ignite fails.

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_DEV4 }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_DEV4 }}
aws-region: us-east-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: Build, Tag, and Push the Image to Amazon ECR
id: build-image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: dev4-validator
run: |
make localnet-build-amd64
commit_hash=$(git rev-parse --short=7 HEAD)
docker build \
--platform amd64 \
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-dev/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags

build-and-push-dev5:
runs-on: ubuntu-latest
defaults:
@@ -187,39 +43,3 @@ jobs:
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-dev/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags

build-and-push-staging:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./protocol
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: '0' # without this, ignite fails.

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_VALIDATOR_STAGING }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_VALIDATOR_STAGING }}
aws-region: us-east-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: Build, Tag, and Push the Image to Amazon ECR
id: build-image
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: staging-validator
run: |
make localnet-build-amd64
commit_hash=$(git rev-parse --short=7 HEAD)
docker build \
--platform amd64 \
-t $ECR_REGISTRY/$ECR_REPOSITORY:$commit_hash \
-f testing/testnet-staging/Dockerfile .
docker push $ECR_REGISTRY/$ECR_REPOSITORY --all-tags
102 changes: 63 additions & 39 deletions protocol/streaming/full_node_streaming_manager.go
@@ -29,9 +29,12 @@ type FullNodeStreamingManagerImpl struct {
ticker *time.Ticker
done chan bool

// map of clob pair id to stream updates.
streamUpdateCache map[uint32][]clobtypes.StreamUpdate
numUpdatesInCache uint32
// list of stream updates.
streamUpdateCache []clobtypes.StreamUpdate
// list of subscription ids for each stream update.
streamUpdateSubscriptionCache [][]uint32
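// map of clob pair id to subscription ids.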
clobPairIdToSubscriptionIdMapping map[uint32][]uint32
Contributor: comment for these 2 new fields?

Contributor Author: done

numUpdatesInCache uint32
Contributor: prob no need for this field now that we have a central list, can use len(list)

Contributor Author: removed


maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32
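These three fields are the heart of the restructure: the per-clob-pair map of updates becomes a single flat list in insertion order, a parallel list records which subscription ids should receive each update, and a routing map translates clob pair ids into subscription ids at cache time. A runnable sketch of the layout (StreamUpdate and cache here are stand-ins, not the manager's actual types):

package main

import "fmt"

// StreamUpdate stands in for clobtypes.StreamUpdate (assumed shape).
type StreamUpdate struct{ payload string }

// cache mirrors the three new manager fields in the diff above.
type cache struct {
	// Flat list of pending updates, in global insertion order.
	streamUpdateCache []StreamUpdate
	// streamUpdateSubscriptionCache[i] holds the subscription ids that
	// should receive streamUpdateCache[i]; the two slices stay index-aligned.
	streamUpdateSubscriptionCache [][]uint32
	// Routing table from clob pair id to interested subscription ids.
	clobPairIdToSubscriptionIdMapping map[uint32][]uint32
}

func main() {
	c := cache{
		clobPairIdToSubscriptionIdMapping: map[uint32][]uint32{7: {0, 1}},
	}
	// Caching one update for clob pair 7 appends it once and records its
	// recipients, deferring the fan-out to subscribers until flush time.
	c.streamUpdateCache = append(c.streamUpdateCache, StreamUpdate{payload: "fill"})
	c.streamUpdateSubscriptionCache = append(
		c.streamUpdateSubscriptionCache,
		c.clobPairIdToSubscriptionIdMapping[7],
	)
	fmt.Println(len(c.streamUpdateCache), c.streamUpdateSubscriptionCache[0]) // 1 [0 1]
}

A flat list also preserves the relative order of updates across clob pairs when they are flushed, which a map keyed by clob pair id does not.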
@@ -66,10 +69,12 @@ func NewFullNodeStreamingManager(
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make(map[uint32][]clobtypes.StreamUpdate),
numUpdatesInCache: 0,
ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond),
done: make(chan bool),
streamUpdateCache: make([]clobtypes.StreamUpdate, 0),
streamUpdateSubscriptionCache: make([][]uint32, 0),
clobPairIdToSubscriptionIdMapping: make(map[uint32][]uint32),
numUpdatesInCache: 0,

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
@@ -134,6 +139,18 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
if _, ok := sm.clobPairIdToSubscriptionIdMapping[clobPairId]; ok {
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append(
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
sm.nextSubscriptionId,
)
} else {
sm.clobPairIdToSubscriptionIdMapping[clobPairId] = []uint32{sm.nextSubscriptionId}
}
}

sm.logger.Info(
fmt.Sprintf(
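Because the map values are slices, the existence check in the registration loop above is optional in Go: reading a missing key yields the zero value, which for a slice is nil, and append on a nil slice allocates a new one. A runnable sketch of the equivalent, shorter form (variable names are illustrative):

package main

import "fmt"

func main() {
	clobPairIdToSubscriptionIdMapping := make(map[uint32][]uint32)
	nextSubscriptionId := uint32(42)
	for _, clobPairId := range []uint32{1, 2, 1} {
		// No existence check needed: a missing key reads as a nil slice,
		// and append on nil allocates.
		clobPairIdToSubscriptionIdMapping[clobPairId] = append(
			clobPairIdToSubscriptionIdMapping[clobPairId],
			nextSubscriptionId,
		)
	}
	fmt.Println(clobPairIdToSubscriptionIdMapping) // map[1:[42 42] 2:[42]]
}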
Expand Down Expand Up @@ -295,27 +312,28 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
updatesByClobPairId := make([]clobtypes.StreamUpdate, 0)
Contributor: rename var since this is not a map anymore?

Contributor Author: done

clobPairIds := make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: v1updates,
Snapshot: false,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId = append(updatesByClobPairId, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(updates)))
sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(updates)))
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
@@ -333,7 +351,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
)

// Group fills by clob pair id.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
updatesByClobPairId := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
@@ -346,24 +365,23 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
} else {
clobPairId = orderbookFill.Orders[0].OrderId.ClobPairId
}
if _, ok := updatesByClobPairId[clobPairId]; !ok {
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{}
}
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_OrderFill{
OrderFill: &orderbookFill,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
updatesByClobPairId = append(updatesByClobPairId, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(updatesByClobPairId, uint32(len(orderbookFills)))
sm.AddUpdatesToCache(updatesByClobPairId, clobPairIds, uint32(len(orderbookFills)))
}

func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
numUpdatesToAdd uint32,
) {
sm.Lock()
@@ -374,8 +392,9 @@ func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
float32(numUpdatesToAdd),
)

for clobPairId, streamUpdates := range updatesByClobPairId {
sm.streamUpdateCache[clobPairId] = append(sm.streamUpdateCache[clobPairId], streamUpdates...)
sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)
for _, clobPairId := range clobPairIds {
sm.streamUpdateSubscriptionCache = append(sm.streamUpdateSubscriptionCache, sm.clobPairIdToSubscriptionIdMapping[clobPairId])
}
sm.numUpdatesInCache += numUpdatesToAdd
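AddUpdatesToCache depends on its two slice arguments being index-aligned: updates[i] is the update for clobPairIds[i], so the update cache and the subscription-id cache grow in lockstep. A runnable sketch of that invariant (the free function and names are illustrative, not the manager's API):

package main

import "fmt"

type StreamUpdate struct{ seq int }

// addUpdatesToCache appends the updates and, for each update, the
// subscription ids routed from its clob pair id; updates[i] corresponds
// to clobPairIds[i].
func addUpdatesToCache(
	updateCache []StreamUpdate,
	subCache [][]uint32,
	routing map[uint32][]uint32,
	updates []StreamUpdate,
	clobPairIds []uint32,
) ([]StreamUpdate, [][]uint32) {
	updateCache = append(updateCache, updates...)
	for _, clobPairId := range clobPairIds {
		subCache = append(subCache, routing[clobPairId])
	}
	return updateCache, subCache
}

func main() {
	routing := map[uint32][]uint32{1: {10}, 2: {10, 11}}
	updates := []StreamUpdate{{seq: 0}, {seq: 1}}
	clobPairIds := []uint32{1, 2} // updates[0] is for pair 1, updates[1] for pair 2
	uc, sc := addUpdatesToCache(nil, nil, routing, updates, clobPairIds)
	// The invariant: len(uc) == len(sc), and sc[i] lists the recipients of uc[i].
	fmt.Println(len(uc) == len(sc), sc) // true [[10] [10 11]]
}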

@@ -398,8 +417,8 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates() {
sm.FlushStreamUpdatesWithLock()
}

// FlushStreamUpdatesWithLock takes in a map of clob pair id to stream updates and emits them to subscribers.
// Note this method requires the lock and assumes that the lock has already been
// FlushStreamUpdatesWithLock emits the cached stream updates to each update's recorded
// subscriptions, dropping subscribers whose channels are full. Note this method requires the lock and assumes that the lock has already been
// acquired by the caller.
func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
defer metrics.ModuleMeasureSince(
@@ -408,31 +427,36 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
time.Now(),
)

// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
// Map to collect updates for each subscription.
subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate)
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
streamUpdatesForSubscription := make([]clobtypes.StreamUpdate, 0)
for _, clobPairId := range subscription.clobPairIds {
if update, ok := sm.streamUpdateCache[clobPairId]; ok {
streamUpdatesForSubscription = append(streamUpdatesForSubscription, update...)
}

// Collect updates for each subscription.
for i, update := range sm.streamUpdateCache {
subscriptionIds := sm.streamUpdateSubscriptionCache[i]
for _, id := range subscriptionIds {
subscriptionUpdates[id] = append(subscriptionUpdates[id], update)
}
}

if len(streamUpdatesForSubscription) > 0 {
// Non-blocking send updates through subscriber's buffered channel.
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.GrpcAddToSubscriptionChannelCount,
1,
)
select {
case subscription.updatesChannel <- streamUpdatesForSubscription:
case subscription.updatesChannel <- updates:
default:
idsToRemove = append(idsToRemove, id)
}
}
}

// Reset the caches. clear() on a slice only zeroes its elements without
// shrinking it, so reassign to release the flushed updates.
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
sm.numUpdatesInCache = 0

for _, id := range idsToRemove {
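Taken together, the flush path inverts the two parallel slices into per-subscription batches, then sends each batch with a non-blocking select: a subscriber whose buffered channel is full is dropped rather than allowed to stall the flush. A self-contained sketch of the pattern (types, names, and channel sizes are illustrative):

package main

import "fmt"

type StreamUpdate struct{ seq int }

// flush groups cached updates by subscription id using the index-aligned
// recipient lists, then sends each batch without blocking; subscribers whose
// channel buffers are full are returned for removal.
func flush(
	updateCache []StreamUpdate,
	subCache [][]uint32,
	channels map[uint32]chan []StreamUpdate,
) (idsToRemove []uint32) {
	subscriptionUpdates := make(map[uint32][]StreamUpdate)
	for i, update := range updateCache {
		for _, id := range subCache[i] {
			subscriptionUpdates[id] = append(subscriptionUpdates[id], update)
		}
	}
	for id, batch := range subscriptionUpdates {
		ch, ok := channels[id]
		if !ok {
			continue
		}
		select {
		case ch <- batch: // buffered send succeeds while capacity remains
		default: // buffer full: mark the slow subscriber for removal
			idsToRemove = append(idsToRemove, id)
		}
	}
	return idsToRemove
}

func main() {
	channels := map[uint32]chan []StreamUpdate{
		10: make(chan []StreamUpdate, 8), // healthy subscriber
		11: make(chan []StreamUpdate),    // unbuffered: behaves as always full
	}
	updateCache := []StreamUpdate{{seq: 0}, {seq: 1}}
	subCache := [][]uint32{{10}, {10, 11}}
	fmt.Println(flush(updateCache, subCache, channels)) // [11]
	fmt.Println(len(<-channels[10]))                    // 2
}

Dropping slow subscribers keeps one congested consumer from delaying updates for everyone else, at the cost of requiring the dropped subscriber to re-establish its stream.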