diff --git a/.buildkite/docker-compose.py35.pg95.yaml b/.buildkite/docker-compose.py35.pg95.yaml index 2f14387fbc37..aaea33006b2d 100644 --- a/.buildkite/docker-compose.py35.pg95.yaml +++ b/.buildkite/docker-compose.py35.pg95.yaml @@ -6,6 +6,7 @@ services: image: postgres:9.5 environment: POSTGRES_PASSWORD: postgres + command: -c fsync=off testenv: image: python:3.5 diff --git a/.buildkite/docker-compose.py37.pg11.yaml b/.buildkite/docker-compose.py37.pg11.yaml index f3eec05ceb52..1b32675e7847 100644 --- a/.buildkite/docker-compose.py37.pg11.yaml +++ b/.buildkite/docker-compose.py37.pg11.yaml @@ -6,6 +6,7 @@ services: image: postgres:11 environment: POSTGRES_PASSWORD: postgres + command: -c fsync=off testenv: image: python:3.7 diff --git a/.buildkite/docker-compose.py37.pg95.yaml b/.buildkite/docker-compose.py37.pg95.yaml index 2a41db8eba90..7679f6508d12 100644 --- a/.buildkite/docker-compose.py37.pg95.yaml +++ b/.buildkite/docker-compose.py37.pg95.yaml @@ -6,6 +6,7 @@ services: image: postgres:9.5 environment: POSTGRES_PASSWORD: postgres + command: -c fsync=off testenv: image: python:3.7 diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index b75269a15594..d9327227ed78 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -45,8 +45,15 @@ steps: - docker#v3.0.1: image: "python:3.6" - - wait + - command: + - "python -m pip install tox" + - "tox -e mypy" + label: ":mypy: mypy" + plugins: + - docker#v3.0.1: + image: "python:3.5" + - wait - command: - "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev" @@ -55,6 +62,7 @@ steps: label: ":python: 3.5 / SQLite / Old Deps" env: TRIAL_FLAGS: "-j 2" + LANG: "C.UTF-8" plugins: - docker#v3.0.1: image: "ubuntu:xenial" # We use xenail to get an old sqlite and python diff --git a/.gitignore b/.gitignore index f6168a8819a3..e53d4908d5c3 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ _trial_temp*/ /*.signing.key /env/ /homeserver*.yaml +/logs /media_store/ /uploads @@ -29,8 +30,9 @@ _trial_temp*/ /.vscode/ # build products -/.coverage* !/.coveragerc +/.coverage* +/.mypy_cache/ /.tox /build/ /coverage.* @@ -38,4 +40,3 @@ _trial_temp*/ /docs/build/ /htmlcov /pip-wheel-metadata/ - diff --git a/changelog.d/5680.misc b/changelog.d/5680.misc new file mode 100644 index 000000000000..46a403a188ed --- /dev/null +++ b/changelog.d/5680.misc @@ -0,0 +1 @@ +Lay the groundwork for structured logging output. diff --git a/docs/structured_logging.md b/docs/structured_logging.md new file mode 100644 index 000000000000..decec9b8fa1e --- /dev/null +++ b/docs/structured_logging.md @@ -0,0 +1,83 @@ +# Structured Logging + +A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack". + +Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to). + +A structured logging configuration looks similar to the following: + +```yaml +structured: true + +loggers: + synapse: + level: INFO + synapse.storage.SQL: + level: WARNING + +drains: + console: + type: console + location: stdout + file: + type: file_json + location: homeserver.log +``` + +The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON). + +## Drain Types + +Drain types can be specified by the `type` key. + +### `console` + +Outputs human-readable logs to the console. + +Arguments: + +- `location`: Either `stdout` or `stderr`. + +### `console_json` + +Outputs machine-readable JSON logs to the console. + +Arguments: + +- `location`: Either `stdout` or `stderr`. + +### `console_json_terse` + +Outputs machine-readable JSON logs to the console, separated by newlines. This +format is not designed to be read and re-formatted into human-readable text, but +is optimal for a logging aggregation system. + +Arguments: + +- `location`: Either `stdout` or `stderr`. + +### `file` + +Outputs human-readable logs to a file. + +Arguments: + +- `location`: An absolute path to the file to log to. + +### `file_json` + +Outputs machine-readable logs to a file. + +Arguments: + +- `location`: An absolute path to the file to log to. + +### `network_json_terse` + +Delivers machine-readable JSON logs to a log aggregator over TCP. This is +compatible with LogStash's TCP input with the codec set to `json_lines`. + +Arguments: + +- `host`: Hostname or IP address of the log aggregator. +- `port`: Numerical port to contact on the host. \ No newline at end of file diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 69dcf3523fde..c30fdeee9af2 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -36,18 +36,20 @@ logger = logging.getLogger(__name__) +# list of tuples of function, args list, kwargs dict _sighup_callbacks = [] -def register_sighup(func): +def register_sighup(func, *args, **kwargs): """ Register a function to be called when a SIGHUP occurs. Args: func (function): Function to be called when sent a SIGHUP signal. - Will be called with a single argument, the homeserver. + Will be called with a single default argument, the homeserver. + *args, **kwargs: args and kwargs to be passed to the target function. """ - _sighup_callbacks.append(func) + _sighup_callbacks.append((func, args, kwargs)) def start_worker_reactor(appname, config, run_command=reactor.run): @@ -248,8 +250,8 @@ def handle_sighup(*args, **kwargs): # we're not using systemd. sdnotify(b"RELOADING=1") - for i in _sighup_callbacks: - i(hs) + for i, args, kwargs in _sighup_callbacks: + i(hs, *args, **kwargs) sdnotify(b"READY=1") diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 1fd52a552693..04751a6a5e29 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -227,8 +227,6 @@ def start(config_options): config.start_pushers = False config.send_federation = False - setup_logging(config, use_worker_options=True) - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -241,6 +239,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() # We use task.react as the basic run command as it correctly handles tearing diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 54bb114dec74..767b87d2db14 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -141,8 +141,6 @@ def start(config_options): assert config.worker_app == "synapse.app.appservice" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -167,6 +165,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ps, config, use_worker_options=True) + ps.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ps, config.worker_listeners diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 721bb5b119f3..86193d35a8b0 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -179,8 +179,6 @@ def start(config_options): assert config.worker_app == "synapse.app.client_reader" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -193,6 +191,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 473c8895d0fe..c67fe69a5009 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -175,8 +175,6 @@ def start(config_options): assert config.worker_replication_http_port is not None - setup_logging(config, use_worker_options=True) - # This should only be done on the user directory worker or the master config.update_user_directory = False @@ -192,6 +190,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 5255d9e8ccef..1ef027a88cb4 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -160,8 +160,6 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_reader" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -174,6 +172,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index c5a2880e6995..04fbb407af56 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -171,8 +171,6 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_sender" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -197,6 +195,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index e2822ca848b3..611d2854211d 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -232,8 +232,6 @@ def start(config_options): assert config.worker_main_http_uri is not None - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -246,6 +244,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 82339058447e..04f1ed14f3c9 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -341,8 +341,6 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - synapse.config.logger.setup_logging(config, use_worker_options=False) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -356,6 +354,8 @@ def setup(config_options): database_engine=database_engine, ) + synapse.config.logger.setup_logging(hs, config, use_worker_options=False) + logger.info("Preparing database: %s...", config.database_config["name"]) try: diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 3a168577c737..2ac783ffa39c 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -155,8 +155,6 @@ def start(config_options): "Please add ``enable_media_repo: false`` to the main config\n" ) - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -169,6 +167,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 692ffa2f0482..d84732ee3ca1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -184,8 +184,6 @@ def start(config_options): assert config.worker_app == "synapse.app.pusher" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts if config.start_pushers: @@ -210,6 +208,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ps, config, use_worker_options=True) + ps.setup() def start(): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index a1c3b162f7ed..473026fce513 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -435,8 +435,6 @@ def start(config_options): assert config.worker_app == "synapse.app.synchrotron" - setup_logging(config, use_worker_options=True) - synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -450,6 +448,8 @@ def start(config_options): application_service_handler=SynchrotronApplicationService(), ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index cb29a1afabec..e01afb39f2b8 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -197,8 +197,6 @@ def start(config_options): assert config.worker_app == "synapse.app.user_dir" - setup_logging(config, use_worker_options=True) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -223,6 +221,8 @@ def start(config_options): database_engine=database_engine, ) + setup_logging(ss, config, use_worker_options=True) + ss.setup() reactor.addSystemEventTrigger( "before", "startup", _base.start, ss, config.worker_listeners diff --git a/synapse/config/logger.py b/synapse/config/logger.py index d321d00b80e8..981df5a10c27 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -25,6 +25,10 @@ import synapse from synapse.app import _base as appbase +from synapse.logging._structured import ( + reload_structured_logging, + setup_structured_logging, +) from synapse.logging.context import LoggingContextFilter from synapse.util.versionstring import get_version_string @@ -119,21 +123,10 @@ def generate_files(self, config, config_dir_path): log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file)) -def setup_logging(config, use_worker_options=False): - """ Set up python logging - - Args: - config (LoggingConfig | synapse.config.workers.WorkerConfig): - configuration data - - use_worker_options (bool): True to use the 'worker_log_config' option - instead of 'log_config'. - - register_sighup (func | None): Function to call to register a - sighup handler. +def _setup_stdlib_logging(config, log_config): + """ + Set up Python stdlib logging. """ - log_config = config.worker_log_config if use_worker_options else config.log_config - if log_config is None: log_format = ( "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" @@ -151,35 +144,10 @@ def setup_logging(config, use_worker_options=False): handler.addFilter(LoggingContextFilter(request="")) logger.addHandler(handler) else: + logging.config.dictConfig(log_config) - def load_log_config(): - with open(log_config, "r") as f: - logging.config.dictConfig(yaml.safe_load(f)) - - def sighup(*args): - # it might be better to use a file watcher or something for this. - load_log_config() - logging.info("Reloaded log config from %s due to SIGHUP", log_config) - - load_log_config() - appbase.register_sighup(sighup) - - # make sure that the first thing we log is a thing we can grep backwards - # for - logging.warn("***** STARTING SERVER *****") - logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse)) - logging.info("Server hostname: %s", config.server_name) - - # It's critical to point twisted's internal logging somewhere, otherwise it - # stacks up and leaks kup to 64K object; - # see: https://twistedmatrix.com/trac/ticket/8164 - # - # Routing to the python logging framework could be a performance problem if - # the handlers blocked for a long time as python.logging is a blocking API - # see https://twistedmatrix.com/documents/current/core/howto/logger.html - # filed as https://github.com/matrix-org/synapse/issues/1727 - # - # However this may not be too much of a problem if we are just writing to a file. + # Route Twisted's native logging through to the standard library logging + # system. observer = STDLibLogObserver() def _log(event): @@ -201,3 +169,54 @@ def _log(event): ) if not config.no_redirect_stdio: print("Redirected stdout/stderr to logs") + + +def _reload_stdlib_logging(*args, log_config=None): + logger = logging.getLogger("") + + if not log_config: + logger.warn("Reloaded a blank config?") + + logging.config.dictConfig(log_config) + + +def setup_logging(hs, config, use_worker_options=False): + """ + Set up the logging subsystem. + + Args: + config (LoggingConfig | synapse.config.workers.WorkerConfig): + configuration data + + use_worker_options (bool): True to use the 'worker_log_config' option + instead of 'log_config'. + """ + log_config = config.worker_log_config if use_worker_options else config.log_config + + def read_config(*args, callback=None): + if log_config is None: + return None + + with open(log_config, "rb") as f: + log_config_body = yaml.safe_load(f.read()) + + if callback: + callback(log_config=log_config_body) + logging.info("Reloaded log config from %s due to SIGHUP", log_config) + + return log_config_body + + log_config_body = read_config() + + if log_config_body and log_config_body.get("structured") is True: + setup_structured_logging(hs, config, log_config_body) + appbase.register_sighup(read_config, callback=reload_structured_logging) + else: + _setup_stdlib_logging(config, log_config_body) + appbase.register_sighup(read_config, callback=_reload_stdlib_logging) + + # make sure that the first thing we log is a thing we can grep backwards + # for + logging.warn("***** STARTING SERVER *****") + logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse)) + logging.info("Server hostname: %s", config.server_name) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c86903b98bba..94306c94a963 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -326,8 +326,9 @@ def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): ours = yield self.store.get_state_groups_ids(room_id, seen) # state_maps is a list of mappings from (type, state_key) to event_id - # type: list[dict[tuple[str, str], str]] - state_maps = list(ours.values()) + state_maps = list( + ours.values() + ) # type: list[dict[tuple[str, str], str]] # we don't need this any more, let's delete it. del ours diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py new file mode 100644 index 000000000000..0367d6dfc4b3 --- /dev/null +++ b/synapse/logging/_structured.py @@ -0,0 +1,374 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os.path +import sys +import typing +import warnings + +import attr +from constantly import NamedConstant, Names, ValueConstant, Values +from zope.interface import implementer + +from twisted.logger import ( + FileLogObserver, + FilteringLogObserver, + ILogObserver, + LogBeginner, + Logger, + LogLevel, + LogLevelFilterPredicate, + LogPublisher, + eventAsText, + globalLogBeginner, + jsonFileLogObserver, +) + +from synapse.config._base import ConfigError +from synapse.logging._terse_json import ( + TerseJSONToConsoleLogObserver, + TerseJSONToTCPLogObserver, +) +from synapse.logging.context import LoggingContext + + +def stdlib_log_level_to_twisted(level: str) -> LogLevel: + """ + Convert a stdlib log level to Twisted's log level. + """ + lvl = level.lower().replace("warning", "warn") + return LogLevel.levelWithName(lvl) + + +@attr.s +@implementer(ILogObserver) +class LogContextObserver(object): + """ + An ILogObserver which adds Synapse-specific log context information. + + Attributes: + observer (ILogObserver): The target parent observer. + """ + + observer = attr.ib() + + def __call__(self, event: dict) -> None: + """ + Consume a log event and emit it to the parent observer after filtering + and adding log context information. + + Args: + event (dict) + """ + # Filter out some useless events that Twisted outputs + if "log_text" in event: + if event["log_text"].startswith("DNSDatagramProtocol starting on "): + return + + if event["log_text"].startswith("(UDP Port "): + return + + if event["log_text"].startswith("Timing out client") or event[ + "log_format" + ].startswith("Timing out client"): + return + + context = LoggingContext.current_context() + + # Copy the context information to the log event. + if context is not None: + context.copy_to_twisted_log_entry(event) + else: + # If there's no logging context, not even the root one, we might be + # starting up or it might be from non-Synapse code. Log it as if it + # came from the root logger. + event["request"] = None + event["scope"] = None + + self.observer(event) + + +class PythonStdlibToTwistedLogger(logging.Handler): + """ + Transform a Python stdlib log message into a Twisted one. + """ + + def __init__(self, observer, *args, **kwargs): + """ + Args: + observer (ILogObserver): A Twisted logging observer. + *args, **kwargs: Args/kwargs to be passed to logging.Handler. + """ + self.observer = observer + super().__init__(*args, **kwargs) + + def emit(self, record: logging.LogRecord) -> None: + """ + Emit a record to Twisted's observer. + + Args: + record (logging.LogRecord) + """ + + self.observer( + { + "log_time": record.created, + "log_text": record.getMessage(), + "log_format": "{log_text}", + "log_namespace": record.name, + "log_level": stdlib_log_level_to_twisted(record.levelname), + } + ) + + +def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver: + """ + A log observer that formats events like the traditional log formatter and + sends them to `outFile`. + + Args: + outFile (file object): The file object to write to. + """ + + def formatEvent(_event: dict) -> str: + event = dict(_event) + event["log_level"] = event["log_level"].name.upper() + event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + ( + event.get("log_format", "{log_text}") or "{log_text}" + ) + return eventAsText(event, includeSystem=False) + "\n" + + return FileLogObserver(outFile, formatEvent) + + +class DrainType(Names): + CONSOLE = NamedConstant() + CONSOLE_JSON = NamedConstant() + CONSOLE_JSON_TERSE = NamedConstant() + FILE = NamedConstant() + FILE_JSON = NamedConstant() + NETWORK_JSON_TERSE = NamedConstant() + + +class OutputPipeType(Values): + stdout = ValueConstant(sys.__stdout__) + stderr = ValueConstant(sys.__stderr__) + + +@attr.s +class DrainConfiguration(object): + name = attr.ib() + type = attr.ib() + location = attr.ib() + options = attr.ib(default=None) + + +@attr.s +class NetworkJSONTerseOptions(object): + maximum_buffer = attr.ib(type=int) + + +DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}} + + +def parse_drain_configs( + drains: dict +) -> typing.Generator[DrainConfiguration, None, None]: + """ + Parse the drain configurations. + + Args: + drains (dict): A list of drain configurations. + + Yields: + DrainConfiguration instances. + + Raises: + ConfigError: If any of the drain configuration items are invalid. + """ + for name, config in drains.items(): + if "type" not in config: + raise ConfigError("Logging drains require a 'type' key.") + + try: + logging_type = DrainType.lookupByName(config["type"].upper()) + except ValueError: + raise ConfigError( + "%s is not a known logging drain type." % (config["type"],) + ) + + if logging_type in [ + DrainType.CONSOLE, + DrainType.CONSOLE_JSON, + DrainType.CONSOLE_JSON_TERSE, + ]: + location = config.get("location") + if location is None or location not in ["stdout", "stderr"]: + raise ConfigError( + ( + "The %s drain needs the 'location' key set to " + "either 'stdout' or 'stderr'." + ) + % (logging_type,) + ) + + pipe = OutputPipeType.lookupByName(location).value + + yield DrainConfiguration(name=name, type=logging_type, location=pipe) + + elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]: + if "location" not in config: + raise ConfigError( + "The %s drain needs the 'location' key set." % (logging_type,) + ) + + location = config.get("location") + if os.path.abspath(location) != location: + raise ConfigError( + "File paths need to be absolute, '%s' is a relative path" + % (location,) + ) + yield DrainConfiguration(name=name, type=logging_type, location=location) + + elif logging_type in [DrainType.NETWORK_JSON_TERSE]: + host = config.get("host") + port = config.get("port") + maximum_buffer = config.get("maximum_buffer", 1000) + yield DrainConfiguration( + name=name, + type=logging_type, + location=(host, port), + options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer), + ) + + else: + raise ConfigError( + "The %s drain type is currently not implemented." + % (config["type"].upper(),) + ) + + +def setup_structured_logging( + hs, + config, + log_config: dict, + logBeginner: LogBeginner = globalLogBeginner, + redirect_stdlib_logging: bool = True, +) -> LogPublisher: + """ + Set up Twisted's structured logging system. + + Args: + hs: The homeserver to use. + config (HomeserverConfig): The configuration of the Synapse homeserver. + log_config (dict): The log configuration to use. + """ + if config.no_redirect_stdio: + raise ConfigError( + "no_redirect_stdio cannot be defined using structured logging." + ) + + logger = Logger() + + if "drains" not in log_config: + raise ConfigError("The logging configuration requires a list of drains.") + + observers = [] + + for observer in parse_drain_configs(log_config["drains"]): + # Pipe drains + if observer.type == DrainType.CONSOLE: + logger.debug( + "Starting up the {name} console logger drain", name=observer.name + ) + observers.append(SynapseFileLogObserver(observer.location)) + elif observer.type == DrainType.CONSOLE_JSON: + logger.debug( + "Starting up the {name} JSON console logger drain", name=observer.name + ) + observers.append(jsonFileLogObserver(observer.location)) + elif observer.type == DrainType.CONSOLE_JSON_TERSE: + logger.debug( + "Starting up the {name} terse JSON console logger drain", + name=observer.name, + ) + observers.append( + TerseJSONToConsoleLogObserver(observer.location, metadata={}) + ) + + # File drains + elif observer.type == DrainType.FILE: + logger.debug("Starting up the {name} file logger drain", name=observer.name) + log_file = open(observer.location, "at", buffering=1, encoding="utf8") + observers.append(SynapseFileLogObserver(log_file)) + elif observer.type == DrainType.FILE_JSON: + logger.debug( + "Starting up the {name} JSON file logger drain", name=observer.name + ) + log_file = open(observer.location, "at", buffering=1, encoding="utf8") + observers.append(jsonFileLogObserver(log_file)) + + elif observer.type == DrainType.NETWORK_JSON_TERSE: + metadata = {"server_name": hs.config.server_name} + log_observer = TerseJSONToTCPLogObserver( + hs=hs, + host=observer.location[0], + port=observer.location[1], + metadata=metadata, + maximum_buffer=observer.options.maximum_buffer, + ) + log_observer.start() + observers.append(log_observer) + else: + # We should never get here, but, just in case, throw an error. + raise ConfigError("%s drain type cannot be configured" % (observer.type,)) + + publisher = LogPublisher(*observers) + log_filter = LogLevelFilterPredicate() + + for namespace, namespace_config in log_config.get( + "loggers", DEFAULT_LOGGERS + ).items(): + # Set the log level for twisted.logger.Logger namespaces + log_filter.setLogLevelForNamespace( + namespace, + stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")), + ) + + # Also set the log levels for the stdlib logger namespaces, to prevent + # them getting to PythonStdlibToTwistedLogger and having to be formatted + if "level" in namespace_config: + logging.getLogger(namespace).setLevel(namespace_config.get("level")) + + f = FilteringLogObserver(publisher, [log_filter]) + lco = LogContextObserver(f) + + if redirect_stdlib_logging: + stuff_into_twisted = PythonStdlibToTwistedLogger(lco) + stdliblogger = logging.getLogger() + stdliblogger.addHandler(stuff_into_twisted) + + # Always redirect standard I/O, otherwise other logging outputs might miss + # it. + logBeginner.beginLoggingTo([lco], redirectStandardIO=True) + + return publisher + + +def reload_structured_logging(*args, log_config=None) -> None: + warnings.warn( + "Currently the structured logging system can not be reloaded, doing nothing" + ) diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py new file mode 100644 index 000000000000..7f1e8f23fe89 --- /dev/null +++ b/synapse/logging/_terse_json.py @@ -0,0 +1,278 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Log formatters that output terse JSON. +""" + +import sys +from collections import deque +from ipaddress import IPv4Address, IPv6Address, ip_address +from math import floor +from typing.io import TextIO + +import attr +from simplejson import dumps + +from twisted.application.internet import ClientService +from twisted.internet.endpoints import ( + HostnameEndpoint, + TCP4ClientEndpoint, + TCP6ClientEndpoint, +) +from twisted.internet.protocol import Factory, Protocol +from twisted.logger import FileLogObserver, Logger +from twisted.python.failure import Failure + + +def flatten_event(event: dict, metadata: dict, include_time: bool = False): + """ + Flatten a Twisted logging event to an dictionary capable of being sent + as a log event to a logging aggregation system. + + The format is vastly simplified and is not designed to be a "human readable + string" in the sense that traditional logs are. Instead, the structure is + optimised for searchability and filtering, with human-understandable log + keys. + + Args: + event (dict): The Twisted logging event we are flattening. + metadata (dict): Additional data to include with each log message. This + can be information like the server name. Since the target log + consumer does not know who we are other than by host IP, this + allows us to forward through static information. + include_time (bool): Should we include the `time` key? If False, the + event time is stripped from the event. + """ + new_event = {} + + # If it's a failure, make the new event's log_failure be the traceback text. + if "log_failure" in event: + new_event["log_failure"] = event["log_failure"].getTraceback() + + # If it's a warning, copy over a string representation of the warning. + if "warning" in event: + new_event["warning"] = str(event["warning"]) + + # Stdlib logging events have "log_text" as their human-readable portion, + # Twisted ones have "log_format". For now, include the log_format, so that + # context only given in the log format (e.g. what is being logged) is + # available. + if "log_text" in event: + new_event["log"] = event["log_text"] + else: + new_event["log"] = event["log_format"] + + # We want to include the timestamp when forwarding over the network, but + # exclude it when we are writing to stdout. This is because the log ingester + # (e.g. logstash, fluentd) can add its own timestamp. + if include_time: + new_event["time"] = round(event["log_time"], 2) + + # Convert the log level to a textual representation. + new_event["level"] = event["log_level"].name.upper() + + # Ignore these keys, and do not transfer them over to the new log object. + # They are either useless (isError), transferred manually above (log_time, + # log_level, etc), or contain Python objects which are not useful for output + # (log_logger, log_source). + keys_to_delete = [ + "isError", + "log_failure", + "log_format", + "log_level", + "log_logger", + "log_source", + "log_system", + "log_time", + "log_text", + "observer", + "warning", + ] + + # If it's from the Twisted legacy logger (twisted.python.log), it adds some + # more keys we want to purge. + if event.get("log_namespace") == "log_legacy": + keys_to_delete.extend(["message", "system", "time"]) + + # Rather than modify the dictionary in place, construct a new one with only + # the content we want. The original event should be considered 'frozen'. + for key in event.keys(): + + if key in keys_to_delete: + continue + + if isinstance(event[key], (str, int, bool, float)) or event[key] is None: + # If it's a plain type, include it as is. + new_event[key] = event[key] + else: + # If it's not one of those basic types, write out a string + # representation. This should probably be a warning in development, + # so that we are sure we are only outputting useful data. + new_event[key] = str(event[key]) + + # Add the metadata information to the event (e.g. the server_name). + new_event.update(metadata) + + return new_event + + +def TerseJSONToConsoleLogObserver(outFile: TextIO, metadata: dict) -> FileLogObserver: + """ + A log observer that formats events to a flattened JSON representation. + + Args: + outFile: The file object to write to. + metadata: Metadata to be added to each log object. + """ + + def formatEvent(_event: dict) -> str: + flattened = flatten_event(_event, metadata) + return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n" + + return FileLogObserver(outFile, formatEvent) + + +@attr.s +class TerseJSONToTCPLogObserver(object): + """ + An IObserver that writes JSON logs to a TCP target. + + Args: + hs (HomeServer): The Homeserver that is being logged for. + host: The host of the logging target. + port: The logging target's port. + metadata: Metadata to be added to each log entry. + """ + + hs = attr.ib() + host = attr.ib(type=str) + port = attr.ib(type=int) + metadata = attr.ib(type=dict) + maximum_buffer = attr.ib(type=int) + _buffer = attr.ib(default=attr.Factory(deque), type=deque) + _writer = attr.ib(default=None) + _logger = attr.ib(default=attr.Factory(Logger)) + + def start(self) -> None: + + # Connect without DNS lookups if it's a direct IP. + try: + ip = ip_address(self.host) + if isinstance(ip, IPv4Address): + endpoint = TCP4ClientEndpoint( + self.hs.get_reactor(), self.host, self.port + ) + elif isinstance(ip, IPv6Address): + endpoint = TCP6ClientEndpoint( + self.hs.get_reactor(), self.host, self.port + ) + except ValueError: + endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port) + + factory = Factory.forProtocol(Protocol) + self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) + self._service.startService() + + def _write_loop(self) -> None: + """ + Implement the write loop. + """ + if self._writer: + return + + self._writer = self._service.whenConnected() + + @self._writer.addBoth + def writer(r): + if isinstance(r, Failure): + r.printTraceback(file=sys.__stderr__) + self._writer = None + self.hs.get_reactor().callLater(1, self._write_loop) + return + + try: + for event in self._buffer: + r.transport.write( + dumps(event, ensure_ascii=False, separators=(",", ":")).encode( + "utf8" + ) + ) + r.transport.write(b"\n") + self._buffer.clear() + except Exception as e: + sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),)) + + self._writer = False + self.hs.get_reactor().callLater(1, self._write_loop) + + def _handle_pressure(self) -> None: + """ + Handle backpressure by shedding events. + + The buffer will, in this order, until the buffer is below the maximum: + - Shed DEBUG events + - Shed INFO events + - Shed the middle 50% of the events. + """ + if len(self._buffer) <= self.maximum_buffer: + return + + # Strip out DEBUGs + self._buffer = deque( + filter(lambda event: event["level"] != "DEBUG", self._buffer) + ) + + if len(self._buffer) <= self.maximum_buffer: + return + + # Strip out INFOs + self._buffer = deque( + filter(lambda event: event["level"] != "INFO", self._buffer) + ) + + if len(self._buffer) <= self.maximum_buffer: + return + + # Cut the middle entries out + buffer_split = floor(self.maximum_buffer / 2) + + old_buffer = self._buffer + self._buffer = deque() + + for i in range(buffer_split): + self._buffer.append(old_buffer.popleft()) + + end_buffer = [] + for i in range(buffer_split): + end_buffer.append(old_buffer.pop()) + + self._buffer.extend(reversed(end_buffer)) + + def __call__(self, event: dict) -> None: + flattened = flatten_event(event, self.metadata, include_time=True) + self._buffer.append(flattened) + + # Handle backpressure, if it exists. + try: + self._handle_pressure() + except Exception: + # If handling backpressure fails,clear the buffer and log the + # exception. + self._buffer.clear() + self._logger.failure("Failed clearing backpressure") + + # Try and write immediately. + self._write_loop() diff --git a/synapse/logging/context.py b/synapse/logging/context.py index b456c31f7071..63379bfb9378 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -25,6 +25,7 @@ import logging import threading import types +from typing import Any, List from twisted.internet import defer, threads @@ -194,7 +195,7 @@ class LoggingContext(object): class Sentinel(object): """Sentinel to represent the root context""" - __slots__ = [] + __slots__ = [] # type: List[Any] def __str__(self): return "sentinel" @@ -202,6 +203,10 @@ def __str__(self): def copy_to(self, record): pass + def copy_to_twisted_log_entry(self, record): + record["request"] = None + record["scope"] = None + def start(self): pass @@ -330,6 +335,13 @@ def copy_to(self, record): # we also track the current scope: record.scope = self.scope + def copy_to_twisted_log_entry(self, record): + """ + Copy logging fields from this context to a Twisted log record. + """ + record["request"] = self.request + record["scope"] = self.scope + def start(self): if get_thread_id() != self.main_thread: logger.warning("Started logcontext %s on different thread", self) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index c6465c0386df..ec0ac547c18b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -47,9 +47,9 @@ "idna>=2.5", # validating SSL certs for IP addresses requires service_identity 18.1. "service_identity>=18.1.0", - # our logcontext handling relies on the ability to cancel inlineCallbacks - # (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7. - "Twisted>=18.7.0", + # Twisted 18.9 introduces some logger improvements that the structured + # logger utilises + "Twisted>=18.9.0", "treq>=15.1", # Twisted has required pyopenssl 16.0 since about Twisted 16.6. "pyopenssl>=16.0.0", diff --git a/tests/logging/__init__.py b/tests/logging/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py new file mode 100644 index 000000000000..a786de0233d2 --- /dev/null +++ b/tests/logging/test_structured.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import os.path +import shutil +import sys +import textwrap + +from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile + +from synapse.config.logger import setup_logging +from synapse.logging._structured import setup_structured_logging +from synapse.logging.context import LoggingContext + +from tests.unittest import DEBUG, HomeserverTestCase + + +class FakeBeginner(object): + def beginLoggingTo(self, observers, **kwargs): + self.observers = observers + + +class StructuredLoggingTestCase(HomeserverTestCase): + """ + Tests for Synapse's structured logging support. + """ + + def test_output_to_json_round_trip(self): + """ + Synapse logs can be outputted to JSON and then read back again. + """ + temp_dir = self.mktemp() + os.mkdir(temp_dir) + self.addCleanup(shutil.rmtree, temp_dir) + + json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json")) + + log_config = { + "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}} + } + + # Begin the logger with our config + beginner = FakeBeginner() + setup_structured_logging( + self.hs, self.hs.config, log_config, logBeginner=beginner + ) + + # Make a logger and send an event + logger = Logger( + namespace="tests.logging.test_structured", observer=beginner.observers[0] + ) + logger.info("Hello there, {name}!", name="wally") + + # Read the log file and check it has the event we sent + with open(json_log_file, "r") as f: + logged_events = list(eventsFromJSONLogFile(f)) + self.assertEqual(len(logged_events), 1) + + # The event pulled from the file should render fine + self.assertEqual( + eventAsText(logged_events[0], includeTimestamp=False), + "[tests.logging.test_structured#info] Hello there, wally!", + ) + + def test_output_to_text(self): + """ + Synapse logs can be outputted to text. + """ + temp_dir = self.mktemp() + os.mkdir(temp_dir) + self.addCleanup(shutil.rmtree, temp_dir) + + log_file = os.path.abspath(os.path.join(temp_dir, "out.log")) + + log_config = {"drains": {"file": {"type": "file", "location": log_file}}} + + # Begin the logger with our config + beginner = FakeBeginner() + setup_structured_logging( + self.hs, self.hs.config, log_config, logBeginner=beginner + ) + + # Make a logger and send an event + logger = Logger( + namespace="tests.logging.test_structured", observer=beginner.observers[0] + ) + logger.info("Hello there, {name}!", name="wally") + + # Read the log file and check it has the event we sent + with open(log_file, "r") as f: + logged_events = f.read().strip().split("\n") + self.assertEqual(len(logged_events), 1) + + # The event pulled from the file should render fine + self.assertTrue( + logged_events[0].endswith( + " - tests.logging.test_structured - INFO - None - Hello there, wally!" + ) + ) + + def test_collects_logcontext(self): + """ + Test that log outputs have the attached logging context. + """ + log_config = {"drains": {}} + + # Begin the logger with our config + beginner = FakeBeginner() + publisher = setup_structured_logging( + self.hs, self.hs.config, log_config, logBeginner=beginner + ) + + logs = [] + + publisher.addObserver(logs.append) + + # Make a logger and send an event + logger = Logger( + namespace="tests.logging.test_structured", observer=beginner.observers[0] + ) + + with LoggingContext("testcontext", request="somereq"): + logger.info("Hello there, {name}!", name="steve") + + self.assertEqual(len(logs), 1) + self.assertEqual(logs[0]["request"], "somereq") + + +class StructuredLoggingConfigurationFileTestCase(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + tempdir = self.mktemp() + os.mkdir(tempdir) + log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml")) + self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log")) + + config = self.default_config() + config["log_config"] = log_config_file + + with open(log_config_file, "w") as f: + f.write( + textwrap.dedent( + """\ + structured: true + + drains: + file: + type: file_json + location: %s + """ + % (self.homeserver_log,) + ) + ) + + self.addCleanup(self._sys_cleanup) + + return self.setup_test_homeserver(config=config) + + def _sys_cleanup(self): + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + # Do not remove! We need the logging system to be set other than WARNING. + @DEBUG + def test_log_output(self): + """ + When a structured logging config is given, Synapse will use it. + """ + setup_logging(self.hs, self.hs.config) + + # Make a logger and send an event + logger = Logger(namespace="tests.logging.test_structured") + + with LoggingContext("testcontext", request="somereq"): + logger.info("Hello there, {name}!", name="steve") + + with open(self.homeserver_log, "r") as f: + logged_events = [ + eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f) + ] + + logs = "\n".join(logged_events) + self.assertTrue("***** STARTING SERVER *****" in logs) + self.assertTrue("Hello there, steve!" in logs) diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py new file mode 100644 index 000000000000..514282591d8a --- /dev/null +++ b/tests/logging/test_terse_json.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from collections import Counter + +from twisted.logger import Logger + +from synapse.logging._structured import setup_structured_logging + +from tests.server import connect_client +from tests.unittest import HomeserverTestCase + +from .test_structured import FakeBeginner + + +class TerseJSONTCPTestCase(HomeserverTestCase): + def test_log_output(self): + """ + The Terse JSON outputter delivers simplified structured logs over TCP. + """ + log_config = { + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": 8000, + } + } + } + + # Begin the logger with our config + beginner = FakeBeginner() + setup_structured_logging( + self.hs, self.hs.config, log_config, logBeginner=beginner + ) + + logger = Logger( + namespace="tests.logging.test_terse_json", observer=beginner.observers[0] + ) + logger.info("Hello there, {name}!", name="wally") + + # Trigger the connection + self.pump() + + _, server = connect_client(self.reactor, 0) + + # Trigger data being sent + self.pump() + + # One log message, with a single trailing newline + logs = server.data.decode("utf8").splitlines() + self.assertEqual(len(logs), 1) + self.assertEqual(server.data.count(b"\n"), 1) + + log = json.loads(logs[0]) + + # The terse logger should give us these keys. + expected_log_keys = [ + "log", + "time", + "level", + "log_namespace", + "request", + "scope", + "server_name", + "name", + ] + self.assertEqual(set(log.keys()), set(expected_log_keys)) + + # It contains the data we expect. + self.assertEqual(log["name"], "wally") + + def test_log_backpressure_debug(self): + """ + When backpressure is hit, DEBUG logs will be shed. + """ + log_config = { + "loggers": {"synapse": {"level": "DEBUG"}}, + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": 8000, + "maximum_buffer": 10, + } + }, + } + + # Begin the logger with our config + beginner = FakeBeginner() + setup_structured_logging( + self.hs, + self.hs.config, + log_config, + logBeginner=beginner, + redirect_stdlib_logging=False, + ) + + logger = Logger( + namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] + ) + + # Send some debug messages + for i in range(0, 3): + logger.debug("debug %s" % (i,)) + + # Send a bunch of useful messages + for i in range(0, 7): + logger.info("test message %s" % (i,)) + + # The last debug message pushes it past the maximum buffer + logger.debug("too much debug") + + # Allow the reconnection + _, server = connect_client(self.reactor, 0) + self.pump() + + # Only the 7 infos made it through, the debugs were elided + logs = server.data.splitlines() + self.assertEqual(len(logs), 7) + + def test_log_backpressure_info(self): + """ + When backpressure is hit, DEBUG and INFO logs will be shed. + """ + log_config = { + "loggers": {"synapse": {"level": "DEBUG"}}, + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": 8000, + "maximum_buffer": 10, + } + }, + } + + # Begin the logger with our config + beginner = FakeBeginner() + setup_structured_logging( + self.hs, + self.hs.config, + log_config, + logBeginner=beginner, + redirect_stdlib_logging=False, + ) + + logger = Logger( + namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] + ) + + # Send some debug messages + for i in range(0, 3): + logger.debug("debug %s" % (i,)) + + # Send a bunch of useful messages + for i in range(0, 10): + logger.warn("test warn %s" % (i,)) + + # Send a bunch of info messages + for i in range(0, 3): + logger.info("test message %s" % (i,)) + + # The last debug message pushes it past the maximum buffer + logger.debug("too much debug") + + # Allow the reconnection + client, server = connect_client(self.reactor, 0) + self.pump() + + # The 10 warnings made it through, the debugs and infos were elided + logs = list(map(json.loads, server.data.decode("utf8").splitlines())) + self.assertEqual(len(logs), 10) + + self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10}) + + def test_log_backpressure_cut_middle(self): + """ + When backpressure is hit, and no more DEBUG and INFOs cannot be culled, + it will cut the middle messages out. + """ + log_config = { + "loggers": {"synapse": {"level": "DEBUG"}}, + "drains": { + "tersejson": { + "type": "network_json_terse", + "host": "127.0.0.1", + "port": 8000, + "maximum_buffer": 10, + } + }, + } + + # Begin the logger with our config + beginner = FakeBeginner() + setup_structured_logging( + self.hs, + self.hs.config, + log_config, + logBeginner=beginner, + redirect_stdlib_logging=False, + ) + + logger = Logger( + namespace="synapse.logging.test_terse_json", observer=beginner.observers[0] + ) + + # Send a bunch of useful messages + for i in range(0, 20): + logger.warn("test warn", num=i) + + # Allow the reconnection + client, server = connect_client(self.reactor, 0) + self.pump() + + # The first five and last five warnings made it through, the debugs and + # infos were elided + logs = list(map(json.loads, server.data.decode("utf8").splitlines())) + self.assertEqual(len(logs), 10) + self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10}) + self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs]) diff --git a/tests/server.py b/tests/server.py index e573c4e4c5ce..c8269619b1fd 100644 --- a/tests/server.py +++ b/tests/server.py @@ -11,9 +11,13 @@ from twisted.internet._resolver import SimpleResolverComplexifier from twisted.internet.defer import Deferred, fail, succeed from twisted.internet.error import DNSLookupError -from twisted.internet.interfaces import IReactorPluggableNameResolver, IResolverSimple +from twisted.internet.interfaces import ( + IReactorPluggableNameResolver, + IReactorTCP, + IResolverSimple, +) from twisted.python.failure import Failure -from twisted.test.proto_helpers import MemoryReactorClock +from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock from twisted.web.http import unquote from twisted.web.http_headers import Headers @@ -465,3 +469,22 @@ def flush(self, maxbytes=None): self.buffer = self.buffer[len(to_write) :] if self.buffer and self.autoflush: self._reactor.callLater(0.0, self.flush) + + +def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol: + """ + Connect a client to a fake TCP transport. + + Args: + reactor + factory: The connecting factory to build. + """ + factory = reactor.tcpClients[client_id][2] + client = factory.buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, reactor)) + client.makeConnection(FakeTransport(server, reactor)) + + reactor.tcpClients.pop(client_id) + + return client, server diff --git a/tox.ini b/tox.ini index 09b4b8fc3ca7..f9a3b7e49ac6 100644 --- a/tox.ini +++ b/tox.ini @@ -146,3 +146,13 @@ commands = coverage combine coverage xml codecov -X gcov + +[testenv:mypy] +basepython = python3.5 +deps = + {[base]deps} + mypy +extras = all +commands = mypy --ignore-missing-imports \ + synapse/logging/_structured.py \ + synapse/logging/_terse_json.py \ No newline at end of file