runnable_job.go
package jobs

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/utils/ptr"
)

var defaultResyncDuration = 30 * time.Minute

type runnableJob struct {
	clientset  kubernetes.Interface
	logsReader LogsReader
	job        *batchv1.Job // job to be run
}

// NewRunnableJob constructs a new Runnable task defined as a Kubernetes job.
func NewRunnableJob(
	clientset kubernetes.Interface,
	job *batchv1.Job,
) Runnable {
	return &runnableJob{
		clientset:  clientset,
		logsReader: NewLogsReader(clientset),
		job:        job,
	}
}

// Run runs the task synchronously as a Kubernetes job.
// This method blocks and waits until the job completes or fails.
func (r *runnableJob) Run(ctx context.Context) error {
	var err error
	r.job, err = r.clientset.BatchV1().Jobs(r.job.Namespace).Create(ctx, r.job, metav1.CreateOptions{})
	if err != nil {
		return err
	}

	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		r.clientset,
		defaultResyncDuration,
		informers.WithNamespace(r.job.Namespace),
	)
	jobsInformer := informerFactory.Batch().V1().Jobs()

	complete := make(chan error)

	// Signal completion or failure based on the first condition reported on
	// the job's status.
	_, err = jobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(_, newObj interface{}) {
			newJob, ok := newObj.(*batchv1.Job)
			if !ok {
				return
			}
			if r.job.UID != newJob.UID {
				return
			}
			if len(newJob.Status.Conditions) == 0 {
				return
			}
			switch condition := newJob.Status.Conditions[0]; condition.Type {
			case batchv1.JobComplete:
				complete <- nil
			case batchv1.JobFailed:
				complete <- fmt.Errorf("job failed: %s: %s", condition.Reason, condition.Message)
			}
		},
	})
	if err != nil {
		return err
	}

	// Treat warning events involving the job (e.g. image pull or scheduling
	// problems) as failures.
	eventsInformer := informerFactory.Core().V1().Events()
	_, err = eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			event, ok := obj.(*corev1.Event)
			if !ok {
				return
			}
			if event.InvolvedObject.UID != r.job.UID {
				return
			}
			if event.Type == corev1.EventTypeWarning {
				complete <- fmt.Errorf("warning event received: %s (%s)", event.Message, event.Reason)
			}
		},
	})
	if err != nil {
		return err
	}

	informerFactory.Start(wait.NeverStop)
	informerFactory.WaitForCacheSync(wait.NeverStop)

	err = <-complete
	if err != nil {
		r.logTerminatedContainersErrors(ctx)
	}
	return err
}

func (r *runnableJob) logTerminatedContainersErrors(ctx context.Context) {
	statuses, err := r.logsReader.GetTerminatedContainersStatusesByJob(ctx, r.job)
	if err != nil {
		slog.Error("Error while getting terminated containers statuses",
			"job", r.job.Namespace+"/"+r.job.Name,
			"error", err)
	}
	for container, status := range statuses {
		if status.ExitCode == 0 {
			continue
		}
		// Surface containers that terminated with a non-zero exit code;
		// assumes the statuses map is keyed by container name.
		slog.Error("Container terminated with non-zero exit code",
			"job", r.job.Namespace+"/"+r.job.Name,
			"container", container,
			"exitCode", status.ExitCode)
	}
}

// GetActiveDeadlineSeconds converts the given duration to a *int64 value
// suitable for a job's spec.activeDeadlineSeconds, returning nil when the
// duration is not positive (i.e. no deadline).
func GetActiveDeadlineSeconds(d time.Duration) *int64 {
	if d > 0 {
		return ptr.To[int64](int64(d.Seconds()))
	}
	return nil
}