Skip to content

Commit

Permalink
Merge pull request #17 from Zinkelburger/stop_task_refactor
Browse files Browse the repository at this point in the history
Refactor task.py abstract `stop`
  • Loading branch information
wizhaoredhat authored Apr 25, 2024
2 parents 5553bbf + ca3d714 commit 7a046cd
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 151 deletions.
25 changes: 13 additions & 12 deletions common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
TFT_TESTS = "tft-tests"


@dataclass
class Result:
out: str
err: str
returncode: int


class TestType(Enum):
IPERF_TCP = 1
IPERF_UDP = 2
Expand Down Expand Up @@ -79,26 +86,20 @@ class TestMetadata:


@dataclass
class IperfOutput:
tft_metadata: TestMetadata
class BaseOutput:
command: str
result: dict


@dataclass
class PluginOutput:
plugin_metadata: dict
command: str
result: dict
name: str
class IperfOutput(BaseOutput):
tft_metadata: TestMetadata


@dataclass
class RxTxData:
rx_start: int
tx_start: int
rx_end: int
tx_end: int
class PluginOutput(BaseOutput):
plugin_metadata: dict
name: str


@dataclass
Expand Down
6 changes: 1 addition & 5 deletions host.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import subprocess
from collections import namedtuple
import os
import json
import shlex
import sys
from logger import logger


Result = namedtuple("Result", "out err returncode")

from common import Result

class Host:
def ipa(self) -> dict:
Expand Down
41 changes: 15 additions & 26 deletions iperf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from testConfig import TestConfig
from thread import ReturnValueThread
from task import Task
from host import Result
from common import Result
from testSettings import TestSettings
import json

Expand Down Expand Up @@ -74,7 +74,7 @@ def setup(self):

logger.info(f"Running {cmd}")

def server(self, cmd: str):
def server(self, cmd: str) -> Result:
if self.connection_mode == ConnectionMode.EXTERNAL_IP:
return self.lh.run(cmd)
elif self.exec_persistent:
Expand All @@ -84,21 +84,15 @@ def server(self, cmd: str):
self.exec_thread = ReturnValueThread(target=server, args=(self, cmd))
self.exec_thread.start()

def run(self, duration: int):
def run(self, duration: int) -> None:
pass

def stop(self):
logger.info(f"Stopping execution on {self.pod_name}")
r = self.exec_thread.join()
if r.returncode != 0:
logger.error(
f"Error occured while stopping Iperf server: errcode: {r.returncode} err {r.err}"
)
logger.debug(f"IperfServer.stop(): {r.out}")

def output(self, out: common.TftAggregateOutput):
def output(self, out: common.TftAggregateOutput) -> None:
pass

def generate_output(self, data: str) -> common.BaseOutput:
return common.BaseOutput("", {})


class IperfClient(Task):
def __init__(self, tc: TestConfig, ts: TestSettings, server: IperfServer):
Expand Down Expand Up @@ -141,8 +135,8 @@ def __init__(self, tc: TestConfig, ts: TestSettings, server: IperfServer):
common.j2_render(self.in_file_template, self.out_file_yaml, self.template_args)
logger.info(f"Generated Client Pod Yaml {self.out_file_yaml}")

def run(self, duration: int):
def client(self, cmd: str):
def run(self, duration: int) -> None:
def client(self, cmd: str) -> Result:
return self.run_oc(cmd)

server_ip = self.get_target_ip()
Expand All @@ -154,25 +148,20 @@ def client(self, cmd: str):
self.exec_thread = ReturnValueThread(target=client, args=(self, self.cmd))
self.exec_thread.start()

def stop(self):
logger.info(f"Stopping execution on {self.pod_name}")
r = self.exec_thread.join()
if r.returncode != 0:
logger.error(r)
logger.debug(f"IperfClient.stop(): {r.out}")
data = json.loads(r.out)
self._output = self.generate_output(data)

def generate_output(self, data: dict) -> IperfOutput:
def generate_output(self, data: str) -> IperfOutput:
parsed_data = json.loads(data)
json_dump = IperfOutput(
tft_metadata=self.ts.get_test_metadata(),
command=self.cmd,
result=data,
result=parsed_data,
)
return json_dump

def output(self, out: common.TftAggregateOutput):
# Return machine-readable output to top level
assert isinstance(
self._output, IperfOutput
), f"Expected variable to be of type IperfOutput, got {type(self._output)} instead."
out.flow_test = self._output

# Print summary to console logs
Expand Down
3 changes: 2 additions & 1 deletion k8sClient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import kubernetes
import yaml
import host
from common import Result
from typing import List


Expand All @@ -23,6 +24,6 @@ def get_nodes_with_label(self, label_selector: str) -> List[str]:
for e in self._client.list_node(label_selector=label_selector).items
]

def oc(self, cmd: str) -> host.Result:
def oc(self, cmd: str) -> Result:
lh = host.LocalHost()
return lh.run(f"oc --kubeconfig {self._kc} {cmd} ")
27 changes: 11 additions & 16 deletions measureCpu.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from common import TFT_TOOLS_IMG, PluginOutput, j2_render, TftAggregateOutput
from common import TFT_TOOLS_IMG, PluginOutput, j2_render, TftAggregateOutput, Result
from logger import logger
from testConfig import TestConfig
from thread import ReturnValueThread
from task import Task
import jc
from typing import List, Dict, Any, cast


class MeasureCPU(Task):
Expand All @@ -24,8 +25,8 @@ def __init__(self, tc: TestConfig, node_name: str, tenant: bool):
j2_render(self.in_file_template, self.out_file_yaml, self.template_args)
logger.info(f"Generated Server Pod Yaml {self.out_file_yaml}")

def run(self, duration: int):
def stat(self, cmd: str):
def run(self, duration: int) -> None:
def stat(self, cmd: str) -> Result:
return self.run_oc(cmd)

# 1 report at intervals defined by the duration in seconds.
Expand All @@ -34,34 +35,28 @@ def stat(self, cmd: str):
self.exec_thread.start()
logger.info(f"Running {self.cmd}")

def stop(self):
logger.info(f"Stopping measureCPU execution on {self.pod_name}")
r = self.exec_thread.join()
if r.returncode != 0:
logger.info(r)
logger.debug(f"measureCpu.stop(): {r.out}")
data = jc.parse("mpstat", r.out)
p_idle = data[0]["percent_idle"]
logger.info(f"Idle on {self.node_name} = {p_idle}%")
self._output = self.generate_output(data)

def output(self, out: TftAggregateOutput):
# Return machine-readable output to top level
assert isinstance(
self._output, PluginOutput
), f"Expected variable to be of type PluginOutput, got {type(self._output)} instead."
out.plugins.append(self._output)

# Print summary to console logs
p_idle = self._output.result["percent_idle"]
logger.info(f"Idle on {self.node_name} = {p_idle}%")

# TODO: We are currently only storing the "cpu: all" data from mpstat
def generate_output(self, data) -> PluginOutput:
def generate_output(self, data: str) -> PluginOutput:
# satisfy the linter. jc.parse returns a list of dicts in this case
parsed_data = cast(List[Dict[str, Any]], jc.parse("mpstat", data))
return PluginOutput(
plugin_metadata={
"name": "MeasureCPU",
"node_name": self.node_name,
"pod_name": self.pod_name,
},
command=self.cmd,
result=data[0],
result=parsed_data[0],
name="measure_cpu",
)
24 changes: 11 additions & 13 deletions measurePower.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from testConfig import TestConfig
from thread import ReturnValueThread
from task import Task
from host import Result
from common import Result
import re
import time
import json


class MeasurePower(Task):
Expand All @@ -26,7 +27,7 @@ def __init__(self, tc: TestConfig, node_name: str, tenant: bool):
j2_render(self.in_file_template, self.out_file_yaml, self.template_args)
logger.info(f"Generated Server Pod Yaml {self.out_file_yaml}")

def run(self, duration: int):
def run(self, duration: int) -> None:
def extract(r: Result) -> int:
for e in r.out.split("\n"):
if "Instantaneous power reading" in e:
Expand All @@ -36,7 +37,7 @@ def extract(r: Result) -> int:
logger.error(f"Could not find Instantaneous power reading: {e}.")
return 0

def stat(self, cmd: str, duration: int):
def stat(self, cmd: str, duration: int) -> Result:
end_time = time.time() + float(duration)
total_pwr = 0
iteration = 0
Expand All @@ -62,28 +63,25 @@ def stat(self, cmd: str, duration: int):
self.exec_thread.start()
logger.info(f"Running {self.cmd}")

def stop(self):
logger.info(f"Stopping measurePower execution on {self.pod_name}")
r = self.exec_thread.join()
if r.returncode != 0:
logger.error(r)
self._output = self.generate_output(data=r.out)

def output(self, out: TftAggregateOutput):
def output(self, out: TftAggregateOutput) -> None:
# Return machine-readable output to top level
assert isinstance(
self._output, PluginOutput
), f"Expected variable to be of type PluginOutput, got {type(self._output)} instead."
out.plugins.append(self._output)

# Print summary to console logs
logger.info(f"measurePower results: {self._output.result}")

def generate_output(self, data) -> PluginOutput:
def generate_output(self, data: str) -> PluginOutput:
parsed_data = json.loads(data)
return PluginOutput(
plugin_metadata={
"name": "MeasurePower",
"node_name": self.node_name,
"pod_name": self.pod_name,
},
command=self.cmd,
result=data,
result=parsed_data,
name="measure_power",
)
26 changes: 21 additions & 5 deletions task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self, tc: TestConfig, index: int, node_name: str, tenant: bool):
self.template_args["node_name"] = self.node_name
self.tc = tc

def run_oc(self, cmd: str) -> host.Result:
def run_oc(self, cmd: str) -> common.Result:
if self.tenant:
r = self.tc.client_tenant.oc(cmd)
else:
Expand Down Expand Up @@ -105,12 +105,24 @@ def setup(self):
sys.exit(-1)

@abstractmethod
def run(self, duration: int):
def run(self, duration: int) -> None:
raise NotImplementedError("Must implement run()")

@abstractmethod
def stop(self):
raise NotImplementedError("Must implement stop()")
def stop(self) -> None:
class_name = self.__class__.__name__
logger.info(f"Stopping execution on {class_name}")
self.exec_thread.join()
if self.exec_thread.result is not None:
r = self.exec_thread.result
if r.returncode != 0:
logger.error(
f"Error occurred while stopping {class_name}: errcode: {r.returncode} err {r.err}"
)
logger.debug(f"{class_name}.stop(): {r.out}")
self._output = self.generate_output(data=r.out)
else:
logger.error(f"Thread {class_name} did not return a result")
self._output = common.BaseOutput("", {})

"""
output() should be called to store the results of this task in a PluginOutput class object, and return this by appending the instance to the
Expand All @@ -121,3 +133,7 @@ def stop(self):
@abstractmethod
def output(self, out: common.TftAggregateOutput):
raise NotImplementedError("Must implement output()")

@abstractmethod
def generate_output(self, data: str) -> common.BaseOutput:
raise NotImplementedError("Must implement generate_output()")
16 changes: 9 additions & 7 deletions thread.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
from threading import Thread
from logger import logger
from common import Result
from typing import Callable, Any, Optional


class ReturnValueThread(Thread):
def __init__(self, *args, **kwargs):
self._target = None
def __init__(self, *args: Any, **kwargs: Any):
self._target: Callable[..., Result] = None
self._args = args
self._kwargs = kwargs
super().__init__(*args, **kwargs)
self.result = None
self.result: Optional[Result] = None

def run(self):
def run(self) -> None:
if self._target is None:
logger.error("Called ReturnValueThread with target=None")
return
try:
self.result = self._target(*self._args, **self._kwargs)
except Exception as e:
logger.error(e)
pass
logger.error(f"Thread with target {self._target} experienced exception {e}")

def join(self, *args, **kwargs):
def join(self, *args: Any, **kwargs: Any) -> Result:
super().join(*args, **kwargs)
return self.result
2 changes: 1 addition & 1 deletion trafficFlowTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _run(
self.tft_output.append(output)

def _run_test_case(self, tests: dict, test_id: int):
duration = tests["duration"]
duration = int(tests["duration"])
# TODO Allow for multiple connections / instances to run simultaneously
for connections in tests["connections"]:
logger.info(f"Starting {connections['name']}")
Expand Down
Loading

0 comments on commit 7a046cd

Please sign in to comment.