Skip to content

Commit

Permalink
Merge pull request #273 from prey/resum-upload
Browse files Browse the repository at this point in the history
Files upload resuming
  • Loading branch information
javo authored Feb 21, 2017
2 parents af12693 + 4660fbe commit 0404f60
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 72 deletions.
103 changes: 83 additions & 20 deletions lib/agent/actions/fileretrieval/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,29 @@
// GPLv3 Licensed
//////////////////////////////////////////

var fs = require('fs'),
path = require('path'),
mime = require('mime'),
common = require('./../../common'),
needle = require('needle'),
join = require('path').join,
files = require('./storage'),
Emitter = require('events').EventEmitter;
var fs = require('fs'),
path = require('path'),
mime = require('mime'),
common = require('./../../common'),
needle = require('needle'),
join = require('path').join,
files = require('./storage'),
needle = require('needle'),
Emitter = require('events').EventEmitter;

var system = common.system,
run_as_user = common.system.run_as_user,
node_bin = path.join(system.paths.current, 'bin', 'node'),
os_name = process.platform.replace('darwin', 'mac').replace('win32', 'windows'),
logger = common.logger;

var config = common.config,
protocol = config._values['control-panel'].protocol,
host = config._values['control-panel'].host,
url = protocol + '://' + host;

var UPLOAD_SERVER = url + '/upload/upload';

var em,
cp;

Expand All @@ -30,14 +38,8 @@ var path_arg,

// check_pending_files is used to resume any files that might been pending. It's called from
// filesagent/providers/network.
exports.check_pending_files = function() {
files.run_stored();
}

exports.start = function(options, cb) {
if (!options.resumable) {
options.resumable = false;
}
var retrieve_file_as_user = function(options, cb) {
if (os_name == 'windows') {
path_arg = path.resolve(options.path);
name_arg = path.resolve(options.name);
Expand All @@ -49,25 +51,86 @@ exports.start = function(options, cb) {
user: options.user,
bin: node_bin,
type: 'exec',
args: [path.join(__dirname, 'upload.js'), path_arg, options.user, name_arg, options.size, options.file_id, options.resumable, options.port],
args: [path.join(__dirname, 'upload.js'), path_arg, options.user, name_arg, options.size, options.file_id, options.total, options.port],
opts: {
env: process.env
}
};
em = em || new Emitter();

files.store(options.file_id, options.path, options.size, options.user, options.name);

run_as_user(opts, function(err, out) {
if (err) {
logger.error("Upload error: " + err.message);
return;
}
logger.info("Ran as user: " + out);
if (out.indexOf("File succesfuly uploaded") != -1){
if (out.indexOf("File succesfuly uploaded") != -1) {
files.del(options.file_id);
return;
}
if (out.includes("EPIPE") || out.includes("EACCES")) {
files.update(options.file_id, options.path, options.size, options.user, options.name, options.resumable, function(err) {
if (err) logger.error("Database update error");
logger.info("Resume file option activated for ID: " + options.file_id);
});
}
});
}

exports.check_pending_files = function() {
files.run_stored();
}

exports.start = function(options, cb) {

var url = UPLOAD_SERVER + '?uploadID=' + options.file_id;
// Make a call to get the last byte processed by the upload server
// in order to resume the upload from that position.
needle.request('get', url, null, function(err, res) {
if (err) {
console.log(err);
return;
}
if (res.statusCode == 404) {
files.del(options.file_id);
return;
}
var data = JSON.parse(res.body);
var file_status = JSON.parse(res.body).Status
options.total = data.Total;

if (file_status == 0 || file_status == 4) { // File in progress(0) or Pending(4)
files.exist(options.file_id, function(err, exist) {
if (!exist) {
options.resumable = false;
options.total = 0;
files.store(options.file_id, options.path, options.size, options.user, options.name, options.resumable);
retrieve_file_as_user(options);

} else {
setTimeout(function() {
if (options.resumable) {
files.update(options.file_id, options.path, options.size, options.user, options.name, options.resumable, function(err) {
if (err) logger.error("Database update error");
logger.info("Resume file option deactivated for ID: " + options.file_id);
retrieve_file_as_user(options);
});
}
}, 2000);
}
})

} else {
if (file_status == 1)
logger.debug("File already uploaded, deleting from db...");
else
logger.debug("File cancelled or with an error, deleting from db...");
files.del(options.file_id);
return;
}
})

em = em || new Emitter();

if (cb) cb(null, em);
em.emit('end');
}
Expand Down
62 changes: 50 additions & 12 deletions lib/agent/actions/fileretrieval/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,63 @@ var exist = function(id, cb) {
if (err)
return err.message;
if (files[key])
return cb(true);
return cb(false);
return cb(null, true);
return cb(null, false);
});
}

exports.store = function(id, path, size, user, name) {
exist(id, function(cb) {
if (cb == false) {
exports.store = function(id, path, size, user, name, resumable) {
exist(id, function(err, out) {
if (out == false) {
logger.debug('Storing file_id in DB: ' + id);
var opts = {
path: path,
size: size,
user: user,
name: name
name: name,
resumable: false
}
var key = ['file', id].join('-');
storage.set(key, opts);
}
});
}

exports.update = function(id, path, size, user, name, resumable, cb) {
var key = ["file", id].join("-");

var file_del,
file_add,
obj_del = {},
obj_add = {},
to_delete,
to_add;

file_del = {
"path": path,
"size": size,
"user": user,
"name": name,
"resumable": resumable
}

file_add = {
"path": path,
"size": size,
"user": user,
"name": name,
"resumable": !resumable
}

obj_del[key] = file_del;
obj_add[key] = file_add;

to_delete = new Buffer(JSON.stringify(obj_del, null, 0)).toString('base64');
to_add = new Buffer(JSON.stringify(obj_add, null, 0)).toString('base64');

storage.update(key, to_delete, to_add, cb);
}

exports.del = function(id) {
var key = ['file', id].join('-');
logger.debug('Removing file_id from DB: ' + id);
Expand All @@ -54,14 +90,16 @@ exports.run_stored = function(cb) {

for (key in files) {
var opts = {
path: files[key].path,
user: files[key].user,
name: files[key].name,
size: files[key].size,
file_id: key.substring(5, key.length),
resumable: true
path: files[key].path,
user: files[key].user,
name: files[key].name,
size: files[key].size,
file_id: key.substring(5, key.length),
resumable: files[key].resumable
}
fileretrieval.start(opts, cb);
}
})
}

exports.exist = exist;
57 changes: 17 additions & 40 deletions lib/agent/actions/fileretrieval/upload.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#!/usr/bin/env node

var fs = require('fs'),
path = require('path'),
mime = require('mime'),
common = require('./../../common'),
needle = require('needle');
var fs = require('fs'),
path = require('path'),
mime = require('mime'),
common = require('./../../common'),
needle = require('needle');

var config = common.config,
protocol = config._values['control-panel'].protocol,
host = config._values['control-panel'].host,
url = protocol + '://' + host;
var config = common.config,
protocol = config._values['control-panel'].protocol,
host = config._values['control-panel'].host,
url = protocol + '://' + host;

var UPLOAD_SERVER = url + '/upload/upload',
RESUMABLE_HEADER = 'X-Prey-Upload-Resumable',
Expand All @@ -21,19 +21,19 @@ var PATH = 2,
NAME = 4,
SIZE = 5,
FILE_ID = 6,
RESUME = 7,
TOTAL = 7,
PORT = 8;

function main() {
var argv = process.argv;
var options = {
path: argv[PATH],
user: argv[USER],
name: argv[NAME],
size: argv[SIZE],
path: argv[PATH],
user: argv[USER],
name: argv[NAME],
size: argv[SIZE],
file_id: argv[FILE_ID],
resumable : argv[RESUME],
port: argv[PORT]
total: argv[TOTAL],
port: argv[PORT]
}
Main(options, function(err) {
if (err) {
Expand All @@ -57,30 +57,7 @@ function Main(options, cb) {
path: file_path,
user: user,
id: file_id,
size: file_size,
resumable: options.resumable
}
retrieve_file(file, cb);
}

function retrieve_file(file, cb) {
if (file.resumable == 'true') {
console.log("Resumable file:", file.id);
var url = UPLOAD_SERVER + '?uploadID=' + file.id;
// Make a call to get the last byte processed by the upload server
// in order to resume the upload from that position.
needle.request('get', url, null, function(err, res) {
if (err) {
console.log(err);
cb(err);
return;
}
var data = JSON.parse(res.body);

file.total = data.Total;
get_file(file, cb);
})
return;
size: file_size
}
get_file(file, cb);
}
Expand Down

0 comments on commit 0404f60

Please sign in to comment.