Skip to content

Commit

Permalink
feat/updated dag template for custom python operator
Browse files Browse the repository at this point in the history
feat/updated template and structs

feat/updated codegen

feat/update dag template python operator

feat/removed httprequest key

feat/added unit test for Python Task

Co-authored-by: Gabriel Nge <[email protected]>
  • Loading branch information
gabrielnge and gabrielngedeloitte authored Oct 17, 2024
1 parent f01520b commit 2b1b777
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
21 changes: 17 additions & 4 deletions pkg/codegen/codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Connection struct {
Downstream []string
}

type HttpOperator struct {
type HttpTask struct {
TaskID string
ConnectionId string
Name string
Expand All @@ -61,10 +61,20 @@ type HttpOperator struct {
Upstream []string
}

type PythonTask struct {
TaskID string
Name string
Data interface{}
Downstream []string
Upstream []string
}

type GenData struct {
DagDef Dag
Connections []Connection
Tasks []HttpOperator
DagDef Dag
Connections []Connection
PythonImports []string
Tasks []HttpTask
PythonTask []PythonTask
}

func CheckDeps(deps []string) bool {
Expand Down Expand Up @@ -159,6 +169,9 @@ func CreateDagGen(g GenData, directory string) (string, error) {
for _, task := range data.Tasks {
taskIDMap[task.TaskID] = TransformTaskID(task.TaskID)
}
for _, task := range data.PythonTask {
taskIDMap[task.TaskID] = TransformTaskID(task.TaskID)
}

t := template.New("dag").Funcs(
template.FuncMap{
Expand Down
18 changes: 18 additions & 0 deletions pkg/codegen/dag.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def create_http_connection(custom_conn_config, session=None):
session.commit()
return f'Connection {custom_conn_config["ConnectionID"]} successful!'

# ##################### IMPORT SPECIFIC PYTHON FUNCTIONS ##########################
{{range .PythonImports}}
from custom_functions.{{ . }} import {{ . }}
{{ end }}

# ##################### ESTABLISH DB/REDIS CONNECTIONS #######################
{{range $conn := .Connections}}
{{ transformTaskID $conn.ConnectionID }} = PythonOperator(
Expand Down Expand Up @@ -98,6 +103,19 @@ task_id_map = {
{{- end}}
){{ end }}

# ##################### CUSTOM PYTHON OPERATOR ##########################
{{range .PythonTask}}
{{ transformTaskID .TaskID }}_data = {{mapToPythonDict .Data}}
{{ transformTaskID .TaskID }} = PythonOperator(
task_id='{{ transformTaskID .TaskID }}',
python_callable={{ .Name }},
op_args=[
{{ transformTaskID .TaskID }}_data
],
dag=dag,
)
{{ end }}

# ##################### DIRECTED ACYLIC GRAPH DEFINITION ##########################
{{range $conn := .Connections}}
{{- if checkDeps $conn.Downstream }}
Expand Down
21 changes: 20 additions & 1 deletion test/codegen/codegen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestCreateDagObject(t *testing.T) {
}

func TestToMapValid(t *testing.T) {
testStruct := codegen.HttpOperator{
testStruct := codegen.HttpTask{
TaskID: "test_task_id",
ConnectionId: "test_connection_id",
Name: "test_name",
Expand All @@ -52,6 +52,25 @@ func TestToMapValid(t *testing.T) {

// TODO: Test the invalid case for ToMap function. I'm not sure how to hit the edge case yet.

func TestToMapValidWithPythonTask(t *testing.T) {
testStruct := codegen.PythonTask{
TaskID: "test_python_task_id",
Name: "test_name",
Data: map[string]interface{}{"key": "value"},
Downstream: []string{"downstream_task_1", "downstream_task_2"},
Upstream: []string{"upstream_task_1"},
}
res, err := codegen.ToMap(testStruct)
if err != nil {
t.Error(err)
}
assert.Equal(t, res["Name"], "test_name")
assert.Equal(t, res["TaskID"], "test_python_task_id")
assert.Equal(t, res["Data"], map[string]interface{}{"key": "value"})
assert.ElementsMatch(t, res["Downstream"], []interface{}{"downstream_task_1", "downstream_task_2"})
assert.ElementsMatch(t, res["Upstream"], []interface{}{"upstream_task_1"})
}

func TestMapToPythonDict(t *testing.T) {
expectedPythonDict := `
{
Expand Down

0 comments on commit 2b1b777

Please sign in to comment.