Skip to content

Commit

Permalink
feat(reporter): aggregate reports from multi-distributed runners
Browse files Browse the repository at this point in the history
  • Loading branch information
NivLipetz committed Mar 3, 2019
1 parent 67b9d22 commit 0d7b132
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 53 deletions.
4 changes: 4 additions & 0 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ definitions:
- test_name
- test_description
- start_time
- runner_id
properties:
report_id:
type: string
Expand Down Expand Up @@ -976,6 +977,9 @@ definitions:
notes:
type: string
description: notes about the test
runner_id:
type: string
description: the id of the runner that created the report

post_report_response:
required:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE TABLE IF NOT EXISTS reports_stats(
container_id text,
runner_id uuid,
test_id uuid,
report_id text,
stats_id uuid,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS report_subscribers(
test_id uuid,
report_id text,
runner_id uuid,
stage text,
PRIMARY KEY (test_id, report_id, runner_id));
56 changes: 49 additions & 7 deletions src/reports/models/database/cassandra/cassandraConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ const UPDATE_REPORT_SUMMARY = 'UPDATE reports_summary SET status=?, phase=?, las
const GET_REPORT_SUMMARY = 'SELECT * FROM reports_summary WHERE test_id=? AND report_id=? AND report_type=?';
const GET_REPORTS_SUMMARIES = 'SELECT * FROM reports_summary WHERE test_id=? AND report_type=?';
const GET_LAST_SUMMARIES = 'SELECT * FROM last_reports LIMIT ?';
const INSERT_REPORT_STATS = 'INSERT INTO reports_stats(container_id, test_id, report_id, stats_id, stats_time, phase_index, phase_status, data) values(?,?,?,?,?,?,?,?)';
const INSERT_REPORT_STATS = 'INSERT INTO reports_stats(runner_id, test_id, report_id, stats_id, stats_time, phase_index, phase_status, data) values(?,?,?,?,?,?,?,?)';
const GET_REPORT_STATS = 'SELECT * FROM reports_stats WHERE test_id=? AND report_id=?';
const SUBSCRIBE_RUNNER = 'INSERT INTO report_subscribers(test_id, report_id, runner_id, stage) values(?,?,?,?)';
const UPDATE_SUBSCRIBERS = 'UPDATE report_subscribers SET stage=? WHERE test_id=? AND report_id=? AND runner_id=?';
const GET_REPORT_SUBSCRIBERS = 'SELECT * FROM report_subscribers WHERE test_id=? AND report_id=?';

module.exports = {
init,
Expand All @@ -20,7 +23,9 @@ module.exports = {
getReports,
getLastReports,
insertStats,
getStats
getStats,
subscribeRunner,
updateSubscribers
};

let queryOptions = {
Expand Down Expand Up @@ -48,24 +53,24 @@ function updateReport(testId, reportId, status, phaseIndex, lastStats, endTime)
function getReport(testId, reportId) {
let params;
params = [testId, reportId, 'basic'];
return executeQuery(GET_REPORT_SUMMARY, params, queryOptions);
return getReportsAndParse(GET_REPORT_SUMMARY, params, queryOptions);
}

function getReports(testId) {
let params;
params = [testId, 'basic'];
return executeQuery(GET_REPORTS_SUMMARIES, params, queryOptions);
return getReportsAndParse(GET_REPORTS_SUMMARIES, params, queryOptions);
}

function getLastReports(limit) {
let params;
params = [limit];
return executeQuery(GET_LAST_SUMMARIES, params, queryOptions);
return getReportsAndParse(GET_LAST_SUMMARIES, params, queryOptions);
}

function insertStats(containerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data) {
function insertStats(runnerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data) {
let params;
params = [containerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data];
params = [runnerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data];
return executeQuery(INSERT_REPORT_STATS, params, queryOptions);
}

Expand All @@ -75,6 +80,24 @@ function getStats(testId, reportId) {
return executeQuery(GET_REPORT_STATS, params, queryOptions);
}

function subscribeRunner(testId, reportId, runnerId) {
let params;
params = [testId, reportId, runnerId, 'initializing'];
return executeQuery(SUBSCRIBE_RUNNER, params, queryOptions);
}

function updateSubscribers(testId, reportId, runnerId, stage) {
let params;
params = [stage, testId, reportId, runnerId];
return executeQuery(UPDATE_SUBSCRIBERS, params, queryOptions);
}

function getReportSubscribers(testId, reportId) {
let params;
params = [testId, reportId];
return executeQuery(GET_REPORT_SUBSCRIBERS, params, queryOptions);
}

function executeQuery(query, params, queryOptions) {
return client.execute(query, params, queryOptions).then((result) => {
logger.trace('Query result', {
Expand All @@ -88,3 +111,22 @@ function executeQuery(query, params, queryOptions) {
return Promise.reject(new Error('Error occurred in communication with cassandra'));
});
}

async function getReportsAndParse(query, params, queryOptions) {
let subscribers, report;
const reports = await executeQuery(query, params, queryOptions);

for (let reportIndex = 0; reportIndex < reports.length; reportIndex++) {
report = reports[reportIndex];
subscribers = await getReportSubscribers(report.test_id, report.report_id);
subscribers = subscribers.map((subscriber) => {
return {
'runner_id': subscriber.runner_id,
'stage': subscriber.stage
};
});
report.subscribers = subscribers;
}

return reports;
}
83 changes: 76 additions & 7 deletions src/reports/models/database/sequelize/sequelizeConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ module.exports = {
getReport,
getReports,
getLastReports,
getStats
getStats,
subscribeRunner,
updateSubscribers
};

async function init(sequlizeClient) {
Expand All @@ -35,16 +37,17 @@ async function insertReport(testId, revisionId, reportId, jobId, testType, start
notes: notes || '',
phase: '0',
status: 'initialized',
test_configuration: testConfiguration
test_configuration: testConfiguration,
runners_subscribed: []
};

return report.findOrCreate({ where: { report_id: reportId }, defaults: params });
}

async function insertStats(containerId, testId, reportId, statsId, statsTime, phaseIndex, phaseStatus, data) {
async function insertStats(runnerId, testId, reportId, statsId, statsTime, phaseIndex, phaseStatus, data) {
const stats = client.model('stats');
const params = {
container_id: containerId,
runner_id: runnerId,
report_id: reportId,
test_id: testId,
stats_id: statsId,
Expand Down Expand Up @@ -74,18 +77,72 @@ async function updateReport(testId, reportId, status, phaseIndex, lastStats, end
}, options);
}

async function subscribeRunner(testId, reportId, runnerId) {
try {
const newSubscriber = {
runner_id: runnerId,
stage: 'initializing'
};

const reportModel = client.model('report');
const options = {
where: {
test_id: testId,
report_id: reportId
}
};

let report = await reportModel.findAll(options);
report = report[0];

await report.createSubscriber(newSubscriber);
} catch (e) {
console.log(e, `Failed to subscribe runner ${runnerId}`);
}
}

async function updateSubscribers(testId, reportId, runnerId, stage) {
const reportModel = client.model('report');
const getReportOptions = {
where: {
test_id: testId,
report_id: reportId
}
};

let report = await reportModel.findAll(getReportOptions);
report = report[0];

const subscribers = await report.getSubscribers();
const subscriberToUpdate = await subscribers.find((subscriber) => {
return subscriber.dataValues.runner_id === runnerId;
});

await subscriberToUpdate.set('stage', stage);
return subscriberToUpdate.save();
}

async function getReportsAndParse(query) {
const report = client.model('report');

let options = {
attributes: { exclude: ['updated_at', 'created_at'] }
attributes: { exclude: ['updated_at', 'created_at'] },
include: [report.subscriber]
};

Object.assign(options, query);

const allReportsRawResponse = await report.findAll(options);
let allReports = allReportsRawResponse.map(rawReport => rawReport.dataValues);

allReports.forEach(report => {
report.subscribers = report.subscribers.map((sqlJob) => {
return {
runner_id: sqlJob.dataValues.runner_id,
stage: sqlJob.dataValues.stage
};
});
});
return allReports;
}

Expand Down Expand Up @@ -139,8 +196,8 @@ async function initSchemas() {
report_id: {
type: Sequelize.DataTypes.STRING
},
container_id: {
type: Sequelize.DataTypes.STRING
runner_id: {
type: Sequelize.DataTypes.UUID
},
stats_time: {
type: Sequelize.DataTypes.DATE
Expand All @@ -156,6 +213,16 @@ async function initSchemas() {
}
});

const subscriber = client.define('subscriber', {
runner_id: {
type: Sequelize.DataTypes.STRING,
primaryKey: true
},
stage: {
type: Sequelize.DataTypes.STRING
}
});

const report = client.define('report', {
report_id: {
type: Sequelize.DataTypes.STRING,
Expand Down Expand Up @@ -205,6 +272,8 @@ async function initSchemas() {
}
});

report.subscriber = report.hasMany(subscriber);
await report.sync();
await stats.sync();
await subscriber.sync();
}
16 changes: 13 additions & 3 deletions src/reports/models/databaseConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ module.exports = {
getReport,
getReports,
getLastReports,
getStats
getStats,
subscribeRunner,
updateSubscribers
};

function insertReport(testId, revisionId, reportId, jobId, testType, startTime, testName, testDescription, testConfiguration, notes) {
return databaseConnector.insertReport(testId, revisionId, reportId, jobId, testType, startTime, testName, testDescription, testConfiguration, notes);
}

function insertStats(containerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data) {
return databaseConnector.insertStats(containerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data);
function insertStats(runnerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data) {
return databaseConnector.insertStats(runnerId, testId, reportId, statId, statsTime, phaseIndex, phaseStatus, data);
}

function updateReport(testId, reportId, status, phaseIndex, lastStats, endTime) {
Expand All @@ -41,4 +43,12 @@ function getReport(testId, reportId) {

function getStats(testId, reportId) {
return databaseConnector.getStats(testId, reportId);
}

function subscribeRunner(testId, reportId, runnerId) {
return databaseConnector.subscribeRunner(testId, reportId, runnerId);
}

function updateSubscribers(testId, reportId, runnerId, stage) {
return databaseConnector.updateSubscribers(testId, reportId, runnerId, stage);
}
Loading

0 comments on commit 0d7b132

Please sign in to comment.