Merge pull request #108 from GlobalFishingWatch/feature/PIPELINE-2170
Moves view to table containing fishing events with restricted list
smpiano authored Feb 3, 2025
2 parents c4cc1e5 + eaee6fa commit 4900baf
Showing 15 changed files with 270 additions and 143 deletions.
32 changes: 32 additions & 0 deletions README.md
@@ -18,6 +18,37 @@ docker-compose run gcloud auth login

## Configuration

### The fishing events incremental load

The original fishing events calculation took an unexpected amount of time and resources, so the incremental load was developed to improve it. See more in `./assets/bigquery/README.md`.

There is a specific Python client to make the calls, under `./pipe_events/cli.py`.
It has three operations:
* `incremental_events`: Opens a BQ session and calculates fishing events by segment. Merges messages by `seg_id` and timestamp against a historical fishing events table and updates `event_start`/`event_end`. Applies filters, adds `vessel_id` and identity fields, and removes `overlapping_and_short` segments. A conceptual sketch of the merge follows this list.
It can be invoked:
```bash
# run daily incremental fishing events
$ pipe -v --project fishing_events_test incremental_events
# run daily incremental night loitering
$ pipe -v --project fishing_events_test incremental_events -sfield night_loitering -dest_tbl_prefix incremental_night_loitering_events
```

* `auth_and_regions_fishing_events`: Adds authorization and regions information to the events.
It can be invoked:
```bash
# run daily authorization and regions step over fishing events
$ pipe -v --project fishing_events_test auth_and_regions_fishing_events
```

* `fishing_restrictive`: Restricts the events using a specific list.
It can be invoked:
```bash
# run daily restrictive filter over fishing events
$ pipe -v --project fishing_events_test fishing_restrictive
```
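
As a conceptual sketch of the incremental merge mentioned above: the idea is to extend an existing per-segment event when new messages continue it, and to insert a new event otherwise. The table and column names below are hypothetical; the real queries live in the SQL templates under `./assets/bigquery/`.

```python
from google.cloud import bigquery

client = bigquery.Client(project="fishing_events_test")

# Hypothetical MERGE: extend an existing event's end when a daily event
# for the same segment starts where the historical one ended; otherwise
# insert the daily event as a new row.
MERGE_SQL = """
MERGE `dataset.incremental_fishing_events` AS hist
USING `dataset.daily_fishing_events` AS day
ON hist.seg_id = day.seg_id AND hist.event_end = day.event_start
WHEN MATCHED THEN
  UPDATE SET hist.event_end = day.event_end
WHEN NOT MATCHED THEN
  INSERT (seg_id, event_start, event_end)
  VALUES (day.seg_id, day.event_start, day.event_end)
"""
client.query(MERGE_SQL).result()
```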

### The former standard way

The pipeline exposes the following standard settings:

* `pipe_events.docker_run`: Command to run docker inside the airflow server.
@@ -47,6 +78,7 @@ In addition to this, the following custom settings are required for this pipeline:
* `pipe_events.anchorages.events_table`: BigQuery table to publish the anchorages to. Defaults to `published_events_ports`.
* `pipe_events.anchorages.anchorages_dataset`: BigQuery dataset which contains the named anchorages table. Defaults to `gfw_research`.
* `pipe_events.anchorages.named_anchorages`: BigQuery table containing anchorage information. Defaults to `named_anchorages_v20190307`.
**DEPRECATED BY INCREMENTAL LOAD**
* `pipe_events.fishing.source_table`: BigQuery table containing the scored messages to read from. Defaults to `messages_scored_`.
* `pipe_events.fishing.events_table`: BigQuery table to publish the fishing events to. Defaults to `published_events_fishing`.
* `pipe_events.fishing.segment_vessel`: BigQuery table containing segment information. Defaults to `segment_vessel`.
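
For illustration only, the anchorage settings above could be collected into a configuration mapping like the one below; the keys come from the list and the values are the stated defaults.

```python
# Hypothetical configuration mapping; keys and defaults are taken
# from the settings list above.
pipe_events_anchorages = {
    "pipe_events.anchorages.events_table": "published_events_ports",
    "pipe_events.anchorages.anchorages_dataset": "gfw_research",
    "pipe_events.anchorages.named_anchorages": "named_anchorages_v20190307",
}
```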
@@ -2,6 +2,6 @@
SELECT
*
FROM
`{{ source_lr_events }}`
`{{ source_restrictive_events }}`
WHERE
json_extract((json_extract_array(event_vessels, '$')[0]), '$.type') = '"fishing"'
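
A note on the filter above: in BigQuery, `json_extract` returns a JSON-encoded value, which is why the comparison is against the quoted literal `'"fishing"'`. Below is a sketch of an equivalent filter using `JSON_VALUE`, which returns the unquoted scalar (untested against this pipeline):

```python
# Equivalent filter using JSON_VALUE, which strips the JSON string quoting.
# The templated table name mirrors the query above.
EQUIVALENT_FILTER = """
SELECT *
FROM `{{ source_restrictive_events }}`
WHERE JSON_VALUE(event_vessels, '$[0].type') = 'fishing'
"""
```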
21 changes: 12 additions & 9 deletions pipe_events/cli.py
@@ -3,7 +3,7 @@
from pipe_events.utils.parse import parse
from pipe_events.fishing_events_incremental import run as run_incremental
from pipe_events.fishing_events_auth_and_regions import run as run_auth_and_regions
from pipe_events.fishing_events_restricted_view import run as run_restricted_view
from pipe_events.fishing_events_restricted import run as run_restricted
from pipe_events.utils.bigquery import BigqueryHelper


@@ -14,33 +14,36 @@ def __init__(self, args):
self._bq = BigqueryHelper(args.project, self._log, args.test)

@property
def _params(self):
def _params(self) -> dict:
""" Command arguments to dict."""
return vars(self._args)

def _run_incremental_fishing_events(self):
def _run_incremental_fishing_events(self) -> bool:
return run_incremental(self._bq, self._params)

def _run_auth_and_regions_fishing_events(self):
def _run_auth_and_regions_fishing_events(self) -> bool:
return run_auth_and_regions(self._bq, self._params)

def _run_restricted_view_fishing_events(self):
return run_restricted_view(self._bq, self._params)
def _run_restricted_fishing_events(self) -> bool:
return run_restricted(self._bq, self._params)

def run(self):
def run(self) -> bool:
"""Executes the operation that matches."""
result = False
if self._args.operation == "incremental_events":
result = self._run_incremental_fishing_events()
elif self._args.operation == "auth_and_regions_fishing_events":
result = self._run_auth_and_regions_fishing_events()
elif self._args.operation == "restricted_view_events":
result = self._run_restricted_view_fishing_events()
elif self._args.operation == "fishing_restrictive":
result = self._run_restricted_fishing_events()
else:
raise RuntimeError(f"Invalid operation: {self._args.operation}")
return result


def main():
"""Executes the client."""
cli = Cli(parse(sys.argv))
result = cli.run()
exit(0 if result else 1)
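
The `if`/`elif` chain in `run` could also be expressed as a dispatch table; a small sketch of that alternative (not what this commit does):

```python
# Sketch: dispatch-table variant of Cli.run, same behavior as the
# if/elif chain above.
def run(self) -> bool:
    operations = {
        "incremental_events": self._run_incremental_fishing_events,
        "auth_and_regions_fishing_events": self._run_auth_and_regions_fishing_events,
        "fishing_restrictive": self._run_restricted_fishing_events,
    }
    if self._args.operation not in operations:
        raise RuntimeError(f"Invalid operation: {self._args.operation}")
    return operations[self._args.operation]()
```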
23 changes: 12 additions & 11 deletions pipe_events/fishing_events_auth_and_regions.py
@@ -1,32 +1,33 @@
import logging


def dest_table_description(**extra_items):
return (
f"{extra_items['base_table_description']}\n"
f"{extra_items['table_description']}"
)
from pipe_events.utils.bigquery import dest_table_description


def run(bq, params):
log = logging.getLogger()
dest = params["destination"] + params['reference_date']
schema_path = "./assets/bigquery/fishing-events-4-authorization-schema.json"

bq.create_table(
params["destination"],
schema_file="./assets/bigquery/fishing-events-4-authorization-schema.json",
dest,
schema_file=schema_path,
table_description=dest_table_description(**params),
partition_field="event_end_date",
clustering_fields=["event_end_date", "seg_id", "event_start"],
labels=params["labels"],
)

log.info("*** 1. Creates the authorized with regions.")
log.info("*** 1. Creates the authorized with regions. Or less restrictive table.")
auth_query = bq.format_query("fishing-events-4-authorization.sql.j2", **params)
bq.run_query(
auth_query,
dest_table=params["destination"],
dest_table=dest,
write_disposition="WRITE_TRUNCATE",
partition_field="event_end_date",
clustering_fields=["event_end_date", "seg_id", "event_start"],
labels=params["labels"],
)
bq.update_table_schema(
dest,
schema_path
) # schema should be kept after truncate
return True
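
The trailing `update_table_schema` call exists because a `WRITE_TRUNCATE` query job replaces the destination table's schema with the query result's schema, dropping column descriptions. Below is a minimal sketch of what such a helper might do with the BigQuery client library; the helper's actual internals are an assumption here:

```python
from google.cloud import bigquery

def update_table_schema(client: bigquery.Client, table_id: str, schema_path: str) -> None:
    """Re-apply a JSON schema file (with field descriptions) after a truncate."""
    table = client.get_table(table_id)
    table.schema = client.schema_from_json(schema_path)
    client.update_table(table, ["schema"])
```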
8 changes: 1 addition & 7 deletions pipe_events/fishing_events_incremental.py
@@ -1,4 +1,5 @@
import logging
from pipe_events.utils.bigquery import dest_table_description


def run_incremental_fishing_events_query(temp_table, fishing_events_incremental_query):
@@ -8,13 +9,6 @@ def run_incremental_fishing_events_query(temp_table, fishing_events_incremental_query):
AS ({fishing_events_incremental_query})"""


def dest_table_description(**extra_items):
return (
f"{extra_items['base_table_description']}\n"
f"{extra_items['table_description']}"
)


def run(bq, params):
log = logging.getLogger()
# Starts a BQ session
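
For context on the BQ session the README mentions: a session lets a `CREATE TEMP TABLE` issued by one query be reused by later queries. A minimal sketch with the BigQuery client library, independent of this pipeline's helpers:

```python
from google.cloud import bigquery

client = bigquery.Client(project="fishing_events_test")

# Open a session; temp tables created inside it persist across queries.
job = client.query("SELECT 1", job_config=bigquery.QueryJobConfig(create_session=True))
job.result()
session_id = job.session_info.session_id

# Later queries attach to the same session via a connection property.
in_session = bigquery.QueryJobConfig(
    connection_properties=[bigquery.ConnectionProperty("session_id", session_id)]
)
client.query("CREATE TEMP TABLE events AS (SELECT 1 AS x)", job_config=in_session).result()
client.query("SELECT * FROM events", job_config=in_session).result()
```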
33 changes: 33 additions & 0 deletions pipe_events/fishing_events_restricted.py
@@ -0,0 +1,33 @@
import logging
from pipe_events.utils.bigquery import dest_table_description


def run(bq, params):
log = logging.getLogger()
ref_date = params['reference_date']
params["source_restrictive_events"] += ref_date
dest = params["dest_restrictive_events"] + ref_date
# schema is the same, just with the fishing strict list
schema_path = "./assets/bigquery/fishing-events-4-authorization-schema.json"
bq.create_table(
dest,
schema_file=schema_path,
table_description=dest_table_description(**params),
partition_field="event_end_date",
clustering_fields=["event_end_date", "seg_id", "event_start"],
labels=params["labels"],
)

log.info("*** 1. Creates the restrictive table.")
auth_query = bq.format_query("fishing-events-5-restrictive.sql.j2", **params)
bq.run_query(
auth_query,
dest_table=dest,
write_disposition="WRITE_TRUNCATE",
partition_field="event_end_date",
clustering_fields=["event_end_date", "seg_id", "event_start"],
labels=params["labels"],
)
bq.update_table_schema(dest, schema_path)  # schema should be kept after truncate
log.info(f"The table {dest} is ready.")
return True
22 changes: 0 additions & 22 deletions pipe_events/fishing_events_restricted_view.py

This file was deleted.

8 changes: 8 additions & 0 deletions pipe_events/utils/bigquery.py
@@ -12,6 +12,14 @@
import json


def dest_table_description(**items) -> str:
"""Returns the table description."""
return (
f"{items.get('base_table_description', '')}\n"
f"{items.get('table_description', '')}"
)


def as_date_str(d):
result = d
if type(d) in [dt.datetime, dt.date]:
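
For reference, the shared `dest_table_description` helper added above simply joins its two description parameters with a newline; the example strings here are illustrative:

```python
description = dest_table_description(
    base_table_description="Fishing events pipeline.",
    table_description="Incremental authorization step.",
)
# -> "Fishing events pipeline.\nIncremental authorization step."
```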