Skip to content

Commit

Permalink
Merge pull request #208 from stratosphereips/ondra-new-test-components
Browse files Browse the repository at this point in the history
Ondra new test components
  • Loading branch information
ondrej-lukas authored Apr 23, 2024
2 parents 3d680dd + 70aa411 commit 0aca96b
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 9 deletions.
43 changes: 37 additions & 6 deletions env/game_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def is_private(self):
"""
try:
return ipaddress.IPv4Network(self.ip).is_private
except:
except ipaddress.AddressValueError:
# The IP is a string
# In the concepts, 'external' is the string used for external hosts.
if self.ip != 'external':
Expand Down Expand Up @@ -83,8 +83,8 @@ def is_private(self):
Return if a network is private or not
"""
try:
return ipaddress.IPv4Network(f'{self.ip}/{self.mask}').is_private
except:
return ipaddress.IPv4Network(f'{self.ip}/{self.mask}',strict=False).is_private
except ipaddress.AddressValueError:
# If we are dealing with strings, assume they are local networks
return True

Expand Down Expand Up @@ -192,11 +192,40 @@ def parameters(self)->dict:

@property
def as_dict(self)->dict:
return {"type": str(self.type), "params": {k:str(v) for k,v in self.parameters.items()} }
params = {}
for k,v in self.parameters.items():
if isinstance(v, Service):
params[k] = vars(v)
elif isinstance(v, Data):
params[k] = vars(v)
elif isinstance(v, AgentInfo):
params[k] = vars(v)
else:
params[k] = str(v)
return {"type": str(self.type), "params": params}

@classmethod
def from_dict(cls, data_dict:dict):
action = Action(action_type=ActionType.from_string(data_dict["type"]), params=data_dict["params"])
action_type = ActionType.from_string(data_dict["type"])
params = {}
for k,v in data_dict["params"].items():
match k:
case "source_host":
params[k] = IP(v)
case "target_host":
params[k] = IP(v)
case "target_network":
net,mask = v.split("/")
params[k] = Network(net ,int(mask))
case "target_service":
params[k] = Service(**v)
case "data":
params[k] = Data(**v)
case "agent_info":
params[k] = AgentInfo(**v)
case _:
raise ValueError(f"Unsupported Value in {k}:{v}")
action = Action(action_type=action_type, params=params)
return action

def __repr__(self) -> str:
Expand All @@ -211,7 +240,9 @@ def __eq__(self, __o: object) -> bool:
return False

def __hash__(self) -> int:
return hash(self._type) + hash("".join(sorted(self._parameters)))
sorted_params = sorted(self._parameters.items(), key= lambda x: x[0])
sorted_params = [f"{x}{str(y)}" for x,y in sorted_params]
return hash(self._type) + hash("".join(sorted_params))

def as_json(self)->str:
ret_dict = {"action_type":str(self.type)}
Expand Down
192 changes: 189 additions & 3 deletions tests/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import json
from os import path
sys.path.append( path.dirname(path.dirname( path.abspath(__file__) ) ))
from env.game_components import ActionType, Action, IP, Data, Network, Service, GameState
from env.game_components import ActionType, Action, IP, Data, Network, Service, GameState, AgentInfo

class TestComponentsIP:
"""
Expand Down Expand Up @@ -35,6 +35,14 @@ def test_ip_not_str(self):
ip_2 = "192.168.2.15"
assert ip_1 != ip_2

def test_ip_is_private(self):
ip_1 = IP("192.168.1.15")
assert ip_1.is_private() is True

def test_ip_is_not_private(self):
ip_1 = IP("192.143.1.15")
assert ip_1.is_private() is False

class TestServices:
"""
Tests related to the Service dataclass
Expand Down Expand Up @@ -108,6 +116,14 @@ def test_net_not_equal(self):
net_2 = Network("192.168.1.3", 16)
assert net_1 != net_2

def test_net_is_not_private(self):
net_1 = Network("125.36.21.3", 16)
assert net_1.is_private() is False

def test_net_is_private(self):
net_1 = Network("192.168.1.0", 16)
assert net_1.is_private() is True

class TestData:
"""
Test cases for the Data class
Expand Down Expand Up @@ -246,6 +262,27 @@ def test_action_not_equal_different_action_type(self):
action2 = Action(action_type=ActionType.FindData,
params={"source_host":IP("192.168.12.11"),"target_host":IP("172.16.1.22")})
assert action != action2

def test_action_hash(self):
action = Action(
action_type=ActionType.FindServices,
params={"target_host":IP("172.16.1.22"),"source_host":IP("192.168.12.11")}
)
action2 = Action(
action_type=ActionType.FindServices,
params={"target_host":IP("172.16.1.22"), "source_host":IP("192.168.12.11")}
)
action3 = Action(
action_type=ActionType.FindServices,
params={"target_host":IP("172.16.13.48"), "source_host":IP("192.168.12.11")}
)
action4 = Action(
action_type=ActionType.FindData,
params={"target_host":IP("172.16.1.25"), "source_host":IP("192.168.12.11")}
)
assert hash(action) == hash(action2)
assert hash(action) != hash(action3)
assert hash(action2) != hash(action4)

def test_action_set_member(self):
action_set = set()
Expand All @@ -266,10 +303,10 @@ def test_action_set_member(self):
#reverse params order
assert Action(action_type=ActionType.ExploitService, params={"target_service": Service("ssh", "passive", "0.23", False), "target_host":IP("172.16.1.24"), "source_host":IP("192.168.12.11")})in action_set
assert Action(action_type=ActionType.ScanNetwork, params={"target_network":Network("172.16.1.12", 24), "source_host":IP("192.168.12.11")}) in action_set
assert Action(action_type=ActionType.ExfiltrateData, params={"target_host":IP("172.16.1.3"), "source_host": IP("172.16.1.2"), "data":Data("User2", "PublicKey")}) in action_set
assert Action(action_type=ActionType.ExfiltrateData, params={"target_host":IP("172.16.1.3"), "source_host": IP("172.16.1.2"), "data":Data("User2", "PublicKey")}) in action_set
#reverse params orders
assert Action(action_type=ActionType.ExfiltrateData, params={"source_host": IP("172.16.1.2"), "target_host":IP("172.16.1.3"), "data":Data("User2", "PublicKey")}) in action_set

def test_action_as_json(self):
# Scan Network
action = Action(action_type=ActionType.ScanNetwork,
Expand Down Expand Up @@ -370,6 +407,155 @@ def test_action_exfiltrate_serialization(self):
action_json = action.as_json()
new_action = Action.from_json(action_json)
assert action == new_action

def test_action_exfiltrate_join_game(self):
action = Action(
action_type=ActionType.JoinGame,
params={
"agent_info": AgentInfo(name="TestingAgent", role="attacker"),
}
)
action_json = action.as_json()
new_action = Action.from_json(action_json)
assert action == new_action

def test_action_exfiltrate_reset_game(self):
action = Action(
action_type=ActionType.ResetGame,
params={}
)
action_json = action.as_json()
new_action = Action.from_json(action_json)
assert action == new_action

def test_action_exfiltrate_quit_game(self):
action = Action(
action_type=ActionType.QuitGame,
params={}
)
action_json = action.as_json()
new_action = Action.from_json(action_json)
assert action == new_action

def test_action_to_dict_scan_network(self):
action = Action(
action_type=ActionType.ScanNetwork,
params={
"target_network":Network("172.16.1.12", 24),
"source_host": IP("172.16.1.2")
}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert action_dict["params"]["target_network"] == "172.16.1.12/24"
assert action_dict["params"]["source_host"] == "172.16.1.2"

def test_action_to_dict_find_services(self):
action = Action(
action_type=ActionType.FindServices,
params={
"target_host":IP("172.16.1.22"),
"source_host": IP("172.16.1.2")
}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert action_dict["params"]["target_host"] == "172.16.1.22"
assert action_dict["params"]["source_host"] == "172.16.1.2"

def test_action_to_dict_find_data(self):
action = Action(
action_type=ActionType.FindData,
params={
"target_host":IP("172.16.1.22"),
"source_host": IP("172.16.1.2")
}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert action_dict["params"]["target_host"] == "172.16.1.22"
assert action_dict["params"]["source_host"] == "172.16.1.2"

def test_action_to_dict_exploit_service(self):
action = Action(
action_type=ActionType.ExploitService,
params={
"source_host": IP("172.16.1.2"),
"target_host":IP("172.16.1.24"),
"target_service": Service("ssh", "passive", "0.23", False)
}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert action_dict["params"]["target_host"] == "172.16.1.24"
assert action_dict["params"]["source_host"] == "172.16.1.2"
assert action_dict["params"]["target_service"]["name"] == "ssh"
assert action_dict["params"]["target_service"]["type"] == "passive"
assert action_dict["params"]["target_service"]["version"] == "0.23"
assert action_dict["params"]["target_service"]["is_local"] is False

def test_action_to_dict_exfiltrate_data(self):
action = Action(
action_type=ActionType.ExfiltrateData,
params={
"target_host":IP("172.16.1.3"),
"source_host": IP("172.16.1.2"),
"data":Data("User2", "PublicKey")
}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert action_dict["params"]["target_host"] == "172.16.1.3"
assert action_dict["params"]["source_host"] == "172.16.1.2"
assert action_dict["params"]["data"]["owner"] == "User2"
assert action_dict["params"]["data"]["id"] == "PublicKey"

def test_action_to_dict_join_game(self):
action = Action(
action_type=ActionType.JoinGame,
params={
"agent_info": AgentInfo(name="TestingAgent", role="attacker"),
"source_host": IP("172.16.1.2")
}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert action_dict["params"]["agent_info"]["name"] == "TestingAgent"
assert action_dict["params"]["agent_info"]["role"] == "attacker"

def test_action_to_dict_reset_game(self):
action = Action(
action_type=ActionType.ResetGame,
params={}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert len(action_dict["params"]) == 0

def test_action_to_dict_quit_game(self):
action = Action(
action_type=ActionType.QuitGame,
params={}
)
action_dict = action.as_dict
new_action = Action.from_dict(action_dict)
assert action == new_action
assert action_dict["type"] == str(action.type)
assert len(action_dict["params"]) == 0

class TestGameState:
"""
Expand Down

0 comments on commit 0aca96b

Please sign in to comment.