Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(max-staleness): properly support a max staleness reducer
Browse files Browse the repository at this point in the history
This adds a `maxStalenessReducer` to our server selectors, and
adds it in all the right places in the `readPreferenceServerSelector`.
  • Loading branch information
mbroadst committed May 23, 2018
1 parent 45b3d15 commit d9c5c16
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 19 deletions.
64 changes: 50 additions & 14 deletions lib/sdam/server_selectors.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
const ServerType = require('./server_description').ServerType;
const TopologyType = require('./topology_description').TopologyType;
const ReadPreference = require('../topologies/read_preference');
const MongoError = require('../error').MongoError;

// max staleness constants
const IDLE_WRITE_PERIOD = 10000;
const SMALLEST_MAX_STALENESS_SECONDS = 90;

function writableServerSelector() {
return function(topologyDescription, servers) {
Expand All @@ -23,23 +28,39 @@ function maxStalenessReducer(readPreference, topologyDescription, servers) {
return servers;
}

const maxStaleness = readPreference.maxStalenessSeconds;
const maxStalenessVariance =
(topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
if (maxStaleness < maxStalenessVariance) {
throw MongoError(`maxStalenessSeconds must be at least ${maxStalenessVariance} seconds`);
}

if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
throw new MongoError(
`maxStalenessSeconds must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`
);
}

if (topologyDescription.type === TopologyType.ReplicaSetWithPrimary) {
const primary = servers.filter(primaryFilter);
const primary = servers.filter(primaryFilter)[0];
return servers.reduce((result, server) => {
const staleness =
const stalenessMS =
server.lastUpdateTime -
server.lastWriteDate -
(primary.lastUpdateTime - primary.lastWriteDate) +
topologyDescription.heartbeatFrequencyMS;

const staleness = stalenessMS / 1000;
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
return result;
}, []);
} else if (topologyDescription.type === TopologyType.ReplicaSetNoPrimary) {
const sMax = servers.reduce((max, s) => (s.lastWriteDate > max.lastWriteDate ? s : max));
return servers.reduce((result, server) => {
const staleness =
const stalenessMS =
sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;

const staleness = stalenessMS / 1000;
if (staleness <= readPreference.maxStalenessSeconds) result.push(server);
return result;
}, []);
Expand Down Expand Up @@ -111,18 +132,33 @@ function nearestFilter(server) {
return server.type === ServerType.RSSecondary || server.type === ServerType.RSPrimary;
}

function knownFilter(server) {
return server.type !== ServerType.Unknown;
}

function readPreferenceServerSelector(readPreference) {
if (!readPreference.isValid()) {
throw new TypeError('Invalid read preference specified');
}

return function(topologyDescription, servers) {
const commonWireVersion = topologyDescription.commonWireVersion;
if (
commonWireVersion &&
(readPreference.minWireVersion && readPreference.minWireVersion > commonWireVersion)
) {
throw new MongoError(
`Minimum wire version '${
readPreference.minWireVersion
}' required, but found '${commonWireVersion}'`
);
}

if (
topologyDescription.type === TopologyType.Single ||
topologyDescription.type === TopologyType.Sharded ||
topologyDescription.type === TopologyType.Unknown
topologyDescription.type === TopologyType.Sharded
) {
return latencyWindowReducer(topologyDescription, servers);
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
}

if (readPreference.mode === ReadPreference.PRIMARY) {
Expand All @@ -134,25 +170,25 @@ function readPreferenceServerSelector(readPreference) {
topologyDescription,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
maxStalenessReducer(readPreference, topologyDescription, servers)
)
);
).filter(secondaryFilter);
} else if (readPreference.mode === ReadPreference.NEAREST) {
return latencyWindowReducer(
topologyDescription,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(nearestFilter))
maxStalenessReducer(readPreference, topologyDescription, servers)
)
);
).filter(nearestFilter);
} else if (readPreference.mode === ReadPreference.SECONDARY_PREFERRED) {
const result = latencyWindowReducer(
topologyDescription,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
maxStalenessReducer(readPreference, topologyDescription, servers)
)
);
).filter(secondaryFilter);

return result.length === 0 ? servers.filter(primaryFilter) : result;
} else if (readPreference.mode === ReadPreference.PRIMARY_PREFERRED) {
Expand All @@ -165,9 +201,9 @@ function readPreferenceServerSelector(readPreference) {
topologyDescription,
tagSetReducer(
readPreference,
maxStalenessReducer(readPreference, topologyDescription, servers.filter(secondaryFilter))
maxStalenessReducer(readPreference, topologyDescription, servers)
)
);
).filter(secondaryFilter);
}
};
}
Expand Down
14 changes: 9 additions & 5 deletions lib/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const TopologyType = require('./topology_description').TopologyType;
const monitoring = require('./monitoring');
const calculateDurationInMs = require('../utils').calculateDurationInMs;
const MongoTimeoutError = require('../error').MongoTimeoutError;
const MongoError = require('../error').MongoError;

// Global state
let globalTopologyCounter = 0;
Expand Down Expand Up @@ -209,12 +208,17 @@ class FakeServer {
* @param {*} callback
*/
function selectServers(topology, selector, timeout, start, callback) {
if (!topology.description.compatible) {
return callback(new MongoError(topology.description.compatibilityError));
const serverDescriptions = Array.from(topology.description.servers.values());
let descriptions;

try {
descriptions = selector
? selector(topology.description, serverDescriptions)
: serverDescriptions;
} catch (e) {
return callback(e, null);
}

const serverDescriptions = Array.from(topology.description.servers.values());
let descriptions = selector(topology.description, serverDescriptions);
if (descriptions.length) {
// TODO: obviously return the actual server in the future
const servers = descriptions.map(d => new FakeServer(d));
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"eslint": "^4.6.1",
"eslint-plugin-prettier": "^2.2.0",
"jsdoc": "3.5.4",
"mongodb-extjson": "^2.1.2",
"mongodb-mock-server": "^1.0.0",
"mongodb-test-runner": "^1.1.18",
"prettier": "^1.6.1",
Expand Down

0 comments on commit d9c5c16

Please sign in to comment.