Skip to content

Commit

Permalink
WX-1282 Update failedJobs Query to use lo_get instead of INNER JOIN…
Browse files Browse the repository at this point in the history
… against pg_largeobject (#7228)
  • Loading branch information
JVThomas authored and AlexITC committed Jan 14, 2024
1 parent 93d1759 commit 034c19d
Showing 1 changed file with 5 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,22 +319,14 @@ trait MetadataEntryComponent {
if(isPostgres) s"${'"'}$identifier${'"'}" else identifier
}

def dbMetadataValueColCheckName(isPostgres: Boolean): String = {
if(isPostgres) "obj.data" else "METADATA_VALUE"
def evaluateMetadataValue(isPostgres: Boolean, colName: String): String = {
if(isPostgres) s"convert_from(lo_get(${colName}::oid), 'UTF8')" else colName
}

def attemptAndIndexSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String, variablePrefix: String): String = {
s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as ${variablePrefix}Scatter, MAX(COALESCE(${retryAttempt}, 0)) AS ${variablePrefix}Retry"
}

def pgObjectInnerJoinStatement(isPostgres: Boolean, metadataValColName: String): String = {
if(isPostgres) s"INNER JOIN pg_largeobject obj ON me.${metadataValColName} = cast(obj.loid as text)" else ""
}

def failedTaskGroupByClause(metadataValue: String, callFqn: String): String = {
return s"GROUP BY ${callFqn}, ${metadataValue}"
}

val workflowUuid = dbIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres)
val callFqn = dbIdentifierWrapper("CALL_FQN", isPostgres)
val scatterIndex = dbIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres)
Expand All @@ -358,11 +350,10 @@ trait MetadataEntryComponent {
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
#${pgObjectInnerJoinStatement(isPostgres, metadataValue)}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed')
#${failedTaskGroupByClause(dbMetadataValueColCheckName(isPostgres), callFqn)}
HAVING #${dbMetadataValueColCheckName(isPostgres)} = 'Failed'
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${evaluateMetadataValue(isPostgres, metadataValue)} = 'Failed')
GROUP BY #${callFqn}, #${metadataValue}
HAVING #${evaluateMetadataValue(isPostgres, metadataValue)} = 'Failed'
) AS failedCalls
ON me.#${callFqn} = failedCalls.#${callFqn}
INNER JOIN (
Expand Down

0 comments on commit 034c19d

Please sign in to comment.