Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unified %reset magic #644

Merged
merged 2 commits into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Starting with v1.31.6, this file will contain a record of major features and upd

- New Neptune Database notebook - Games Industry Graphs ([Link to PR](https://github.com/aws/graph-notebook/pull/566))
- Path: 01-Neptune-Database > 03-Sample-Applications > 07-Games-Industry-Graphs
- Added unified `%reset` line magic ([Link to PR](https://github.com/aws/graph-notebook/pull/644))
- Added `--connected-table` option to magics with table widget output ([Link to PR](https://github.com/aws/graph-notebook/pull/634))
- Added `--silent` option to the `%%graph_notebook_config` line and cell magics ([Link to PR](https://github.com/aws/graph-notebook/pull/641))
- Changed `%%gremlin --store-to` to also store exceptions from non-Neptune queries ([Link to PR](https://github.com/aws/graph-notebook/pull/635))
Expand Down
222 changes: 141 additions & 81 deletions src/graph_notebook/magics/graph_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
NEPTUNE_CONFIG_HOST_IDENTIFIERS, is_allowed_neptune_host, \
STATISTICS_LANGUAGE_INPUTS, STATISTICS_LANGUAGE_INPUTS_SPARQL, STATISTICS_MODES, SUMMARY_MODES, \
SPARQL_EXPLAIN_MODES, OPENCYPHER_EXPLAIN_MODES, GREMLIN_EXPLAIN_MODES, \
OPENCYPHER_PLAN_CACHE_MODES, OPENCYPHER_DEFAULT_TIMEOUT, OPENCYPHER_STATUS_STATE_MODES, \
normalize_service_name, GRAPH_PG_INFO_METRICS, \
OPENCYPHER_PLAN_CACHE_MODES, OPENCYPHER_DEFAULT_TIMEOUT, OPENCYPHER_STATUS_STATE_MODES,
normalize_service_name, NEPTUNE_DB_SERVICE_NAME, NEPTUNE_ANALYTICS_SERVICE_NAME, GRAPH_PG_INFO_METRICS, \
DEFAULT_GREMLIN_PROTOCOL, GREMLIN_PROTOCOL_FORMATS, DEFAULT_HTTP_PROTOCOL, normalize_protocol_name)
from graph_notebook.network import SPARQLNetwork
from graph_notebook.network.gremlin.GremlinNetwork import parse_pattern_list_str, GremlinNetwork
Expand Down Expand Up @@ -1531,82 +1531,141 @@ def get_graph(self, line='', local_ns: dict = None):
@line_magic
@needs_local_scope
@display_exceptions
@neptune_db_only
def db_reset(self, line, local_ns: dict = None):
def reset(self, line, local_ns: dict = None, service: str = None):
logger.info(f'calling system endpoint {self.client.host}')
parser = argparse.ArgumentParser()
parser.add_argument('-g', '--generate-token', action='store_true', help='generate token for database reset')
parser.add_argument('-t', '--token', default='', help='perform database reset with given token')
parser.add_argument('-y', '--yes', action='store_true', help='skip the prompt and perform database reset')
parser.add_argument('-g', '--generate-token', action='store_true',
help='Generate token for database reset. Database only.')
parser.add_argument('-t', '--token', default='',
help='Perform database reset with given token. Database only.')
parser.add_argument('-s', '--snapshot', action='store_true', default=False,
help='Creates a final graph snapshot before the graph data is deleted. Analytics only.')
parser.add_argument('-y', '--yes', action='store_true',
help='Skip the prompt and perform reset.')
parser.add_argument('-m', '--max-status-retries', type=int, default=10,
help='Specifies how many times we should attempt to check if the database reset has '
'completed, in intervals of 5 seconds. Default is 10')
args = parser.parse_args(line.split())
generate_token = args.generate_token
skip_prompt = args.yes
snapshot = args.snapshot
if not service:
service = self.graph_notebook_config.neptune_service
if service == NEPTUNE_DB_SERVICE_NAME:
using_db = True
graph_id = None
message_instance = "cluster"
else:
using_db = False
graph_id = self.client.get_graph_id()
message_instance = "graph"
max_status_retries = args.max_status_retries if args.max_status_retries > 0 else 1
if generate_token is False and args.token == '':
if not using_db or (generate_token is False and args.token == ''):
if skip_prompt:
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
res = initiate_res.json()
token = res['payload']['token']

perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
logger.info(f'got the response {res}')
res = perform_reset_res.json()
return res
if using_db:
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
res = initiate_res.json()
token = res['payload']['token']

perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
logger.info(f'got the response {res}')
res = perform_reset_res.json()
return res
else:
try:
res = self.client.reset_graph(graph_id=graph_id, snapshot=snapshot)
print(
f"ResetGraph call submitted successfully for graph ID [{graph_id}]. "
f"Please note that the graph may take several minutes to become available again, "
f"You can use %status or %get_graph to check the current status of the graph.\n")
print(json.dumps(res, indent=2, default=str))
except Exception as e:
print("Received an error when attempting graph reset:")
print(e)
return

output = widgets.Output()
source = 'Are you sure you want to delete all the data in your cluster?'
label = widgets.Label(source)
text_hbox = widgets.HBox([label])
check_box = widgets.Checkbox(
confirm_source = f'Are you sure you want to delete all the data in your {message_instance}?'
confirm_label = widgets.Label(confirm_source)
confirm_text_hbox = widgets.HBox([confirm_label])
confirm_check_box = widgets.Checkbox(
value=False,
disabled=False,
indent=False,
description='I acknowledge that upon deletion the cluster data will no longer be available.',
description=f'I acknowledge that upon deletion the {message_instance} data will no longer be available.',
layout=widgets.Layout(width='600px', margin='5px 5px 5px 5px')
)
button_delete = widgets.Button(description="Delete")
button_cancel = widgets.Button(description="Cancel")
button_hbox = widgets.HBox([button_delete, button_cancel])

display(text_hbox, check_box, button_hbox, output)
if using_db:
display(confirm_text_hbox, confirm_check_box, button_hbox, output)
else:
snapshot_source = f'OPTIONAL: Create a final graph snapshot before reset?'
snapshot_label = widgets.Label(snapshot_source)
snapshot_text_hbox = widgets.HBox([snapshot_label])
snapshot_check_box = widgets.Checkbox(
value=snapshot,
disabled=False,
indent=False,
description=f'Yes',
layout=widgets.Layout(width='600px', margin='5px 5px 5px 5px')
)

display(confirm_text_hbox, confirm_check_box,
snapshot_text_hbox, snapshot_check_box,
button_hbox, output)

def on_button_delete_clicked(b):
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
result = initiate_res.json()

text_hbox.close()
check_box.close()
if using_db:
initiate_res = self.client.initiate_reset()
initiate_res.raise_for_status()
result = initiate_res.json()

confirm_text_hbox.close()
confirm_check_box.close()
if not using_db:
snapshot_text_hbox.close()
snapshot_check_box.close()
button_delete.close()
button_cancel.close()
button_hbox.close()

if not check_box.value:
with output:
print('Checkbox is not checked.')
return
token = result['payload']['token']
if token == "":
if not confirm_check_box.value:
with output:
print('Failed to get token.')
print(result)
print('Reset confirmation checkbox is not checked.')
return

perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
result = perform_reset_res.json()
if using_db:
token = result['payload']['token']
if token == "":
with output:
print('Failed to get token.')
print(result)
return

if 'status' not in result or result['status'] != '200 OK':
with output:
print('Database reset failed, please try the operation again or reboot the cluster.')
print(result)
logger.error(result)
return
perform_reset_res = self.client.perform_reset(token)
perform_reset_res.raise_for_status()
result = perform_reset_res.json()

if 'status' not in result or result['status'] != '200 OK':
with output:
print('Database reset failed, please see exception below for details.')
print(result)
logger.error(result)
return
else:
try:
result = self.client.reset_graph(graph_id=graph_id, snapshot=snapshot_check_box.value)
except Exception as e:
with output:
print("Failed to initiate graph reset, please see the exception below.")
print(f"\n{e}")
logger.error(e)
return

retry = max_status_retries
poll_interval = 5
Expand All @@ -1631,20 +1690,30 @@ def on_button_delete_clicked(b):
new_interval = True
try:
retry -= 1
status_res = self.client.status()
status_res.raise_for_status()
interval_check_response = status_res.json()
if using_db:
status_res = self.client.status()
status_res.raise_for_status()
interval_check_response = status_res.json()
else:
interval_check_response = self.client.get_graph(graph_id=graph_id)
except Exception as e:
# Exception is expected when database is resetting, continue waiting
with job_status_output:
last_poll_time = time.time()
time.sleep(1)
continue
if using_db:
with job_status_output:
last_poll_time = time.time()
time.sleep(1)
continue
else:
print('Graph status check failed, something went wrong.')
print(e)
logger.error(e)
return
job_status_output.clear_output()
with job_status_output:
if interval_check_response["status"] == 'healthy':
done_status = 'healthy' if using_db else 'AVAILABLE'
if interval_check_response["status"] == done_status:
interval_output.close()
print('Database has been reset.')
print(f'{message_instance.capitalize()} has been reset.')
return
last_poll_time = time.time()
else:
Expand All @@ -1663,16 +1732,19 @@ def on_button_delete_clicked(b):
if interval_check_response.get("status") != 'healthy':
print(f"Could not retrieve the status of the reset operation within the allotted time of "
f"{total_status_wait} seconds. If the database is not in healthy status after at least 1 "
f"minute, please try the operation again or reboot the cluster.")
f"minute, please try the operation again or reboot the {message_instance}.")

def on_button_cancel_clicked(b):
text_hbox.close()
check_box.close()
confirm_text_hbox.close()
confirm_check_box.close()
if not using_db:
snapshot_text_hbox.close()
snapshot_check_box.close()
button_delete.close()
button_cancel.close()
button_hbox.close()
with output:
print('Database reset operation has been canceled.')
print(f'{message_instance.capitalize()} reset operation has been canceled.')

button_delete.on_click(on_button_delete_clicked)
button_cancel.on_click(on_button_cancel_clicked)
Expand All @@ -1690,38 +1762,26 @@ def on_button_cancel_clicked(b):
logger.info(f'got the response {res}')
return res

@line_magic
@needs_local_scope
@display_exceptions
@neptune_db_only
def db_reset(self, line, local_ns: dict = None):
self.reset(line, local_ns, service=NEPTUNE_DB_SERVICE_NAME)

@line_magic
@needs_local_scope
@display_exceptions
@neptune_graph_only
def graph_reset(self, line, local_ns: dict = None):
self.reset_graph(line, local_ns)
self.reset(line, local_ns, service=NEPTUNE_ANALYTICS_SERVICE_NAME)

@line_magic
@needs_local_scope
@display_exceptions
@neptune_graph_only
def reset_graph(self, line, local_ns: dict = None):
parser = argparse.ArgumentParser()
parser.add_argument('-ns', '--no-skip-snapshot', action='store_true', default=False,
help='Creates a final graph snapshot before the graph data is deleted.')
parser.add_argument('--silent', action='store_true', default=False, help="Display no output.")
parser.add_argument('--store-to', type=str, default='', help='Store query result to this variable.')
args = parser.parse_args(line.split())

try:
graph_id = self.client.get_graph_id()
res = self.client.reset_graph(graph_id=graph_id, no_skip_snapshot=args.no_skip_snapshot)
if not args.silent:
print(f"ResetGraph call submitted successfully for graph ID [{graph_id}]. Please note that the graph "
f"may take several minutes to become available again.\n")
print(json.dumps(res, indent=2, default=str))
store_to_ns(args.store_to, res, local_ns)
except Exception as e:
if not args.silent:
print("Received an error when attempting graph reset:")
print(e)
store_to_ns(args.store_to, e, local_ns)
self.reset(line, local_ns, service=NEPTUNE_ANALYTICS_SERVICE_NAME)

@magic_variables
@line_magic
Expand Down
4 changes: 2 additions & 2 deletions src/graph_notebook/neptune/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,11 +633,11 @@ def perform_reset(self, token: str) -> requests.Response:
res = self._http_session.send(req, verify=self.ssl_verify)
return res

def reset_graph(self, graph_id: str = '', no_skip_snapshot: bool = False) -> dict:
def reset_graph(self, graph_id: str = '', snapshot: bool = False) -> dict:
try:
res = self.neptune_graph_client.reset_graph(
graphIdentifier=graph_id,
skipSnapshot=(not no_skip_snapshot)
skipSnapshot=(not snapshot)
)
return res
except ClientError as e:
Expand Down
Loading