From 79ac9fb823f62da407bcbf8bcadaf777e1e5dbcb Mon Sep 17 00:00:00 2001 From: fengmk2 Date: Mon, 4 Jul 2016 00:06:42 +0800 Subject: [PATCH] fix: should show new version package count (#984) --- controllers/sync_module_worker.js | 44 +++++++++++++++++++------------ sync/changes_stream_syncer.js | 6 +++-- sync/status.js | 2 +- sync/sync_all.js | 13 +++++---- 4 files changed, 40 insertions(+), 25 deletions(-) diff --git a/controllers/sync_module_worker.js b/controllers/sync_module_worker.js index 01d221342..cf6ce0930 100644 --- a/controllers/sync_module_worker.js +++ b/controllers/sync_module_worker.js @@ -59,6 +59,7 @@ function SyncModuleWorker(options) { this.successes = []; this.fails = []; + this.updates = []; } util.inherits(SyncModuleWorker, EventEmitter); @@ -127,7 +128,7 @@ SyncModuleWorker.prototype.start = function () { // sync upstream if (that.syncUpstreamFirst) { try { - yield* that.syncUpstream(that.startName); + yield that.syncUpstream(that.startName); } catch (err) { logger.error(err); } @@ -180,7 +181,7 @@ SyncModuleWorker.prototype._doneOne = function* (concurrencyId, name, success) { var that = this; // relase the stack: https://github.com/cnpm/cnpmjs.org/issues/328 defer.setImmediate(function* () { - yield* that.next(concurrencyId); + yield that.next(concurrencyId); }); }; @@ -300,13 +301,13 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) { if (common.isPrivateScopedPackage(name)) { this.log('[c#%d] [%s] ignore sync private scoped %j package', concurrencyId, name, config.scopes); - yield* this._doneOne(concurrencyId, name, true); + yield this._doneOne(concurrencyId, name, true); return; } // get from npm try { - var result = yield* npmSerivce.request('/' + name.replace('/', '%2f')); + var result = yield npmSerivce.request('/' + name.replace('/', '%2f')); pkg = result.data; status = result.status; } catch (err) { @@ -333,7 +334,7 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) { if (!pkg) { that.log('[c#%s] [error] [%s] get package error: package not exists, status: %s', concurrencyId, name, status); - yield* that._doneOne(concurrencyId, name, true); + yield that._doneOne(concurrencyId, name, true); return; } @@ -341,31 +342,36 @@ SyncModuleWorker.prototype.next = function* (concurrencyId) { if (unpublishedInfo) { try { - yield* that._unpublished(name, unpublishedInfo); + yield that._unpublished(name, unpublishedInfo); } catch (err) { that.log('[c#%s] [error] [%s] sync error: %s', concurrencyId, name, err.stack); - yield* that._doneOne(concurrencyId, name, false); + yield that._doneOne(concurrencyId, name, false); return; } - return yield* that._doneOne(concurrencyId, name, true); + return yield that._doneOne(concurrencyId, name, true); } var versions; try { - versions = yield* that._sync(name, pkg); + versions = yield that._sync(name, pkg); } catch (err) { that.log('[c#%s] [error] [%s] sync error: %s', concurrencyId, name, err.stack); - yield* that._doneOne(concurrencyId, name, false); + yield that._doneOne(concurrencyId, name, false); return; } + // has new version + if (versions.length > 0) { + that.updates.push(name); + } + this.log('[c#%d] [%s] synced success, %d versions: %s', concurrencyId, name, versions.length, versions.join(', ')); - yield* this._doneOne(concurrencyId, name, true); + yield this._doneOne(concurrencyId, name, true); }; function* _listStarUsers(modName) { - var users = yield* packageService.listStarUserNames(modName); + var users = yield packageService.listStarUserNames(modName); var userMap = {}; users.forEach(function (user) { userMap[user] = true; @@ -663,7 +669,7 @@ SyncModuleWorker.prototype._sync = function* (name, pkg) { continue; } try { - yield* that._syncOneVersion(index, syncModule); + yield that._syncOneVersion(index, syncModule); syncedVersionNames.push(syncModule.version); } catch (err) { that.log(' [%s:%d] sync error, version: %s, %s: %s', @@ -923,7 +929,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack } // add module dependence - yield* packageService.addDependencies(sourcePackage.name, dependencies); + yield packageService.addDependencies(sourcePackage.name, dependencies); var shasum = crypto.createHash('sha1'); var dataSize = 0; @@ -945,6 +951,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack var err = new Error('Download ' + downurl + ' fail, status: ' + statusCode); err.name = 'DownloadTarballError'; err.data = sourcePackage; + logger.syncInfo('[sync_module_worker] %s', err.message); throw err; } @@ -961,6 +968,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack var err = new Error('Download ' + downurl + ' file size is zero'); err.name = 'DownloadTarballSizeZeroError'; err.data = sourcePackage; + logger.syncInfo('[sync_module_worker] %s', err.message); throw err; } @@ -971,6 +979,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack ' not match ' + sourcePackage.dist.shasum); err.name = 'DownloadTarballShasumError'; err.data = sourcePackage; + logger.syncInfo('[sync_module_worker] %s', err.message); throw err; } @@ -989,8 +998,9 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack throw err; } logger.syncInfo('[sync_module_worker] uploaded, saving %j to database', result); - var r = yield *afterUpload(result); - logger.syncInfo('[sync_module_worker] sync %s@%s done!', sourcePackage.name, sourcePackage.version); + var r = yield afterUpload(result); + logger.syncInfo('[sync_module_worker] sync %s@%s done!', + sourcePackage.name, sourcePackage.version); return r; } finally { // remove tmp file whatever @@ -1034,7 +1044,7 @@ SyncModuleWorker.prototype._syncOneVersion = function *(versionIndex, sourcePack } mod.package.dist = dist; - var r = yield* packageService.saveModule(mod); + var r = yield packageService.saveModule(mod); that.log(' [%s:%s] done, insertId: %s, author: %s, version: %s, ' + 'size: %d, publish_time: %j, publish on cnpm: %s', diff --git a/sync/changes_stream_syncer.js b/sync/changes_stream_syncer.js index dd5378721..1059d54ce 100644 --- a/sync/changes_stream_syncer.js +++ b/sync/changes_stream_syncer.js @@ -15,6 +15,7 @@ let _STREAM_ID = 0; module.exports = function* sync() { const since = yield getLastSequence(); const streamId = _STREAM_ID++; + let changesCount = 0; logger.syncInfo('start changes stream#%d, since: %s', streamId, since); const changes = new ChangesStream({ db, @@ -23,7 +24,8 @@ module.exports = function* sync() { }); changes.await = streamAwait; changes.on('data', change => { - logger.syncInfo('stream#%d get change: %j', streamId, change); + changesCount++; + logger.syncInfo('stream#%d get change#%d: %j', streamId, changesCount, change); syncPackage(change); }); @@ -32,7 +34,7 @@ module.exports = function* sync() { } catch (err) { // make sure changes steam is destroy changes.destroy(); - err.message += `, stream#${streamId}`; + err.message += `, stream#${streamId}, changesCount#${changesCount}`; throw err; } }; diff --git a/sync/status.js b/sync/status.js index 5faa3eb88..5501b16de 100644 --- a/sync/status.js +++ b/sync/status.js @@ -22,7 +22,7 @@ Status.prototype.log = function (syncDone) { lastSyncModule: this.lastSyncModule, }; co(function* () { - yield* Total.updateSyncNum(params); + yield Total.updateSyncNum(params); }).catch(function () {}); }; diff --git a/sync/sync_all.js b/sync/sync_all.js index b00f30edc..3fafe0fb5 100644 --- a/sync/sync_all.js +++ b/sync/sync_all.js @@ -57,19 +57,22 @@ module.exports = function* sync() { concurrency: config.syncConcurrency, syncUpstreamFirst: false, }); - Status.init({need: packages.length}, worker); + Status.init({ + need: packages.length, + }, worker); worker.start(); var end = thunkify.event(worker); yield end(); - logger.syncInfo('All packages sync done, successes %d, fails %d', - worker.successes.length, worker.fails.length); + logger.syncInfo('All packages sync done, successes %d, fails %d, updates %d', + worker.successes.length, worker.fails.length, worker.updates.length); //only when all succss, set last sync time if (!worker.fails.length) { - yield* totalService.setLastSyncTime(syncTime); + yield totalService.setLastSyncTime(syncTime); } return { successes: worker.successes, - fails: worker.fails + fails: worker.fails, + updates: worker.updates, }; };