diff --git a/perma_web/perma/templates/user_management/manage_users.html b/perma_web/perma/templates/user_management/manage_users.html
index a2d613536..83c370b47 100644
--- a/perma_web/perma/templates/user_management/manage_users.html
+++ b/perma_web/perma/templates/user_management/manage_users.html
@@ -86,7 +86,9 @@
Filter & Sort:
{% if group_name == 'sponsored_user' %}
-
+
+ {% elif group_name == 'organization_user' %}
+
{% endif %}
diff --git a/perma_web/perma/tests/test_permissions.py b/perma_web/perma/tests/test_permissions.py
index 32cb52ba5..471da201a 100644
--- a/perma_web/perma/tests/test_permissions.py
+++ b/perma_web/perma/tests/test_permissions.py
@@ -91,6 +91,7 @@ def test_permissions(client, admin_user, registrar_user, org_user, link_user_fac
{
'urls': [
['user_management_manage_organization_user'],
+ ['user_management_manage_organization_user_export_user_list'],
['user_management_manage_organization'],
['user_management_organization_user_add_user'],
],
diff --git a/perma_web/perma/tests/test_views_user_management.py b/perma_web/perma/tests/test_views_user_management.py
index ffd2c021d..ef06938a5 100644
--- a/perma_web/perma/tests/test_views_user_management.py
+++ b/perma_web/perma/tests/test_views_user_management.py
@@ -351,6 +351,54 @@ def test_org_export_user_list(self):
reader_record_count += 1
self.assertEqual(reader_record_count, record_count)
+ def test_organization_user_export_user_list(self):
+ expected_results = [
+ ('case_one_lawyer@firm.com', 'Some Case'),
+ ('multi_registrar_org_user@example.com', 'Another Journal'),
+ ('multi_registrar_org_user@example.com', "Another Library's Journal"),
+ ('multi_registrar_org_user@example.com', 'A Third Journal'),
+ ('multi_registrar_org_user@example.com', 'Test Journal'),
+ ('test_another_library_org_user@example.com', "Another Library's Journal"),
+ ('test_another_library_org_user@example.com', 'A Third Journal'),
+ ('test_yet_another_library_org_user@example.com', "Another Library's Journal"),
+ ('test_another_org_user@example.com', 'A Third Journal'),
+ ('test_org_rando_user@example.com', 'Test Journal'),
+ ('test_org_user@example.com', 'Test Journal'),
+ ]
+
+ # Get CSV export output
+ csv_response: HttpResponse = self.get(
+ 'user_management_manage_organization_user_export_user_list',
+ request_kwargs={'data': {'format': 'csv'}},
+ user=self.admin_user,
+ )
+ self.assertEqual(csv_response.headers['Content-Type'], 'text/csv')
+
+ # Validate CSV output against expected results
+ csv_file = StringIO(csv_response.content.decode('utf8'))
+ reader = csv.DictReader(csv_file)
+ for index, record in enumerate(reader):
+ expected_email, expected_organization_name = expected_results[index]
+ self.assertEqual(record['email'], expected_email)
+ self.assertEqual(record['organization_name'], expected_organization_name)
+ self.assertEqual(index + 1, len(expected_results))
+
+ # Get JSON export output
+ json_response: HttpResponse = self.get(
+ 'user_management_manage_organization_user_export_user_list',
+ request_kwargs={'data': {'format': 'json'}},
+ user=self.admin_user,
+ )
+ self.assertEqual(json_response.headers['Content-Type'], 'application/json')
+
+ # Validate JSON output against expected results
+ reader = json.loads(json_response.content)
+ for index, record in enumerate(reader):
+ expected_email, expected_organization_name = expected_results[index]
+ self.assertEqual(record['email'], expected_email)
+ self.assertEqual(record['organization_name'], expected_organization_name)
+ self.assertEqual(index + 1, len(expected_results))
+
def test_sponsored_user_export_user_list(self):
expected_results = [
('another_inactive_sponsored_user@example.com', 'inactive'),
diff --git a/perma_web/perma/urls.py b/perma_web/perma/urls.py
index 66d44cd1e..95952f041 100755
--- a/perma_web/perma/urls.py
+++ b/perma_web/perma/urls.py
@@ -147,6 +147,7 @@
re_path(r'^manage/users/resend-activation/(?P\d+)/?$', user_management.resend_activation, name='user_management_resend_activation'),
re_path(r'^manage/organization-users/?$', user_management.manage_organization_user, name='user_management_manage_organization_user'),
+ re_path(r'^manage/organization-users/export/?$', user_management.manage_organization_user_export_user_list, name='user_management_manage_organization_user_export_user_list'),
re_path(r'^manage/organization-users/add-user/?$', AddUserToOrganization.as_view(), name='user_management_organization_user_add_user'),
re_path(r'^manage/organization-users/(?P\d+)/?$', user_management.manage_single_organization_user, name='user_management_manage_single_organization_user'),
re_path(r'^manage/organization-users/(?P\d+)/delete/?$', user_management.manage_single_organization_user_delete, name='user_management_manage_single_organization_user_delete'),
diff --git a/perma_web/perma/utils.py b/perma_web/perma/utils.py
index e72b1cc8e..f17b57ec6 100644
--- a/perma_web/perma/utils.py
+++ b/perma_web/perma/utils.py
@@ -1,48 +1,55 @@
-from contextlib import contextmanager
from collections import OrderedDict
-from datetime import datetime, timedelta, timezone as tz
-from dateutil.relativedelta import relativedelta
-from functools import wraps, reduce
+from contextlib import contextmanager
+import csv
+from datetime import datetime, timedelta
+from datetime import timezone as tz
+from functools import reduce, wraps
import hashlib
-from hanzo import warctools
import itertools
import json
import logging
-from nacl import encoding
-from nacl.public import Box, PrivateKey, PublicKey
import operator
import os
-import requests
import string
-import surt
-import tempdir
import tempfile
-from typing import TypeVar
-from ua_parser import user_agent_parser
+from typing import Literal, TypeVar
import unicodedata
-from warcio.warcwriter import BufferWARCWriter
from wsgiref.util import FileWrapper
-from django.core.paginator import Paginator, EmptyPage, Page
-from django.db.models import Q
-from django.db.models.manager import BaseManager
+from dateutil.relativedelta import relativedelta
from django.conf import settings
+from django.contrib.auth.decorators import login_required
from django.core.exceptions import PermissionDenied, ValidationError
-from django.urls import reverse
+from django.core.files.storage import storages
+from django.core.paginator import EmptyPage, Page, Paginator
from django.core.serializers.json import DjangoJSONEncoder
+from django.db.models import Q
+from django.db.models.manager import BaseManager
from django.http import (
+ Http404,
HttpRequest,
+ HttpResponse,
HttpResponseForbidden,
- Http404,
+ JsonResponse,
StreamingHttpResponse,
- HttpResponse,
)
-from django.contrib.auth.decorators import login_required
-from django.core.files.storage import storages
+from django.urls import reverse
from django.utils import timezone
from django.views.decorators.debug import sensitive_variables
+from hanzo import warctools
+from nacl import encoding
+from nacl.public import Box, PrivateKey, PublicKey
+import requests
+import surt
+import tempdir
+from ua_parser import user_agent_parser
+from warcio.warcwriter import BufferWARCWriter
-from .exceptions import InvalidTransmissionException, ScoopAPIException, ScoopAPINetworkException
+from perma.exceptions import (
+ InvalidTransmissionException,
+ ScoopAPIException,
+ ScoopAPINetworkException,
+)
logger = logging.getLogger(__name__)
warn = logger.warn
@@ -176,6 +183,31 @@ def apply_pagination(request: HttpRequest, queryset: BaseManager[T]) -> Page:
except EmptyPage:
return paginator.page(1)
+
+def export_queryset(
+ queryset: BaseManager[T],
+ export_format: Literal['csv', 'json'],
+ field_names: list[str],
+ filename: str = 'export',
+) -> HttpResponse | JsonResponse:
+ """Export a queryset as a CSV or JSON response."""
+ match export_format:
+ case 'csv':
+ response = HttpResponse(
+ content_type='text/csv',
+ headers={'Content-Disposition': f'attachment; filename="{filename}.csv"'},
+ )
+ writer = csv.DictWriter(response, fieldnames=field_names)
+ writer.writeheader()
+ for record in queryset:
+ writer.writerow(record)
+ case 'json':
+ response = JsonResponse(list(queryset), safe=False)
+ case _:
+ raise ValueError('export_format must be one of: csv, json')
+ return response
+
+
### form view helpers ###
def get_form_data(request):
diff --git a/perma_web/perma/views/user_management.py b/perma_web/perma/views/user_management.py
index b62620090..d3a9a6aac 100755
--- a/perma_web/perma/views/user_management.py
+++ b/perma_web/perma/views/user_management.py
@@ -1,6 +1,4 @@
-import csv
import logging
-from typing import Literal
from django.conf import settings
from django.contrib import messages
@@ -56,6 +54,7 @@
apply_pagination,
apply_search_query,
apply_sort_order,
+ export_queryset,
get_form_data,
ratelimit_ip_key,
user_passes_test_or_403,
@@ -326,21 +325,13 @@ def manage_sponsored_user_export_user_list(request: HttpRequest) -> HttpResponse
sponsorship_status=F('sponsorships__status'),
sponsorship_created_at=F('sponsorships__created_at'),
).values(*field_names)
+ filename = 'perma-sponsored-users'
# Export records in appropriate format based on `format` URL parameter
- export_format: Literal['csv', 'json'] = request.GET.get('format', 'csv').casefold()
- match export_format:
- case 'json':
- response = JsonResponse(list(users), safe=False)
- case 'csv' | _:
- response = HttpResponse(
- content_type='text/csv',
- headers={'Content-Disposition': 'attachment; filename="perma-sponsored-users.csv"'},
- )
- writer = csv.DictWriter(response, fieldnames=field_names)
- writer.writeheader()
- for user in users:
- writer.writerow(user)
+ export_format = request.GET.get('format', '').casefold()
+ if export_format not in ['csv', 'json']:
+ export_format = 'csv'
+ response = export_queryset(users, export_format, field_names, filename)
return response
@@ -376,6 +367,33 @@ def manage_single_user_reactivate(request, user_id):
def manage_organization_user(request):
return list_users_in_group(request, 'organization_user')
+
+@user_passes_test_or_403(
+ lambda user: user.is_staff or user.is_registrar_user() or user.is_organization_user
+)
+def manage_organization_user_export_user_list(request: HttpRequest):
+ """Return a file listing users across organizations."""
+ # Get query results via list_sponsored_users
+ field_names = [
+ 'email',
+ 'first_name',
+ 'last_name',
+ 'date_joined',
+ 'last_login',
+ 'organization_name',
+ ]
+ records = list_users_in_group(request, 'organization_user', export=True)
+ org_users = records.annotate(organization_name=F('organizations__name')).values(*field_names)
+ filename = 'perma-organization-users'
+
+ # Export records in appropriate format based on `format` URL parameter
+ export_format = request.GET.get('format', '').casefold()
+ if export_format not in ['csv', 'json']:
+ export_format = 'csv'
+ response = export_queryset(org_users, export_format, field_names, filename)
+ return response
+
+
@user_passes_test_or_403(lambda user: user.is_staff or user.is_registrar_user() or user.is_organization_user)
def manage_single_organization_user(request, user_id):
return edit_user_in_group(request, user_id, 'organization_user')
@@ -400,35 +418,32 @@ def manage_single_organization_export_user_list(
if not request.user.can_edit_organization(target_org):
return HttpResponseForbidden()
+ # Generate output records from query results and add organization name
+ field_names = [
+ 'email',
+ 'first_name',
+ 'last_name',
+ 'date_joined',
+ 'last_login',
+ 'organization_name',
+ ]
org_users = (
LinkUser.objects.filter(organizations__id=org_id)
.annotate(organization_name=F('organizations__name'))
- .values('email', 'first_name', 'last_name', 'organization_name')
+ .values(*field_names)
)
- filename_stem = f'perma-organization-{org_id}-users'
-
- # Generate output records from query results and add organization name
- field_names = ['email', 'first_name', 'last_name', 'organization_name']
+ filename = f'perma-organization-{org_id}-users'
# Export records in appropriate format based on `format` URL parameter
- export_format: Literal['csv', 'json'] = request.GET.get('format', 'csv').casefold()
- match export_format:
- case 'json':
- response = JsonResponse(list(org_users), safe=False)
- case 'csv' | _:
- response = HttpResponse(
- content_type='text/csv',
- headers={'Content-Disposition': f'attachment; filename="{filename_stem}.csv"'},
- )
- writer = csv.DictWriter(response, fieldnames=field_names)
- writer.writeheader()
- for org_user in org_users:
- writer.writerow(org_user)
+ export_format = request.GET.get('format', '').casefold()
+ if export_format not in ['csv', 'json']:
+ export_format = 'csv'
+ response = export_queryset(org_users, export_format, field_names, filename)
return response
@user_passes_test_or_403(lambda user: user.is_staff or user.is_registrar_user() or user.is_organization_user)
-def list_users_in_group(request, group_name):
+def list_users_in_group(request: HttpRequest, group_name: str, export: bool = False):
"""
Show list of users with given group name.
"""
@@ -514,6 +529,10 @@ def list_users_in_group(request, group_name):
if sponsorship_status:
users = users.filter(sponsorships__status=sponsorship_status)
+ # if exporting records (e.g. for CSV or JSON output), return query manager directly
+ if export is True:
+ return users
+
# get total counts
active_users = users.filter(is_active=True, is_confirmed=True).count()
deactivated_users = users.filter(is_confirmed=True, is_active=False).count()