Skip to content

Commit

Permalink
WX-1110[risk=low] Added endpoint to fetch failed tasks by root workfl…
Browse files Browse the repository at this point in the history
…ow id (#7165)
  • Loading branch information
JVThomas authored Jul 17, 2023
1 parent 63b035f commit af9660e
Show file tree
Hide file tree
Showing 17 changed files with 443 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,4 +515,9 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
/** Runs the data-access action `metadataTableSizeInformation` and returns its (optional) result. */
override def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]] =
  runAction(dataAccess.metadataTableSizeInformation())

/**
  * Fetches the metadata entries for failed jobs under the given root workflow.
  *
  * The configured `db.driver` value decides whether the Postgres flavour of the query is built.
  *
  * @param rootWorkflowId id passed through to the data-access query
  * @return the rows produced by `dataAccess.failedJobsMetadataWithWorkflowId`, executed via `runLobAction`
  */
override def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]] = {
  // Read the driver as a plain string: ConfigValue.toString is a debug rendering, not the raw value.
  // Locale.ROOT keeps the case folding independent of the JVM's default locale.
  val driverName = databaseConfig.getString("db.driver")
  val isPostgres = driverName.toLowerCase(java.util.Locale.ROOT).contains("postgres")
  runLobAction(dataAccess.failedJobsMetadataWithWorkflowId(rootWorkflowId, isPostgres))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,84 @@ trait MetadataEntryComponent {
}).headOption
}

/**
  * Builds a plain-SQL action selecting the `executionStatus` / `backendStatus` metadata rows whose value is
  * 'Failed', for the workflow whose UUID is `rootWorkflowId` and for workflows whose root UUID is that id.
  *
  * Calls that carry a `subWorkflowId` metadata key are excluded. For each call FQN only rows at the highest
  * retry attempt are kept, and only when the row has no scatter index or has the call's highest scatter index.
  *
  * @param rootWorkflowId bound into the query and compared against both the root-workflow and workflow UUID columns
  * @param isPostgres     when true, identifiers are wrapped in double quotes and the metadata value is compared
  *                       through a join on `pg_largeobject`
  * @return the query mapped to [[MetadataEntry]] rows
  */
def failedJobsMetadataWithWorkflowId(rootWorkflowId: String, isPostgres: Boolean) = {
// Reads the nine selected columns positionally; the sixth (the metadata value) is read as an optional
// Clob and copied into a SerialClob.
val getMetadataEntryResult = GetResult(r => {
MetadataEntry(r.<<, r.<<, r.<<, r.<<, r.<<, r.nextClobOption().map(clob => new SerialClob(clob)), r.<<, r.<<, r.<<)
})

// Wraps an identifier in double quotes for Postgres; returns it unchanged otherwise.
def dbIdentifierWrapper(identifier: String, isPostgres: Boolean) = {
if(isPostgres) s"${'"'}$identifier${'"'}" else identifier
}

// Expression compared against 'Failed': the joined large-object data on Postgres, the value column otherwise.
def dbMetadataValueColCheckName(isPostgres: Boolean): String = {
if(isPostgres) "obj.data" else "METADATA_VALUE"
}

// SELECT list of the per-call subquery: call FQN plus the max scatter index and max retry attempt
// (NULLs counted as 0).
def targetCallsSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String): String = {
s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as maxScatter, MAX(COALESCE(${retryAttempt}, 0)) AS maxRetry"
}

// Postgres only: joins pg_largeobject on the metadata value column (matched against obj.loid cast to text).
// Produces an empty fragment for other databases.
def pgObjectInnerJoinStatement(isPostgres: Boolean, metadataValColName: String): String = {
if(isPostgres) s"INNER JOIN pg_largeobject obj ON me.${metadataValColName} = cast(obj.loid as text)" else ""
}

// Table and column identifiers, quoted as required by the target database.
val workflowUuid = dbIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres)
val callFqn = dbIdentifierWrapper("CALL_FQN", isPostgres)
val scatterIndex = dbIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres)
val retryAttempt = dbIdentifierWrapper("JOB_RETRY_ATTEMPT", isPostgres)
val metadataKey = dbIdentifierWrapper("METADATA_KEY", isPostgres)
val metadataValueType = dbIdentifierWrapper("METADATA_VALUE_TYPE", isPostgres)
val metadataTimestamp = dbIdentifierWrapper("METADATA_TIMESTAMP", isPostgres)
val metadataJournalId = dbIdentifierWrapper("METADATA_JOURNAL_ID", isPostgres)
val rootUuid = dbIdentifierWrapper("ROOT_WORKFLOW_EXECUTION_UUID", isPostgres)
val metadataValue = dbIdentifierWrapper("METADATA_VALUE", isPostgres)
val metadataEntry = dbIdentifierWrapper("METADATA_ENTRY", isPostgres)
val wmse = dbIdentifierWrapper("WORKFLOW_METADATA_SUMMARY_ENTRY", isPostgres)
// Column order here must line up with the positional reads in getMetadataEntryResult.
val resultSetColumnNames = s"me.${workflowUuid}, me.${callFqn}, me.${scatterIndex}, me.${retryAttempt}, me.${metadataKey}, me.${metadataValue}, me.${metadataValueType}, me.${metadataTimestamp}, me.${metadataJournalId}"

// Query outline:
//   - targetCalls:  per call FQN, the max scatter index and max retry attempt among the matching workflows
//   - avoidedCalls: call FQNs that have a 'subWorkflowId' key; rows matching them are dropped by the
//                   LEFT JOIN ... IS NULL check
//   - WHERE/HAVING: keep 'Failed' status rows at the max retry attempt, restricted to the matching workflows
// In Slick's sql interpolator `#${...}` splices literal text (only the locally built identifiers and
// fragments above), while `$rootWorkflowId` is passed as a bind variable.
val query = sql"""
SELECT #${resultSetColumnNames}
FROM #${metadataEntry} me
INNER JOIN (
#${targetCallsSelectStatement(callFqn, scatterIndex, retryAttempt)}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND #${callFqn} IS NOT NULL
GROUP BY #${callFqn}
) AS targetCalls
ON me.#${callFqn} = targetCalls.#${callFqn}
LEFT JOIN (
SELECT DISTINCT #${callFqn}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND me.#${metadataKey} = 'subWorkflowId'
GROUP BY #${callFqn}
) AS avoidedCalls
ON me.#${callFqn} = avoidedCalls.#${callFqn}
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
#${pgObjectInnerJoinStatement(isPostgres, metadataValue)}
WHERE avoidedCalls.#${callFqn} IS NULL
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed')
AND (
(COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} IS NULL)
OR (COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} = targetCalls.maxScatter)
)
GROUP BY #${resultSetColumnNames}
HAVING me.#${workflowUuid} IN (
SELECT DISTINCT wmse.#${workflowUuid}
FROM #${wmse} wmse
WHERE wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId
)
"""

query.as(getMetadataEntryResult)
}

private[this] def metadataEntryHasMetadataKeysLike(metadataEntry: MetadataEntries,
metadataKeysToFilterFor: List[String],
metadataKeysToFilterOut: List[String]): Rep[Boolean] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,6 @@ trait MetadataSqlDatabase extends SqlDatabase {
def countWorkflowsLeftToDeleteThatEndedOnOrBeforeThresholdTimestamp(workflowEndTimestampThreshold: Timestamp)(implicit ec: ExecutionContext): Future[Int]

/** Looks up size information for the metadata table; the result is optional. */
def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]]

/**
  * Fetches the metadata entries describing failed jobs for a workflow.
  *
  * @param rootWorkflowId the workflow id, in string form, to look failed jobs up for
  * @return the matching metadata entries
  */
def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]]
}
32 changes: 31 additions & 1 deletion docs/api/RESTAPI.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions engine/src/main/resources/swagger/cromwell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,26 @@ paths:
$ref: '#/responses/NotFound'
'500':
$ref: '#/responses/ServerError'
# Failed-jobs endpoint: call-level metadata of failed tasks for the root workflow given by {id}.
# NOTE(review): the 200 response reuses WorkflowMetadataResponse; the MetadataBuilderActorSpec test for
# failed tasks expects JSON keyed by workflow id - confirm this schema describes that shape.
'/api/workflows/{version}/{id}/metadata/failed-jobs':
get:
operationId: failed-jobs
summary: Get call-level metadata of failed tasks for a specified root workflow
parameters:
- $ref: '#/parameters/versionParam'
- $ref: '#/parameters/singleId'
tags:
- Workflows
responses:
'200':
description: Successful request
schema:
$ref: '#/definitions/WorkflowMetadataResponse'
'400':
$ref: '#/responses/BadRequest'
'404':
$ref: '#/responses/NotFound'
'500':
$ref: '#/responses/ServerError'
'/api/workflows/{version}/{id}/metadata':
get:
operationId: metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ trait MetadataRouteSupport extends HttpInstrumentation {
}
}
},
// Route for workflows/<version>/<id>/metadata/failed-jobs.
// The first path segment (version) is ignored; the second is handed to metadataLookup as a possible
// workflow id, together with a builder that wraps the id in FetchFailedJobsMetadataWithWorkflowId.
encodeResponse {
path("workflows" / Segment / Segment / "metadata" / "failed-jobs") { (_, possibleWorkflowId) =>
instrumentRequest {
metadataLookup(
possibleWorkflowId,
(w: WorkflowId) => FetchFailedJobsMetadataWithWorkflowId(w),
serviceRegistryActor
)
}
}
},
encodeResponse {
path("workflows" / Segment / Segment / "metadata") { (_, possibleWorkflowId) =>
instrumentRequest {
Expand Down Expand Up @@ -212,8 +223,7 @@ object MetadataRouteSupport {
}

def completeMetadataBuilderResponse(response: Future[MetadataJsonResponse]): Route = {
onComplete(response) {
case Success(r: SuccessfulMetadataJsonResponse) => complete(r.responseJson)
onComplete(response) { case Success(r: SuccessfulMetadataJsonResponse) => complete(r.responseJson)
case Success(r: FailedMetadataJsonResponse) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case Failure(e: UnrecognizedWorkflowException) => e.failRequest(StatusCodes.NotFound)
Expand Down
140 changes: 107 additions & 33 deletions engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
events: Seq[MetadataEvent],
expectedRes: String,
metadataBuilderActorName: String,
failedTasks: Boolean = false
): Future[Assertion] = {
val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor")
def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props
Expand All @@ -45,13 +46,17 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000),
name = metadataBuilderActorName,
)

val response = mba.ask(action).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryReply, events))
mockReadMetadataWorkerActor.reply(
if(failedTasks) FetchFailedJobsMetadataLookupResponse(events) else MetadataLookupResponse(queryReply, events)
)
response map { r => r shouldBe a [SuccessfulMetadataJsonResponse] }
response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson}
}


def assertMetadataFailureResponse(action: MetadataServiceAction,
metadataServiceResponse: MetadataServiceResponse,
expectedException: Exception,
Expand Down Expand Up @@ -95,37 +100,37 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
// We'll use a Query instead of a SingleWorkflowMetadataGet, so we expect the WorkflowID this time:
val expectedRes =
s"""{
| "calls": {
| "callB": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 0
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 2,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 3,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }],
| "callA": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED",
| "id": "$workflowA"
|}""".stripMargin
| "calls": {
| "callB": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 0
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 2,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 3,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }],
| "callA": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED",
| "id": "$workflowA"
|}""".stripMargin

val mdQuery = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false)
val queryAction = GetMetadataAction(mdQuery)
Expand All @@ -134,7 +139,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
queryReply = mdQuery,
events = workflowAEvents,
expectedRes = expectedRes,
metadataBuilderActorName = "mba-scope-tree",
metadataBuilderActorName = "mba-scope-tree"
)
}

Expand Down Expand Up @@ -162,6 +167,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
eventMaker: WorkflowId => (String, MetadataValue, OffsetDateTime) => MetadataEvent =
makeEvent,
metadataBuilderActorName: String,
isFailedTaskFetch: Boolean = false
): Future[Assertion] = {

val events = eventList map { e => (e._1, MetadataValue(e._2), e._3) } map Function.tupled(eventMaker(workflow))
Expand All @@ -172,6 +178,74 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
assertMetadataResponse(queryAction, mdQuery, events, expectedRes, metadataBuilderActorName)
}

it should "build the call list for failed tasks when prompted" in {

  // Every event carries the same placeholder key and value; only the job key differs.
  def notCheckedEvent(workflowId: WorkflowId, jobKey: Option[MetadataJobKey]) =
    MetadataEvent(MetadataKey(workflowId, jobKey, "NOT_CHECKED"), MetadataValue("NOT_CHECKED"))

  val workflowA = WorkflowId.randomId()

  // Job keys for callA / callB across shards and attempts, plus one workflow-level event (None).
  val jobKeys = List(
    Option(MetadataJobKey("callB", Option(1), 3)),
    Option(MetadataJobKey("callB", None, 1)),
    Option(MetadataJobKey("callB", Option(1), 2)),
    Option(MetadataJobKey("callA", None, 1)),
    Option(MetadataJobKey("callB", Option(1), 1)),
    Option(MetadataJobKey("callB", Option(0), 1)),
    None
  )
  val failedTaskEvents = jobKeys.map(jobKey => notCheckedEvent(workflowA, jobKey))

  // The failed-tasks response nests the call list under the workflow id.
  val expectedJson =
    s"""{
       |"${workflowA}": {
       | "calls": {
       | "callB": [{
       | "attempt": 1,
       | "NOT_CHECKED": "NOT_CHECKED",
       | "shardIndex": -1
       | }, {
       | "attempt": 1,
       | "NOT_CHECKED": "NOT_CHECKED",
       | "shardIndex": 0
       | }, {
       | "attempt": 1,
       | "NOT_CHECKED": "NOT_CHECKED",
       | "shardIndex": 1
       | }, {
       | "attempt": 2,
       | "NOT_CHECKED": "NOT_CHECKED",
       | "shardIndex": 1
       | }, {
       | "attempt": 3,
       | "NOT_CHECKED": "NOT_CHECKED",
       | "shardIndex": 1
       | }],
       | "callA": [{
       | "attempt": 1,
       | "NOT_CHECKED": "NOT_CHECKED",
       | "shardIndex": -1
       | }]
       | },
       | "NOT_CHECKED": "NOT_CHECKED"
       | }
       |}""".stripMargin

  val query = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false)
  assertMetadataResponse(
    action = GetMetadataAction(query),
    queryReply = query,
    events = failedTaskEvents,
    expectedRes = expectedJson,
    metadataBuilderActorName = "mba-failed-tasks",
    failedTasks = true,
  )
}

it should "assume the event list is ordered and keep last event if 2 events have same key" in {
val eventBuilderList = List(
("a", "aLater", OffsetDateTime.parse("2000-01-02T12:00:00Z")),
Expand Down
Loading

0 comments on commit af9660e

Please sign in to comment.