Skip to content

Commit

Permalink
Fix memory leak and cache update issues in 14.0.4 (nightscout#6133)
Browse files Browse the repository at this point in the history
* Fix a memory leak in 14.0.4
* Fix linter error in ddata.js
* Move data retention periods to caching
* Update _id of inserted entries and device status so merging to cache works correctly
* Reset the data in ddata before updating
* Fix typo on entry cache retention period
* Have device status cache retention period follow configuration
* Fix _id injection in treatments
  • Loading branch information
sulkaharo authored and mrspouse committed Oct 17, 2020
1 parent 403f839 commit 6b0009e
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 128 deletions.
36 changes: 14 additions & 22 deletions lib/data/dataloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const _ = require('lodash');
const async = require('async');
const times = require('../times');
const fitTreatmentsToBGCurve = require('./treatmenttocurve');
const constants = require('../constants');

Expand Down Expand Up @@ -144,8 +143,11 @@ function init(env, ctx) {
done(err, result);
}

// clear treatments to the base set, we're going to merge from multiple queries
ddata.treatments = []; // ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : [];
// clear data we'll get from the cache

ddata.treatments = [];
ddata.devicestatus = [];
ddata.entries = [];

ddata.dbstats = {};

Expand Down Expand Up @@ -196,11 +198,8 @@ function loadEntries(ddata, ctx, callback) {

if (!err && results) {

const ageFilter = ddata.lastUpdated - constants.TWO_DAYS;
const r = ctx.ddata.processRawDataForRuntime(results);
ctx.cache.insertData('entries', r, ageFilter);

const currentData = ctx.cache.getData('entries').reverse();
const currentData = ctx.cache.insertData('entries', r).reverse();

const mbgs = [];
const sgvs = [];
Expand Down Expand Up @@ -324,12 +323,11 @@ function loadTreatments(ddata, ctx, callback) {

ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
const ageFilter = ddata.lastUpdated - longLoad;
const r = ctx.ddata.processRawDataForRuntime(results);

// update cache
ctx.cache.insertData('treatments', r, ageFilter);
ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, ctx.cache.getData('treatments'));
// update cache and apply to runtime data
const r = ctx.ddata.processRawDataForRuntime(results);
const currentData = ctx.cache.insertData('treatments', r);
ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, currentData);
}

callback();
Expand Down Expand Up @@ -361,7 +359,6 @@ function loadProfileSwitchTreatments(ddata, ctx, callback) {
ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
ddata.treatments = mergeProcessSort(ddata.treatments, results);
//mergeToTreatments(ddata, results);
}

// Store last profile switch
Expand Down Expand Up @@ -418,7 +415,6 @@ function loadLatestSingle(ddata, ctx, dataType, callback) {
ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
ddata.treatments = mergeProcessSort(ddata.treatments, results);
//mergeToTreatments(ddata, results);
}
callback();
});
Expand Down Expand Up @@ -473,16 +469,12 @@ function loadDeviceStatus(ddata, env, ctx, callback) {

ctx.devicestatus.list(opts, function(err, results) {
if (!err && results) {
// ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter);

const ageFilter = ddata.lastUpdated - longLoad;
// update cache and apply to runtime data
const r = ctx.ddata.processRawDataForRuntime(results);
ctx.cache.insertData('devicestatus', r, ageFilter);

const res = ctx.cache.getData('devicestatus');
const currentData = ctx.cache.insertData('devicestatus', r);

const res2 = _.map(res, function eachStatus(result) {
//result.mills = new Date(result.created_at).getTime();
const res2 = _.map(currentData, function eachStatus(result) {
if ('uploaderBattery' in result) {
result.uploader = {
battery: result.uploaderBattery
Expand All @@ -492,7 +484,7 @@ function loadDeviceStatus(ddata, env, ctx, callback) {
return result;
});

ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2, ageFilter);
ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2);
} else {
ddata.devicestatus = [];
}
Expand Down
8 changes: 5 additions & 3 deletions lib/data/ddata.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ function init () {

Object.keys(obj).forEach(key => {
if (typeof obj[key] === 'object' && obj[key]) {
if (obj[key].hasOwnProperty('_id')) {
if (Object.prototype.hasOwnProperty.call(obj[key], '_id')) {
obj[key]._id = obj[key]._id.toString();
}
if (obj[key].hasOwnProperty('created_at') && !obj[key].hasOwnProperty('mills')) {
if (Object.prototype.hasOwnProperty.call(obj[key], 'created_at')
&& !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) {
obj[key].mills = new Date(obj[key].created_at).getTime();
}
if (obj[key].hasOwnProperty('sysTime') && !obj[key].hasOwnProperty('mills')) {
if (Object.prototype.hasOwnProperty.call(obj[key], 'sysTime')
&& !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) {
obj[key].mills = new Date(obj[key].sysTime).getTime();
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/server/bootevent.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ function boot (env, language) {
});

ctx.bus.on('data-loaded', function updatePlugins ( ) {
console.info('reloading sandbox data');
// console.info('reloading sandbox data');
var sbx = require('../sandbox')().serverInit(env, ctx);
ctx.plugins.setProperties(sbx);
ctx.notifications.initRequests();
Expand Down
45 changes: 23 additions & 22 deletions lib/server/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,33 @@ function cache (env, ctx) {
, entries: []
};

const dataArray = [
data.treatments
, data.devicestatus
, data.entries
];
const retentionPeriods = {
treatments: constants.ONE_HOUR * 60
, devicestatus: env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2 ? constants.TWO_DAYS : constants.ONE_DAY
, entries: constants.TWO_DAYS
};

function mergeCacheArrays (oldData, newData, retentionPeriod) {

function mergeCacheArrays (oldData, newData, ageLimit) {
const ageLimit = Date.now() - retentionPeriod;

var filtered = _.filter(newData, function hasId (object) {
const hasId = !_.isEmpty(object._id);
const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit);
return isFresh && hasId;
});
var filteredOld = filterForAge(oldData, ageLimit);
var filteredNew = filterForAge(newData, ageLimit);

const merged = ctx.ddata.idMergePreferNew(oldData, filtered);
const merged = ctx.ddata.idMergePreferNew(filteredOld, filteredNew);

return _.sortBy(merged, function(item) {
return -item.mills;
});

function filterForAge(data, ageLimit) {
return _.filter(data, function hasId(object) {
const hasId = !_.isEmpty(object._id);
const isFresh = object.mills >= ageLimit;
return isFresh && hasId;
});
}

}

data.isEmpty = (datatype) => {
Expand All @@ -54,20 +60,17 @@ function cache (env, ctx) {
return _.cloneDeep(data[datatype]);
}

data.insertData = (datatype, newData, retentionPeriod) => {
data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriod);
data.insertData = (datatype, newData) => {
data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriods[datatype]);
return data.getData(datatype);
}

function dataChanged (operation) {
//console.log('Cache data operation requested', operation);

if (!data[operation.type]) return;

if (operation.op == 'remove') {
//console.log('Cache data delete event');
// if multiple items were deleted, flush entire cache
if (!operation.changes) {
//console.log('Multiple items delete from cache, flushing all')
data.treatments = [];
data.devicestatus = [];
data.entries = [];
Expand All @@ -76,9 +79,8 @@ function cache (env, ctx) {
}
}

if (operation.op == 'update') {
//console.log('Cache data update event');
data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes);
if (operation.op == 'update') {
data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[operation.type]);
}
}

Expand All @@ -96,7 +98,6 @@ function cache (env, ctx) {
}

return data;

}

module.exports = cache;
62 changes: 34 additions & 28 deletions lib/server/devicestatus.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,38 @@ var find_options = require('./query');

function storage (collection, ctx) {

function create(obj, fn) {
function create (obj, fn) {

// Normalize all dates to UTC
const d = moment(obj.created_at).isValid() ? moment.parseZone(obj.created_at) : moment();
obj.created_at = d.toISOString();
obj.utcOffset = d.utcOffset();
api().insert(obj, function (err, doc) {
if (err != null && err.message) {

api().insertOne(obj, function(err, results) {
if (err !== null && err.message) {
console.log('Error inserting the device status object', err.message);
fn(err.message, null);
return;
}

ctx.bus.emit('data-update', {
type: 'devicestatus',
op: 'update',
changes: ctx.ddata.processRawDataForRuntime([doc])
});
if (!err) {

if (!obj._id) obj._id = results.insertedIds[0]._id;

fn(null, doc.ops);
ctx.bus.emit('data-update', {
type: 'devicestatus'
, op: 'update'
, changes: ctx.ddata.processRawDataForRuntime([obj])
});
}

fn(null, results.ops);
ctx.bus.emit('data-received');
});
}

function last(fn) {
return list({count: 1}, function (err, entries) {
function last (fn) {
return list({ count: 1 }, function(err, entries) {
if (entries && entries.length > 0) {
fn(err, entries[0]);
} else {
Expand All @@ -44,18 +49,18 @@ function storage (collection, ctx) {
return find_options(opts, storage.queryOpts);
}

function list(opts, fn) {
function list (opts, fn) {
// these functions, find, sort, and limit, are used to
// dynamically configure the request, based on the options we've
// been given

// determine sort options
function sort ( ) {
return opts && opts.sort || {created_at: -1};
function sort () {
return opts && opts.sort || { created_at: -1 };
}

// configure the limit portion of the current query
function limit ( ) {
function limit () {
if (opts && opts.count) {
return this.limit(parseInt(opts.count));
}
Expand All @@ -68,31 +73,31 @@ function storage (collection, ctx) {
}

// now just stitch them all together
limit.call(api( )
.find(query_for(opts))
.sort(sort( ))
limit.call(api()
.find(query_for(opts))
.sort(sort())
).toArray(toArray);
}

function remove (opts, fn) {

function removed(err, stat) {
function removed (err, stat) {

ctx.bus.emit('data-update', {
type: 'devicestatus',
op: 'remove',
count: stat.result.n,
changes: opts.find._id
type: 'devicestatus'
, op: 'remove'
, count: stat.result.n
, changes: opts.find._id
});

fn(err, stat);
}

return api( ).remove(
return api().remove(
query_for(opts), removed);
}

function api() {
function api () {
return ctx.store.collection(collection);
}

Expand All @@ -101,9 +106,10 @@ function storage (collection, ctx) {
api.query_for = query_for;
api.last = last;
api.remove = remove;
api.aggregate = require('./aggregate')({ }, api);
api.aggregate = require('./aggregate')({}, api);
api.indexedFields = [
'created_at'

, 'NSCLIENT_ID'
];
return api;
Expand Down
Loading

0 comments on commit 6b0009e

Please sign in to comment.