Skip to content

Commit

Permalink
Add unified %reset magic (#644)
Browse files Browse the repository at this point in the history
* Add unified %reset magic

* update changelog
  • Loading branch information
michaelnchin authored Jul 13, 2024
1 parent 6c6ecd5 commit 11387b4
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 83 deletions.
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Starting with v1.31.6, this file will contain a record of major features and upd

- New Neptune Database notebook - Games Industry Graphs ([Link to PR](https://github.com/aws/graph-notebook/pull/566))
- Path: 01-Neptune-Database > 03-Sample-Applications > 07-Games-Industry-Graphs
- Added unified `%reset` line magic ([Link to PR](https://github.com/aws/graph-notebook/pull/644))
- Added `--connected-table` option to magics with table widget output ([Link to PR](https://github.com/aws/graph-notebook/pull/634))
- Added `--silent` option to the `%%graph_notebook_config` line and cell magics ([Link to PR](https://github.com/aws/graph-notebook/pull/641))
- Added helpful redirect messaging for service-specific Neptune magics ([Link to PR](https://github.com/aws/graph-notebook/pull/643))
Expand Down
222 changes: 141 additions & 81 deletions src/graph_notebook/magics/graph_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
NEPTUNE_CONFIG_HOST_IDENTIFIERS, is_allowed_neptune_host, \
STATISTICS_LANGUAGE_INPUTS, STATISTICS_LANGUAGE_INPUTS_SPARQL, STATISTICS_MODES, SUMMARY_MODES, \
SPARQL_EXPLAIN_MODES, OPENCYPHER_EXPLAIN_MODES, GREMLIN_EXPLAIN_MODES, \
OPENCYPHER_PLAN_CACHE_MODES, OPENCYPHER_DEFAULT_TIMEOUT, OPENCYPHER_STATUS_STATE_MODES, \
normalize_service_name, GRAPH_PG_INFO_METRICS, \
OPENCYPHER_PLAN_CACHE_MODES, OPENCYPHER_DEFAULT_TIMEOUT, OPENCYPHER_STATUS_STATE_MODES,
normalize_service_name, NEPTUNE_DB_SERVICE_NAME, NEPTUNE_ANALYTICS_SERVICE_NAME, GRAPH_PG_INFO_METRICS, \
DEFAULT_GREMLIN_PROTOCOL, GREMLIN_PROTOCOL_FORMATS, DEFAULT_HTTP_PROTOCOL, normalize_protocol_name)
from graph_notebook.network import SPARQLNetwork
from graph_notebook.network.gremlin.GremlinNetwork import parse_pattern_list_str, GremlinNetwork
Expand Down Expand Up @@ -1531,82 +1531,141 @@ def get_graph(self, line='', local_ns: dict = None):
@line_magic
@needs_local_scope
@display_exceptions
@neptune_db_only
def db_reset(self, line, local_ns: dict = None):
def reset(self, line, local_ns: dict = None, service: str = None):
logger.info(f'calling system endpoint {self.client.host}')
parser = argparse.ArgumentParser()
parser.add_argument('-g', '--generate-token', action='store_true', help='generate token for database reset')
parser.add_argument('-t', '--token', default='', help='perform database reset with given token')
parser.add_argument('-y', '--yes', action='store_true', help='skip the prompt and perform database reset')
parser.add_argument('-g', '--generate-token', action='store_true',
help='Generate token for database reset. Database only.')
parser.add_argument('-t', '--token', default='',
help='Perform database reset with given token. Database only.')
parser.add_argument('-s', '--snapshot', action='store_true', default=False,
help='Creates a final graph snapshot before the graph data is deleted. Analytics only.')
parser.add_argument('-y', '--yes', action='store_true',
help='Skip the prompt and perform reset.')
parser.add_argument('-m', '--max-status-retries', type=int, default=10,
help='Specifies how many times we should attempt to check if the database reset has '
'completed, in intervals of 5 seconds. Default is 10')
args = parser.parse_args(line.split())
generate_token = args.generate_token
skip_prompt = args.yes
snapshot = args.snapshot
if not service:
service = self.graph_notebook_config.neptune_service
if service == NEPTUNE_DB_SERVICE_NAME:
using_db = True
graph_id = None
message_instance = "cluster"
else:
using_db = False
graph_id = self.client.get_graph_id()
message_instance = "graph"
max_status_retries = args.max_status_retries if args.max_status_retries > 0 else 1
if generate_token is False and args.token == '':
if not using_db or (generate_token is False and args.token == ''):
if skip_prompt:
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
res = initiate_res.json()
token = res['payload']['token']

perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
logger.info(f'got the response {res}')
res = perform_reset_res.json()
return res
if using_db:
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
res = initiate_res.json()
token = res['payload']['token']

perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
logger.info(f'got the response {res}')
res = perform_reset_res.json()
return res
else:
try:
res = self.client.reset_graph(graph_id=graph_id, snapshot=snapshot)
print(
f"ResetGraph call submitted successfully for graph ID [{graph_id}]. "
f"Please note that the graph may take several minutes to become available again, "
f"You can use %status or %get_graph to check the current status of the graph.\n")
print(json.dumps(res, indent=2, default=str))
except Exception as e:
print("Received an error when attempting graph reset:")
print(e)
return

output = widgets.Output()
source = 'Are you sure you want to delete all the data in your cluster?'
label = widgets.Label(source)
text_hbox = widgets.HBox([label])
check_box = widgets.Checkbox(
confirm_source = f'Are you sure you want to delete all the data in your {message_instance}?'
confirm_label = widgets.Label(confirm_source)
confirm_text_hbox = widgets.HBox([confirm_label])
confirm_check_box = widgets.Checkbox(
value=False,
disabled=False,
indent=False,
description='I acknowledge that upon deletion the cluster data will no longer be available.',
description=f'I acknowledge that upon deletion the {message_instance} data will no longer be available.',
layout=widgets.Layout(width='600px', margin='5px 5px 5px 5px')
)
button_delete = widgets.Button(description="Delete")
button_cancel = widgets.Button(description="Cancel")
button_hbox = widgets.HBox([button_delete, button_cancel])

display(text_hbox, check_box, button_hbox, output)
if using_db:
display(confirm_text_hbox, confirm_check_box, button_hbox, output)
else:
snapshot_source = f'OPTIONAL: Create a final graph snapshot before reset?'
snapshot_label = widgets.Label(snapshot_source)
snapshot_text_hbox = widgets.HBox([snapshot_label])
snapshot_check_box = widgets.Checkbox(
value=snapshot,
disabled=False,
indent=False,
description=f'Yes',
layout=widgets.Layout(width='600px', margin='5px 5px 5px 5px')
)

display(confirm_text_hbox, confirm_check_box,
snapshot_text_hbox, snapshot_check_box,
button_hbox, output)

def on_button_delete_clicked(b):
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
result = initiate_res.json()

text_hbox.close()
check_box.close()
if using_db:
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
result = initiate_res.json()

confirm_text_hbox.close()
confirm_check_box.close()
if not using_db:
snapshot_text_hbox.close()
snapshot_check_box.close()
button_delete.close()
button_cancel.close()
button_hbox.close()

if not check_box.value:
with output:
print('Checkbox is not checked.')
return
token = result['payload']['token']
if token == "":
if not confirm_check_box.value:
with output:
print('Failed to get token.')
print(result)
print('Reset confirmation checkbox is not checked.')
return

perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
result = perform_reset_res.json()
if using_db:
token = result['payload']['token']
if token == "":
with output:
print('Failed to get token.')
print(result)
return

if 'status' not in result or result['status'] != '200 OK':
with output:
print('Database reset failed, please try the operation again or reboot the cluster.')
print(result)
logger.error(result)
return
perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
result = perform_reset_res.json()

if 'status' not in result or result['status'] != '200 OK':
with output:
print('Database reset failed, please see exception below for details.')
print(result)
logger.error(result)
return
else:
try:
result = self.client.reset_graph(graph_id=graph_id, snapshot=snapshot_check_box.value)
except Exception as e:
with output:
print("Failed to initiate graph reset, please see the exception below.")
print(f"\n{e}")
logger.error(e)
return

retry = max_status_retries
poll_interval = 5
Expand All @@ -1631,20 +1690,30 @@ def on_button_delete_clicked(b):
new_interval = True
try:
retry -= 1
status_res = self.client.status()
status_res.raise_for_status()
interval_check_response = status_res.json()
if using_db:
status_res = self.client.status()
status_res.raise_for_status()
interval_check_response = status_res.json()
else:
interval_check_response = self.client.get_graph(graph_id=graph_id)
except Exception as e:
# Exception is expected when database is resetting, continue waiting
with job_status_output:
last_poll_time = time.time()
time.sleep(1)
continue
if using_db:
with job_status_output:
last_poll_time = time.time()
time.sleep(1)
continue
else:
print('Graph status check failed, something went wrong.')
print(e)
logger.error(e)
return
job_status_output.clear_output()
with job_status_output:
if interval_check_response["status"] == 'healthy':
done_status = 'healthy' if using_db else 'AVAILABLE'
if interval_check_response["status"] == done_status:
interval_output.close()
print('Database has been reset.')
print(f'{message_instance.capitalize()} has been reset.')
return
last_poll_time = time.time()
else:
Expand All @@ -1663,16 +1732,19 @@ def on_button_delete_clicked(b):
if interval_check_response.get("status") != 'healthy':
print(f"Could not retrieve the status of the reset operation within the allotted time of "
f"{total_status_wait} seconds. If the database is not in healthy status after at least 1 "
f"minute, please try the operation again or reboot the cluster.")
f"minute, please try the operation again or reboot the {message_instance}.")

def on_button_cancel_clicked(b):
text_hbox.close()
check_box.close()
confirm_text_hbox.close()
confirm_check_box.close()
if not using_db:
snapshot_text_hbox.close()
snapshot_check_box.close()
button_delete.close()
button_cancel.close()
button_hbox.close()
with output:
print('Database reset operation has been canceled.')
print(f'{message_instance.capitalize()} reset operation has been canceled.')

button_delete.on_click(on_button_delete_clicked)
button_cancel.on_click(on_button_cancel_clicked)
Expand All @@ -1690,38 +1762,26 @@ def on_button_cancel_clicked(b):
logger.info(f'got the response {res}')
return res

@line_magic
@needs_local_scope
@display_exceptions
@neptune_db_only
def db_reset(self, line, local_ns: dict = None):
self.reset(line, local_ns, service=NEPTUNE_DB_SERVICE_NAME)

@line_magic
@needs_local_scope
@display_exceptions
@neptune_graph_only
def graph_reset(self, line, local_ns: dict = None):
self.reset_graph(line, local_ns)
self.reset(line, local_ns, service=NEPTUNE_ANALYTICS_SERVICE_NAME)

@line_magic
@needs_local_scope
@display_exceptions
@neptune_graph_only
def reset_graph(self, line, local_ns: dict = None):
parser = argparse.ArgumentParser()
parser.add_argument('-ns', '--no-skip-snapshot', action='store_true', default=False,
help='Creates a final graph snapshot before the graph data is deleted.')
parser.add_argument('--silent', action='store_true', default=False, help="Display no output.")
parser.add_argument('--store-to', type=str, default='', help='Store query result to this variable.')
args = parser.parse_args(line.split())

try:
graph_id = self.client.get_graph_id()
res = self.client.reset_graph(graph_id=graph_id, no_skip_snapshot=args.no_skip_snapshot)
if not args.silent:
print(f"ResetGraph call submitted successfully for graph ID [{graph_id}]. Please note that the graph "
f"may take several minutes to become available again.\n")
print(json.dumps(res, indent=2, default=str))
store_to_ns(args.store_to, res, local_ns)
except Exception as e:
if not args.silent:
print("Received an error when attempting graph reset:")
print(e)
store_to_ns(args.store_to, e, local_ns)
self.reset(line, local_ns, service=NEPTUNE_ANALYTICS_SERVICE_NAME)

@magic_variables
@line_magic
Expand Down
4 changes: 2 additions & 2 deletions src/graph_notebook/neptune/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,11 +633,11 @@ def perform_reset(self, token: str) -> requests.Response:
res = self._http_session.send(req, verify=self.ssl_verify)
return res

def reset_graph(self, graph_id: str = '', no_skip_snapshot: bool = False) -> dict:
def reset_graph(self, graph_id: str = '', snapshot: bool = False) -> dict:
try:
res = self.neptune_graph_client.reset_graph(
graphIdentifier=graph_id,
skipSnapshot=(not no_skip_snapshot)
skipSnapshot=(not snapshot)
)
return res
except ClientError as e:
Expand Down

0 comments on commit 11387b4

Please sign in to comment.