Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetching strategies for batched pairs #192

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 80 additions & 80 deletions src/chain-cache/ChainSync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ChainCache } from './ChainCache';
import { findAndRemoveLeading, toPairKey } from './utils';
import { toPairKey } from './utils';
import { Logger } from '../common/logger';
import {
BlockMetadata,
Expand All @@ -18,16 +18,19 @@ export class ChainSync {
private _chainCache: ChainCache;
private _syncCalled: boolean = false;
private _slowPollPairs: boolean = false;
private _pairs: TokenPair[] = [];
private _uncachedPairs: TokenPair[] = [];
// keep the time stamp of last fetch
private _lastFetch: number = Date.now();
private _initialSyncDone: boolean = false;
private _maxBlockAge?: number;
private _numOfPairsToBatch: number;

constructor(fetcher: Fetcher, chainCache: ChainCache, maxBlockAge?: number) {
constructor(
fetcher: Fetcher,
chainCache: ChainCache,
numOfPairsToBatch: number = 100
) {
this._fetcher = fetcher;
this._chainCache = chainCache;
this._maxBlockAge = maxBlockAge;
this._numOfPairsToBatch = numOfPairsToBatch;
}

public async startDataSync(): Promise<void> {
Expand All @@ -43,62 +46,46 @@ export class ChainSync {
logger.debug('startDataSync - cache is new', arguments);
// cache starts from scratch so we want to avoid getting events from the beginning of time
this._chainCache.applyBatchedUpdates(blockNumber, [], [], [], [], []);
} else if (
this._maxBlockAge !== undefined &&
blockNumber - latestBlockInCache > this._maxBlockAge
) {
logger.debug(
`startDataSync - cache is too old: current block ${blockNumber}, cache block ${latestBlockInCache}`,
arguments
);
// cache is too old so we want to clear it and avoid getting events from the beginning of time
this._chainCache.clear(true);
this._chainCache.applyBatchedUpdates(blockNumber, [], [], [], [], []);
}

// let's fetch all pairs from the chain and set them to the cache - to be used by the following syncs
await this._updatePairsFromChain();
await this._updateUncachedPairsFromChain();

// _populateFeesData() should run first, before _populatePairsData() gets to manipulate the pairs list
await Promise.all([
this._populateFeesData(this._pairs),
this._populateFeesData(this._uncachedPairs),
this._populatePairsData(),
this._syncEvents(),
]);
}

// reads all pairs from chain and sets to private field
private async _updatePairsFromChain() {
logger.debug('_updatePairsFromChain fetches pairs');
this._pairs = [...(await this._fetcher.pairs())];
logger.debug('_updatePairsFromChain fetched pairs', this._pairs);
private async _updateUncachedPairsFromChain() {
logger.debug('_updateUncachedPairsFromChain fetches pairs');
const pairs = await this._fetcher.pairs();
logger.debug('_updateUncachedPairsFromChain fetched pairs', pairs);
this._lastFetch = Date.now();
if (this._pairs.length === 0) {
if (pairs.length === 0) {
logger.error(
'_updatePairsFromChain fetched no pairs - this indicates a problem'
'_updateUncachedPairsFromChain fetched no pairs - this indicates a problem'
);
}

// let's filter the uncached pairs
this._uncachedPairs = pairs.filter(
(pair) => !this._chainCache.hasCachedPair(pair[0], pair[1])
);
}

private async _populateFeesData(
pairs: TokenPair[],
skipCache = false
): Promise<void> {
private async _populateFeesData(pairs: TokenPair[]): Promise<void> {
logger.debug('populateFeesData called');
if (pairs.length === 0) {
logger.error('populateFeesData called with no pairs - skipping');
logger.log('populateFeesData called with no pairs - skipping');
return;
}
const uncachedPairs = skipCache
? pairs
: pairs.filter(
(pair) => !this._chainCache.hasCachedPair(pair[0], pair[1])
);

if (uncachedPairs.length === 0) return;

const feeUpdates: [string, string, number][] =
await this._fetcher.pairsTradingFeePPM(uncachedPairs);
await this._fetcher.pairsTradingFeePPM(pairs);

logger.debug('populateFeesData fetched fee updates', feeUpdates);

Expand All @@ -109,11 +96,9 @@ export class ChainSync {

// `_populatePairsData` sets timeout and returns immediately. It does the following:
// 1. Fetches all token pairs from the fetcher
// 2. selects a pair that's not in the cache
// 3. fetches strategies for the pair
// 4. adds the pair to the cache
// 5. sets short timeout to continue with the next pair
// 6. if there are no more pairs, it sets a timeout to call itself again
// 2. fetches strategies for all uncached pairs
// 3. adds the pairs strategies to the cache
// 4. sets a timeout to call itself again
private async _populatePairsData(): Promise<void> {
logger.debug('_populatePairsData called');
// this indicates we want to poll for pairs only once a minute.
Expand All @@ -122,40 +107,33 @@ export class ChainSync {

const processPairs = async () => {
try {
if (this._pairs.length === 0) {
if (this._uncachedPairs.length === 0) {
// if we have no pairs we need to fetch - unless we're in slow poll mode and less than a minute has passed since last fetch
if (this._slowPollPairs && Date.now() - this._lastFetch < 60000) {
// go back to sleep
setTimeout(processPairs, 1000);
return;
}
await this._updatePairsFromChain();
await this._updateUncachedPairsFromChain();
}
// let's find the first pair that's not in the cache and clear it from the list along with all the items before it
const nextPairToSync = findAndRemoveLeading<TokenPair>(
this._pairs,
(pair) => !this._chainCache.hasCachedPair(pair[0], pair[1])
);
if (nextPairToSync) {
logger.debug('_populatePairsData adds pair to cache', nextPairToSync);
// we have a pair to sync - let's do it - add its strategies to the cache and then to minimal timeout to process the next pair
await this.syncPairData(
nextPairToSync[0],
nextPairToSync[1],
!this._initialSyncDone
);
setTimeout(processPairs, 1);
} else {
// list is now empty and there are no more pairs to sync - we can poll them less frequently
// we will wake up once a second just to check if we're still in slow poll mode,
// but if not - we will actually poll once a minute

if (this._uncachedPairs.length > 0) {
logger.debug(
'_populatePairsData handled all pairs and goes to slow poll mode'
'_populatePairsData will now sync data for',
this._uncachedPairs
);
this._slowPollPairs = true;
this._initialSyncDone = true;
setTimeout(processPairs, 1000);
// we have pairs to sync - let's split them into batches - add their strategies to the cache and go into slow poll mode
await this._syncPairDataBatch();
}
// list is now empty and there are no more pairs to sync - we can poll them less frequently
// we will wake up once a second just to check if we're still in slow poll mode,
// but if not - we will actually poll once a minute
logger.debug(
'_populatePairsData handled all pairs and goes to slow poll mode'
);
this._slowPollPairs = true;
setTimeout(processPairs, 1000);
return;
} catch (e) {
logger.error('Error while syncing pairs data', e);
setTimeout(processPairs, 60000);
Expand All @@ -164,19 +142,45 @@ export class ChainSync {
setTimeout(processPairs, 1);
}

public async syncPairData(
token0: string,
token1: string,
noPairAddedEvent: boolean = false
): Promise<void> {
private async _syncPairDataBatch(): Promise<void> {
// Split all uncached pairs into batches
const batches: TokenPair[][] = [];
for (
let i = 0;
i < this._uncachedPairs.length;
i += this._numOfPairsToBatch
) {
batches.push(this._uncachedPairs.slice(i, i + this._numOfPairsToBatch));
}

try {
const strategiesBatches = await Promise.all(
batches.map((batch) => this._fetcher.strategiesByPairs(batch))
);
strategiesBatches.flat().forEach((pairStrategies) => {
this._chainCache.addPair(
pairStrategies.pair[0],
pairStrategies.pair[1],
pairStrategies.strategies,
true
);
});
this._uncachedPairs = [];
} catch (error) {
logger.error('Failed to fetch strategies for pairs batch:', error);
throw error; // Re-throw to be handled by caller
}
}

public async syncPairData(token0: string, token1: string): Promise<void> {
if (!this._syncCalled) {
throw new Error(
'ChainSync.startDataSync() must be called before syncPairData()'
);
}
const strategies = await this._fetcher.strategiesByPair(token0, token1);
if (this._chainCache.hasCachedPair(token0, token1)) return;
this._chainCache.addPair(token0, token1, strategies, noPairAddedEvent);
this._chainCache.addPair(token0, token1, strategies, false);
}

// used to break the blocks between latestBlock + 1 and currentBlock to chunks of 1000 blocks
Expand Down Expand Up @@ -348,18 +352,15 @@ export class ChainSync {
logger.debug(
'_syncEvents noticed at least one default fee update - refetching pair fees for all pairs'
);
await this._populateFeesData(
[...(await this._fetcher.pairs())],
true
);
await this._populateFeesData([...(await this._fetcher.pairs())]);
}
if (newlyCreatedPairs.length > 0) {
logger.debug(
'_syncEvents noticed at least one new pair created - setting slow poll mode to false'
);
this._slowPollPairs = false;
logger.debug('_syncEvents fetching fees for the new pairs');
await this._populateFeesData(newlyCreatedPairs, true);
await this._populateFeesData(newlyCreatedPairs);
}
}
} catch (err) {
Expand All @@ -371,9 +372,8 @@ export class ChainSync {
setTimeout(processEvents, 1);
}
private _resetPairsFetching() {
this._pairs = [];
this._uncachedPairs = [];
this._slowPollPairs = false;
this._initialSyncDone = false;
}

private async _detectReorg(currentBlock: number): Promise<boolean> {
Expand Down
5 changes: 3 additions & 2 deletions src/chain-cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * from './types';
* use the ChainCache and ChainSync classes directly.
* @param {Fetcher} fetcher - fetcher to use for syncing the cache
* @param {string} cachedData - serialized cache data to initialize the cache with
* @param {number} numOfPairsToBatch - number of pairs to fetch in a single batch - adapt this value based on the RPC limits and testing
* @returns an object with the initialized cache and a function to start syncing the cache
* @example
* const { cache, startDataSync } = initSyncedCache(fetcher, cachedData);
Expand All @@ -20,7 +21,7 @@ export * from './types';
export const initSyncedCache = (
fetcher: Fetcher,
cachedData?: string,
maxBlockAge?: number
numOfPairsToBatch?: number
): { cache: ChainCache; startDataSync: () => Promise<void> } => {
let cache: ChainCache | undefined;
if (cachedData) {
Expand All @@ -31,7 +32,7 @@ export const initSyncedCache = (
cache = new ChainCache();
}

const syncer = new ChainSync(fetcher, cache, maxBlockAge);
const syncer = new ChainSync(fetcher, cache, numOfPairsToBatch);
cache.setCacheMissHandler(syncer.syncPairData.bind(syncer));
return { cache, startDataSync: syncer.startDataSync.bind(syncer) };
};
12 changes: 0 additions & 12 deletions src/chain-cache/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ export const toDirectionKey = (token0: string, token1: string): string => {
return toKey([token0, token1]);
};

// find and return an element in an array, and remove it and all elements before it. If not found, remove all elements.
export const findAndRemoveLeading = <T>(
arr: T[],
predicate: (value: T) => boolean
): T | undefined => {
let element = undefined;
do {
element = arr.shift();
} while (element && !predicate(element));
return element;
};

export function isOrderTradable(order: EncodedOrder): boolean {
return order.y.gt(0) && (order.A.gt(0) || order.B.gt(0));
}
6 changes: 6 additions & 0 deletions src/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ export type BlockMetadata = {
export interface Fetcher {
pairs(): Promise<TokenPair[]>;
strategiesByPair(token0: string, token1: string): Promise<EncodedStrategy[]>;
strategiesByPairs(pairs: TokenPair[]): Promise<
{
pair: TokenPair;
strategies: EncodedStrategy[];
}[]
>;
pairTradingFeePPM(token0: string, token1: string): Promise<number>;
pairsTradingFeePPM(pairs: TokenPair[]): Promise<[string, string, number][]>;
tradingFeePPM(): Promise<number>;
Expand Down
26 changes: 26 additions & 0 deletions src/contracts-api/Reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,32 @@ export default class Reader implements Fetcher {
return res.map((r) => toStrategy(r));
}

// TODO: add a method to get all strategies by a list of pairs. Returns a collection of pairs and their strategies. It will use multicall to call strategiesByPair method from the contracts.
public async strategiesByPairs(pairs: TokenPair[]): Promise<
{
pair: TokenPair;
strategies: EncodedStrategy[];
}[]
> {
const results = await this._multicall(
pairs.map((pair) => ({
contractAddress: this._contracts.carbonController.address,
interface: this._contracts.carbonController.interface,
methodName: 'strategiesByPair',
methodParameters: [pair[0], pair[1], 0, 0],
}))
);
if (!results || results.length === 0) return [];
console.debug('results', results);
return results.map((result, i) => {
const strategiesResult = result[0] as StrategyStructOutput[];
return {
pair: pairs[i],
strategies: strategiesResult.map((r) => toStrategy(r)),
};
});
}

public async tokensByOwner(owner: string) {
if (!owner) return [];

Expand Down
Loading