Skip to content

Commit

Permalink
Set AMBASSADOR_NO_KUBEWATCH_RETRY to cause kubewatch to exit if it can't talk to Kubernetes (see #928)
Browse files Browse the repository at this point in the history
  • Loading branch information
Flynn committed Nov 21, 2018
1 parent e5dcd66 commit 51fdfcb
Showing 1 changed file with 59 additions and 46 deletions.
105 changes: 59 additions & 46 deletions ambassador/kubewatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

logging.basicConfig(
level=logging.INFO, # if appDebug else logging.INFO,
format="%%(asctime)s kubewatch %s %%(levelname)s: %%(message)s" % __version__,
format="%%(asctime)s kubewatch [%%(process)d T%%(threadName)s] %s %%(levelname)s: %%(message)s" % __version__,
datefmt="%Y-%m-%d %H:%M:%S"
)

Expand Down Expand Up @@ -73,7 +73,7 @@ def get_filename(svc):
class Restarter(threading.Thread):

def __init__(self, ambassador_config_dir, namespace, envoy_config_file, delay, pid):
threading.Thread.__init__(self, daemon=True)
threading.Thread.__init__(self, daemon=True, name="Restarter")

self.ambassador_config_dir = ambassador_config_dir
self.config_root = os.path.abspath(os.path.dirname(self.ambassador_config_dir))
Expand Down Expand Up @@ -155,10 +155,15 @@ def changes(self):

def run(self):
while True:
logger.debug("Restarter sleeping...")
# This sleep rate limits the number of restart attempts.
time.sleep(self.delay)

with self.mutex:
changes = self.changes()

logger.debug("Restarter found %s changes" % changes)

if changes > 0:
logger.debug("Processing %s changes" % changes)
try:
Expand Down Expand Up @@ -365,30 +370,27 @@ def watch_loop(restarter):
v1 = kube_v1()

if v1:
while True:
try:
w = watch.Watch()

if "AMBASSADOR_SINGLE_NAMESPACE" in os.environ:
watched = w.stream(v1.list_namespaced_service, namespace=restarter.namespace)
else:
watched = w.stream(v1.list_service_for_all_namespaces)

for evt in watched:
logger.debug("Event: %s %s/%s" %
(evt["type"],
evt["object"].metadata.namespace, evt["object"].metadata.name))
sys.stdout.flush()

if evt["type"] == "DELETED":
restarter.delete(evt["object"])
else:
restarter.update_from_service(evt["object"])

logger.info("watch loop exited?")
except ProtocolError:
logger.debug("watch connection has been broken. retry automatically.")
continue
w = watch.Watch()

if "AMBASSADOR_SINGLE_NAMESPACE" in os.environ:
watched = w.stream(v1.list_namespaced_service, namespace=restarter.namespace)
else:
watched = w.stream(v1.list_service_for_all_namespaces)

for evt in watched:
logger.debug("Event: %s %s/%s" %
(evt["type"],
evt["object"].metadata.namespace, evt["object"].metadata.name))
sys.stdout.flush()

if evt["type"] == "DELETED":
restarter.delete(evt["object"])
else:
restarter.update_from_service(evt["object"])

# If here, something strange happened and the watch loop exited on its own.
# Let our caller handle that.
logger.info("watch loop exited?")
else:
logger.info("No K8s, idling")

Expand Down Expand Up @@ -423,26 +425,37 @@ def main(mode, ambassador_config_dir, envoy_config_file, delay, pid):

restarter = Restarter(ambassador_config_dir, namespace, envoy_config_file, delay, pid)

if mode == "sync":
sync(restarter)
elif mode == "watch":
restarter.start()

while True:
try:
# this is in a loop because sometimes the auth expires
# or the connection dies
logger.debug("starting watch loop")
watch_loop(restarter)
except KeyboardInterrupt:
raise
except Exception as e:
logger.exception("could not watch for Kubernetes service changes: %s" % e)
finally:
time.sleep(60)
else:
raise ValueError(mode)

try:
if mode == "sync":
sync(restarter)
elif mode == "watch":
restarter.start()

while True:
try:
# this is in a loop because sometimes the auth expires
# or the connection dies
logger.debug("starting watch loop")
watch_loop(restarter)
except KeyboardInterrupt:
raise
except Exception as e:
logger.warning("could not watch for Kubernetes service changes: %s" % e)

if 'AMBASSADOR_NO_KUBEWATCH_RETRY' in os.environ:
logger.info("not restarting! AMBASSADOR_NO_KUBEWATCH_RETRY is set")
raise
finally:
logger.debug("10-second watch loop delay")
time.sleep(10)
else:
raise ValueError(mode)
except KeyboardInterrupt:
logger.warning("exiting due to keyboard interrupt")
sys.exit(1)
except Exception as e:
logger.exception("fatal exception: %s" % e)
sys.exit(1)

if __name__ == "__main__":
main()

0 comments on commit 51fdfcb

Please sign in to comment.