Skip to content

Commit

Permalink
Creates a new text-only interface for CB2
Browse files Browse the repository at this point in the history
This allows us to test playing the game with text-only large language models

Also adds the demo cli_client, which lets you join a CB2 game from the command line.

Example text description from the follower's point of view:

========
MAP DIMENSIONS:
	25x25 hexagon map with 1 props.
INSTRUCTIONS
	Status: ACTIVE, Instruction: hi
You are playing as the FOLLOWER. Your role is to follower the ACTIVE instruction. Move by entering any number of 'R', 'L', 'F', and 'B' commands (Right, Left, Forward, and Back). You can also write 'D', to mark an instruction as completed. Headings are described in degrees, with positive meaning to the left and negative meaning to the right. You are on a discrete hex grid, each turn is 60 degrees.
PROP DESCRIPTIONS
	Card at distance 3.0 and heading 360
MAP DESCRIPTION
	City of size 2 at heading 341 and distance 2.6
NEARBY TILES
	Left tile: GROUND_TILE_PATH
	Right tile: GROUND_TILE_HOUSE
	Forward tile: GROUND_TILE_PATH
Enter action: ...
=========
  • Loading branch information
jsharf committed Mar 17, 2023
1 parent be9205c commit 3512d57
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 12 deletions.
188 changes: 188 additions & 0 deletions py_client/client_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
from typing import List

from py_client.follower_data_masking import CoordinateIsVisible
from server.actor import Actor
from server.assets import AssetId
from server.config.config import Config
from server.hex import HecsCoord
from server.messages.map_update import MapUpdate
from server.messages.objective import ObjectiveMessage
from server.messages.prop import PropType, PropUpdate


def DescribeMap(
map_update: MapUpdate,
prop_update: PropUpdate,
instructions: List[ObjectiveMessage],
follower: Actor,
leader: Actor = None,
) -> str:
"""Returns a string describing the given map."""
header = f"MAP DIMENSIONS:\n\t{map_update.rows}x{map_update.cols} hexagon map with {len(prop_update.props)} props. \n"

fog_end = map_update.fog_end
if fog_end is None:
# Create a config object and use the default value.
default_config = Config()
fog_end = default_config.fog_end

instruction_descriptions = []
for i, instruction in enumerate(instructions):
# Determine instruction status from instruction.completed, instruction.cancelled (if neither, then in progress).
if instruction.completed:
status = "completed"
elif instruction.cancelled:
status = "cancelled"
else:
status = "ACTIVE"
instruction_descriptions.append(
f"Status: {status}, Instruction: {instruction.text}"
)
# There can only be one active instruction at a time. Any instructions after this are queued for future reveal. End iteration.
if status == "ACTIVE":
break

# Print each instruction description on a line, indented:
instruction_section = "INSTRUCTIONS\n"
for instruction in instruction_descriptions:
instruction_section += f"\t{instruction}\n"

header += instruction_section

header += "You are playing as the FOLLOWER. Your role is to follower the ACTIVE instruction. Move by entering any number of 'R', 'L', 'F', and 'B' commands (Right, Left, Forward, and Back). You can also write 'D', to mark an instruction as completed. Headings are described in degrees, with positive meaning to the left and negative meaning to the right. You are on a discrete hex grid, each turn is 60 degrees. \n"

# Describe the props
prop_descriptions = []
for prop in prop_update.props:
if prop.prop_type == PropType.CARD:
distance = follower.location().distance_to(prop.prop_info.location)
direction = (
follower.heading_degrees()
- follower.location().degrees_to_precise(prop.prop_info.location)
- 60
)
# Only show shape, color, count for selected cards.
if prop.card_init.selected:
prop_descriptions.append(
f"Selected card at distance {distance:.1f}, and heading {direction:.0f}. Shape {prop.card_init.shape.name}, color {prop.card_init.color.name}, count {prop.card_init.count}"
)
else:
prop_descriptions.append(
f"Card at distance {distance:.1f} and heading {direction:.0f}"
)
# Describe the map metadata
metadata = map_update.metadata
metadata_descriptions = []
for lake in metadata.lakes:
if CoordinateIsVisible(
HecsCoord.from_offset(lake.r, lake.c), follower, fog_end
):
distance = follower.location().distance_to(
HecsCoord.from_offset(lake.r, lake.c)
)
direction = (
follower.heading_degrees()
- follower.location().degrees_to_precise(
HecsCoord.from_offset(lake.r, lake.c)
)
- 60
)
metadata_descriptions.append(
f"{lake.type.name} lake of size {lake.size} and shape {lake.type.name} at heading {direction:.0f} and distance {distance:.1f}"
)
for mountain in metadata.mountains:
if CoordinateIsVisible(
HecsCoord.from_offset(mountain.r, mountain.c), follower, fog_end
):
distance = follower.location().distance_to(
HecsCoord.from_offset(mountain.r, mountain.c)
)
direction = (
follower.heading_degrees()
- follower.location().degrees_to_precise(
HecsCoord.from_offset(mountain.r, mountain.c)
)
- 60
)
metadata_descriptions.append(
f"{mountain.type.name} mountain{' (snowy)' if mountain.snowy else ''} at heading {direction:.0f} and distance {distance:.1f}"
)
for city in metadata.cities:
if CoordinateIsVisible(
HecsCoord.from_offset(city.r, city.c), follower, fog_end
):
distance = follower.location().distance_to(
HecsCoord.from_offset(city.r, city.c)
)
direction = (
follower.heading_degrees()
- follower.location().degrees_to_precise(
HecsCoord.from_offset(city.r, city.c)
)
- 60
)
metadata_descriptions.append(
f"City of size {city.size} at heading {direction:.0f} and distance {distance:.1f}"
)
for outpost in metadata.outposts:
if CoordinateIsVisible(
HecsCoord.from_offset(outpost.r, outpost.c), follower, fog_end
):
distance = follower.location().distance_to(
HecsCoord.from_offset(outpost.r, outpost.c)
)
direction = (
follower.heading_degrees()
- follower.location().degrees_to_precise(
HecsCoord.from_offset(outpost.r, outpost.c)
)
- 60
)
metadata_descriptions.append(
f"Outpost at heading {direction:.0f} and distance {distance:.1f}"
)

# If provided, and if visible, describe the leader.
if leader:
if CoordinateIsVisible(leader.location(), follower, fog_end):
distance = follower.location().distance_to(leader.location())
direction = (
follower.heading_degrees()
- follower.location().degrees_to_precise(leader.location())
- 60
)
metadata_descriptions.append(
f"Leader at heading {direction:.0f} and distance {distance:.1f}"
)

# Describe nearby tiles
nearby_tiles = []
follower_forward = follower.location().neighbor_at_heading(
follower.heading_degrees()
)
follower_left = follower.location().neighbor_at_heading(
follower.heading_degrees() - 60
)
follower_right = follower.location().neighbor_at_heading(
follower.heading_degrees() + 60
)
for tile in map_update.tiles:
if tile.cell.coord == follower_forward:
nearby_tiles.append(f"Forward tile: {AssetId(tile.asset_id).name}")
elif tile.cell.coord == follower_left:
nearby_tiles.append(f"Left tile: {AssetId(tile.asset_id).name}")
elif tile.cell.coord == follower_right:
nearby_tiles.append(f"Right tile: {AssetId(tile.asset_id).name}")

# Combine all descriptions
[header] + prop_descriptions + metadata_descriptions + nearby_tiles
prompt = (
header
+ "PROP DESCRIPTIONS\n\t"
+ "\n\t".join(prop_descriptions)
+ "\nMAP DESCRIPTION\n\t"
+ "\n\t".join(metadata_descriptions)
+ "\nNEARBY TILES\n\t"
+ "\n\t".join(nearby_tiles)
)
return prompt
115 changes: 115 additions & 0 deletions py_client/demos/cli_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
from datetime import timedelta

import fire

from py_client.client_utils import DescribeMap
from py_client.game_endpoint import Action
from py_client.remote_client import RemoteClient
from server.messages.prop import PropUpdate

logger = logging.getLogger(__name__)


def actions_from_code(action_code, i_uuid: str = None):
if len(action_code) == 0:
return None
# Convert to lower.
action_code = action_code.lower()
if "forward".startswith(action_code):
return Action.Forwards()
elif "backward".startswith(action_code):
return Action.Backwards()
elif "left".startswith(action_code):
return Action.Left()
elif "right".startswith(action_code):
return Action.Right()
elif "done".startswith(action_code):
return Action.InstructionDone(i_uuid)
return None


def get_active_instruction(instructions):
for instruction in instructions:
if not instruction.completed and not instruction.cancelled:
return instruction
return None


def get_actors(game_state):
(
_,
_,
_,
_,
actors,
_,
) = game_state
if len(actors) == 1:
return (None, actors[0])
else:
return actors


class CliFollower(object):
def __init__(self, game_endpoint, pause_per_turn):
self.instructions_processed = set()
self.actions = []
self.game = game_endpoint
self.exc = None
self.pause_per_turn = pause_per_turn

def run(self):
try:
game_state = self.game.initial_state()
(_, _, turn_state, _, _, _) = game_state
# It's always the leader's turn first. Wait for follower turn by executing a noop.
action = Action.NoopAction()
game_state = self.game.step(action)
while not self.game.over():
(mapu, props, turn_state, instrs, actors, feedback) = game_state
prop_update = PropUpdate(props)
(leader, follower) = get_actors(game_state)
description = DescribeMap(mapu, prop_update, instrs, follower, leader)
print("===============================")
print(description)
# Prompt for input.
action_code = input("Enter action: ")
active_instruction = get_active_instruction(instrs)
action = actions_from_code(action_code, active_instruction.uuid)
if action is None:
print("Invalid action. NOPing.")
action = Action.NoopAction()
logger.info(f"step({action})")
game_state = self.game.step(action)
(_, _, turn_state, _, _, _) = game_state
print(f"Game over. Score: {turn_state.score}")
except Exception as e:
self.exc = e

def join(self):
if self.exc:
raise self.exc


def main(host, render=False, lobby="bot-sandbox", pause_per_turn=0):
# Create client and connect to server.
client = RemoteClient(host, render, lobby_name=lobby)
connected, reason = client.Connect()
assert connected, f"Unable to connect: {reason}"

# Wait in the queue for a game to start.
game, reason = client.JoinGame(
timeout=timedelta(minutes=5),
queue_type=RemoteClient.QueueType.FOLLOWER_ONLY,
)
assert game is not None, f"Unable to join game: {reason}"

# Handles game logic.
follower = CliFollower(game, pause_per_turn)
follower.run()
follower.join()


if __name__ == "__main__":
fire.Fire(main)
19 changes: 7 additions & 12 deletions py_client/follower_data_masking.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@

def VisibleCoordinates(follower_actor, config):
"""Given an actor, returns all HecsCoords that are visible to that actor."""
config.fog_end / UNITY_COORDINATES_SCALE
# There's something wrong with orientation... I have to put - 60 everywhere
# Actor.heading_degrees() (actor.py) is used.
follower_actor.heading_degrees() - 60

visible_coords = []

# Get the two neighboring cells to the left and right. Special case them.
Expand All @@ -46,7 +41,7 @@ def VisibleCoordinates(follower_actor, config):
if coord in visible_coords:
continue
if (coord != follower_actor.location()) and (
not CoordinateInViewingDistance(coord, follower_actor, config)
not CoordinateInViewingDistance(coord, follower_actor, config.fog_end)
or not CoordinateInFov(coord, follower_actor, config)
):
continue
Expand All @@ -57,9 +52,9 @@ def VisibleCoordinates(follower_actor, config):
return visible_coords


def CoordinateInViewingDistance(coord, follower_actor, config):
def CoordinateInViewingDistance(coord, follower_actor, fog_end):
"""Returns true if the given coordinate should be visible to the given follower with the given config."""
view_depth = config.fog_end / UNITY_COORDINATES_SCALE
view_depth = fog_end / UNITY_COORDINATES_SCALE
# Check distance.
distance = coord.distance_to(follower_actor.location())
# Add 0.5 to round up to the next hex cell.
Expand Down Expand Up @@ -93,13 +88,13 @@ def CoordinateNeighborCells(follower_actor):
]


def CoordinateIsVisible(coord, follower_actor, config):
def CoordinateIsVisible(coord, follower_actor, fog_end):
# Get the two neighboring cells to the left and right. Special case them.
if coord in CoordinateNeighborCells(follower_actor):
return True

""" Returns true if the given coordinate should be visible to the given follower with the given config. """
view_depth = config.fog_end / UNITY_COORDINATES_SCALE
view_depth = fog_end / UNITY_COORDINATES_SCALE
# There's something wrong with orientation... I have to put - 60 everywhere
# Actor.heading_degrees() (actor.py) is used.
follower_orientation = follower_actor.heading_degrees() - 60
Expand Down Expand Up @@ -167,7 +162,7 @@ def CensorFollowerProps(props, follower_actor, config):
"""
new_props = []
for prop in props:
if CoordinateIsVisible(prop.prop_info.location, follower_actor, config):
if CoordinateIsVisible(prop.prop_info.location, follower_actor, config.fog_end):
new_props.append(dataclasses.replace(prop))
return new_props

Expand All @@ -185,7 +180,7 @@ def CensorActors(actors, follower_actor, config):
"""
new_actors = []
for actor in actors:
if CoordinateIsVisible(actor.location(), follower_actor, config):
if CoordinateIsVisible(actor.location(), follower_actor, config.fog_end):
new_actors.append(
Actor(
actor.actor_id(),
Expand Down

0 comments on commit 3512d57

Please sign in to comment.