aws_eks.go
package main

import (
	"fmt"

	"github.com/blaines/tasque-go/result"
	"github.com/davecgh/go-spew/spew"
	jobsv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// AWSEKS handles the EKS integration.
type AWSEKS struct {
	// DockerImage is the container image the worker Job runs.
	DockerImage string
	// KubeConfigPath, when non-empty, selects CLI (out-of-cluster) mode;
	// when empty, the in-cluster configuration is used.
	KubeConfigPath string
}
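
// Hypothetical usage (a sketch only; the real wiring lives elsewhere in
// tasque-go, and the image and kubeconfig values here are assumptions):
//
//	executable := AWSEKS{
//		DockerImage:    "example.com/volumetric-worker:latest",
//		KubeConfigPath: os.Getenv("KUBECONFIG"), // leave empty in-cluster
//	}
//	executable.Execute(handler)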

// Execute receives a message via the handler and launches the worker as a
// Kubernetes Job on EKS.
func (r AWSEKS) Execute(handler MessageHandler) {
	// Initialize the handler and receive the message to process.
	handler.initialize()
	handler.receive()
	fmt.Printf("Message received: %s\n", *(handler.body()))

	// Build a clientset from an explicit kubeconfig (CLI mode) or from the
	// in-cluster service-account configuration.
	var clientset *kubernetes.Clientset
	if r.KubeConfigPath != "" {
		// CLI mode: load the configuration from the given kubeconfig file.
		config, err := clientcmd.BuildConfigFromFlags("", r.KubeConfigPath)
		if err != nil {
			panic(err)
		}
		clientset, err = kubernetes.NewForConfig(config)
		if err != nil {
			panic(err)
		}
	} else {
		// In-cluster mode: use the pod's service-account credentials.
		config, err := rest.InClusterConfig()
		if err != nil {
			panic(err)
		}
		clientset, err = kubernetes.NewForConfig(config)
		if err != nil {
			panic(err)
		}
	}

	// Define the worker Job: a single container running r.DockerImage,
	// pinned to i3.xlarge nodes, with the message body passed through the
	// TASK_PAYLOAD environment variable.
	batchClient := clientset.BatchV1().Jobs("default")
	job := jobsv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "vw-",
		},
		Spec: jobsv1.JobSpec{
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					GenerateName: "vwpod-",
					Labels: map[string]string{
						"app": "volumetric-worker",
					},
				},
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyNever,
					NodeSelector: map[string]string{
						"beta.kubernetes.io/instance-type": "i3.xlarge",
					},
					Containers: []v1.Container{
						{
							Name:  "volumetric-worker",
							Image: r.DockerImage,
							Env: []v1.EnvVar{
								{Name: "AWS_REGION", Value: "us-west-2"},
								{Name: "ENVIRONMENT", Value: "development"},
								{Name: "SKYAPI_AUDIENCE", Value: "https://api.skycatch.com"},
								{Name: "SKYAPI_AUTH_URL", Value: "https://skycatch.auth0.com/oauth/token"},
								{Name: "SKYAPI_CLIENT_ID", Value: "e3BhQzZgKaGlt2TtmZqq06DJH6OrlxvU"},
								{
									// The client secret is read from a Kubernetes
									// Secret rather than set as a literal value.
									Name: "SKYAPI_CLIENT_SECRET",
									ValueFrom: &v1.EnvVarSource{
										SecretKeyRef: &v1.SecretKeySelector{
											LocalObjectReference: v1.LocalObjectReference{Name: "skyapi-client-secret"},
											Key:                  "clientsecret",
										},
									},
								},
								{Name: "SKYAPI_URL", Value: "https://api.skycatch.com/v1/"},
								{Name: "TASK_PAYLOAD", Value: *(handler.body())},
							},
							Resources: v1.ResourceRequirements{
								Limits: v1.ResourceList{
									v1.ResourceCPU: resource.MustParse("2000m"),
								},
								Requests: v1.ResourceList{
									v1.ResourceCPU:    resource.MustParse("500m"),
									v1.ResourceMemory: resource.MustParse("10Gi"),
								},
							},
						},
					},
				},
			},
		},
	}

	// Create the Job in the cluster; the generated name is only assigned
	// server-side on success, so the failure message omits it.
	executedJob, err := batchClient.Create(&job)
	if err != nil {
		handler.failure(result.Result{Error: err.Error(), Exit: "Job creation failed"})
	} else {
		// TODO David: Monitor the Job until it finishes. It has been
		// launched into the cluster but may be long-running, so success
		// here only confirms creation (see the jobSucceeded sketch below).
		spew.Dump(executedJob)
		handler.success()
	}
}
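
// jobSucceeded is a hedged sketch toward the monitoring TODO in Execute, not
// part of the original tasque-go code: the function name, the namespace
// parameter, and the poll-from-the-caller contract are all assumptions. It
// reports whether the named Job has completed, using the same pre-context
// client-go API as the code above.
func jobSucceeded(clientset *kubernetes.Clientset, namespace, name string) (bool, error) {
	// Fetch the Job and inspect its status counters.
	job, err := clientset.BatchV1().Jobs(namespace).Get(name, metav1.GetOptions{})
	if err != nil {
		return false, err
	}
	if job.Status.Failed > 0 {
		return false, fmt.Errorf("job %s has %d failed pods", name, job.Status.Failed)
	}
	return job.Status.Succeeded > 0, nil
}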

// Result returns the result of the execution.
func (r AWSEKS) Result() result.Result {
	// TODO David: This needs to be worked out; store the result during
	// Execute instead of returning an empty value inline (see the sketch
	// below).
	return result.Result{}
}
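
// One possible shape for the Result TODO above (a hedged sketch, not the
// author's design): Execute would record the outcome on the receiver, which
// requires a pointer receiver and a hypothetical extra field on AWSEKS:
//
//	type AWSEKS struct {
//		DockerImage    string
//		KubeConfigPath string
//		lastResult     result.Result // hypothetical field
//	}
//
//	func (r *AWSEKS) Result() result.Result { return r.lastResult }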