Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for k8s 1.16 and cert-manager 0.11.0+ #18

Merged
merged 3 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
{
public static class CertManagerConstants
{
public static string CrdGroup => "certmanager.k8s.io";
public static string CertificatePlural => "certificates";
public static string CertificateNameLabel => "certmanager.k8s.io/certificate-name";
public static string CertificateKind => "Certificate";
public static string CertificatePlural => "certificates";

public static string CrdGroup => "cert-manager.io";
public static string CertificateNameLabel => "cert-manager.io/certificate-name";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ namespace ES.Kubernetes.Reflector.CertManager.Events
{
public class InternalCertificateWatcherEvent : WatcherEvent<Certificate>
{
public string CertificateResourceDefinitionVersion { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using ES.Kubernetes.Reflector.Core.Events;
using System.Collections.Generic;
using ES.Kubernetes.Reflector.Core.Events;
using k8s.Models;

namespace ES.Kubernetes.Reflector.CertManager.Events
{
public class InternalSecretWatcherEvent : WatcherEvent<V1Secret>
{
public string CertificateResourceDefinitionVersion { get; set; }
public List<string> CertificateResourceDefinitionVersions { get; set; }
}
}
115 changes: 67 additions & 48 deletions ES.Kubernetes.Reflector.CertManager/Monitor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ES.Kubernetes.Reflector.CertManager.Constants;
Expand All @@ -20,24 +22,26 @@ namespace ES.Kubernetes.Reflector.CertManager
{
public class Monitor : IHostedService, IHealthCheck
{
private readonly ManagedWatcher<Certificate> _certificatesWatcher;
private readonly ManagedWatcher<V1beta1CustomResourceDefinition> _crdWatcher;
private readonly Func<ManagedWatcher<Certificate, object>> _certificatesWatcherFactory;

private readonly Dictionary<string, ManagedWatcher<Certificate, object>> _certificatesWatchers =
new Dictionary<string, ManagedWatcher<Certificate, object>>();

private readonly ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> _crdWatcher;
private readonly FeederQueue<WatcherEvent> _eventQueue;
private readonly ILogger<Monitor> _logger;
private readonly IMediator _mediator;
private readonly ManagedWatcher<V1Secret> _secretsWatcher;

private string _crdVersion;
private readonly ManagedWatcher<V1Secret, V1SecretList> _secretsWatcher;

public Monitor(ILogger<Monitor> logger,
ManagedWatcher<V1beta1CustomResourceDefinition> crdWatcher,
ManagedWatcher<Certificate> certificatesWatcher,
ManagedWatcher<V1Secret> secretsWatcher,
ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> crdWatcher,
Func<ManagedWatcher<Certificate, object>> certificatesWatcherFactory,
ManagedWatcher<V1Secret, V1SecretList> secretsWatcher,
IMediator mediator)
{
_logger = logger;
_crdWatcher = crdWatcher;
_certificatesWatcher = certificatesWatcher;
_certificatesWatcherFactory = certificatesWatcherFactory;
_secretsWatcher = secretsWatcher;
_mediator = mediator;

Expand All @@ -47,17 +51,14 @@ public Monitor(ILogger<Monitor> logger,
_secretsWatcher.OnStateChanged = OnWatcherStateChanged;
_secretsWatcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalSecretWatcherEvent
{ Item = e.Item, Type = e.Type, CertificateResourceDefinitionVersion = _crdVersion });
{
Item = e.Item, Type = e.Type,
CertificateResourceDefinitionVersions = _certificatesWatchers.Keys.ToList()
});
_secretsWatcher.RequestFactory = async c =>
await c.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true);


_certificatesWatcher.OnStateChanged = OnWatcherStateChanged;
_certificatesWatcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent
{ Item = e.Item, Type = e.Type, CertificateResourceDefinitionVersion = _crdVersion });


_crdWatcher.EventHandlerFactory = OnCrdEvent;
_crdWatcher.RequestFactory = async c =>
await c.ListCustomResourceDefinitionWithHttpMessagesAsync(watch: true);
Expand All @@ -66,22 +67,31 @@ public Monitor(ILogger<Monitor> logger,
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(V1beta1CustomResourceDefinition).Name,
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
update.State);
await sender.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}",
typeof(V1beta1CustomResourceDefinition).Name, update.State);
typeof(V1CustomResourceDefinition).Name, update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(V1beta1CustomResourceDefinition).Name,
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
update.State);
break;
}
};
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(_crdWatcher.IsFaulted || _secretsWatcher.IsFaulted ||
_certificatesWatchers.Values.Any(s => s.IsFaulted)
? HealthCheckResult.Unhealthy()
: HealthCheckResult.Healthy());
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _crdWatcher.Start();
Expand All @@ -90,30 +100,32 @@ public async Task StartAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
await _crdWatcher.Stop();
await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
await _secretsWatcher.Stop();
}

private async Task OnWatcherStateChanged<TS>(ManagedWatcher<TS, WatcherEvent<TS>> sender,
private async Task OnWatcherStateChanged<TS, TSL>(ManagedWatcher<TS, TSL, WatcherEvent<TS>> sender,
ManagedWatcherStateUpdate update) where TS : class, IKubernetesObject
{
var tag = sender.Tag ?? string.Empty;
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(TS).Name, update.State);
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TS).Name, tag, update.State);
await _secretsWatcher.Stop();
await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();

await _eventQueue.WaitAndClear();

await _secretsWatcher.Start();
await _certificatesWatcher.Start();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}", typeof(TS).Name, update.State);
_logger.LogError(update.Exception, "{type} watcher {tag} {state}", typeof(TS).Name, tag,
update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(TS).Name, update.State);
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TS).Name, tag, update.State);
break;
}
}
Expand All @@ -131,47 +143,54 @@ private async Task OnEventHandlingError(WatcherEvent e, Exception ex)
_logger.LogError(ex, "Failed to process {eventType} {kind} {@id} due to exception",
e.Type, e.Item.Kind, id);
await _secretsWatcher.Stop();
await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
_eventQueue.Clear();

_logger.LogTrace("Watchers restarting");
await _secretsWatcher.Start();
await _certificatesWatcher.Start();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
_logger.LogTrace("Watchers restarted");
}


private async Task OnCrdEvent(WatcherEvent<V1beta1CustomResourceDefinition> request)
private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
{
if (request.Type != WatchEventType.Added && request.Type != WatchEventType.Modified) return;
if (request.Item.Spec?.Names == null) return;

if (request.Item.Spec.Group != CertManagerConstants.CrdGroup ||
request.Item.Spec.Names.Kind != CertManagerConstants.CertificateKind) return;
if (request.Item.Spec.Version == _crdVersion) return;
var versions = request.Item.Spec.Versions.Select(s => s.Name).ToList();
if (versions.TrueForAll(s => _certificatesWatchers.ContainsKey(s))) return;

_crdVersion = request.Item.Spec.Version;
_logger.LogInformation("{crdType} {kind} version updated to {crdGroup}/{version}",
typeof(V1beta1CustomResourceDefinition).Name,
CertManagerConstants.CertificateKind,
CertManagerConstants.CrdGroup,
request.Item.Spec.Version);
_logger.LogInformation("{crdType} {kind} in group {group} versions updated to {versions}",
typeof(V1CustomResourceDefinition).Name,
request.Item.Spec.Names.Kind,
request.Item.Spec.Group,
versions);

await _certificatesWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
await _secretsWatcher.Stop();

_certificatesWatcher.RequestFactory = async client =>
await client.ListClusterCustomObjectWithHttpMessagesAsync(request.Item.Spec.Group,
request.Item.Spec.Version, request.Item.Spec.Names.Plural, watch: true, timeoutSeconds: (int)TimeSpan.FromHours(1).TotalSeconds);
_certificatesWatchers.Clear();

await _certificatesWatcher.Start();
await _secretsWatcher.Start();
}
foreach (var version in versions)
{
var watcher = _certificatesWatcherFactory();
watcher.Tag = version;
watcher.OnStateChanged = OnWatcherStateChanged;
watcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent {Item = e.Item, Type = e.Type});
watcher.RequestFactory = async client => await client.ListClusterCustomObjectWithHttpMessagesAsync(
request.Item.Spec.Group,
version, request.Item.Spec.Names.Plural, watch: true,
timeoutSeconds: (int) TimeSpan.FromHours(1).TotalSeconds);
_certificatesWatchers.Add(version, watcher);
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(_crdWatcher.IsFaulted || _secretsWatcher.IsFaulted || _certificatesWatcher.IsFaulted
? HealthCheckResult.Unhealthy()
: HealthCheckResult.Healthy());

foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
await _secretsWatcher.Start();
}
}
}
6 changes: 4 additions & 2 deletions ES.Kubernetes.Reflector.CertManager/Resources/Certificate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ public class Certificate : IKubernetesObject
[JsonProperty(PropertyName = "metadata")]
public V1ObjectMeta Metadata { get; set; }

[JsonProperty(PropertyName = "spec")] public SpecDefinition Spec { get; set; }
[JsonProperty(PropertyName = "spec")]
public SpecDefinition Spec { get; set; }

[JsonProperty(PropertyName = "apiVersion")]
public string ApiVersion { get; set; }

[JsonProperty(PropertyName = "kind")] public string Kind { get; set; }
[JsonProperty(PropertyName = "kind")]
public string Kind { get; set; }

public class SpecDefinition
{
Expand Down
35 changes: 20 additions & 15 deletions ES.Kubernetes.Reflector.CertManager/SecretEtcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,26 @@ public async Task Handle(InternalSecretWatcherEvent notification, CancellationTo
CertManagerConstants.CertificateKind, certificateId, secretId);

Certificate certificate = null;
try
{
var certificateJObject = await _client.GetNamespacedCustomObjectAsync(CertManagerConstants.CrdGroup,
notification.CertificateResourceDefinitionVersion, metadata.NamespaceProperty,
CertManagerConstants.CertificatePlural,
certificateName, cancellationToken);
certificate = ((JObject) certificateJObject).ToObject<Certificate>();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogDebug("Could not find {kind} {@id}",
CertManagerConstants.CertificateKind, certificateId);
}

if (certificate != null) await Annotate(secret, certificate);
foreach (var certificateResourceDefinitionVersion in notification.CertificateResourceDefinitionVersions)
try
{
var certificateJObject = await _client.GetNamespacedCustomObjectAsync(
CertManagerConstants.CrdGroup,
certificateResourceDefinitionVersion, metadata.NamespaceProperty,
CertManagerConstants.CertificatePlural,
certificateName, cancellationToken);
certificate = ((JObject) certificateJObject).ToObject<Certificate>();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode ==
HttpStatusCode.NotFound)
{
}

if (certificate != null)
await Annotate(secret, certificate);
else
_logger.LogDebug("Could not find {kind} {@id}", CertManagerConstants.CertificateKind,
certificateId);
}
}

Expand Down
19 changes: 10 additions & 9 deletions ES.Kubernetes.Reflector.ConfigMaps/Mirror.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@

namespace ES.Kubernetes.Reflector.ConfigMaps
{
public class Mirror : ResourceMirror<V1ConfigMap>, IHostedService, IHealthCheck
public class Mirror : ResourceMirror<V1ConfigMap, V1ConfigMapList>, IHostedService, IHealthCheck
{
public Mirror(ILogger<Mirror> logger, IKubernetes client,
ManagedWatcher<V1ConfigMap> configMapWatcher,
ManagedWatcher<V1Namespace> namespaceWatcher)
ManagedWatcher<V1ConfigMap, V1ConfigMapList> configMapWatcher,
ManagedWatcher<V1Namespace, V1NamespaceList> namespaceWatcher)
: base(logger, client, configMapWatcher, namespaceWatcher)
{
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(IsFaulted ? HealthCheckResult.Unhealthy() : HealthCheckResult.Healthy());
}


public async Task StartAsync(CancellationToken cancellationToken)
{
Expand All @@ -38,7 +44,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
}


protected override async Task<HttpOperationResponse> OnResourceWatcher(IKubernetes client)
protected override async Task<HttpOperationResponse<V1ConfigMapList>> OnResourceWatcher(IKubernetes client)
{
return await client.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true);
}
Expand Down Expand Up @@ -98,10 +104,5 @@ protected override async Task OnResourcePatch(IKubernetes client, V1ConfigMap ta
await client.PatchNamespacedConfigMapWithHttpMessagesAsync(new V1Patch(patch),
target.Metadata.Name, target.Metadata.NamespaceProperty);
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(IsFaulted ? HealthCheckResult.Unhealthy() : HealthCheckResult.Healthy());
}
}
}
2 changes: 1 addition & 1 deletion ES.Kubernetes.Reflector.Core/CoreModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ public class CoreModule : Module
{
protected override void Load(ContainerBuilder builder)
{
builder.RegisterGeneric(typeof(ManagedWatcher<>));
builder.RegisterGeneric(typeof(ManagedWatcher<,>));
builder.RegisterGeneric(typeof(ManagedWatcher<,,>));


builder.Register(s =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.2" />
<PackageReference Include="KubernetesClient" Version="1.5.19" />
<PackageReference Include="Autofac" Version="4.9.4" />
<PackageReference Include="KubernetesClient" Version="1.6.3" />
<PackageReference Include="MediatR" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="2.2.5" />
Expand Down
Loading