Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor extract keywords #5

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 88
272 changes: 146 additions & 126 deletions src/parenttext_pipeline/extract_keywords.py
Original file line number Diff line number Diff line change
@@ -1,138 +1,158 @@
import json
from itertools import islice

import openpyxl


def process_keywords_to_file(sources, output):
    """Extract the keywords from ``sources`` and write them to a JSON file.

    The result of ``process_keywords(sources)`` is serialised exactly once,
    with a four-space indent, to ``safeguarding_words.json`` inside ``output``.

    Args:
        sources: Passed unchanged to ``process_keywords``.
        output: Path of the directory (as a string) that receives the file.
    """
    # Build the content before opening the file so that a failure during
    # extraction does not leave a truncated output file behind.
    content = process_keywords(sources)
    with open(output + "/safeguarding_words.json", "w") as outfile:
        json.dump(content, outfile, indent=4)


def process_keywords(sources):
    """Process every source and merge the per-language results.

    Each source is handed to ``process_source``; its result is stored under
    the source's ``"key"`` entry, or under ``"unknown"`` when the source has
    no such entry. The collected mapping is then passed to
    ``merge_dictionaries`` and that result is returned.
    """
    per_language = {}
    for source in sources:
        label = source.get("key", "unknown")
        per_language[label] = process_source(source)
    return merge_dictionaries(per_language)


def process_source(source):
    """Read one workbook and return its tables keyed by sheet title.

    ``source["path"]`` is the workbook to load and ``source["key"]`` is the
    language label forwarded to ``process_sheet``. Worksheets for which
    ``TableNotFoundError`` is raised are left out of the result.
    """
    path, language = source["path"], source["key"]
    workbook = openpyxl.load_workbook(path)

    tables = {}
    for sheet in workbook.worksheets:
        try:
            table = process_sheet(sheet, language)
        except TableNotFoundError:
            # No table on this sheet: skip it.
            continue
        tables[sheet.title] = table
    return tables


def process_sheet(sheet, language):
    """Return the list of non-empty wordsets found in ``sheet``.

    Rows are consumed two at a time, starting one row after the row reported
    by ``find_table_row_start``: the first row of each pair is treated as the
    source row and the second as its translation. Pairs for which
    ``create_wordset`` returns a falsy value are dropped.
    """
    first_data_row = find_table_row_start(sheet) + 1
    misspellings_col = find_misspellings_col_start(sheet)

    wordsets = []
    for source, translation in batch(sheet.iter_rows(min_row=first_data_row), 2):
        wordset = create_wordset(language, misspellings_col, source, translation)
        if wordset:
            wordsets.append(wordset)
    return wordsets


def create_wordset(language, misspellings_start, source, translation):
    """Build the wordset dictionary for one source/translation row pair.

    Columns ``1`` up to ``misspellings_start`` hold keywords and the columns
    from ``misspellings_start`` onwards hold misspellings (column ``0`` is
    never read here). Returns ``None`` when all four lists are empty.
    """
    source_keywords = read_cols(source, 1, misspellings_start)
    source_misspellings = read_cols(source, misspellings_start)
    translated_keywords = read_cols(translation, 1, misspellings_start)
    translated_misspellings = read_cols(translation, misspellings_start)

    has_content = (
        source_keywords
        or source_misspellings
        or translated_keywords
        or translated_misspellings
    )
    if not has_content:
        return None

    english = {"keywords": source_keywords, "mispellings": source_misspellings}
    translated = {
        "keywords": translated_keywords,
        "mispellings": translated_misspellings,
    }
    return {"English": english, "Translation": {language: translated}}


def find_table_row_start(sheet):
    """Locate the instruction row that precedes the keyword table.

    Only cells A1 to A10 are inspected. The table is expected to begin in the
    first column, on the row right after the cell holding the instruction
    text, and only one such table per sheet is assumed.

    Returns the 1-based number of the row that holds the instruction text.
    Raises ``TableNotFoundError`` when the text is not found in A1:A10.
    """
    marker = (
        "Please insert translation of each word under each corresponding cell. If the "
        "particular word does not translate into the chosen language, please leave it "
        "blank"
    )
    first_column = [row[0] for row in sheet["A1":"A10"]]
    position = index_of(first_column, marker)

    if position != -1:
        # Convert the 0-based list position into a 1-based row number.
        return position + 1
    raise TableNotFoundError()


def find_misspellings_col_start(sheet):
    """Return the 0-based column index of the misspellings header.

    Rows are scanned from the top of the sheet; the position of the first
    cell whose value equals the header text is returned.

    Raises:
        TableNotFoundError: If no row contains the header. Returning ``None``
            here would make callers slice with ``None`` and silently read
            every column as both keywords and misspellings.
    """
    HEADER2 = "Range of possible misspellings and common slang used by the population"

    for row in sheet.iter_rows():
        index = index_of(row, HEADER2)
        if index > -1:
            return index

    raise TableNotFoundError(f"Header not found in sheet: {HEADER2!r}")


def index_of(seq, value):
    """Return the position of the first item whose ``.value`` equals ``value``.

    ``seq`` is any iterable of objects exposing a ``value`` attribute.
    Returns ``-1`` when no item matches.
    """
    values = [item.value for item in seq]
    return values.index(value) if value in values else -1


def read_cols(row, start, end=None):
    """Collect the stripped text of the non-empty cells in ``row[start:end]``.

    Cells whose ``value`` is ``None`` are skipped; every other value is
    converted with ``str`` and stripped of surrounding whitespace.
    """
    texts = []
    for cell in row[start:end]:
        if cell.value is None:
            continue
        texts.append(str(cell.value).strip())
    return texts

MATCHING_STRING = "Please insert translation of each word under each corresponding cell. If the particular word does not translate into the chosen language, please leave it blank"
HEADER1 = "High-risk key words"
HEADER2 = "Range of possible misspellings and common slang used by the population"

dictionaries = {}

for source in sources:

input_file = source["path"]
language = source["key"]

book = openpyxl.load_workbook(input_file)
all_tables = dict()
for sheet in book.worksheets:
rows = list(sheet.iter_rows())
table_found = False
for y, row in enumerate(rows):
# We assume that the table starts in the first column,
# in the left-most cell in the row before it we have a
# cell containing the matching string.
if len(row) >= 1 and row[0].value == MATCHING_STRING:
table_found = True
# I simply assume that the misspellings start in column 5.
# If that's not always the case, you can write some code
# looking for the cell in row y-1 which contains HEADER2.
misspelling_x = 5
assert rows[y-1][1].value == HEADER1
assert rows[y-1][misspelling_x].value == HEADER2
y_start = y+1

# We found the table, so stop searching.
# Note: We assume there's only one such table per sheet.
break
if not table_found:
continue

table_content = []
for y in range(y_start, len(rows)-1, 2):
# Go through the rows of the table in pairs
row1 = rows[y]
row2 = rows[y+1]
lang1 = "English"
lang2 = language

high_risk_entries_lang1, high_risk_entries_lang2 = read_entries_from_range(1, misspelling_x, row1, row2)
misspelling_entries_lang1, misspelling_entries_lang2 = read_entries_from_range(misspelling_x, len(row1), row1, row2)

joint_entry = dict()
if high_risk_entries_lang1 or misspelling_entries_lang1:
lang1_entry = {
"keywords": high_risk_entries_lang1,
"mispellings": misspelling_entries_lang1,
}
joint_entry[lang1] = lang1_entry
joint_entry["Translation"] = {}
joint_entry["Translation"][lang2] = {}
# "keywords": [],
# "mispellings": [],
# }

if high_risk_entries_lang2 or misspelling_entries_lang2:
lang2_entry = {
"keywords": high_risk_entries_lang2,
"mispellings": misspelling_entries_lang2,
}
joint_entry["Translation"][lang2] = lang2_entry

if joint_entry:
# only store the entry if at least one of the lists is non-empty
table_content.append(joint_entry)

all_tables[sheet.title] = table_content

# # Always change the output name when processing a new xlsx file to avoid overwriting.
# #with open("./keywords/keywords_json/safeguarding_template_drug_2.json", "w") as outfile:
# with open(output, "w") as outfile:
# json.dump(all_tables, outfile, indent=4)

dictionaries[language] = all_tables

return merge_dictionaries(dictionaries)


def read_entries_from_range(xmin, xmax, row1, row2):
    """Read the non-empty cells of two rows over the same column range.

    Args:
        xmin: First column index to read (inclusive).
        xmax: Column index at which to stop (exclusive).
        row1: Indexable row of cells exposing a ``value`` attribute.
        row2: Second row, read over the same columns as ``row1``.

    Returns:
        A pair ``(row1_entries, row2_entries)``; each is the list of stripped
        string values of that row's cells whose ``value`` is not ``None``.
    """

    def _entries(row):
        # Same filtering and conversion for both rows.
        return [
            str(row[x].value).strip()
            for x in range(xmin, xmax)
            if row[x].value is not None
        ]

    return _entries(row1), _entries(row2)

def merge_dictionaries(dictionaries):
    """Merge per-language tables into a single multi-language structure.

    ``dictionaries`` maps a language label to ``{sheet title: [entry, ...]}``.
    The tables of the first language are used as the base (and are updated in
    place). For every later language, rows are matched to the base by position
    within sheets of the same title; when the first English keyword of both
    rows agrees, that language's translation is added to the base row.

    On the first row of a sheet that does not match (different first English
    keyword, or the other language has fewer rows), a warning is printed and
    the rest of that sheet is skipped for that language.

    Returns the merged structure, or ``{}`` when ``dictionaries`` is empty.
    """
    languages = iter(dictionaries.items())
    try:
        _, merged = next(languages)
    except StopIteration:
        # Nothing to merge.
        return {}

    for lang, dic in languages:
        for sheet, rows in merged.items():
            if sheet not in dic:
                continue
            other_rows = dic[sheet]
            for count, value in enumerate(rows):
                # A missing row in the other language counts as a mismatch
                # rather than raising IndexError.
                matches = count < len(other_rows) and (
                    _first_english_keyword(value)
                    == _first_english_keyword(other_rows[count])
                )
                if not matches:
                    print(
                        f"There is a match problem in '{sheet}' sheet, please "
                        f"check all the spreadsheets have the same english rows in "
                        f"the same order"
                    )
                    break
                value["Translation"][lang] = other_rows[count]["Translation"][lang]

    return merged


def _first_english_keyword(entry):
    """Return the first English keyword of ``entry``, or ``None`` if none."""
    return next(iter(entry["English"]["keywords"]), None)


def batch(iterable, n):
    """Yield consecutive tuples of exactly ``n`` items from ``iterable``.

    A trailing group with fewer than ``n`` items is discarded, e.g.
    ``batch('ABCDEFG', 3)`` yields ``('A', 'B', 'C')`` and ``('D', 'E', 'F')``.
    Raises ``ValueError`` if ``n`` is smaller than one.
    """
    if n < 1:
        raise ValueError("n must be at least one")
    source = iter(iterable)
    while True:
        chunk = tuple(islice(source, n))
        if len(chunk) != n:
            # Input exhausted, or only an incomplete group is left.
            return
        yield chunk


class TableNotFoundError(Exception):
    """Signals that the expected table could not be located in a worksheet."""