Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: process package in queue instead of batch #656

Merged
merged 9 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"algolia/typescript"
],
"rules": {
"no-continue": "off",
"valid-jsdoc": "off",
"import/extensions": [
"error",
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ RUN true \
# This image must have the minimum amount of layers
FROM node:14.16.1-alpine as final

ENV NODE_ENV production

# Do not use root to run the app
USER node

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "npm-search",
"version": "1.0.1",
"version": "1.0.2",
"private": true,
"author": {
"name": "Algolia, Inc.",
Expand Down Expand Up @@ -35,6 +35,7 @@
"async": "3.2.0",
"bunyan": "1.8.15",
"bunyan-debug-stream": "2.0.0",
"chalk": "4.1.1",
"dtrace-provider": "0.8.8",
"escape-html": "1.0.3",
"got": "11.8.2",
Expand Down
1 change: 1 addition & 0 deletions scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ echo "Releasing: $current"
echo ""

docker build \
--platform linux/amd64 \
-t algolia/npm-search \
-t "algolia/npm-search:${current}" \
.
2 changes: 1 addition & 1 deletion src/__tests__/saveDocs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ it('should be similar batch vs one', async () => {

const row = { id: '', key: 'preact', value: { rev: 'a' }, doc: preact };
Haroenv marked this conversation as resolved.
Show resolved Hide resolved
await saveDocs({ docs: [row], index });
await saveDoc({ row, index });
await saveDoc({ row: preact, index });

expect(index.saveObjects).toHaveBeenCalledWith([clean]);
expect(index.saveObject).toHaveBeenCalledWith(clean);
Expand Down
159 changes: 99 additions & 60 deletions src/bootstrap.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import type { SearchClient, SearchIndex } from 'algoliasearch';
import ms from 'ms';
import type { DocumentListParams } from 'nano';
import type { QueueObject } from 'async';
import { queue } from 'async';
import chalk from 'chalk';

import type { StateManager } from './StateManager';
import * as algolia from './algolia';
import { config } from './config';
import * as npm from './npm';
import { saveDocs } from './saveDocs';
import type { PrefetchedPkg } from './npm/Prefetcher';
import { Prefetcher } from './npm/Prefetcher';
import { isFailure } from './npm/types';
import { saveDoc } from './saveDocs';
import { datadog } from './utils/datadog';
import { log } from './utils/log';
import * as sentry from './utils/sentry';
import { wait } from './utils/wait';

let loopStart: number = Date.now();
let prefetcher: Prefetcher;
let consumer: QueueObject<PrefetchedPkg>;

/**
* Bootstrap is the mode that goes from 0 to all the packages in NPM
Expand All @@ -24,17 +31,20 @@ let loopStart: number = Date.now();
* Watch mode should/can be reliably left running for weeks/months as CouchDB is made for that.
* BUT for the moment it's mandatory to relaunch it because it's the only way to update: typescript, downloads stats.
*/
async function run(
export async function run(
stateManager: StateManager,
algoliaClient: SearchClient,
mainIndex: SearchIndex,
bootstrapIndex: SearchIndex
): Promise<void> {
log.info('-----');
log.info('⛷ Bootstrap: starting');
Haroenv marked this conversation as resolved.
Show resolved Hide resolved
const state = await stateManager.check();

if (state.seq && state.seq > 0 && state.bootstrapDone === true) {
await algolia.putDefaultSettings(mainIndex, config);
log.info('⛷ Bootstrap: done');
log.info('-----');
return;
}

Expand All @@ -54,70 +64,53 @@ async function run(
}

log.info('-----');
log.info(`Total packages ${totalDocs}`);
log.info(chalk.yellowBright`Total packages: ${totalDocs}`);
log.info('-----');

let lastProcessedId = state.bootstrapLastId;
do {
loopStart = Date.now();

lastProcessedId = await loop(lastProcessedId, stateManager, bootstrapIndex);
} while (lastProcessedId !== null);

log.info('-----');
log.info('⛷ Bootstrap: done');
await stateManager.save({
bootstrapDone: true,
bootstrapLastDone: Date.now(),
prefetcher = new Prefetcher({
bodinsamuel marked this conversation as resolved.
Show resolved Hide resolved
nextKey: state.bootstrapLastId,
});
prefetcher.launch();

let done = 0;
consumer = createPkgConsumer(stateManager, bootstrapIndex);
consumer.unsaturated(async () => {
const next = await prefetcher.getNext();
consumer.push(next);
done += 1;
});
consumer.buffer = 0;

await moveToProduction(stateManager, algoliaClient);
}
let processing = true;
while (processing) {
logProgress(done);

/**
* Execute one loop for bootstrap,
* Fetch N packages from `lastId`, process and save them to Algolia.
* */
async function loop(
lastId: string | null,
stateManager: StateManager,
bootstrapIndex: SearchIndex
): Promise<string | null> {
const start = Date.now();
log.info('loop()', '::', lastId);

const options: DocumentListParams = {
limit: config.bootstrapConcurrency,
};
if (lastId) {
options.startkey = lastId;
options.skip = 1;
}
await wait(config.prefetchWaitBetweenPage);

const res = await npm.findAll(options);
processing = !prefetcher.isFinished;
done = 0;

if (res.rows.length <= 0) {
// Nothing left to process
// We return null to stop the bootstrapping
return null;
// Push nothing to trigger event
consumer.push(null as any);
processing = false;
}

datadog.increment('packages', res.rows.length);
log.info(' - fetched', res.rows.length, 'packages');

const newLastId = res.rows[res.rows.length - 1].id;
consumer.pause();

await saveDocs({ docs: res.rows, index: bootstrapIndex });
log.info('-----');
log.info('⛷ Bootstrap: done');
log.info('-----');
await stateManager.save({
bootstrapLastId: newLastId,
bootstrapDone: true,
bootstrapLastDone: Date.now(),
});
await logProgress(res.offset, res.rows.length);

datadog.timing('loop', Date.now() - start);

return newLastId;
await moveToProduction(stateManager, algoliaClient);
}

/**
* Move algolia index to prod.
*/
async function moveToProduction(
stateManager: StateManager,
algoliaClient: SearchClient
Expand All @@ -130,18 +123,64 @@ async function moveToProduction(
await stateManager.save(currentState);
}

async function logProgress(offset: number, nbDocs: number): Promise<void> {
/**
* Log approximate progress.
*/
async function logProgress(nbDocs: number): Promise<void> {
const { nbDocs: totalDocs } = await npm.getInfo();
const offset = prefetcher.offset;

const ratePerSecond = nbDocs / ((Date.now() - loopStart) / 1000);
log.info(
`[progress] %d/%d docs (%d%), current rate: %d docs/s (%s remaining)`,
chalk.dim.italic
.white`[progress] %d/%d docs (%d%) (%s prefetched) (%s processing)`,
offset + nbDocs,
totalDocs,
Math.floor((Math.max(offset + nbDocs, 1) / totalDocs) * 100),
Math.round(ratePerSecond),
ms(((totalDocs - offset - nbDocs) / ratePerSecond) * 1000)
prefetcher.idleCount,
consumer.running()
);
}

export { run };
/**
 * Create the queue that consumes prefetched packages.
 *
 * Each task fetches the package's full document from the npm registry,
 * saves it to the given Algolia index, and advances the bootstrap cursor
 * (`bootstrapLastId`) in the state manager. Task concurrency is bounded
 * by `config.bootstrapConcurrency`.
 *
 * @param stateManager - Persists bootstrap progress between runs.
 * @param index - Algolia index receiving the processed packages.
 * @returns The async queue; `run()` feeds it from the prefetcher.
 */
function createPkgConsumer(
  stateManager: StateManager,
  index: SearchIndex
): QueueObject<PrefetchedPkg> {
  return queue<PrefetchedPkg>(async (pkg) => {
    // A `null` item is pushed by the caller purely to wake the queue's
    // event handlers when prefetching is finished — nothing to process.
    if (!pkg) {
      return;
    }

    log.info(`Start:`, pkg.id);
    const start = Date.now();

    try {
      datadog.increment('packages');

      // Fetch the full package document from the npm registry.
      const res = await npm.getDoc(pkg.id);

      if (isFailure(res)) {
        // Best-effort: log and drop this package rather than failing
        // the whole queue.
        log.error('Got an error', res.error);
        return;
      }

      await saveDoc({ row: res, index });

      const lastId = (await stateManager.get()).bootstrapLastId;

      // Because of concurrency, a package later in the list can finish
      // processing earlier; only ever move the cursor forward.
      if (!lastId || lastId < pkg.id) {
        await stateManager.save({
          bootstrapLastId: pkg.id,
        });
      }
    } catch (err) {
      // Report and keep consuming — one bad package must not stop bootstrap.
      sentry.report(err);
    } finally {
      log.info(`Done:`, pkg.id);
      datadog.timing('loop', Date.now() - start);
    }
  }, config.bootstrapConcurrency);
}
1 change: 0 additions & 1 deletion src/changelog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ export async function getChangelog(
for (const file of filelist) {
const name = path.basename(file.name);
if (!fileRegex.test(name)) {
// eslint-disable-next-line no-continue
continue;
}

Expand Down
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ export const config = {
expiresAt: ms('30 days'),
popularExpiresAt: ms('7 days'),
cacheTotalDownloads: ms('1 minute'),
prefetchWaitBetweenPage: 5000,
prefetchMaxIdle: 100,
};

export type Config = typeof config;
Expand Down
6 changes: 4 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ async function main(): Promise<void> {
createAPI();

// first we make sure the bootstrap index has the correct settings
log.info('💪 Setting up Algolia');
log.info('💪 Setting up Algolia', [
config.bootstrapIndexName,
config.indexName,
]);
const {
client: algoliaClient,
mainIndex,
Expand All @@ -56,7 +59,6 @@ async function main(): Promise<void> {

// then we figure out which updates we missed since
// the last time main index was updated
log.info('🚀 Launching Watch');
await watch.run(stateManager, mainIndex);
}

Expand Down
2 changes: 1 addition & 1 deletion src/jsDelivr/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export async function getFilesList(
});
files = response.body.files;
} catch (e) {
log.error(`Failed to fetch ${url}`, e);
log.error(`Failed to fetch ${url}`, e.message);
}

datadog.timing('jsdelivr.getFilesList', Date.now() - start);
Expand Down
Loading