Skip to content

Commit

Permalink
Fix for _PushStatus Stuck 'running' when Count is Off (parse-communit…
Browse files Browse the repository at this point in the history
…y#4319)

* Fix for _PushStatus stuck 'running' if count is off

* use 'count' for batches

* push worker test fix
montymxb authored and flovilmart committed Nov 5, 2017

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent d89927e commit f699e73
Showing 5 changed files with 276 additions and 9 deletions.
257 changes: 257 additions & 0 deletions spec/Parse.Push.spec.js
Original file line number Diff line number Diff line change
@@ -203,4 +203,261 @@ describe('Parse.Push', () => {
done();
});
});

const successfulAny = function(body, installations) {
const promises = installations.map((device) => {
return Promise.resolve({
transmitted: true,
device: device,
})
});

return Promise.all(promises);
};

const provideInstallations = function(num) {
if(!num) {
num = 2;
}

const installations = [];
while(installations.length !== num) {
// add Android installations
const installation = new Parse.Object("_Installation");
installation.set("installationId", "installation_" + installations.length);
installation.set("deviceToken","device_token_" + installations.length);
installation.set("deviceType", "android");
installations.push(installation);
}

return installations;
};

const losingAdapter = {
send: function(body, installations) {
// simulate having lost an installation before this was called
// thus invalidating our 'count' in _PushStatus
installations.pop();

return successfulAny(body, installations);
},
getValidPushTypes: function() {
return ["android"];
}
};

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates a simple push where 1 installation is removed between _PushStatus
* count being set and the pushes being sent
*/
it('does not get stuck with _PushStatus \'running\' on 1 installation lost', (done) => {
reconfigureServer({
push: {adapter: losingAdapter}
}).then(() => {
return Parse.Object.saveAll(provideInstallations());
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: 'android'}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
expect(result.get('numSent')).toEqual(1);
expect(result.get('count')).toEqual(undefined);
done();
});
});

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates a simple push where 1 installation is added between _PushStatus
* count being set and the pushes being sent
*/
it('does not get stuck with _PushStatus \'running\' on 1 installation added', (done) => {
const installations = provideInstallations();

// add 1 iOS installation which we will omit & add later on
const iOSInstallation = new Parse.Object("_Installation");
iOSInstallation.set("installationId", "installation_" + installations.length);
iOSInstallation.set("deviceToken","device_token_" + installations.length);
iOSInstallation.set("deviceType", "ios");
installations.push(iOSInstallation);

reconfigureServer({
push: {
adapter: {
send: function(body, installations) {
// simulate having added an installation before this was called
// thus invalidating our 'count' in _PushStatus
installations.push(iOSInstallation);

return successfulAny(body, installations);
},
getValidPushTypes: function() {
return ["android"];
}
}
}
}).then(() => {
return Parse.Object.saveAll(installations);
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: {'$ne' : 'random'}}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
expect(result.get('numSent')).toEqual(3);
expect(result.get('count')).toEqual(undefined);
done();
});
});

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates an extended push, where some installations may be removed,
* resulting in a non-zero count
*/
it('does not get stuck with _PushStatus \'running\' on many installations removed', (done) => {
const devices = 1000;
const installations = provideInstallations(devices);

reconfigureServer({
push: {adapter: losingAdapter}
}).then(() => {
return Parse.Object.saveAll(installations);
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: 'android'}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
// expect # less than # of batches used, assuming each batch is 100 pushes
expect(result.get('numSent')).toEqual(devices - (devices / 100));
expect(result.get('count')).toEqual(undefined);
done();
});
});

/**
* Verifies that _PushStatus cannot get stuck in a 'running' state
* Simulates an extended push, where some installations may be added,
* resulting in a non-zero count
*/
it('does not get stuck with _PushStatus \'running\' on many installations added', (done) => {
const devices = 1000;
const installations = provideInstallations(devices);

// add 1 iOS installation which we will omit & add later on
const iOSInstallations = [];

while(iOSInstallations.length !== (devices / 100)) {
const iOSInstallation = new Parse.Object("_Installation");
iOSInstallation.set("installationId", "installation_" + installations.length);
iOSInstallation.set("deviceToken", "device_token_" + installations.length);
iOSInstallation.set("deviceType", "ios");
installations.push(iOSInstallation);
iOSInstallations.push(iOSInstallation);
}

reconfigureServer({
push: {
adapter: {
send: function(body, installations) {
// simulate having added an installation before this was called
// thus invalidating our 'count' in _PushStatus
installations.push(iOSInstallations.pop());

return successfulAny(body, installations);
},
getValidPushTypes: function() {
return ["android"];
}
}
}
}).then(() => {
return Parse.Object.saveAll(installations);
}).then(() => {
return Parse.Push.send(
{
data: {alert: "We fixed our status!"},
where: {deviceType: {'$ne' : 'random'}}
},
{ useMasterKey: true }
);
}).then(() => {
// it is enqueued so it can take time
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 1000);
});
}).then(() => {
// query for push status
const query = new Parse.Query('_PushStatus');
return query.find({useMasterKey: true});
}).then((results) => {
// verify status is NOT broken
expect(results.length).toBe(1);
const result = results[0];
expect(result.get('status')).toEqual('succeeded');
// expect # less than # of batches used, assuming each batch is 100 pushes
expect(result.get('numSent')).toEqual(devices + (devices / 100));
expect(result.get('count')).toEqual(undefined);
done();
});
});
});
4 changes: 2 additions & 2 deletions spec/PushWorker.spec.js
Original file line number Diff line number Diff line change
@@ -276,7 +276,7 @@ describe('PushWorker', () => {
'failedPerType.ios': { __op: 'Increment', amount: 1 },
[`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
[`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
count: { __op: 'Increment', amount: -2 },
count: { __op: 'Increment', amount: -1 }
});
const query = new Parse.Query('_PushStatus');
return query.get(handler.objectId, { useMasterKey: true });
@@ -354,7 +354,7 @@ describe('PushWorker', () => {
'failedPerType.ios': { __op: 'Increment', amount: 1 },
[`sentPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
[`failedPerUTCOffset.${UTCOffset}`]: { __op: 'Increment', amount: 1 },
count: { __op: 'Increment', amount: -2 },
count: { __op: 'Increment', amount: -1 }
});
done();
});
2 changes: 1 addition & 1 deletion src/Controllers/SchemaController.js
Original file line number Diff line number Diff line change
@@ -88,7 +88,7 @@ const defaultColumns = Object.freeze({
"failedPerType": {type:'Object'},
"sentPerUTCOffset": {type:'Object'},
"failedPerUTCOffset": {type:'Object'},
"count": {type:'Number'}
"count": {type:'Number'} // tracks # of batches queued and pending
},
_JobStatus: {
"jobName": {type: 'String'},
2 changes: 1 addition & 1 deletion src/Push/PushQueue.js
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ export class PushQueue {
if (!results || count == 0) {
return Promise.reject({error: 'PushController: no results in query'})
}
pushStatus.setRunning(count);
pushStatus.setRunning(Math.ceil(count / limit));
let skip = 0;
while (skip < count) {
const query = { where,
20 changes: 15 additions & 5 deletions src/StatusHandler.js
Original file line number Diff line number Diff line change
@@ -190,10 +190,18 @@ export function pushStatusHandler(config, existingObjectId) {
});
}

const setRunning = function(count) {
logger.verbose(`_PushStatus ${objectId}: sending push to %d installations`, count);
return handler.update({status:"pending", objectId: objectId},
{status: "running", count });
const setRunning = function(batches) {
logger.verbose(`_PushStatus ${objectId}: sending push to installations with %d batches`, batches);
return handler.update(
{
status:"pending",
objectId: objectId
},
{
status: "running",
count: batches
}
);
}

const trackSent = function(results, UTCOffset, cleanupInstallations = process.env.PARSE_SERVER_CLEANUP_INVALID_INSTALLATIONS) {
@@ -235,7 +243,6 @@ export function pushStatusHandler(config, existingObjectId) {
}
return memo;
}, update);
incrementOp(update, 'count', -results.length);
}

logger.verbose(`_PushStatus ${objectId}: sent push! %d success, %d failures`, update.numSent, update.numFailed);
@@ -259,6 +266,9 @@ export function pushStatusHandler(config, existingObjectId) {
});
}

// indicate this batch is complete
incrementOp(update, 'count', -1);

return handler.update({ objectId }, update).then((res) => {
if (res && res.count === 0) {
return this.complete();

0 comments on commit f699e73

Please sign in to comment.