Merge pull request #190 from nhuray/189-logging
Setup logging library (#minor)
jekkel authored Jun 2, 2022
2 parents 3d821f0 + 7e0401b commit 7e8a916
Showing 6 changed files with 146 additions and 54 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -81,3 +81,6 @@ If the filename ends with `.url` suffix, the content will be processed as a URL
| `WATCH_SERVER_TIMEOUT` | polite request to the server, asking it to cleanly close watch connections after this amount of seconds ([#85](https://github.com/kiwigrid/k8s-sidecar/issues/85)) | false | `60` | integer |
| `WATCH_CLIENT_TIMEOUT` | If you have a network outage dropping all packets with no RST/FIN, this is how many seconds your client waits on watches before realizing & dropping the connection. You can keep this number low. ([#85](https://github.com/kiwigrid/k8s-sidecar/issues/85)) | false | `66` | integer |
| `IGNORE_ALREADY_PROCESSED` | Ignore already processed resource version. Avoid numerous checks on same unchanged resource. req kubernetes api >= v1.19 | false | `false` | boolean |
| `LOG_LEVEL` | Set the logging level. (DEBUG, INFO, WARN, ERROR, CRITICAL) | false | `INFO` | string |
| `LOG_FORMAT` | Set a log format. (JSON or LOGFMT) | false | `JSON` | string |
| `LOG_TZ` | Set the log timezone. (LOCAL or UTC) | false | `LOCAL` | string |
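
The three new settings are read once, at import time, by the new `src/logger.py` module further down in this diff. A minimal sketch of exercising them locally (illustrative values only; in the sidecar they are injected through the container environment, and `src/` is assumed to be on the Python path):

```python
import os

# Hypothetical local values; they must be set before logger.py is imported,
# because the module reads LOG_LEVEL, LOG_FORMAT and LOG_TZ at import time.
os.environ["LOG_LEVEL"] = "DEBUG"
os.environ["LOG_FORMAT"] = "LOGFMT"
os.environ["LOG_TZ"] = "UTC"

from logger import get_logger

logger = get_logger()
logger.debug("sidecar starting")
# roughly: time=2022-06-02T07:15:04.123456+00:00 level=DEBUG msg="sidecar starting"
```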
29 changes: 17 additions & 12 deletions src/helpers.py
@@ -5,6 +5,7 @@
import os
import subprocess
from datetime import datetime
from logger import get_logger

import requests
from requests.adapters import HTTPAdapter
@@ -35,6 +36,10 @@
# You can keep this number low, maybe 60 seconds.
WATCH_CLIENT_TIMEOUT = os.environ.get("WATCH_CLIENT_TIMEOUT", 66)

# Get logger
logger = get_logger()


def write_data_to_file(folder, filename, data, data_type=CONTENT_TYPE_TEXT):
"""
Write text to a file. If the parent folder doesn't exist, create it. If there are insufficient
@@ -47,8 +52,8 @@ def write_data_to_file(folder, filename, data, data_type=CONTENT_TYPE_TEXT):
if e.errno not in (errno.EACCES, errno.EEXIST):
raise
if e.errno == errno.EACCES:
print(f"{timestamp()} Error: insufficient privileges to create {folder}. "
f"Skipping {filename}.")
logger.error(f"Error: insufficient privileges to create {folder}. "
f"Skipping {filename}.")
return

absolute_path = os.path.join(folder, filename)
@@ -65,7 +70,7 @@ def write_data_to_file(folder, filename, data, data_type=CONTENT_TYPE_TEXT):
sha256_hash_cur.update(byte_block)

if sha256_hash_new.hexdigest() == sha256_hash_cur.hexdigest():
print(f"{timestamp()} Contents of {filename} haven't changed. Not overwriting existing file")
logger.debug(f"Contents of {filename} haven't changed. Not overwriting existing file")
return False

if data_type == "binary":
@@ -113,7 +118,7 @@ def request(url, method, enable_5xx=False, payload=None):
r.mount("http://", HTTPAdapter(max_retries=retries))
r.mount("https://", HTTPAdapter(max_retries=retries))
if url is None:
print(f"{timestamp()} No url provided. Doing nothing.")
logger.warning(f"No url provided. Doing nothing.")
return

# If method is not provided use GET as default
@@ -122,10 +127,10 @@
elif method == "POST":
res = r.post("%s" % url, auth=auth, json=payload, timeout=REQ_TIMEOUT)
else:
print(f"{timestamp()} Invalid REQ_METHOD: '{method}', please use 'GET' or 'POST'. Doing nothing.")
logger.warning(f"Invalid REQ_METHOD: '{method}', please use 'GET' or 'POST'. Doing nothing.")
return
print(f"{timestamp()} {method} request sent to {url}. "
f"Response: {res.status_code} {res.reason} {res.text}")
logger.debug(f"{method} request sent to {url}. "
f"Response: {res.status_code} {res.reason} {res.text}")
return res


@@ -151,13 +156,13 @@ def unique_filename(filename, namespace, resource, resource_name):


def execute(script_path):
print(f"{timestamp()} Executing script from {script_path}")
logger.debug(f"Executing script from {script_path}")
try:
result = subprocess.run(["sh", script_path],
capture_output=True,
check=True)
print(f"{timestamp()} Script stdout: {result.stdout}")
print(f"{timestamp()} Script stderr: {result.stderr}")
print(f"{timestamp()} Script exit code: {result.returncode}")
logger.debug(f"Script stdout: {result.stdout}")
logger.debug(f"Script stderr: {result.stderr}")
logger.debug(f"Script exit code: {result.returncode}")
except subprocess.CalledProcessError as e:
print(f"{timestamp()} Script failed with error: {e}")
logger.error(f"Script failed with error: {e}")
74 changes: 74 additions & 0 deletions src/logger.py
@@ -0,0 +1,74 @@
import logging
import os
from datetime import datetime
from typing import Optional

from dateutil.tz import tzlocal, tzutc
from logfmter import Logfmter
from pythonjsonlogger import jsonlogger

# Supported Timezones for time format (in ISO 8601)
LogTimezones = {
'LOCAL': tzlocal(),
'UTC': tzutc()
}

# Get configuration
level = os.getenv("LOG_LEVEL", logging.INFO)
fmt = os.getenv("LOG_FORMAT", 'JSON')
tz = os.getenv("LOG_TZ", 'LOCAL')

log_tz = LogTimezones[tz.upper()] if LogTimezones.get(tz.upper()) else LogTimezones['LOCAL']


class Iso8601Formatter:
"""
A formatter mixin which always forces dates to be rendered in iso format.
Using `datefmt` parameter of logging.Formatter is insufficient because of missing fractional seconds.
"""

def formatTime(self, record, datefmt: Optional[str] = ...):
"""
Meant to override logging.Formatter.formatTime
"""
return datetime.fromtimestamp(record.created, log_tz).isoformat()


class LogfmtFormatter(Iso8601Formatter, Logfmter):
"""
A formatter combining logfmt style with iso dates
"""
pass


class JsonFormatter(Iso8601Formatter, jsonlogger.JsonFormatter):
"""
A formatter combining json logs with iso dates
"""

def add_fields(self, log_record, record, message_dict):
log_record['time'] = self.formatTime(record)
super(JsonFormatter, self).add_fields(log_record, record, message_dict)


# Supported Log Formatters
LogFormatters = {
'JSON': (JsonFormatter('%(levelname)s %(message)s',
rename_fields={"message": "msg", "levelname": "level"})),
'LOGFMT': (LogfmtFormatter(keys=["time", "level", "msg"],
mapping={"time": "asctime", "level": "levelname", "msg": "message"}))
}

log_fmt = LogFormatters[fmt.upper()] if LogFormatters.get(fmt.upper()) else LogFormatters['JSON']

# Initialize/configure root logger
root_logger = logging.getLogger()
log_handler = logging.StreamHandler()
log_handler.setFormatter(log_fmt)
root_logger.addHandler(log_handler)
root_logger.setLevel(level.upper() if isinstance(level, str) else level)


def get_logger():
return logging.getLogger('k8s-sidecar')
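
The point of the `formatTime` override above: `logging.Formatter`'s `datefmt` path drops fractional seconds, whereas `datetime.isoformat()` keeps them and appends the UTC offset of the chosen timezone. A small sketch of the value the mixin renders for `record.created` (arbitrary example timestamp):

```python
from datetime import datetime
from dateutil.tz import tzlocal, tzutc

created = 1654128000.123456  # arbitrary epoch timestamp: 2022-06-02 00:00:00 UTC

print(datetime.fromtimestamp(created, tzutc()).isoformat())
# 2022-06-02T00:00:00.123456+00:00

print(datetime.fromtimestamp(created, tzlocal()).isoformat())
# same instant with the local offset, e.g. 2022-06-02T02:00:00.123456+02:00
```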
2 changes: 2 additions & 0 deletions src/requirements.txt
@@ -1,2 +1,4 @@
kubernetes==23.6.0
requests==2.27.1
python-json-logger==2.0.2
logfmter==0.0.6
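
For a standalone feel of what the two new dependencies produce (a sketch; exact field order and quoting may differ slightly): `python-json-logger` renders each record as a JSON object, `logfmter` as logfmt key=value pairs.

```python
import logging
import sys

from logfmter import Logfmter
from pythonjsonlogger import jsonlogger

handler = logging.StreamHandler(sys.stdout)
log = logging.getLogger("demo")
log.addHandler(handler)
log.setLevel(logging.INFO)

# roughly what LOG_FORMAT=JSON configures (logger.py additionally renames the fields)
handler.setFormatter(jsonlogger.JsonFormatter("%(levelname)s %(message)s"))
log.info("Working on configmap default/my-config")
# {"levelname": "INFO", "message": "Working on configmap default/my-config"}

# roughly what LOG_FORMAT=LOGFMT configures (logger.py also adds the ISO-8601 time key)
handler.setFormatter(Logfmter(keys=["level", "msg"],
                              mapping={"level": "levelname", "msg": "message"}))
log.info("Working on configmap default/my-config")
# level=INFO msg="Working on configmap default/my-config"
```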
46 changes: 25 additions & 21 deletions src/resources.py
@@ -13,8 +13,9 @@
from kubernetes.client.rest import ApiException
from urllib3.exceptions import MaxRetryError
from urllib3.exceptions import ProtocolError
from logger import get_logger

from helpers import request, write_data_to_file, remove_file, timestamp, unique_filename, CONTENT_TYPE_TEXT, \
from helpers import request, write_data_to_file, remove_file, unique_filename, CONTENT_TYPE_TEXT, \
CONTENT_TYPE_BASE64_BINARY, execute, WATCH_SERVER_TIMEOUT, WATCH_CLIENT_TIMEOUT

RESOURCE_SECRET = "secret"
@@ -30,9 +31,12 @@

_resources_version_map = {}

# Get logger
logger = get_logger()


def signal_handler(signum, frame):
print(f"{timestamp()} Subprocess exiting gracefully")
logger.info("Subprocess exiting gracefully")
sys.exit(0)


@@ -61,8 +65,8 @@ def _get_destination_folder(metadata, default_folder, folder_annotation):
dest_folder = folder_annotation
else:
dest_folder = os.path.join(default_folder, folder_annotation)
print(f"{timestamp()} Found a folder override annotation, "
f"placing the {metadata.name} in: {dest_folder}")
logger.info(f"Found a folder override annotation, "
f"placing the {metadata.name} in: {dest_folder}")
return dest_folder
return default_folder

@@ -92,12 +96,12 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
# Avoid numerous logs about useless resource processing each time the LIST loop reconnects
if ignore_already_processed:
if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version:
# print(f"{timestamp()} Ignoring {resource} {metadata.namespace}/{metadata.name}")
logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}")
continue

_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version

print(f"{timestamp()} Working on {resource}: {metadata.namespace}/{metadata.name}")
logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}")

# Get the destination folder
dest_folder = _get_destination_folder(metadata, target_folder, folder_annotation)
@@ -116,7 +120,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho

def _process_secret(dest_folder, secret, resource, unique_filenames, enable_5xx, is_removed=False):
if secret.data is None:
print(f"{timestamp()} No data field in {resource}")
logger.warning(f"No data field in {resource}")
return False
else:
return _iterate_data(
@@ -133,9 +137,9 @@ def _process_config_map(dest_folder, config_map, resource, unique_filenames, ena
def _process_config_map(dest_folder, config_map, resource, unique_filenames, enable_5xx, is_removed=False):
files_changed = False
if config_map.data is None and config_map.binary_data is None:
print(f"{timestamp()} No data/binaryData field in {resource}")
logger.debug(f"No data/binaryData field in {resource}")
if config_map.data is not None:
print(f"{timestamp()} Found 'data' on {resource}")
logger.debug(f"Found 'data' on {resource}")
files_changed |= _iterate_data(
config_map.data,
dest_folder,
@@ -146,7 +150,7 @@ def _process_config_map(dest_folder, config_map, resource, unique_filenames, ena
enable_5xx,
is_removed)
if config_map.binary_data is not None:
print(f"{timestamp()} Found 'binary_data' on {resource}")
logger.debug(f"Found 'binary_data' on {resource}")
files_changed |= _iterate_data(
config_map.binary_data,
dest_folder,
@@ -190,13 +194,13 @@ def _update_file(data_key, data_content, dest_folder, metadata, resource,
resource=resource,
resource_name=metadata.name)
if not remove:
print(f"{timestamp()} Writing {filename}")
logger.debug(f"Writing {filename}")
return write_data_to_file(dest_folder, filename, file_data, content_type)
else:
print(f"{timestamp()} Deleting {filename}")
logger.debug(f"Deleting {filename}")
return remove_file(dest_folder, filename)
except Exception as e:
print(f"{timestamp()} Error when updating from ${data_key} into ${dest_folder}: ${e}")
logger.error(f"Error when updating from ${data_key} into ${dest_folder}: ${e}")
return False


@@ -228,15 +232,15 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if ignore_already_processed:
if _resources_version_map.get(metadata.namespace + metadata.name) == metadata.resource_version:
if event_type == "ADDED" or event_type == "MODIFIED":
# print(f"{timestamp()} Ignoring {event_type} {resource} {metadata.namespace}/{metadata.name}")
logger.debug(f"Ignoring {event_type} {resource} {metadata.namespace}/{metadata.name}")
continue
elif event_type == "DELETED":
_resources_version_map.pop(metadata.namespace + metadata.name)

if event_type == "ADDED" or event_type == "MODIFIED":
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version

print(f"{timestamp()} Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")
logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")

files_changed = False

@@ -269,15 +273,15 @@ def _watch_resource_loop(mode, *args):
_watch_resource_iterator(*args)
except ApiException as e:
if e.status != 500:
print(f"{timestamp()} ApiException when calling kubernetes: {e}\n")
logger.error(f"ApiException when calling kubernetes: {e}\n")
else:
raise
except ProtocolError as e:
print(f"{timestamp()} ProtocolError when calling kubernetes: {e}\n")
logger.error(f"ProtocolError when calling kubernetes: {e}\n")
except MaxRetryError as e:
print(f"{timestamp()} MaxRetryError when calling kubernetes: {e}\n")
logger.error(f"MaxRetryError when calling kubernetes: {e}\n")
except Exception as e:
print(f"{timestamp()} Received unknown exception: {e}\n")
logger.error(f"Received unknown exception: {e}\n")
traceback.print_exc()


@@ -293,10 +297,10 @@ def watch_for_changes(mode, label, label_value, target_folder, request_url, requ
died = False
for proc, ns, resource in processes:
if not proc.is_alive():
print(f"{timestamp()} Process for {ns}/{resource} died")
logger.fatal(f"Process for {ns}/{resource} died")
died = True
if died:
print(f"{timestamp()} At least one process died. Stopping and exiting")
logger.fatal("At least one process died. Stopping and exiting")
for proc, ns, resource in processes:
if proc.is_alive():
proc.terminate()
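
A side note on the `logger.fatal(...)` calls introduced in `watch_for_changes`: in the standard `logging` module, `fatal` is an alias for `critical`, so these records come out at CRITICAL level and are emitted for any of the documented `LOG_LEVEL` values. A tiny illustration:

```python
import logging

log = logging.getLogger("k8s-sidecar")
log.addHandler(logging.StreamHandler())

# Logger.fatal is an alias for Logger.critical in the stdlib,
# so both calls produce a record with levelname CRITICAL.
log.fatal("At least one process died. Stopping and exiting")
log.critical("At least one process died. Stopping and exiting")
```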