Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webui: Add table to display the status of recent ingestion jobs. #350

Merged
merged 29 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
948f28b
webui: Add `<CompressionJobTable/>`.
junhaoliao Apr 7, 2024
38d36d8
Fix typo.
junhaoliao Apr 7, 2024
868f7c2
Optimize imports.
junhaoliao Apr 7, 2024
c1ec9f7
Avoid Mongo bulkWrite if no operation is to be done.
junhaoliao Apr 7, 2024
da7936b
Apply linter.
junhaoliao Apr 7, 2024
bd48210
Merge branch 'refs/heads/main' into compression-job-table
junhaoliao Apr 16, 2024
f5086f5
Allow page to scroll when its height overflows.
junhaoliao Apr 16, 2024
1809107
Refactor CompressionJobTable use <Panel/>
junhaoliao Apr 16, 2024
e38bb5a
Rename CompressionJobTable to IngestionJobs.
junhaoliao Apr 16, 2024
09f04ef
Add docs for compression job publications.
junhaoliao Apr 16, 2024
87f4c52
Refactor CompressionDbManager class.
junhaoliao Apr 16, 2024
3a88f2f
Reformat code; add docs.
junhaoliao Apr 16, 2024
29f6e6d
Move IngestionJobRow to a separate source file.
junhaoliao Apr 16, 2024
4f8eaba
Move IngestionJobRow to a separate source file.
junhaoliao Apr 16, 2024
b500857
Use dayjs to calculation job duration when the duration is absent fro…
junhaoliao Apr 16, 2024
aa8bddd
Reformat code.
junhaoliao Apr 16, 2024
8de53d7
Remove `eslint-disable-next-line no-underscore-dangle`.
junhaoliao Apr 16, 2024
5cb1b88
Apply suggestions from code review
junhaoliao Apr 22, 2024
5895a5b
Rename bootstrap classNames `text-right` to `text-end`.
junhaoliao Apr 22, 2024
957cc67
Update class for 'Job ID' column in IngestionJobs to align text to th…
junhaoliao Apr 22, 2024
130cd6a
Move CompressionJobsCollection declaration to top of collections.js f…
junhaoliao Apr 22, 2024
f7b439b
Moved COMPRESSION_MAX_RETRIEVE_JOBS constant to server/publications.js.
junhaoliao Apr 22, 2024
1454233
Refactor CompressionDbManager and update variable names for clarity.
junhaoliao Apr 22, 2024
95f47c4
Apply suggestions from code review
junhaoliao Apr 22, 2024
8bbd72c
Move CompressionJob typedef from IngestionJobRow to types.js.
junhaoliao Apr 22, 2024
f6cfc33
Update enum and icon array for compression job statuses in IngestionJ…
junhaoliao Apr 22, 2024
00aeb93
Update OverlayTrigger placement in IngestionJobRow component.
junhaoliao Apr 22, 2024
f30b7d9
Add comment to explain the `UNION DISTINCT` compression job selection…
junhaoliao Apr 22, 2024
f9f9d15
Add comment to explain why we clear the timeout in `refreshCompressio…
junhaoliao Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ALL_TARGET_NAME,
CLP_METADATA_TABLE_PREFIX,
CLPConfig,
COMPRESSION_JOBS_TABLE_NAME,
COMPRESSION_SCHEDULER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
Expand Down Expand Up @@ -685,9 +686,10 @@ def start_webui(instance_id: str, clp_config: CLPConfig, mounts: CLPDockerMounts
"SqlDbHost": clp_config.database.host,
"SqlDbPort": clp_config.database.port,
"SqlDbName": clp_config.database.name,
"SqlDbSearchJobsTableName": SEARCH_JOBS_TABLE_NAME,
"SqlDbClpArchivesTableName": f"{CLP_METADATA_TABLE_PREFIX}archives",
"SqlDbClpFilesTableName": f"{CLP_METADATA_TABLE_PREFIX}files",
"SqlDbCompressionJobsTableName": COMPRESSION_JOBS_TABLE_NAME,
"SqlDbSearchJobsTableName": SEARCH_JOBS_TABLE_NAME,
},
"public": {
"ClpStorageEngine": clp_config.package.storage_engine,
Expand Down
5 changes: 5 additions & 0 deletions components/webui/imports/api/ingestion/collections.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ const STATS_COLLECTION_ID = Object.freeze({

const StatsCollection = new Mongo.Collection(Meteor.settings.public.StatsCollectionName);

const CompressionJobsCollection = new Mongo.Collection(
Meteor.settings.public.CompressionJobsCollectionName
);

export {
CompressionJobsCollection,
STATS_COLLECTION_ID,
StatsCollection,
};
71 changes: 71 additions & 0 deletions components/webui/imports/api/ingestion/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* eslint-disable sort-keys */
/**
* Enum of the column names for the `compression_jobs` table.
*
* @enum {string}
*/
const COMPRESSION_JOBS_TABLE_COLUMN_NAMES = Object.freeze({
ID: "id",
STATUS: "status",
STATUS_MSG: "status_msg",
CREATION_TIME: "creation_time",
START_TIME: "start_time",
DURATION: "duration",
ORIGINAL_SIZE: "original_size",
UNCOMPRESSED_SIZE: "uncompressed_size",
COMPRESSED_SIZE: "compressed_size",
NUM_TASKS: "num_tasks",
NUM_TASKS_COMPLETED: "num_tasks_completed",
CLP_BINARY_VERSION: "clp_binary_version",
CLP_CONFIG: "clp_config",
});
/* eslint-enable sort-keys */

let enumCompressionJobStatus;
/**
* Enum of compression job statuses, matching the `CompressionJobStatus` class in
* `job_orchestration.scheduler.constants`.
*
* @enum {string}
*/
const COMPRESSION_JOB_STATUS = Object.freeze({
PENDING: (enumCompressionJobStatus = 0),
RUNNING: ++enumCompressionJobStatus,
SUCCEEDED: ++enumCompressionJobStatus,
FAILED: ++enumCompressionJobStatus,
});

/**
* List of waiting states for a compression job.
*
* @see COMPRESSION_JOB_STATUS
*/
const COMPRESSION_JOB_WAITING_STATES = Object.freeze([
COMPRESSION_JOB_STATUS.PENDING,
COMPRESSION_JOB_STATUS.RUNNING,
]);

/**
* Represents the possible status names for a compression job.
*
* @type {ReadonlyArray<string>}
*/
const COMPRESSION_JOB_STATUS_NAMES = Object.freeze([
"PENDING",
"RUNNING",
"SUCCEEDED",
"FAILED",
]);

/**
* The maximum number of compression jobs that is retrieved at a time.
*/
const COMPRESSION_MAX_RETRIEVE_JOBS = 5;

export {
COMPRESSION_JOB_STATUS,
COMPRESSION_JOB_STATUS_NAMES,
COMPRESSION_JOB_WAITING_STATES,
COMPRESSION_JOBS_TABLE_COLUMN_NAMES,
COMPRESSION_MAX_RETRIEVE_JOBS,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {COMPRESSION_JOBS_TABLE_COLUMN_NAMES} from "../constants";


/**
* Class for retrieving compression jobs from the database.
*/
class CompressionDbManager {
#sqlDbConnPool;

#compressionJobsTableName;

/**
* @param {import("mysql2/promise").Pool} sqlDbConnPool
* @param {object} tableNames
* @param {string} tableNames.compressionJobsTableName
*/
constructor (sqlDbConnPool, {
compressionJobsTableName,
}) {
this.#sqlDbConnPool = sqlDbConnPool;
this.#compressionJobsTableName = compressionJobsTableName;
}

/**
* Retrieve the last `limit` number of jobs. Also includes the ones in
* `jobIdList`.
*
* @param {number} limit
* @param {number[]} jobIdList
* @return {Promise<object[]>}
*/
async getCompressionJobs (limit, jobIdList) {
let queryString = `
WITH SelectedColumns AS (
SELECT
id as _id,
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS_MSG},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.START_TIME},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.DURATION},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.UNCOMPRESSED_SIZE},
${COMPRESSION_JOBS_TABLE_COLUMN_NAMES.COMPRESSED_SIZE}
FROM ${this.#compressionJobsTableName}
)
(
SELECT *
FROM SelectedColumns
ORDER BY _id DESC
LIMIT ${limit}
)
`;

jobIdList.forEach((jobId) => {
queryString += `
UNION DISTINCT
(
SELECT *
FROM SelectedColumns
WHERE _id=${jobId}
)
`;
});

queryString += "ORDER BY _id DESC;";

const results = await this.#sqlDbConnPool.query(queryString);

return results[0];
}
}

export default CompressionDbManager;
137 changes: 132 additions & 5 deletions components/webui/imports/api/ingestion/server/publications.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
import {Meteor} from "meteor/meteor";

import {logger} from "/imports/utils/logger";
import {MONGO_SORT_BY_ID} from "/imports/utils/mongo";

import {
CompressionJobsCollection,
STATS_COLLECTION_ID,
StatsCollection,
} from "../collections";
import {
COMPRESSION_JOB_WAITING_STATES,
COMPRESSION_JOBS_TABLE_COLUMN_NAMES,
COMPRESSION_MAX_RETRIEVE_JOBS,
} from "../constants";
import CompressionDbManager from "./CompressionDbManager";
import StatsDbManager from "./StatsDbManager";


const COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS = 1000;
const STATS_REFRESH_INTERVAL_MILLIS = 5000;

/**
* @type {CompressionDbManager|null}
*/
let compressionDbManager = null;

/**
* @type {StatsDbManager|null}
*/
Expand All @@ -19,7 +33,12 @@ let statsDbManager = null;
/**
* @type {number|null}
*/
let refreshMeteorInterval = null;
let compressionJobsRefreshTimeout = null;

/**
* @type {number|null}
*/
let statsRefreshInterval = null;

/**
* Updates the compression statistics in the StatsCollection.
Expand All @@ -45,6 +64,94 @@ const refreshCompressionStats = async () => {
await StatsCollection.updateAsync(filter, modifier, options);
};

/**
* Updates the compression jobs in the CompressionJobsCollection.
*
* @return {Promise<void>}
*/
const refreshCompressionJobs = async () => {
if (null !== compressionJobsRefreshTimeout) {
Meteor.clearTimeout(compressionJobsRefreshTimeout);
compressionJobsRefreshTimeout = null;
}
if (0 === Meteor.server.stream_server.all_sockets().length) {
compressionJobsRefreshTimeout = Meteor.setTimeout(
refreshCompressionJobs,
COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS
);

return;
}

const pendingJobIdList = await CompressionJobsCollection.find({
[COMPRESSION_JOBS_TABLE_COLUMN_NAMES.STATUS]: {
$in: COMPRESSION_JOB_WAITING_STATES,
},
})
.fetch()
.map((job) => (
job._id
));

const jobs = await compressionDbManager.getCompressionJobs(
COMPRESSION_MAX_RETRIEVE_JOBS,
pendingJobIdList
);

const operations = jobs.map((doc) => ({
updateOne: {
filter: {_id: doc._id},
update: {$set: doc},
upsert: true,
},
}));

if (0 !== operations.length) {
await CompressionJobsCollection.rawCollection().bulkWrite(operations);
}

// `refreshCompressionJobs()` shall not be run concurrently and therefore incurs no race
// condition.
// eslint-disable-next-line require-atomic-updates
compressionJobsRefreshTimeout = Meteor.setTimeout(
refreshCompressionJobs,
COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS
);
};

/**
* Initializes the CompressionDbManager and starts a timeout timer (`compressionJobsRefreshTimeout`)
* for compression job updates.
*
* @param {import("mysql2/promise").Pool} sqlDbConnPool
* @param {object} tableNames
* @param {string} tableNames.compressionJobsTableName
* @throws {Error} on error.
*/
const initCompressionDbManager = (sqlDbConnPool, {
compressionJobsTableName,
}) => {
compressionDbManager = new CompressionDbManager(sqlDbConnPool, {
compressionJobsTableName,
});

compressionJobsRefreshTimeout = Meteor.setTimeout(
refreshCompressionJobs,
COMPRESSION_JOBS_REFRESH_INTERVAL_MILLIS
);
};

/**
* De-initializes the CompressionDbManager by clearing the timeout timer for compression job
* updates (`refreshCompressionJobs`).
*/
const deinitCompressionDbManager = () => {
if (null !== compressionJobsRefreshTimeout) {
Meteor.clearTimeout(compressionJobsRefreshTimeout);
compressionJobsRefreshTimeout = null;
}
};

/**
* Initializes the StatsDbManager and starts an interval timer (`refreshMeteorInterval`) for
* compression stats updates.
Expand All @@ -64,7 +171,7 @@ const initStatsDbManager = (sqlDbConnPool, {
clpFilesTableName,
});

refreshMeteorInterval = Meteor.setInterval(
statsRefreshInterval = Meteor.setInterval(
refreshCompressionStats,
STATS_REFRESH_INTERVAL_MILLIS
);
Expand All @@ -75,12 +182,30 @@ const initStatsDbManager = (sqlDbConnPool, {
* (`refreshMeteorInterval`).
*/
const deinitStatsDbManager = () => {
if (null !== refreshMeteorInterval) {
Meteor.clearInterval(refreshMeteorInterval);
refreshMeteorInterval = null;
if (null !== statsRefreshInterval) {
Meteor.clearInterval(statsRefreshInterval);
statsRefreshInterval = null;
}
};

/**
* Updates and publishes compression job statuses.
*
* @param {string} publicationName
* @return {Mongo.Cursor}
*/
Meteor.publish(Meteor.settings.public.CompressionJobsCollectionName, async () => {
logger.debug(`Subscription '${Meteor.settings.public.CompressionJobsCollectionName}'`);

await refreshCompressionJobs();

const findOptions = {
sort: [MONGO_SORT_BY_ID],
};

return CompressionJobsCollection.find({}, findOptions);
});

/**
* Updates and publishes compression statistics.
*
Expand All @@ -100,6 +225,8 @@ Meteor.publish(Meteor.settings.public.StatsCollectionName, async () => {
});

export {
deinitCompressionDbManager,
deinitStatsDbManager,
initCompressionDbManager,
initStatsDbManager,
};
7 changes: 5 additions & 2 deletions components/webui/imports/api/search/server/publications.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import {Meteor} from "meteor/meteor";

import {logger} from "/imports/utils/logger";
import {
MONGO_SORT_BY_ID,
MONGO_SORT_ORDER,
} from "/imports/utils/mongo";

import {SearchResultsMetadataCollection} from "../collections";
import {
MONGO_SORT_ORDER,
SEARCH_MAX_NUM_RESULTS,
SEARCH_RESULTS_FIELDS,
} from "../constants";
Expand Down Expand Up @@ -58,7 +61,7 @@ Meteor.publish(Meteor.settings.public.SearchResultsCollectionName, ({
sort: [
/* eslint-disable @stylistic/js/array-element-newline */
[SEARCH_RESULTS_FIELDS.TIMESTAMP, MONGO_SORT_ORDER.DESCENDING],
[SEARCH_RESULTS_FIELDS.ID, MONGO_SORT_ORDER.DESCENDING],
MONGO_SORT_BY_ID,
/* eslint-enable @stylistic/js/array-element-newline */
],
limit: SEARCH_MAX_NUM_RESULTS,
Expand Down
Loading
Loading