Corp 5480 update dag template (#5)
* feat(airflow): update template to orhttpoperator

BREAKING CHANGE: dag structure changes
jira: CORP-5480

---------

Co-authored-by: Mandeep Bahal <[email protected]>
msbahal and Mandeep Bahal authored Sep 13, 2024
1 parent cf5403c commit e97479a
Showing 9 changed files with 277 additions and 81 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/Linux.yml
@@ -0,0 +1,28 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Linux(ubuntu)

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22'

- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./test/...
3 changes: 3 additions & 0 deletions README.md
@@ -1,3 +1,6 @@

[![Actions Status](https://github.com/DeloitteOptimalReality/airflow-wrapper-go/actions/workflows/Linux.yml/badge.svg?branch=main)](https://github.com/DeloitteOptimalReality/airflow-wrapper-go/actions)

# Introduction
This is a wrapper library around the airflow-client-go library. The library is written in Go and allows you to create clients with interface methods to interact with the Apache Airflow REST API.

4 changes: 4 additions & 0 deletions go.mod
@@ -8,8 +8,12 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 7 additions & 0 deletions go.sum
@@ -42,6 +42,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -105,11 +107,14 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -352,6 +357,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
93 changes: 64 additions & 29 deletions pkg/codegen/codegen.go
@@ -22,12 +22,12 @@ type Args struct {
}

// Function to convert an interface to a JSON string
func toJSONString(v interface{}) string {
func ToJSONString(v interface{}) (string, error) {
jsonData, err := json.Marshal(v)
if err != nil {
return ""
return "", err
}
return string(jsonData)
return string(jsonData), nil
}

type StartDate struct {
@@ -52,25 +52,55 @@ type Connection struct {
}

type HttpOperator struct {
TaskID string
Name string
Endpoint string
Data interface{}
Downstream []string
TaskID string
ConnectionId string
Name string
Endpoint string
Data interface{}
Downstream []string
}

type GenData struct {
DagDef Dag
ConnectionDef Connection
Tasks []HttpOperator
DagDef Dag
Connections []Connection
Tasks []HttpOperator
}
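
Because each task now carries its own `ConnectionId` and `GenData` holds a list of connections, a task can point at a connection that was never declared. The sketch below shows one way a caller might check this before generating a DAG. The types are trimmed, hypothetical copies of the ones in this diff, and `validateConnections` is an illustrative helper that does not exist in the repository.

```go
package main

import "fmt"

// Trimmed, hypothetical copies of the types in this diff; only the fields
// needed for the check are included.
type Connection struct {
	ConnectionID string
}

type HttpOperator struct {
	TaskID       string
	ConnectionId string
}

type GenData struct {
	Connections []Connection
	Tasks       []HttpOperator
}

// validateConnections is a hypothetical helper: it reports the first task
// whose ConnectionId does not match any entry in Connections.
func validateConnections(g GenData) error {
	known := make(map[string]bool, len(g.Connections))
	for _, c := range g.Connections {
		known[c.ConnectionID] = true
	}
	for _, t := range g.Tasks {
		if !known[t.ConnectionId] {
			return fmt.Errorf("task %q references unknown connection %q", t.TaskID, t.ConnectionId)
		}
	}
	return nil
}

func main() {
	// Sample IDs are made up for illustration.
	g := GenData{
		Connections: []Connection{{ConnectionID: "api-conn"}},
		Tasks: []HttpOperator{
			{TaskID: "task-1", ConnectionId: "api-conn"},
			{TaskID: "task-2", ConnectionId: "missing-conn"},
		},
	}
	if err := validateConnections(g); err != nil {
		fmt.Println("invalid GenData:", err)
	}
}
```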

func checkDeps(deps []string) bool {
func CheckDeps(deps []string) bool {
return len(deps) > 0
}

func writeValueToBuffer(v interface{}, buf *bytes.Buffer) bool {
switch v := v.(type) {
case bool:
if v {
buf.WriteString("True")
} else {
buf.WriteString("False")
}
case string:
buf.WriteString("\"")
buf.WriteString(v)
buf.WriteString("\"")
default:
buf.WriteString(fmt.Sprintf("%v", v))
}
return true
}

// Function to convert a JSON-serialisable value to a generic map
func ToMap(v interface{}) (map[string]interface{}, error) {
var res map[string]interface{}
a, err := json.Marshal(v)
if err != nil {
return res, err
}
// Propagate unmarshal errors instead of discarding them
err = json.Unmarshal(a, &res)
return res, err
}
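
The struct-to-map conversion above relies on a JSON round trip, which has one side effect worth knowing: every number comes back as `float64`. A minimal sketch of the technique follows; the `conn` struct is a hypothetical stand-in whose field names are assumed from the keys the template reads (`ConnectionID`, `Host`, `Port`), and the lower-case `toMap` is a local copy for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// conn is a hypothetical stand-in for the Connection struct; the field
// names are assumed from the keys used in dag.tpl.
type conn struct {
	ConnectionID string
	Host         string
	Port         int
}

// toMap converts a JSON-serialisable value into a generic map by
// marshalling it and unmarshalling the result, returning either error.
func toMap(v interface{}) (map[string]interface{}, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var res map[string]interface{}
	if err := json.Unmarshal(raw, &res); err != nil {
		return nil, err
	}
	return res, nil
}

func main() {
	m, err := toMap(conn{ConnectionID: "api-conn", Host: "localhost", Port: 8080})
	if err != nil {
		panic(err)
	}
	// Numbers decode as float64, so Port is no longer an int here.
	fmt.Printf("Port has type %T\n", m["Port"])
	fmt.Println(m["ConnectionID"], m["Host"])
}
```

Map keys follow the struct field names unless the struct declares `json` tags, so the keys the template looks up (for example `conn['Host']`) have to match whichever form the struct produces.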

// Function to convert a Go map to a Python dictionary string
func mapToPythonDict(m map[string]interface{}) (string, error) {
func MapToPythonDict(m map[string]interface{}) (string, error) {
var buf bytes.Buffer
buf.WriteString("{")
first := true
@@ -79,28 +109,24 @@ func mapToPythonDict(m map[string]interface{}) (string, error) {
buf.WriteString(", ")
}
first = false
buf.WriteString("'")
buf.WriteString("\"")
buf.WriteString(k)
buf.WriteString("': ")
buf.WriteString("\": ")
switch v := v.(type) {
case bool:
if v {
buf.WriteString("True")
} else {
buf.WriteString("False")
case []interface{}:
buf.WriteString("[")
for i, k_val := range v {
// Separate list elements so the output is a valid Python list
if i > 0 {
buf.WriteString(", ")
}
writeValueToBuffer(k_val, &buf)
}
case string:
buf.WriteString("'")
buf.WriteString(v)
buf.WriteString("'")
buf.WriteString("]")
case map[string]interface{}:
nested, err := mapToPythonDict(v)
nested, err := MapToPythonDict(v)
if err != nil {
return "", err
}
buf.WriteString(nested)
default:
buf.WriteString(fmt.Sprintf("%v", v))
writeValueToBuffer(v, &buf)
}
}
buf.WriteString("}")
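
Rendering nested values as Python literals can also be written as one recursive function. The sketch below is an alternative, not the repository's implementation: `pyLiteral` is a hypothetical name, it sorts map keys so that the generated file is identical between runs (Go randomises map iteration order), and it quotes strings with `json.Marshal` on the assumption that the escapes Go emits are also valid inside a Python double-quoted string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// pyLiteral renders a decoded-JSON value (nil, bool, string, float64,
// []interface{}, map[string]interface{}) as a Python literal.
func pyLiteral(v interface{}) (string, error) {
	switch x := v.(type) {
	case nil:
		return "None", nil
	case bool:
		if x {
			return "True", nil
		}
		return "False", nil
	case string:
		// Assumption: JSON string escaping is acceptable Python syntax.
		b, err := json.Marshal(x)
		if err != nil {
			return "", err
		}
		return string(b), nil
	case []interface{}:
		parts := make([]string, 0, len(x))
		for _, item := range x {
			s, err := pyLiteral(item)
			if err != nil {
				return "", err
			}
			parts = append(parts, s)
		}
		return "[" + strings.Join(parts, ", ") + "]", nil
	case map[string]interface{}:
		keys := make([]string, 0, len(x))
		for k := range x {
			keys = append(keys, k)
		}
		sort.Strings(keys) // deterministic output across runs
		parts := make([]string, 0, len(keys))
		for _, k := range keys {
			ks, err := pyLiteral(k)
			if err != nil {
				return "", err
			}
			vs, err := pyLiteral(x[k])
			if err != nil {
				return "", err
			}
			parts = append(parts, ks+": "+vs)
		}
		return "{" + strings.Join(parts, ", ") + "}", nil
	default:
		// Numbers and anything else fall back to Go's default formatting.
		return fmt.Sprintf("%v", x), nil
	}
}

func main() {
	// Sample payload made up for illustration.
	out, err := pyLiteral(map[string]interface{}{
		"retries": 3.0,
		"tags":    []interface{}{"etl", true},
		"nested":  map[string]interface{}{"note": nil},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```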
@@ -115,7 +141,7 @@ func BoolTitle(b bool) string {
}

// Function to transform the task ID
func transformTaskID(taskID string) string {
func TransformTaskID(taskID string) string {
return "wt_" + strings.ReplaceAll(taskID, "-", "_")
}
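
For reference, a small self-contained sketch of what the now-exported helper does. The function body is copied from this diff; the sample task IDs are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// TransformTaskID mirrors the helper in this diff: it prefixes the ID with
// "wt_" and replaces every hyphen with an underscore.
func TransformTaskID(taskID string) string {
	return "wt_" + strings.ReplaceAll(taskID, "-", "_")
}

func main() {
	// Hypothetical task IDs, not taken from the repository.
	for _, id := range []string{"load-data", "550e8400-e29b", "already_ok"} {
		fmt.Printf("%s -> %s\n", id, TransformTaskID(id))
	}
}
```

Only hyphens are rewritten, so an ID containing other characters that are not valid in a Python identifier (a dot or a space, for example) would pass through unchanged.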

@@ -128,10 +154,19 @@ func CreateDagGen(g GenData, directory string) (string, error) {
// Prepare a map of original task IDs to transformed task IDs
taskIDMap := make(map[string]string)
for _, task := range data.Tasks {
taskIDMap[task.TaskID] = transformTaskID(task.TaskID)
taskIDMap[task.TaskID] = TransformTaskID(task.TaskID)
}

t := template.New("dag").Funcs(template.FuncMap{"toJSONString": toJSONString, "BoolTitle": BoolTitle, "mapToPythonDict": mapToPythonDict, "checkDeps": checkDeps, "transformTaskID": transformTaskID, "originalTaskIDMap": func() map[string]string { return taskIDMap }})
t := template.New("dag").Funcs(
template.FuncMap{
"toJSONString": ToJSONString,
"toMap": ToMap,
"BoolTitle": BoolTitle,
"mapToPythonDict": MapToPythonDict,
"checkDeps": CheckDeps,
"transformTaskID": TransformTaskID,
"originalTaskIDMap": func() map[string]string { return taskIDMap },
})
tp, err := t.Parse(tmpl)

if err != nil {
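
Several helpers registered in the `FuncMap` now return `(value, error)`. The sketch below illustrates how Go templates treat that shape: a non-nil error aborts `Execute`, and a pipeline such as `toMap $conn | mapToPythonDict` passes the first return value on to the next function. It assumes the `text/template` package (the diff does not show which template package is imported), and `toJSON`, `upper` and `render` are illustrative names rather than code from the repository.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"text/template"
)

// toJSON returns (string, error); the template engine stops execution when
// the error is non-nil.
func toJSON(v interface{}) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// render parses src with the helper functions registered and executes it
// against data, returning the rendered text.
func render(src string, data interface{}) (string, error) {
	funcs := template.FuncMap{
		"toJSON": toJSON,
		"upper":  strings.ToUpper,
	}
	t, err := template.New("demo").Funcs(funcs).Parse(src)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// A value piped through two helpers, similar in shape to the
	// toMap | mapToPythonDict pipeline in dag.tpl.
	out, err := render(`payload = {{toJSON . | upper}}`, map[string]string{"name": "demo"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)

	// A channel cannot be marshalled, so the helper's error surfaces here.
	if _, err := render(`{{toJSON .}}`, make(chan int)); err != nil {
		fmt.Println("render failed:", err)
	}
}
```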
27 changes: 0 additions & 27 deletions pkg/codegen/codegen_test.go

This file was deleted.

64 changes: 39 additions & 25 deletions pkg/codegen/dag.tpl
@@ -1,14 +1,18 @@
# dag.tpl
############################################# WARNING #############################################
# This file is automatically generated. Please do not edit it manually as it could break the DAG. #
###################################################################################################

import json
import os
from datetime import datetime, timedelta
from airflow.models.dag import DAG

from airflow.models import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.db import provide_session

# custom OR operator
from custom_operators.or_http_operator import ORHttpOperator

# required so HTTP operator does not crash Python
os.environ["no_proxy"] = "*"
@@ -26,41 +30,47 @@ dag = DAG(
'{{.DagDef.ID}}',
default_args=default_args,
description='{{.DagDef.Description}}',
schedule_interval=timedelta(days=1),
start_date=datetime({{.DagDef.StartDate.Year}}, {{.DagDef.StartDate.Month}}, {{.DagDef.StartDate.Day}}),
schedule_interval=None,
start_date=None, # this is None by default but should be FE driven. Something like: datetime({{.DagDef.StartDate.Year}}, {{.DagDef.StartDate.Month}}, {{.DagDef.StartDate.Day}}),
catchup=False,
tags=[{{range $index, $tag := .DagDef.Tags}}{{if $index}}, {{end}}'{{$tag}}'{{end}}],
)

# ##################### HELPER FUNCTIONS ##########################

@provide_session
def create_http_connection(session=None):
def create_http_connection(conn, session=None):
connection = session.query(Connection).filter(
Connection.conn_id == '{{.ConnectionDef.ConnectionID}}').first()
Connection.conn_id == conn['ConnectionID']).first()

if connection:
connection.conn_id = '{{.ConnectionDef.ConnectionID}}'
connection.conn_id = conn['ConnectionID']
connection.conn_type = 'http'
connection.host = '{{.ConnectionDef.Host}}'
connection.port = {{.ConnectionDef.Port}}
connection.host = conn['Host']
connection.port = conn['Port']
session.commit()
else:
# Create a new connection if it doesn't exist
connection = Connection(
conn_id='{{.ConnectionDef.ConnectionID}}',
conn_id=conn['ConnectionID'],
conn_type='http',
host='{{.ConnectionDef.Host}}',
port={{.ConnectionDef.Port}}
host=conn['Host'],
port=conn['Port']
)
session.add(connection)
session.commit()
return f'Connection {conn["ConnectionID"]} successful!'

return '{{.ConnectionDef.ConnectionID}}'

{{ transformTaskID .ConnectionDef.ConnectionID }} = PythonOperator(
task_id='{{.ConnectionDef.ConnectionID}}',
# ##################### ESTABLISH DB/REDIS CONNECTIONS #######################
{{range $conn := .Connections}}
{{ transformTaskID $conn.ConnectionID }} = PythonOperator(
task_id='{{$conn.ConnectionID}}',
python_callable=create_http_connection,
op_args=[{{toMap $conn | mapToPythonDict}}],
dag=dag,
)
){{ end }}

# ##################### REFERENCE MAPPING ##########################

# Dictionary to map transformed task IDs to original task IDs
task_id_map = {
@@ -69,24 +79,28 @@ task_id_map = {
{{- end }}
}

# ##################### PROCESSING OPERATORS ##########################
{{range .Tasks}}
{{ transformTaskID .TaskID }}_data = {{mapToPythonDict .Data}}

{{ transformTaskID .TaskID }} = SimpleHttpOperator(
{{ transformTaskID .TaskID }} = ORHttpOperator(
task_id='{{.TaskID}}',
method='POST',
http_conn_id='{{$.ConnectionDef.ConnectionID}}',
http_conn_id='{{.ConnectionId}}',
endpoint='{{.Endpoint}}',
data=json.dumps({{ transformTaskID .TaskID }}_data),
headers={"Content-Type": "application/json"},
log_response=True,
dag=dag,
)
{{ end }}
use_cache=True,
# dependencies=[] # TODO: This needs to be implemented
){{ end }}

{{- if checkDeps .ConnectionDef.Downstream }}
{{- range $dep := .ConnectionDef.Downstream}}
{{ transformTaskID $.ConnectionDef.ConnectionID }}.set_downstream({{ transformTaskID $dep }})
# ##################### DIRECTED ACYCLIC GRAPH DEFINITION ##########################
{{range $conn := .Connections}}
{{- if checkDeps $conn.Downstream }}
{{- range $dep := $conn.Downstream}}
{{ transformTaskID $conn.ConnectionID }}.set_downstream({{ transformTaskID $dep }})
{{- end -}}
{{- end -}}
{{- end -}}
