-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathread_email.py
255 lines (229 loc) · 9.3 KB
/
read_email.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import base64
import binascii
import email
import imaplib
import socket
import warnings
from collections.abc import Generator
from datetime import datetime, timedelta, timezone
from email.header import decode_header, make_header
from email.message import Message
from typing import Iterable, Union
import pytz
from typing_extensions import Unpack
from .models.config import IngressConfig
from .models.options import Category, Condition
from .models.responder import Email, Response
class ReadEmail:
    """Connects to Gmail's IMAP server and yields the emails matching the given conditions/filters.

    >>> ReadEmail

    """

    # Timezone of the machine running this code, captured once when the class is created.
    LOCAL_TIMEZONE = datetime.now(timezone.utc).astimezone().tzinfo

    def __init__(self, **kwargs: "Unpack[IngressConfig]"):
        """Loads all the necessary args and opens an SSL connection to the IMAP host.

        Keyword Args:
            gmail_user: Gmail username used to log in to the IMAP server.
            gmail_pass: Gmail password used to log in to the IMAP server.
            timeout: Connection timeout for the IMAP connection.
            gmail_host: Hostname of gmail's IMAP server.
            folder: Folder where the emails have to be read from.

        References:
            https://imapclient.readthedocs.io/en/2.1.0/_modules/imapclient/imapclient.html#IMAPClient.xlist_folders

        See Also:
            Login happens lazily (see ``authenticate``); a failed connection is recorded in ``self.error``
            instead of being raised.
        """
        self.error, self.mail = None, None
        self._authenticated = False
        self.env = IngressConfig(**kwargs)
        self.create_ssl_connection()

    def create_ssl_connection(self) -> None:
        """Creates an SSL connection to gmail's IMAP server on port 993.

        On a socket level failure ``self.mail`` stays ``None`` and the reason is stored in ``self.error``.
        """
        try:
            self.mail = imaplib.IMAP4_SSL(
                host=self.env.gmail_host, port=993, timeout=self.env.timeout
            )
        except socket.error as error:
            self.error = str(error)

    @property
    def authenticate(self) -> Response:
        """Logs in to the mailbox and selects the configured folder.

        Returns:
            Response:
            A custom response object with properties: ok, status and body to the user.

        See Also:
            Uses broad ``Exception`` clause to catch login errors, since the same is raised by ``imaplib``
        """
        if self.mail is None:
            return Response(
                dictionary={
                    "ok": False,
                    "status": 408,
                    "body": self.error
                    or "failed to create a connection with gmail's SMTP server",
                }
            )
        try:
            self.mail.login(user=self.env.gmail_user, password=self.env.gmail_pass)
            self.mail.list()  # list all the folders within your mailbox (like inbox, sent, drafts, etc)
            return_code, _ = self.mail.select(self.env.folder)
        except Exception as error:
            self.error = str(error)
            return Response(
                dictionary={"ok": False, "status": 401, "body": "authentication failed"}
            )
        # ``select`` reports a missing/inaccessible folder through its status instead of raising,
        # so it has to be checked explicitly; otherwise the failure only surfaces later in ``search``.
        if return_code != "OK":
            self.error = f"unable to select the folder {self.env.folder!r}"
            return Response(
                dictionary={"ok": False, "status": 404, "body": self.error}
            )
        self._authenticated = True
        return Response(
            dictionary={"ok": True, "status": 200, "body": "authentication success"}
        )

    def instantiate(
        self,
        filters: Union[
            Iterable[Category.__str__], Iterable[Condition.__str__]
        ] = "UNSEEN",
    ) -> Response:
        """Searches the selected folder for emails matching the filters.

        Args:
            filters: Category or Condition, either a single search string or an iterable of them.

        References:
            https://imapclient.readthedocs.io/en/2.1.0/api.html#imapclient.IMAPClient.search

        Returns:
            Response:
            A Response class containing number of email messages, return code and the encoded messages itself.
        """
        if not self._authenticated:
            status = self.authenticate
            if not status.ok:
                return status
        # Any non-string iterable (list, tuple, set, generator...) is flattened into one search string
        if not isinstance(filters, (str, bytes)):
            filters = " ".join(filters)
        return_code, messages = self.mail.search(None, filters)
        if return_code != "OK":
            return Response(
                dictionary={
                    "ok": False,
                    "status": 404,
                    "body": "Unable to read emails.",
                }
            )
        # The first element holds the space separated message numbers (empty when nothing matched)
        num = len(messages[0].split()) if messages and messages[0] else 0
        if not num:
            return Response(
                dictionary={
                    "ok": False,
                    "status": 204,
                    "body": f"No emails found in {self.env.gmail_user} [{self.env.folder}] "
                    f"for the filter(s) {filters.lower()!r}",
                    "count": num,
                }
            )
        return Response(
            dictionary={"ok": True, "status": 200, "body": messages, "count": num}
        )

    def _received_time(self, message: Message) -> datetime:
        """Returns the time the message was received, converted to the local timezone.

        Args:
            message: Parsed email message.

        Returns:
            datetime:
            Timezone aware datetime in ``LOCAL_TIMEZONE``. Falls back to the current local time
            when neither the ``Received`` nor the ``Date`` header can be parsed.
        """
        from email.utils import parsedate_to_datetime

        if received := message.get("Received"):
            # The timestamp of a Received header follows the last semicolon
            raw_date = str(received).split(";")[-1].strip()
        else:
            raw_date = message.get("Date")
        try:
            # Honors the UTC offset carried by the header instead of assuming a fixed timezone
            parsed = parsedate_to_datetime(raw_date)
        except (TypeError, ValueError):
            # Header is missing or malformed (the exception type differs across python versions)
            return datetime.now(tz=self.LOCAL_TIMEZONE)
        if parsed.tzinfo is None:
            # A "-0000" offset yields a naive datetime, whose value is expressed in UTC
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(tz=self.LOCAL_TIMEZONE)

    @staticmethod
    def _extract_body(message: Message) -> str:
        """Extracts the body of the message as text.

        Args:
            message: Parsed email message.

        Returns:
            str:
            Decoded text for ``text/plain`` messages, otherwise the concatenation of all the payload parts.
        """
        if message.get_content_type() == "text/plain":
            raw = message.get_payload(decode=True) or b""
            charset = message.get_content_charset() or "utf-8"
            try:
                return raw.decode(charset, errors="replace")
            except LookupError:
                # Declared charset is not known to python, so default to UTF-8
                return raw.decode("utf-8", errors="replace")
        parts = message.get_payload()
        if isinstance(parts, str):
            # Non-multipart content (html for example) is returned as is
            return parts
        if not isinstance(parts, list):
            parts = [parts]
        chunks = []
        for part in parts:
            if isinstance(part, Message):
                chunks.append(part.as_string())
            elif isinstance(part, str):
                chunks.append(part)
            elif isinstance(part, bytes):
                try:
                    raw = base64.b64decode(part)
                except binascii.Error:
                    raw = part
                try:
                    # encoding is unknown at this point so default to UTF-8
                    chunks.append(raw.decode())
                except UnicodeDecodeError:
                    warnings.warn("Unknown encoding type for payload")
            else:
                warnings.warn(f"Unsupported payload type: {type(part)}")
        return "".join(chunks)

    def get_info(self, response_part: tuple, dt_flag: bool) -> Email:
        """Extracts sender, subject, body and time received from response part.

        Args:
            response_part: Encoded tuple of the response part in the email.
            dt_flag: Boolean flag whether to convert datetime as human-readable format.

        Returns:
            Email:
            Email object with information.
        """
        original_email = email.message_from_bytes(response_part[1])
        local_time = self._received_time(original_email)
        if dt_flag:
            received_date = local_time.date()
            current_date_ = datetime.now(tz=self.LOCAL_TIMEZONE).date()
            # Replaces current date with today or yesterday to make it simpler
            if received_date == current_date_:
                receive = local_time.strftime("Today, at %I:%M %p")
            elif received_date == current_date_ - timedelta(days=1):
                receive = local_time.strftime("Yesterday, at %I:%M %p")
            else:
                receive = local_time.strftime("on %A, %B %d, at %I:%M %p")
        else:
            receive = local_time
        sub = (
            make_header(decode_header(original_email["Subject"]))
            if original_email["Subject"]
            else None
        )
        # "Name <address>" splits into two parts, a bare address stays as a single part
        from_ = str(original_email["From"] or "").split(" <")
        if len(from_) == 1:
            sender, sender_email = None, from_[0].lstrip("<").rstrip(">")
        else:
            sender, sender_email = from_[0], from_[1].rstrip(">")
        return Email(
            dictionary=dict(
                sender=sender,
                sender_email=sender_email,
                subject=sub,
                date_time=receive,
                body=self._extract_body(original_email),
            )
        )

    def _disconnect(self) -> None:
        """Closes the selected folder and logs out, without raising if the connection is already broken."""
        if not self.mail:
            return
        for teardown in (self.mail.close, self.mail.logout):
            try:
                teardown()
            except (imaplib.IMAP4.error, OSError) as error:
                self.error = str(error)
                warnings.warn(f"Failed to cleanly close the IMAP connection: {error}")

    def read_mail(
        self, messages: Union[list, str], humanize_datetime: bool = False
    ) -> Generator[Email]:
        """Yield emails matching the filters' criteria.

        Args:
            messages: Takes the encoded message list as an argument. This is the body of the ``instantiate`` method.
            humanize_datetime: Converts received time to human-readable format.

        Yields:
            Email:
            Email object with sender, subject, body and the time received.
        """
        try:
            for nm in messages[0].split():
                _, data = self.mail.fetch(nm, "(RFC822)")
                for each_response in data:
                    if isinstance(each_response, tuple):
                        yield self.get_info(
                            response_part=each_response, dt_flag=humanize_datetime
                        )
        finally:
            # Runs when the generator is exhausted, abandoned midway or interrupted by an error,
            # so the connection is never left open.
            self._disconnect()