Skip to content

Commit

Permalink
Offload job processing to server attempt #2
Browse files Browse the repository at this point in the history
  • Loading branch information
nolanm1122 committed Sep 3, 2024
1 parent 8f60985 commit 2493b8a
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 111 deletions.
101 changes: 52 additions & 49 deletions src/oqlParse/oqlParse.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ function makeColumnIDsMap() {
const columns = [];
for (const colKey in configs[key].columns) {
columns.push(
{name: configs[key].columns[colKey].displayName.toLowerCase(),
id: configs[key].columns[colKey].id,
entityId: configs[key].columns[colKey].entityId}
{
name: configs[key].columns[colKey].displayName.toLowerCase(),
id: configs[key].columns[colKey].id,
entityId: configs[key].columns[colKey].entityId
}
);
}
map[key] = columns;
Expand All @@ -25,7 +27,7 @@ function getEntityId(subjectEntity, colName) {
if (!(subjectEntity in COLUMN_IDS_MAP)) {
throw new Error(`${subjectEntity} is not a valid subjectEntity`);
}
for (const m of COLUMN_IDS_MAP[subjectEntity]) {
for (const m of COLUMN_IDS_MAP[subjectEntity]) {
if (colName === m.name || colName === m.value) return m.entityId;
}
return null;
Expand Down Expand Up @@ -80,7 +82,7 @@ function parseCondition(condition, subjectEntity) {

value = parsePrimitive(value);
let entityId = getEntityId(subjectEntity, columnName);
if (typeof value === "string" && entityId !== null && !value.includes(`${entityId}/`)) value = `${entityId}/${value}`;
if (typeof value === "string" && entityId !== null && entityId !== "works" && !value.includes(`${entityId}/`)) value = `${entityId}/${value}`;

return {
id: generateId(),
Expand Down Expand Up @@ -163,7 +165,7 @@ function parseFilters(oql) {
let worksOperator = "and";
let summarizeByOperator = "and";

const worksMatch = oql.match(/where (.+?)(;|$)/);
const worksMatch = oql.match(/(?<!summarize by.*)where (.+?)(;|$)/);
if (worksMatch) {
const worksClause = worksMatch[1];
if (worksClause.includes("(")) {
Expand All @@ -172,8 +174,7 @@ function parseFilters(oql) {
condition.subjectEntity = 'works';
}
filters.push(...nestedConditions);
}
else if (worksClause.includes(" or ")) {
} else if (worksClause.includes(" or ")) {
worksOperator = "or";
worksConditions.push(...worksClause.split(" or "));
} else {
Expand Down Expand Up @@ -329,66 +330,68 @@ function oqlToQuery(oql) {
}

function queryToOQL(query) {
if (!query.filters || query.filters.length === 0) {
return "get works";
}
let oql = "get works";

if (Array.isArray(query.filters) && query.filters.length > 0) {
const worksFilters = query.filters.filter(filter => filter.subjectEntity === "works");
if (worksFilters.length > 0) {
oql += ` where ${generateFilters(worksFilters, "works")}`;
}
}

const rootFilter = findRootFilter(query.filters);
let oql = `get ${rootFilter.subjectEntity} where `;
oql += generateFilters(query.filters);

if (query.summarize_by) {
oql += `; summarize by ${query.summarize_by}`;
if (query.summarize_by) {
oql += `; summarize by ${query.summarize_by}`;

const summaryFilters = query.filters.filter(filter => filter.subjectEntity === query.summarize_by);
if (summaryFilters.length > 0) {
oql += ` where ${generateFilters(summaryFilters)}`;
const summaryFilters = (query.filters || []).filter(filter => filter.subjectEntity === query.summarize_by);
if (summaryFilters.length > 0) {
oql += ` where ${generateFilters(summaryFilters, query.summarize_by)}`;
}
}
}

if (query.sort_by) {
oql += `; sort by ${query.sort_by.column_id} ${query.sort_by.direction}`;
}
if (query.sort_by) {
oql += `; sort by ${query.sort_by.column_id} ${query.sort_by.direction}`;
}

if (query.return_columns && query.return_columns.length > 0) {
oql += `; return ${query.return_columns.join(', ')}`;
}
if (query.return_columns && query.return_columns.length > 0) {
oql += `; return ${query.return_columns.join(', ')}`;
}

return oql;
return oql;
}

function findRootFilter(filters) {
const childIds = new Set(filters.flatMap(filter => filter.children || []));
return filters.find(filter => !childIds.has(filter.id));
function findRootFilter(filters, subjEntity) {
const childIds = new Set(filters.flatMap(filter => filter.children || []));
return filters.find(filter => !childIds.has(filter.id) && filter.subjectEntity === subjEntity);
}

function generateFilters(filters) {
const rootFilter = findRootFilter(filters);
return generateFilterString(rootFilter, filters);
function generateFilters(filters, subjEntity) {
const rootFilter = findRootFilter(filters, subjEntity);
return generateFilterString(rootFilter, filters);
}

function generateFilterString(filter, allFilters) {
if (filter.type === 'leaf') {
return `${filter.column_id} ${filter.operator || 'is'} ${filter.value}`;
} else if (filter.type === 'branch') {
const childFilters = filter.children.map(childId => {
const childFilter = allFilters.find(f => f.id === childId);
return generateFilterString(childFilter, allFilters);
});
if (filter.type === 'leaf') {
return `${filter.column_id} ${filter.operator || 'is'} ${filter.value}`;
} else if (filter.type === 'branch') {
const childFilters = filter.children.map(childId => {
const childFilter = allFilters.find(f => f.id === childId);
return generateFilterString(childFilter, allFilters);
});

const needsParentheses = filter.children.length > 1 &&
allFilters.some(f => f.type === 'branch' && f.id !== filter.id && f.subjectEntity === filter.subjectEntity);
const needsParentheses = filter.children.length > 1 &&
allFilters.some(f => f.type === 'branch' && f.id !== filter.id && f.subjectEntity === filter.subjectEntity);

const joinedFilters = childFilters.join(` ${filter.operator} `);
return needsParentheses ? `(${joinedFilters})` : joinedFilters;
}
const joinedFilters = childFilters.join(` ${filter.operator} `);
return needsParentheses ? `(${joinedFilters})` : joinedFilters;
}
}

function formatValue(value) {
if (typeof value === 'string') {
return `"${value}"`;
}
return value;
if (typeof value === 'string') {
return `"${value}"`;
}
return value;
}

export {
Expand Down
128 changes: 71 additions & 57 deletions src/oqlParse/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class OQOTestRunner {
this.tests = tests;
this.onTestResultCb = onTestResultCb;
this.serverUrl = 'https://api.openalex.org';
this.queueId = null;
// this.serverUrl = 'http://localhost:5000';
this.jobId = null;
}

expectedResults(tests, cases = ["oqlToQuery", "queryToOql", "natLang", "queryToSearch"]) {
Expand Down Expand Up @@ -237,7 +238,7 @@ class OQOTestRunner {
}
}

async startServerTests(cases) {
async startServerTests(cases) {
const serverTests = this.tests.flatMap(test => {
const testId = objectMD5ShortUUID(test);
const serverTestCases = [];
Expand All @@ -263,6 +264,9 @@ class OQOTestRunner {

const response = await fetch(`${this.serverUrl}/bulk_test`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(serverTests),
});

Expand All @@ -271,66 +275,76 @@ class OQOTestRunner {
}

const data = await response.json();
this.queueId = data.queue_id;
this.jobId = data.job_id;
}

listenForResults() {
return new Promise((resolve, reject) => {
const eventSource = new EventSource(`${this.serverUrl}/stream/${this.queueId}`);
async pollForResults() {
const pollInterval = 5000; // 5 seconds
const maxAttempts = 60; // 5 minutes total polling time
let attempts = 0;

while (attempts < maxAttempts) {
const response = await fetch(`${this.serverUrl}/job_status/${this.jobId}`,{
method: 'GET',
headers: {
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0'
}
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}

eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.status === 'processing') {
// Optionally handle processing status
return;
}
if (data.status === 'all_completed') {
eventSource.close();
resolve();
return;
}
if (data.status === 'error') {
console.error(data.message);
eventSource.close();
reject(new Error(data.message));
return;
}
const data = await response.json();

if (data.hasOwnProperty('id') && data.id !== null) {
if (data.case === 'natLang') {
const results = [];
const test = this.tests.find(test => objectMD5ShortUUID(test) === data.id);
for (const _result of data.results) {
const result = OQOTestRunner.queriesEqual(_result.oqo, test.query, test.ignore ?? []);
if (!result.equal) {
result.expected = test.query;
result.actual = _result.oqo;
}
results.push({
"case": "natLang",
prompt: _result.prompt,
isPassing: result.equal,
details: result,
});
if (data.is_completed) {
return this.processResults(data.results);
}

if (data.status === 'failed') {
throw new Error(`Job failed: ${data.error || 'Unknown error'}`);
}

await new Promise(resolve => setTimeout(resolve, pollInterval));
attempts++;
}

throw new Error('Polling timed out');
}

processResults(results) {
for (const result of results) {
if (result.case === 'natLang') {
const test = this.tests.find(test => objectMD5ShortUUID(test) === result.id);
const subTests = result.results.map(subResult => {
let comparisonResult = {equal: false};
try {
comparisonResult = OQOTestRunner.queriesEqual(subResult.oqo, test.query, test.ignore ?? []);
} catch (e) {}
return {
case: "natLang",
prompt: subResult.prompt,
isPassing: comparisonResult.equal && !subResult.hasOwnProperty("error"),
details: comparisonResult.equal ? {} : {
expected: test.query,
actual: subResult.oqo,
...(subResult.hasOwnProperty("error") ? { error: subResult.error } : {}),
...comparisonResult
}
this.onTestResultCb({
"case": "natLang",
id: data.id,
isPassing: results.every((o) => o.isPassing),
subTests: results
});
} else if (data.case === 'jsonToSearch') {
this.onTestResultCb(data);
}
}
};
};
});

eventSource.onerror = (error) => {
console.error('EventSource failed:', error);
eventSource.close();
reject(error);
};
});
this.onTestResultCb({
case: "natLang",
id: result.id,
isPassing: subTests.every(st => st.isPassing),
subTests: subTests
});
} else if (result.case === 'queryToSearch') {
this.onTestResultCb(result);
}
}
}

async runTests(cases = ["oqlToQuery", "queryToOql", "natLang", "queryToSearch"]) {
Expand All @@ -354,7 +368,7 @@ class OQOTestRunner {
// Run server-side tests
if (cases.includes("natLang") || cases.includes("queryToSearch")) {
await this.startServerTests(cases);
await this.listenForResults();
await this.pollForResults();
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ const routes = [
component: OQOTestDetails,
props: (route) => ({
testId: route.params.id,
autoRun: route.path.endsWith('/run')
autoRun: route.query.run
})
},
{
path: '/tests/:id/run',
component: OQOTestDetails,
path: '/tests/tag/:tag',
component: OQOTests,
props: (route) => ({
testId: route.params.id,
autoRun: true
initialTag: route.params.tag,
autoRun: route.query.run
})
},

Expand Down

0 comments on commit 2493b8a

Please sign in to comment.