Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restart tweaks #161

Merged
merged 5 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions packages/api/src/eventListener.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed we should make this thread safe:

  • this.isInitialized=true only after update
  • change update accordingly for example with an optional argument
  • this.isInitialized should be checked on price updates (and potentially other functions that would send out funding rates)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed, since initial update runs before we start http or websockets server

Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ export default class EventListener extends IndexPriceInterface {
this.resetRPCWebsocket(this.wsRPC);
sdkInterface.registerObserver(this);
this.lastBlockChainEventTs = Date.now();

// run _update only for the first time to set fundingRates and
// openInterest
await this._update("Initial Update Call", true);

// Set to initialized only after all initializations are done
this.isInitialized = true;
}

Expand Down Expand Up @@ -446,12 +452,17 @@ export default class EventListener extends IndexPriceInterface {

/**
* Handles updates from sdk interface
* @param msg from observable
* @param msg
* @param firstTimeUpdate - if true, allow to run the _update even if not
* initialized
* @returns
*/
protected async _update(msg: String) {
// we receive a message from the observable sdk
// on update exchange info; we update price info and inform subscribers
if (!this.isInitialized) {
protected async _update(msg: String, firstTimeUpdate: boolean = false) {
// we receive a message from the observable sdk on update exchange info;
// we update price info and inform subscribers. Whenever this is a first
// time update - allow it to pass through to initialize the funding rate
// and openInterest
if (!this.isInitialized && !firstTimeUpdate) {
return;
}
console.log("received update from sdkInterface", msg);
Expand All @@ -471,6 +482,12 @@ export default class EventListener extends IndexPriceInterface {
perp.markPrice,
perp.indexPrice,
);

console.log(`[_update] setting fundingRate and openInterest`, {
fundingRate: perp.currentFundingRateBps / 1e4,
openInterest: perp.openInterestBC,
perpetualId: perp.id,
});
}
}
}
Expand Down Expand Up @@ -750,6 +767,11 @@ export default class EventListener extends IndexPriceInterface {
fMarkPricePremium: BigNumber,
fSpotIndexPrice: BigNumber,
): void {
if (!this.isInitialized) {
console.log("onUpdateMarkPrice: eventListener not initialized");
return;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is possible, then it is also possible that we receive the event between assigning this.isInitialized=true and calling this._update

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now changed, we set isInitialized only after first update call

this.lastBlockChainEventTs = Date.now();

const hash =
Expand Down Expand Up @@ -780,6 +802,12 @@ export default class EventListener extends IndexPriceInterface {

// update data in sdkInterface's exchangeInfo
const fundingRate = this.fundingRate.get(perpetualId) || 0;
// Do not allow 0 funding rate to be sent out
if (fundingRate === 0) {
console.log(`[onUpdateMarkPrice] funding rate is 0 for ${perpetualId}`);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be an error. Because if the code is correct we never run into this situation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adjusted

}

const oi = this.openInterest.get(perpetualId) || 0;
const symbol = this.symbolFromPerpetualId(perpetualId);

Expand Down Expand Up @@ -818,7 +846,18 @@ export default class EventListener extends IndexPriceInterface {
newMarkPrice: number,
newIndexPrice: number,
) {
if (!this.isInitialized) {
console.log("updateMarkPrice: eventListener not initialized");
return;
}

const fundingRate = this.fundingRate.get(perpetualId) || 0;
// Do not allow 0 funding rate to be sent out
if (fundingRate === 0) {
console.log(`[updateMarkPrice] funding rate is 0 for ${perpetualId}`);
return;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, should be an error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adjusted

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@b1674927 are you sure it is not possible to have 0 fundingRate for a perpetual? On arbitrum sepolia perp 30004 currently has 0 funding rate:

{
"id": 300004,
"state": "NORMAL",
"baseCurrency": "WIF",
"quoteCurrency": "USD",
"indexPrice": 1,
"collToQuoteIndexPrice": 1.000005,
"markPrice": 1,
"midPrice": 1,
"currentFundingRateBps": 0,
"openInterestBC": 0,
"isMarketClosed": false
}

This would basically crash the backend

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation, as soon as the perp is traded for the first time, the funding rate is non-zero.

const oi = this.openInterest.get(perpetualId) || 0;
const symbol = this.symbolFromPerpetualId(perpetualId);
const obj: PriceUpdate = {
Expand Down
29 changes: 21 additions & 8 deletions packages/api/src/indexPriceInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ export default abstract class IndexPriceInterface extends Observer {

sdkInterface.registerObserver(this);
this.sdkInterface = sdkInterface;
// trigger exchange info so we get an "update" message
// note: this exchangeInfo call does not actually issue the "update"
// call, since initialization is not yet done in EventListener when
// priceInterfaceInitialize is called. Initial _update is called in the
// eventListener initialization.
const info = await this.sdkInterface.exchangeInfo();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so should we remove it then?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, the info is needed for _initIdxNamesToPerpetualIds

await this._initIdxNamesToPerpetualIds(<ExchangeInfo>JSON.parse(info));
}
Expand Down Expand Up @@ -120,20 +123,30 @@ export default abstract class IndexPriceInterface extends Observer {
}
}

private async _onRedisFeedHandlerMsg(message: string) {
public async _onRedisFeedHandlerMsg(message: string) {
// message must be indices separated by semicolon
// console.log("Received REDIS message" + message);
const indices = message.split(";");

// Create new updated indices and only send those that were updated.
const updatedIndices: string[] = [];

for (let k = 0; k < indices.length; k++) {
// get price from redit
const px_ts = await this.redisClient.ts.get(indices[k]);
const px = px_ts?.value;
if (px != undefined) {
this.idxPrices.set(indices[k], px);
try {
const px_ts = await this.redisClient.ts.get(indices[k]);
if (px_ts !== null) {
this.idxPrices.set(indices[k], px_ts.value);
updatedIndices.push(indices[k]);
}
} catch (error) {
console.log("[Error in _onRedisFeedHandlerMsg]", {
error: extractErrorMsg(error),
index: indices[k],
});
}
//console.log(indices[k], px);
}
this._updatePricesOnIndexPrice(indices);
this._updatePricesOnIndexPrice(updatedIndices);
}

/**
Expand Down