Skip to content

Commit

Permalink
Skip lines we've already seen
Browse files Browse the repository at this point in the history
  • Loading branch information
shanemcd committed Oct 26, 2022
1 parent 9e5b6b0 commit 7bfc749
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
}

go func() {
var sinceTimeString string
var sinceTime time.Time
for {
if errStdin != nil {
break
Expand All @@ -422,19 +422,22 @@ func (kw *kubeUnit) runWorkUsingLogger() {
break
}

if sinceTimeString != "" {
if !sinceTime.IsZero() {
if kw.pod.Status.Phase != corev1.PodRunning {
logger.Info("Pod %s/%s is not in Running state: %s", ked.KubeNamespace, ked.PodName, kw.pod.Status.Phase)

break
}

time.Sleep(5 * time.Second)
}

logReq := kw.clientset.CoreV1().Pods(kw.pod.ObjectMeta.Namespace).GetLogs(
kw.pod.Name, &corev1.PodLogOptions{
Container: "worker",
Follow: true,
Timestamps: true,
SinceTime: parseTime(sinceTimeString),
SinceTime: &metav1.Time{Time: sinceTime},
},
)

Expand All @@ -450,7 +453,6 @@ func (kw *kubeUnit) runWorkUsingLogger() {
streamReader := bufio.NewReader(logStream)
for errStdin == nil { // check between every line read to see if we need to stop reading
line, err := streamReader.ReadString('\n')
logger.Info(line)
if err != nil {
if err != io.EOF {
errStdout = err
Expand All @@ -459,9 +461,13 @@ func (kw *kubeUnit) runWorkUsingLogger() {
break
}
split := strings.SplitN(line, " ", 2)
sinceTimeString = split[0]
timeStamp := parseTime(split[0])
if !timeStamp.After(sinceTime) {
continue
}
msg := split[1]
stdout.Write([]byte(msg))
sinceTime = *timeStamp
}

logStream.Close()
Expand Down Expand Up @@ -490,19 +496,15 @@ func (kw *kubeUnit) runWorkUsingLogger() {
kw.UpdateBasicStatus(WorkStateSucceeded, "Finished", stdout.Size())
}

func parseTime(s string) *metav1.Time {
func parseTime(s string) *time.Time {
t, err := time.Parse(time.RFC3339, s)
if err == nil {
mt := metav1.NewTime(t)

return &mt
return &t
}

t, err = time.Parse(time.RFC3339Nano, s)
if err == nil {
mt := metav1.NewTime(t)

return &mt
return &t
}

return nil
Expand Down

0 comments on commit 7bfc749

Please sign in to comment.