Merge pull request #11 from Rafsan7238/large-file-fission-ingestion
Large file fission ingestion
chillingo117 authored May 15, 2024
2 parents 67b87c3 + 2446332 commit 7f41fe0
Showing 72 changed files with 1,527 additions and 1,731 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -5,7 +5,7 @@
.ssh
.vscode-server
.dev
*.zip
backend.zip
.tool-versions
.vscode
apt-get
@@ -22,3 +22,6 @@ frontend/.idea/modules.xml
.venv
backend/harvesters/addobservations.zip
backend/harvesters/mharvester.zip
backend/__pycache__/__init__.cpython-311.pyc
backend/__pycache__/constants.cpython-311.pyc
backend/__pycache__/elastic_client_provider.cpython-311.pyc
91 changes: 63 additions & 28 deletions README.md
@@ -1,36 +1,71 @@
FOR COMP90024 ASSIGNMENT 2
BY TEAM 45:
William Chen 1400081
Petr Andreev 1375858
Rafsan Al Mamun 1407776
Ojaswi Dheer 1447227

# COMP90024_Assignment-2
This repo contains the assignment 2 files for COMP90024: Cluster and Cloud Programming at the University of Melbourne.

## Environment
Install dependencies, set up the cluster connection, and install the required software.

1. Connect to unimelb VPN.
2. Start a docker runtime on your machine.
3. Run Docker Compose in root folder.
- `docker-compose up`
   - This builds a local Docker container that installs all dev dependencies.
4. Place a private ssh key as generated in MRC into a folder called `.ssh` in the root project.
   - Note that the `.ssh` folder is gitignored and must always remain so.
- Remember to restrict file access to this ssh key by running `chmod 600 ~/.ssh/{key name}.pem`.
   - `chmod 600` prevents read and write access by any other users on your machine.
5. Using a shell attached to this docker container, connect to the bastion node.
- `ssh -i ~/.ssh/{key name}.pem -L 6443:192.168.10.153:6443 [email protected]`
   - Remember that after establishing any ssh connection or port forward, keep that terminal open and use a new one for further interaction with the cloud system.
6. Obtain the Kube config from an Admin and add it to a folder called `.kube` in the root project.
## Admins
William Chen [email protected]
Petr Andreev [email protected]
Rafsan Al Mamun [email protected]
Ojaswi Dheer [email protected]

## Setup
### Installation
- As with the assignment class repository, you should install the following:
- OpenStack clients 6.3.x ([Installation instructions](https://docs.openstack.org/newton/user-guide/common/cli-install-openstack-command-line-clients.html)).
> Note: Please ensure the following Openstack clients are installed: `python-cinderclient`, `python-keystoneclient`, `python-magnumclient`, `python-neutronclient`, `python-novaclient`, `python-octaviaclient`. See: [Install the OpenStack client](https://docs.openstack.org/newton/user-guide/common/cli-install-openstack-command-line-clients.html).
- JQ 1.6.x ([Installation instructions](https://jqlang.github.io/jq/download/)).
- Kubectl 1.26.8 ([Installation instructions](https://kubernetes.io/docs/tasks/tools/)).
- Helm 3.6.3 ([Installation instructions](https://helm.sh/docs/intro/install/)).
- Connect to the [Campus network](https://studentit.unimelb.edu.au/wifi-vpn#uniwireless) if on-campus or the [UniMelb Student VPN](https://studentit.unimelb.edu.au/wifi-vpn#vpn) if off-campus.
- Get access to this project and create a key pair in MRC. Send the **public** key to an admin. Place the private key in your machine's `~/.ssh` directory.
- Tighten access to this key: `chmod 600 ~/.ssh/{key name}.pem`.
- Once your access key has been added to the project, you will be given a Kube config file. Add it to a folder called `.kube` in this project.
- As with the ssh key, run `chmod 600 ~/.kube/config`.
> Note: the Docker-based environment may not work on every machine; WSL is a working alternative.

## Setting up
1. Set up Fission by running `fission_startup.sh`
- This will generate the packages used by fission
- These packages will be used to create fission functions
- These functions will get exposed in the endpoints described in the readme in `backend`
### Connecting to the cluster
- Ensure you are on the Unimelb campus network, either using the campus wifi or through the [student VPN](https://studentit.unimelb.edu.au/wifi-vpn#vpn)!
- ssh into the cluster bastion node
`ssh -i ~/.ssh/{key name as made in installation}.pem -L 6443:192.168.10.153:6443 [email protected]`

- Port-forward the cluster's Fission router to your localhost. Run the below in a separate terminal.
`kubectl port-forward service/router -n fission 9090:80`

- Optionally, forward the cluster's kibana port to your localhost. Run the below in yet another separate terminal. This will give you local access to the kibana dashboard.
`kubectl port-forward service/kibana-kibana -n elastic 5601:5601`

### Seeing function logs
- You can follow the running logs of the repeating harvesters with the following commands:

`fission function log -f --name addobservations`
`fission function log -f --name mharvester`

## Usage
### Static data manipulation
The notebook for manipulating Elasticsearch indexes and documents is `data/elastic_client.ipynb`. It contains working requests that create all indexes (existing indexes are skipped) and/or upload data to these indexes.
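The create-all behaviour (skipping indexes that already exist) can be sketched as below. This is an illustrative helper, not the notebook's actual code; the function name is an assumption.

```python
def create_index_if_missing(es, name, mappings):
    """Create index `name` only if it does not already exist.

    `es` is an Elasticsearch client (or anything exposing the same
    `indices.exists` / `indices.create` calls). Returns a short status
    string so each index's outcome can be reported.
    """
    if es.indices.exists(index=name):
        return f"{name}: already exists, skipped"
    es.indices.create(index=name, mappings=mappings)
    return f"{name}: created"
```

Calling this once per index makes the create-all request safe to re-run: already-populated indexes are left untouched.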

Note that this notebook can only upload data to the static file indexes. Therefore it has intentional limitations:
- It can only insert data if the index is empty.
  - The exception is the very large hourly air quality file, which requires uploads in batches.
- The only means to delete an index is through Kibana; this makes it harder to delete an index accidentally.
- It can create the indexes for the two data sources with regular harvesting (see below), but it cannot add any data to these indexes.
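The batch-upload path for the large hourly air quality file amounts to chunking the documents before each bulk request. A minimal sketch of that chunking logic (the helper name and batch size are illustrative):

```python
from itertools import islice

def batched(docs, batch_size):
    """Yield successive lists of at most `batch_size` documents.

    Each yielded batch would be handed to one bulk-upload request,
    keeping individual requests to Elasticsearch bounded in size.
    """
    it = iter(docs)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```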

### Automatic data manipulation
There are two regularly updating data sources: Mastodon and BOM (Bureau of Meteorology). Both are appended to via Fission functions triggered by timers; the Mastodon index is updated every 5 minutes and the BOM index every 15 minutes.
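In Fission terms, those timers are time triggers. A hypothetical sketch of how they could be declared — the trigger names, and which harvester function feeds which index, are assumptions; the CLI flags are standard Fission:

```shell
# Hypothetical trigger names; mharvester and addobservations are the
# harvester functions whose logs are shown in the section above.
fission timetrigger create --name mastodon-harvest --function mharvester --cron "@every 5m"
fission timetrigger create --name bom-harvest --function addobservations --cron "@every 15m"
```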

todo:
- add content description to this readme.
- add installation instructions???
- add instructions on how to use the client.
### Analytics
There are two notebooks for requesting data from Elasticsearch and displaying analytics. (Rafsan and Ojaswi to write this section.)

## Development
To upload any backend changes, run `sh publish_backend.sh` in the root directory of this project. This will:
- Package everything in `backend/` into a zip file.
- Wipe all fission packages, functions, and triggers.
- Recreate all fission packages, functions, and triggers using the zipped archive.
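A hypothetical sketch of those three steps as shell commands — the package, function, and environment names are illustrative; see `publish_backend.sh` for the real sequence:

```shell
# 1. Package everything in backend/ into a zip file.
zip -r backend.zip backend/

# 2. Wipe the existing Fission resources (illustrative names).
fission function delete --name createindexes
fission package delete --name backend

# 3. Recreate them from the new archive.
fission package create --name backend --env python --sourcearchive backend.zip
fission function create --name createindexes --pkg backend \
    --entrypoint "backend.create_indexes_endpoint"
```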

## Tests
To run tests, execute `pytest tests/fission_tests.py` from the project directory.
184 changes: 69 additions & 115 deletions backend/backend.py
@@ -1,139 +1,91 @@
import json
from flask import jsonify, current_app, request
from index_creation.create_mortality_persons import create_mortality_persons_index
from index_creation.create_air_quality_hourly_avg import create_air_quality_hourly_average
from index_creation.create_census_g21b import create_census_g21b
from index_creation.create_mortality_females import create_mortality_females_index
from index_creation.create_mortality_males import create_mortality_males_index
from index_creation.create_rainfall_adelaide import create_rainfall_adelaide_index
from index_creation.create_rainfall_brisbane import create_rainfall_brisbane_index
from index_creation.create_rainfall_canberra import create_rainfall_canberra_index
from index_creation.create_rainfall_darwin import create_rainfall_darwin_index
from index_creation.create_rainfall_melbourne import create_rainfall_melbourne_index
from index_creation.create_rainfall_perth import create_rainfall_perth_index
from index_creation.create_rainfall_sydney import create_rainfall_sydney_index
from index_creation.create_rainfall_tasmania import create_rainfall_tasmania_index
from index_creation.create_temperature_adelaide import create_temperature_adelaide_index
from index_creation.create_temperature_brisbane import create_temperature_brisbane_index
from index_creation.create_temperature_canberra import create_temperature_canberra_index
from index_creation.create_temperature_darwin import create_temperature_darwin_index
from index_creation.create_temperature_melbourne import create_temperature_melbourne_index
from index_creation.create_temperature_perth import create_temperature_perth_index
from index_creation.create_temperature_sydney import create_temperature_sydney_index
from index_creation.create_temperature_tasmania import create_temperature_tasmania_index
from flask import request
from constants import *
from elastic_client_provider import get_bulker, get_client

from index_creation.create_bom_index import create_bom_index
from index_creation.create_mastodon_index import create_mastodon_index
from index_creation.create_mortality_index import create_mortality_index
from index_creation.create_temperature_index import create_temperature_index
from index_creation.create_air_quality_hourly_avg import create_air_quality_hourly_average
from index_creation.create_census_g21b import create_census_g21b
from index_creation.create_rainfall_index import create_rainfall_index
from index_creation.create_asthma_by_region_index import create_asthma_by_region_index
from index_creation.create_historic_tweets_index import create_historic_tweets_index
import static.historic_tweet_sentiments
import static.asthma_by_region
import static.air_quality_hourly_avg
import static.census_g21b
import static.mortality_females
import static.mortality_males
import static.mortality_persons
import static.rainfall_adelaide
import static.rainfall_brisbane
import static.rainfall_canberra
import static.rainfall_darwin
import static.rainfall_melbourne
import static.rainfall_perth
import static.rainfall_sydney
import static.rainfall_tasmania
import static.temperature_adelaide
import static.temperature_brisbane
import static.temperature_canberra
import static.temperature_darwin
import static.temperature_melbourne
import static.temperature_perth
import static.temperature_sydney
import static.temperature_tasmania

def insert_hist_tweets_endpoint():
try:
es = get_client()
bulker = get_bulker()

res = insert_hist_tweets(es, bulker)

return json.dumps({'result': res})
except Exception as e:
return json.dumps(str(e))

def insert_region_asthma_endpoint():
try:
es = get_client()
bulker = get_bulker()

res = insert_region_asthma(es, bulker)

return json.dumps({'result': res})
except Exception as e:
return json.dumps(str(e))
import ingestion.historic_tweet_sentiments
import ingestion.asthma_by_region
import ingestion.air_quality_hourly_avg
import ingestion.census_g21b
import ingestion.rainfall
import ingestion.temperature
import ingestion.mortality

def insert_indexes():
try:
print('starting')

es = get_client()
bulker = get_bulker()
try:
index= request.headers['X-Fission-Params-Index']
data = request.json
except KeyError:
print(request.headers)
index= None

if index == AIR_QUALITY_HOURLY_AVG:
res = static.air_quality_hourly_avg.insert(es, bulker)
res = ingestion.air_quality_hourly_avg.insert(es, bulker, data)
elif index == ASTHMA_BY_REGION_INDEX_NAME:
res = static.asthma_by_region.insert(es, bulker)
res = ingestion.asthma_by_region.insert(es, bulker, data)
elif index == CENSUS_G21B:
res = static.census_g21b.insert(es, bulker)
res = ingestion.census_g21b.insert(es, bulker, data)
elif index == HIST_TWEET_INDEX_NAME:
res = static.historic_tweet_sentiments.insert(es, bulker)
res = ingestion.historic_tweet_sentiments.insert(es, bulker, data)
elif index == MORTALITY_FEMALES:
res = static.mortality_females.insert(es, bulker)
res = ingestion.mortality.insert(es, bulker, data, MORTALITY_FEMALES)
elif index == MORTALITY_MALES:
res = static.mortality_males.insert(es, bulker)
res = ingestion.mortality.insert(es, bulker, data, MORTALITY_MALES)
elif index == MORTALITY_PERSONS:
res = static.mortality_persons.insert(es, bulker)
res = ingestion.mortality.insert(es, bulker, data, MORTALITY_PERSONS)
elif index == RAINFALL_ADELAIDE:
res = static.rainfall_adelaide.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_ADELAIDE)
elif index == RAINFALL_BRISBANE:
res = static.rainfall_brisbane.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_BRISBANE)
elif index == RAINFALL_CANBERRA:
res = static.rainfall_canberra.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_CANBERRA)
elif index == RAINFALL_DARWIN:
res = static.rainfall_darwin.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_DARWIN)
elif index == RAINFALL_MELBOURNE:
res = static.rainfall_melbourne.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_MELBOURNE)
elif index == RAINFALL_PERTH:
res = static.rainfall_perth.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_PERTH)
elif index == RAINFALL_SYDNEY:
res = static.rainfall_sydney.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_SYDNEY)
elif index == RAINFALL_TASMANIA:
res = static.rainfall_tasmania.insert(es, bulker)
res = ingestion.rainfall.insert(es, bulker, data, RAINFALL_TASMANIA)
elif index == TEMPERATURE_ADELAIDE:
res = static.temperature_adelaide.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_ADELAIDE)
elif index == TEMPERATURE_BRISBANE:
res = static.temperature_brisbane.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_BRISBANE)
elif index == TEMPERATURE_CANBERRA:
res = static.temperature_canberra.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_CANBERRA)
elif index == TEMPERATURE_DARWIN:
res = static.temperature_darwin.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_DARWIN)
elif index == TEMPERATURE_MELBOURNE:
res = static.temperature_melbourne.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_MELBOURNE)
elif index == TEMPERATURE_PERTH:
res = static.temperature_perth.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_PERTH)
elif index == TEMPERATURE_SYDNEY:
res = static.temperature_sydney.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_SYDNEY)
elif index == TEMPERATURE_TASMANIA:
res = static.temperature_tasmania.insert(es, bulker)
res = ingestion.temperature.insert(es, bulker, data, TEMPERATURE_TASMANIA)
else:
return jsonify({"success":False, "message":"incorrect index"}), 400
return json.dumps({'result': res})
return "Index not found", 404
return f"{res}", 201

except Exception as e:
return json.dumps(str(e))
return json.dumps(str(e)), 500

def create_indexes_endpoint():
try:
@@ -144,28 +96,30 @@ def create_indexes_endpoint():
results[ASTHMA_BY_REGION_INDEX_NAME] = create_asthma_by_region_index(es)
results[CENSUS_G21B] = create_census_g21b(es)
results[HIST_TWEET_INDEX_NAME] = create_historic_tweets_index(es)
results[MORTALITY_FEMALES] = create_mortality_females_index(es)
results[MORTALITY_MALES] = create_mortality_males_index(es)
results[MORTALITY_PERSONS] = create_mortality_persons_index(es)
results[MORTALITY_FEMALES] = create_mortality_index(es, MORTALITY_FEMALES)
results[MORTALITY_MALES] = create_mortality_index(es, MORTALITY_MALES)
results[MORTALITY_PERSONS] = create_mortality_index(es, MORTALITY_PERSONS)
results[RAINFALL_ADELAIDE] = create_rainfall_index(es, RAINFALL_ADELAIDE)
results[RAINFALL_BRISBANE] = create_rainfall_index(es, RAINFALL_BRISBANE)
results[RAINFALL_CANBERRA] = create_rainfall_index(es, RAINFALL_CANBERRA)
results[RAINFALL_DARWIN] = create_rainfall_index(es, RAINFALL_DARWIN)
results[RAINFALL_MELBOURNE] = create_rainfall_index(es, RAINFALL_MELBOURNE)
results[RAINFALL_PERTH] = create_rainfall_index(es, RAINFALL_PERTH)
results[RAINFALL_SYDNEY] = create_rainfall_index(es, RAINFALL_SYDNEY)
results[RAINFALL_TASMANIA] = create_rainfall_index(es, RAINFALL_TASMANIA)

results[RAINFALL_ADELAIDE] = create_rainfall_adelaide_index(es)
results[RAINFALL_BRISBANE] = create_rainfall_brisbane_index(es)
results[RAINFALL_CANBERRA] = create_rainfall_canberra_index(es)
results[RAINFALL_DARWIN] = create_rainfall_darwin_index(es)
results[RAINFALL_MELBOURNE] = create_rainfall_melbourne_index(es)
results[RAINFALL_PERTH] = create_rainfall_perth_index(es)
results[RAINFALL_SYDNEY] = create_rainfall_sydney_index(es)
results[RAINFALL_TASMANIA] = create_rainfall_tasmania_index(es)
results[TEMPERATURE_ADELAIDE] = create_temperature_index(es, TEMPERATURE_ADELAIDE)
results[TEMPERATURE_BRISBANE] = create_temperature_index(es, TEMPERATURE_BRISBANE)
results[TEMPERATURE_CANBERRA] = create_temperature_index(es, TEMPERATURE_CANBERRA)
results[TEMPERATURE_DARWIN] = create_temperature_index(es, TEMPERATURE_DARWIN)
results[TEMPERATURE_MELBOURNE] = create_temperature_index(es, TEMPERATURE_MELBOURNE)
results[TEMPERATURE_PERTH] = create_temperature_index(es, TEMPERATURE_PERTH)
results[TEMPERATURE_SYDNEY] = create_temperature_index(es, TEMPERATURE_SYDNEY)
results[TEMPERATURE_TASMANIA] = create_temperature_index(es, TEMPERATURE_TASMANIA)

results[TEMPERATURE_ADELAIDE] = create_temperature_adelaide_index(es)
results[TEMPERATURE_BRISBANE] = create_temperature_brisbane_index(es)
results[TEMPERATURE_CANBERRA] = create_temperature_canberra_index(es)
results[TEMPERATURE_DARWIN] = create_temperature_darwin_index(es)
results[TEMPERATURE_MELBOURNE] = create_temperature_melbourne_index(es)
results[TEMPERATURE_PERTH] = create_temperature_perth_index(es)
results[TEMPERATURE_SYDNEY] = create_temperature_sydney_index(es)
results[TEMPERATURE_TASMANIA] = create_temperature_tasmania_index(es)
results[MASTODON] = create_mastodon_index(es)
results[BOM_OBSERVATIONS] = create_bom_index(es)

return json.dumps(results)
return json.dumps(results), 201
except Exception as e:
return json.dumps(str(e))
return json.dumps(str(e)), 500
4 changes: 3 additions & 1 deletion backend/constants.py
@@ -20,4 +20,6 @@
TEMPERATURE_MELBOURNE = 'temperature_melbourne'
TEMPERATURE_PERTH = 'temperature_perth'
TEMPERATURE_SYDNEY = 'temperature_sydney'
TEMPERATURE_TASMANIA = 'temperature_tasmania'
TEMPERATURE_TASMANIA = 'temperature_tasmania'
MASTODON = 'mastodon'
BOM_OBSERVATIONS = 'observations'
