Skip to content

Commit

Permalink
Make bulkUpdate non atomic function
Browse files Browse the repository at this point in the history
  • Loading branch information
ssh24 committed Jun 6, 2017
1 parent 359a6a5 commit 205559b
Showing 1 changed file with 1 addition and 207 deletions.
208 changes: 1 addition & 207 deletions lib/persisted-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -1396,220 +1396,14 @@ module.exports = function(registry) {
/**
* Apply an update list.
*
* **Note: this is not atomic**
*
* @param {Array} updates An updates list, usually from [createUpdates()](#persistedmodel-createupdates).
* @param {Object} [options] An optional options object to pass to underlying data-access calls.
* @param {Function} callback Callback function.
*/

PersistedModel.bulkUpdate = function(updates, options, callback) {
var tasks = [];
var Model = this;
var Change = this.getChangeModel();
var conflicts = [];

var lastArg = arguments[arguments.length - 1];

if (typeof lastArg === 'function' && arguments.length > 1) {
callback = lastArg;
}

if (typeof options === 'function') {
options = {};
}

options = options || {};

buildLookupOfAffectedModelData(Model, updates, function(err, currentMap) {
if (err) return callback(err);

updates.forEach(function(update) {
var id = update.change.modelId;
var current = currentMap[id];
switch (update.type) {
case Change.UPDATE:
tasks.push(function(cb) {
applyUpdate(Model, id, current, update.data, update.change, conflicts, options, cb);
});
break;

case Change.CREATE:
tasks.push(function(cb) {
applyCreate(Model, id, current, update.data, update.change, conflicts, options, cb);
});
break;
case Change.DELETE:
tasks.push(function(cb) {
applyDelete(Model, id, current, update.change, conflicts, options, cb);
});
break;
}
});

async.parallel(tasks, function(err) {
if (err) return callback(err);
if (conflicts.length) {
err = new Error(g.f('Conflict'));
err.statusCode = 409;
err.details = {conflicts: conflicts};
return callback(err);
}
callback();
});
});
throwNotAttached(this.modelName, 'bulkUpdate');
};

function buildLookupOfAffectedModelData(Model, updates, callback) {
var idName = Model.dataSource.idName(Model.modelName);
var affectedIds = updates.map(function(u) { return u.change.modelId; });
var whereAffected = {};
whereAffected[idName] = {inq: affectedIds};
Model.find({where: whereAffected}, function(err, affectedList) {
if (err) return callback(err);
var dataLookup = {};
affectedList.forEach(function(it) {
dataLookup[it[idName]] = it;
});
callback(null, dataLookup);
});
}

function applyUpdate(Model, id, current, data, change, conflicts, options, cb) {
var Change = Model.getChangeModel();
var rev = current ? Change.revisionForInst(current) : null;

if (rev !== change.prev) {
debug('Detected non-rectified change of %s %j',
Model.modelName, id);
debug('\tExpected revision: %s', change.rev);
debug('\tActual revision: %s', rev);
conflicts.push(change);
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}

// TODO(bajtos) modify `data` so that it instructs
// the connector to remove any properties included in "inst"
// but not included in `data`
// See https://github.com/strongloop/loopback/issues/1215

Model.updateAll(current.toObject(), data, options, function(err, result) {
if (err) return cb(err);

var count = result && result.count;
switch (count) {
case 1:
// The happy path, exactly one record was updated
return cb();

case 0:
debug('UpdateAll detected non-rectified change of %s %j',
Model.modelName, id);
conflicts.push(change);
// NOTE(bajtos) updateAll triggers change rectification
// for all model instances, even when no records were updated,
// thus we don't need to rectify explicitly ourselves
return cb();

case undefined:
case null:
return cb(new Error(
g.f('Cannot apply bulk updates, ' +
'the connector does not correctly report ' +
'the number of updated records.')));

default:
debug('%s.updateAll modified unexpected number of instances: %j',
Model.modelName, count);
return cb(new Error(
g.f('Bulk update failed, the connector has modified unexpected ' +
'number of records: %s', JSON.stringify(count))));
}
});
}

function applyCreate(Model, id, current, data, change, conflicts, options, cb) {
Model.create(data, options, function(createErr) {
if (!createErr) return cb();

// We don't have a reliable way how to detect the situation
// where he model was not create because of a duplicate id
// The workaround is to query the DB to check if the model already exists
Model.findById(id, function(findErr, inst) {
if (findErr || !inst) {
// There isn't any instance with the same id, thus there isn't
// any conflict and we just report back the original error.
return cb(createErr);
}

return conflict();
});
});

function conflict() {
// The instance already exists - report a conflict
debug('Detected non-rectified new instance of %s %j',
Model.modelName, id);
conflicts.push(change);

var Change = Model.getChangeModel();
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}
}

function applyDelete(Model, id, current, change, conflicts, options, cb) {
if (!current) {
// The instance was either already deleted or not created at all,
// we are done.
return cb();
}

var Change = Model.getChangeModel();
var rev = Change.revisionForInst(current);
if (rev !== change.prev) {
debug('Detected non-rectified change of %s %j',
Model.modelName, id);
debug('\tExpected revision: %s', change.rev);
debug('\tActual revision: %s', rev);
conflicts.push(change);
return Change.rectifyModelChanges(Model.modelName, [id], cb);
}

Model.deleteAll(current.toObject(), options, function(err, result) {
if (err) return cb(err);

var count = result && result.count;
switch (count) {
case 1:
// The happy path, exactly one record was updated
return cb();

case 0:
debug('DeleteAll detected non-rectified change of %s %j',
Model.modelName, id);
conflicts.push(change);
// NOTE(bajtos) deleteAll triggers change rectification
// for all model instances, even when no records were updated,
// thus we don't need to rectify explicitly ourselves
return cb();

case undefined:
case null:
return cb(new Error(
g.f('Cannot apply bulk updates, ' +
'the connector does not correctly report ' +
'the number of deleted records.')));

default:
debug('%s.deleteAll modified unexpected number of instances: %j',
Model.modelName, count);
return cb(new Error(
g.f('Bulk update failed, the connector has deleted unexpected ' +
'number of records: %s', JSON.stringify(count))));
}
});
}

/**
* Get the `Change` model.
* Throws an error if the change model is not correctly setup.
Expand Down

0 comments on commit 205559b

Please sign in to comment.