
[Data Factory] Convert PipelineResource to JSON #25081

Closed
romanzdk opened this issue Jul 5, 2022 · 12 comments
Labels

customer-reported: Issues that are reported by GitHub users external to the Azure organization.
Data Factory
Mgmt: This issue is related to a management-plane library.
needs-author-feedback: Workflow: More information is needed from author to address the issue.
needs-team-attention: Workflow: This issue needs attention from Azure service team or SDK team.
no-recent-activity: There has been no recent activity on this issue.
question: The issue doesn't require a change to the product in order to be resolved. Most issues start as that.

Comments

romanzdk commented Jul 5, 2022

I am creating an Azure Data Factory pipeline using the Python SDK (azure.mgmt.datafactory.models.PipelineResource). I need to convert the PipelineResource object to a JSON file. Is that possible somehow?

I tried json.loads(pipeline_object) and json.dumps(pipeline_object), but no luck.

@ghost added the needs-triage, customer-reported, and question labels on Jul 5, 2022
@azure-sdk added the Data Factory, Mgmt, and needs-team-triage labels on Jul 5, 2022
@ghost removed the needs-triage label on Jul 5, 2022
mccoyp (Member) commented Jul 5, 2022

Hi @romanzdk, thank you for opening an issue! I'll tag the appropriate folks so we can take a closer look.

In the meantime, you may be able to get unblocked by passing a default argument to json.dumps so that objects which aren't JSON serializable are converted to dicts first. For example:

import json
from azure.mgmt.datafactory.models import Activity, PipelineResource

activity = Activity(name="activity-name")
resource = PipelineResource(activities=[activity])

# note: json.dumps returns a JSON string; the default hook converts
# each non-serializable model to its attribute dict
json_dict = json.dumps(resource, default=lambda obj: obj.__dict__)
print(json_dict)
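
For reference, the string produced this way uses the models' Python attribute names (snake_case, with null for unset fields) rather than the camelCase names the Data Factory REST API expects; that difference becomes relevant later in this thread. Roughly:

# abridged sketch of what print(json_dict) emits:
# {
#   "additional_properties": null,
#   ...
#   "activities": [
#     {
#       "name": "activity-name",
#       "depends_on": null,
#       ...
#     }
#   ]
# }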

@mccoyp added the needs-team-attention and CXP Attention labels and removed the needs-team-triage label on Jul 5, 2022
ghost commented Jul 5, 2022

Thank you for your feedback. This has been routed to the support team for assistance.

romanzdk (Author) commented Jul 7, 2022

@mccoyp (I added resource.name = "my-name" to your code.)

Unfortunately, when I use this, Azure Data Factory returns the error: Could not load resource 'my-name'. Please ensure no mistakes in the JSON and that referenced resources exist. Status: Cannot read properties of undefined (reading 'policy'), Possible reason: Cannot read properties of undefined (reading 'policy')

I have multiple activities like this:

activity = DatabricksNotebookActivity(
    name=<name>,
    depends_on=<dependency-name>,
    notebook_path=<notebook-path>,
    base_parameters=<some-parameters>,
    linked_service_name=<ls-name>,
)
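
For what it's worth, a filled-in version of such an activity (hypothetical names throughout) would look roughly like this; per the generated models, depends_on takes ActivityDependency objects and linked_service_name takes a LinkedServiceReference:

from azure.mgmt.datafactory.models import (
    ActivityDependency,
    DatabricksNotebookActivity,
    LinkedServiceReference,
)

# all names below are hypothetical, for illustration only
activity = DatabricksNotebookActivity(
    name="transform-notebook",
    depends_on=[
        ActivityDependency(
            activity="ingest-notebook",
            dependency_conditions=["Succeeded"],
        )
    ],
    notebook_path="/Shared/transform",
    base_parameters={"run_date": "2022-07-07"},
    linked_service_name=LinkedServiceReference(
        type="LinkedServiceReference",  # required discriminator in recent SDK versions
        reference_name="AzureDatabricksLinkedService",
    ),
)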

@Wzb123456789 added the needs-author-feedback label and removed the needs-team-attention label on Jul 8, 2022
Wzb123456789 (Contributor) commented:

Hi @romanzdk

Based on the error message you posted, the failure appears to be in your policy information, which has nothing to do with the code above or the code you posted.

I recommend checking your Azure policy information to see whether it is valid.

romanzdk (Author) commented Jul 8, 2022

Well, not really, apparently. It looks like an issue with the generated JSON file. All activities and the pipeline have policy=null. I also tried removing all these policy keys, but I got the same error.

@ghost added the needs-team-attention label and removed the needs-author-feedback label on Jul 8, 2022
Wzb123456789 (Contributor) commented:

Hi @romanzdk

You tried to set the name through resource.name = "my-name" above, but in the swagger definition name is a read-only parameter, so it cannot be modified; the SDK is generated from swagger.
https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/datafactory/azure-mgmt-datafactory/azure/mgmt/datafactory/models/_models_py3.py

Also, because the code @mccoyp posted above was only tested locally and not against Azure, all activities and the pipeline have policy=null.

romanzdk (Author) commented:

Hi @Wzb123456789, I know that the name parameter is read-only, but when I convert the class to JSON, the name is not defined, so the pipeline is not valid for ADF. The name has to be present; that's the reason I set it.

romanzdk (Author) commented Jul 12, 2022

So the issue is that json_dict = json.dumps(pipeline, default=lambda obj: obj.__dict__, indent=2) generates a pipeline that looks like this:

{
    "name": "pipelineName",
    "activities": [
        {
            "name": "taskName",
            "description": null,
            "type": "DatabricksNotebook",
            "depends_on": [
                {
                    "additional_properties": null,
                    "activity": "dependencyName",
                    "dependency_conditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "additional_properties": null,
            "user_properties": null,
            "linked_service_name": {
                "reference_name": "LinkedServiceName"
            },
            "notebook_path": "some/path",
            "base_parameters": {
                "parameters": "some_parameter"
            },
            "libraries": null
        },
        {
            "name": "dependencyName",
            "description": null,
            "type": "DatabricksNotebook",
            "depends_on": [],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "additional_properties": null,
            "user_properties": null,
            "linked_service_name": {
                "reference_name": "LinkedServiceName"
            },
            "notebook_path": "some/path",
            "base_parameters": {
                "parameters": "some_parameter"
            },
            "libraries": null
        }
    ],
    "additional_properties": null,
    "id": null,
    "type": null,
    "etag": null,
    "description": null,
    "parameters": null,
    "variables": null,
    "concurrency": null,
    "annotations": null,
    "run_dimensions": null,
    "folder": {
        "name": "someFolder"
    },
    "policy": null
}

Which looks almost good; however, there are three issues:

  1. activities should be placed under properties at the root; the correct path is properties.activities
  2. depends_on should be dependsOn
  3. dependency_conditions should be dependencyConditions

I implemented the changes like this:

import json
from azure.mgmt.datafactory.models import PipelineFolder, PipelineResource

# `activities` was built earlier (see above)
pipeline = PipelineResource(
    activities=activities,
    folder=PipelineFolder(name="someFolder"),
)

# name is read-only in the model, so set it directly on the instance
pipeline.name = pipeline_name

json_dict = json.dumps(pipeline, default=lambda obj: obj.__dict__, indent=2)
json_obj = json.loads(json_dict)
json_obj['properties'] = {}

activities = json_obj['activities']

# the following keys are wrong (not accepted by ADF), so they have to be replaced
for activity in activities:
    activity['dependsOn'] = activity.pop('depends_on')
    for dependency in activity['dependsOn']:
        dependency['dependencyConditions'] = dependency.pop('dependency_conditions')

json_obj['properties']['activities'] = activities
del json_obj['activities']

with open(f"{pipeline_name}.json", "w") as f:
    f.write(json.dumps(json_obj, indent=2))

Could you please take a look at this and fix it, so that we can generate JSON from the PipelineResource class by default?

EDIT:
Actually, all of the keys are wrong. I fixed only the ones I need:

def fix_activities(activities: list) -> list:
    # the following keys are wrong (not accepted by ADF), so they have to be replaced
    for activity in activities:
        activity['dependsOn'] = activity.pop('depends_on')

        for dependency in activity['dependsOn']:
            dependency['dependencyConditions'] = dependency.pop('dependency_conditions')

        activity['linkedServiceName'] = activity.pop('linked_service_name')
        activity['linkedServiceName']['referenceName'] = activity['linkedServiceName'].pop('reference_name')

        activity['typeProperties'] = {}
        try:
            activity['typeProperties']['notebookPath'] = activity.pop('notebook_path')
            activity['typeProperties']['baseParameters'] = activity.pop('base_parameters')
        except KeyError:
            pass

        try:
            activity['typeProperties']['pythonFile'] = activity.pop('python_file')
            activity['typeProperties']['parameters'] = activity.pop('parameters')
        except KeyError:
            pass

    return activities
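
For what it's worth, a less manual route may exist. The generated models derive from msrest.serialization.Model (true for azure-mgmt-datafactory at the time of this thread), which exposes a serialize() method that emits the REST wire format: camelCase keys, with the flattened attributes re-nested under properties. A minimal sketch under that assumption, not verified against every SDK version:

import json
from azure.mgmt.datafactory.models import PipelineFolder, PipelineResource

pipeline = PipelineResource(
    activities=activities,  # same activities list as above
    folder=PipelineFolder(name="someFolder"),
)
pipeline.name = pipeline_name

# serialize() walks the model's attribute map, so depends_on becomes
# dependsOn, linked_service_name becomes linkedServiceName, and the
# flattened attributes are re-nested under "properties".
# keep_readonly=True keeps read-only fields such as name in the output.
wire_format = pipeline.serialize(keep_readonly=True)

with open(f"{pipeline_name}.json", "w") as f:
    f.write(json.dumps(wire_format, indent=2))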

Wzb123456789 (Contributor) commented Jul 14, 2022

Hi @romanzdk

There are two ways to pass parameters to the model:

1.
activity1 = Activity(depends_on=...)

2.
activity1 = Activity(
    {
        "dependsOn": ...
    }
)

So, if you need to get data out of the model, I recommend passing the parameters in the JSON (camelCase) form, as in the second example above.

Regarding the keys that are not in the format you need: I cannot change this. The attribute names returned in the service response are read-only and are defined by swagger, so modifying them would be useless.

romanzdk (Author) commented:

There is still a misunderstanding. I am generating the entire ADF pipeline using just the Python SDK, with no JSON files, and I need to convert the resulting Pipeline object into JSON because I need to upload it to the Git repository associated with the ADF.

The problem is that converting the ADF Python models into JSON objects does not work correctly: the dictionary keys have the wrong case, e.g. instead of dependsOn the converted object has the key depends_on, which is not accepted by ADF.

Wzb123456789 (Contributor) commented Jul 19, 2022

Hi @romanzdk

Since the model of PipelineResource is fixed, I have no way to modify what comes out of model serialization.

Based on your description, I found a method that might work for you:

import json

def callback(request):
    # capture the serialized request body the SDK is about to send
    origin_req = json.loads(request.http_request.body)
    with open("result.json", "w") as file:
        file.write(json.dumps(origin_req, indent=4))

res = ADF_client.pipelines.create_or_update(
    resource_group_name='resource_group_name',
    factory_name='factory_name',
    pipeline_name='pipeline_name',
    pipeline=PipelineResource(
        activities=[activity],
        folder=PipelineFolder(name="someFolder")
    ),
    raw_request_hook=callback
)

This approach captures the serialized content of the model from the request that is sent.

Hope it helps you.
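
A note on this design: raw_request_hook is a per-request keyword handled by azure-core's pipeline (via its custom-hook policy), so the callback fires with the fully serialized outgoing request. The trade-off is that create_or_update really does send the request, so this writes result.json only as a side effect of actually creating or updating the pipeline in Azure; it is not an offline conversion.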

@xiangyan99 added the needs-author-feedback label on Jul 22, 2022
@ghost added the no-recent-activity label on Jul 29, 2022
ghost commented Jul 29, 2022

Hi, we're sending this friendly reminder because we haven't heard back from you in a while. We need more information about this issue to help address it. Please be sure to give us your input within the next 7 days. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!

@ghost closed this as completed on Aug 13, 2022
This issue was closed.
6 participants