Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic decision engine(III) #9

Merged
merged 2 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/chain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ export default class CrustApi {
return null;
}

// async maybeGetFile(cid: string): Promise<File | null> {
// // TODO: query `Files`
// }

// TODO: add more error handling here
private async withApiReady(): Promise<void> {
await this.api.isReadyOrError;
Expand Down
63 changes: 46 additions & 17 deletions src/decision/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import IpfsApi from '../ipfs';
import CrustApi, {StorageOrder} from '../chain';
import {logger} from '../log';

// The initial probability is 5‰
const initialProbability = 0.005;

interface Task extends BT {
// The ipfs cid value
cid: string;
Expand All @@ -20,6 +23,7 @@ export default class DecisionEngine {
private readonly ipfsApi: IpfsApi;
private pendingQueue: TaskQueue<Task>;
private pullingQueue: TaskQueue<Task>;
private currentBn: number;

constructor(chainAddr: string, ipfsAddr: string, mto: number) {
this.crustApi = new CrustApi(chainAddr);
Expand All @@ -28,6 +32,9 @@ export default class DecisionEngine {
// MaxQueueLength is 50 and Overdue is 600 blocks(1h)
this.pendingQueue = new TaskQueue<Task>(50, 600);
this.pullingQueue = new TaskQueue<Task>(30, 600);

// Init the current block number
this.currentBn = 0;
}

/**
Expand All @@ -41,12 +48,13 @@ export default class DecisionEngine {
// 1. Get block number
const bn = b.number.toNumber();
logger.info(`⛓ Got new block ${bn} from chain`);

// 2. Try to get new storage order
// 2. Update current block number
this.currentBn = bn;
// 3. Try to get new storage order
const newSorder: StorageOrder | null = await this.crustApi.maybeGetNewSorder(
bn
);
// 3. If got new storage order, put it into pendingQueue
// 4. If got new storage order, put it into pendingQueue
if (newSorder) {
const nt: Task = {
cid: newSorder.file_identifier,
Expand All @@ -61,7 +69,7 @@ export default class DecisionEngine {
// Always push into pending queue
this.pendingQueue.push(nt);
}
// 4. Check and clean outdated tasks
// 5. Check and clean outdated tasks
this.pendingQueue.clear(bn);
this.pullingQueue.clear(bn);
};
Expand All @@ -78,14 +86,14 @@ export default class DecisionEngine {
*/
async subscribePullings(): Promise<cron.ScheduledTask> {
return cron.schedule('* * * * *', async () => {
// 1. Loop and pop all pendings
// 1. Loop and pop all pending tasks
const oldPts: Task[] = this.pendingQueue.tasks;
const newPts = new Array<Task>();
logger.info('⏳ Checking pending queue...');

for (const pt of oldPts) {
// 2. If join pullings and start puling in ipfs, otherwise push back to pending tasks
if (await this.pickOrDropPending(pt.cid, pt.size)) {
if (await this.pickOrDropPending(pt)) {
logger.info(
` ↪ 🎁 Pick pending task ${JSON.stringify(pt)}, pulling from ipfs`
);
Expand Down Expand Up @@ -146,28 +154,25 @@ export default class DecisionEngine {
* @throws ipfsApi error
*/
// TODO: add pending pick up strategy here, basically random with pks?
private async pickOrDropPending(
cid: string,
f_size: number
): Promise<boolean> {
private async pickOrDropPending(t: Task): Promise<boolean> {
// 1. Get and judge file size is match
const size = await this.ipfsApi.size(cid);
logger.info(` ↪ 📂 Got ipfs file ${cid} size: ${size}`);
if (size !== f_size) {
logger.warn(` ↪ ⚠️ Size not match: ${size} != ${f_size}`);
const size = await this.ipfsApi.size(t.cid);
logger.info(` ↪ 📂 Got ipfs file size ${t.cid}, size is: ${size}`);
if (size !== t.size) {
logger.warn(` ↪ ⚠️ Size not match: ${size} != ${t.size}`);
return false;
}

// 2. Get and judge repo can take it, make sure the free can take double file
// TODO: Remove this, cause this is no fucking use
const free = await this.ipfsApi.free();
const bn_f_size = new BigNumber(f_size);
const bn_f_size = new BigNumber(t.size);
if (free <= bn_f_size.multipliedBy(2)) {
logger.warn(` ↪ ⚠️ Free space not enought ${free} < ${size}*2`);
return false;
}

// TODO: Add probabilistic take sorder strategy here
return true;
return this.isFileOnChain(t.cid, t.bn);
}

/**
Expand All @@ -182,4 +187,28 @@ export default class DecisionEngine {
// TODO: check free space or just send into sWorker?
return true;
}

/**
* Query the given cid is already been picked plus a certain
* probability
* @param cid ipfs cid value
* @param bn task block number
* @throws crustApi error
*/
private async isFileOnChain(_cid: string, bn: number): Promise<boolean> {
// TODO: const fileInfo = await this.crustApi.maybeGetFile(cid);
// TODO: judge if fileInfo.expected_payouts < len(fileInfo.payouts), if true, return false
// TODO: if false, calculate probability with `expired_date`

// 1. Generate a number between 0 and 1
const randNum = Math.random();
// 2. Calculate probability
const multiple = (this.currentBn - bn) / 10 + 1; // 10 unit means 1min
const probability = initialProbability * multiple; // probability will turns into 100% after 200 * 10 unit = 2000min
logger.info(
`💓 Current randNum is ${randNum}, New target is ${probability}, ${this.currentBn}, ${bn}, ${multiple}`
);
// 3. Judge if we hit the spot
return randNum < probability;
}
}