Db partition analytics cmeyers2 #10023

Merged 90 commits on Jun 4, 2021

Commits
c6acca0
first draft of db partitioning
Jan 27, 2021
d10d196
Rename / remove old main_jobevent table
Jan 27, 2021
6ff15a9
Register new column created by SQL
Feb 8, 2021
de0b258
By default, save job_created as null in db
Feb 8, 2021
f70473d
When copying main_jobevent, include all table metadata
Feb 9, 2021
0574baf
Create fake partition
Feb 9, 2021
2f737f6
Drop primary key index before creating partition table
Feb 10, 2021
c0d38e9
When saving JobEvents, include job_created
Feb 11, 2021
1af1a5e
Convert job_created to string for serialization
Feb 11, 2021
0cb2d79
Only save job_created field on JobEvent for the time being
Feb 11, 2021
48f1910
Remove temporary catch-all partition
Feb 11, 2021
fb30528
Dynamically create initial partitions
Feb 17, 2021
7e0f2b0
first partition is _not_ a special case after all
Feb 17, 2021
ba45592
create helper method to create partitions
Feb 17, 2021
0c28920
Give new primary key constraint unique name, create first live partition
Feb 19, 2021
445042c
Create partition only if it doesn't exist
Feb 19, 2021
612e912
auto-create partition
Feb 20, 2021
c87d7b0
fix import
Feb 20, 2021
82df3eb
add option to create partitions that span a single minute
Feb 20, 2021
f259b0a
Move partition start date to 2000
Feb 22, 2021
0eb1984
Only create partitions for regular jobs
Feb 22, 2021
8fb3136
get job events based on job creation date
Feb 22, 2021
2ffa22e
Misc doc clean-up
Feb 24, 2021
ec484f8
Partition *all* job event tables
Feb 24, 2021
a5cfc30
create_partition needs tblname
Feb 24, 2021
ea2afee
Drop todo / answered question
Feb 24, 2021
acfa1c4
Drop todo / question / conditional
Feb 24, 2021
2c529f5
Update querysets for remaining job event views
Feb 24, 2021
d90d0fb
fix initial partition creation
Feb 25, 2021
221ddeb
Fix migration for _all_ job event tables
Feb 25, 2021
f642c52
Update fields for job event models to match raw sql operations
Feb 25, 2021
80b08d1
Continue updating job event views
Feb 25, 2021
f9b439a
include job_created field in all job event types
Feb 25, 2021
6704651
Push changes before rebasing
Feb 25, 2021
c7ab3ea
move the partition data migration to be a post-upgrade async process
ryanpetrello Feb 25, 2021
2a58605
some more events view cleanup
ryanpetrello Feb 25, 2021
1e45e2a
Rev migration number
Mar 5, 2021
08b96a0
correct filter events on the migration event horizon
ryanpetrello Feb 25, 2021
b19bcdd
remove the global `/api/v2/job_events/` endpoint
ryanpetrello Mar 1, 2021
373edbf
Update reference to partition migration
Mar 5, 2021
28f9c0b
Do not cascade delete job_events
Mar 6, 2021
0fa0a51
create tmp schema to reference when creating partitioned table
Mar 8, 2021
a1d1e70
correct constraint name
Mar 8, 2021
14b610d
bump partition migration
Mar 11, 2021
8501a45
lint fixes
Mar 11, 2021
b98b3ce
update tests to reflect new job_event schema
Mar 12, 2021
a7cabec
bump partition migration to 0132
Mar 12, 2021
0eddd5c
Enable partition pruning when fetching job's events
Mar 15, 2021
200901e
upgrade to partitions without a costly bulk data migration
ryanpetrello Mar 19, 2021
fbb74a9
remove code that leaves behind old bigint tables on fresh installs
ryanpetrello Mar 19, 2021
661cf0a
short-circuit event_processing_finished for wf jobs
Mar 24, 2021
74a0c5b
Minor rebase fallout
Mar 24, 2021
6b4effc
bump partition migration to 135
Mar 24, 2021
db6f565
black formatting
Mar 24, 2021
5c1a333
update mocks to reflect new migration
Mar 24, 2021
81db809
test updates
Mar 24, 2021
d749c17
Ensure `View.model` exists before checking type
Mar 24, 2021
4680720
Move created kwargs to right place
Mar 24, 2021
bdf11aa
add migrations for Unpartitioned{Job}Event proxy models
Mar 24, 2021
7e1814e
mock has_unpartitioned_events in collection tests
Mar 24, 2021
1416829
set event horizon to -1 for empty tables
Mar 25, 2021
5a78579
is_partitioned should default to true when pk not set
Mar 25, 2021
0f53d9b
bump db partition migration
Mar 30, 2021
fb97687
lint
Apr 13, 2021
4efbd45
add support for db partition in cleanup_jobs
chrismeyersfsu Apr 1, 2021
b86d365
collect job events based on job event modified time
Apr 6, 2021
6123b8e
query for jobevents based on table location
Apr 6, 2021
34c4967
Revert "query for jobevents based on table location"
chrismeyersfsu Apr 20, 2021
0f9f3f5
bump migration
Apr 20, 2021
4d7edbb
analytics support for db partitions
chrismeyersfsu Apr 20, 2021
ecdf6cc
json cast optimization
chrismeyersfsu Apr 23, 2021
4f05824
conditionally project job_created
chrismeyersfsu Apr 24, 2021
c5a1e4c
remove order by from partitioned events query
chrismeyersfsu Apr 26, 2021
1371113
bump migrations after devel rebase
chrismeyersfsu Apr 26, 2021
1c97b9a
no longer get the size of the gather set
chrismeyersfsu May 6, 2021
6ce227a
bump migrations
chrismeyersfsu May 6, 2021
7b188aa
lint
May 12, 2021
ef9f912
bump migration
May 12, 2021
84af610
remove rebase cruft
May 12, 2021
e371de3
update job cleanup tests for sqlite-based execution
May 13, 2021
f7d2f7a
lint
May 13, 2021
2a23b4c
bump migration
May 13, 2021
321135d
add limit pagination to jobs/<id>/job_events/
chrismeyersfsu May 20, 2021
30871bd
close db and cache connection in new threads
chrismeyersfsu May 13, 2021
1a1d66d
bump db partition migration
chrismeyersfsu May 20, 2021
c429563
update view to handle hosts/N/ad_hoc_command_events
May 25, 2021
2131703
add/remove indexes, more get_event_querset()
chrismeyersfsu May 21, 2021
31fe500
move get_queryset handling to child view
May 25, 2021
b648957
Update pagination scheme for jobs
jakemcdermott May 25, 2021
ffbbcd2
fix tests, add pagination tests
chrismeyersfsu May 26, 2021
2 changes: 1 addition & 1 deletion awx/api/filters.py
@@ -133,7 +133,7 @@ class FieldLookupBackend(BaseFilterBackend):
    Filter using field lookups provided via query string parameters.
    """

    RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by', 'search', 'type', 'host_filter', 'count_disabled', 'no_truncate')
    RESERVED_NAMES = ('page', 'page_size', 'format', 'order', 'order_by', 'search', 'type', 'host_filter', 'count_disabled', 'no_truncate', 'limit')

    SUPPORTED_LOOKUPS = (
        'exact',
66 changes: 66 additions & 0 deletions awx/api/pagination.py
@@ -1,12 +1,16 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.

from collections import OrderedDict

# Django REST Framework
from django.conf import settings
from django.core.paginator import Paginator as DjangoPaginator
from rest_framework import pagination
from rest_framework.response import Response
from rest_framework.utils.urls import replace_query_param
from rest_framework.settings import api_settings
from django.utils.translation import gettext_lazy as _


class DisabledPaginator(DjangoPaginator):
@@ -65,3 +69,65 @@ def get_paginated_response(self, data):
        if self.count_disabled:
            return Response({'results': data})
        return super(Pagination, self).get_paginated_response(data)


class LimitPagination(pagination.BasePagination):
    default_limit = api_settings.PAGE_SIZE
    limit_query_param = 'limit'
    limit_query_description = _('Number of results to return per page.')
    max_page_size = settings.MAX_PAGE_SIZE

    def paginate_queryset(self, queryset, request, view=None):
        self.limit = self.get_limit(request)
        self.request = request

        return list(queryset[0 : self.limit])

    def get_paginated_response(self, data):
        return Response(OrderedDict([('results', data)]))

    def get_paginated_response_schema(self, schema):
        return {
            'type': 'object',
            'properties': {
                'results': schema,
            },
        }

    def get_limit(self, request):
        try:
            return pagination._positive_int(request.query_params[self.limit_query_param], strict=True)
        except (KeyError, ValueError):
            pass

        return self.default_limit


class UnifiedJobEventPagination(Pagination):
    """
    By default, use Pagination for all operations.
    If the `limit` query parameter is specified, use LimitPagination.
    """

    def __init__(self, *args, **kwargs):
        self.use_limit_paginator = False
        self.limit_pagination = LimitPagination()
        return super().__init__(*args, **kwargs)

    def paginate_queryset(self, queryset, request, view=None):
        if 'limit' in request.query_params:
            self.use_limit_paginator = True

        if self.use_limit_paginator:
            return self.limit_pagination.paginate_queryset(queryset, request, view=view)
        return super().paginate_queryset(queryset, request, view=view)

    def get_paginated_response(self, data):
        if self.use_limit_paginator:
            return self.limit_pagination.get_paginated_response(data)
        return super().get_paginated_response(data)

    def get_paginated_response_schema(self, schema):
        if self.use_limit_paginator:
            return self.limit_pagination.get_paginated_response_schema(schema)
        return super().get_paginated_response_schema(schema)
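To make the two pagination modes concrete, here is a small client-side sketch. It is not part of the diff; the host name and job id are placeholders, and it assumes the `requests` library against a standard AWX v2 API.

import requests

BASE = "https://awx.example.com/api/v2"  # placeholder host

# Default page-number pagination: the body carries count/next/previous/results,
# and computing `count` costs a COUNT(*) over the (now partitioned) event table.
page = requests.get(f"{BASE}/jobs/42/job_events/?page_size=25").json()
print(page["count"], len(page["results"]))

# With ?limit=, UnifiedJobEventPagination delegates to LimitPagination: the
# queryset is simply sliced to `limit` rows and the body is just
# {"results": [...]}, so no count query is issued.
limited = requests.get(f"{BASE}/jobs/42/job_events/?limit=100").json()
print(len(limited["results"]))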
8 changes: 4 additions & 4 deletions awx/api/serializers.py
@@ -3044,7 +3044,7 @@ def get_related(self, obj):
        res = super(JobSerializer, self).get_related(obj)
        res.update(
            dict(
                job_events=self.reverse('api:job_job_events_list', kwargs={'pk': obj.pk}),
                job_events=self.reverse('api:job_job_events_list', kwargs={'pk': obj.pk}),  # TODO: consider adding job_created
                job_host_summaries=self.reverse('api:job_job_host_summaries_list', kwargs={'pk': obj.pk}),
                activity_stream=self.reverse('api:job_activity_stream_list', kwargs={'pk': obj.pk}),
                notifications=self.reverse('api:job_notifications_list', kwargs={'pk': obj.pk}),
@@ -3111,16 +3111,16 @@ class Meta:
        fields = ('*', 'host_status_counts', 'playbook_counts', 'custom_virtualenv')

    def get_playbook_counts(self, obj):
        task_count = obj.job_events.filter(event='playbook_on_task_start').count()
        play_count = obj.job_events.filter(event='playbook_on_play_start').count()
        task_count = obj.get_event_queryset().filter(event='playbook_on_task_start').count()
        play_count = obj.get_event_queryset().filter(event='playbook_on_play_start').count()

        data = {'play_count': play_count, 'task_count': task_count}

        return data

    def get_host_status_counts(self, obj):
        try:
            counts = obj.job_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts()
            counts = obj.get_event_queryset().only('event_data').get(event='playbook_on_stats').get_host_status_counts()
        except JobEvent.DoesNotExist:
            counts = {}

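Every call site above trades `obj.job_events` for `obj.get_event_queryset()`. That model helper is not part of this diff, so the following is a hypothetical sketch of its presumed shape, inferred from the commit messages about partition pruning and the `UnpartitionedJobEvent` proxy models; names and details may differ from the real implementation.

# Hypothetical sketch; inferred, not taken from this diff.
def get_event_queryset(self):
    if self.has_unpartitioned_events:
        # Pre-migration rows still live in the legacy _unpartitioned_* table,
        # reached through a proxy model.
        return UnpartitionedJobEvent.objects.filter(job=self)
    # Filtering on the partition key (job_created) lets PostgreSQL prune every
    # partition except the one that holds this job's events.
    return JobEvent.objects.filter(job=self, job_created=self.created)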
5 changes: 1 addition & 4 deletions awx/api/urls/job_event.py
@@ -3,14 +3,11 @@

from django.conf.urls import url

from awx.api.views import JobEventList, JobEventDetail, JobEventChildrenList, JobEventHostsList

from awx.api.views import JobEventDetail, JobEventChildrenList

urls = [
    url(r'^$', JobEventList.as_view(), name='job_event_list'),
    url(r'^(?P<pk>[0-9]+)/$', JobEventDetail.as_view(), name='job_event_detail'),
    url(r'^(?P<pk>[0-9]+)/children/$', JobEventChildrenList.as_view(), name='job_event_children_list'),
    url(r'^(?P<pk>[0-9]+)/hosts/$', JobEventHostsList.as_view(), name='job_event_hosts_list'),
]

__all__ = ['urls']
92 changes: 65 additions & 27 deletions awx/api/views/__init__.py
@@ -21,7 +21,7 @@
from django.conf import settings
from django.core.exceptions import FieldError, ObjectDoesNotExist
from django.db.models import Q, Sum
from django.db import IntegrityError, transaction, connection
from django.db import IntegrityError, ProgrammingError, transaction, connection
from django.shortcuts import get_object_or_404
from django.utils.safestring import mark_safe
from django.utils.timezone import now
@@ -172,11 +172,21 @@
    ApiV2AttachView,
)
from awx.api.views.webhooks import WebhookKeyView, GithubWebhookReceiver, GitlabWebhookReceiver # noqa
from awx.api.pagination import UnifiedJobEventPagination


logger = logging.getLogger('awx.api.views')


def unpartitioned_event_horizon(cls):
    with connection.cursor() as cursor:
        try:
            cursor.execute(f'SELECT MAX(id) FROM _unpartitioned_{cls._meta.db_table}')
            return cursor.fetchone()[0] or -1
        except ProgrammingError:
            return 0


def api_exception_handler(exc, context):
"""
Override default API exception handler to catch IntegrityError exceptions.
@@ -878,11 +888,17 @@ class ProjectUpdateEventsList(SubListAPIView):
    relationship = 'project_update_events'
    name = _('Project Update Events List')
    search_fields = ('stdout',)
    pagination_class = UnifiedJobEventPagination

    def finalize_response(self, request, response, *args, **kwargs):
        response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS
        return super(ProjectUpdateEventsList, self).finalize_response(request, response, *args, **kwargs)

    def get_queryset(self):
        pu = self.get_parent_object()
        self.check_parent_access(pu)
        return pu.get_event_queryset()


class SystemJobEventsList(SubListAPIView):

@@ -892,11 +908,17 @@ class SystemJobEventsList(SubListAPIView):
    relationship = 'system_job_events'
    name = _('System Job Events List')
    search_fields = ('stdout',)
    pagination_class = UnifiedJobEventPagination

    def finalize_response(self, request, response, *args, **kwargs):
        response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS
        return super(SystemJobEventsList, self).finalize_response(request, response, *args, **kwargs)

    def get_queryset(self):
        job = self.get_parent_object()
        self.check_parent_access(job)
        return job.get_event_queryset()


class ProjectUpdateCancel(RetrieveAPIView):

@@ -3602,7 +3624,7 @@ def post(self, request, *args, **kwargs):
                    status=status.HTTP_400_BAD_REQUEST,
                )
            host_qs = obj.retry_qs(retry_hosts)
            if not obj.job_events.filter(event='playbook_on_stats').exists():
            if not obj.get_event_queryset().filter(event='playbook_on_stats').exists():
                return Response(
                    {'hosts': _('Cannot retry on {status_value} hosts, playbook stats not available.').format(status_value=retry_hosts)},
                    status=status.HTTP_400_BAD_REQUEST,
@@ -3729,17 +3751,21 @@ class JobHostSummaryDetail(RetrieveAPIView):
    serializer_class = serializers.JobHostSummarySerializer


class JobEventList(NoTruncateMixin, ListAPIView):
class JobEventDetail(RetrieveAPIView):

    model = models.JobEvent
    serializer_class = serializers.JobEventSerializer
    search_fields = ('stdout',)


class JobEventDetail(RetrieveAPIView):
    @property
    def is_partitioned(self):
        if 'pk' not in self.kwargs:
            return True
        return int(self.kwargs['pk']) > unpartitioned_event_horizon(models.JobEvent)

    model = models.JobEvent
    serializer_class = serializers.JobEventSerializer
    @property
    def model(self):
        if self.is_partitioned:
            return models.JobEvent
        return models.UnpartitionedJobEvent

    def get_serializer_context(self):
        context = super().get_serializer_context()
@@ -3749,33 +3775,31 @@ def get_serializer_context(self):

class JobEventChildrenList(NoTruncateMixin, SubListAPIView):

    model = models.JobEvent
    serializer_class = serializers.JobEventSerializer
    parent_model = models.JobEvent
    relationship = 'children'
    name = _('Job Event Children List')
    search_fields = ('stdout',)

    def get_queryset(self):
        parent_event = self.get_parent_object()
        self.check_parent_access(parent_event)
        qs = self.request.user.get_queryset(self.model).filter(parent_uuid=parent_event.uuid)
        return qs

    @property
    def is_partitioned(self):
        if 'pk' not in self.kwargs:
            return True
        return int(self.kwargs['pk']) > unpartitioned_event_horizon(models.JobEvent)

class JobEventHostsList(HostRelatedSearchMixin, SubListAPIView):
    @property
    def model(self):
        if self.is_partitioned:
            return models.JobEvent
        return models.UnpartitionedJobEvent

    model = models.Host
    serializer_class = serializers.HostSerializer
    parent_model = models.JobEvent
    relationship = 'hosts'
    name = _('Job Event Hosts List')
    @property
    def parent_model(self):
        return self.model

    def get_queryset(self):
        parent_event = self.get_parent_object()
        self.check_parent_access(parent_event)
        qs = self.request.user.get_queryset(self.model).filter(job_events_as_primary_host=parent_event)
        return qs
        return parent_event.job.get_event_queryset().filter(parent_uuid=parent_event.uuid)


class BaseJobEventsList(NoTruncateMixin, SubListAPIView):
@@ -3811,12 +3835,12 @@ class GroupJobEventsList(BaseJobEventsList):
class JobJobEventsList(BaseJobEventsList):

    parent_model = models.Job
    pagination_class = UnifiedJobEventPagination

    def get_queryset(self):
        job = self.get_parent_object()
        self.check_parent_access(job)
        qs = job.job_events.select_related('host').order_by('start_line')
        return qs.all()
        return job.get_event_queryset().select_related('host').order_by('start_line')


class AdHocCommandList(ListCreateAPIView):
@@ -3974,6 +3998,11 @@ class AdHocCommandEventList(NoTruncateMixin, ListAPIView):
    serializer_class = serializers.AdHocCommandEventSerializer
    search_fields = ('stdout',)

    def get_queryset(self):
        adhoc = self.get_parent_object()
        self.check_parent_access(adhoc)
        return adhoc.get_event_queryset()


class AdHocCommandEventDetail(RetrieveAPIView):

@@ -3994,12 +4023,21 @@ class BaseAdHocCommandEventsList(NoTruncateMixin, SubListAPIView):
    relationship = 'ad_hoc_command_events'
    name = _('Ad Hoc Command Events List')
    search_fields = ('stdout',)
    pagination_class = UnifiedJobEventPagination

    def get_queryset(self):
        parent = self.get_parent_object()
        self.check_parent_access(parent)
        return parent.get_event_queryset()


class HostAdHocCommandEventsList(BaseAdHocCommandEventsList):

    parent_model = models.Host

    def get_queryset(self):
        return super(BaseAdHocCommandEventsList, self).get_queryset()


# class GroupJobEventsList(BaseJobEventsList):
# parent_model = Group
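The partitioned/unpartitioned switching above all hangs off `unpartitioned_event_horizon`, which reports the highest primary key stranded in the legacy `_unpartitioned_*` table. A small standalone model of its three cases (rows present, table empty, table missing), with the SQL query and ProgrammingError replaced by plain arguments:

def horizon(max_id, table_exists=True):
    if not table_exists:
        return 0   # ProgrammingError path: fresh install, no legacy table
    return max_id or -1  # empty table yields -1, so every pk is "partitioned"

# A pk is partitioned iff pk > horizon, mirroring the is_partitioned properties.
for pk, h in [(10, horizon(100)), (200, horizon(100)), (1, horizon(None))]:
    print(pk, "partitioned" if pk > h else "unpartitioned")
# prints: 10 unpartitioned, 200 partitioned, 1 partitioned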
9 changes: 9 additions & 0 deletions awx/api/views/inventory.py
@@ -38,6 +38,9 @@
)
from awx.api.views.mixin import RelatedJobsPreventDeleteMixin, ControlledByScmMixin

from awx.api.pagination import UnifiedJobEventPagination


logger = logging.getLogger('awx.api.views.organization')


@@ -49,6 +52,12 @@ class InventoryUpdateEventsList(SubListAPIView):
    relationship = 'inventory_update_events'
    name = _('Inventory Update Events List')
    search_fields = ('stdout',)
    pagination_class = UnifiedJobEventPagination

    def get_queryset(self):
        iu = self.get_parent_object()
        self.check_parent_access(iu)
        return iu.get_event_queryset()

    def finalize_response(self, request, response, *args, **kwargs):
        response['X-UI-Max-Events'] = settings.MAX_UI_JOB_EVENTS
Expand Down
5 changes: 5 additions & 0 deletions awx/api/views/mixin.py
@@ -52,6 +52,11 @@ def destroy(self, request, *args, **kwargs):
            else:
                # if it has been > 1 minute, events are probably lost
                logger.warning('Allowing deletion of {} through the API without all events ' 'processed.'.format(obj.log_format))

        # Manually cascade delete events if unpartitioned job
        if obj.has_unpartitioned_events:
            obj.get_event_queryset().delete()

        obj.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

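The manual delete exists because this PR drops the cascade relationship for job events (see the "Do not cascade delete job_events" commit): legacy rows in the unpartitioned tables would otherwise be orphaned. A compressed sketch of the resulting destroy path, with the partition-side cleanup stated as an assumption:

# Sketch of the flow above; `obj` is the unified job being destroyed.
if obj.has_unpartitioned_events:
    obj.get_event_queryset().delete()  # bulk-delete legacy _unpartitioned_* rows
obj.delete()  # partitioned event rows are presumably reclaimed separately,
              # e.g. when the partition-aware cleanup_jobs drops whole partitions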
1 change: 0 additions & 1 deletion awx/api/views/root.py
@@ -106,7 +106,6 @@ def get(self, request, format=None):
        data['hosts'] = reverse('api:host_list', request=request)
        data['job_templates'] = reverse('api:job_template_list', request=request)
        data['jobs'] = reverse('api:job_list', request=request)
        data['job_events'] = reverse('api:job_event_list', request=request)
        data['ad_hoc_commands'] = reverse('api:ad_hoc_command_list', request=request)
        data['system_job_templates'] = reverse('api:system_job_template_list', request=request)
        data['system_jobs'] = reverse('api:system_job_list', request=request)