Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robustness to driver pod taking time to create #2315

1 change: 1 addition & 0 deletions charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| controller.replicas | int | `1` | Number of replicas of controller. |
| controller.workers | int | `10` | Reconcile concurrency, higher values might increase memory usage. |
| controller.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error`. |
| controller.driverPodCreationGracePeriod | string | `"10s"` | Grace period after a successful spark-submit during which "driver pod not found" errors are retried. Useful if the driver pod can take some time to be created. |
| controller.maxTrackedExecutorPerApp | int | `1000` | Specifies the maximum number of Executor pods that can be tracked by the controller per SparkApplication. |
| controller.uiService.enable | bool | `true` | Specifies whether to create service for Spark web UI. |
| controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ spec:
{{- if .Values.controller.workqueueRateLimiter.maxDelay.enable }}
- --workqueue-ratelimiter-max-delay={{ .Values.controller.workqueueRateLimiter.maxDelay.duration }}
{{- end }}
{{- if .Values.controller.driverPodCreationGracePeriod }}
- --driver-pod-creation-grace-period={{ .Values.controller.driverPodCreationGracePeriod }}
{{- end }}
{{- if .Values.controller.maxTrackedExecutorPerApp }}
- --max-tracked-executor-per-app={{ .Values.controller.maxTrackedExecutorPerApp }}
{{- end }}
Expand Down
10 changes: 10 additions & 0 deletions charts/spark-operator-chart/tests/controller/deployment_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,16 @@ tests:
- notContains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --workqueue-ratelimiter-max-delay=1h

- it: Should contain `driver-pod-creation-grace-period` arg if `controller.driverPodCreationGracePeriod` is set
set:
controller:
driverPodCreationGracePeriod: 30s
asserts:
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --driver-pod-creation-grace-period=30s

- it: Should contain `--max-tracked-executor-per-app` arg if `controller.maxTrackedExecutorPerApp` is set
Tom-Newton marked this conversation as resolved.
Show resolved Hide resolved
set:
controller:
Expand Down
3 changes: 3 additions & 0 deletions charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ controller:
# -- Configure the verbosity of logging, can be one of `debug`, `info`, `error`.
logLevel: info

# -- Grace period after a successful spark-submit during which "driver pod not found" errors are retried. Useful if the driver pod can take some time to be created.
driverPodCreationGracePeriod: 10s

# -- Specifies the maximum number of Executor pods that can be tracked by the controller per SparkApplication.
maxTrackedExecutorPerApp: 1000

Expand Down
21 changes: 13 additions & 8 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ var (
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration

driverPodCreationGracePeriod time.Duration

// Metrics
enableMetrics bool
metricsBindAddress string
Expand Down Expand Up @@ -163,6 +165,8 @@ func NewStartCommand() *cobra.Command {
command.Flags().DurationVar(&leaderElectionRenewDeadline, "leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
command.Flags().DurationVar(&leaderElectionRetryPeriod, "leader-election-retry-period", 4*time.Second, "Leader election retry period.")

command.Flags().DurationVar(&driverPodCreationGracePeriod, "driver-pod-creation-grace-period", 10*time.Second, "Grace period after a successful spark-submit when driver pod not found errors will be retried. Useful if the driver pod can take some time to be created.")

command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.")
command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+
"Use the port :8080. If not set, it will be 0 in order to disable the metrics server")
Expand Down Expand Up @@ -394,14 +398,15 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
sparkExecutorMetrics.Register()
}
options := sparkapplication.Options{
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
Namespaces: namespaces,
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
DriverPodCreationGracePeriod: driverPodCreationGracePeriod,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
MaxTrackedExecutorPerApp: maxTrackedExecutorPerApp,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
Expand Down
13 changes: 9 additions & 4 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Options struct {
IngressURLFormat string
DefaultBatchScheduler string

DriverPodCreationGracePeriod time.Duration

KubeSchedulerNames []string

SparkApplicationMetrics *metrics.SparkApplicationMetrics
Expand Down Expand Up @@ -773,10 +775,13 @@ func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkAppl
}

if driverPod == nil {
app.Status.AppState.State = v1beta2.ApplicationStateFailing
app.Status.AppState.ErrorMessage = "driver pod not found"
app.Status.TerminationTime = metav1.Now()
return nil
if app.Status.AppState.State != v1beta2.ApplicationStateSubmitted || metav1.Now().Sub(app.Status.LastSubmissionAttemptTime.Time) > r.options.DriverPodCreationGracePeriod {
app.Status.AppState.State = v1beta2.ApplicationStateFailing
app.Status.AppState.ErrorMessage = "driver pod not found"
app.Status.TerminationTime = metav1.Now()
return nil
}
return fmt.Errorf("driver pod not found, while inside the grace period. Grace period of %v expires at %v", r.options.DriverPodCreationGracePeriod, app.Status.LastSubmissionAttemptTime.Add(r.options.DriverPodCreationGracePeriod))
}

app.Status.SparkApplicationID = util.GetSparkApplicationID(driverPod)
Expand Down
143 changes: 143 additions & 0 deletions internal/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,149 @@ var _ = Describe("SparkApplication Controller", func() {
})
})

// Specs for a SparkApplication that is in the submitted state while the driver
// pod named in its status does not exist. The outcome depends on whether the
// reconcile happens inside or outside DriverPodCreationGracePeriod.
Context("When reconciling a submitted SparkApplication with no driver pod", func() {
	ctx := context.Background()
	appName := "test"
	appNamespace := "default"
	key := types.NamespacedName{Namespace: appNamespace, Name: appName}

	// reconcileWithGracePeriod builds a fresh reconciler configured with the
	// given grace period and runs a single reconcile pass for the test app.
	reconcileWithGracePeriod := func(gracePeriod time.Duration) (reconcile.Result, error) {
		reconciler := sparkapplication.NewReconciler(
			nil,
			k8sClient.Scheme(),
			k8sClient,
			nil,
			nil,
			sparkapplication.Options{
				Namespaces:                   []string{appNamespace},
				DriverPodCreationGracePeriod: gracePeriod,
			},
		)
		return reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
	}

	BeforeEach(func() {
		By("Creating a test SparkApplication")
		existing := &v1beta2.SparkApplication{}
		// Only a "not found" lookup result triggers fixture creation; any other
		// outcome (object already present, or a different error) leaves the
		// cluster untouched.
		if err := k8sClient.Get(ctx, key, existing); !errors.IsNotFound(err) {
			return
		}

		created := &v1beta2.SparkApplication{
			ObjectMeta: metav1.ObjectMeta{
				Name:      appName,
				Namespace: appNamespace,
			},
			Spec: v1beta2.SparkApplicationSpec{
				MainApplicationFile: util.StringPtr("local:///dummy.jar"),
			},
		}
		v1beta2.SetSparkApplicationDefaults(created)
		Expect(k8sClient.Create(ctx, created)).To(Succeed())

		// Status is written in a separate call after creation: mark the app as
		// freshly submitted and point it at a driver pod that was never created.
		created.Status.LastSubmissionAttemptTime = metav1.Now()
		created.Status.DriverInfo.PodName = "non-existent-driver"
		created.Status.AppState.State = v1beta2.ApplicationStateSubmitted
		Expect(k8sClient.Status().Update(ctx, created)).To(Succeed())
	})

	AfterEach(func() {
		stale := &v1beta2.SparkApplication{}
		Expect(k8sClient.Get(ctx, key, stale)).To(Succeed())

		By("Deleting the created test SparkApplication")
		Expect(k8sClient.Delete(ctx, stale)).To(Succeed())
	})

	It("Should requeue submitted SparkApplication when driver pod not found inside the grace period", func() {
		By("Reconciling the created test SparkApplication")
		// The app was submitted moments ago, so a 10s grace period is still open.
		_, err := reconcileWithGracePeriod(10 * time.Second)
		Expect(err).To(MatchError(ContainSubstring("driver pod not found, while inside the grace period. Grace period of")))

		// The application must still be in the submitted state.
		fetched := &v1beta2.SparkApplication{}
		Expect(k8sClient.Get(ctx, key, fetched)).To(Succeed())
		Expect(fetched.Status.AppState.State).To(Equal(v1beta2.ApplicationStateSubmitted))
	})

	It("Should fail a SparkApplication when driver pod not found outside the grace period", func() {
		By("Reconciling the created test SparkApplication")
		// A zero grace period means the submission time is already past it.
		result, err := reconcileWithGracePeriod(0)
		Expect(err).NotTo(HaveOccurred())
		Expect(result.Requeue).To(BeFalse())

		fetched := &v1beta2.SparkApplication{}
		Expect(k8sClient.Get(ctx, key, fetched)).To(Succeed())
		Expect(fetched.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailing))
	})
})

// Control case for the driver-pod grace period: the SparkApplication is in the
// submitted state and its driver pod exists, so reconciling must neither
// return an error nor move the application out of the submitted state.
Context("When reconciling a SparkApplication with driver pod", func() {
	ctx := context.Background()
	appName := "test"
	appNamespace := "default"
	key := types.NamespacedName{
		Name:      appName,
		Namespace: appNamespace,
	}

	BeforeEach(func() {
		By("Creating a test SparkApplication")
		app := &v1beta2.SparkApplication{}
		// Fixtures (application and driver pod) are created only when the
		// lookup reports "not found".
		if err := k8sClient.Get(ctx, key, app); err != nil && errors.IsNotFound(err) {
			app = &v1beta2.SparkApplication{
				ObjectMeta: metav1.ObjectMeta{
					Name:      appName,
					Namespace: appNamespace,
				},
				Spec: v1beta2.SparkApplicationSpec{
					MainApplicationFile: util.StringPtr("local:///dummy.jar"),
				},
			}
			v1beta2.SetSparkApplicationDefaults(app)
			Expect(k8sClient.Create(ctx, app)).To(Succeed())

			// Status is written in a separate call after creation; it records
			// the submitted state and the name of the driver pod created here.
			app.Status.AppState.State = v1beta2.ApplicationStateSubmitted
			driverPod := createDriverPod(appName, appNamespace)
			Expect(k8sClient.Create(ctx, driverPod)).To(Succeed())
			app.Status.DriverInfo.PodName = driverPod.Name
			Expect(k8sClient.Status().Update(ctx, app)).To(Succeed())
		}
	})

	AfterEach(func() {
		app := &v1beta2.SparkApplication{}
		Expect(k8sClient.Get(ctx, key, app)).To(Succeed())

		By("Deleting the created test SparkApplication")
		Expect(k8sClient.Delete(ctx, app)).To(Succeed())

		By("Deleting the driver pod")
		// NOTE(review): assumes getDriverNamespacedName resolves the pod that
		// createDriverPod produced; both helpers are defined elsewhere.
		driverPod := &corev1.Pod{}
		Expect(k8sClient.Get(ctx, getDriverNamespacedName(appName, appNamespace), driverPod)).To(Succeed())
		Expect(k8sClient.Delete(ctx, driverPod)).To(Succeed())
	})

	// The description states the expected outcome, matching the "Should ..."
	// phrasing of the other specs in this file.
	It("Should keep a submitted SparkApplication in the submitted state when driver pod exists", func() {
		By("Reconciling the created test SparkApplication")
		// Zero grace period: the result below must come from the driver pod
		// being found, not from the grace period masking a missing pod.
		reconciler := sparkapplication.NewReconciler(
			nil,
			k8sClient.Scheme(),
			k8sClient,
			nil,
			nil,
			sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
		)
		result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
		Expect(err).NotTo(HaveOccurred())
		Expect(result.Requeue).To(BeFalse())

		app := &v1beta2.SparkApplication{}
		Expect(k8sClient.Get(ctx, key, app)).To(Succeed())
		Expect(app.Status.AppState.State).To(Equal(v1beta2.ApplicationStateSubmitted))
	})
})

Context("When reconciling a completed SparkApplication", func() {
ctx := context.Background()
appName := "test"
Expand Down
Loading