Skip to content

Commit

Permalink
fix: Guard against invalid gameserver events (#1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Nov 22, 2024
1 parent 5953b80 commit 57040f8
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions src/config/providers/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{collections::BTreeSet, sync::Arc};

use futures::Stream;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::runtime::watcher::Event;
use kube::{core::DeserializeGuard, runtime::watcher::Event};

use agones::GameServer;

Expand Down Expand Up @@ -104,10 +104,13 @@ fn configmap_events(
fn gameserver_events(
client: kube::Client,
namespace: impl AsRef<str>,
) -> impl Stream<Item = Result<Event<GameServer>, kube::runtime::watcher::Error>> {
) -> impl Stream<Item = Result<Event<DeserializeGuard<GameServer>>, kube::runtime::watcher::Error>>
{
let gameservers_namespace = namespace.as_ref();
let gameservers: kube::Api<GameServer> = kube::Api::namespaced(client, gameservers_namespace);
let gs_writer = kube::runtime::reflector::store::Writer::<GameServer>::default();
let gameservers: kube::Api<DeserializeGuard<GameServer>> =
kube::Api::namespaced(client, gameservers_namespace);
let gs_writer =
kube::runtime::reflector::store::Writer::<DeserializeGuard<GameServer>>::default();
let mut config = kube::runtime::watcher::Config::default()
// Default timeout is 5 minutes, far too slow for us to react.
.timeout(15)
Expand All @@ -134,7 +137,15 @@ pub fn update_endpoints_from_gameservers(
for await event in gameserver_events(client, namespace) {
let ads = address_selector.as_ref();
match event? {
Event::Apply(server) => {
Event::Apply(result) => {
let server = match result.0 {
Ok(server) => server,
Err(error) => {
tracing::debug!(%error, "couldn't decode gameserver event");
continue;
}
};

tracing::debug!("received applied event from k8s");
if !server.is_allocated() {
yield Ok(());
Expand All @@ -151,7 +162,15 @@ pub fn update_endpoints_from_gameservers(
.replace(locality.clone(), endpoint);
}
Event::Init => {},
Event::InitApply(server) => {
Event::InitApply(result) => {
let server = match result.0 {
Ok(server) => server,
Err(error) => {
tracing::debug!(%error, "couldn't decode gameserver event");
continue;
}
};

if server.is_allocated() {
if let Some(ep) = server.endpoint(ads) {
servers.insert(ep);
Expand All @@ -168,7 +187,15 @@ pub fn update_endpoints_from_gameservers(

config.clusters.write().insert(locality.clone(), std::mem::take(&mut servers));
}
Event::Delete(server) => {
Event::Delete(result) => {
let server = match result.0 {
Ok(server) => server,
Err(error) => {
tracing::debug!(%error, "couldn't decode gameserver event");
continue;
}
};

tracing::debug!("received delete event from k8s");
let found = if let Some(endpoint) = server.endpoint(ads) {
config.clusters.write().remove_endpoint(&endpoint)
Expand Down

0 comments on commit 57040f8

Please sign in to comment.