Merge pull request #1138 from TOMToolkit/feature/persistent_sharing
Add in persistentshare model and views and hook it into the target sh…
jchate6 authored Dec 17, 2024
2 parents d830117 + bbba270 commit 820fa5d
Showing 23 changed files with 625 additions and 56 deletions.
76 changes: 76 additions & 0 deletions docs/managing_data/continuous_sharing.rst
@@ -0,0 +1,76 @@
Setting up Continuous Sharing
---------------------------------------

After setting up your TOM's `DATA_SHARING` destinations in your settings, you can set up individual Targets to share
their data automatically with a sharing destination as the data arrives in the TOM. Continuous Sharing is handled through
the `PersistentShare` model in the `tom_targets` module.


Permissions:
#############################

In order to set up continuous sharing, your user account must have the proper permissions: the permissions to
add, view, and delete `PersistentShare` objects. A superuser account has all permissions by default; to grant
them to another user, run code like the following once in a Django shell:

.. code:: python

    from django.contrib.auth.models import User, Group
    from guardian.shortcuts import assign_perm

    # To assign the permissions to a single user
    user = User.objects.get(username='myusername')
    assign_perm('tom_targets.view_persistentshare', user)
    assign_perm('tom_targets.add_persistentshare', user)
    assign_perm('tom_targets.delete_persistentshare', user)

    # To assign the permissions to all users of a group
    group = Group.objects.get(name='mygroupname')
    assign_perm('tom_targets.view_persistentshare', group)
    assign_perm('tom_targets.add_persistentshare', group)
    assign_perm('tom_targets.delete_persistentshare', group)

The user must also have `change_target` permissions on the specific Target they are attempting to continuously share.


Managing Continuous Sharing:
*************************************************

There are a few ways to manage continuous sharing. First, you can navigate to any Target's share page at `/targets/<target_pk>/share`,
where you should see a tab for creating and viewing continuous sharing for that Target. You can also navigate to
`/targets/persistentshare/manage` to create and view all `PersistentShare` objects you have permission to see. There is also
a REST API for `PersistentShare` objects at `/targets/persistentshare/`, which is used internally by the
management pages.
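As an illustrative sketch of using that REST API, the snippet below builds a request to create a `PersistentShare`. The host, API token, target primary key, and destination string are all placeholders, and the payload fields assume the `PersistentShareSerializer` fields (`target`, `destination`); adjust everything to match your own TOM:

```python
import json
import urllib.request

# All values here are hypothetical: substitute your TOM's host, an API token
# for a user with the persistentshare permissions, a real target primary key,
# and a destination string that exists in your DATA_SHARING settings.
payload = json.dumps({'target': 1, 'destination': 'hermes:hermes.test'}).encode('utf-8')
request = urllib.request.Request(
    'http://localhost:8000/targets/persistentshare/',
    data=payload,
    headers={
        'Authorization': 'Token <your-api-token>',
        'Content-Type': 'application/json',
    },
    method='POST',
)

# Against a running TOM you would now submit it:
# response = urllib.request.urlopen(request)
print(request.get_method(), request.full_url)
```

The same endpoint supports GET to list the `PersistentShare` objects your account can see, mirroring what the management pages do internally.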

If you have a custom target details page, you can integrate the controls for creating or managing continuous sharing using the
template partials below:

.. code:: html

    <h3>Continuously Share data for Target <a href="{% url 'targets:detail' pk=target.id %}" title="Back">{{ target.name }}</a></h3>
    <div id='target-persistent-share-create'>
      {% create_persistent_share target %}
    </div>
    <h3>Manage Continuous Sharing for Target <a href="{% url 'targets:detail' pk=target.id %}"
        title="Back">{{ target.name }}</a></h3>
    <div id='target-persistent-share-table'>
      {% persistent_share_table target %}
    </div>

Note that setting up Continuous Sharing stores the destination from your `DATA_SHARING` settings. If you later change or remove that
destination, continuous shares that use it will fail.
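For context, the `DATA_SHARING` dict in `settings.py` maps destination names to their connection details. The sketch below is purely illustrative; the exact keys each destination requires are defined by the tom_base sharing documentation, and every value here is a placeholder:

```python
# Illustrative shape only: consult the TOM Toolkit sharing documentation for
# the exact keys your destinations require. Every value below is a placeholder.
DATA_SHARING = {
    'hermes': {
        'DISPLAY_NAME': 'Hermes',
        'BASE_URL': 'https://hermes.lco.global/',
        'HERMES_API_KEY': '<your-hermes-api-key>',
        'DEFAULT_AUTHORS': 'My TOM Collaboration',
    },
    'partner-tom': {
        'DISPLAY_NAME': 'Partner TOM',
        'BASE_URL': 'http://partner-tom.example.com/',
        'USERNAME': '<shared-username>',
        'PASSWORD': '<shared-password>',
    },
}
```

A `PersistentShare` destination is stored as one of these keys (for Hermes destinations, with a `:<topic>` suffix), which is why deleting or renaming an entry breaks any continuous shares that reference it.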

Also note that, by default, continuous sharing occurs when a `ReducedDatum` is saved, or when the default `tom_base` `DataProcessor` is used
to load a `DataProduct`. If you create your own `DataProcessor` subclass in your TOM, you must add the following lines to trigger continuous
sharing after you have bulk-created the `ReducedDatums`:

.. code:: python

    import logging

    from tom_dataproducts.sharing import continuous_share_data

    logger = logging.getLogger(__name__)

    # After all your logic to bulk_create ReducedDatums,
    # trigger any sharing you may have set to occur when new data comes in.
    # Encapsulate this in a try/except so a sharing failure doesn't prevent dataproduct ingestion.
    try:
        continuous_share_data(dp.target, reduced_datums)
    except Exception as e:
        logger.warning(f"Failed to share new dataproduct {dp.product_id}: {repr(e)}")

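The pattern above can be exercised stand-alone with stubs in place of the Django pieces (`bulk_create` and `continuous_share_data` are replaced by plain functions here, purely to illustrate the design choice that a sharing failure must never abort ingestion):

```python
import logging

logger = logging.getLogger(__name__)


def ingest_then_share(new_datums, share_func):
    """Stand-in for a custom DataProcessor's ingestion step: sharing errors
    are logged but never propagate, so data ingestion always completes."""
    ingested = list(new_datums)  # stand-in for ReducedDatum.objects.bulk_create(...)
    try:
        share_func(ingested)
    except Exception as e:
        logger.warning(f"Failed to share new data: {repr(e)}")
    return ingested


def broken_share(datums):
    # Simulates an unreachable sharing destination
    raise ConnectionError("sharing destination unreachable")


# Ingestion still succeeds even though sharing raised:
result = ingest_then_share([1, 2, 3], broken_share)
print(result)  # [1, 2, 3]
```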
3 changes: 3 additions & 0 deletions docs/managing_data/index.rst
@@ -11,6 +11,7 @@ Managing Data
    customizing_data_processing
    tom_direct_sharing
    stream_pub_sub
+   continuous_sharing
    single_target_data_service


@@ -24,5 +25,7 @@ TOM from uploaded data products.

 :doc:`Publish and Subscribe to a Kafka Stream <stream_pub_sub>` - Learn how to publish and subscribe to a Kafka stream topic.

+:doc:`Setting up Continuous Sharing of a target's data to a TOM or Kafka stream <continuous_sharing>` - Learn how to set up continuous sharing of a Target's data products.
+
 :doc:`Integrating Single-Target Data Service Queries <single_target_data_service>` - Learn how to integrate the existing Atlas, panSTARRS, and ZTF
 single-target data services into your TOM, and learn how to add new services.
11 changes: 6 additions & 5 deletions tom_dataproducts/alertstreams/hermes.py
@@ -149,7 +149,7 @@ def get_hermes_spectroscopy(self, datum):
 def convert_astropy_brightness_to_hermes(brightness_unit):
     if not brightness_unit:
         return brightness_unit
-    elif brightness_unit.uppercase() == 'AB' or brightness_unit.uppercase() == 'ABFLUX':
+    elif brightness_unit.upper() == 'AB' or brightness_unit.upper() == 'ABFLUX':
         return 'AB mag'
     else:
         return brightness_unit
@@ -158,11 +158,11 @@ def convert_astropy_brightness_to_hermes(brightness_unit):
 def convert_astropy_wavelength_to_hermes(wavelength_unit):
     if not wavelength_unit:
         return wavelength_unit
-    elif wavelength_unit.lowercase() == 'angstrom' or wavelength_unit == 'AA':
+    elif wavelength_unit.lower() == 'angstrom' or wavelength_unit == 'AA':
         return 'Å'
-    elif wavelength_unit.lowercase() == 'micron':
+    elif wavelength_unit.lower() == 'micron':
         return 'µm'
-    elif wavelength_unit.lowercase() == 'hertz':
+    elif wavelength_unit.lower() == 'hertz':
         return 'Hz'
     else:
         return wavelength_unit
@@ -209,7 +209,8 @@ def publish_to_hermes(message_info, datums, targets=Target.objects.none(), **kwargs):
     response = requests.post(url=submit_url, json=alert, headers=headers)
     response.raise_for_status()
     # Only mark the datums as shared if the sharing was successful
-    hermes_alert = AlertStreamMessage(topic=message_info.topic, exchange_status='published')
+    hermes_alert = AlertStreamMessage(
+        topic=message_info.topic, message_id=response.json().get('uuid'), exchange_status='published')
     hermes_alert.save()
     for tomtoolkit_photometry in datums:
         tomtoolkit_photometry.message.add(hermes_alert)
10 changes: 9 additions & 1 deletion tom_dataproducts/data_processor.py
@@ -6,6 +6,7 @@
 from importlib import import_module

 from tom_dataproducts.models import ReducedDatum
+from tom_dataproducts.sharing import continuous_share_data

 logger = logging.getLogger(__name__)

@@ -70,7 +71,14 @@ def run_data_processor(dp):
     #                 timestamp=datum[0], value=datum[1], source_name=datum[2]) for datum in data]

     # 3. Finally, insert the new ReducedDatum objects into the database
-    ReducedDatum.objects.bulk_create(new_reduced_datums)
+    reduced_datums = ReducedDatum.objects.bulk_create(new_reduced_datums)
+
+    # 4. Trigger any sharing you may have set to occur when new data comes in
+    # Encapsulate this in a try/catch so sharing failure doesn't prevent dataproduct ingestion
+    try:
+        continuous_share_data(dp.target, reduced_datums)
+    except Exception as e:
+        logger.warning(f"Failed to share new dataproduct {dp.product_id}: {repr(e)}")

     # log what happened
     if skipped_data:
2 changes: 1 addition & 1 deletion tom_dataproducts/serializers.py
@@ -8,7 +8,7 @@
 from tom_observations.models import ObservationRecord
 from tom_observations.serializers import ObservationRecordFilteredPrimaryKeyRelatedField
 from tom_targets.models import Target
-from tom_targets.serializers import TargetFilteredPrimaryKeyRelatedField
+from tom_targets.fields import TargetFilteredPrimaryKeyRelatedField


class DataProductGroupSerializer(serializers.ModelSerializer):
30 changes: 29 additions & 1 deletion tom_dataproducts/sharing.py
@@ -11,12 +11,40 @@
 from django.http import StreamingHttpResponse
 from django.utils.text import slugify

-from tom_targets.models import Target
+from tom_targets.models import Target, PersistentShare
 from tom_dataproducts.models import DataProduct, ReducedDatum
 from tom_dataproducts.alertstreams.hermes import publish_to_hermes, BuildHermesMessage, get_hermes_topics
 from tom_dataproducts.serializers import DataProductSerializer, ReducedDatumSerializer


+def continuous_share_data(target, reduced_datums):
+    """
+    Triggered when new ReducedDatums are created.
+    Shares those ReducedDatums to the sharing destination of any PersistentShares on the target.
+    :param target: Target instance that these reduced_datums belong to
+    :param reduced_datums: list of ReducedDatum instances to share
+    """
+    persistentshares = PersistentShare.objects.filter(target=target)
+    for persistentshare in persistentshares:
+        share_destination = persistentshare.destination
+        reduced_datum_pks = [rd.pk for rd in reduced_datums]
+        if 'HERMES' in share_destination.upper():
+            hermes_topic = share_destination.split(':')[1]
+            destination = share_destination.split(':')[0]
+            filtered_reduced_datums = check_for_share_safe_datums(
+                destination, ReducedDatum.objects.filter(pk__in=reduced_datum_pks), topic=hermes_topic)
+            sharing = getattr(settings, "DATA_SHARING", {})
+            message = BuildHermesMessage(title=f"Updated data for {target.name} from "
+                                               f"{getattr(settings, 'TOM_NAME', 'TOM Toolkit')}.",
+                                         authors=sharing.get('hermes', {}).get('DEFAULT_AUTHORS', None),
+                                         message=None,
+                                         topic=hermes_topic
+                                         )
+            publish_to_hermes(message, filtered_reduced_datums)
+        else:
+            share_data_with_tom(share_destination, None, None, None, selected_data=reduced_datum_pks)
+
+
 def share_target_list_with_hermes(share_destination, form_data, selected_targets=None, include_all_data=False):
     """
     Serialize and share a set of selected targets and their data with Hermes
14 changes: 13 additions & 1 deletion tom_targets/admin.py
@@ -1,5 +1,6 @@
 from django.contrib import admin
-from .models import Target, TargetList, TargetExtra
+from .models import Target, TargetList, TargetExtra, PersistentShare
+from .forms import AdminPersistentShareForm


 class TargetExtraInline(admin.TabularInline):
@@ -17,6 +18,17 @@ class TargetListAdmin(admin.ModelAdmin):
     model = TargetList


+class PersistentShareAdmin(admin.ModelAdmin):
+    model = PersistentShare
+    form = AdminPersistentShareForm
+    raw_id_fields = (
+        'target',
+        'user'
+    )
+
+
 admin.site.register(Target, TargetAdmin)

 admin.site.register(TargetList, TargetListAdmin)

+admin.site.register(PersistentShare, PersistentShareAdmin)
4 changes: 4 additions & 0 deletions tom_targets/apps.py
@@ -3,3 +3,7 @@

 class TomTargetsConfig(AppConfig):
     name = 'tom_targets'
+
+    def ready(self):
+        import tom_targets.signals.handlers  # noqa
+        super().ready()
14 changes: 14 additions & 0 deletions tom_targets/fields.py
@@ -0,0 +1,14 @@
from guardian.shortcuts import get_objects_for_user
from rest_framework import serializers


class TargetFilteredPrimaryKeyRelatedField(serializers.PrimaryKeyRelatedField):
    # This PrimaryKeyRelatedField subclass is used to implement get_queryset based on the permissions of the user
    # submitting the request. The pattern was taken from this StackOverflow answer: https://stackoverflow.com/a/32683066

    def get_queryset(self):
        request = self.context.get('request', None)
        queryset = super().get_queryset()
        if not (request and queryset):
            return None
        return get_objects_for_user(request.user, 'tom_targets.change_target')
28 changes: 27 additions & 1 deletion tom_targets/forms.py
@@ -7,7 +7,7 @@
 from guardian.shortcuts import assign_perm, get_groups_with_perms, remove_perm

 from tom_dataproducts.sharing import get_sharing_destination_options
-from .models import Target, TargetExtra, TargetName, TargetList
+from .models import Target, TargetExtra, TargetName, TargetList, PersistentShare
 from tom_targets.base_models import (SIDEREAL_FIELDS, NON_SIDEREAL_FIELDS, REQUIRED_SIDEREAL_FIELDS,
                                     REQUIRED_NON_SIDEREAL_FIELDS, REQUIRED_NON_SIDEREAL_FIELDS_PER_SCHEME,
                                     IGNORE_FIELDS)
@@ -241,3 +241,29 @@ class TargetMergeForm(forms.Form):
             'hx-target': '#id_target_merge_fields',  # replace name_select element
         })
     )
+
+
+class AdminPersistentShareForm(forms.ModelForm):
+    destination = forms.ChoiceField(choices=[], label='Share Destination', required=True)
+
+    class Meta:
+        model = PersistentShare
+        fields = '__all__'
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.fields['destination'].choices = get_sharing_destination_options()
+
+
+class PersistentShareForm(AdminPersistentShareForm):
+    target = forms.IntegerField(label='Target ID', initial=0, required=True)
+
+    def __init__(self, *args, **kwargs):
+        try:
+            self.target_id = kwargs.pop('target_id')
+        except KeyError:
+            self.target_id = None
+        super().__init__(*args, **kwargs)
+        if self.target_id:
+            self.fields['target'].initial = self.target_id
+            self.fields['target'].disabled = True
30 changes: 30 additions & 0 deletions tom_targets/migrations/0022_persistentshare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 4.2.13 on 2024-11-22 22:29

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

    dependencies = [
        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
        ('tom_targets', '0021_rename_target_basetarget_alter_basetarget_options'),
    ]

    operations = [
        migrations.CreateModel(
            name='PersistentShare',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('destination', models.CharField(help_text='The sharing destination, as it appears in your DATA_SHARING settings dict', max_length=200)),
                ('created', models.DateTimeField(auto_now_add=True, help_text='The time which this PersistentShare was created in the TOM database.')),
                ('target', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='tom_targets.basetarget')),
                ('user', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
            ],
            options={
                'ordering': ('-created',),
                'unique_together': {('target', 'destination')},
            },
        ),
    ]
31 changes: 31 additions & 0 deletions tom_targets/models.py
@@ -3,6 +3,7 @@
 import logging

 from django.conf import settings
+from django.contrib.auth.models import User
 from django.core.exceptions import ValidationError
 from django.db import models
 from django.utils.module_loading import import_string
@@ -193,3 +194,33 @@ class Meta:

     def __str__(self):
         return self.name
+
+
+class PersistentShare(models.Model):
+    """
+    Class representing a persistent share setup between a sharing destination and a Target
+    :param target: The ``Target`` you want to share
+    :param user: The ``User`` that created this PersistentShare, for accountability purposes.
+    :param destination: The sharing destination, as it appears in your TOM's DATA_SHARING settings dict
+    :type destination: str
+    :param created: The time at which this PersistentShare was created
+    :type created: datetime
+    """
+    target = models.ForeignKey(BaseTarget, on_delete=models.CASCADE)
+    user = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)
+    destination = models.CharField(
+        max_length=200, help_text='The sharing destination, as it appears in your DATA_SHARING settings dict')
+    created = models.DateTimeField(
+        auto_now_add=True, help_text='The time which this PersistentShare was created in the TOM database.'
+    )
+
+    class Meta:
+        ordering = ('-created',)
+        unique_together = ['target', 'destination']
+
+    def __str__(self):
+        return f'{self.target}-{self.destination}'
21 changes: 10 additions & 11 deletions tom_targets/serializers.py
@@ -1,10 +1,12 @@
 from django.contrib.auth.models import Group
-from guardian.shortcuts import assign_perm, get_groups_with_perms, get_objects_for_user
+from guardian.shortcuts import assign_perm, get_groups_with_perms
 from rest_framework import serializers

 from tom_common.serializers import GroupSerializer
-from tom_targets.models import Target, TargetExtra, TargetName, TargetList
+from tom_targets.models import Target, TargetExtra, TargetName, TargetList, PersistentShare
 from tom_targets.validators import RequiredFieldsTogetherValidator
+from tom_targets.fields import TargetFilteredPrimaryKeyRelatedField
+from tom_dataproducts.sharing import get_sharing_destination_options


 class TargetNameSerializer(serializers.ModelSerializer):
@@ -181,13 +183,10 @@ def update(self, instance, validated_data):
         return instance


-class TargetFilteredPrimaryKeyRelatedField(serializers.PrimaryKeyRelatedField):
-    # This PrimaryKeyRelatedField subclass is used to implement get_queryset based on the permissions of the user
-    # submitting the request. The pattern was taken from this StackOverflow answer: https://stackoverflow.com/a/32683066
+class PersistentShareSerializer(serializers.ModelSerializer):
+    destination = serializers.ChoiceField(choices=get_sharing_destination_options(), required=True)
+    target = TargetFilteredPrimaryKeyRelatedField(queryset=Target.objects.all(), required=True)

-    def get_queryset(self):
-        request = self.context.get('request', None)
-        queryset = super().get_queryset()
-        if not (request and queryset):
-            return None
-        return get_objects_for_user(request.user, 'tom_targets.change_target')
+    class Meta:
+        model = PersistentShare
+        fields = ('id', 'target', 'destination', 'user', 'created')
Empty file added tom_targets/signals/__init__.py
