diff --git a/modules/meshtrekker.py b/modules/meshtrekker.py new file mode 100644 index 0000000..a73550a --- /dev/null +++ b/modules/meshtrekker.py @@ -0,0 +1,288 @@ +""" +Mesh Trekker Game + +Game Rules: +1. Players compete to cover the most distance over time using their Meshtastic devices. +2. The game tracks players' movements via GPS coordinates sent by their devices. +3. Total distance traveled is calculated and summed over time for each player. +4. Leaderboards show top distances for daily, weekly, and all-time periods. +5. Players can form teams, with team distances being the sum of all team members' distances. +6. Special achievements are awarded for milestones (e.g., 10km, 50km, 100km total distance). +7. The game runs continuously, allowing players to participate at their own pace. +8. Players can use the 'whereami' command to check their current location and update their position in the game. +""" + +import pickle +from datetime import datetime, timedelta +from geopy.distance import geodesic +import os +import logging + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class MeshTrekkerError(Exception): + """Base class for exceptions in this module.""" + pass + +class DataLoadError(MeshTrekkerError): + """Raised when there's an error loading data.""" + pass + +class DataSaveError(MeshTrekkerError): + """Raised when there's an error saving data.""" + pass + +class InvalidGPSDataError(MeshTrekkerError): + """Raised when invalid GPS data is provided.""" + pass + +class MeshTrekker: + def __init__(self, data_file='mesh_trekker_data.pkl'): + self.data_file = data_file + try: + self.data = self.load_data() + except DataLoadError as e: + logger.error(f"Failed to load data: {e}") + self.data = self.initialize_data() + + def initialize_data(self): + return { + 'gps_data': {}, + 'user_distances': {}, + 'teams': {}, + 'achievements': {}, + } + + def load_data(self): + try: + if os.path.exists(self.data_file): + with open(self.data_file, 'rb') as f: + return pickle.load(f) + else: + logger.info(f"Data file {self.data_file} not found. Initializing new data.") + return self.initialize_data() + except (pickle.PickleError, EOFError, FileNotFoundError) as e: + raise DataLoadError(f"Error loading data: {e}") + + def save_data(self): + try: + with open(self.data_file, 'wb') as f: + pickle.dump(self.data, f) + except (pickle.PickleError, IOError) as e: + raise DataSaveError(f"Error saving data: {e}") + + def validate_gps_data(self, latitude, longitude, timestamp): + try: + lat = float(latitude) + lon = float(longitude) + if not -90 <= lat <= 90: + raise InvalidGPSDataError(f"Invalid latitude: {latitude}") + if not -180 <= lon <= 180: + raise InvalidGPSDataError(f"Invalid longitude: {longitude}") + if not isinstance(timestamp, datetime): + raise InvalidGPSDataError(f"Invalid timestamp: {timestamp}") + except ValueError: + raise InvalidGPSDataError(f"Invalid GPS data: latitude={latitude}, longitude={longitude}") + + def process_gps_data(self, user_id, latitude, longitude, timestamp): + try: + self.validate_gps_data(latitude, longitude, timestamp) + + if user_id not in self.data['gps_data']: + self.data['gps_data'][user_id] = [] + + self.data['gps_data'][user_id].append((float(latitude), float(longitude), timestamp)) + + if len(self.data['gps_data'][user_id]) > 1: + last_lat, last_lon, last_time = self.data['gps_data'][user_id][-2] + last_point = (last_lat, last_lon) + new_point = (float(latitude), float(longitude)) + + distance = geodesic(last_point, new_point).kilometers + + if user_id not in self.data['user_distances']: + self.data['user_distances'][user_id] = (0, timestamp) + + total_distance, _ = self.data['user_distances'][user_id] + new_total_distance = total_distance + distance + self.data['user_distances'][user_id] = (new_total_distance, timestamp) + + self.check_achievements(user_id, new_total_distance) + + self.save_data() + return new_total_distance + except InvalidGPSDataError as e: + logger.error(f"Invalid GPS data for user {user_id}: {e}") + except DataSaveError as e: + logger.error(f"Failed to save data after processing GPS for user {user_id}: {e}") + except Exception as e: + logger.error(f"Unexpected error processing GPS data for user {user_id}: {e}") + return None + + def get_leaderboard(self, timeframe='all'): + try: + now = datetime.now() + if timeframe == 'daily': + start_time = now - timedelta(days=1) + elif timeframe == 'weekly': + start_time = now - timedelta(weeks=1) + else: + start_time = datetime.min + + leaderboard = [] + for user_id, (distance, last_updated) in self.data['user_distances'].items(): + if last_updated > start_time: + leaderboard.append((user_id, distance)) + + return sorted(leaderboard, key=lambda x: x[1], reverse=True)[:10] + except Exception as e: + logger.error(f"Error generating leaderboard: {e}") + return [] + + def get_team_leaderboard(self): + try: + team_distances = {} + for team_name, members in self.data['teams'].items(): + team_distance = sum(self.data['user_distances'].get(member, (0, None))[0] for member in members) + team_distances[team_name] = team_distance + + return sorted(team_distances.items(), key=lambda x: x[1], reverse=True)[:10] + except Exception as e: + logger.error(f"Error generating team leaderboard: {e}") + return [] + + def get_user_stats(self, user_id): + try: + distance, last_updated = self.data['user_distances'].get(user_id, (0, None)) + achievements = self.data['achievements'].get(user_id, []) + return { + 'distance': distance, + 'last_updated': last_updated, + 'achievements': achievements + } + except Exception as e: + logger.error(f"Error retrieving stats for user {user_id}: {e}") + return None + + def create_team(self, team_name, user_id): + try: + if team_name not in self.data['teams']: + self.data['teams'][team_name] = [user_id] + self.save_data() + return True + return False + except DataSaveError as e: + logger.error(f"Failed to save data after creating team {team_name}: {e}") + return False + except Exception as e: + logger.error(f"Error creating team {team_name}: {e}") + return False + + def join_team(self, team_name, user_id): + try: + if team_name in self.data['teams'] and user_id not in self.data['teams'][team_name]: + self.data['teams'][team_name].append(user_id) + self.save_data() + return True + return False + except DataSaveError as e: + logger.error(f"Failed to save data after user {user_id} joined team {team_name}: {e}") + return False + except Exception as e: + logger.error(f"Error joining team {team_name} for user {user_id}: {e}") + return False + + def check_achievements(self, user_id, total_distance): + try: + if user_id not in self.data['achievements']: + self.data['achievements'][user_id] = [] + + milestones = [10, 50, 100, 500, 1000] # in km + new_achievements = [] + for milestone in milestones: + if total_distance >= milestone and milestone not in self.data['achievements'][user_id]: + self.data['achievements'][user_id].append(milestone) + new_achievements.append(milestone) + logger.info(f"User {user_id} achieved {milestone}km milestone!") + return new_achievements + except Exception as e: + logger.error(f"Error checking achievements for user {user_id}: {e}") + return [] + + def get_achievements(self, user_id): + try: + return self.data['achievements'].get(user_id, []) + except Exception as e: + logger.error(f"Error retrieving achievements for user {user_id}: {e}") + return [] + +# Integrating the handle_whereami function +def get_node_location(message_from_id, deviceID, channel_number): + # This function should be implemented to get the location from the Meshtastic device + # For now, we'll use a placeholder implementation + return (0, 0) # Placeholder coordinates + +def where_am_i(latitude, longitude): + # This function should return a human-readable location description + # For now, we'll just return the coordinates + return f"You are at coordinates: {latitude}, {longitude}" + +def handle_whereami(message_from_id, deviceID, channel_number): + location = get_node_location(message_from_id, deviceID, channel_number) + return where_am_i(str(location[0]), str(location[1])) + +# Main game handler +game = MeshTrekker() + +def process_whereami_command(user_id, deviceID, channel_number): + location_info = handle_whereami(user_id, deviceID, channel_number) + latitude, longitude = location_info.split(": ")[1].split(", ") + + current_time = datetime.now() + new_distance = game.process_gps_data(user_id, latitude, longitude, current_time) + + if new_distance is not None: + new_achievements = game.check_achievements(user_id, new_distance) + response = f"{location_info}\nTotal distance: {new_distance:.2f} km" + if new_achievements: + response += f"\nNew achievements: {', '.join([f'{a}km' for a in new_achievements])}" + else: + response = f"{location_info}\nFailed to update distance. Please try again." + + return response + +# Usage example +if __name__ == "__main__": + try: + # Simulating 'whereami' commands from users + print(process_whereami_command("user1", "device1", 1)) + print(process_whereami_command("user1", "device1", 1)) + print(process_whereami_command("user2", "device2", 1)) + print(process_whereami_command("user2", "device2", 1)) + + # Create and join teams + game.create_team("Team A", "user1") + game.join_team("Team A", "user2") + + # Get individual leaderboard + print("\nAll-time individual leaderboard:") + for user, distance in game.get_leaderboard(): + print(f"{user}: {distance:.2f} km") + + # Get team leaderboard + print("\nTeam leaderboard:") + for team, distance in game.get_team_leaderboard(): + print(f"{team}: {distance:.2f} km") + + # Get user stats + user_stats = game.get_user_stats("user1") + print(f"\nUser1 stats: {user_stats}") + + # Get achievements + achievements = game.get_achievements("user1") + print(f"User1 achievements: {achievements}") + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}")