Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft of streaming insert sample #196

Merged
merged 4 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions bigquery/system-test/tables.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ var path = require('path');
/**
 * Generates a unique, BigQuery-compatible resource name for tests.
 * Hyphens from the UUID are swapped for underscores because BigQuery
 * dataset/table IDs only allow letters, digits, and underscores.
 */
function generateUuid () {
  var suffix = uuid.v4().replace(/-/gi, '_');
  return 'nodejs_docs_samples_' + suffix;
}

var rows = [
{ Name: 'foo', Age: 27, Weight: 80.3, IsMagic: true },
{ Name: 'bar', Age: 13, Weight: 54.6, IsMagic: false }
];
var options = {
projectId: process.env.GCLOUD_PROJECT,
localFilePath: path.join(__dirname, '../resources/data.csv'),
bucket: generateUuid(),
file: 'data.json',
dataset: generateUuid(),
table: generateUuid(),
schema: 'Name:string, Age:integer, Weigth:float, IsMagic:boolean'
schema: 'Name:string, Age:integer, Weight:float, IsMagic:boolean',
rows: rows
};

var file = storage.bucket(options.bucket).file(options.file);

describe('bigquery:tables', function () {
before(function (done) {
// Create bucket
Expand Down Expand Up @@ -62,7 +64,9 @@ describe('bigquery:tables', function () {
return done(err);
}
// Delete bucket
storage.bucket(options.bucket).delete(done);
setTimeout(function () {
storage.bucket(options.bucket).delete(done);
}, 2000);
});
});
});
Expand Down Expand Up @@ -122,7 +126,7 @@ describe('bigquery:tables', function () {
assert(metadata.status, 'job metadata has status');
assert.equal(metadata.status.state, 'DONE', 'job was finished');

file.exists(function (err, exists) {
storage.bucket(options.bucket).file(options.file).exists(function (err, exists) {
assert.ifError(err, 'file existence check succeeded');
assert(exists, 'export destination exists');
done();
Expand All @@ -131,8 +135,30 @@ describe('bigquery:tables', function () {
});
});

// System test: verifies insertRowsAsStream() against the live BigQuery API
// by comparing row counts before and after the streaming insert.
describe('insertRowsAsStream', function () {
it('should insert rows into a table', function (done) {
var table = bigquery.dataset(options.dataset).table(options.table);
// Snapshot the current row count so the assertion below is relative,
// not dependent on the table starting empty.
table.getRows({}, function (err, startRows) {
assert.equal(err, null);

program.insertRowsAsStream(options, function (err, insertErrors) {
assert.equal(err, null);
// insertErrors holds per-row failures; the overall call can succeed
// (err === null) while individual rows are rejected.
assert.deepEqual(insertErrors, [], 'no per-row insert errors occurred');

// Streaming inserts are not immediately visible to reads; wait
// briefly before re-counting. 2s is empirical — TODO confirm it is
// long enough to avoid flakes.
setTimeout(function () {
table.getRows({}, function (err, endRows) {
assert.equal(err, null);
// options.rows (defined at the top of this file) contains 2 rows.
assert.equal(startRows.length + 2, endRows.length, 'insertRows() added 2 rows');
done();
});
}, 2000);
});
});
});
});

describe('deleteTable', function () {
it('should list tables', function (done) {
it('should delete table', function (done) {
program.deleteTable(options, function (err) {
assert.ifError(err);
assert(console.log.calledWith('Deleted table: %s', options.table));
Expand Down
58 changes: 56 additions & 2 deletions bigquery/tables.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,41 @@ function exportTableToGCS (options, callback) {
});
}
// [END export_table_to_gcs]

// [START insert_rows_as_stream]
/**
 * Insert rows (as a stream) into a BigQuery table.
 *
 * @param {object} options Configuration options.
 * @param {array} options.rows An array of rows to insert into a BigQuery table.
 * @param {string} options.dataset The ID of the dataset containing the target table.
 * @param {string} options.table The ID of the table to insert rows into.
 * @param {function} callback Callback function, called with (err) on a
 *     request-level failure, or (null, insertErrors) on success, where
 *     insertErrors lists any per-row insert failures.
 */
function insertRowsAsStream (options, callback) {
  var table = bigquery.dataset(options.dataset).table(options.table);
  table.insert(options.rows, function (err, insertErrors) {
    if (err) {
      // Request-level failure (e.g. auth, network, missing table).
      return callback(err);
    }
    // Log in the same style as the other samples in this file,
    // per review feedback on this PR.
    console.log('Inserted %d row(s)!', options.rows.length);
    // The request succeeded, but individual rows may still have been
    // rejected; surface those to the caller.
    return callback(null, insertErrors);
  });
}
// [END insert_rows_as_stream]
// [END all]

// The command-line program
var cli = require('yargs');
var utils = require('../utils');
var fs = require('fs');

var program = module.exports = {
createTable: createTable,
listTables: listTables,
deleteTable: deleteTable,
importFile: importFile,
exportTableToGCS: exportTableToGCS,
insertRowsAsStream: insertRowsAsStream,
main: function (args) {
// Run the command-line program
cli.help().strict().parse(args).argv;
Expand Down Expand Up @@ -243,6 +266,29 @@ cli
}, function (options) {
program.exportTableToGCS(utils.pick(options, ['dataset', 'table', 'bucket', 'file', 'format', 'gzip']), utils.makeHandler());
})
// CLI subcommand: `insert <dataset> <table> <json_or_file>`.
// The third argument may be either a path to a JSON file or a raw JSON
// array string; it is resolved to `options.rows` before delegating to
// insertRowsAsStream().
.command('insert <dataset> <table> <json_or_file>',
'Insert a JSON array (as a string or newline-delimited file) into a BigQuery table.', {},
function (options) {
var content;
// Try to treat the argument as a file path first; if it cannot be read,
// fall back to treating the argument itself as the JSON content.
try {
content = fs.readFileSync(options.json_or_file);
} catch (err) {
content = options.json_or_file;
}

// Parse leniently: a parse failure leaves rows === null, which is then
// rejected by the Array.isArray check below with a single error message.
var rows = null;
try {
rows = JSON.parse(content);
} catch (err) {}

if (!Array.isArray(rows)) {
throw new Error('"json_or_file" (or the file it points to) is not a valid JSON array.');
}

options.rows = rows;
// Forward only the fields insertRowsAsStream() documents.
program.insertRowsAsStream(utils.pick(options, ['rows', 'dataset', 'table']), utils.makeHandler());
}
)
.example(
'node $0 create my_dataset my_table',
'Create table "my_table" in "my_dataset".'
Expand All @@ -265,11 +311,19 @@ cli
)
.example(
'node $0 export my_dataset my_table my-bucket my-file',
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV'
'Export my_dataset:my_table to gcs://my-bucket/my-file as raw CSV.'
)
.example(
'node $0 export my_dataset my_table my-bucket my-file -f JSON --gzip',
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON'
'Export my_dataset:my_table to gcs://my-bucket/my-file as gzipped JSON.'
)
.example(
'node $0 insert my_dataset my_table json_string',
'Insert the JSON array represented by json_string into my_dataset:my_table.'
)
.example(
'node $0 insert my_dataset my_table json_file',
'Insert the JSON objects contained in json_file (one per line) into my_dataset:my_table.'
)
.wrap(100)
.recommendCommands()
Expand Down
131 changes: 130 additions & 1 deletion bigquery/test/tables.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ var dataset = 'dataset';
var table = 'table';
var format = 'JSON';
var schema = 'schema';
var jsonArray = [
{ name: 'foo', age: 27 },
{ name: 'bar', age: 13 }
];
var validJsonFile = 'validJsonFile';
var invalidJsonFile = 'invalidJsonFile';
var validJsonString = JSON.stringify(jsonArray);
var invalidJsonString = 'INVALID';
var errorList = ['error 1', 'error 2'];

function getSample () {
var tableMocks = [
Expand All @@ -44,7 +53,8 @@ function getSample () {
var tableMock = {
export: sinon.stub().yields(null, jobMock),
delete: sinon.stub().yields(null),
import: sinon.stub().yields(null, jobMock)
import: sinon.stub().yields(null, jobMock),
insert: sinon.stub().yields(null, errorList)
};
var datasetMock = {
table: sinon.stub().returns(tableMock),
Expand All @@ -57,11 +67,17 @@ function getSample () {
};
var BigQueryMock = sinon.stub().returns(bigqueryMock);
var StorageMock = sinon.stub().returns(storageMock);
var fsMock = {
readFileSync: sinon.stub().throws(new Error('Invalid file.'))
};
fsMock.readFileSync.withArgs(validJsonFile).returns(validJsonString);
fsMock.readFileSync.withArgs(invalidJsonFile).returns(invalidJsonString);

return {
program: proxyquire('../tables', {
'@google-cloud/bigquery': BigQueryMock,
'@google-cloud/storage': StorageMock,
'fs': fsMock,
yargs: proxyquire('yargs', {})
}),
mocks: {
Expand All @@ -74,6 +90,7 @@ function getSample () {
table: tableMock,
bucket: bucketMock,
dataset: datasetMock,
fs: fsMock,
tables: tableMocks
}
};
Expand Down Expand Up @@ -290,6 +307,45 @@ describe('bigquery:tables', function () {
});
});

// Unit tests for insertRowsAsStream() using the mocked BigQuery client
// from getSample().
describe('insertRowsAsStream', function () {
  var options = {
    file: file,
    dataset: dataset,
    table: table,
    rows: jsonArray
  };

  it('should stream-insert rows into a table', function () {
    var cb = sinon.stub();
    var sample = getSample();

    sample.program.insertRowsAsStream(options, cb);

    assert.equal(cb.calledOnce, true);
    assert.deepEqual(cb.firstCall.args, [null, errorList]);
  });

  it('should handle API errors', function () {
    var sample = getSample();
    var cb = sinon.stub();
    var apiError = new Error('error');
    // Simulate a request-level failure from table.insert().
    sample.mocks.table.insert = sinon.stub().yields(apiError);

    sample.program.insertRowsAsStream(options, cb);

    assert.equal(cb.calledOnce, true);
    assert.deepEqual(cb.firstCall.args, [apiError]);
  });

  it('should handle (per-row) insert errors', function () {
    var sample = getSample();
    var cb = sinon.stub();
    // Request succeeds but individual rows are rejected.
    sample.mocks.table.insert = sinon.stub().yields(null, errorList);

    sample.program.insertRowsAsStream(options, cb);

    assert.equal(cb.calledOnce, true);
    assert.deepEqual(cb.firstCall.args, [null, errorList]);
  });
});

describe('main', function () {
it('should call createTable', function () {
var program = getSample().program;
Expand Down Expand Up @@ -349,6 +405,19 @@ describe('bigquery:tables', function () {
}]);
});

// Verifies that `main(['insert', ...])` routes to insertRowsAsStream()
// with the parsed rows plus dataset/table options.
it('should call insertRowsAsStream', function () {
var program = getSample().program;
program.insertRowsAsStream = sinon.stub();

program.main(['insert', dataset, table, validJsonFile]);

assert.equal(program.insertRowsAsStream.calledOnce, true);
// slice(0, -1) drops the trailing callback argument (utils.makeHandler()).
assert.deepEqual(
program.insertRowsAsStream.firstCall.args.slice(0, -1),
[{ rows: jsonArray, dataset: dataset, table: table }]
);
});

it('should recognize --gzip flag', function () {
var program = getSample().program;
program.exportTableToGCS = sinon.stub();
Expand Down Expand Up @@ -380,5 +449,65 @@ describe('bigquery:tables', function () {
gzip: false
}]);
});

// Tests for the `insert` subcommand's json_or_file argument handling:
// valid/invalid files, valid/invalid inline JSON strings.
describe('insert', function () {
  var options = {
    dataset: dataset,
    table: table,
    rows: jsonArray
  };

  it('should accept valid JSON files', function () {
    var prog = getSample().program;
    prog.insertRowsAsStream = sinon.stub();

    prog.main(['insert', dataset, table, validJsonFile]);

    assert.equal(prog.insertRowsAsStream.calledOnce, true);
    // Drop the trailing callback before comparing forwarded options.
    assert.deepEqual(prog.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
  });

  it('should reject files with invalid JSON', function () {
    var prog = getSample().program;
    prog.insertRowsAsStream = sinon.stub();

    var run = function () { prog.main(['insert', dataset, table, invalidJsonFile]); };
    assert.throws(
      run,
      /"json_or_file" \(or the file it points to\) is not a valid JSON array\./
    );
    assert.equal(prog.insertRowsAsStream.called, false);
  });

  it('should reject invalid file names', function () {
    var prog = getSample().program;
    prog.insertRowsAsStream = sinon.stub();

    var run = function () { prog.main(['insert', dataset, table, '']); };
    assert.throws(
      run,
      /"json_or_file" \(or the file it points to\) is not a valid JSON array\./
    );
    assert.equal(prog.insertRowsAsStream.called, false);
  });

  it('should accept valid JSON strings', function () {
    var prog = getSample().program;
    prog.insertRowsAsStream = sinon.stub();

    prog.main(['insert', dataset, table, validJsonString]);

    assert.equal(prog.insertRowsAsStream.calledOnce, true);
    assert.deepEqual(prog.insertRowsAsStream.firstCall.args.slice(0, -1), [options]);
  });

  it('should reject invalid JSON strings', function () {
    var prog = getSample().program;
    prog.insertRowsAsStream = sinon.stub();

    var run = function () { prog.main(['insert', dataset, table, invalidJsonString]); };
    assert.throws(
      run,
      /"json_or_file" \(or the file it points to\) is not a valid JSON array\./
    );
    assert.equal(prog.insertRowsAsStream.called, false);
  });
});
});
});