From 6b0009eedbb28b9238193c45361ae4d1bca1366f Mon Sep 17 00:00:00 2001 From: Sulka Haro Date: Sun, 27 Sep 2020 17:16:42 +0300 Subject: [PATCH] Fix memory leak and cache update issues in 14.0.4 (#6133) * Fix a memory leak in 14.0.4 * Fix linter error in ddata.js * Move data retention periods to caching * Update _id of inserted entries and device status so merging to cache works correctly * Reset the data in ddata before updating * Fix typo on entry cache retention period * Have device status cache retention period follow configuration * Fix _id injection in treatments --- lib/data/dataloader.js | 36 +++++------- lib/data/ddata.js | 8 ++- lib/server/bootevent.js | 2 +- lib/server/cache.js | 45 +++++++-------- lib/server/devicestatus.js | 62 +++++++++++---------- lib/server/entries.js | 110 ++++++++++++++++++++----------------- lib/server/treatments.js | 11 +++- 7 files changed, 146 insertions(+), 128 deletions(-) diff --git a/lib/data/dataloader.js b/lib/data/dataloader.js index 0f1b3c7bd86e..7074b18e1fbd 100644 --- a/lib/data/dataloader.js +++ b/lib/data/dataloader.js @@ -2,7 +2,6 @@ const _ = require('lodash'); const async = require('async'); -const times = require('../times'); const fitTreatmentsToBGCurve = require('./treatmenttocurve'); const constants = require('../constants'); @@ -144,8 +143,11 @@ function init(env, ctx) { done(err, result); } - // clear treatments to the base set, we're going to merge from multiple queries - ddata.treatments = []; // ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : []; + // clear data we'll get from the cache + + ddata.treatments = []; + ddata.devicestatus = []; + ddata.entries = []; ddata.dbstats = {}; @@ -196,11 +198,8 @@ function loadEntries(ddata, ctx, callback) { if (!err && results) { - const ageFilter = ddata.lastUpdated - constants.TWO_DAYS; const r = ctx.ddata.processRawDataForRuntime(results); - ctx.cache.insertData('entries', r, ageFilter); - - const currentData = ctx.cache.getData('entries').reverse(); + const currentData = ctx.cache.insertData('entries', r).reverse(); const mbgs = []; const sgvs = []; @@ -324,12 +323,11 @@ function loadTreatments(ddata, ctx, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { - const ageFilter = ddata.lastUpdated - longLoad; - const r = ctx.ddata.processRawDataForRuntime(results); - // update cache - ctx.cache.insertData('treatments', r, ageFilter); - ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, ctx.cache.getData('treatments')); + // update cache and apply to runtime data + const r = ctx.ddata.processRawDataForRuntime(results); + const currentData = ctx.cache.insertData('treatments', r); + ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, currentData); } callback(); @@ -361,7 +359,6 @@ function loadProfileSwitchTreatments(ddata, ctx, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { ddata.treatments = mergeProcessSort(ddata.treatments, results); - //mergeToTreatments(ddata, results); } // Store last profile switch @@ -418,7 +415,6 @@ function loadLatestSingle(ddata, ctx, dataType, callback) { ctx.treatments.list(tq, function(err, results) { if (!err && results) { ddata.treatments = mergeProcessSort(ddata.treatments, results); - //mergeToTreatments(ddata, results); } callback(); }); @@ -473,16 +469,12 @@ function loadDeviceStatus(ddata, env, ctx, callback) { ctx.devicestatus.list(opts, function(err, results) { if (!err && results) { -// ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter); - const ageFilter = ddata.lastUpdated - longLoad; + // update cache and apply to runtime data const r = ctx.ddata.processRawDataForRuntime(results); - ctx.cache.insertData('devicestatus', r, ageFilter); - - const res = ctx.cache.getData('devicestatus'); + const currentData = ctx.cache.insertData('devicestatus', r); - const res2 = _.map(res, function eachStatus(result) { - //result.mills = new Date(result.created_at).getTime(); + const res2 = _.map(currentData, function eachStatus(result) { if ('uploaderBattery' in result) { result.uploader = { battery: result.uploaderBattery @@ -492,7 +484,7 @@ function loadDeviceStatus(ddata, env, ctx, callback) { return result; }); - ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2, ageFilter); + ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2); } else { ddata.devicestatus = []; } diff --git a/lib/data/ddata.js b/lib/data/ddata.js index 65120782fc0f..9389f10d3600 100644 --- a/lib/data/ddata.js +++ b/lib/data/ddata.js @@ -32,13 +32,15 @@ function init () { Object.keys(obj).forEach(key => { if (typeof obj[key] === 'object' && obj[key]) { - if (obj[key].hasOwnProperty('_id')) { + if (Object.prototype.hasOwnProperty.call(obj[key], '_id')) { obj[key]._id = obj[key]._id.toString(); } - if (obj[key].hasOwnProperty('created_at') && !obj[key].hasOwnProperty('mills')) { + if (Object.prototype.hasOwnProperty.call(obj[key], 'created_at') + && !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) { obj[key].mills = new Date(obj[key].created_at).getTime(); } - if (obj[key].hasOwnProperty('sysTime') && !obj[key].hasOwnProperty('mills')) { + if (Object.prototype.hasOwnProperty.call(obj[key], 'sysTime') + && !Object.prototype.hasOwnProperty.call(obj[key], 'mills')) { obj[key].mills = new Date(obj[key].sysTime).getTime(); } } diff --git a/lib/server/bootevent.js b/lib/server/bootevent.js index 93c6c7f20574..6247645b3d20 100644 --- a/lib/server/bootevent.js +++ b/lib/server/bootevent.js @@ -255,7 +255,7 @@ function boot (env, language) { }); ctx.bus.on('data-loaded', function updatePlugins ( ) { - console.info('reloading sandbox data'); + // console.info('reloading sandbox data'); var sbx = require('../sandbox')().serverInit(env, ctx); ctx.plugins.setProperties(sbx); ctx.notifications.initRequests(); diff --git a/lib/server/cache.js b/lib/server/cache.js index fca93afafdea..1b2576ce0297 100644 --- a/lib/server/cache.js +++ b/lib/server/cache.js @@ -23,27 +23,33 @@ function cache (env, ctx) { , entries: [] }; - const dataArray = [ - data.treatments - , data.devicestatus - , data.entries - ]; + const retentionPeriods = { + treatments: constants.ONE_HOUR * 60 + , devicestatus: env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2 ? constants.TWO_DAYS : constants.ONE_DAY + , entries: constants.TWO_DAYS + }; + function mergeCacheArrays (oldData, newData, retentionPeriod) { - function mergeCacheArrays (oldData, newData, ageLimit) { + const ageLimit = Date.now() - retentionPeriod; - var filtered = _.filter(newData, function hasId (object) { - const hasId = !_.isEmpty(object._id); - const isFresh = (ageLimit && object.mills >= ageLimit) || (!ageLimit); - return isFresh && hasId; - }); + var filteredOld = filterForAge(oldData, ageLimit); + var filteredNew = filterForAge(newData, ageLimit); - const merged = ctx.ddata.idMergePreferNew(oldData, filtered); + const merged = ctx.ddata.idMergePreferNew(filteredOld, filteredNew); return _.sortBy(merged, function(item) { return -item.mills; }); + function filterForAge(data, ageLimit) { + return _.filter(data, function hasId(object) { + const hasId = !_.isEmpty(object._id); + const isFresh = object.mills >= ageLimit; + return isFresh && hasId; + }); + } + } data.isEmpty = (datatype) => { @@ -54,20 +60,17 @@ function cache (env, ctx) { return _.cloneDeep(data[datatype]); } - data.insertData = (datatype, newData, retentionPeriod) => { - data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriod); + data.insertData = (datatype, newData) => { + data[datatype] = mergeCacheArrays(data[datatype], newData, retentionPeriods[datatype]); + return data.getData(datatype); } function dataChanged (operation) { - //console.log('Cache data operation requested', operation); - if (!data[operation.type]) return; if (operation.op == 'remove') { - //console.log('Cache data delete event'); // if multiple items were deleted, flush entire cache if (!operation.changes) { - //console.log('Multiple items delete from cache, flushing all') data.treatments = []; data.devicestatus = []; data.entries = []; @@ -76,9 +79,8 @@ function cache (env, ctx) { } } - if (operation.op == 'update') { - //console.log('Cache data update event'); - data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes); + if (operation.op == 'update') { + data[operation.type] = mergeCacheArrays(data[operation.type], operation.changes, retentionPeriods[operation.type]); } } @@ -96,7 +98,6 @@ function cache (env, ctx) { } return data; - } module.exports = cache; diff --git a/lib/server/devicestatus.js b/lib/server/devicestatus.js index f35153674284..e228f5b372ca 100644 --- a/lib/server/devicestatus.js +++ b/lib/server/devicestatus.js @@ -5,33 +5,38 @@ var find_options = require('./query'); function storage (collection, ctx) { - function create(obj, fn) { - + function create (obj, fn) { + // Normalize all dates to UTC const d = moment(obj.created_at).isValid() ? moment.parseZone(obj.created_at) : moment(); obj.created_at = d.toISOString(); obj.utcOffset = d.utcOffset(); - - api().insert(obj, function (err, doc) { - if (err != null && err.message) { + + api().insertOne(obj, function(err, results) { + if (err !== null && err.message) { console.log('Error inserting the device status object', err.message); fn(err.message, null); return; } - ctx.bus.emit('data-update', { - type: 'devicestatus', - op: 'update', - changes: ctx.ddata.processRawDataForRuntime([doc]) - }); + if (!err) { + + if (!obj._id) obj._id = results.insertedIds[0]._id; - fn(null, doc.ops); + ctx.bus.emit('data-update', { + type: 'devicestatus' + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime([obj]) + }); + } + + fn(null, results.ops); ctx.bus.emit('data-received'); }); } - function last(fn) { - return list({count: 1}, function (err, entries) { + function last (fn) { + return list({ count: 1 }, function(err, entries) { if (entries && entries.length > 0) { fn(err, entries[0]); } else { @@ -44,18 +49,18 @@ function storage (collection, ctx) { return find_options(opts, storage.queryOpts); } - function list(opts, fn) { + function list (opts, fn) { // these functions, find, sort, and limit, are used to // dynamically configure the request, based on the options we've // been given // determine sort options - function sort ( ) { - return opts && opts.sort || {created_at: -1}; + function sort () { + return opts && opts.sort || { created_at: -1 }; } // configure the limit portion of the current query - function limit ( ) { + function limit () { if (opts && opts.count) { return this.limit(parseInt(opts.count)); } @@ -68,31 +73,31 @@ function storage (collection, ctx) { } // now just stitch them all together - limit.call(api( ) - .find(query_for(opts)) - .sort(sort( )) + limit.call(api() + .find(query_for(opts)) + .sort(sort()) ).toArray(toArray); } function remove (opts, fn) { - function removed(err, stat) { + function removed (err, stat) { ctx.bus.emit('data-update', { - type: 'devicestatus', - op: 'remove', - count: stat.result.n, - changes: opts.find._id + type: 'devicestatus' + , op: 'remove' + , count: stat.result.n + , changes: opts.find._id }); fn(err, stat); } - return api( ).remove( + return api().remove( query_for(opts), removed); } - function api() { + function api () { return ctx.store.collection(collection); } @@ -101,9 +106,10 @@ function storage (collection, ctx) { api.query_for = query_for; api.last = last; api.remove = remove; - api.aggregate = require('./aggregate')({ }, api); + api.aggregate = require('./aggregate')({}, api); api.indexedFields = [ 'created_at' + , 'NSCLIENT_ID' ]; return api; diff --git a/lib/server/entries.js b/lib/server/entries.js index f6b61024e7de..50c8e0cc41a2 100644 --- a/lib/server/entries.js +++ b/lib/server/entries.js @@ -10,60 +10,60 @@ var moment = require('moment'); * Encapsulate persistent storage of sgv entries. \**********/ -function storage(env, ctx) { +function storage (env, ctx) { // TODO: Code is a little redundant. // query for entries from storage function list (opts, fn) { - // these functions, find, sort, and limit, are used to - // dynamically configure the request, based on the options we've - // been given + // these functions, find, sort, and limit, are used to + // dynamically configure the request, based on the options we've + // been given - // determine sort options - function sort ( ) { - return opts && opts.sort || {date: -1}; - } + // determine sort options + function sort () { + return opts && opts.sort || { date: -1 }; + } - // configure the limit portion of the current query - function limit ( ) { - if (opts && opts.count) { - return this.limit(parseInt(opts.count)); - } - return this; + // configure the limit portion of the current query + function limit () { + if (opts && opts.count) { + return this.limit(parseInt(opts.count)); } + return this; + } - // handle all the results - function toArray (err, entries) { - fn(err, entries); - } + // handle all the results + function toArray (err, entries) { + fn(err, entries); + } - // now just stitch them all together - limit.call(api( ) - .find(query_for(opts)) - .sort(sort( )) - ).toArray(toArray); + // now just stitch them all together + limit.call(api() + .find(query_for(opts)) + .sort(sort()) + ).toArray(toArray); } function remove (opts, fn) { - api( ).remove(query_for(opts), function (err, stat) { + api().remove(query_for(opts), function(err, stat) { ctx.bus.emit('data-update', { - type: 'entries', - op: 'remove', - count: stat.result.n, - changes: opts.find._id + type: 'entries' + , op: 'remove' + , count: stat.result.n + , changes: opts.find._id }); //TODO: this is triggering a read from Mongo, we can do better - ctx.bus.emit('data-received'); - fn(err, stat); - }); + ctx.bus.emit('data-received'); + fn(err, stat); + }); } // return writable stream to lint each sgv record passing through it // TODO: get rid of this? not doing anything now - function map ( ) { + function map () { return es.map(function iter (item, next) { return next(null, item); }); @@ -80,7 +80,7 @@ function storage(env, ctx) { create(result, fn); } // lint and store the entire list - return es.pipeline(map( ), es.writeArray(done)); + return es.pipeline(map(), es.writeArray(done)); } //TODO: implement @@ -91,9 +91,9 @@ function storage(env, ctx) { // store new documents using the storage mechanism function create (docs, fn) { // potentially a batch insert - var firstErr = null, - numDocs = docs.length, - totalCreated = 0; + var firstErr = null + , numDocs = docs.length + , totalCreated = 0; docs.forEach(function(doc) { @@ -106,15 +106,21 @@ function storage(env, ctx) { doc.sysTime = _sysTime.toISOString(); if (doc.dateString) doc.dateString = doc.sysTime; - var query = (doc.sysTime && doc.type) ? {sysTime: doc.sysTime, type: doc.type} : doc; - api( ).update(query, doc, {upsert: true}, function (err) { + var query = (doc.sysTime && doc.type) ? { sysTime: doc.sysTime, type: doc.type } : doc; + api().update(query, doc, { upsert: true }, function(err, updateResults) { firstErr = firstErr || err; - ctx.bus.emit('data-update', { - type: 'entries', - op: 'update', - changes: ctx.ddata.processRawDataForRuntime([doc]) - }); + if (!err) { + if (updateResults.result.upserted) { + doc._id = updateResults.result.upserted[0]._id + } + + ctx.bus.emit('data-update', { + type: 'entries' + , op: 'update' + , changes: ctx.ddata.processRawDataForRuntime([doc]) + }); + } if (++totalCreated === numDocs) { //TODO: this is triggering a read from Mongo, we can do better @@ -125,8 +131,8 @@ function storage(env, ctx) { }); } - function getEntry(id, fn) { - api( ).findOne({_id: ObjectID(id)}, function (err, entry) { + function getEntry (id, fn) { + api().findOne({ _id: ObjectID(id) }, function(err, entry) { if (err) { fn(err); } else { @@ -140,7 +146,7 @@ function storage(env, ctx) { } // closure to represent the API - function api ( ) { + function api () { // obtain handle usable for querying the collection associated // with these records return ctx.store.collection(env.entries_collection); @@ -154,15 +160,21 @@ function storage(env, ctx) { api.persist = persist; api.query_for = query_for; api.getEntry = getEntry; - api.aggregate = require('./aggregate')({ }, api); + api.aggregate = require('./aggregate')({}, api); api.indexedFields = [ 'date' + , 'type' + , 'sgv' + , 'mbg' + , 'sysTime' + , 'dateString' - , { 'type' : 1, 'date' : -1, 'dateString' : 1 } + + , { 'type': 1, 'date': -1, 'dateString': 1 } ]; return api; } @@ -176,7 +188,7 @@ storage.queryOpts = { , rssi: parseInt , noise: parseInt , mbg: parseInt - } + } , useEpoch: true }; diff --git a/lib/server/treatments.js b/lib/server/treatments.js index dad13b5b5d65..a9107ea99b23 100644 --- a/lib/server/treatments.js +++ b/lib/server/treatments.js @@ -53,9 +53,7 @@ function storage (env, ctx) { if (!err) { if (updateResults.result.upserted) { obj._id = updateResults.result.upserted[0]._id - //console.log('PERSISTENCE: treatment upserted', updateResults.result.upserted[0]); } - //console.log('Update result', updateResults.result); } // TODO document this feature @@ -72,7 +70,14 @@ function storage (env, ctx) { } query.created_at = pbTreat.created_at; - api( ).update(query, pbTreat, {upsert: true}, function pbComplete (err) { + api( ).update(query, pbTreat, {upsert: true}, function pbComplete (err, updateResults) { + + if (!err) { + if (updateResults.result.upserted) { + pbTreat._id = updateResults.result.upserted[0]._id + } + } + var treatments = _.compact([obj, pbTreat]); ctx.bus.emit('data-update', {