From 57040f8fdbb4cdddff53cf8d6183dd7c7410737d Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Fri, 22 Nov 2024 12:22:54 +0100 Subject: [PATCH] fix: Guard against invalid gameserver events (#1041) --- src/config/providers/k8s.rs | 41 ++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/src/config/providers/k8s.rs b/src/config/providers/k8s.rs index 3e3b2a954..758b74d3f 100644 --- a/src/config/providers/k8s.rs +++ b/src/config/providers/k8s.rs @@ -20,7 +20,7 @@ use std::{collections::BTreeSet, sync::Arc}; use futures::Stream; use k8s_openapi::api::core::v1::ConfigMap; -use kube::runtime::watcher::Event; +use kube::{core::DeserializeGuard, runtime::watcher::Event}; use agones::GameServer; @@ -104,10 +104,13 @@ fn configmap_events( fn gameserver_events( client: kube::Client, namespace: impl AsRef, -) -> impl Stream, kube::runtime::watcher::Error>> { +) -> impl Stream>, kube::runtime::watcher::Error>> +{ let gameservers_namespace = namespace.as_ref(); - let gameservers: kube::Api = kube::Api::namespaced(client, gameservers_namespace); - let gs_writer = kube::runtime::reflector::store::Writer::::default(); + let gameservers: kube::Api> = + kube::Api::namespaced(client, gameservers_namespace); + let gs_writer = + kube::runtime::reflector::store::Writer::>::default(); let mut config = kube::runtime::watcher::Config::default() // Default timeout is 5 minutes, far too slow for us to react. .timeout(15) @@ -134,7 +137,15 @@ pub fn update_endpoints_from_gameservers( for await event in gameserver_events(client, namespace) { let ads = address_selector.as_ref(); match event? { - Event::Apply(server) => { + Event::Apply(result) => { + let server = match result.0 { + Ok(server) => server, + Err(error) => { + tracing::debug!(%error, "couldn't decode gameserver event"); + continue; + } + }; + tracing::debug!("received applied event from k8s"); if !server.is_allocated() { yield Ok(()); @@ -151,7 +162,15 @@ pub fn update_endpoints_from_gameservers( .replace(locality.clone(), endpoint); } Event::Init => {}, - Event::InitApply(server) => { + Event::InitApply(result) => { + let server = match result.0 { + Ok(server) => server, + Err(error) => { + tracing::debug!(%error, "couldn't decode gameserver event"); + continue; + } + }; + if server.is_allocated() { if let Some(ep) = server.endpoint(ads) { servers.insert(ep); @@ -168,7 +187,15 @@ pub fn update_endpoints_from_gameservers( config.clusters.write().insert(locality.clone(), std::mem::take(&mut servers)); } - Event::Delete(server) => { + Event::Delete(result) => { + let server = match result.0 { + Ok(server) => server, + Err(error) => { + tracing::debug!(%error, "couldn't decode gameserver event"); + continue; + } + }; + tracing::debug!("received delete event from k8s"); let found = if let Some(endpoint) = server.endpoint(ads) { config.clusters.write().remove_endpoint(&endpoint)