diff --git a/src/chain/index.ts b/src/chain/index.ts index 124f0e2..dc4359e 100644 --- a/src/chain/index.ts +++ b/src/chain/index.ts @@ -144,6 +144,10 @@ export default class CrustApi { return null; } + // async maybeGetFile(cid: string): Promise { + // // TODO: query `Files` + // } + // TODO: add more error handling here private async withApiReady(): Promise { await this.api.isReadyOrError; diff --git a/src/decision/index.ts b/src/decision/index.ts index ffe03e2..a567f02 100644 --- a/src/decision/index.ts +++ b/src/decision/index.ts @@ -8,6 +8,9 @@ import IpfsApi from '../ipfs'; import CrustApi, {StorageOrder} from '../chain'; import {logger} from '../log'; +// The initial probability is 5‰ +const initialProbability = 0.005; + interface Task extends BT { // The ipfs cid value cid: string; @@ -20,6 +23,7 @@ export default class DecisionEngine { private readonly ipfsApi: IpfsApi; private pendingQueue: TaskQueue; private pullingQueue: TaskQueue; + private currentBn: number; constructor(chainAddr: string, ipfsAddr: string, mto: number) { this.crustApi = new CrustApi(chainAddr); @@ -28,6 +32,9 @@ export default class DecisionEngine { // MaxQueueLength is 50 and Overdue is 600 blocks(1h) this.pendingQueue = new TaskQueue(50, 600); this.pullingQueue = new TaskQueue(30, 600); + + // Init the current block number + this.currentBn = 0; } /** @@ -41,12 +48,13 @@ export default class DecisionEngine { // 1. Get block number const bn = b.number.toNumber(); logger.info(`⛓ Got new block ${bn} from chain`); - - // 2. Try to get new storage order + // 2. Update current block number + this.currentBn = bn; + // 3. Try to get new storage order const newSorder: StorageOrder | null = await this.crustApi.maybeGetNewSorder( bn ); - // 3. If got new storage order, put it into pendingQueue + // 4. If got new storage order, put it into pendingQueue if (newSorder) { const nt: Task = { cid: newSorder.file_identifier, @@ -61,7 +69,7 @@ export default class DecisionEngine { // Always push into pending queue this.pendingQueue.push(nt); } - // 4. Check and clean outdated tasks + // 5. Check and clean outdated tasks this.pendingQueue.clear(bn); this.pullingQueue.clear(bn); }; @@ -78,14 +86,14 @@ export default class DecisionEngine { */ async subscribePullings(): Promise { return cron.schedule('* * * * *', async () => { - // 1. Loop and pop all pendings + // 1. Loop and pop all pending tasks const oldPts: Task[] = this.pendingQueue.tasks; const newPts = new Array(); logger.info('⏳ Checking pending queue...'); for (const pt of oldPts) { // 2. If join pullings and start puling in ipfs, otherwise push back to pending tasks - if (await this.pickOrDropPending(pt.cid, pt.size)) { + if (await this.pickOrDropPending(pt)) { logger.info( ` ↪ 🎁 Pick pending task ${JSON.stringify(pt)}, pulling from ipfs` ); @@ -146,28 +154,25 @@ export default class DecisionEngine { * @throws ipfsApi error */ // TODO: add pending pick up strategy here, basically random with pks? - private async pickOrDropPending( - cid: string, - f_size: number - ): Promise { + private async pickOrDropPending(t: Task): Promise { // 1. Get and judge file size is match - const size = await this.ipfsApi.size(cid); - logger.info(` ↪ 📂 Got ipfs file ${cid} size: ${size}`); - if (size !== f_size) { - logger.warn(` ↪ ⚠️ Size not match: ${size} != ${f_size}`); + const size = await this.ipfsApi.size(t.cid); + logger.info(` ↪ 📂 Got ipfs file size ${t.cid}, size is: ${size}`); + if (size !== t.size) { + logger.warn(` ↪ ⚠️ Size not match: ${size} != ${t.size}`); return false; } // 2. Get and judge repo can take it, make sure the free can take double file + // TODO: Remove this, cause this is no fucking use const free = await this.ipfsApi.free(); - const bn_f_size = new BigNumber(f_size); + const bn_f_size = new BigNumber(t.size); if (free <= bn_f_size.multipliedBy(2)) { logger.warn(` ↪ ⚠️ Free space not enought ${free} < ${size}*2`); return false; } - // TODO: Add probabilistic take sorder strategy here - return true; + return this.isFileOnChain(t.cid, t.bn); } /** @@ -182,4 +187,28 @@ export default class DecisionEngine { // TODO: check free space or just send into sWorker? return true; } + + /** + * Query the given cid is already been picked plus a certain + * probability + * @param cid ipfs cid value + * @param bn task block number + * @throws crustApi error + */ + private async isFileOnChain(_cid: string, bn: number): Promise { + // TODO: const fileInfo = await this.crustApi.maybeGetFile(cid); + // TODO: judge if fileInfo.expected_payouts < len(fileInfo.payouts), if true, return false + // TODO: if false, calculate probability with `expired_date` + + // 1. Generate a number between 0 and 1 + const randNum = Math.random(); + // 2. Calculate probability + const multiple = (this.currentBn - bn) / 1 + 1; // 1 unit means 1min + const probability = initialProbability * multiple; // probability will turns into 100% after 200 * 1 unit = 200min + logger.info( + `💓 Current randNum is ${randNum}, New target is ${probability}, ${this.currentBn}, ${bn}, ${multiple}` + ); + // 3. Judge if we hit the spot + return randNum < probability; + } }