Skip to content
This repository has been archived by the owner on Jun 2, 2024. It is now read-only.

Commit

Permalink
fix: should show new version package count (#984)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengmk2 authored and dead-horse committed Jul 3, 2016
1 parent a437eb0 commit 79ac9fb
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
44 changes: 27 additions & 17 deletions controllers/sync_module_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ function SyncModuleWorker(options) {

this.successes = [];
this.fails = [];
this.updates = [];
}

util.inherits(SyncModuleWorker, EventEmitter);
Expand Down Expand Up @@ -127,7 +128,7 @@ SyncModuleWorker.prototype.start = function () {
// sync upstream
if (that.syncUpstreamFirst) {
try {
yield* that.syncUpstream(that.startName);
yield that.syncUpstream(that.startName);
} catch (err) {
logger.error(err);
}
Expand Down Expand Up @@ -180,7 +181,7 @@ SyncModuleWorker.prototype._doneOne = function* (concurrencyId, name, success) {
var that = this;
// relase the stack: https://github.com/cnpm/cnpmjs.org/issues/328
defer.setImmediate(function* () {
yield* that.next(concurrencyId);
yield that.next(concurrencyId);
});
};

Expand Down Expand Up @@ -300,13 +301,13 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) {
if (common.isPrivateScopedPackage(name)) {
this.log('[c#%d] [%s] ignore sync private scoped %j package',
concurrencyId, name, config.scopes);
yield* this._doneOne(concurrencyId, name, true);
yield this._doneOne(concurrencyId, name, true);
return;
}

// get from npm
try {
var result = yield* npmSerivce.request('/' + name.replace('/', '%2f'));
var result = yield npmSerivce.request('/' + name.replace('/', '%2f'));
pkg = result.data;
status = result.status;
} catch (err) {
Expand All @@ -333,39 +334,44 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) {
if (!pkg) {
that.log('[c#%s] [error] [%s] get package error: package not exists, status: %s',
concurrencyId, name, status);
yield* that._doneOne(concurrencyId, name, true);
yield that._doneOne(concurrencyId, name, true);
return;
}

that.log('[c#%d] [%s] pkg status: %d, start...', concurrencyId, name, status);

if (unpublishedInfo) {
try {
yield* that._unpublished(name, unpublishedInfo);
yield that._unpublished(name, unpublishedInfo);
} catch (err) {
that.log('[c#%s] [error] [%s] sync error: %s', concurrencyId, name, err.stack);
yield* that._doneOne(concurrencyId, name, false);
yield that._doneOne(concurrencyId, name, false);
return;
}
return yield* that._doneOne(concurrencyId, name, true);
return yield that._doneOne(concurrencyId, name, true);
}

var versions;
try {
versions = yield* that._sync(name, pkg);
versions = yield that._sync(name, pkg);
} catch (err) {
that.log('[c#%s] [error] [%s] sync error: %s', concurrencyId, name, err.stack);
yield* that._doneOne(concurrencyId, name, false);
yield that._doneOne(concurrencyId, name, false);
return;
}

// has new version
if (versions.length > 0) {
that.updates.push(name);
}

this.log('[c#%d] [%s] synced success, %d versions: %s',
concurrencyId, name, versions.length, versions.join(', '));
yield* this._doneOne(concurrencyId, name, true);
yield this._doneOne(concurrencyId, name, true);
};

function* _listStarUsers(modName) {
var users = yield* packageService.listStarUserNames(modName);
var users = yield packageService.listStarUserNames(modName);
var userMap = {};
users.forEach(function (user) {
userMap[user] = true;
Expand Down Expand Up @@ -663,7 +669,7 @@ SyncModuleWorker.prototype._sync = function* (name, pkg) {
continue;
}
try {
yield* that._syncOneVersion(index, syncModule);
yield that._syncOneVersion(index, syncModule);
syncedVersionNames.push(syncModule.version);
} catch (err) {
that.log(' [%s:%d] sync error, version: %s, %s: %s',
Expand Down Expand Up @@ -923,7 +929,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
}

// add module dependence
yield* packageService.addDependencies(sourcePackage.name, dependencies);
yield packageService.addDependencies(sourcePackage.name, dependencies);

var shasum = crypto.createHash('sha1');
var dataSize = 0;
Expand All @@ -945,6 +951,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
var err = new Error('Download ' + downurl + ' fail, status: ' + statusCode);
err.name = 'DownloadTarballError';
err.data = sourcePackage;
logger.syncInfo('[sync_module_worker] %s', err.message);
throw err;
}

Expand All @@ -961,6 +968,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
var err = new Error('Download ' + downurl + ' file size is zero');
err.name = 'DownloadTarballSizeZeroError';
err.data = sourcePackage;
logger.syncInfo('[sync_module_worker] %s', err.message);
throw err;
}

Expand All @@ -971,6 +979,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
' not match ' + sourcePackage.dist.shasum);
err.name = 'DownloadTarballShasumError';
err.data = sourcePackage;
logger.syncInfo('[sync_module_worker] %s', err.message);
throw err;
}

Expand All @@ -989,8 +998,9 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
throw err;
}
logger.syncInfo('[sync_module_worker] uploaded, saving %j to database', result);
var r = yield *afterUpload(result);
logger.syncInfo('[sync_module_worker] sync %s@%s done!', sourcePackage.name, sourcePackage.version);
var r = yield afterUpload(result);
logger.syncInfo('[sync_module_worker] sync %s@%s done!',
sourcePackage.name, sourcePackage.version);
return r;
} finally {
// remove tmp file whatever
Expand Down Expand Up @@ -1034,7 +1044,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack
}

mod.package.dist = dist;
var r = yield* packageService.saveModule(mod);
var r = yield packageService.saveModule(mod);

that.log(' [%s:%s] done, insertId: %s, author: %s, version: %s, '
+ 'size: %d, publish_time: %j, publish on cnpm: %s',
Expand Down
6 changes: 4 additions & 2 deletions sync/changes_stream_syncer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let _STREAM_ID = 0;
module.exports = function* sync() {
const since = yield getLastSequence();
const streamId = _STREAM_ID++;
let changesCount = 0;
logger.syncInfo('start changes stream#%d, since: %s', streamId, since);
const changes = new ChangesStream({
db,
Expand All @@ -23,7 +24,8 @@ module.exports = function* sync() {
});
changes.await = streamAwait;
changes.on('data', change => {
logger.syncInfo('stream#%d get change: %j', streamId, change);
changesCount++;
logger.syncInfo('stream#%d get change#%d: %j', streamId, changesCount, change);
syncPackage(change);
});

Expand All @@ -32,7 +34,7 @@ module.exports = function* sync() {
} catch (err) {
// make sure changes steam is destroy
changes.destroy();
err.message += `, stream#${streamId}`;
err.message += `, stream#${streamId}, changesCount#${changesCount}`;
throw err;
}
};
Expand Down
2 changes: 1 addition & 1 deletion sync/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Status.prototype.log = function (syncDone) {
lastSyncModule: this.lastSyncModule,
};
co(function* () {
yield* Total.updateSyncNum(params);
yield Total.updateSyncNum(params);
}).catch(function () {});
};

Expand Down
13 changes: 8 additions & 5 deletions sync/sync_all.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,22 @@ module.exports = function* sync() {
concurrency: config.syncConcurrency,
syncUpstreamFirst: false,
});
Status.init({need: packages.length}, worker);
Status.init({
need: packages.length,
}, worker);
worker.start();
var end = thunkify.event(worker);
yield end();

logger.syncInfo('All packages sync done, successes %d, fails %d',
worker.successes.length, worker.fails.length);
logger.syncInfo('All packages sync done, successes %d, fails %d, updates %d',
worker.successes.length, worker.fails.length, worker.updates.length);
//only when all succss, set last sync time
if (!worker.fails.length) {
yield* totalService.setLastSyncTime(syncTime);
yield totalService.setLastSyncTime(syncTime);
}
return {
successes: worker.successes,
fails: worker.fails
fails: worker.fails,
updates: worker.updates,
};
};

0 comments on commit 79ac9fb

Please sign in to comment.