Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate StrimziPodSets before rolling during CA key replacement #10876

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Ability to move data between JBOD disks using Cruise Control.
* Only roll pods once for ClientsCa cert renewal.
This change also means the restart reason ClientCaCertKeyReplaced is removed and either CaCertRenewed or CaCertHasOldGeneration will be used.
* Allow rolling update for new cluster CA trust (during Cluster CA key replacement) to continue where it left off before interruption without rolling all pods again.

### Major changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet(
.build();
}

/**
* Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined
* in the PodSet
*
* @param strimziPodSet Strimzi PodSet to patch
* @param annotationsToBeUpdated Annotations to merge with the existing annotations
*
* @return Patched PodSet
*/
public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map<String, String> annotationsToBeUpdated) {
List<Map<String, Object>> newPods = PodSetUtils.podSetToPods(strimziPodSet)
.stream()
.map(pod -> {
Map<String, String> updatedAnnotations = pod.getMetadata().getAnnotations();
updatedAnnotations.putAll(annotationsToBeUpdated);
return pod.edit()
.editMetadata()
.withAnnotations(updatedAnnotations)
.endMetadata()
.build();
})
.map(PodSetUtils::podToMap)
.toList();
return new StrimziPodSetBuilder(strimziPodSet)
.editSpec()
.withPods(newPods)
.endSpec()
.build();
}

/**
* Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and
* typically uses storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
Expand All @@ -24,6 +23,7 @@
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.WorkloadUtils;
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -54,11 +54,11 @@

import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Class used for reconciliation of Cluster and Client CAs. This class contains both the steps of the CA reconciliation
Expand Down Expand Up @@ -234,8 +234,6 @@ Future<Void> reconcileCas(Clock clock) {
Secret clusterCaKeySecret = null;
Secret clientsCaCertSecret = null;
Secret clientsCaKeySecret = null;
List<HasMetadata> clusterCaSecrets = new ArrayList<>();
List<HasMetadata> clientsCaSecrets = new ArrayList<>();

for (Secret secret : clusterSecrets) {
String secretName = secret.getMetadata().getName();
Expand All @@ -247,13 +245,6 @@ Future<Void> reconcileCas(Clock clock) {
clientsCaCertSecret = secret;
} else if (secretName.equals(clientsCaKeyName)) {
clientsCaKeySecret = secret;
} else if (secretName.equals(KafkaResources.kafkaSecretName(reconciliation.name()))) {
clusterCaSecrets.add(secret);
clientsCaSecrets.add(secret);
} else if (secretName.equals(KafkaResources.clusterOperatorCertsSecretName(reconciliation.name()))) {
// The CO certificate is excluded as it is renewed in a separate cycle
} else {
clusterCaSecrets.add(secret);
}
}

Expand All @@ -266,7 +257,6 @@ Future<Void> reconcileCas(Clock clock) {
reconciliation.namespace(), caLabels,
clusterCaCertLabels, clusterCaCertAnnotations,
clusterCaConfig != null && !clusterCaConfig.isGenerateSecretOwnerReference() ? null : ownerRef,
clusterCaSecrets,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));

clientsCa = new ClientsCa(reconciliation, certManager,
Expand All @@ -280,7 +270,6 @@ Future<Void> reconcileCas(Clock clock) {
clientsCa.createRenewOrReplace(reconciliation.namespace(),
caLabels, Map.of(), Map.of(),
clientsCaConfig != null && !clientsCaConfig.isGenerateSecretOwnerReference() ? null : ownerRef,
clientsCaSecrets,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));

return null;
Expand Down Expand Up @@ -368,7 +357,7 @@ Future<Void> maybeRollingUpdateForNewClusterCaKey() {
TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret));
return getZooKeeperReplicas()
.compose(replicas -> rollZookeeper(replicas, restartReason, coTlsPemIdentity))
.compose(i -> getKafkaReplicas())
.compose(i -> patchClusterCaKeyGenerationAndReturnNodes())
.compose(nodes -> rollKafkaBrokers(nodes, RestartReasons.of(restartReason), coTlsPemIdentity))
.compose(i -> rollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), restartReason))
.compose(i -> rollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), restartReason))
Expand Down Expand Up @@ -494,30 +483,46 @@ Future<Void> maybeRollingUpdateForNewClusterCaKey() {
.maybeRollingUpdate(reconciliation, replicas, zkSelectorLabels, rollZkPodAndLogReason, coTlsPemIdentity);
}

/* test */ Future<Set<NodeRef>> getKafkaReplicas() {
/**
* Patches the Kafka StrimziPodSets to update the Cluster CA key generation annotation and returns the nodes.
*
* @return Future containing the set of Kafka nodes which completes when the StrimziPodSets have been patched.
*/
/* test */ Future<Set<NodeRef>> patchClusterCaKeyGenerationAndReturnNodes() {
Labels selectorLabels = Labels.EMPTY
.withStrimziKind(reconciliation.kind())
.withStrimziCluster(reconciliation.name())
.withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name()));

return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels)
.compose(podSets -> {
Set<NodeRef> nodes = new LinkedHashSet<>();

if (podSets != null) {
for (StrimziPodSet podSet : podSets) {
nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet));
}
List<StrimziPodSet> updatedPodSets = podSets
.stream()
.map(podSet -> WorkloadUtils.patchAnnotations(
podSet,
Map.of(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()))
)).toList();
return strimziPodSetOperator.batchReconcile(reconciliation, reconciliation.namespace(), updatedPodSets, selectorLabels)
.map(i -> updatedPodSets.stream().flatMap(podSet -> ReconcilerUtils.nodesFromPodSet(podSet).stream())
.collect(Collectors.toSet()));
} else {
return Future.succeededFuture(Set.of());
}

return Future.succeededFuture(nodes);
});
}

/* test */ Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
return createKafkaRoller(nodes, coTlsPemIdentity).rollingRestart(pod -> {
LOGGER.debugCr(reconciliation, "Rolling Pod {} due to {}", pod.getMetadata().getName(), podRollReasons.getReasons());
return podRollReasons;
int clusterCaKeyGeneration = clusterCa.caKeyGeneration();
int podClusterCaKeyGeneration = Annotations.intAnnotation(pod, Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, clusterCaKeyGeneration);
if (clusterCaKeyGeneration == podClusterCaKeyGeneration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity ... in this case we are just ignoring the podRollReasons. Did you check that it's empty or there is some code which is wrong somewhere which pass a non empty podRollReasons (so the pods should be rolled) but we decide to not roll them for, anyway, a good reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ppatierno I'm not sure I fully follow the question, but will try my best to add more context.

If any of the pods don't have the correct cluster CA key generation annotation we call this method to rollKafka. However, we pass through to KafkaRoller all the pods, not just the pods that need to be rolled. So this code is basically skipping over the pods that are already up to date.

The alternative would be updating this class to build the list of pods that need rolling when we are checking the annotations in verifyClusterCaFullyTrustedAndUsed, however that would require a bigger refactor to this code so I chose this option instead. Also I think in general in Strimzi it's the convention to pass all nodes to KafkaRoller and then use this callback to specify which need to actually be rolled

Copy link
Contributor Author

@katheris katheris Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could change this to have:

if (podRollReasons.getReasons().size() == 1 && podRollReasons.contains(RestartReason.CLUSTER_CA_CERT_KEY_REPLACED) && clusterCaKeyGeneration == podClusterCaKeyGeneration) {
  return RestartReasons.empty();
} else {
  return podRollReasons
}

but since the first two parts will always be true at the moment it seems maybe overkill

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah my question was mostly around ignoring podRollReasons in the first condition. If you think adding the additional check is overkilling, I would add a comment at least.

LOGGER.debugCr(reconciliation, "Not rolling Pod {} since the Cluster CA cert key generation is correct.", pod.getMetadata().getName());
return RestartReasons.empty();
} else {
LOGGER.debugCr(reconciliation, "Rolling Pod {} due to {}", pod.getMetadata().getName(), podRollReasons.getReasons());
return podRollReasons;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.time.Instant;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyMap;
Expand All @@ -41,7 +40,7 @@ public void testRemoveExpiredCertificate() {

ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, null, null);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(3));

// force key replacement so certificate renewal ...
Expand All @@ -56,7 +55,7 @@ public void testRemoveExpiredCertificate() {

clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, clusterCa.caCertSecret(), caKeySecretWithReplaceAnno);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(4));
assertThat(clusterCa.caCertSecret().getData().containsKey("ca-2023-03-23T09-00-00Z.crt"), is(true));

Expand All @@ -66,7 +65,7 @@ public void testRemoveExpiredCertificate() {

clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), cluster, clusterCa.caCertSecret(), clusterCa.caKeySecret());
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(3));
assertThat(clusterCa.caCertSecret().getData().containsKey("ca-2023-03-23T09-00-00Z.crt"), is(false));
}
Expand All @@ -79,7 +78,7 @@ public void testIsExpiringCertificate() {

ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, null, null);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);

// check certificate expiration out of the renewal period, certificate is not expiring
instantExpected = "2023-02-15T09:00:00Z";
Expand All @@ -102,7 +101,7 @@ public void testRemoveOldCertificate() {

ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, null, null);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(3));

// force key replacement so certificate renewal ...
Expand All @@ -117,7 +116,7 @@ public void testRemoveOldCertificate() {

clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(clock), new PasswordGenerator(10, "a", "a"), cluster, clusterCa.caCertSecret(), caKeySecretWithReplaceAnno);
clusterCa.setClock(clock);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(namespace, emptyMap(), emptyMap(), emptyMap(), null, true);
assertThat(clusterCa.caCertSecret().getData().size(), is(4));
assertThat(clusterCa.caCertSecret().getData().containsKey("ca-2023-03-23T09-00-00Z.crt"), is(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ private void checkHeadlessService(Service headless) {

private Secret generateBrokerSecret(Set<String> externalBootstrapAddress, Map<Integer, Set<String>> externalAddresses) {
ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), CLUSTER, null, null);
clusterCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, List.of(), true);
clusterCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, true);
ClientsCa clientsCa = new ClientsCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), null, null, null, null, 365, 30, true, CertificateExpirationPolicy.RENEW_CERTIFICATE);
clientsCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, List.of(), true);
clientsCa.createRenewOrReplace(NAMESPACE, Map.of(), Map.of(), Map.of(), null, true);

return KC.generateCertificatesSecret(clusterCa, clientsCa, null, externalBootstrapAddress, externalAddresses, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ private void checkHeadlessService(Service headless) {

private Secret generateBrokerSecret(Set<String> externalBootstrapAddress, Map<Integer, Set<String>> externalAddresses) {
ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), CLUSTER, null, null);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, true);
ClientsCa clientsCa = new ClientsCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), null, null, null, null, 365, 30, true, CertificateExpirationPolicy.RENEW_CERTIFICATE);
clientsCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clientsCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, true);

return KC.generateCertificatesSecret(clusterCa, clientsCa, null, externalBootstrapAddress, externalAddresses, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -363,6 +364,44 @@ public void testCreateStrimziPodSetFromNodeReferencesWithTemplate() {
assertThat(sps.getSpec().getPods().stream().map(pod -> PodSetUtils.mapToPod(pod).getMetadata().getName()).toList(), hasItems("my-cluster-nodes-10", "my-cluster-nodes-11", "my-cluster-nodes-12"));
}

@Test
public void testPatchPodAnnotations() {
Map<String, String> annotations = Map.of("anno-1", "value-1", "anno-2", "value-2", "anno-3", "value-3");
List<Pod> pods = new ArrayList<>();
pods.add(new PodBuilder()
.withNewMetadata()
.withName("pod-0")
.withNamespace(NAMESPACE)
.withAnnotations(annotations)
.endMetadata()
.build()
);
pods.add(new PodBuilder()
.withNewMetadata()
.withName("pod-1")
.withNamespace(NAMESPACE)
.withAnnotations(annotations)
.endMetadata()
.build()
);

StrimziPodSet sps = new StrimziPodSetBuilder()
.withNewMetadata()
.withName("my-sps")
.withNamespace(NAMESPACE)
.endMetadata()
.withNewSpec()
.withPods(PodSetUtils.podsToMaps(pods))
.endSpec()
.build();

List<Pod> resultPods = PodSetUtils.podSetToPods(WorkloadUtils.patchAnnotations(sps, Map.of("anno-2", "value-2a", "anno-4", "value-4")));
assertThat(resultPods.size(), is(2));
Map<String, String> expectedAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2a", "anno-3", "value-3", "anno-4", "value-4");
assertThat(resultPods.get(0).getMetadata().getAnnotations(), is(expectedAnnotations));
assertThat(resultPods.get(1).getMetadata().getAnnotations(), is(expectedAnnotations));
}

//////////////////////////////////////////////////
// Stateful Pod tests
//////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void checkHeadlessService(Service headless) {

private Secret generateCertificatesSecret() {
ClusterCa clusterCa = new ClusterCa(Reconciliation.DUMMY_RECONCILIATION, new OpenSslCertManager(), new PasswordGenerator(10, "a", "a"), CLUSTER, null, null);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, List.of(), true);
clusterCa.createRenewOrReplace(NAMESPACE, emptyMap(), emptyMap(), emptyMap(), null, true);

return ZC.generateCertificatesSecret(clusterCa, null, true);
}
Expand Down
Loading
Loading