Skip to content

Commit

Permalink
test case improvement and comments address
Browse files Browse the repository at this point in the history
  • Loading branch information
shashankshampi committed Oct 27, 2024
1 parent 1966f50 commit 24cbf4c
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 337 deletions.
2 changes: 2 additions & 0 deletions tests-functional/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.idea/
.local/
39 changes: 31 additions & 8 deletions tests-functional/clients/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,38 @@ def on_message(self, ws, signal):
if signal.get("type") in self.await_signals:
self.received_signals[signal["type"]].append(signal)

def wait_for_signal(self, signal_type, timeout=20):
# def wait_for_signal(self, signal_type, timeout=20):
# start_time = time.time()
# while not self.received_signals.get(signal_type):
# if time.time() - start_time >= timeout:
# raise TimeoutError(
# f"Signal {signal_type} is not received in {timeout} seconds")
# time.sleep(0.2)
# logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds")
# return self.received_signals[signal_type][0]

def wait_for_signal(self, signal_type, expected_event=None, timeout=20):
"""
Wait for a signal of a specific type with optional expected event details.
"""
start_time = time.time()
while not self.received_signals.get(signal_type):
if time.time() - start_time >= timeout:
raise TimeoutError(
f"Signal {signal_type} is not received in {timeout} seconds")
time.sleep(0.2)
logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds")
return self.received_signals[signal_type][0]
while time.time() - start_time < timeout:
# Check if the signal list for this type has received signals
if self.received_signals.get(signal_type):
received_signal = self.received_signals[signal_type][0]
if expected_event:
# Check if the event in received_signal matches expected_event
event = received_signal.get("event", {})
if all(event.get(k) == v for k, v in expected_event.items()):
logging.debug(f"Signal {signal_type} with event {expected_event} received.")
return received_signal
else:
logging.debug(f"Signal {signal_type} received without specific event validation.")
return received_signal
time.sleep(0.2) # Wait before retrying to prevent excessive polling

# If no matching signal is found within the timeout
raise TimeoutError(f"Signal {signal_type} with event {expected_event} not received in {timeout} seconds")

def _on_error(self, ws, error):
logging.error(f"Error: {error}")
Expand Down
30 changes: 28 additions & 2 deletions tests-functional/constants.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import os
import random
from dataclasses import dataclass

from src.libs.common import create_unique_data_dir

@dataclass
class Account:
address: str
private_key: str
password: str


# User accounts
user_1 = Account(
address="0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266",
private_key="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80",
Expand All @@ -18,3 +20,27 @@ class Account:
private_key="0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d",
password="Strong12345"
)

# Paths and URLs
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
STATUS_BACKEND_URL = os.getenv("STATUS_BACKEND_URL", "http://127.0.0.1")
API_REQUEST_TIMEOUT = int(os.getenv("API_REQUEST_TIMEOUT", "15"))

# Paths relative to project root
DATA_DIR = os.path.join(PROJECT_ROOT, "tests-functional/local")
LOCAL_DATA_DIR1 = create_unique_data_dir(DATA_DIR, random.randint(1, 100))
LOCAL_DATA_DIR2 = create_unique_data_dir(DATA_DIR, random.randint(1, 100))
RESOURCES_FOLDER = os.path.join(PROJECT_ROOT, "resources")

# Account payload default values
ACCOUNT_PAYLOAD_DEFAULTS = {
"displayName": "user",
"password": "test_password",
"customizationColor": "primary"
}

# Network emulation commands
LATENCY_CMD = "sudo tc qdisc add dev eth0 root netem delay 1s 100ms distribution normal"
PACKET_LOSS_CMD = "sudo tc qdisc add dev eth0 root netem loss 50%"
LOW_BANDWIDTH_CMD = "sudo tc qdisc add dev eth0 root tbf rate 1kbit burst 1kbit"
REMOVE_TC_CMD = "sudo tc qdisc del dev eth0 root"
28 changes: 0 additions & 28 deletions tests-functional/src/config.py

This file was deleted.

12 changes: 12 additions & 0 deletions tests-functional/src/libs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from src.libs.custom_logger import get_custom_logger
import os
import allure
import uuid

logger = get_custom_logger(__name__)

Expand All @@ -14,3 +15,14 @@ def attach_allure_file(file):
def delay(num_seconds):
logger.debug(f"Sleeping for {num_seconds} seconds")
sleep(num_seconds)

def create_unique_data_dir(base_dir: str, index: int) -> str:
"""Generate a unique data directory for each node instance."""
unique_id = str(uuid.uuid4())[:8]
unique_dir = os.path.join(base_dir, f"data_{index}_{unique_id}")
os.makedirs(unique_dir, exist_ok=True)
return unique_dir

def get_project_root() -> str:
"""Returns the root directory of the project."""
return os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
18 changes: 5 additions & 13 deletions tests-functional/src/node/rpc_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from src.libs.base_api_client import BaseAPIClient
from src.config import Config
from constants import *
from src.libs.custom_logger import get_custom_logger
from tenacity import retry, stop_after_attempt, wait_fixed

Expand All @@ -16,15 +16,13 @@ def __init__(self, port, node_name):
wait=wait_fixed(1),
reraise=True
)
def send_rpc_request(self, method, params=None, timeout=Config.API_REQUEST_TIMEOUT):
def send_rpc_request(self, method, params=None, timeout=API_REQUEST_TIMEOUT):
"""Send JSON-RPC requests, used for standard JSON-RPC API calls."""
payload = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1}
logger.info(f"Sending JSON-RPC request to {self.base_url} with payload: {payload}")

response = self.send_post_request("CallRPC", payload, timeout=timeout)

logger.info(f"Response received: {response}")

if response.get("error"):
logger.error(f"RPC request failed with error: {response['error']}")
raise RuntimeError(f"RPC request failed with error: {response['error']}")
Expand All @@ -36,15 +34,13 @@ def send_rpc_request(self, method, params=None, timeout=Config.API_REQUEST_TIMEO
wait=wait_fixed(1),
reraise=True
)
def initialize_application(self, data_dir, timeout=Config.API_REQUEST_TIMEOUT):
def initialize_application(self, data_dir, timeout=API_REQUEST_TIMEOUT):
"""Send a direct POST request to the InitializeApplication endpoint."""
payload = {"dataDir": data_dir}
logger.info(f"Sending direct POST request to InitializeApplication with payload: {payload}")

response = self.send_post_request("InitializeApplication", payload, timeout=timeout)

logger.info(f"Response from InitializeApplication: {response}")

if response.get("error"):
logger.error(f"InitializeApplication request failed with error: {response['error']}")
raise RuntimeError(f"Failed to initialize application: {response['error']}")
Expand All @@ -56,7 +52,7 @@ def initialize_application(self, data_dir, timeout=Config.API_REQUEST_TIMEOUT):
wait=wait_fixed(1),
reraise=True
)
def create_account_and_login(self, account_data, timeout=Config.API_REQUEST_TIMEOUT):
def create_account_and_login(self, account_data, timeout=API_REQUEST_TIMEOUT):
"""Send a direct POST request to CreateAccountAndLogin endpoint."""
payload = {
"rootDataDir": account_data.get("rootDataDir"),
Expand All @@ -68,8 +64,6 @@ def create_account_and_login(self, account_data, timeout=Config.API_REQUEST_TIME

response = self.send_post_request("CreateAccountAndLogin", payload, timeout=timeout)

logger.info(f"Response from CreateAccountAndLogin: {response}")

if response.get("error"):
logger.error(f"CreateAccountAndLogin request failed with error: {response['error']}")
raise RuntimeError(f"Failed to create account and login: {response['error']}")
Expand All @@ -81,7 +75,7 @@ def create_account_and_login(self, account_data, timeout=Config.API_REQUEST_TIME
wait=wait_fixed(1),
reraise=True
)
def start_messenger(self, timeout=Config.API_REQUEST_TIMEOUT):
def start_messenger(self, timeout=API_REQUEST_TIMEOUT):
"""Send JSON-RPC request to start Waku messenger."""
payload = {
"jsonrpc": "2.0",
Expand All @@ -93,8 +87,6 @@ def start_messenger(self, timeout=Config.API_REQUEST_TIMEOUT):

response = self.send_post_request("CallRPC", payload, timeout=timeout)

logger.info(f"Response from Waku messenger start: {response}")

if response.get("error"):
logger.error(f"Starting Waku messenger failed with error: {response['error']}")
raise RuntimeError(f"Failed to start Waku messenger: {response['error']}")
Expand Down
98 changes: 34 additions & 64 deletions tests-functional/src/node/status_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
import signal
import string
import subprocess
import re
import threading
import time
import requests
from tenacity import retry, stop_after_delay, wait_fixed, stop_after_attempt
from src.data_storage import DS

from clients.status_backend import RpcClient
from conftest import option
from src.libs.custom_logger import get_custom_logger
from src.node.rpc_client import StatusNodeRPC
from tests.clients.signals import SignalClient
from clients.signals import SignalClient

logger = get_custom_logger(__name__)


class StatusNode:
def __init__(self, name=None, port=None, pubkey=None):
self.data_dir = None
try:
os.remove(f"{name}.log")
except:
Expand All @@ -33,8 +33,21 @@ def __init__(self, name=None, port=None, pubkey=None):
self.logs = []
self.pid = None
self.signal_client = None
self.last_response = None
self.api = StatusNodeRPC(self.port, self.name)

def setup_method(self):
# Set up RPC client
self.rpc_client = RpcClient(option.rpc_url_statusd)
# Set up WebSocket signal client
await_signals = ["history.request.started", "history.request.completed"]
self.signal_client = SignalClient(option.ws_url_statusd, await_signals)

# Start WebSocket connection in a separate thread
websocket_thread = threading.Thread(target=self.signal_client._connect)
websocket_thread.daemon = True
websocket_thread.start()

def initialize_node(self, name, port, data_dir, account_data):
"""Centralized method to initialize a node."""
self.name = name
Expand All @@ -43,6 +56,7 @@ def initialize_node(self, name, port, data_dir, account_data):
self.wait_fully_started()
self.create_account_and_login(account_data)
self.start_messenger()
self.pubkey = self.get_pubkey(account_data["displayName"])

def start_node(self, command):
"""Start the node using a subprocess command."""
Expand All @@ -54,10 +68,11 @@ def start_node(self, command):
def start(self, data_dir, capture_logs=True):
"""Start the status-backend node and initialize it before subscribing to signals."""
self.capture_logs = capture_logs
self.data_dir = data_dir
command = ["./status-backend", f"--address=localhost:{self.port}"]
self.start_node(command)
self.wait_fully_started()
self.api.initialize_application(data_dir)
self.last_response = self.api.initialize_application(data_dir)
self.api = StatusNodeRPC(self.port, self.name)
self.start_signal_client()

Expand All @@ -74,7 +89,7 @@ def start_messenger(self):
def start_signal_client(self):
"""Start a SignalClient for the given node to listen for WebSocket signals."""
ws_url = f"ws://localhost:{self.port}"
await_signals = ["community.chatMessage", "mediaserver.started"]
await_signals = ["history.request.started", "history.request.completed"]
self.signal_client = SignalClient(ws_url, await_signals)

websocket_thread = threading.Thread(target=self.signal_client._connect)
Expand Down Expand Up @@ -114,16 +129,19 @@ def random_node_name(self, length=10):
allowed_chars = string.ascii_lowercase + string.digits + "_-"
return ''.join(random.choice(allowed_chars) for _ in range(length))

def get_pubkey(self):
"""Retrieve the public key of the node."""
if self.pubkey:
return self.pubkey
else:
raise Exception(f"Public key not set for node {self.name}")
def get_pubkey(self, display_name):
"""Retrieve public-key based on display name from accounts_getAccounts response."""
response = self.api.send_rpc_request("accounts_getAccounts")

def wait_for_signal(self, signal_type, timeout=20):
"""Wait for a signal using the signal client."""
return self.signal_client.wait_for_signal(signal_type, timeout)
accounts = response.get("result", [])
for account in accounts:
if account.get("name") == display_name:
return account.get("public-key")
raise ValueError(f"Public key not found for display name: {display_name}")

def wait_for_signal(self, signal_type, expected_event=None, timeout=20):
"""Wait for a signal using the signal client and validate against expected event details."""
return self.signal_client.wait_for_signal(signal_type, expected_event, timeout)

def stop(self, remove_local_data=True):
"""Stop the status-backend process."""
Expand All @@ -141,58 +159,10 @@ def stop(self, remove_local_data=True):
logger.warning(f"Couldn't delete node dir {node_dir} because of {str(ex)}")
self.process = None

@retry(stop=stop_after_delay(30), wait=wait_fixed(0.1), reraise=True)
# wakuext_fetchCommunity times out sometimes so that's why we need this retry mechanism
def fetch_community(self, community_key):
params = [{"communityKey": community_key, "waitForResponse": True, "tryDatabase": True}]
return self.api.send_rpc_request("wakuext_fetchCommunity", params, timeout=10)

def request_to_join_community(self, community_id):
print("request_to_join_community: ", community_id, self.name, self.api)
params = [{"communityId": community_id, "addressesToReveal": ["fakeaddress"], "airdropAddress": "fakeaddress"}]
return self.api.send_rpc_request("wakuext_requestToJoinCommunity", params)

def accept_request_to_join_community(self, request_to_join_id):
print("accept_request_to_join_community: ", request_to_join_id, self.name, self.api)
self._ensure_api_initialized()
params = [{"id": request_to_join_id}]
return self.api.send_rpc_request("wakuext_acceptRequestToJoinCommunity", params)

def _ensure_api_initialized(self):
if not self.api:
logger.warning(f"API client is not initialized for node {self.name}. Reinitializing...")
self.api = StatusNodeRPC(self.port, self.name)
if not self.api:
raise Exception(f"Failed to initialize the RPC client for node {self.name}")

def send_community_chat_message(self, chat_id, message):
params = [{"chatId": chat_id, "text": message, "contentType": 1}]
return self.api.send_rpc_request("wakuext_sendChatMessage", params)

def leave_community(self, community_id):
params = [community_id]
return self.api.send_rpc_request("wakuext_leaveCommunity", params)

def send_contact_request(self, pubkey, message):
params = [{"id": pubkey, "message": message}]
return self.api.send_rpc_request("wakuext_sendContactRequest", params)

async def wait_for_logs_async(self, strings=None, timeout=10):
if not isinstance(strings, list):
raise ValueError("strings must be a list")
start_time = time.time()
while time.time() - start_time < timeout:
all_found = True
for string in strings:
logs = self.search_logs(string=string)
if not logs:
all_found = False
break
if all_found:
return True
await asyncio.sleep(0.5)
return False

def pause_process(self):
if self.pid:
logger.info(f"Pausing node with pid: {self.pid}")
Expand Down
Loading

0 comments on commit 24cbf4c

Please sign in to comment.