Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

v6.3.0 - Refactor StorageAdapter#_size to accept file key #669

Merged
merged 2 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 21 additions & 39 deletions lib/network/farmer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

var kfs = require('kfs');
var fs = require('fs');
var path = require('path');
var kad = require('kad');
Expand Down Expand Up @@ -58,7 +59,6 @@ function FarmerInterface(options) {

options = merge.recursive(Object.create(FarmerInterface.DEFAULTS), options);

this._hasFreeSpace = true;
this._negotiator = options.contractNegotiator;
this._pendingOffers = [];
this._offerBackoffLimit = options.offerBackoffLimit;
Expand All @@ -67,7 +67,6 @@ function FarmerInterface(options) {
null;

Network.call(this, options);
this._listenForCapacityChanges(options.storageManager);
}

inherits(FarmerInterface, Network);
Expand Down Expand Up @@ -110,7 +109,7 @@ FarmerInterface.Negotiator = function(contract, callback) {
// NB: Backoff on sending offers if we are already have high active transfer
var concurrentTransfer = (
self.transport.shardServer.activeTransfers >= self._offerBackoffLimit
);
);
self._logger.debug(
'active transfers %s is less than offerBackoffLimit %s: %s',
self.transport.shardServer.activeTransfers,
Expand Down Expand Up @@ -299,22 +298,26 @@ FarmerInterface.prototype._shouldSendOffer = function(contract, callback) {
this._negotiator.call(this, contract, function(shouldNegotiate) {
/* eslint max-statements: [2, 16] */
self._logger.debug('negotiator returned: %s', shouldNegotiate);
self.storageManager._storage.size(function(err, usedSpace) {
if (err) {
self._logger.error('Could not get usedSpace: %s',err.message);
return callback(false);
self.storageManager._storage.size(
contract.get('data_hash'),
function(err, usedSpace) {
if (err) {
self._logger.error('Could not get usedSpace: %s',err.message);
return callback(false);
}

var maxCapacity = self.storageManager._options.maxCapacity;
var estimatedMaxBucketSize = Math.floor(maxCapacity / kfs.constants.B);
var freeSpace = estimatedMaxBucketSize - usedSpace;
var enoughFreeSpace = contract.get('data_size') <= freeSpace;
self._logger.debug('we have enough free space: %s', enoughFreeSpace);

callback(
(self._pendingOffers.length < self._options.maxOfferConcurrency) &&
shouldNegotiate && enoughFreeSpace
);
}

var freeSpace = self.storageManager._options.maxCapacity - usedSpace;
var enoughFreeSpace = self._hasFreeSpace &&
(contract.get('data_size') <= freeSpace);
self._logger.debug('we have enough free space: %s', enoughFreeSpace);

callback(
(self._pendingOffers.length < self._options.maxOfferConcurrency) &&
shouldNegotiate && enoughFreeSpace
);
});
);
});
};

Expand Down Expand Up @@ -388,27 +391,6 @@ FarmerInterface.prototype._listenForContracts = function(opcodes) {
this.subscribe(opcodes, this._handleContractPublication.bind(this));
};

/**
* Updates the internal tracker of free space
* @private
* @param {StorageManager} manager - The storage manager passed to the interface
*/
FarmerInterface.prototype._listenForCapacityChanges = function(manager) {
var self = this;

manager.on('locked', function() {
self._hasFreeSpace = false;
});

manager.on('unlocked', function() {
self._hasFreeSpace = true;
});

manager.on('error', function(err) {
self._logger.warn('error in storage manager: %s', err.message);
});
};

/**
* Handles received contract publications
* @private
Expand Down
7 changes: 3 additions & 4 deletions lib/storage/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,11 @@ StorageAdapter.prototype.del = function(key, callback) {

/**
* Calls the implemented {@link StorageAdapter#_size}
* @param {String} [key] - Optional file key
* @param {Function} callback - Called with error or number of bytes stored
*/
StorageAdapter.prototype.size = function(callback) {
assert(typeof callback === 'function', 'Callback function must be supplied');

return this._size(callback);
StorageAdapter.prototype.size = function(/* [,key] callback */) {
return this._size.apply(this, arguments);
};

/**
Expand Down
19 changes: 16 additions & 3 deletions lib/storage/adapters/embedded.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,17 @@ EmbeddedStorageAdapter.prototype._flush = function(callback) {
/**
* Implements the abstract {@link StorageAdapter#_size}
* @private
* @param {String} [key]
* @param {Function} callback
*/
EmbeddedStorageAdapter.prototype._size = function(callback) {
EmbeddedStorageAdapter.prototype._size = function(key, callback) {
var self = this;

if (typeof key === 'function') {
callback = key;
key = null;
}

this._db.db.approximateSize(
EmbeddedStorageAdapter.SIZE_START_KEY,
EmbeddedStorageAdapter.SIZE_END_KEY,
Expand All @@ -179,7 +185,7 @@ EmbeddedStorageAdapter.prototype._size = function(callback) {
return callback(err);
}

self._fs.stat(function(err, stats) {
function handleStatResults(err, stats) {
if (err) {
return callback(err);
}
Expand All @@ -195,7 +201,14 @@ EmbeddedStorageAdapter.prototype._size = function(callback) {
}).sBucketStats.size;

callback(null, kfsUsedSpace + contractDbSize);
});
}

/* istanbul ignore if */
if (key) {
self._fs.stat(key, handleStatResults);
} else {
self._fs.stat(handleStatResults);
}
}
);
};
Expand Down
9 changes: 8 additions & 1 deletion lib/storage/adapters/ram.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,16 @@ RAMStorageAdapter.prototype._del = function(key, callback) {
/**
* Implements the abstract {@link StorageAdapter#_size}
* @private
* @param {String} [key]
* @param {Function} callback
*/
RAMStorageAdapter.prototype._size = function(callback) {
/* istanbul ignore next */
RAMStorageAdapter.prototype._size = function(key, callback) {
/* istanbul ignore if */
if (typeof key === 'function') {
callback = key;
}

var shardBytes = 0;

for (var _key in this._shards) {
Expand Down
40 changes: 0 additions & 40 deletions lib/storage/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,13 @@ function StorageManager(storage, options) {

this._options = merge(Object.create(StorageManager.DEFAULTS), options);
this._storage = storage;
this._capacityReached = false;
this._logger = this._options.logger || new Logger(0);

this._initShardReaper();
}

inherits(StorageManager, EventEmitter);

/**
* Triggered when the underlying storage adapter reaches capacity
* @event StorageManager#locked
*/

/**
* Triggered when the underlying storage adapter has newly freed space
* @event StorageManager#unlocked
*/

StorageManager.DEFAULTS = {
disableReaper: false,
maxCapacity: Infinity
Expand Down Expand Up @@ -87,10 +76,6 @@ StorageManager.prototype.save = function(item, callback) {
assert(item instanceof StorageItem, 'Invalid storage item supplied');
assert(typeof callback === 'function', 'Callback function must be supplied');

if (this._capacityReached) {
return callback(new Error('Storage capacity reached'));
}

self._storage.get(item.hash, function(err, existingItem) {
self._storage.put(
self._merge(existingItem, item),
Expand All @@ -99,7 +84,6 @@ StorageManager.prototype.save = function(item, callback) {
return callback(err);
}

self._checkCapacity();
callback(null);
}
);
Expand Down Expand Up @@ -188,36 +172,12 @@ StorageManager.prototype.clean = function(callback) {
self._logger.warn('problem while flushing shards, %s', err.message);
}

self._checkCapacity();
self._logger.info('flushing shards finished');
callback();
});
});
};

/**
* Checks the underlying storage adapter's size and determines if our defined
* capacity has been reached
* @private
*/
StorageManager.prototype._checkCapacity = function() {
var self = this;

this._storage.size(function(err, bytes) {
if (err) {
return self.emit('error', err);
}

var capacityReached = bytes >= self._options.maxCapacity;

if (capacityReached !== self._capacityReached) {
self.emit(capacityReached ? 'locked' : 'unlocked');
}

self._capacityReached = capacityReached;
});
};

/**
* Initialize the shard reaper to check for stale contracts and reap shards
* @private
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "storj-lib",
"version": "6.2.2",
"version": "6.3.0",
"description": "implementation of the storj protocol for node.js and the browser",
"main": "index.js",
"directories": {
Expand Down Expand Up @@ -80,7 +80,7 @@
"kad": "^1.6.4",
"kad-logger-json": "^0.1.2",
"kad-quasar": "^1.2.1",
"kfs": "^3.1.2",
"kfs": "^3.1.4",
"knuth-shuffle": "^1.0.1",
"leveldown": "^1.6.0",
"levelup": "^1.3.1",
Expand Down
68 changes: 2 additions & 66 deletions test/network/farmer.unit.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ var utils = require('../../lib/utils');
var StorageItem = require('../../lib/storage/item');
var StorageManager = require('../../lib/storage/manager');
var RAMStorageAdapter = require('../../lib/storage/adapters/ram');
var EventEmitter = require('events').EventEmitter;
var CLEANUP = [];

describe('FarmerInterface', function() {
Expand Down Expand Up @@ -105,7 +104,7 @@ describe('FarmerInterface', function() {
var _size = sinon.stub(
farmer.storageManager._storage,
'size'
).callsArgWith(0, new Error('Cannot get farmer disk space'));
).callsArgWith(1, new Error('Cannot get farmer disk space'));
var _addTo = sinon.stub(farmer, '_addContractToPendingList');
farmer._handleContractPublication(Contract({}));
_size.restore();
Expand Down Expand Up @@ -133,7 +132,7 @@ describe('FarmerInterface', function() {
var _size = sinon.stub(
farmer.storageManager._storage,
'size'
).callsArgWith(0, null, 1000);
).callsArgWith(1, null, 1000);
farmer.storageManager._options.maxCapacity = 2000;
var _addTo = sinon.stub(farmer, '_addContractToPendingList');
//execute
Expand Down Expand Up @@ -675,69 +674,6 @@ describe('FarmerInterface', function() {

});

describe('#_listenForCapacityChanges', function() {

it('should set the free space to true', function(done) {
var farmer = new FarmerInterface({
keyPair: KeyPair(),
rpcPort: 0,
tunnelServerPort: 0,
doNotTraverseNat: true,
logger: kad.Logger(0),
storageManager: new StorageManager(new RAMStorageAdapter())
});
CLEANUP.push(farmer);
var manager = new EventEmitter();
farmer._listenForCapacityChanges(manager);
manager.emit('unlocked');
setImmediate(function() {
expect(farmer._hasFreeSpace).to.equal(true);
done();
});
});

it('should set the free space to false', function(done) {
var farmer = new FarmerInterface({
keyPair: KeyPair(),
rpcPort: 0,
tunnelServerPort: 0,
doNotTraverseNat: true,
logger: kad.Logger(0),
storageManager: new StorageManager(new RAMStorageAdapter())
});
CLEANUP.push(farmer);
var manager = new EventEmitter();
farmer._listenForCapacityChanges(manager);
manager.emit('locked');
setImmediate(function() {
expect(farmer._hasFreeSpace).to.equal(false);
done();
});
});

it('should log the error', function(done) {
var logger = kad.Logger(0);
var _warn = sinon.stub(logger, 'warn');
var farmer = new FarmerInterface({
keyPair: KeyPair(),
rpcPort: 0,
tunnelServerPort: 0,
doNotTraverseNat: true,
logger: logger,
storageManager: new StorageManager(new RAMStorageAdapter())
});
CLEANUP.push(farmer);
var manager = new EventEmitter();
farmer._listenForCapacityChanges(manager);
manager.emit('error', new Error('Failed'));
setImmediate(function() {
expect(_warn.called).to.equal(true);
done();
});
});

});

after(function() {
CLEANUP.forEach(function(farmer) {
if (farmer.node) {
Expand Down
Loading