Skip to content

Commit

Permalink
Cleanup IP race condition on EC2Architect (#606)
Browse files Browse the repository at this point in the history
* Creating the EC2Architect

* small fixes

* Allowing multiple ssh ips

* Cleaning up scripts

* Formatting

* Fallback server doesn't log liveliness checks

* Shutdown and relaunch cleanups

* Fallback server push cleanup

* Forgot to register zone records

* EC2 bugfixes

* Mypy

* Forgotten local path

* Tested
  • Loading branch information
JackUrb authored Nov 16, 2021
1 parent bc3526f commit e70e460
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 131 deletions.
12 changes: 1 addition & 11 deletions mephisto/abstractions/architects/ec2/ec2_architect.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ def __init__(

self.server_dir: Optional[str] = None
self.server_id: Optional[str] = None
self.allocation_id: Optional[str] = None
self.association_id: Optional[str] = None
self.target_group_arn: Optional[str] = None
self.router_rule_arn: Optional[str] = None
self.created = False
Expand Down Expand Up @@ -231,11 +229,7 @@ def __setup_ec2_server(self) -> str:
print("EC2: Starting instance...")

# Launch server
(
server_id,
self.allocation_id,
self.association_id,
) = ec2_helpers.create_instance(
server_id = ec2_helpers.create_instance(
self.session,
self.fallback_details["key_pair_name"],
self.fallback_details["security_group_id"],
Expand Down Expand Up @@ -264,8 +258,6 @@ def __setup_ec2_server(self) -> str:
server_details = {
"balancer_rule_arn": self.router_rule_arn,
"instance_id": self.server_id,
"ip_allocation_id": self.allocation_id,
"ip_association_id": self.association_id,
"subdomain": self.subdomain,
"target_group_arn": self.target_group_arn,
}
Expand Down Expand Up @@ -301,8 +293,6 @@ def __delete_ec2_server(self):
ec2_helpers.delete_instance(
self.session,
server_id,
self.allocation_id,
self.association_id,
)
os.unlink(self.server_detail_path)

Expand Down
269 changes: 160 additions & 109 deletions mephisto/abstractions/architects/ec2/ec2_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
SCRIPTS_DIRECTORY = os.path.join(MY_DIR, "run_scripts")
DEFAULT_FALLBACK_FILE = os.path.join(DEFAULT_SERVER_DETAIL_LOCATION, "fallback.json")
FALLBACK_SERVER_LOC = os.path.join(MY_DIR, "fallback_server")
KNOWN_HOST_PATH = os.path.expanduser("~/.ssh/known_hosts")
MAX_RETRIES = 10


Expand Down Expand Up @@ -533,7 +534,7 @@ def create_instance(
instance_name: str,
volume_size: int = 8,
instance_type: str = DEFAULT_INSTANCE_TYPE,
) -> Tuple[str, str, str]:
) -> str:
"""
Create an instance, return the instance id, allocation id, and association id
"""
Expand Down Expand Up @@ -589,36 +590,13 @@ def create_instance(
)
instance_id = instance_response["Instances"][0]["InstanceId"]

allocation_response = client.allocate_address(
Domain="vpc",
TagSpecifications=[
{
"ResourceType": "elastic-ip",
"Tags": [
{
"Key": "Name",
"Value": f"{instance_name}-ip-address",
}
],
}
],
)
allocation_id = allocation_response["AllocationId"]

logger.debug(f"Waiting for instance {instance_id} to come up before continuing")
waiter = client.get_waiter("instance_running")
waiter.wait(
InstanceIds=[instance_id],
)

associate_response = client.associate_address(
AllocationId=allocation_id,
InstanceId=instance_id,
AllowReassociation=False,
)
association_id = associate_response["AssociationId"]

return instance_id, allocation_id, association_id
return instance_id


def create_target_group(
Expand Down Expand Up @@ -810,104 +788,191 @@ def configure_base_balancer(
return listener_arn


def deploy_fallback_server(
def get_instance_address(
session: boto3.Session,
instance_id: str,
key_pair: str,
log_access_pass: str,
) -> bool:
) -> Tuple[str, str, str]:
"""
Deploy the fallback server to the given instance,
return True if successful
Create a temporary publicly accessible IP for the given instance.
Return the IP address, the allocation id, and the association id.
"""
client = session.client("ec2")
server_response = client.describe_instances(InstanceIds=[instance_id])
server_host = server_response["Reservations"][0]["Instances"][0]["PublicIpAddress"]
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
password_file_name = os.path.join(FALLBACK_SERVER_LOC, f"access_key.txt")
with open(password_file_name, "w+") as password_file:
password_file.write(log_access_pass)

remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
allocation_response = client.allocate_address(
Domain="vpc",
TagSpecifications=[
{
"ResourceType": "elastic-ip",
"Tags": [
{
"Key": "Name",
"Value": f"{instance_id}-ip-address",
}
],
}
],
)
ip_address = allocation_response["PublicIp"]
allocation_id = allocation_response["AllocationId"]

dest = f"{remote_server}:/home/ec2-user/"
subprocess.check_call(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{FALLBACK_SERVER_LOC}",
dest,
]
associate_response = client.associate_address(
AllocationId=allocation_id,
InstanceId=instance_id,
AllowReassociation=False,
)
association_id = associate_response["AssociationId"]

# Remove this IP from known hosts in case it's there,
# as it's definitely not the old host anymore
subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/fallback_server/scripts/first_setup.sh",
"ssh-keygen",
"-f",
f"{KNOWN_HOST_PATH}",
"-R",
f'"{ip_address}"',
]
)

os.unlink(password_file_name)
return True
return ip_address, allocation_id, association_id


def deploy_to_routing_server(
def detete_instance_address(
session: boto3.Session,
instance_id: str,
key_pair: str,
push_directory: str,
) -> bool:
allocation_id: str,
association_id: str,
) -> None:
"""
Removes the public ip described by the given allocation and association ids
"""
client = session.client("ec2")
server_response = client.describe_instances(InstanceIds=[instance_id])
server_host = server_response["Reservations"][0]["Instances"][0]["PublicIpAddress"]
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
client.disassociate_address(
AssociationId=association_id,
)

remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
client.release_address(
AllocationId=allocation_id,
)

print("Uploading files to server, then attempting to run")
dest = f"{remote_server}:/home/ec2-user/"
retries = 5
sleep_time = 10.0

def try_server_push(subprocess_args: List[str], retries=5, sleep_time=10.0):
"""
Try to execute the server push provided in subprocess args
"""
while retries > 0:
try:
subprocess.check_call(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{push_directory}",
dest,
]
)
break
subprocess.check_call(subprocess_args)
return
except subprocess.CalledProcessError:
retries -= 1
sleep_time *= 1.5
logger.info(
f"Timed out trying to push to server. Retries remaining: {retries}"
)
time.sleep(sleep_time)
subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/routing_server/setup/init_server.sh",
]
raise Exception(
"Could not successfully push to the ec2 instance. See log for errors."
)
print("Server setup complete!")


def deploy_fallback_server(
session: boto3.Session,
instance_id: str,
key_pair: str,
log_access_pass: str,
) -> bool:
"""
Deploy the fallback server to the given instance,
return True if successful
"""
client = session.client("ec2")
server_host, allocation_id, association_id = get_instance_address(
session, instance_id
)
try:
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")
password_file_name = os.path.join(FALLBACK_SERVER_LOC, f"access_key.txt")
with open(password_file_name, "w+") as password_file:
password_file.write(log_access_pass)

remote_server = f"{AMI_DEFAULT_USER}@{server_host}"

dest = f"{remote_server}:/home/ec2-user/"
try_server_push(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{FALLBACK_SERVER_LOC}",
dest,
]
)
os.unlink(password_file_name)
subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/fallback_server/scripts/first_setup.sh",
]
)
detete_instance_address(session, allocation_id, association_id)
except Exception as e:
detete_instance_address(session, allocation_id, association_id)
raise e

return True


def deploy_to_routing_server(
session: boto3.Session,
instance_id: str,
key_pair: str,
push_directory: str,
) -> bool:
client = session.client("ec2")
server_host, allocation_id, association_id = get_instance_address(
session, instance_id
)
keypair_file = os.path.join(DEFAULT_KEY_PAIR_DIRECTORY, f"{key_pair}.pem")

print("Uploading files to server, then attempting to run")
try:
remote_server = f"{AMI_DEFAULT_USER}@{server_host}"
dest = f"{remote_server}:/home/ec2-user/"
try_server_push(
[
"scp",
"-o",
"StrictHostKeyChecking=no",
"-i",
keypair_file,
"-r",
f"{push_directory}",
dest,
]
)

subprocess.check_call(
[
"ssh",
"-i",
keypair_file,
remote_server,
"bash",
"/home/ec2-user/routing_server/setup/init_server.sh",
]
)
detete_instance_address(session, allocation_id, association_id)
print("Server setup complete!")
except Exception as e:
detete_instance_address(session, allocation_id, association_id)
raise e

return True

Expand All @@ -933,21 +998,11 @@ def delete_rule(
def delete_instance(
session: boto3.Session,
instance_id: str,
allocation_id: str,
association_id: str,
) -> None:
"""
Remove the given instance and the associated elastic ip
"""
client = session.client("ec2")
client.disassociate_address(
AssociationId=association_id,
)

client.release_address(
AllocationId=allocation_id,
)

client.terminate_instances(InstanceIds=[instance_id])


Expand All @@ -970,8 +1025,6 @@ def remove_instance_and_cleanup(
delete_instance(
session,
details["instance_id"],
details["ip_allocation_id"],
details["ip_association_id"],
)
os.unlink(server_detail_path)
return None
Expand Down Expand Up @@ -1033,11 +1086,9 @@ def cleanup_fallback_server(
)

instance_id = details.get("instance_id")
ip_allocation_id = details.get("ip_allocation_id")
ip_association_id = details.get("ip_association_id")
if instance_id is not None:
print(f"Deleting instance {instance_id}...")
delete_instance(session, instance_id, ip_allocation_id, ip_association_id)
delete_instance(session, instance_id)

vpc_details = details.get("vpc_details")
if vpc_details is not None:
Expand Down
Loading

0 comments on commit e70e460

Please sign in to comment.