Skip to content
This repository has been archived by the owner on Dec 1, 2024. It is now read-only.

Commit

Permalink
feat: Added methods to filter out redundant messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dikayx committed Aug 19, 2024
1 parent 7208e88 commit b0fabf6
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 30 deletions.
81 changes: 52 additions & 29 deletions mapy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,67 +423,68 @@ def extract_message_data(mail_data: str) -> tuple:
attachments.append(attachment_info)

elif content_type in ['text/plain', 'text/html']:
message_info = process_message_part(part, email_date)
message_info = process_message_part(part, email_date, content_type)
if message_info:
messages.append(message_info)

else:
message_info = process_message_part(msg, email_date)
content_type = msg.get_content_type()
message_info = process_message_part(msg, email_date, content_type)
if message_info:
messages.append(message_info)

return messages, attachments


def process_attachment(part: Message) -> Optional[dict]:
"""
Process an email part as an attachment and return attachment information.
:param part: The email part representing the attachment
:return: A dictionary containing the attachment's filename, base64 encoded data, and length
"""
filename = part.get_filename()
if not filename:
return None

attachment_data = part.get_payload(decode=True)
messages = filter_duplicate_messages(messages)

encoded_data = base64.b64encode(attachment_data).decode('utf-8')
return {
'filename': filename,
'data': encoded_data,
'length': len(attachment_data)
}
return messages, attachments


def process_message_part(part: Message, email_date: str, content_type: str) -> Optional[dict]:
    """
    Process an email part and return the message content.
    :param part: The email part to process
    :param email_date: The date of the email
    :param content_type: The content type of the email part (e.g., 'text/plain', 'text/html')
    :return: A dictionary containing the date, message content, and content type,
        or None when the part has no decodable payload
    """
    payload = part.get_payload(decode=True)
    if payload is None:
        # get_payload(decode=True) returns None for multipart containers. There is
        # no text to report, so signal "nothing here" rather than storing an
        # AttributeError message as if it were the message content.
        return None

    # Parts that do not declare a charset are decoded as UTF-8.
    charset = part.get_content_charset() or 'utf-8'

    try:
        decoded_payload = payload.decode(charset, errors='replace')
        # Only HTML parts are run through the HTML text extractor; plain text is kept as-is.
        clean_text = extract_text_from_html(decoded_payload) if content_type == 'text/html' else decoded_payload

        return {
            'date': email_date,
            'content': clean_text.strip(),
            'content_type': content_type
        }
    except Exception as e:
        # Best effort: report the failure (e.g. an unknown charset) as the content
        # instead of aborting the processing of the whole email.
        return {
            'date': email_date,
            'content': f"Error decoding message: {e}",
            'content_type': content_type
        }


def filter_duplicate_messages(messages: list) -> list:
    """
    Filter out duplicate messages, keeping only the raw text/plain content.
    Messages are grouped by their 'date' value and one message is kept per date:
    the first text/plain message if there is one, otherwise the first message seen.
    The result is ordered by the first appearance of each date.
    :param messages: A list of message dictionaries with 'date' and 'content_type' keys
    :return: A filtered list of message dictionaries
    """
    filtered_messages = {}
    for message in messages:
        date = message['date']
        if date not in filtered_messages:
            filtered_messages[date] = message
            continue

        # Only upgrade a stored non-plain message to plain text. A stored
        # text/plain message is never replaced, so a later plain-text part
        # cannot silently discard the one already kept.
        kept_is_plain = filtered_messages[date]['content_type'] == 'text/plain'
        if not kept_is_plain and message['content_type'] == 'text/plain':
            filtered_messages[date] = message

    return list(filtered_messages.values())


def extract_text_from_html(html_content: str) -> str:
"""
Extract and clean text from HTML content.
Expand All @@ -494,3 +495,25 @@ def extract_text_from_html(html_content: str) -> str:
"""
soup = BeautifulSoup(html_content, 'html.parser')
return soup.get_text()


def process_attachment(part: Message) -> Optional[dict]:
    """
    Process an email part as an attachment and return attachment information.
    :param part: The email part representing the attachment
    :return: A dictionary containing the attachment's filename, base64 encoded data, and length,
        or None when the part has no filename or no decodable payload
    """
    filename = part.get_filename()
    if not filename:
        return None

    attachment_data = part.get_payload(decode=True)
    if attachment_data is None:
        # get_payload(decode=True) returns None for multipart containers;
        # b64encode(None) would raise TypeError, so treat it as "no attachment".
        return None

    encoded_data = base64.b64encode(attachment_data).decode('utf-8')
    return {
        'filename': filename,
        'data': encoded_data,
        # Length of the decoded payload in bytes, not of the base64 text.
        'length': len(attachment_data)
    }
95 changes: 94 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,104 @@ def test_process_attachment():

def test_process_message_part():
    """
    process_message_part returns the date, the message text and the content type
    for the module-level message_part fixture.
    """
    email_date = "Fri, 23 Jul 2024 10:21:35 -0700"
    content_type = 'text/plain'
    message_info = process_message_part(message_part, email_date, content_type)
    assert message_info['date'] == email_date
    assert message_info['content'] == 'This is a test message.'
    # The content type passed in is echoed back in the result.
    assert message_info['content_type'] == content_type


def test_extract_text_from_html():
    """Check the exact text, surrounding newlines included, extracted from html_content."""
    expected_text = '\n\nTest Email\n\nThis is a test email.\n\n\n'
    assert extract_text_from_html(html_content) == expected_text


def test_filter_duplicate_messages():
    """
    A text/plain and a text/html message sharing one date collapse to the text/plain
    one, while messages with a unique date are kept whatever their content type.
    """
    plain_duplicate = {
        'date': "2024-08-17T10:21:35Z",
        'content': 'This is a plain text message.',
        'content_type': 'text/plain'
    }
    html_duplicate = {
        'date': "2024-08-17T10:21:35Z",
        'content': '<p>This is a formatted HTML message.</p>',
        'content_type': 'text/html'
    }
    plain_unique = {
        'date': "2024-08-17T11:00:00Z",
        'content': 'Another plain text message.',
        'content_type': 'text/plain'
    }
    html_unique = {
        'date': "2024-08-17T12:00:00Z",
        'content': '<p>Another HTML message.</p>',
        'content_type': 'text/html'
    }

    kept = filter_duplicate_messages(
        [plain_duplicate, html_duplicate, plain_unique, html_unique]
    )

    assert len(kept) == 3

    # Expected survivors in input order: the plain text version for the shared
    # date, then the unique plain text message, then the unique HTML message.
    expected = [
        ('This is a plain text message.', 'text/plain'),
        ('Another plain text message.', 'text/plain'),
        ('<p>Another HTML message.</p>', 'text/html'),
    ]
    for message, (content, content_type) in zip(kept, expected):
        assert message['content'] == content
        assert message['content_type'] == content_type


def test_filter_duplicate_messages_no_duplicates():
    """Messages with distinct dates all survive filtering, in their original order."""
    first = {
        'date': "2024-08-17T10:21:35Z",
        'content': 'This is a plain text message.',
        'content_type': 'text/plain'
    }
    second = {
        'date': "2024-08-17T11:00:00Z",
        'content': 'Another plain text message.',
        'content_type': 'text/plain'
    }

    kept = filter_duplicate_messages([first, second])

    assert len(kept) == 2

    # Both messages are kept, first then second.
    expected_contents = ['This is a plain text message.', 'Another plain text message.']
    for message, content in zip(kept, expected_contents):
        assert message['content'] == content
        assert message['content_type'] == 'text/plain'


def test_filter_duplicate_messages_text_preferred():
    """
    When an HTML message is followed by a plain text message with the same date,
    only the plain text one is kept.
    """
    shared_date = "2024-08-17T10:21:35Z"
    html_message = {
        'date': shared_date,
        'content': '<p>This is a cleaned HTML message.</p>',
        'content_type': 'text/html'
    }
    plain_message = {
        'date': shared_date,
        'content': 'This is a plain text message.',
        'content_type': 'text/plain'
    }

    kept = filter_duplicate_messages([html_message, plain_message])

    assert len(kept) == 1

    # When handling duplicates, the plain text message should be preferred
    # as it gets formatted better than the HTML message
    survivor = kept[0]
    assert survivor['content'] == 'This is a plain text message.'
    assert survivor['content_type'] == 'text/plain'

0 comments on commit b0fabf6

Please sign in to comment.