Skip to content

Commit

Permalink
Update docs and log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
adriandieter committed Dec 11, 2024
1 parent c94271a commit e133384
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions internal/controller/stream_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type StreamReconciler struct {
// - Initialize finalizer and ready condition if not present
// - Delete stream if it is marked for deletion.
// - Create or Update the stream
//
// A call to reconcile may perform only one action, expecting the reconciliation to be triggered again by an update.
// For example: Setting the finalizer triggers a second reconciliation. Reconcile returns after setting the finalizer,
// to prevent parallel reconciliations performing the same steps.
func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := klog.FromContext(ctx)

Expand Down Expand Up @@ -116,27 +120,28 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st
return fmt.Errorf("update ready condition: %w", err)
}

if stream.Spec.PreventDelete {
log.Info("PreventDelete set: skipping stream deletion.", "streamName", stream.Spec.Name)
} else if r.ReadOnly() {
log.Info("Read-only mode: skipping stream deletion.", "streamName", stream.Spec.Name)
} else {
if !stream.Spec.PreventDelete && !r.ReadOnly() {
log.Info("Deleting stream.")
err := r.WithJetStreamClient(mapToConnectionOptions(stream.Spec), func(js jetstream.JetStream) error {
err := r.WithJetStreamClient(getConnOpts(stream.Spec), func(js jetstream.JetStream) error {
return js.DeleteStream(ctx, stream.Spec.Name)
})
if errors.Is(err, jetstream.ErrStreamNotFound) {
log.Info("Managed stream was already deleted.")
} else if err != nil {
return fmt.Errorf("delete stream during finalization: %w", err)
}
} else {
log.Info("Skipping stream deletion.",
"streamName", stream.Spec.Name,
"preventDelete", stream.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
}

log.Info("Stream deleted. Removing stream finalizer.")
log.Info("Removing stream finalizer.")
if ok := controllerutil.RemoveFinalizer(stream, streamFinalizer); !ok {
return errors.New("failed to remove stream finalizer")
}

if err := r.Update(ctx, stream); err != nil {
return fmt.Errorf("remove finalizer: %w", err)
}
Expand All @@ -147,11 +152,12 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st
func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream *api.Stream) error {

// Create or Update the stream based on the spec
if stream.Spec.PreventUpdate {
log.Info("PreventUpdate is set: skipping stream creation/update.", "streamName", stream.Spec.Name)
return nil
} else if r.ReadOnly() {
log.Info("Read-only mode: skipping stream creation/update.", "streamName", stream.Spec.Name)
if stream.Spec.PreventDelete || r.ReadOnly() {
log.Info("Skipping stream creation or update.",
"streamName", stream.Spec.Name,
"preventDelete", stream.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
return nil
}

Expand All @@ -163,7 +169,7 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger,

// CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted.
// TODO(future-feature): Do we need to check if config differs?
err = r.WithJetStreamClient(mapToConnectionOptions(stream.Spec), func(js jetstream.JetStream) error {
err = r.WithJetStreamClient(getConnOpts(stream.Spec), func(js jetstream.JetStream) error {
log.Info("create or update stream", "streamName", targetConfig.Name)
_, err = js.CreateOrUpdateStream(ctx, targetConfig)
return err
Expand Down Expand Up @@ -193,7 +199,8 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger,
return nil
}

func mapToConnectionOptions(spec api.StreamSpec) *connectionOptions {
// getConnOpts extracts nats connection relevant fields from the given stream spec as connectionOptions.
func getConnOpts(spec api.StreamSpec) *connectionOptions {
return &connectionOptions{
Account: spec.Account, // TODO(review): Where does Spec.Account have to be considered?
Creds: spec.Creds,
Expand Down

0 comments on commit e133384

Please sign in to comment.