From 65166faec4f9e639a1cbd6c53e319dac2d8a454e Mon Sep 17 00:00:00 2001 From: Domhnall Morrissey Date: Sun, 30 Jun 2024 13:49:34 +0100 Subject: [PATCH] V0.1.0 --- src/main.py | 5 +- .../race_engine_controller.py | 34 +- src/race_engine_model/commentary.py | 7 + src/race_engine_model/race_engine_model.py | 431 +----------------- .../race_engine_particpant_model.py | 76 +-- .../race_engine_practice_model.py | 15 + .../race_engine_qualy_model.py | 15 + .../race_engine_race_model.py | 203 +++++++++ .../race_engine_session_model.py | 103 +++++ .../race_engine_timed_session_model.py | 85 ++++ src/test.py | 27 ++ 11 files changed, 548 insertions(+), 453 deletions(-) create mode 100644 src/race_engine_model/race_engine_practice_model.py create mode 100644 src/race_engine_model/race_engine_qualy_model.py create mode 100644 src/race_engine_model/race_engine_race_model.py create mode 100644 src/race_engine_model/race_engine_session_model.py create mode 100644 src/race_engine_model/race_engine_timed_session_model.py create mode 100644 src/test.py diff --git a/src/main.py b/src/main.py index fb653e2..3e5a4e6 100644 --- a/src/main.py +++ b/src/main.py @@ -1,12 +1,13 @@ +import os import customtkinter from race_engine_controller import race_engine_controller -customtkinter.set_appearance_mode("System") # Modes: system (default), light, dark +customtkinter.set_appearance_mode("dark") # Modes: system (default), light, dark customtkinter.set_default_color_theme("blue") # Themes: blue (default), dark-blue, green app = customtkinter.CTk() # create CTk window like you do with the Tk window -app.title("Race Engine V0.0.8") +app.title("Race Engine V0.1.0") controller = race_engine_controller.RaceEngineController(app) app.after(0, lambda:app.state("zoomed")) diff --git a/src/race_engine_controller/race_engine_controller.py b/src/race_engine_controller/race_engine_controller.py index 5b797cf..9037453 100644 --- a/src/race_engine_controller/race_engine_controller.py +++ b/src/race_engine_controller/race_engine_controller.py @@ -8,7 +8,7 @@ class RaceEngineController: def __init__(self, app): self.app = app - self.model = race_engine_model.RaceEngineModel() + self.model = race_engine_model.RaceEngineModel("UI") self.view = race_engine_view.RaceEngineView(self, self.model.driver_names) # self.update_timing_screen() @@ -22,27 +22,27 @@ def start_session(self): def simulate_session(self): - if self.model.status == "running" or self.model.status == "pre_session": + if self.model.current_session.status == "running" or self.model.current_session.status == "pre_session": - self.model.advance() - self.view.timing_screen.update_view(self.model.get_data_for_view()) + self.model.current_session.advance() + self.view.timing_screen.update_view(self.model.current_session.get_data_for_view()) # GET PIT STRATEGY FROM VIEW driver1_data = self.view.timing_screen.strategy_editor_driver1.get_data() driver2_data = self.view.timing_screen.strategy_editor_driver2.get_data() self.model.update_player_drivers_strategy(driver1_data, driver2_data) - self.app.after(1000, self.simulate_session) + self.app.after(1500, self.simulate_session) - if self.model.status == "post_session": + if self.model.current_session.status == "post_session": pass #self.view. def pause_resume(self): - if self.model.status == "running": - self.model.status = "paused" + if self.model.current_session.status == "running": + self.model.current_session.status = "paused" self.view.timing_screen.start_btn.configure(text="Resume", image=self.view.play_icon2) - elif self.model.status == "paused": - self.model.status = "running" + elif self.model.current_session.status == "paused": + self.model.current_session.status = "running" self.view.timing_screen.start_btn.configure(text="Pause", image=self.view.pause_icon2) self.start_session() @@ -57,12 +57,18 @@ def go_to_race(self): pass def go_to_session(self, session): - self.model.setup_session(session) + if "FP" in session: + self.model.setup_practice(20*60, session) + elif "Q" in session: + self.model.setup_qualfying(60*60, session) + elif "race" in session: + self.model.setup_race() + self.view.launch_session(session) - self.view.timing_screen.update_view(self.model.get_data_for_view()) + self.view.timing_screen.update_view(self.model.current_session.get_data_for_view()) def end_session(self, session): - self.model.end_session(session) + self.model.current_session.end_session(session) self.view.end_session(session, self.model.results[session]) def auto_simulate_session(self, session): @@ -70,4 +76,4 @@ def auto_simulate_session(self, session): self.end_session(session) def send_player_car_out(self, driver_name, fuel_load_laps, number_laps_to_run): - self.model.send_player_car_out(driver_name, fuel_load_laps, number_laps_to_run) + self.model.current_session.send_player_car_out(driver_name, fuel_load_laps, number_laps_to_run) diff --git a/src/race_engine_model/commentary.py b/src/race_engine_model/commentary.py index 3e1e30e..f873a21 100644 --- a/src/race_engine_model/commentary.py +++ b/src/race_engine_model/commentary.py @@ -89,4 +89,11 @@ def gen_leaving_pit_lane_message(driver): f"{driver} heading out on track!" ] + return random.choice(messages) + +def gen_entering_pit_lane_message(driver): + messages = [ + f"{driver} dives into the pits!" + ] + return random.choice(messages) \ No newline at end of file diff --git a/src/race_engine_model/race_engine_model.py b/src/race_engine_model/race_engine_model.py index 5f4842f..ccc354d 100644 --- a/src/race_engine_model/race_engine_model.py +++ b/src/race_engine_model/race_engine_model.py @@ -1,69 +1,23 @@ -import copy -import logging import random -import numpy as np -import pandas as pd - from race_engine_model import race_engine_circuit_model, race_engine_particpant_model, race_engine_car_model, race_engine_driver_model -from race_engine_model import race_engine_model -from race_engine_model import commentary +from race_engine_model import race_engine_practice_model, race_engine_qualy_model, race_engine_race_model class RaceEngineModel: - def __init__(self): - + def __init__(self, mode): + assert mode in ["UI", "headless"], f"Unsupported Mode {mode}" # headless is model only, UI means were using the GUI + + self.mode = mode self.circuit_model = race_engine_circuit_model.RaceEngineCircuitModel("Sepang", 56, 93_000) self.setup_participants() - logging.basicConfig(level=logging.INFO, format="%(message)s", - handlers=[ - logging.StreamHandler(), # Logs to terminal - logging.FileHandler("race.log", mode='w') - ] - ) - - # self.setup_standings() - - self.current_lap = 1 - self.time_left = None - self.status = "pre_session" - self.fastest_laptime = None - self.fastest_laptime_driver = None - self.fastest_laptime_lap = None - - self.commentary_messages = [] - self.commentary_to_process = [] - - self.retirements = [] - self.cars_on_track = [] # for practice and qualy + self.results = {} self.player_driver1 = self.get_particpant_model_by_name("Nico Rosberg") self.player_driver2 = self.get_particpant_model_by_name("Michael Schumacher") - self.results = {} # {"FP1": {"p1": "Vettel"}} - - def log_event(self, event): - logging.info(f"Lap {self.current_lap}: {event}") - - @property - def number_cars_on_track(self): - return len(self.cars_on_track) - - @property - def drivers(self): - return [p.driver for p in self.participants] - - @property - def driver_names(self): - return [p.driver.name for p in self.participants] - - @property - def leader(self): - return self.standings_df.iloc[0]["Driver"] - def setup_participants(self): - - roster_file = "roster.txt" + roster_file = r"C:\Users\domhn\Documents\python\race_engine\Race-Engine\src\roster.txt" with open(roster_file) as f: data = f.readlines() @@ -99,372 +53,29 @@ def setup_participants(self): driver = race_engine_driver_model.RaceEngineDriverModel(driver_name, driver_speed, driver_status) self.participants.append(race_engine_particpant_model.RaceEngineParticpantModel(driver, car, self.circuit_model, driver_count)) + self.driver_names = [p.driver.name for p in self.participants] + def get_particpant_model_by_name(self, name): for p in self.participants: if p.name == name: return p - def setup_standings(self): - ''' - setup pandas dataframe to track standings, assumes participants have been supplied in starting grid order! - ''' - columns = ["Position", "Driver", "Team", "Lap", "Total Time", "Gap Ahead", "Gap to Leader", "Last Lap", "Status", "Lapped Status", "Pit", "Fastest Lap"] # all times in milliseconds - - data = [] - for participant in self.participants: - data.append([participant.position, participant.name, "", 0, 0, 0, 0, "-", "running", None, 0, None]) - - self.standings_df = pd.DataFrame(columns=columns, data=data) - - # SETUP STARTING GRID FOR RACE - if "Q1" in self.results.keys(): - self.standings_df["Driver"] = self.results["Q1"]["results"]["Driver"] - - def run_race(self): - self.calculate_start() - - while True: - self.calculate_lap() - self.update_standings() - self.current_lap += 1 - - if self.current_lap > self.circuit_model.number_of_laps or self.current_lap == 999: - break - - self.log_event("Race Over") - print(self.standings_df) - - def advance(self): - if self.session == "race": - self.advance_race() - elif self.session in ["FP1", "Q1"]: - self.advance_practice() - - def advance_race(self): - if self.status == "pre_session": - self.commentary_to_process.append(commentary.gen_race_start_message()) - self.calculate_start() - self.status = "running" - - else: - if len(self.commentary_to_process) == 0: - if self.status == "running": - self.calculate_lap() - self.update_fastest_lap() - self.update_standings() - - self.current_lap += 1 - if self.current_lap > self.circuit_model.number_of_laps or self.current_lap == 999: - self.commentary_to_process.append(commentary.gen_race_over_message(self.leader)) - self.status = "post_race" - self.log_event("Race Over") - - for c in self.commentary_to_process: - self.commentary_messages.append(c) - - - def advance_practice(self): - if self.status == "pre_session": - self.status = "running" - self.commentary_to_process.append(commentary.gen_practice_start_message()) - - else: - if len(self.commentary_to_process) == 0: - if self.status == "running": - - time_delta = 10 - self.find_particpants_leaving_pit_lane() - self.update_participants_in_practice() - - self.time_left -= time_delta - - # UPDATE STANDINGS - self.standings_df.sort_values("Fastest Lap", inplace=True) - self.standings_df.reset_index(drop=True, inplace=True) - self.standings_df["Position"] = self.standings_df.index + 1 - - # Update gaps and intervals - leader_time = self.standings_df.loc[self.standings_df["Position"] == 1, "Fastest Lap"].values[0] - laptime_ahead = leader_time - - if leader_time is None: - self.standings_df["Gap to Leader"] = "-" - self.standings_df["Gap Ahead"] = "-" - else: - gaps = [] - intervals = [] - for idx, row in self.standings_df.iterrows(): - if row["Fastest Lap"] is None: - gaps.append("-") - else: - gaps.append(row["Fastest Lap"] - leader_time) - - if laptime_ahead != "-" and row["Fastest Lap"] is not None: - intervals.append(row["Fastest Lap"] - laptime_ahead) - else: - intervals.append("-") - - laptime_ahead = row["Fastest Lap"] - - self.standings_df["Gap to Leader"] = gaps - self.standings_df["Gap Ahead"] = intervals - - if self.time_left <= 0.0: - self.status = "post_session" - - def find_particpants_leaving_pit_lane(self): - for p in self.participants: - is_leaving = p.check_leaving_pit_lane(self.time_left) - - if is_leaving is True: - self.commentary_to_process.append(commentary.gen_leaving_pit_lane_message(p.name)) - - self.standings_df.loc[self.standings_df["Driver"] == p.name, "Status"] = "out_lap" - - def update_participants_in_practice(self): - participants_running = [p for p in self.participants if p.status == "running"] - for p in self.participants: - if p.status == "running": - if p.next_update_time > self.time_left: - self.standings_df.loc[self.standings_df["Driver"] == p.name, "Lap"] = p.practice_laps_completed - self.standings_df.loc[self.standings_df["Driver"] == p.name, "Last Lap"] = p.laptime - self.standings_df.loc[self.standings_df["Driver"] == p.name, "Fastest Lap"] = p.fastest_laptime - p.update_practice(self.time_left) - - # UPDATE STATUS COLUMN FOR ALL - self.standings_df.loc[self.standings_df["Driver"] == p.name, "Status"] = p.status - - def calculate_lap(self): - ''' - Process - - determine driver strategy (push/conserve) - determine which drivers are fighting for position (within 1s of car in front) - calculate laptime for each driver, account for dirty air - determine any mistakes - determine if overtake if attempted and successfull - adjust laptimes accordingly - update standings - update tyre wear and fuel load - ''' - - for idx, row in self.standings_df.iterrows(): - driver = row["Driver"] - participant = self.get_particpant_model_by_name(driver) - - # ONLY PROCESS IF STILL RUNNING - if participant.status != "retired": - - gap_ahead = row["Gap Ahead"] - participant.calculate_laptime(gap_ahead) - - # IF RETIRED THIS LAP - if participant.status == "retired": - self.commentary_to_process.append(commentary.gen_retirement_message(participant.name)) - self.retirements.append(participant.name) - self.log_event(f"{participant.name} retires") - laptime_ahead = None - - else: - if participant.status == "pitting in": - self.log_event(f"{participant.name} Pitting In") - - if idx > 0 and laptime_ahead is not None: # laptime_ahead being None indicates car in front has retired - delta = participant.laptime - laptime_ahead - - if gap_ahead + delta <= 500 and participant_ahead.status not in ["pitting in", "retired"]: # if car ahead is about to pit, don't handle for overtaking - - self.log_event(f"{driver} Attacking {participant_ahead.name}") - self.commentary_to_process.append(commentary.gen_attacking_message(driver, participant_ahead.name)) - - if random.randint(0, 100) < 25: # overtake successfull - self.log_event(f"{participant.name} passes {participant_ahead.name}") - self.commentary_to_process.append(commentary.gen_overtake_message(participant.name, participant_ahead.name)) - - # add some random time to overtaking car, held up when passing - participant.laptime += random.randint(700, 1_500) - - # recalculate delta due to laptime updated above - delta = participant.laptime - laptime_ahead - - #update participant that has been passed so laptime brings them behind overtaking car - orig_gap = gap_ahead + delta - #if orig_gap >= 0: - revised_laptime = participant_ahead.laptime + orig_gap + random.randint(700, 1_500) - participant_ahead.recalculate_laptime_when_passed(revised_laptime) - else: # overtake unsuccessfull - participant.laptime = laptime_ahead + random.randint(100, 1_400) - - - laptime_ahead = participant.laptime - participant_ahead = participant - participant.complete_lap() - - def calculate_start(self): - # Calculate Turn 1 - order_after_turn1 = self.calculate_run_to_turn1() - - # redefine particpants based on turn1 order - self.participants = [o[1] for o in order_after_turn1] - - ''' - just spread field out after turn1 - ''' - for idx, p in enumerate(self.participants): - p.laptime = self.circuit_model.base_laptime + 6_000 + (idx * 1_000) + random.randint(100, 1500) - p.complete_lap() - - self.update_standings() - - # set fastest lap to leader - self.fastest_laptime_driver = self.participants[0].name - self.fastest_laptime = self.participants[0].laptime - self.fastest_laptime_lap = 1 - - self.current_lap += 1 - - def calculate_run_to_turn1(self): - dist_to_turn1 = self.circuit_model.dist_to_turn1 - average_speed = 47.0 #m/s - - order_after_turn1 = [] - for idx, p in enumerate([self.get_particpant_model_by_name(n) for n in self.standings_df["Driver"].values.tolist()]): - random_factor = random.randint(-2000, 2000)/1000 - time_to_turn1 = round(dist_to_turn1 / (average_speed + random_factor), 3) - order_after_turn1.append([time_to_turn1, p]) - - dist_to_turn1 += 5 # add 5 meters per grid slot - - order_after_turn1 = sorted(order_after_turn1, key=lambda x: x[0], reverse=False) - - self.commentary_to_process.append(commentary.gen_lead_after_turn1_message(order_after_turn1[0][1].name)) - - return order_after_turn1 - - def update_standings(self): - for driver in self.standings_df["Driver"]: - particpant_model = self.get_particpant_model_by_name(driver) - particpant_model.update_fastest_lap() - self.standings_df.loc[self.standings_df["Driver"] == driver, "Total Time"] = particpant_model.total_time - self.standings_df.loc[self.standings_df["Driver"] == driver, "Last Lap"] = particpant_model.laptime - self.standings_df.loc[self.standings_df["Driver"] == driver, "Lap"] = particpant_model.current_lap - self.standings_df.loc[self.standings_df["Driver"] == driver, "Status"] = particpant_model.status - - self.standings_df = self.standings_df.sort_values(by=["Lap", "Total Time"], ascending=[False, True]) - - # RESET INDEX AND POSITION COLUMNS - self.standings_df.reset_index(drop=True, inplace=True) - self.standings_df["Position"] = self.standings_df.index + 1 - - # CALC GAP TO CAR IN FRONT - self.standings_df["Gap Ahead"] = self.standings_df["Total Time"].diff() - - leader_time = self.standings_df.loc[self.standings_df["Position"] == 1, "Total Time"].values[0] - self.standings_df["Gap to Leader"] = (self.standings_df["Total Time"] - leader_time) - - # UPDATE GAPS TO LEADER IN PARTICIPANT MODEL AND CHECK IF LAPPED - for idx, row in self.standings_df.iterrows(): - particpant_model = self.get_particpant_model_by_name(row["Driver"]) - particpant_model.positions_by_lap.append(idx + 1) - particpant_model.gaps_to_leader.append(row["Gap to Leader"]) - - if row["Gap to Leader"] > self.circuit_model.base_laptime: - self.standings_df.at[idx, "Lapped Status"] = f"lapped {int(row['Gap to Leader']/self.circuit_model.base_laptime)}" # add number of laps down to status - - # UPDATE NUMBER OF PITSTOPS - self.standings_df.at[idx, "Pit"] = particpant_model.number_of_pitstops - - # UPDATE FASTEST LAP - self.standings_df.at[idx, "Fastest Lap"] = particpant_model.fastest_laptime - - self.log_event("\nCurrent Standings:\n" + self.standings_df.to_string(index=False)) + def setup_practice(self, session_time, session_name): + self.current_session = race_engine_practice_model.PracticeModel(self, session_time) + self.setup_session() - def update_fastest_lap(self): - for driver in self.standings_df["Driver"]: - particpant_model = self.get_particpant_model_by_name(driver) - if particpant_model.laptime < self.fastest_laptime: - self.fastest_laptime = particpant_model.laptime - self.fastest_laptime_driver = driver - self.fastest_laptime_lap = self.current_lap + def setup_qualfying(self, session_time, session_name): + self.current_session = race_engine_qualy_model.QualyModel(self, session_time) + self.setup_session() - def get_data_for_view(self): - data = {} + def setup_race(self): + self.current_session = race_engine_race_model.RaceModel(self) + self.setup_session() - data["status"] = self.status - data["current_lap"] = self.current_lap - data["total_laps"] = self.circuit_model.number_of_laps - data["time_left"] = self.time_left - data["standings"] = self.standings_df.copy(deep=True) - - data["fastest_lap_times"] = [p.name for p in self.participants if p.fastest_laptime == p.laptime] - data["fastest_laptime"] = self.fastest_laptime - data["fastest_laptime_driver"] = self.fastest_laptime_driver - - data["retirements"] = self.retirements - - if len(self.commentary_to_process) > 0: - data["commentary"] = self.commentary_to_process.pop(0) - else: - data["commentary"] = "" - - # HACK FOR NOW - driver1 = self.get_particpant_model_by_name("Nico Rosberg") - data["driver1_fuel"] = driver1.car_model.fuel_load - - # LAP TIMES DATA - data["laptimes"] = {} + def setup_session(self): for p in self.participants: - data["laptimes"][p.name] = copy.deepcopy(p.laptimes) + p.setup_variables_for_session() - return data - def update_player_drivers_strategy(self, driver1_data, driver2_data): self.player_driver1.update_player_pitstop_laps(driver1_data) - self.player_driver2.update_player_pitstop_laps(driver2_data) - - def setup_session(self, session): - self.session = session - self.status = "pre_session" - - self.setup_standings() - - if session in ["FP1", "Q1"]: - if session == "FP1": - self.time_left = 120 * 60 # 2 hours in seconds - elif session == "Q1": - self.time_left = 60 * 60 # 1hr qualy session - - for participant in self.participants: - participant.setup_session() - if participant not in [self.player_driver1, self.player_driver2]: - participant.generate_practice_runs(self.time_left, session) - participant.status = "in_pits" - - # SET STAUS COLUMN TO "PIT" - self.standings_df["Status"] = "PIT" - - - elif session == "race": - for participant in self.participants: - participant.status = "running" - - def end_session(self, session): - fastest_driver = self.standings_df.iloc[0]["Driver"] - fastest_laptime = self.standings_df.iloc[0]["Fastest Lap"] - - self.results[session] = {} - self.results[session]["p1"] = fastest_driver - self.results[session]["fastest lap"] = fastest_laptime - self.results[session]["results"] = self.standings_df.copy(deep=True) - - def simulate_session(self, session): - self.setup_session(session) - - while self.status != "post_session": - self.advance() - if len(self.commentary_to_process) > 0: - self.commentary_to_process.pop(0) - - def send_player_car_out(self, driver_name, fuel_load_laps, number_laps_to_run): - particpant_model = self.get_particpant_model_by_name(driver_name) - particpant_model.send_player_car_out(self.time_left, fuel_load_laps, number_laps_to_run) + self.player_driver2.update_player_pitstop_laps(driver2_data) \ No newline at end of file diff --git a/src/race_engine_model/race_engine_particpant_model.py b/src/race_engine_model/race_engine_particpant_model.py index dd4601f..058a2cb 100644 --- a/src/race_engine_model/race_engine_particpant_model.py +++ b/src/race_engine_model/race_engine_particpant_model.py @@ -7,27 +7,11 @@ def __init__(self, driver, car, circuit, starting_position): self.circuit_model = circuit self.position = starting_position - self.current_lap = 1 - - self.laptimes = [] - self.gaps_to_leader = [] - self.total_time = 0 - self.pitstop_times = [] - self.positions_by_lap = [] - self.number_of_pitstops = 0 self.calculate_base_laptime() self.calculate_pitstop_laps() self.calculate_if_retires() - self.status = "running" - self.attacking = False - self.defending = False - - self.fastest_laptime = None - self.laptime = None - - self.next_update_time = None # for updating practice session def __repr__(self) -> str: return f"" @@ -43,6 +27,25 @@ def linestyle(self): def name(self): return self.driver.name + def setup_variables_for_session(self): + self.current_lap = 1 + + self.laptimes = [] + self.gaps_to_leader = [] + self.total_time = 0 + self.pitstop_times = [] + self.positions_by_lap = [] + self.number_of_pitstops = 0 + + self.status = "in_pit" + self.attacking = False + self.defending = False + + self.fastest_laptime = None + self.laptime = None + + self.next_update_time = None # for updating practice session + def calculate_base_laptime(self): self.base_laptime = self.circuit_model.base_laptime @@ -122,7 +125,7 @@ def calculate_if_retires(self): if random.randint(0, 100) < 20: self.retires = True - self.retire_lap = random.randint(2, self.circuit_model.number_of_laps) + self.retire_lap = random.randint(3, self.circuit_model.number_of_laps) def update_fastest_lap(self): if self.fastest_laptime is None: @@ -147,19 +150,15 @@ def setup_session(self): self.practice_runs = [] # [[time_left, fuel, number_laps]] def generate_practice_runs(self, session_time, session): + assert session in ["FP"], f"Unsupported Session {session}" - time_left = session_time + time_left = int(session_time) while time_left > 0: - if session != "Q1": # practice run - leave_time = random.randint(time_left - (20*60), time_left) - number_laps = random.randint(3, 10) - min_fuel_load = int(self.circuit_model.fuel_consumption * number_laps) + 1 - fuel_load = random.randint(min_fuel_load, 155) - else: # qualy run - leave_time = random.randint(time_left - (10*60), time_left) - number_laps = 3 - fuel_load = 3 + leave_time = random.randint(time_left - (20*60), time_left) + number_laps = random.randint(3, 10) + min_fuel_load = int(self.circuit_model.fuel_consumption * number_laps) + 1 + fuel_load = random.randint(min_fuel_load, 155) self.practice_runs.append([leave_time, fuel_load, number_laps]) @@ -169,6 +168,29 @@ def generate_practice_runs(self, session_time, session): time_in_pits = random.randint(15, 35) * 60 time_left -= time_in_pits + time_left = int(time_left) # ensure time left is an int (for randint) + + def generate_qualy_runs(self): + # ASSUMES 1 HOUR QUALY, 12 laps (4 runs, 3 laps each) + + number_laps = 3 + fuel_load = 3 + + # RUN 1 + leave_time = random.randint(3_000, 3_550) + self.practice_runs.append([leave_time, fuel_load, number_laps]) + + # RUN 2 + leave_time = random.randint(1_960, 2_760) + self.practice_runs.append([leave_time, fuel_load, number_laps]) + + # RUN 3 + leave_time = random.randint(1_060, 1_860) + self.practice_runs.append([leave_time, fuel_load, number_laps]) + + # RUN 4 + leave_time = random.randint(150, 900) + self.practice_runs.append([leave_time, fuel_load, number_laps]) def check_leaving_pit_lane(self, time_left): leaving = False diff --git a/src/race_engine_model/race_engine_practice_model.py b/src/race_engine_model/race_engine_practice_model.py new file mode 100644 index 0000000..70de17f --- /dev/null +++ b/src/race_engine_model/race_engine_practice_model.py @@ -0,0 +1,15 @@ +from race_engine_model import race_engine_timed_session_model + +class PracticeModel(race_engine_timed_session_model.TimedSessionModel): + def __init__(self, model, session_time): + super().__init__(model, session_time) + + self.generate_practice_runs() + + def generate_practice_runs(self): + for participant in self.model.participants: + participant.setup_session() + if participant not in [self.model.player_driver1, self.model.player_driver2]: + participant.generate_practice_runs(self.time_left, "FP") + + \ No newline at end of file diff --git a/src/race_engine_model/race_engine_qualy_model.py b/src/race_engine_model/race_engine_qualy_model.py new file mode 100644 index 0000000..a5a987f --- /dev/null +++ b/src/race_engine_model/race_engine_qualy_model.py @@ -0,0 +1,15 @@ +from race_engine_model import race_engine_timed_session_model + +class QualyModel(race_engine_timed_session_model.TimedSessionModel): + def __init__(self,model, session_time): + super().__init__(model, session_time) + + self.generate_practice_runs() + + def generate_practice_runs(self): + for participant in self.model.participants: + participant.setup_session() + if participant not in [self.model.player_driver1, self.model.player_driver2]: + participant.generate_qualy_runs() + + \ No newline at end of file diff --git a/src/race_engine_model/race_engine_race_model.py b/src/race_engine_model/race_engine_race_model.py new file mode 100644 index 0000000..d8c4287 --- /dev/null +++ b/src/race_engine_model/race_engine_race_model.py @@ -0,0 +1,203 @@ +import random + +from race_engine_model import race_engine_session_model, commentary + +class RaceModel(race_engine_session_model.SessionModel): + def __init__(self, model): + super().__init__(model) + + self.current_lap = 1 + self.retirements = [] + + if "Q1" in self.model.results.keys(): + self.setup_grid_order() + + @property + def leader(self): + return self.standings_df.iloc[0]["Driver"] + + def setup_grid_order(self): + qualy_results = self.model.results["Q1"]["results"] + grid_order = qualy_results["Driver"] + + self.standings_df.set_index('Driver', inplace=True, drop=False) + self.standings_df = self.standings_df.loc[grid_order] + self.refresh_standings_column() + + def run_race(self): + while self.status != "post_race": + self.advance() + + self.process_headless_commentary() + self.process_headless_commentary() + + def advance(self): + if self.status == "pre_session": + self.commentary_to_process.append(commentary.gen_race_start_message()) + self.calculate_start() + self.status = "running" + + else: # process lap + if len(self.commentary_to_process) == 0: + if self.status == "running": + self.calculate_lap() + self.update_fastest_lap() + self.update_standings() + self.current_lap += 1 + + if self.current_lap > self.model.circuit_model.number_of_laps or self.current_lap == 999: + self.commentary_to_process.append(commentary.gen_race_over_message(self.leader)) + self.status = "post_race" + # self.log_event("Race Over") + + def calculate_start(self): + # Calculate Turn 1 + order_after_turn1 = self.calculate_run_to_turn1() + + # redefine particpants based on turn1 order + self.participants = [o[1] for o in order_after_turn1] + + ''' + just spread field out after turn1 + ''' + for idx, p in enumerate(self.participants): + p.laptime = self.model.circuit_model.base_laptime + 6_000 + (idx * 1_000) + random.randint(100, 1500) + p.complete_lap() + + self.update_standings() + + # set fastest lap to leader + self.fastest_laptime_driver = self.participants[0].name + self.fastest_laptime = self.participants[0].laptime + self.fastest_laptime_lap = 1 + + self.current_lap += 1 + + + def calculate_run_to_turn1(self): + dist_to_turn1 = self.model.circuit_model.dist_to_turn1 + average_speed = 47.0 #m/s + + order_after_turn1 = [] + for idx, p in enumerate([self.model.get_particpant_model_by_name(n) for n in self.standings_df["Driver"].values.tolist()]): + random_factor = random.randint(-2000, 2000)/1000 + time_to_turn1 = round(dist_to_turn1 / (average_speed + random_factor), 3) + order_after_turn1.append([time_to_turn1, p]) + + dist_to_turn1 += 5 # add 5 meters per grid slot + + order_after_turn1 = sorted(order_after_turn1, key=lambda x: x[0], reverse=False) + ''' + example of order_after_turn1 + [time_to_turn1, particpant model] + [[12.761, ], [13.124, ], [13.68, ],] + ''' + + self.commentary_to_process.append(commentary.gen_lead_after_turn1_message(order_after_turn1[0][1].name)) + + return order_after_turn1 + + def update_standings(self): + for driver in self.standings_df["Driver"]: + particpant_model = self.model.get_particpant_model_by_name(driver) + particpant_model.update_fastest_lap() + self.standings_df.loc[self.standings_df["Driver"] == driver, "Total Time"] = particpant_model.total_time + self.standings_df.loc[self.standings_df["Driver"] == driver, "Last Lap"] = particpant_model.laptime + self.standings_df.loc[self.standings_df["Driver"] == driver, "Lap"] = particpant_model.current_lap + self.standings_df.loc[self.standings_df["Driver"] == driver, "Status"] = particpant_model.status + + self.standings_df = self.standings_df.sort_values(by=["Lap", "Total Time"], ascending=[False, True]) + + # RESET INDEX AND POSITION COLUMNS + self.standings_df.reset_index(drop=True, inplace=True) + self.standings_df["Position"] = self.standings_df.index + 1 + + # CALC GAP TO CAR IN FRONT + self.standings_df["Gap Ahead"] = self.standings_df["Total Time"].diff() + + leader_time = self.standings_df.loc[self.standings_df["Position"] == 1, "Total Time"].values[0] + self.standings_df["Gap to Leader"] = (self.standings_df["Total Time"] - leader_time) + + # UPDATE GAPS TO LEADER IN PARTICIPANT MODEL AND CHECK IF LAPPED + for idx, row in self.standings_df.iterrows(): + particpant_model = self.model.get_particpant_model_by_name(row["Driver"]) + particpant_model.positions_by_lap.append(idx + 1) + particpant_model.gaps_to_leader.append(row["Gap to Leader"]) + + if row["Gap to Leader"] > self.model.circuit_model.base_laptime: + self.standings_df.at[idx, "Lapped Status"] = f"lapped {int(row['Gap to Leader']/self.model.circuit_model.base_laptime)}" # add number of laps down to status + + # UPDATE NUMBER OF PITSTOPS + self.standings_df.at[idx, "Pit"] = particpant_model.number_of_pitstops + + # UPDATE FASTEST LAP + self.standings_df.at[idx, "Fastest Lap"] = particpant_model.fastest_laptime + + # self.log_event("\nCurrent Standings:\n" + self.standings_df.to_string(index=False)) + + def calculate_lap(self): + ''' + Process + + determine driver strategy (push/conserve) + determine which drivers are fighting for position (within 1s of car in front) + calculate laptime for each driver, account for dirty air + determine any mistakes + determine if overtake if attempted and successfull + adjust laptimes accordingly + update standings + update tyre wear and fuel load + ''' + + for idx, row in self.standings_df.iterrows(): + driver = row["Driver"] + participant = self.model.get_particpant_model_by_name(driver) + + # ONLY PROCESS IF STILL RUNNING + if participant.status != "retired": + # print("not retired") + + gap_ahead = row["Gap Ahead"] + participant.calculate_laptime(gap_ahead) + + # IF RETIRED THIS LAP + if participant.status == "retired": + self.commentary_to_process.append(commentary.gen_retirement_message(participant.name)) + self.retirements.append(participant.name) + # self.log_event(f"{participant.name} retires") + laptime_ahead = None + + else: + if participant.status == "pitting in": + self.commentary_to_process.append(commentary.gen_entering_pit_lane_message(participant.name)) + + # print(laptime_ahead) + if idx > 0 and laptime_ahead is not None: # laptime_ahead being None indicates car in front has retired + delta = participant.laptime - laptime_ahead + # self.log_event(f"{participant.name} Pitting In") + + if gap_ahead + delta <= 500 and participant_ahead.status not in ["pitting in", "retired"]: # if car ahead is about to pit, don't handle for overtaking + # self.log_event(f"{driver} Attacking {participant_ahead.name}") + self.commentary_to_process.append(commentary.gen_attacking_message(driver, participant_ahead.name)) + + if random.randint(0, 100) < 25: # overtake successfull + # self.log_event(f"{participant.name} passes {participant_ahead.name}") + self.commentary_to_process.append(commentary.gen_overtake_message(participant.name, participant_ahead.name)) + + # add some random time to overtaking car, held up when passing + participant.laptime += random.randint(700, 1_500) + + # recalculate delta due to laptime updated above + delta = participant.laptime - laptime_ahead + + #update participant that has been passed so laptime brings them behind overtaking car + orig_gap = gap_ahead + delta + #if orig_gap >= 0: + revised_laptime = participant_ahead.laptime + orig_gap + random.randint(700, 1_500) + participant_ahead.recalculate_laptime_when_passed(revised_laptime) + else: # overtake unsuccessfull + participant.laptime = laptime_ahead + random.randint(100, 1_400) + + laptime_ahead = participant.laptime + participant_ahead = participant + participant.complete_lap() \ No newline at end of file diff --git a/src/race_engine_model/race_engine_session_model.py b/src/race_engine_model/race_engine_session_model.py new file mode 100644 index 0000000..6577a65 --- /dev/null +++ b/src/race_engine_model/race_engine_session_model.py @@ -0,0 +1,103 @@ +import copy +import pandas as pd + +class SessionModel: + def __init__(self, model): + self.model = model + self.setup_standings() + + self.status = "pre_session" + self.time_left = None + + self.commentary_messages = [] + self.commentary_to_process = [] + self.retirements = [] + + self.current_lap = None + self.fastest_laptime = None + self.fastest_laptime_driver = None + + def setup_standings(self): + ''' + setup pandas dataframe to track standings, assumes participants have been supplied in starting grid order! + ''' + columns = ["Position", "Driver", "Team", "Lap", "Total Time", "Gap Ahead", "Gap to Leader", "Last Lap", "Status", "Lapped Status", "Pit", "Fastest Lap"] # all times in milliseconds + + data = [] + for participant in self.model.participants: + data.append([participant.position, participant.name, "", 0, 0, 0, 0, "-", "running", None, 0, None]) + + self.standings_df = pd.DataFrame(columns=columns, data=data) + + ''' example; + Position Driver Team Lap Total Time Gap Ahead Gap to Leader Last Lap Status Lapped Status Pit Fastest Lap + 0 1 Sebastian Vettel 0 0 0 0 - running None 0 None + 1 2 Mark Webber 0 0 0 0 - running None 0 None + 2 3 Fernando Alonso 0 0 0 0 - running None 0 None + ''' + + def calculate_lap_time(self): + pass + + + def refresh_standings_column(self): + self.standings_df.reset_index(drop=True, inplace=True) + self.standings_df["Position"] = self.standings_df.index + 1 + + def process_lastest_commentary(self): + latest_commentary = None + + if len(self.commentary_to_process) > 0: + latest_commentary = self.commentary_to_process.pop(0) + + if self.model.mode == "headless": + print(latest_commentary) + + return latest_commentary + + def process_headless_commentary(self): + assert self.model.mode == "headless", f"This method is not supported for mode {self.model.mode}" + + while len(self.commentary_to_process) > 0: + self.process_lastest_commentary() + + def get_data_for_view(self): + data = {} + + data["status"] = self.status + data["current_lap"] = self.current_lap + data["total_laps"] = self.model.circuit_model.number_of_laps + data["time_left"] = self.time_left + data["standings"] = self.standings_df.copy(deep=True) + + data["fastest_lap_times"] = [p.name for p in self.model.participants if p.fastest_laptime == p.laptime] + data["fastest_laptime"] = self.fastest_laptime + data["fastest_laptime_driver"] = self.fastest_laptime_driver + + data["retirements"] = self.retirements + + if len(self.commentary_to_process) > 0: + data["commentary"] = self.commentary_to_process.pop(0) + else: + data["commentary"] = "" + + # HACK FOR NOW + driver1 = self.model.get_particpant_model_by_name("Nico Rosberg") + data["driver1_fuel"] = driver1.car_model.fuel_load + + # LAP TIMES DATA + data["laptimes"] = {} + for p in self.model.participants: + data["laptimes"][p.name] = copy.deepcopy(p.laptimes) + + return data + + def update_fastest_lap(self): + for driver in self.standings_df["Driver"]: + particpant_model = self.model.get_particpant_model_by_name(driver) + if particpant_model.laptime < self.fastest_laptime: + self.fastest_laptime = particpant_model.laptime + self.fastest_laptime_driver = driver + self.fastest_laptime_lap = self.current_lap + + \ No newline at end of file diff --git a/src/race_engine_model/race_engine_timed_session_model.py b/src/race_engine_model/race_engine_timed_session_model.py new file mode 100644 index 0000000..4f62466 --- /dev/null +++ b/src/race_engine_model/race_engine_timed_session_model.py @@ -0,0 +1,85 @@ +from race_engine_model import race_engine_session_model +from race_engine_model import commentary + +class TimedSessionModel(race_engine_session_model.SessionModel): + def __init__(self, model, session_time): + super().__init__(model) + + self.session_time = session_time + self.time_left = session_time + + self.setup_session() + + def setup_session(self): + + # SET STAUS COLUMN TO "PIT" + self.standings_df["Status"] = "PIT" + + # SET PARTICPANTS STATUS + for participant in self.model.participants: + participant.status = "in_pits" + + + def advance(self): + if self.status == "pre_session": + self.status = "running" + self.commentary_to_process.append(commentary.gen_practice_start_message()) + # self.update_participants_in_practice() + + else: + if len(self.commentary_to_process) == 0: + if self.status == "running": + time_delta = 10 + self.find_particpants_leaving_pit_lane() + self.update_participants_in_practice() + + self.time_left -= time_delta + + # UPDATE STANDINGS + self.standings_df.sort_values("Fastest Lap", inplace=True) + self.refresh_standings_column() # in SessionModel + + else: # we have some commentary to process + if self.model.mode == "headless": + self.process_lastest_commentary() + + # HANDLE SESSION ENDING + if self.time_left <= 0: + self.status = "post_session" + + def find_particpants_leaving_pit_lane(self): + for p in self.model.participants: + is_leaving = p.check_leaving_pit_lane(self.time_left) + + if is_leaving is True: + self.commentary_to_process.append(commentary.gen_leaving_pit_lane_message(p.name)) + + # Update standings + self.standings_df.loc[self.standings_df["Driver"] == p.name, "Status"] = "out_lap" + + def update_participants_in_practice(self): + participants_running = [p for p in self.model.participants if p.status == "running"] + for p in self.model.participants: + if p.status == "running": + if p.next_update_time > self.time_left: + self.standings_df.loc[self.standings_df["Driver"] == p.name, "Lap"] = p.practice_laps_completed + self.standings_df.loc[self.standings_df["Driver"] == p.name, "Last Lap"] = p.laptime + self.standings_df.loc[self.standings_df["Driver"] == p.name, "Fastest Lap"] = p.fastest_laptime + p.update_practice(self.time_left) + + # UPDATE STATUS COLUMN FOR ALL + self.standings_df.loc[self.standings_df["Driver"] == p.name, "Status"] = p.status + + + def send_player_car_out(self, driver_name, fuel_load_laps, number_laps_to_run): + particpant_model = self.model.get_particpant_model_by_name(driver_name) + particpant_model.send_player_car_out(self.time_left, fuel_load_laps, number_laps_to_run) + + def end_session(self, session): + fastest_driver = self.standings_df.iloc[0]["Driver"] + fastest_laptime = self.standings_df.iloc[0]["Fastest Lap"] + + self.model.results[session] = {} + self.model.results[session]["p1"] = fastest_driver + self.model.results[session]["fastest lap"] = fastest_laptime + self.model.results[session]["results"] = self.standings_df.copy(deep=True) \ No newline at end of file diff --git a/src/test.py b/src/test.py new file mode 100644 index 0000000..8cdb340 --- /dev/null +++ b/src/test.py @@ -0,0 +1,27 @@ + +from race_engine_model import race_engine_model + +for i in range(100): + model = race_engine_model.RaceEngineModel("headless") + # model.setup_practice(60 * 60, "FP1") + + # model.current_session.advance() + + # while model.current_session.status != "post_session": + # model.current_session.advance() + + # print(model.current_session.standings_df) + # print(model.current_session.time_left) + + # model.setup_qualfying(60 * 60, "Qualfying") + + # model.current_session.advance() + + # while model.current_session.status != "post_session": + # model.current_session.advance() + + # print(model.current_session.standings_df) + # print(model.current_session.time_left) + + model.setup_race() + model.current_session.run_race() \ No newline at end of file