Skip to content

Commit

Permalink
Truncate master and worker logs
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewbaldwin44 committed Jun 4, 2024
1 parent 54f0e96 commit 6e95899
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 9 deletions.
22 changes: 14 additions & 8 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,9 +1422,19 @@ def stats_reporter(self) -> NoReturn:
logger.error(f"Temporary connection lost to master server: {e}, will retry later.")
gevent.sleep(WORKER_REPORT_INTERVAL)

def logs_reporter(self) -> NoReturn:
def logs_reporter(self) -> None:
while True:
self._send_logs()
current_logs = get_logs()

if (len(current_logs) - len(self.logs)) > 10:
current_logs = [*self.logs, "The worker experienced an issue and will stop outputting logs"]
self.send_message("logs", {"worker_id": self.client_id, "logs": current_logs})
self._send_logs(current_logs)
break
if len(current_logs) > len(self.logs):
self._send_logs(current_logs)

self.logs = current_logs
gevent.sleep(WORKER_LOG_REPORT_INTERVAL)

def send_message(self, msg_type: str, data: dict[str, Any] | None = None, client_id: str | None = None) -> None:
Expand All @@ -1443,12 +1453,8 @@ def _send_stats(self) -> None:
self.environment.events.report_to_master.fire(client_id=self.client_id, data=data)
self.client.send(Message("stats", data, self.client_id))

def _send_logs(self) -> None:
current_logs = get_logs()

if len(current_logs) > len(self.logs):
self.logs = current_logs
self.send_message("logs", {"worker_id": self.client_id, "logs": current_logs})
def _send_logs(self, current_logs) -> None:
self.send_message("logs", {"worker_id": self.client_id, "logs": current_logs})

def connect_to_master(self):
self.retry += 1
Expand Down
33 changes: 33 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -3992,6 +3992,39 @@ def my_task(self):
self.assertEqual(worker.client_id, client.outbox[3].data.get("worker_id"))
worker.quit()

def test_quit_worker_logs(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
short_time = 0.05

log_handler = LogReader()
log_handler.name = "log_reader"
log_handler.setLevel(logging.INFO)
logger = logging.getLogger("root")
logger.addHandler(log_handler)
log_line = "spamming log"

for _ in range(11):
logger.info(log_line)

worker = self.get_runner(environment=Environment(), user_classes=[MyUser], client=client)

gevent.sleep(short_time)

self.assertEqual("logs", client.outbox[3].type)
self.assertEqual(
"The worker experienced an issue and will stop outputting logs",
client.outbox[3].data.get("logs", [])[0],
)
self.assertEqual(worker.client_id, client.outbox[3].data.get("worker_id"))
worker.quit()


class TestMessageSerializing(unittest.TestCase):
def test_message_serialize(self):
Expand Down
4 changes: 3 additions & 1 deletion locust/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,9 @@ def tasks() -> dict[str, dict[str, dict[str, float]]]:
@app.route("/logs")
@self.auth_required_if_enabled
def logs():
return jsonify({"master": get_logs(), "workers": self.environment.worker_logs})
# truncate to get the tail of the logs
master_logs = get_logs()[-500:]
return jsonify({"master": master_logs, "workers": self.environment.worker_logs})

@app.route("/login")
def login():
Expand Down

0 comments on commit 6e95899

Please sign in to comment.