Skip to content

Commit

Permalink
Fix multi-file download (#521)
Browse files Browse the repository at this point in the history
* [SNOW-740458] Fix multi-file download

* Fix multi-file download

* Add test cases for multi-file download

* Fix and multi-file upload and add test cases

* Fix Linux/Mac file location in test

* Cleanup code based on code review.

* Fix path for Windows.
  • Loading branch information
dan-lin-insightsoftware authored Jun 8, 2023
1 parent 1fc9441 commit 33214d2
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 9 deletions.
11 changes: 8 additions & 3 deletions lib/file_transfer_agent/file_transfer_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,6 @@ function file_transfer_agent(context)
var data = response['data'];
commandType = data['command'];

initEncryptionMaterial();

if (commandType === CMD_TYPE_UPLOAD)
{
var src = data['src_locations'][0];
Expand Down Expand Up @@ -719,6 +717,8 @@ function file_transfer_agent(context)

for (var matchingFileName of matchingFileNames)
{
initEncryptionMaterial();

var fileInfo = fs.statSync(matchingFileName);
var currFileObj = {};
currFileObj['srcFileName'] = matchingFileName.substring(matchingFileName.lastIndexOf('/') + 1);
Expand All @@ -733,6 +733,8 @@ function file_transfer_agent(context)
// No wildcard, get single file
if (fs.existsSync(root))
{
initEncryptionMaterial();

var fileInfo = fs.statSync(fileNameFullPath);

var currFileObj = {};
Expand All @@ -755,6 +757,7 @@ function file_transfer_agent(context)
}
else if (commandType === CMD_TYPE_DOWNLOAD)
{
initEncryptionMaterial();
srcFiles = data['src_locations'];

if (srcFiles.length == encryptionMaterial.length)
Expand Down Expand Up @@ -856,9 +859,11 @@ function file_transfer_agent(context)

if (encryptionMaterial.length > 0)
{
var i=0;
for (var file of fileMetadata)
{
file['encryptionMaterial'] = encryptionMaterial[0];
file['encryptionMaterial'] = encryptionMaterial[i];
i++;
}
}
}
Expand Down
283 changes: 277 additions & 6 deletions test/integration/testPutGet.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const testUtil = require('./testUtil');
const tmp = require('tmp');
const os = require('os');
const path = require('path');
const zlib = require('zlib');

const DATABASE_NAME = connOption.valid.database;
const SCHEMA_NAME = connOption.valid.schema;
Expand All @@ -36,10 +37,46 @@ const ROW_DATA_SIZE = 76;
const ROW_DATA_OVERWRITE = COL3_DATA + "," + COL1_DATA + "," + COL2_DATA + "\n";
const ROW_DATA_OVERWRITE_SIZE = 19;

function getPlatformTmpPath(tmpPath){
var path = `file://${tmpPath}`;
// Windows user contains a '~' in the path which causes an error
if (process.platform == "win32")
{
var fileName = tmpPath.substring(tmpPath.lastIndexOf('\\'));
path = `file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\${fileName}`;
}
return path;
}

function executePutCmd(connection, putQuery, callback, results){
// Upload file
var statement = connection.execute({
sqlText: putQuery,
complete: function (err, stmt, rows)
{
var stream = statement.streamRows();
stream.on('error', function (err)
{
callback(err);
});
stream.on('data', function (row)
{
results.fileSize = row.targetSize;
// Check the file is correctly uploaded
assert.strictEqual(row['status'], UPLOADED);
// Check the target encoding is correct
assert.strictEqual(row['targetCompression'], 'GZIP');
});
stream.on('end', function (row)
{
callback();
});
}
});
}

describe('PUT GET test', function ()
{
this.timeout(100000);

var connection;
var tmpFile;
var createTable = `create or replace table ${TEMP_TABLE_NAME} (${COL1} STRING, ${COL2} STRING, ${COL3} STRING)`;
Expand Down Expand Up @@ -271,8 +308,6 @@ describe('PUT GET test', function ()

describe('PUT GET overwrite test', function ()
{
this.timeout(100000);

var connection;
var tmpFile;
var createTable = `create or replace table ${TEMP_TABLE_NAME} (${COL1} STRING, ${COL2} STRING, ${COL3} STRING)`;
Expand Down Expand Up @@ -430,8 +465,6 @@ describe('PUT GET overwrite test', function ()

describe('PUT GET test with GCS_USE_DOWNSCOPED_CREDENTIAL', function ()
{
this.timeout(100000);

var connection;

before(function (done)
Expand Down Expand Up @@ -657,3 +690,241 @@ describe('PUT GET test with GCS_USE_DOWNSCOPED_CREDENTIAL', function ()
);
});
});

describe('PUT GET test multiple files', function ()
{
var connection;
var stage = `@${DATABASE_NAME}.${SCHEMA_NAME}.%${TEMP_TABLE_NAME}`;
var removeFile = `REMOVE ${stage}`;

before(function (done)
{
connection = testUtil.createConnection();
connection.gcsUseDownscopedCredential = true;
async.series(
[
function (callback)
{
testUtil.connect(connection, callback);
},
function (callback)
{
var createTable = `create or replace table ${TEMP_TABLE_NAME} (${COL1} STRING, ${COL2} STRING, ${COL3} STRING)`;
// Create temp table
testUtil.executeCmd(connection, createTable, callback);
}
],
done
);
});

after(function (done)
{
async.series(
[
function (callback)
{
var dropTable = `DROP TABLE IF EXISTS ${TEMP_TABLE_NAME}`;
// Drop temp table
testUtil.executeCmd(connection, dropTable, callback);
},
function (callback)
{
testUtil.destroyConnection(connection, callback);
}
],
done
);
});

it('testDownloadMultifiles', function (done)
{
var tmpFile1, tmpFile2;

// Create two temp file with specified file extension
tmpFile1 = tmp.fileSync({ postfix: 'gz' });
tmpFile2 = tmp.fileSync({ postfix: 'gz' });
// Write row data to temp file
fs.writeFileSync(tmpFile1.name, ROW_DATA);
fs.writeFileSync(tmpFile2.name, ROW_DATA);

var tmpfilePath1 = getPlatformTmpPath(tmpFile1.name);
var tmpfilePath2 = getPlatformTmpPath(tmpFile2.name);

var putQuery1 = `PUT ${tmpfilePath1} ${stage}`;
var putQuery2 = `PUT ${tmpfilePath2} ${stage}`;

// Create a tmp folder for downloaded files
var tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'get'));
var results = {};

var tmpdirPath = getPlatformTmpPath(tmpDir);
var getQuery = `GET ${stage} ${tmpdirPath}`;

async.series(
[
function (callback)
{
fileSize = executePutCmd(connection, putQuery1, callback, results);
},
function (callback)
{
fileSize = executePutCmd(connection, putQuery2, callback, results);
},
function (callback)
{
// Run GET command
var statement = connection.execute({
sqlText: getQuery,
complete: function (err, stmt, rows)
{
var stream = statement.streamRows();
stream.on('error', function (err)
{
callback(err);
});
stream.on('data', function (row)
{
assert.strictEqual(row.status, DOWNLOADED);
assert.strictEqual(row.size, results.fileSize);

// Decompress the downloaded file
var compressedFile = path.join(tmpDir,row.file);
var decompressedFile = path.join(tmpDir,'de-'+row.file);
const fileContents = fs.createReadStream(compressedFile);
const writeStream = fs.createWriteStream(decompressedFile);
const unzip = zlib.createGunzip();

fileContents.pipe(unzip).pipe(writeStream).on('finish', function() {
// Verify the data of the downloaded file
var data = fs.readFileSync(decompressedFile).toString();
assert.strictEqual(data, ROW_DATA);
fs.unlinkSync(compressedFile);
fs.unlinkSync(decompressedFile);
})
});
stream.on('end', function (row)
{
callback();
});
}
});
},
function (callback)
{
// Remove files from staging
testUtil.executeCmd(connection, removeFile, callback);
},
function (callback)
{
fs.closeSync(tmpFile1.fd);
fs.unlinkSync(tmpFile1.name);
fs.closeSync(tmpFile2.fd);
fs.unlinkSync(tmpFile2.name);

// Delete the temporary folder
fs.rmdir(tmpDir, (err) =>
{
if (err) throw (err);
});
callback();
}
],
done
);
});

it('testUploadMultifiles', function (done)
{
const count = 5;
var tmpFiles = [];

// Create a tmp folder for downloaded files
var tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'get'));
var results = {};

var tmpdirPath = getPlatformTmpPath(tmpDir);
var getQuery = `GET ${stage} ${tmpdirPath}`;

// Create temp files with specified prefix
for (let i = 0; i < count; i++) {
var tmpFile = tmp.fileSync({ prefix: 'testUploadDownloadMultifiles'});
fs.writeFileSync(tmpFile.name, ROW_DATA);
tmpFiles.push(tmpFile);
}

var putQuery = `PUT file://${os.tmpdir()}/testUploadDownloadMultifiles* ${stage}`;
// Windows user contains a '~' in the path which causes an error
if (process.platform == "win32")
{
putQuery = `PUT file://${process.env.USERPROFILE}\\AppData\\Local\\Temp\\testUploadDownloadMultifiles* ${stage}`;
}

async.series(
[
function (callback)
{
fileSize = executePutCmd(connection, putQuery, callback, results);
},
function (callback)
{
// Run GET command
var statement = connection.execute({
sqlText: getQuery,
complete: function (err, stmt, rows)
{
var stream = statement.streamRows();
stream.on('error', function (err)
{
callback(err);
});
stream.on('data', function (row)
{
assert.strictEqual(row.status, DOWNLOADED);
assert.strictEqual(row.size, results.fileSize);

// Decompress the downloaded file
var compressedFile = path.join(tmpDir,row.file);
var decompressedFile = path.join(tmpDir,'de-'+row.file);
const fileContents = fs.createReadStream(compressedFile);
const writeStream = fs.createWriteStream(decompressedFile);
const unzip = zlib.createGunzip();

fileContents.pipe(unzip).pipe(writeStream).on('finish', function() {
// Verify the data of the downloaded file
var data = fs.readFileSync(decompressedFile).toString();
assert.strictEqual(data, ROW_DATA);
fs.unlinkSync(compressedFile);
fs.unlinkSync(decompressedFile);
})
});
stream.on('end', function (row)
{
callback();
});
}
});
},
function (callback)
{
// Remove files from staging
testUtil.executeCmd(connection, removeFile, callback);
},
function (callback)
{
for (let i = 0; i < count; i++) {
fs.closeSync(tmpFiles[i].fd);
fs.unlinkSync(tmpFiles[i].name);
}
// Delete the temporary folder
fs.rmdir(tmpDir, (err) =>
{
if (err) throw (err);
});
callback();
}
],
done
);
});
});

0 comments on commit 33214d2

Please sign in to comment.