This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Making JOIN Task Async failing some workflows #3528

Closed
vamsibandarupalli opened this issue Mar 9, 2023 · 4 comments
Labels
type: bug bugs/ bug fixes

Comments

@vamsibandarupalli

Describe the bug
After the JOIN task was made async, some workflows fail to execute successfully: the JOIN task fails with a timeout. If a workflow has DECISION tasks inside its FORK_JOIN branches, the whole workflow fails because the JOIN task fails with a responseTimeout error.
In the workflow below, the "mock_task" task definition has a 300-second response timeout (responseTimeoutSeconds), but the failure occurs even before the JOIN starts.

Details
Conductor version: main branch
Persistence implementation: Dynomite
Queue implementation: Dynoqueues
Lock: Redis
Workflow definition:
{
"createTime": 1678401621130,
"updateTime": 1678308058828,
"accessPolicy": {},
"name": "fd-csd-flow_mock2",
"description": "fd-csd-flow",
"version": 1,
"tasks": [
{
"name": "mock_task",
"taskReferenceName": "InputJQ",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | {filters:[.filters[] | {repository :.repository, \"filters\":[.fields[] | {(.name):.values}] | add}],\"filterflag\": (.filters | length > 0)}",
"JSONJQINPUT": "${workflow.input.body}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "fork-join-0",
"inputParameters": {},
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "mock_task",
"taskReferenceName": "ARSCInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"arsc\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){'arsc';}else {for(var i = 0; i <$.filter.length; i++) { if ($.filter[i].repository == 'arsc'){ 'arsc'; break; }}}",
"decisionCases": {
"arsc": [
{
"name": "mock_task",
"taskReferenceName": "ARSCTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "arsc",
"response": "${ARSCTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCDecision_join",
"inputParameters": {
"output": "${ARSCJQT.output.result}",
"ARSCTaskOutput": "${ARSCTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "ARSCFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${ARSCDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
[
{
"name": "mock_task",
"taskReferenceName": "OEEHInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"oeeh\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){\n\t'oeeh';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'oeeh'){ \n 'oeeh';\n break;\n }\n}\n}",
"decisionCases": {
"oeeh": [
{
"name": "mock_task",
"taskReferenceName": "OEEHTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "oeeh",
"response": "${OEEHTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHDecision_join",
"inputParameters": {
"output": "${OEEHJQT.output.result}",
"OEEHTaskOutput": "${OEEHTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "OEEHFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${OEEHDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
[
{
"name": "mock_task",
"taskReferenceName": "APSVInput",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | if .filters | length > 0 then .filters[] | select(.repository == \"apsv\") | .filters else {} end",
"JSONJQINPUT": "${InputJQ.output.result}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVDecision",
"inputParameters": {
"filter": "${InputJQ.output.result.filters}",
"filterflag": "${InputJQ.output.result.filterflag}"
},
"type": "DECISION",
"caseExpression": "if (!$.filterflag ){\n\t'apsv';\n}else {\nfor(var i = 0; i <$.filter.length; i++) {\n if ($.filter[i].repository == 'apsv'){ \n 'apsv';\n break;\n }\n}\n}",
"decisionCases": {
"apsv": [
{
"name": "mock_task",
"taskReferenceName": "APSVTask",
"inputParameters": {
"http_request": {
"uri": "http://httpbin.org/anything",
"method": "POST",
"body": "${workflow.input.body}"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVJQT",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"objectType": "apsv",
"response": "${APSVTask.output.response.body}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
},
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVDecision_join",
"inputParameters": {
"output": "${APSVJQT.output.result}",
"APSVTaskOutput": "${APSVTask.output.response.body}"
},
"type": "SIMPLE",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "APSVFilter",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": "${APSVDecision_join.output.response.body.output}"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
],
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "join-0",
"inputParameters": {},
"type": "JOIN",
"startDelay": 0,
"joinOn": [
"ARSCFilter",
"OEEHFilter",
"APSVFilter"
],
"optional": false,
"asyncComplete": false
},
{
"name": "mock_task",
"taskReferenceName": "CSDMerge",
"inputParameters": {
"queryExpression": ".JSONJQINPUT | .",
"JSONJQINPUT": {
"arsc_jq": "${ARSCFilter.output.result}",
"oeeh_jq": "${OEEHFilter.output.result}",
"apsv_jq": "${APSVFilter.output.result}"
}
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
],
"inputParameters": [],
"outputParameters": {
"searchText": "${workflow.input.body.text}",
"applications": [
"${CSDMerge.output.result}"
]
},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"ownerEmail": "[email protected]",
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"variables": {},
"inputTemplate": {}
}
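The fork/join structure is hard to see in the flattened definition, so here is a small Python sketch that pulls it out. It lists the last task reference of each FORK_JOIN branch and compares that list with the `joinOn` of the JOIN that follows. The trimmed `workflow_def` keeps only the reference names and types from the definition above. The helpers `branch`, `branch_tails` and `join_mismatches` are made up for illustration and are not part of Conductor.

```python
def branch(prefix):
    """Build one fork branch using the reference names from the definition above."""
    return [
        {"taskReferenceName": f"{prefix}Input", "type": "JSON_JQ_TRANSFORM"},
        {"taskReferenceName": f"{prefix}Decision", "type": "DECISION"},
        {"taskReferenceName": f"{prefix}Decision_join", "type": "SIMPLE"},
        {"taskReferenceName": f"{prefix}Filter", "type": "JSON_JQ_TRANSFORM"},
    ]


# Trimmed copy of the workflow: only types, reference names and joinOn are kept.
workflow_def = {
    "tasks": [
        {"taskReferenceName": "InputJQ", "type": "JSON_JQ_TRANSFORM"},
        {
            "taskReferenceName": "fork-join-0",
            "type": "FORK_JOIN",
            "forkTasks": [branch("ARSC"), branch("OEEH"), branch("APSV")],
        },
        {
            "taskReferenceName": "join-0",
            "type": "JOIN",
            "joinOn": ["ARSCFilter", "OEEHFilter", "APSVFilter"],
        },
        {"taskReferenceName": "CSDMerge", "type": "JSON_JQ_TRANSFORM"},
    ]
}


def branch_tails(fork_task):
    """Return the reference name of the last task in each non-empty fork branch."""
    return [b[-1]["taskReferenceName"] for b in fork_task["forkTasks"] if b]


def join_mismatches(definition):
    """Pair each FORK_JOIN with the JOIN right after it and report any differences."""
    tasks = definition["tasks"]
    mismatches = []
    for fork, join in zip(tasks, tasks[1:]):
        if fork["type"] == "FORK_JOIN" and join["type"] == "JOIN":
            tails = branch_tails(fork)
            join_on = join.get("joinOn", [])
            if sorted(tails) != sorted(join_on):
                mismatches.append((fork["taskReferenceName"], tails, join_on))
    return mismatches


print(branch_tails(workflow_def["tasks"][1]))
print(join_mismatches(workflow_def))
```

In this definition the `joinOn` entries match the last task of every branch, so a wrong join target does not look like the cause of the failure.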
Task definition:
{
"createTime": 1678399938459,
"createdBy": "",
"accessPolicy": {},
"name": "mock_task",
"description": "mock task request",
"retryCount": 0,
"timeoutSeconds": 0,
"inputKeys": [],
"outputKeys": [],
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 2,
"responseTimeoutSeconds": 300,
"inputTemplate": {},
"rateLimitPerFrequency": 0,
"rateLimitFrequencyInSeconds": 1,
"ownerEmail": "[email protected]",
"backoffScaleFactor": 1
}

To Reproduce
Steps to reproduce the behavior:

  1. Use the workflow and task definitions given above to reproduce the issue
  2. Use this input for the workflow
    {
    "body": {
    "text": "101",
    "version": 3,
    "logicalId": "lid://demo-products.1",
    "page_number": 1,
    "page_size": 20,
    "userContext": null,
    "filters": []
    }
    }
  3. See error
    conductor-server_1 | 2782317 [http-nio-8080-exec-3] INFO com.netflix.conductor.core.execution.WorkflowExecutor [] - Execution terminated of workflow: fd-csd-flow_mock2.1/1e79652f-0a2c-40d2-a9e9-9acec064cb4e.RUNNING
    conductor-server_1 | com.netflix.conductor.core.exception.TerminateWorkflowException: responseTimeout: 300 exceeded for the taskId: cc8dc3ff-3e09-406a-9867-ab7c6cc7747f with Task Definition: JOIN
    conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.retry(DeciderService.java:548) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:205) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.DeciderService.decide(DeciderService.java:109) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1007) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.decide(WorkflowExecutor.java:1040) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
    conductor-server_1 | at com.netflix.conductor.core.execution.WorkflowExecutor.handleWorkflowEvaluationEvent(WorkflowExecutor.java:962) ~[conductor-core-3.14.0-SNAPSHOT.jar!/:3.14.0-SNAPSHOT]
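To show which branches the input from step 2 exercises, the following Python sketch mirrors the `InputJQ` query expression and the DECISION `caseExpression` from the workflow definition. It is a reading of those expressions, not Conductor code. The function names are hypothetical, and the shape of a filter entry (`repository`, `fields`, `name`, `values`) is inferred from the jq expression.

```python
def input_jq(body):
    """Python rendering of the InputJQ queryExpression from the workflow above."""
    filters = []
    for entry in body["filters"]:
        merged = None  # jq's `add` over an empty array yields null
        for field in entry["fields"]:
            merged = {**(merged or {}), field["name"]: field["values"]}
        filters.append({"repository": entry["repository"], "filters": merged})
    return {"filters": filters, "filterflag": len(body["filters"]) > 0}


def decision_case(result, repository):
    """Mirror of the DECISION caseExpression; None means no case was selected."""
    if not result["filterflag"]:
        return repository
    for entry in result["filters"]:
        if entry["repository"] == repository:
            return repository
    return None


# The workflow input from step 2.
body = {
    "text": "101",
    "version": 3,
    "logicalId": "lid://demo-products.1",
    "page_number": 1,
    "page_size": 20,
    "userContext": None,
    "filters": [],
}

result = input_jq(body)
cases = [decision_case(result, repo) for repo in ("arsc", "oeeh", "apsv")]
print(result)
print(cases)
```

With `filters` empty, `filterflag` is false. Under this reading every DECISION selects its repository case, so all three branches run their HTTP task before reaching the tasks listed in `joinOn`.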

Expected behavior
The JOIN task should run without error.

Additional context
When the workflow starts, the JOIN task is the next one scheduled (here). The JOIN is scheduled before its taskUpdated time is updated, so it fails this check (here) and times out.
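The effect described above can be sketched in Python as follows. This is not Conductor's code. It assumes the check compares the current time with the task's last update time, that an unset update time is stored as 0, and that a timeout of 0 disables the check. `is_response_timed_out` and `NOW_MS` are hypothetical names.

```python
import time


def is_response_timed_out(update_time_ms, response_timeout_seconds, now_ms=None):
    """Hypothetical check: has the task gone longer than the timeout without an update?"""
    if response_timeout_seconds <= 0:
        return False  # assumption: 0 disables the check
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - update_time_ms >= response_timeout_seconds * 1000


# Fixed "now" for a repeatable example (the createTime from the workflow definition).
NOW_MS = 1_678_401_621_130

# Update time never set (assumed to default to 0): the elapsed time looks like decades.
print(is_response_timed_out(0, 300, NOW_MS))
# Update time set one second before the check: well within the 300-second timeout.
print(is_response_timed_out(NOW_MS - 1_000, 300, NOW_MS))
```

Under these assumptions a task whose update time was never written is reported as timed out on the very first evaluation, which would match the `responseTimeout: 300 exceeded` message in the stack trace.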

I'll try to debug this and will raise a merge request if I find anything.

@vamsibandarupalli vamsibandarupalli added the type: bug bugs/ bug fixes label Mar 9, 2023
@james-deee
Contributor

james-deee commented Apr 18, 2023

@vamsibandarupalli there is also a pretty big performance issue that was introduced around JOIN in the newer versions of Conductor. See the discussion here: #3436 (reply in thread)

Unfortunately, it isn't getting much of a response from the maintainers/community here :(

@v1r3n
Contributor

v1r3n commented May 30, 2023

#3594 addresses this.

@v1r3n v1r3n closed this as completed May 30, 2023
@Holmesus

There is still this problem...

@Holmesus Holmesus mentioned this issue Oct 11, 2023
@v1r3n
Contributor

v1r3n commented Nov 3, 2023

There is still this problem...

Hi @Holmesus can you provide more details?
