-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
models.py
1262 lines (1026 loc) · 46.9 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Models for Student Identity Verification
This is where we put any models relating to establishing the real-life identity
of a student over a period of time. The models here include the abstract
`IDVerificationAttempt` base class, `ManualVerification`, `SSOVerification`,
the abstract `PhotoVerification`, and its concrete implementation
`SoftwareSecurePhotoVerification`. The hope is to keep as much of the
photo verification process as generic as possible.
"""
import base64
import codecs
import functools
import json
import logging
import os.path
import uuid
from datetime import timedelta
from email.utils import formatdate
import requests
from config_models.models import ConfigurationModel
from django.conf import settings
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from django.core.files.base import ContentFile
from django.db import models, transaction
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.timezone import now
from django.utils.translation import gettext_lazy
from model_utils import Choices
from model_utils.models import StatusModel, TimeStampedModel
from lms.djangoapps.verify_student.statuses import VerificationAttemptStatus
from opaque_keys.edx.django.models import CourseKeyField
from lms.djangoapps.verify_student.ssencrypt import (
decode_and_decrypt,
encrypt_and_encode,
generate_signed_message,
random_aes_key,
rsa_decrypt,
rsa_encrypt
)
from openedx.core.djangoapps.signals.signals import LEARNER_SSO_VERIFIED, PHOTO_VERIFICATION_APPROVED
from openedx.core.storage import get_storage
from .utils import auto_verify_for_testing_enabled, earliest_allowed_verification_date, submit_request_to_ss
log = logging.getLogger(__name__)
def generateUUID():  # pylint: disable=invalid-name
    """Return a newly generated random (version 4) UUID in its string form."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
class VerificationException(Exception):
    """Error raised when an operation is attempted on a verification in a disallowed status."""
def status_before_must_be(*valid_start_statuses):
    """
    Build a decorator that only lets a method run from certain statuses.

    The decorated callable's first argument must expose a `status` attribute.
    When that status is one of `valid_start_statuses` the call goes through
    unchanged; otherwise a `VerificationException` is raised and the wrapped
    callable is never invoked. Typical use inside a class definition:

        @status_before_must_be("submitted", "approved", "denied")
        def refund_user(self, user_id):
            # Do logic here...

    This keeps repetitive status-checking boilerplate out of Models that
    implement a workflow.
    """
    def decorator_func(func):
        """
        Wrap `func` with the status check and return the wrapper.
        """
        @functools.wraps(func)
        def with_status_check(obj, *args, **kwargs):
            # Happy path first: the status is acceptable, so just delegate.
            if obj.status in valid_start_statuses:
                return func(obj, *args, **kwargs)

            exception_msg = (
                "Error calling {} {}: status is '{}', must be one of: {}"
            ).format(func, obj, obj.status, valid_start_statuses)
            raise VerificationException(exception_msg)

        return with_status_check

    return decorator_func
class IDVerificationAttempt(StatusModel):
    """
    Abstract base model for one attempt by a Student to establish their identity.

    The concrete verification methods (including PhotoVerification and
    SSOVerification) inherit from this Model.

    .. pii: The User's name is stored in this and sub-models
    .. pii_types: name
    .. pii_retirement: retained
    """
    STATUS = Choices('created', 'ready', 'submitted', 'must_retry', 'approved', 'denied')

    user = models.ForeignKey(User, db_index=True, on_delete=models.CASCADE)

    # The user is free to change their profile name later, so a copy is kept
    # here to preserve what it was at the time of the request. The copy is only
    # made during the mark_ready() step; before that, display the name from
    # user.profile.name instead.
    name = models.CharField(blank=True, max_length=255)

    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    updated_at = models.DateTimeField(auto_now=True, db_index=True)

    def expiration_default():  # lint-amnesty, pylint: disable=no-method-argument
        """Return the current time plus the configured `DAYS_GOOD_FOR` days."""
        starting_point = now()
        return starting_point + timedelta(days=settings.VERIFY_STUDENT["DAYS_GOOD_FOR"])

    # Datetime that the verification will expire.
    expiration_date = models.DateTimeField(
        null=True,
        blank=True,
        db_index=True,
        default=expiration_default,
    )

    class Meta:
        app_label = "verify_student"
        abstract = True
        ordering = ['-created_at']

    @property
    def expiration_datetime(self):
        """
        Return when this attempt expires.

        Old DB entries may have `expiration_date` set to NULL; for those the
        value is derived from `created_at` plus the configured `DAYS_GOOD_FOR`.
        """
        if not self.expiration_date:
            validity = timedelta(days=settings.VERIFY_STUDENT["DAYS_GOOD_FOR"])
            return self.created_at + validity
        return self.expiration_date

    def should_display_status_to_user(self):
        """Tell whether this attempt's status should be shown to the user (subclasses must implement)."""
        raise NotImplementedError

    def active_at_datetime(self, deadline):
        """
        Check whether the verification was active at a particular datetime.

        The attempt counts as active when it was created on or before
        `deadline` and its expiration datetime is still in the future. Note
        that the expiration is compared against the current time, not against
        `deadline`.

        Arguments:
            deadline (datetime): The date by which the verification must have
                been created.

        Returns:
            bool
        """
        created_in_time = self.created_at <= deadline
        if not created_in_time:
            return created_in_time
        return self.expiration_datetime > now()
class ManualVerification(IDVerificationAttempt):
    """
    A verification granted to a user manually, bypassing the need for any
    other kind of verification.

    .. pii: The User's name is stored in the parent model
    .. pii_types: name
    .. pii_retirement: retained
    """

    reason = models.CharField(
        max_length=255,
        blank=True,
        help_text=(
            'Specifies the reason for manual verification of the user.'
        ),
    )

    class Meta:
        app_label = 'verify_student'

    def __str__(self):
        """Describe the attempt by the stored name and current status."""
        template = 'ManualIDVerification for {name}, status: {status}'
        return template.format(name=self.name, status=self.status)

    def should_display_status_to_user(self):
        """
        Manual verification statuses are never displayed to the user.
        """
        return False
class SSOVerification(IDVerificationAttempt):
    """
    A Student's attempt to establish their identity by signing in with SSO.

    ID verification through SSO bypasses the need for photo verification.

    .. no_pii:
    """

    # Dotted paths of the provider configuration models a verification can come from.
    OAUTH2 = 'common.djangoapps.third_party_auth.models.OAuth2ProviderConfig'
    SAML = 'common.djangoapps.third_party_auth.models.SAMLProviderConfig'
    LTI = 'common.djangoapps.third_party_auth.models.LTIProviderConfig'

    IDENTITY_PROVIDER_TYPE_CHOICES = (
        (OAUTH2, 'OAuth2 Provider'),
        (SAML, 'SAML Provider'),
        (LTI, 'LTI Provider'),
    )

    identity_provider_type = models.CharField(
        max_length=100,
        blank=False,
        choices=IDENTITY_PROVIDER_TYPE_CHOICES,
        default=SAML,
        help_text=(
            'Specifies which type of Identity Provider this verification originated from.'
        ),
    )

    identity_provider_slug = models.SlugField(
        max_length=30,
        db_index=True,
        default='default',
        help_text=(
            'The slug uniquely identifying the Identity Provider this verification originated from.'
        ),
    )

    class Meta:
        app_label = "verify_student"

    def __str__(self):
        """Describe the attempt by the stored name and current status."""
        template = 'SSOIDVerification for {name}, status: {status}'
        return template.format(name=self.name, status=self.status)

    def should_display_status_to_user(self):
        """SSO verification statuses are never displayed to the user."""
        return False

    def send_approval_signal(self, approved_by='None'):
        """
        Log the approval and fire the `LEARNER_SSO_VERIFIED` signal for this user.

        Arguments:
            approved_by: Identifier of the approver; only used in the log line.
        """
        approval_notice = "Verification for user '{user_id}' approved by '{reviewer}' SSO.".format(
            user_id=self.user, reviewer=approved_by
        )
        log.info(approval_notice)

        # Emit signal to find and generate eligible certificates.
        # Note that the sender passed here is PhotoVerification.
        LEARNER_SSO_VERIFIED.send_robust(sender=PhotoVerification, user=self.user)

        message = 'LEARNER_SSO_VERIFIED signal fired for {user} from SSOVerification'
        log.info(message.format(user=self.user.username))
class PhotoVerification(IDVerificationAttempt):
    """
    A Student's attempt to establish their identity by uploading a photo of
    themselves together with a picture ID.

    Different fields of an attempt get filled out at different steps of the
    approval process. Being a Django Model gives us the querying facilities,
    but **you should only edit a `PhotoVerification` object through the
    methods provided**. Initialize them with a user:

        attempt = PhotoVerification(user=user)

    An attempt is tracked through these states:

    `created`
        Initial state, and the state we stay in after uploading the images.
    `ready`
        The user has uploaded their images and confirmed that they can read
        them. This is a separate state because we may not actually submit the
        attempt for review until payment is made.
    `submitted`
        Submitted for review, either by a staff member or by an external
        service. The user cannot make changes once in this state.
    `must_retry`
        We submitted this, but there was an error on submission (i.e. we did
        not get a 200 when we POSTed to Software Secure).
    `approved`
        An admin or an external service has confirmed that the user's photo
        and photo ID match up, and that the photo ID's name matches the user's.
    `denied`
        The request has been denied; see `error_msg` for details on why. An
        admin might later override this and change it to `approved`, but the
        student cannot re-open this attempt -- they have to create another
        attempt and submit it instead.

    Because this Model inherits from IDVerificationAttempt, which inherits
    from StatusModel, we can also do things like:

        attempt.status == PhotoVerification.STATUS.created
        attempt.status == "created"
        pending_requests = PhotoVerification.submitted.all()

    .. pii: The User's name is stored in the parent model, this one stores links to face and photo ID images
    .. pii_types: name, image
    .. pii_retirement: retained
    """
    ######################## Fields Set During Creation ########################
    # See class docstring for description of status states

    # Locations of the uploaded image files (e.g. S3 URLs)
    face_image_url = models.URLField(blank=True, max_length=255)
    photo_id_image_url = models.URLField(blank=True, max_length=255)

    # Randomly generated UUID that lets external services post back the result
    # of checking a user's photo submission without us exposing actual user
    # IDs or anything else too easily guessable.
    receipt_id = models.CharField(
        db_index=True,
        default=generateUUID,
        max_length=255,
    )

    # Whether the user wants the verification status displayed on their
    # dashboard. Only relevant for letting students "dismiss" a failed
    # midcourse reverification message.
    # TODO: This field is deprecated.
    display = models.BooleanField(db_index=True, default=True)

    ######################## Fields Set When Submitting ########################
    submitted_at = models.DateTimeField(null=True, db_index=True)

    #################### Fields Set During Approval/Denial #####################
    # The internal staff member who did the review, if there was one.
    reviewing_user = models.ForeignKey(
        User,
        db_index=True,
        default=None,
        null=True,
        related_name="photo_verifications_reviewed",
        on_delete=models.CASCADE,
    )

    # Name of the service used to evaluate this attempt (e.g. Software Secure).
    reviewing_service = models.CharField(blank=True, max_length=255)

    # When status is "denied", this should contain text explaining why.
    error_msg = models.TextField(blank=True)

    # Optional. External services can add arbitrary codes over time; we do not
    # define an exhaustive list, we only capture the value so that common
    # problems can be queried for later.
    error_code = models.CharField(blank=True, max_length=50)

    class Meta:
        app_label = "verify_student"
        abstract = True
        ordering = ['-created_at']

    def parsed_error_msg(self):
        """
        Return the error message in a human readable form.

        Subclasses may need to parse what they received; the default is to
        hand back the stored `error_msg` untouched.
        """
        return self.error_msg

    @status_before_must_be("created")
    def upload_face_image(self, img_data):
        """Store the user's face image; must be implemented by subclasses."""
        raise NotImplementedError

    @status_before_must_be("created")
    def upload_photo_id_image(self, img_data):
        """Store the user's photo ID image; must be implemented by subclasses."""
        raise NotImplementedError

    @status_before_must_be("created")
    def mark_ready(self):
        """
        Record that the user has confirmed the data in this attempt is correct.

        To succeed, the user must have uploaded the necessary images
        (`face_image_url`, `photo_id_image_url`). If no name has been stored on
        the attempt yet, it is copied from the user's profile at this point.
        Until then the name is read straight from the profile, because users
        are free to change it -- people often sign up with a less formal name
        and later want something different on a formal document.

        Valid attempt statuses when calling this method:
            `created`

        Status after method completes: `ready`

        Other fields that will be set by this method:
            `name`

        State Transitions:

        `created` → `ready`
            The user confirms that the pictures they uploaded are good. Nothing
            is actually submitted anywhere yet.
        """
        # A name may already have been set via the verified_name flow; only
        # fall back to the profile name when it has not.
        self.name = self.name or self.user.profile.name  # pylint: disable=no-member
        self.status = self.STATUS.ready
        self.save()

    @status_before_must_be("must_retry", "submitted", "approved", "denied")
    def approve(self, user_id=None, service=""):
        """
        Approve this attempt.

        Arguments:
            user_id: Stored as `reviewing_user` and named in the log line.
            service: Stored as `reviewing_service`.

        Valid attempt statuses when calling this method:
            `must_retry`, `submitted`, `approved`, `denied`

        Status after method completes: `approved`

        Other fields that will be set by this method:
            `reviewing_user`, `reviewing_service`, `error_msg`, `error_code`,
            `expiration_date`

        State Transitions:

        `submitted` → `approved`
            The usual flow, whether initiated by a staff user or an external
            validation service.
        `approved` → `approved`
            No-op. First one to approve it wins.
        `denied` → `approved`
            Might happen if a staff member wants to override a decision made
            by an external service or another staff member (say, in response
            to a support request). The reviewer fields are replaced with
            whoever is doing the approving and the error fields are reset, so
            the only record of the earlier denial would be in our logs. This
            should be a relatively rare occurrence.
        """
        already_approved = self.status == self.STATUS.approved
        if already_approved:
            # Someone approved an outdated copy of this record; the first one wins.
            return

        approval_notice = "Verification for user '{user_id}' approved by '{reviewer}'.".format(
            user_id=self.user, reviewer=user_id
        )
        log.info(approval_notice)

        # Clear any error details left over from an earlier denial.
        self.error_msg = ""
        self.error_code = ""
        self.reviewing_user = user_id
        self.reviewing_service = service
        self.status = self.STATUS.approved
        validity = timedelta(days=settings.VERIFY_STUDENT["DAYS_GOOD_FOR"])
        self.expiration_date = now() + validity
        self.save()

        # Emit signal to find and generate eligible certificates
        PHOTO_VERIFICATION_APPROVED.send_robust(sender=PhotoVerification, user=self.user)

        message = 'PHOTO_VERIFICATION_APPROVED signal fired for {user} from PhotoVerification'
        log.info(message.format(user=self.user.username))

    @status_before_must_be("ready", "must_retry")
    def mark_submit(self):
        """
        Submit this attempt.

        Valid attempt statuses when calling this method:
            `ready`, `must_retry`

        Status after method completes: `submitted`

        Other fields that will be set by this method:
            `submitted_at`

        State Transitions:

        → → → `must_retry`
        ↑        ↑ ↓
        `ready` → `submitted`
        """
        self.status = self.STATUS.submitted
        self.submitted_at = now()
        self.save()

    @status_before_must_be("ready", "must_retry", "submitted")
    def mark_must_retry(self, error=""):
        """
        Set the attempt status to `must_retry`.

        Marks that this attempt could not be completed because of a system
        error -- for example, the Software Secure service is down and we could
        not process the request even after retrying. An attempt that is already
        in `must_retry` is left untouched.

        Arguments:
            error: Text stored in `error_msg`.

        Valid attempt statuses when calling this method:
            `ready`, `must_retry`, `submitted`

        Status after method completes: `must_retry`

        State Transitions:

        → → → `must_retry`
        ↑        ↑ ↓
        `ready` → `submitted`
        """
        if self.status != self.STATUS.must_retry:
            self.error_msg = error
            self.status = self.STATUS.must_retry
            self.save()

    @status_before_must_be("must_retry", "submitted", "approved", "denied")
    def deny(self,
             error_msg,
             error_code="",
             reviewing_user=None,
             reviewing_service=""):
        """
        Deny this attempt.

        Valid attempt statuses when calling this method:
            `must_retry`, `submitted`, `approved`, `denied`

        Status after method completes: `denied`

        Other fields that will be set by this method:
            `reviewing_user`, `reviewing_service`, `error_msg`, `error_code`

        State Transitions:

        `submitted` → `denied`
            The usual flow, whether initiated by a staff user or an external
            validation service.
        `approved` → `denied`
            Might happen if a staff member wants to override a decision made
            by an external service or another staff member, or just correct a
            mistake made during the approval process. The reviewer fields are
            replaced with whoever is doing the denying, so the only record of
            the earlier approval would be in our logs. This should be a
            relatively rare occurrence.
        `denied` → `denied`
            Updates the error message and the reviewer fields, which lets you
            amend the error message with additional details.
        """
        denial_notice = "Verification for user '{user_id}' denied by '{reviewer}'.".format(
            user_id=self.user, reviewer=reviewing_user
        )
        log.info(denial_notice)

        self.status = self.STATUS.denied
        self.reviewing_user = reviewing_user
        self.reviewing_service = reviewing_service
        self.error_msg = error_msg
        self.error_code = error_code
        self.save()

    @status_before_must_be("must_retry", "submitted", "approved", "denied")
    def system_error(self,
                     error_msg,
                     error_code="",
                     reviewing_user=None,
                     reviewing_service=""):
        """
        Mark that this attempt could not be completed because of a system error.

        The status is moved to `must_retry` -- for example, when Software
        Secure reports that they could not process our submission because they
        could not decrypt the image we sent. Attempts that were already
        approved or denied are left as they are.
        """
        final_statuses = (self.STATUS.approved, self.STATUS.denied)
        if self.status in final_statuses:
            return  # A decision was already recorded, so keep it.

        self.status = self.STATUS.must_retry
        self.reviewing_user = reviewing_user
        self.reviewing_service = reviewing_service
        self.error_msg = error_msg
        self.error_code = error_code
        self.save()

    @classmethod
    def retire_user(cls, user_id):
        """
        Retire user as part of GDPR Phase I.

        Blanks the name, image URLs and photo ID key on every record of this
        model belonging to the user.

        :param user_id: int
        :return: bool -- True if any records were updated, False if there were
            none or the user does not exist
        """
        try:
            user_obj = User.objects.get(id=user_id)
        except User.DoesNotExist:
            return False
        else:
            rows_updated = cls.objects.filter(user=user_obj).update(
                name='',
                face_image_url='',
                photo_id_image_url='',
                photo_id_key='',
            )
            return rows_updated > 0
class SoftwareSecurePhotoVerification(PhotoVerification):
"""
Model to verify identity using a service provided by Software Secure. Much
of the logic is inherited from `PhotoVerification`, but this class
encrypts the photos.
Software Secure (http://www.softwaresecure.com/) is a remote proctoring
service that also does identity verification. A student uses their webcam
to upload two images: one of their face, one of a photo ID. Due to the
sensitive nature of the data, the following security precautions are taken:
1. The snapshot of their face is encrypted using AES-256 in CBC mode. All
face photos are encrypted with the same key, and this key is known to
both Software Secure and edx-platform.
2. The snapshot of a user's photo ID is also encrypted using AES-256, but
the key is randomly generated using os.urandom. Every verification
attempt has a new key. The AES key is then encrypted using a public key
provided by Software Secure. We store only the RSA-encrypted AES key.
Since edx-platform does not have Software Secure's private RSA key, it
means that we can no longer even read the photo ID ourselves.
3. The encrypted photos are base64 encoded and stored in an S3 bucket that
edx-platform does not have read access to.
Note: this model handles *initial* verifications (which you must perform
at the time you register for a verified cert).
.. pii: The User's name is stored in the parent model, this one stores links to face and photo ID images
.. pii_types: name, image
.. pii_retirement: retained
"""
# This is a base64.urlsafe_encode(rsa_encrypt(photo_id_aes_key), ss_pub_key)
# So first we generate a random AES-256 key to encrypt our photo ID with.
# Then we RSA encrypt it with Software Secure's public key. Then we base64
# encode that. The result is saved here. Actual expected length is 344.
photo_id_key = models.TextField(max_length=1024)
IMAGE_LINK_DURATION = 5 * 60 * 60 * 24 # 5 days in seconds
copy_id_photo_from = models.ForeignKey("self", null=True, blank=True, on_delete=models.CASCADE)
# This field records when the email about an expired verification was sent,
# so that we can tell which learners have already been notified.
expiry_email_date = models.DateTimeField(null=True, blank=True, db_index=True)
@classmethod
def get_initial_verification(cls, user, earliest_allowed_date=None):
    """Get initial verification for a user with the 'photo_id_key'.

    Looks for the most recently created attempt of this user that is in the
    `submitted` or `approved` status, has a non-empty `photo_id_key`, and was
    created on or after the cut-off date.

    Arguments:
        user(User): user object
        earliest_allowed_date(datetime): override expiration date for initial verification;
            when not given, `earliest_allowed_verification_date()` is used.

    Return:
        SoftwareSecurePhotoVerification (object) or None
    """
    cutoff = earliest_allowed_date or earliest_allowed_verification_date()
    candidates = cls.objects.filter(
        user=user,
        status__in=["submitted", "approved"],
        created_at__gte=cutoff,
    ).exclude(photo_id_key='')
    # A single query: `first()` yields None on an empty result, so there is no
    # separate existence check that could go stale before the row is fetched.
    return candidates.order_by('-created_at').first()
@classmethod
def get_verification_from_receipt(cls, receipt_id):
    """Look up a verification by the receipt_id of its photo submission.

    Arguments:
        receipt_id(String): receipt ID of the user photo or ID photo

    Return:
        SoftwareSecurePhotoVerification (object) or None when no record
        has that receipt ID
    """
    try:
        found = cls.objects.get(receipt_id=receipt_id)
    except cls.DoesNotExist:
        found = None
    return found
def _save_image_to_storage(self, path, img_data):
    """
    Write `img_data` to the storage backend under `path`.

    Kept as its own method so that tests can mock it easily.
    """
    payload = ContentFile(img_data)
    self._storage.save(path, payload)
@status_before_must_be("created")
def upload_face_image(self, img_data):
    """
    Upload an image of the user's face.

    `img_data` should be a raw bytestream of a PNG image. The data is
    encrypted with our FACE_IMAGE_AES_KEY, base64 encoded, and saved to the
    storage backend.

    Yes, the base64 step adds compute and disk usage without much real
    benefit, but that's what the other end of this API is expecting to get.
    """
    # Nothing is uploaded when running acceptance tests, or when developing
    # without an interest in student identity verification. To work on this
    # functionality you have to enable it explicitly in your private settings.
    if auto_verify_for_testing_enabled():
        return

    software_secure = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]
    face_key = codecs.decode(software_secure["FACE_IMAGE_AES_KEY"], "hex")
    ciphertext = encrypt_and_encode(img_data, face_key)
    self._save_image_to_storage(self._get_path("face"), ciphertext)
@status_before_must_be("created")
def upload_photo_id_image(self, img_data):
    """
    Upload the user's photo ID image.

    `img_data` should be a raw bytestream of a PNG image. The data is
    encrypted with a randomly generated AES key, base64 encoded, and saved to
    the storage backend. The random key itself is encrypted with Software
    Secure's public RSA key and stored in our `photo_id_key` field.

    Yes, the base64 step adds compute and disk usage without much real
    benefit, but that's what the other end of this API is expecting to get.
    """
    # Nothing is uploaded when running acceptance tests, or when developing
    # without an interest in student identity verification. To work on this
    # functionality you have to enable it explicitly in your private settings.
    if auto_verify_for_testing_enabled():
        # fake photo id key is set only for initial verification
        self.photo_id_key = 'fake-photo-id-key'
        self.save()
        return

    one_time_key = random_aes_key()
    public_key = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]["RSA_PUBLIC_KEY"]
    wrapped_key = rsa_encrypt(one_time_key, public_key)

    # Encrypt the image and push it to the storage backend
    destination = self._get_path("photo_id")
    ciphertext = encrypt_and_encode(img_data, one_time_key)
    self._save_image_to_storage(destination, ciphertext)

    # Persist the RSA-encrypted AES key on the record
    encoded_key = codecs.encode(wrapped_key, 'base64')
    self.photo_id_key = encoded_key.decode('utf-8')
    self.save()
def _get_image_from_storage(self, path):
    """
    Read and return the bytes stored at `path` in the storage backend.

    Kept as its own method so that tests can mock it easily.
    """
    with self._storage.open(path, mode='rb') as stored_image:
        return stored_image.read()
@status_before_must_be("must_retry", "submitted", "approved", "denied")
def download_face_image(self):
    """
    Download the associated face image from storage and decrypt it.

    Returns:
        The decrypted image bytes, or None when no RSA_PRIVATE_KEY is
        configured or when decryption fails (the failure is logged).
    """
    software_secure = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]
    if not software_secure.get("RSA_PRIVATE_KEY", None):
        return None

    encrypted_image = self._get_image_from_storage(self._get_path("face"))
    hex_key = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]["FACE_IMAGE_AES_KEY"]
    try:
        face_key = codecs.decode(hex_key, "hex")
        return decode_and_decrypt(encrypted_image, face_key)
    except Exception as e:  # pylint: disable=broad-except
        log.exception('Failed to decrypt face image due to an exception: %s', e)
        return None
@status_before_must_be("must_retry", "submitted", "approved", "denied")
def download_photo_id_image(self):
    """
    Download the associated id image from storage and decrypt it.

    Returns:
        The decrypted image bytes, or None when no RSA_PRIVATE_KEY is
        configured or when decryption fails (the failure is logged).
    """
    software_secure = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]
    if not software_secure.get("RSA_PRIVATE_KEY", None):
        return None

    encrypted_image = self._get_image_from_storage(self._get_path("photo_id"))
    try:
        # The stored key is base64 text wrapping the RSA-encrypted AES key
        wrapped_key = base64.urlsafe_b64decode(self.photo_id_key.encode('utf-8'))
        # Unwrap the AES key with the RSA private key
        private_key = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]["RSA_PRIVATE_KEY"]
        image_key = rsa_decrypt(wrapped_key, private_key)
        return decode_and_decrypt(encrypted_image, image_key)
    except Exception as e:  # pylint: disable=broad-except
        log.exception('Failed to decrypt photo id image due to an exception: %s', e)
        return None
@status_before_must_be("must_retry", "ready", "submitted")
def submit(self, copy_id_photo_from=None):
    """
    Submit our verification attempt to Software Secure for validation.

    The request to Software Secure is scheduled to run once the current
    database transaction commits. The intent is that the status becomes
    "submitted" if the post is successful, or "must_retry" if the post fails.

    When auto-verification for testing is enabled, no request is made: the
    attempt is marked as submitted right away and a fake response with
    status code 200 is returned.

    Keyword Arguments:
        copy_id_photo_from (SoftwareSecurePhotoVerification): If provided, re-send the ID photo
            data from this attempt. This is used for re-verification, in which new face photos
            are sent with previously-submitted ID photos.
    """
    if auto_verify_for_testing_enabled():
        self.mark_submit()
        fake_response = requests.Response()
        fake_response.status_code = 200
        return fake_response

    if copy_id_photo_from is not None:
        log.info(
            ('Software Secure attempt for user: %r and receipt ID: %r used the same photo ID data as the '
             'receipt with ID %r.'),
            self.user.username,
            self.receipt_id,
            copy_id_photo_from.receipt_id,
        )

    def send_to_software_secure():
        """Deferred callback that performs the actual submission."""
        return submit_request_to_ss(user_verification=self, copy_id_photo_from=copy_id_photo_from)

    transaction.on_commit(send_to_software_secure)
def parsed_error_msg(self):
    """
    Parse the error messages we receive from SoftwareSecure

    Error messages are written in the form:

        `[{"photoIdReasons": ["Not provided"]}]`

    Each distinct message across all groups (and across all keys within a
    group) is translated to an error code through a fixed mapping. Messages
    with no mapping are skipped. If `error_msg` cannot be parsed, the
    failure is logged and an empty list is returned.

    Returns:
        str[]: List of error codes, in the order their messages first
            appear. Two messages mapping to the same code each contribute
            an entry.
    """
    error_map = {
        'EdX name not provided': 'name_mismatch',
        'Name mismatch': 'name_mismatch',
        'Photo/ID Photo mismatch': 'photos_mismatched',
        'ID name not provided': 'id_image_missing_name',
        'Invalid Id': 'id_invalid',
        'No text': 'id_invalid',
        'Not provided': 'id_image_missing',
        'Photo hidden/No photo': 'id_image_not_clear',
        'Text not clear': 'id_image_not_clear',
        'Face out of view': 'user_image_not_clear',
        'Image not clear': 'user_image_not_clear',
        'Photo not provided': 'user_image_missing',
    }
    parsed_errors = []

    try:
        # A dict is used as an insertion-ordered set, so duplicates are
        # dropped while the resulting order stays deterministic.
        messages = {}
        for message_group in json.loads(self.error_msg):
            # A group may carry any number of reason lists (zero, one or
            # several keys); collect the messages from all of them.
            for reasons in message_group.values():
                messages.update(dict.fromkeys(reasons))

        for message in messages:
            parsed_error = error_map.get(message)
            if parsed_error:
                parsed_errors.append(parsed_error)
            else:
                log.debug('Ignoring photo verification error message: %s', message)
    except Exception:  # pylint: disable=broad-except
        # `%s` rather than `%d`: the pk is None for a record that was never saved.
        log.exception('Failed to parse error message for SoftwareSecurePhotoVerification %s', self.pk)

    return parsed_errors
def image_url(self, name, override_receipt_id=None):
    """
    Build the expiring URL for one of this attempt's images.

    The URL is generated on demand instead of being stored, because the
    expiration clock should start when the message is created, not when
    the record is created.

    Arguments:
        name (str): Name of the image (e.g. "photo_id" or "face")

    Keyword Arguments:
        override_receipt_id (str): If provided, use this receipt ID instead
            of the ID for this attempt. This is useful for reverification
            where we need to construct a URL to a previously-submitted
            photo ID image.

    Returns:
        string: The expiring URL for the image.
    """
    # Resolve the storage path first, then ask the backend for its URL.
    image_path = self._get_path(name, override_receipt_id=override_receipt_id)
    backend = self._storage
    return backend.url(image_path)
@cached_property
def _storage(self):
    """
    Return the configured django storage backend.

    The backend class and its keyword arguments are read from
    `settings.VERIFY_STUDENT["SOFTWARE_SECURE"]` (`STORAGE_CLASS` and
    `STORAGE_KWARGS`). The legacy `AWS_ACCESS_KEY`, `AWS_SECRET_KEY` and
    `S3_BUCKET` settings, when present, override the matching keyword
    arguments, and `querystring_expire` is always set from
    `IMAGE_LINK_DURATION`.

    The result is memoised by `cached_property`.
    """
    config = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]

    # Default to the S3 backend for backward compatibility
    storage_class = config.get("STORAGE_CLASS", "storages.backends.s3boto3.S3Boto3Storage")

    # Work on a copy: the dict returned by `config.get` is the one held in
    # settings, and writing into it would alter settings at runtime.
    storage_kwargs = dict(config.get("STORAGE_KWARGS", {}))

    # Map old settings to the parameters expected by the storage backend
    legacy_settings = (
        ("AWS_ACCESS_KEY", "access_key"),
        ("AWS_SECRET_KEY", "secret_key"),
        ("S3_BUCKET", "bucket_name"),
    )
    for setting_name, kwarg_name in legacy_settings:
        if setting_name in config:
            storage_kwargs[kwarg_name] = config[setting_name]

    storage_kwargs["querystring_expire"] = self.IMAGE_LINK_DURATION
    return get_storage(storage_class, **storage_kwargs)
def _get_path(self, prefix, override_receipt_id=None):
"""
Returns the path to a resource with this instance's `receipt_id`.
If `override_receipt_id` is given, the path to that resource will be
retrieved instead. This allows us to retrieve images submitted in
previous attempts (used for reverification, where we send a new face
photo with the same photo ID from a previous attempt).
"""
receipt_id = self.receipt_id if override_receipt_id is None else override_receipt_id
return os.path.join(prefix, receipt_id)
def _encrypted_user_photo_key_str(self):
    """
    Return the face-image AES key, RSA-encrypted and base64-encoded as text.

    Software Secure needs to have both UserPhoto and PhotoID decrypted in
    the same manner. So even though the value is going to be the same for
    every request, the AES key used for faces is also encrypted with RSA.
    """
    software_secure = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]

    # The AES key is stored in settings as a hex string; turn it into bytes.
    aes_key_bytes = codecs.decode(software_secure["FACE_IMAGE_AES_KEY"], 'hex')

    encrypted_key = rsa_encrypt(aes_key_bytes, software_secure["RSA_PUBLIC_KEY"])
    encoded_key = base64.b64encode(encrypted_key)
    return encoded_key.decode('utf-8')
def create_request(self, copy_id_photo_from=None):
"""
Construct the HTTP request to the photo verification service.
Keyword Arguments:
copy_id_photo_from (SoftwareSecurePhotoVerification): If provided, re-send the ID photo
data from this attempt. This is used for re-verification, in which new face photos
are sent with previously-submitted ID photos.
Returns:
tuple of (header, body), where both `header` and `body` are dictionaries.
"""
access_key = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]["API_ACCESS_KEY"]
secret_key = settings.VERIFY_STUDENT["SOFTWARE_SECURE"]["API_SECRET_KEY"]
scheme = "https" if settings.HTTPS == "on" else "http"
callback_url = "{}://{}{}".format(
scheme, settings.SITE_NAME, reverse('verify_student_results_callback')
)
# If we're copying the photo ID image from a previous verification attempt,
# then we need to send the old image data with the correct image key.
photo_id_url = (
self.image_url("photo_id")
if copy_id_photo_from is None
else self.image_url("photo_id", override_receipt_id=copy_id_photo_from.receipt_id)
)
photo_id_key = (
self.photo_id_key
if copy_id_photo_from is None else
copy_id_photo_from.photo_id_key
)
body = {
"EdX-ID": str(self.receipt_id),
"ExpectedName": self.name,
"PhotoID": photo_id_url,
"PhotoIDKey": photo_id_key,
"SendResponseTo": callback_url,
"UserPhoto": self.image_url("face"),
"UserPhotoKey": self._encrypted_user_photo_key_str(),
}
headers = {