-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This game is a idea and placeholder as well a start to further implement it into the meshing-around bot setup. Kelly, the idea is cool, but needs some work. and as well a implementation for the dashboard with maybe a seperated map to track players there movement. But let's figure this out later hehe Enjoy the draft!
- Loading branch information
Showing
1 changed file
with
288 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
""" | ||
Mesh Trekker Game | ||
Game Rules: | ||
1. Players compete to cover the most distance over time using their Meshtastic devices. | ||
2. The game tracks players' movements via GPS coordinates sent by their devices. | ||
3. Total distance traveled is calculated and summed over time for each player. | ||
4. Leaderboards show top distances for daily, weekly, and all-time periods. | ||
5. Players can form teams, with team distances being the sum of all team members' distances. | ||
6. Special achievements are awarded for milestones (e.g., 10km, 50km, 100km total distance). | ||
7. The game runs continuously, allowing players to participate at their own pace. | ||
8. Players can use the 'whereami' command to check their current location and update their position in the game. | ||
""" | ||
|
||
import pickle | ||
from datetime import datetime, timedelta | ||
from geopy.distance import geodesic | ||
import os | ||
import logging | ||
|
||
# Set up logging | ||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | ||
logger = logging.getLogger(__name__) | ||
|
||
class MeshTrekkerError(Exception): | ||
"""Base class for exceptions in this module.""" | ||
pass | ||
|
||
class DataLoadError(MeshTrekkerError): | ||
"""Raised when there's an error loading data.""" | ||
pass | ||
|
||
class DataSaveError(MeshTrekkerError): | ||
"""Raised when there's an error saving data.""" | ||
pass | ||
|
||
class InvalidGPSDataError(MeshTrekkerError): | ||
"""Raised when invalid GPS data is provided.""" | ||
pass | ||
|
||
class MeshTrekker: | ||
def __init__(self, data_file='mesh_trekker_data.pkl'): | ||
self.data_file = data_file | ||
try: | ||
self.data = self.load_data() | ||
except DataLoadError as e: | ||
logger.error(f"Failed to load data: {e}") | ||
self.data = self.initialize_data() | ||
|
||
def initialize_data(self): | ||
return { | ||
'gps_data': {}, | ||
'user_distances': {}, | ||
'teams': {}, | ||
'achievements': {}, | ||
} | ||
|
||
def load_data(self): | ||
try: | ||
if os.path.exists(self.data_file): | ||
with open(self.data_file, 'rb') as f: | ||
return pickle.load(f) | ||
else: | ||
logger.info(f"Data file {self.data_file} not found. Initializing new data.") | ||
return self.initialize_data() | ||
except (pickle.PickleError, EOFError, FileNotFoundError) as e: | ||
raise DataLoadError(f"Error loading data: {e}") | ||
|
||
def save_data(self): | ||
try: | ||
with open(self.data_file, 'wb') as f: | ||
pickle.dump(self.data, f) | ||
except (pickle.PickleError, IOError) as e: | ||
raise DataSaveError(f"Error saving data: {e}") | ||
|
||
def validate_gps_data(self, latitude, longitude, timestamp): | ||
try: | ||
lat = float(latitude) | ||
lon = float(longitude) | ||
if not -90 <= lat <= 90: | ||
raise InvalidGPSDataError(f"Invalid latitude: {latitude}") | ||
if not -180 <= lon <= 180: | ||
raise InvalidGPSDataError(f"Invalid longitude: {longitude}") | ||
if not isinstance(timestamp, datetime): | ||
raise InvalidGPSDataError(f"Invalid timestamp: {timestamp}") | ||
except ValueError: | ||
raise InvalidGPSDataError(f"Invalid GPS data: latitude={latitude}, longitude={longitude}") | ||
|
||
def process_gps_data(self, user_id, latitude, longitude, timestamp): | ||
try: | ||
self.validate_gps_data(latitude, longitude, timestamp) | ||
|
||
if user_id not in self.data['gps_data']: | ||
self.data['gps_data'][user_id] = [] | ||
|
||
self.data['gps_data'][user_id].append((float(latitude), float(longitude), timestamp)) | ||
|
||
if len(self.data['gps_data'][user_id]) > 1: | ||
last_lat, last_lon, last_time = self.data['gps_data'][user_id][-2] | ||
last_point = (last_lat, last_lon) | ||
new_point = (float(latitude), float(longitude)) | ||
|
||
distance = geodesic(last_point, new_point).kilometers | ||
|
||
if user_id not in self.data['user_distances']: | ||
self.data['user_distances'][user_id] = (0, timestamp) | ||
|
||
total_distance, _ = self.data['user_distances'][user_id] | ||
new_total_distance = total_distance + distance | ||
self.data['user_distances'][user_id] = (new_total_distance, timestamp) | ||
|
||
self.check_achievements(user_id, new_total_distance) | ||
|
||
self.save_data() | ||
return new_total_distance | ||
except InvalidGPSDataError as e: | ||
logger.error(f"Invalid GPS data for user {user_id}: {e}") | ||
except DataSaveError as e: | ||
logger.error(f"Failed to save data after processing GPS for user {user_id}: {e}") | ||
except Exception as e: | ||
logger.error(f"Unexpected error processing GPS data for user {user_id}: {e}") | ||
return None | ||
|
||
def get_leaderboard(self, timeframe='all'): | ||
try: | ||
now = datetime.now() | ||
if timeframe == 'daily': | ||
start_time = now - timedelta(days=1) | ||
elif timeframe == 'weekly': | ||
start_time = now - timedelta(weeks=1) | ||
else: | ||
start_time = datetime.min | ||
|
||
leaderboard = [] | ||
for user_id, (distance, last_updated) in self.data['user_distances'].items(): | ||
if last_updated > start_time: | ||
leaderboard.append((user_id, distance)) | ||
|
||
return sorted(leaderboard, key=lambda x: x[1], reverse=True)[:10] | ||
except Exception as e: | ||
logger.error(f"Error generating leaderboard: {e}") | ||
return [] | ||
|
||
def get_team_leaderboard(self): | ||
try: | ||
team_distances = {} | ||
for team_name, members in self.data['teams'].items(): | ||
team_distance = sum(self.data['user_distances'].get(member, (0, None))[0] for member in members) | ||
team_distances[team_name] = team_distance | ||
|
||
return sorted(team_distances.items(), key=lambda x: x[1], reverse=True)[:10] | ||
except Exception as e: | ||
logger.error(f"Error generating team leaderboard: {e}") | ||
return [] | ||
|
||
def get_user_stats(self, user_id): | ||
try: | ||
distance, last_updated = self.data['user_distances'].get(user_id, (0, None)) | ||
achievements = self.data['achievements'].get(user_id, []) | ||
return { | ||
'distance': distance, | ||
'last_updated': last_updated, | ||
'achievements': achievements | ||
} | ||
except Exception as e: | ||
logger.error(f"Error retrieving stats for user {user_id}: {e}") | ||
return None | ||
|
||
def create_team(self, team_name, user_id): | ||
try: | ||
if team_name not in self.data['teams']: | ||
self.data['teams'][team_name] = [user_id] | ||
self.save_data() | ||
return True | ||
return False | ||
except DataSaveError as e: | ||
logger.error(f"Failed to save data after creating team {team_name}: {e}") | ||
return False | ||
except Exception as e: | ||
logger.error(f"Error creating team {team_name}: {e}") | ||
return False | ||
|
||
def join_team(self, team_name, user_id): | ||
try: | ||
if team_name in self.data['teams'] and user_id not in self.data['teams'][team_name]: | ||
self.data['teams'][team_name].append(user_id) | ||
self.save_data() | ||
return True | ||
return False | ||
except DataSaveError as e: | ||
logger.error(f"Failed to save data after user {user_id} joined team {team_name}: {e}") | ||
return False | ||
except Exception as e: | ||
logger.error(f"Error joining team {team_name} for user {user_id}: {e}") | ||
return False | ||
|
||
def check_achievements(self, user_id, total_distance): | ||
try: | ||
if user_id not in self.data['achievements']: | ||
self.data['achievements'][user_id] = [] | ||
|
||
milestones = [10, 50, 100, 500, 1000] # in km | ||
new_achievements = [] | ||
for milestone in milestones: | ||
if total_distance >= milestone and milestone not in self.data['achievements'][user_id]: | ||
self.data['achievements'][user_id].append(milestone) | ||
new_achievements.append(milestone) | ||
logger.info(f"User {user_id} achieved {milestone}km milestone!") | ||
return new_achievements | ||
except Exception as e: | ||
logger.error(f"Error checking achievements for user {user_id}: {e}") | ||
return [] | ||
|
||
def get_achievements(self, user_id): | ||
try: | ||
return self.data['achievements'].get(user_id, []) | ||
except Exception as e: | ||
logger.error(f"Error retrieving achievements for user {user_id}: {e}") | ||
return [] | ||
|
||
# Integrating the handle_whereami function | ||
def get_node_location(message_from_id, deviceID, channel_number): | ||
# This function should be implemented to get the location from the Meshtastic device | ||
# For now, we'll use a placeholder implementation | ||
return (0, 0) # Placeholder coordinates | ||
|
||
def where_am_i(latitude, longitude): | ||
# This function should return a human-readable location description | ||
# For now, we'll just return the coordinates | ||
return f"You are at coordinates: {latitude}, {longitude}" | ||
|
||
def handle_whereami(message_from_id, deviceID, channel_number): | ||
location = get_node_location(message_from_id, deviceID, channel_number) | ||
return where_am_i(str(location[0]), str(location[1])) | ||
|
||
# Main game handler | ||
game = MeshTrekker() | ||
|
||
def process_whereami_command(user_id, deviceID, channel_number): | ||
location_info = handle_whereami(user_id, deviceID, channel_number) | ||
latitude, longitude = location_info.split(": ")[1].split(", ") | ||
|
||
current_time = datetime.now() | ||
new_distance = game.process_gps_data(user_id, latitude, longitude, current_time) | ||
|
||
if new_distance is not None: | ||
new_achievements = game.check_achievements(user_id, new_distance) | ||
response = f"{location_info}\nTotal distance: {new_distance:.2f} km" | ||
if new_achievements: | ||
response += f"\nNew achievements: {', '.join([f'{a}km' for a in new_achievements])}" | ||
else: | ||
response = f"{location_info}\nFailed to update distance. Please try again." | ||
|
||
return response | ||
|
||
# Usage example | ||
if __name__ == "__main__": | ||
try: | ||
# Simulating 'whereami' commands from users | ||
print(process_whereami_command("user1", "device1", 1)) | ||
print(process_whereami_command("user1", "device1", 1)) | ||
print(process_whereami_command("user2", "device2", 1)) | ||
print(process_whereami_command("user2", "device2", 1)) | ||
|
||
# Create and join teams | ||
game.create_team("Team A", "user1") | ||
game.join_team("Team A", "user2") | ||
|
||
# Get individual leaderboard | ||
print("\nAll-time individual leaderboard:") | ||
for user, distance in game.get_leaderboard(): | ||
print(f"{user}: {distance:.2f} km") | ||
|
||
# Get team leaderboard | ||
print("\nTeam leaderboard:") | ||
for team, distance in game.get_team_leaderboard(): | ||
print(f"{team}: {distance:.2f} km") | ||
|
||
# Get user stats | ||
user_stats = game.get_user_stats("user1") | ||
print(f"\nUser1 stats: {user_stats}") | ||
|
||
# Get achievements | ||
achievements = game.get_achievements("user1") | ||
print(f"User1 achievements: {achievements}") | ||
|
||
except Exception as e: | ||
logger.error(f"An unexpected error occurred: {e}") |