diff --git a/docassemble/AssemblyLine/data/questions/demo_search_sessions.yml b/docassemble/AssemblyLine/data/questions/demo_search_sessions.yml
new file mode 100644
index 0000000..890abdb
--- /dev/null
+++ b/docassemble/AssemblyLine/data/questions/demo_search_sessions.yml
@@ -0,0 +1,67 @@
+---
+include:
+ - assembly_line.yml
+---
+metadata:
+ title: |
+ Search sessions demo
+ required privileges:
+ - admin
+ - developer
+ - advocate
+---
+mandatory: True
+code: |
+ search_keyword
+ if not matching_results:
+ no_matches
+ show_results
+ show_selected_interview
+---
+id: search_keyword
+question: |
+ What would you like to search for?
+subquestion: |
+ Searching is not case sensitive.
+fields:
+ - Keyword: search_keyword
+---
+code: |
+ matching_results = find_matching_sessions(search_keyword, user_id="all")
+---
+event: no_matches
+question: |
+ We didn't find any matches
+subquestion: |
+ Check your spelling and try again.
+---
+continue button field: show_results
+question: |
+ We found ${ len(matching_results) } results that match your keyword
+subquestion: |
+ % for result in matching_results:
+ * ${ nice_interview_subtitle(result, exclude_identical=False)}
+ ${ result["modtime"]}
+ % endfor
+
+ Raw results:
+
+ ${ matching_results }
+
+fields:
+ - Which session do you want to load: interview_to_load
+ datatype: integer
+ code: |
+ [{idx: f"{nice_interview_subtitle(answer, exclude_identical=False)}" + f' ({answer.get("modtime", DAEmpty()).strftime("%B %d, %Y")})'} for idx, answer in enumerate(matching_results)]
+---
+event: show_selected_interview
+id: show the selected interview
+question: |
+ Here is some information from the session you chose
+subquestion: |
+ Filename: ${ matching_results[interview_to_load]["filename"] } [BR]
+ Session ID: ${ matching_results[interview_to_load]["key"] }
+
+ bool:
def set_interview_metadata(
- filename: str, session_id: int, data: Dict, metadata_key_name="metadata"
+ filename: str, session_id: str, data: Dict, metadata_key_name="metadata"
) -> None:
"""Add searchable interview metadata for the specified filename and session ID.
Intended to be used to add an interview title, etc.
@@ -265,7 +267,7 @@ def set_interview_metadata(
Args:
filename (str): The filename of the interview to add metadata for
- session_id (int): The session ID of the interview to add metadata for
+ session_id (str): The session ID of the interview to add metadata for
data (Dict): The metadata to add
metadata_key_name (str, optional): The name of the metadata key. Defaults to "metadata".
"""
@@ -275,14 +277,14 @@ def set_interview_metadata(
def get_interview_metadata(
- filename: str, session_id: int, metadata_key_name: str = "metadata"
+ filename: str, session_id: str, metadata_key_name: str = "metadata"
) -> Dict[str, Any]:
"""Retrieve the unencrypted metadata associated with an interview.
We implement this with the docassemble jsonstorage table and a dedicated `tag` which defaults to `metadata`.
Args:
filename (str): The filename of the interview to retrieve metadata for
- session_id (int): The session ID of the interview to retrieve metadata for
+ session_id (str): The session ID of the interview to retrieve metadata for
metadata_key_name (str, optional): The name of the metadata key. Defaults to "metadata".
Returns:
@@ -343,20 +345,20 @@ def get_saved_interview_list(
# Up to ~ 1,000 sessions performs well and is higher than expected for an end-user
get_sessions_query = text(
"""
- SELECT userdict.indexno
- ,userdict.filename as filename
- ,num_keys
- ,userdictkeys.user_id as user_id
- ,userdict.modtime as modtime
- ,userdict.key as key
- ,jsonstorage.data->'auto_title' as auto_title
- ,jsonstorage.data->'title' as title
- ,jsonstorage.data->'description' as description
- ,jsonstorage.data->'steps' as steps
- ,jsonstorage.data->'progress' as progress
- ,jsonstorage.data->'original_interview_filename' as original_interview_filename
- ,jsonstorage.data->'answer_count' as answer_count
- ,jsonstorage.data as data
+ SELECT userdict.indexno
+ ,userdict.filename as filename
+ ,num_keys
+ ,userdictkeys.user_id as user_id
+ ,userdict.modtime as modtime
+ ,userdict.key as key
+ ,jsonstorage.data->'auto_title' as auto_title
+ ,jsonstorage.data->'title' as title
+ ,jsonstorage.data->'description' as description
+ ,jsonstorage.data->'steps' as steps
+ ,jsonstorage.data->'progress' as progress
+ ,jsonstorage.data->'original_interview_filename' as original_interview_filename
+ ,jsonstorage.data->'answer_count' as answer_count
+ ,jsonstorage.data as data
FROM userdict
NATURAL JOIN
(
@@ -371,9 +373,7 @@ def get_saved_interview_list(
LEFT JOIN jsonstorage
ON userdict.key = jsonstorage.key AND (jsonstorage.tags = :metadata)
WHERE (userdictkeys.user_id = :user_id or :user_id is null)
-
- AND
- (userdict.filename = :filename OR :filename is null)
+ AND (userdict.filename = :filename OR :filename is null)
AND (userdict.filename NOT IN :filenames_to_exclude)
AND (NOT :exclude_newly_started_sessions OR num_keys > 1)
ORDER BY modtime desc
@@ -436,6 +436,156 @@ def get_saved_interview_list(
return sessions
+def find_matching_sessions(
+ keyword: str,
+ metadata_column_names: Optional[Union[Set[str], List[str]]] = None,
+ filenames: Optional[Union[Set[str], List[str]]] = None,
+ user_id: Union[int, str, None] = None,
+ metadata_key_name: str = "metadata",
+ limit: int = 50,
+ offset: int = 0,
+ filename_to_exclude: str = "",
+ exclude_current_filename: bool = True,
+ exclude_filenames: Optional[List[str]] = None,
+ exclude_newly_started_sessions: bool = False,
+ global_search_allowed_roles: Optional[Union[Set[str], List[str]]] = None,
+) -> List[Dict[str, Any]]:
+ """Get a list of sessions where the metadata for the session matches the provided keyword search terms.
+ This function is designed to be used in a search interface where the user can search for sessions by keyword.
+ The keyword search is case-insensitive and will match any part of the metadata column values.
+
+ Args:
+ keyword (str): The keyword to search for in the metadata
+ metadata_column_names (List[str], optional): The names of the metadata columns to search. If not provided, defaults to ["title", "auto_title", "description"].
+ filenames (List[str], optional): The filename or filenames of the interviews to retrieve sessions for.
+ user_id (Union[int, str, None], optional): The user ID to retrieve sessions for. Defaults to current user. Specify "all" if you want and have the necessary privileges to search all sessions.
+ metadata_key_name (str, optional): The name of the metadata key. Defaults to "metadata".
+ limit (int, optional): The maximum number of results to return. Defaults to 50.
+ offset (int, optional): The offset to start returning results from. Defaults to 0.
+ filename_to_exclude (str, optional): The filename to exclude from the results. Defaults to "".
+ exclude_current_filename (bool, optional): Whether to exclude the current filename from the results. Defaults to True.
+ exclude_filenames (Optional[List[str]], optional): A list of filenames to exclude from the results. Defaults to None.
+ exclude_newly_started_sessions (bool, optional): Whether to exclude sessions that are still on "step 1". Defaults to False.
+ global_search_allowed_roles (Union[Set[str],List[str]], optional): A list or set of roles that are allowed to search all sessions. Defaults to {'admin','developer', 'advocate'}. 'admin' and 'developer' are always allowed to search all sessions.
+
+ Returns:
+ List[Dict[str, Any]]: A list of saved sessions for the specified filename that match the search keyword
+
+ Example:
+
+ ```python
+ matching_sessions=find_matching_sessions("smith", user_id="all", filenames=[f"{user_info().package}:intake.yml", "docassemble.MyPackage:intake.yml"])
+ ```
+ """
+ if not metadata_column_names:
+ metadata_column_names = {"title", "auto_title", "description"}
+ if not global_search_allowed_roles:
+ global_search_allowed_roles = {"admin", "developer", "advocate"}
+ global_search_allowed_roles = set(global_search_allowed_roles).union(
+ {"admin", "developer"}
+ )
+
+ # Construct the dynamic part of the SQL query for metadata column selection and keyword search
+ metadata_search_conditions = " OR ".join(
+ f"COALESCE(jsonstorage.data->>{repr(column)}, '') ILIKE '%' || :keyword || '%'"
+ for column in metadata_column_names
+ )
+
+ # we retrieve the default metadata columns even if we don't search them
+ metadata_column_names = set(metadata_column_names).union(
+ {"title", "auto_title", "description"}
+ )
+
+ if filenames:
+ # Create a parameterized string with placeholders for filenames
+ filenames_placeholder = ", ".join(
+ [":filename{}".format(i) for i in range(len(filenames))]
+ )
+ filename_condition = f"userdict.filename IN ({filenames_placeholder})"
+ else:
+ filename_condition = "TRUE" # If no filenames are provided, this condition does not filter anything.
+
+ get_sessions_query = text(
+ f"""
+ SELECT userdict.indexno,
+ userdict.filename as filename,
+ num_keys,
+ userdictkeys.user_id as user_id,
+ userdict.modtime as modtime,
+ userdict.key as key,
+ {', '.join(f"jsonstorage.data->>{repr(column)} as {column}" for column in metadata_column_names)},
+ jsonstorage.data as data
+ FROM userdict
+ NATURAL JOIN (
+ SELECT key, MAX(modtime) AS modtime, COUNT(key) AS num_keys
+ FROM userdict
+ GROUP BY key
+ ) mostrecent
+ LEFT JOIN userdictkeys ON userdictkeys.key = userdict.key
+ LEFT JOIN jsonstorage ON userdict.key = jsonstorage.key AND (jsonstorage.tags = :metadata)
+ WHERE (userdictkeys.user_id = :user_id OR :user_id is NULL)
+ AND {filename_condition}
+ AND (userdict.filename NOT IN :filenames_to_exclude)
+ AND (NOT :exclude_newly_started_sessions OR num_keys > 1)
+ AND ({metadata_search_conditions})
+ ORDER BY modtime DESC
+ LIMIT :limit OFFSET :offset;
+ """
+ )
+
+ if offset < 0:
+ offset = 0
+
+ if exclude_current_filename:
+ current_filename = user_info().filename
+ else:
+ current_filename = ""
+ if not filename_to_exclude:
+ filename_to_exclude = ""
+ filenames_to_exclude = []
+ if exclude_filenames:
+ filenames_to_exclude.extend(exclude_filenames)
+ filenames_to_exclude.extend([current_filename, filename_to_exclude])
+ if user_id is None:
+ if user_logged_in():
+ user_id = user_info().id
+ else:
+ log("Asked to get interview list for user that is not logged in")
+ return []
+
+ if user_id == "all":
+ if user_has_privilege(global_search_allowed_roles):
+ user_id = None
+ elif user_logged_in():
+ user_id = user_info().id
+ log(
+ f"User {user_info().email} does not have permission to list interview sessions belonging to other users"
+ )
+ else:
+ log("Asked to get interview list for user that is not logged in")
+ return []
+
+ with db.connect() as con:
+ rs = con.execute(
+ get_sessions_query,
+ {
+ "metadata": metadata_key_name,
+ "keyword": keyword,
+ "user_id": user_id,
+ "limit": limit,
+ "offset": offset,
+ "filenames_to_exclude": tuple(filenames_to_exclude),
+ "exclude_newly_started_sessions": exclude_newly_started_sessions,
+ },
+ )
+
+ sessions = []
+ for session in rs:
+ sessions.append(dict(session._mapping))
+
+ return sessions
+
+
def delete_interview_sessions(
user_id: Optional[int] = None,
filename_to_exclude: str = al_session_store_default_filename,
@@ -512,6 +662,7 @@ def interview_list_html(
offset: int = 0,
display_interview_title: bool = True,
show_view_button: bool = True,
+ answers: Optional[List[Dict[str, Any]]] = None,
) -> str:
"""Return a string containing an HTML-formatted table with the list of saved answers
associated with the specified filename.
@@ -541,21 +692,23 @@ def interview_list_html(
offset (int, optional): Offset for the session list. Defaults to 0.
display_interview_title (bool, optional): If True, displays the title of the interview. Defaults to True.
show_view_button (bool, optional): If True, shows the view button. Defaults to True.
+ answers (Optional[List[Dict[str, Any]]], optional): A list of answers to format and display. Defaults to showing all sessions for the current user.
Returns:
str: HTML-formatted table containing the list of saved answers.
"""
# TODO: Currently, using the `word()` function for translation, but templates
# might be more flexible
- answers = get_saved_interview_list(
- filename=filename,
- user_id=user_id,
- metadata_key_name=metadata_key_name,
- limit=limit,
- offset=offset,
- exclude_current_filename=False,
- exclude_newly_started_sessions=exclude_newly_started_sessions,
- )
+ if not answers:
+ answers = get_saved_interview_list(
+ filename=filename,
+ user_id=user_id,
+ metadata_key_name=metadata_key_name,
+ limit=limit,
+ offset=offset,
+ exclude_current_filename=False,
+ exclude_newly_started_sessions=exclude_newly_started_sessions,
+ )
if not answers:
return ""
@@ -911,7 +1064,7 @@ def session_list_html(
def rename_interview_answers(
filename: str,
- session_id: int,
+ session_id: str,
new_name: str,
metadata_key_name: str = "metadata",
) -> None:
@@ -920,7 +1073,7 @@ def rename_interview_answers(
Args:
filename (str): The filename of the interview to rename
- session_id (int): The session ID of the interview to rename
+ session_id (str): The session ID of the interview to rename
new_name (str): The new name to set for the interview
metadata_key_name (str, optional): The name of the metadata key. Defaults to "metadata".
@@ -1074,7 +1227,7 @@ def save_interview_answers(
def get_filtered_session_variables(
filename: Optional[str] = None,
- session_id: Optional[int] = None,
+ session_id: Optional[str] = None,
variables_to_filter: Optional[Union[Set[str], List[str]]] = None,
additional_variables_to_filter: Optional[Union[Set[str], List[str]]] = None,
) -> Dict[str, Any]:
@@ -1148,9 +1301,10 @@ def get_filtered_session_variables(
def get_filtered_session_variables_string(
filename: Optional[str] = None,
- session_id: Optional[int] = None,
+ session_id: Optional[str] = None,
variables_to_filter: Union[Set[str], List[str], None] = None,
additional_variables_to_filter: Optional[Union[Set[str], List[str]]] = None,
+ indent: int = 4,
) -> str:
"""
Returns a JSON string that represents the filtered contents of a specified filename and session ID.
@@ -1161,6 +1315,7 @@ def get_filtered_session_variables_string(
session_id (Optional[int], optional): Session ID to retrieve variables from. Defaults to None.
variables_to_filter (Union[Set[str], List[str], None], optional): List or set of variables to exclude. Defaults to `al_sessions_variables_to_remove`.
additional_variables_to_filter (Union[Set[str], List[str], None], optional): List or set of additional variables to exclude. Defaults to None.
+ indent (int, optional): Number of spaces to indent the JSON string. Defaults to 4.
Returns:
str: A JSON-formatted string of filtered session variables.
@@ -1173,12 +1328,12 @@ def get_filtered_session_variables_string(
additional_variables_to_filter=additional_variables_to_filter,
)
)
- return json.dumps(simple_vars)
+ return json.dumps(simple_vars, indent=indent)
def load_interview_answers(
old_interview_filename: str,
- old_session_id: int,
+ old_session_id: str,
new_session: bool = False,
new_interview_filename: Optional[str] = None,
variables_to_filter: Optional[List[str]] = None,
@@ -1191,7 +1346,7 @@ def load_interview_answers(
Args:
old_interview_filename (str): Filename of the old interview.
- old_session_id (int): Session ID of the old interview.
+ old_session_id (str): Session ID of the old interview.
new_session (bool, optional): Determines whether to create a new session. Defaults to False.
new_interview_filename (Optional[str], optional): Filename for the new session. Defaults to None.
variables_to_filter (Optional[List[str]], optional): List of variables to exclude. Defaults to None.
@@ -1262,7 +1417,7 @@ def load_interview_json(
def export_interview_variables(
filename: Optional[str] = None,
- session_id: Optional[int] = None,
+ session_id: Optional[str] = None,
variables_to_filter: Union[Set, List[str], None] = None,
output: DAFile = None,
additional_variables_to_filter: Union[Set, List[str], None] = None,