diff --git a/tools/logs2seq.py b/tools/logs2seq.py index 9ee2559d1..002aa9f16 100644 --- a/tools/logs2seq.py +++ b/tools/logs2seq.py @@ -53,54 +53,57 @@ s3 = boto3.client('s3') s3.download_file(args.bucket_name, obj_name, file_name) + +class LogSender: + def __init__(self, url: str): + self.url = url + self.batch = b"" + self.ctr = 0 + + def __enter__(self): + return self + + def sendlog(self, line: str): + if line.startswith("{"): + try: + parsed = json.loads(line) + if "@t" not in parsed: + return + self.ctr = self.ctr + 1 + log_message = bytes(line + "\n", "utf-8") + if len(self.batch) + len(log_message) > 100000: + requests.post(self.url, data=self.batch) + self.batch = log_message + else: + self.batch += log_message + except JSONDecodeError as e: + logger.warning(f"Failed to parse JSON: {e}") + + def __exit__(self, exception_type, exception_value, traceback): + if self.batch != b"": + requests.post(self.url, data=self.batch) + logger.info(f"sent : {self.ctr}") + + def process_json_log(url: str, file_name: str): - ctr = 0 - tosend = b"" with open(file_name, "r") as file: - for line in file.readlines(): - if line.startswith("{"): - try: - parsed = json.loads(line) - if "@t" not in parsed: - continue - ctr = ctr + 1 - log_message = bytes(line + "\n", "utf-8") - if len(tosend) + len(log_message) > 100000: - requests.post(url, data=tosend) - tosend = log_message - else: - tosend += log_message - except JSONDecodeError as e: - logger.warning(f"Failed to parse JSON: {e}") - logger.info(f"sent : {ctr}") - if tosend != b"": - requests.post(url, data=tosend) + with LogSender(url) as log_sender: + for line in file.readlines(): + log_sender.sendlog(line) + def process_jsongz_log(url: str, file_name: str): - ctr = 0 - tosend = b"" with gzip.open(file_name, "r") as file: - for line in file.read().decode("utf-8").split("\n"): - if line.startswith("{"): - try: - parsed = json.loads(line) - if "@t" not in parsed: - continue - ctr = ctr + 1 - log_message = bytes(line + "\n", "utf-8") - if len(tosend) + len(log_message) > 100000: - requests.post(url, data=tosend) - tosend = log_message - else: - tosend += log_message - except JSONDecodeError as e: - logger.warning(f"Failed to parse JSON: {e}") - logger.info(f"sent : {ctr}") - if tosend != b"": - requests.post(url, data=tosend) + with LogSender(url) as log_sender: + for line in file.read().decode("utf-8").split("\n"): + log_sender.sendlog(line) + + if file_name.endswith(".json.tar.gz"): process_jsongz_log(args.url, file_name) elif file_name.endswith(".json"): process_json_log(args.url, file_name) + +