Skip to content

Commit

Permalink
refactor: Create a class LogSender
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyzyngeraneo committed Sep 19, 2023
1 parent ee8d926 commit dff0303
Showing 1 changed file with 43 additions and 40 deletions.
83 changes: 43 additions & 40 deletions tools/logs2seq.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,54 +53,57 @@
s3 = boto3.client('s3')
s3.download_file(args.bucket_name, obj_name, file_name)


class LogSender:
def __init__(self, url: str):
self.url = url
self.batch = b""
self.ctr = 0

def __enter__(self):
return self

def sendlog(self, line: str):
if line.startswith("{"):
try:
parsed = json.loads(line)
if "@t" not in parsed:
return
self.ctr = self.ctr + 1
log_message = bytes(line + "\n", "utf-8")
if len(self.batch) + len(log_message) > 100000:
requests.post(self.url, data=self.batch)
self.batch = log_message
else:
self.batch += log_message
except JSONDecodeError as e:
logger.warning(f"Failed to parse JSON: {e}")

def __exit__(self, exception_type, exception_value, traceback):
if self.batch != b"":
requests.post(self.url, data=self.batch)
logger.info(f"sent : {self.ctr}")


def process_json_log(url: str, file_name: str):
ctr = 0
tosend = b""
with open(file_name, "r") as file:
for line in file.readlines():
if line.startswith("{"):
try:
parsed = json.loads(line)
if "@t" not in parsed:
continue
ctr = ctr + 1
log_message = bytes(line + "\n", "utf-8")
if len(tosend) + len(log_message) > 100000:
requests.post(url, data=tosend)
tosend = log_message
else:
tosend += log_message
except JSONDecodeError as e:
logger.warning(f"Failed to parse JSON: {e}")
logger.info(f"sent : {ctr}")
if tosend != b"":
requests.post(url, data=tosend)
with LogSender(url) as log_sender:
for line in file.readlines():
log_sender.sendlog(line)



def process_jsongz_log(url: str, file_name: str):
ctr = 0
tosend = b""
with gzip.open(file_name, "r") as file:
for line in file.read().decode("utf-8").split("\n"):
if line.startswith("{"):
try:
parsed = json.loads(line)
if "@t" not in parsed:
continue
ctr = ctr + 1
log_message = bytes(line + "\n", "utf-8")
if len(tosend) + len(log_message) > 100000:
requests.post(url, data=tosend)
tosend = log_message
else:
tosend += log_message
except JSONDecodeError as e:
logger.warning(f"Failed to parse JSON: {e}")
logger.info(f"sent : {ctr}")
if tosend != b"":
requests.post(url, data=tosend)
with LogSender(url) as log_sender:
for line in file.read().decode("utf-8").split("\n"):
log_sender.sendlog(line)



if file_name.endswith(".json.tar.gz"):
process_jsongz_log(args.url, file_name)
elif file_name.endswith(".json"):
process_json_log(args.url, file_name)


0 comments on commit dff0303

Please sign in to comment.