Daily and night fishing events incremental implementation by Chris #104

Merged — 36 commits, merged on Dec 17, 2024
Changes from 27 commits
Commits
761de2a
Rename fishing events queries and split processing steps
Chr96er Sep 11, 2024
2764108
Modify incremental processing steps; 1 and 2 tested; 3 subject to change
Chr96er Sep 11, 2024
d9e8585
Add closed events to overall merged table
Chr96er Sep 11, 2024
1e88ea6
Wrap up incremental fishing events; tested on staging pipeline
Chr96er Sep 11, 2024
c9ef98f
Use placeholder instead of hard coded table name
Chr96er Sep 11, 2024
516f13a
Don't reload backfill closed events during merge
Chr96er Sep 16, 2024
2c79a22
Remove obsolete injected code; rename parameter; add event_end_date
Chr96er Sep 19, 2024
2430c90
Fix filters by adding quotes; rename parameter; filter delta table fo…
Chr96er Sep 19, 2024
5a37ce0
Rename parameters; add CREATE TABLE statement; add event_end_date column
Chr96er Sep 19, 2024
ea0b288
Add code that orchestrates delta and backfill of incremental load
Chr96er Sep 19, 2024
f074811
Add readme containing challenges and solutions of incremental fishing…
Chr96er Sep 19, 2024
0d27e71
Add requirements.txt
Chr96er Sep 19, 2024
8bd5701
Fix PIPELINE-2192 - bug in splitting gaps in fishing events
Chr96er Sep 20, 2024
8dc226d
Move filter for earliest delta event to correct CTE; remove overlappi…
Chr96er Sep 20, 2024
7f35779
Further parameterise orchestration
Chr96er Sep 20, 2024
b431274
Remove comment
Chr96er Sep 20, 2024
e84964a
Document processing steps: original and incremental
Chr96er Sep 20, 2024
6822644
Properly indent stages
Chr96er Sep 20, 2024
093f324
Finalize fishing events: move cleanup to filter; apply less restricti…
Chr96er Sep 23, 2024
184e4d6
Remove obsolete and add new parameters
Chr96er Sep 23, 2024
430b8d4
Update description for incremental setup
Chr96er Sep 23, 2024
33ca03e
Update description for incremental setup
Chr96er Sep 23, 2024
c66f0fa
Adds utilities: bigqueryhelper and parser of commands
smpiano Oct 17, 2024
251227b
Adds merge filter and auth schemas
smpiano Oct 17, 2024
3fab18c
Add client and harness to run the queries with their schemas
smpiano Oct 17, 2024
5d5e045
Setup the flake8 and runs black over the py code
smpiano Oct 17, 2024
1fb1ca9
parametrize the max_fishing_event_gap_hours and removes the creation …
smpiano Oct 17, 2024
aadd2d2
Removes of orchestrate and fixes in schema fields descr
smpiano Oct 21, 2024
daec243
Increments version project
smpiano Oct 21, 2024
6acf82e
Adds destination table prefix to control from external.
smpiano Oct 21, 2024
8ac2a50
Adds creation of the restricted view, removes the prod_shiptype use e…
smpiano Nov 19, 2024
6eb705f
No to report error if view already exists
smpiano Nov 19, 2024
2bf0406
Fix PEP8
smpiano Nov 22, 2024
5e7ba16
Fix on mocking bigquery.Client
smpiano Nov 22, 2024
8b90dcf
pkg_resources module is not more supported in py3.12
smpiano Nov 22, 2024
178175c
Merge pull request #105 from GlobalFishingWatch/feature/PIPELINE-2170…
smpiano Dec 17, 2024
1 change: 1 addition & 0 deletions .flake8
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 99
exclude = ./assets
1 change: 0 additions & 1 deletion Dockerfile
@@ -11,4 +11,3 @@ RUN pip install -e .

# Setup the entrypoint for quickly executing the pipelines
ENTRYPOINT ["scripts/run"]

44 changes: 44 additions & 0 deletions assets/bigquery/README.md
@@ -0,0 +1,44 @@
# Incremental fishing detection

## Challenges and solutions

Conceptual challenges:
1. Fishing events can cross multiple days.
2. Fishing events are defined by all messages within the event. We apply filters that depend on the full set of messages being present (e.g. average speed).
3. Fishing events could theoretically go back all the way to 2012, because there is no hard limit that would cut off an event while a vessel keeps fishing continuously. In fact, the longest event is currently > 100 days, and it could grow much longer.
4. We apply the mutable field `overlapping_and_short` to filter out segments.
5. The definition of fishing events depends on `best_vessel_class`, which is mutable over time and dictates whether to use `nnet_score` or `night_loitering` for each vessel.

Solutions:
1. Because of challenge #1, any incremental solution requires a merge step.
2. Because of challenge #2, we need to keep all messages available as long as an event isn't "closed" yet, because the speed threshold may or may not be exceeded as more messages are added.
3. Because of challenge #3, we potentially need to keep a very long history of messages.
4. Because of challenge #4, we need to calculate fishing events by segment (which fortunately has been the default definition).
5. Because of challenge #5, we need to run two parallel pipelines: one calculating fishing events based on `nnet_score`, the other based on `night_loitering`. In the daily merge step we then look up `best_vessel_class` and choose the right fishing event for each vessel, as sketched below.
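
For illustration, the daily choice in solution #5 could look roughly like the following sketch (the table names `fishing_events_nnet`, `fishing_events_night_loitering` and `vessel_info` are placeholders, not the actual pipeline tables):

```sql
-- Hypothetical sketch: pick the right event type per vessel based on best_vessel_class
SELECT events.*
FROM fishing_events_nnet AS events
JOIN vessel_info USING (vessel_id)
WHERE best_vessel_class != 'squid_jigger'

UNION ALL

SELECT events.*
FROM fishing_events_night_loitering AS events
JOIN vessel_info USING (vessel_id)
WHERE best_vessel_class = 'squid_jigger'
```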

Challenges based on solutions:
1. Solutions #2 and #3 combined mean that we have to implement another incremental mechanism: we cannot rescan the whole message history for all fishing events every day. Instead, we need to incrementally upsert new and open fishing events. This cannot happen in the first incremental load, because `research_messages` is partitioned by message timestamp. Instead, we first generate an incremental table that contains the messages and is partitioned by fishing event end date, which is a cheap way of keeping track of "open" fishing events (these must lie on the most recent event end date, because the maximum gap within a fishing event is 2 hours).
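
As an illustration of this mechanism, the merged message table could be declared along these lines (a minimal sketch; dataset, table name and exact column list are assumptions):

```sql
-- Hypothetical sketch: messages table partitioned by event end date,
-- so "open" events are confined to the most recent partitions
CREATE TABLE IF NOT EXISTS `project.dataset.fishing_event_messages_merged` (
  ssvid STRING,
  seg_id STRING,
  timestamp TIMESTAMP,
  score FLOAT64,
  event_start TIMESTAMP,
  event_end TIMESTAMP,
  event_end_date DATE
)
PARTITION BY event_end_date;
```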


## Processing steps
### Original setup
Previously, the fishing events pipeline was split into two steps:
1. Calculate fishing events based on `research_messages`, add `vessel_id` and all other identity information, and remove `overlapping_and_short` segments. Depending on the latest `best_vessel_class`, fishing events were based either on `nnet_score` (`best_vessel_class != 'squid_jigger'`) or on `night_loitering` (`best_vessel_class = 'squid_jigger'`).
2. Add authorization and regions information.

### Incremental setup
1. The former first step is split into three stages:
    1. Calculate fishing events by segment for any given time period (backfill as well as yearly/monthly/daily increments are possible). Annotate the messages with the fishing event they belong to, so that we have all messages for each fishing event. We call the output `fishing_event_messages_temp`. The suffix `_temp` shows that this table does not need to be stored permanently but is only used in the next step (therefore BQ temporary tables could be used). For incremental loads we pad the previous day so that we can merge fishing events in the next step.

       The code for this is exactly the first half of the original first step.

    2. Merge `fishing_event_messages_temp` with the existing `fishing_event_messages_merged` table (see the sketch after this list). The merge step retrieves `fishing_event_messages_merged` for the last two hours of the previous day and attempts to merge messages by `seg_id` and `timestamp` with the latest incremental `fishing_event_messages_temp`. If fishing events overlap, they get merged, which means updating the `event_end` of the "open" fishing event messages and the `event_start` of the new fishing event messages.

       This step is entirely new. The closest existing concept to this table in pipe3 is `raw_port_events`. Technically, at this point we only store "potential" events, and not all of them will eventually pass all filters.

    3. Generate the final fishing events, which are based on all messages within an event. Apply filters, add `vessel_id` and identity fields, and remove `overlapping_and_short` segments.

   All three stages need to be run twice, once for `nnet_score` and once for `night_loitering`.

2. The former second step of adding authorization and regions information is largely unchanged. However, it now combines `fishing events` and `night loitering events`. Also, a hard-coded filter for less restrictive fishing vessels is applied and in the end
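
The merge in stage 2 could be expressed roughly as a BigQuery `MERGE` statement along these lines (a simplified sketch under assumed table names; the pipeline's actual update logic differs):

```sql
-- Hypothetical sketch of stage 2: upsert the latest incremental messages into the merged table
MERGE `project.dataset.fishing_event_messages_merged` AS merged
USING `project.dataset.fishing_event_messages_temp` AS delta
ON merged.seg_id = delta.seg_id
AND merged.timestamp = delta.timestamp
WHEN MATCHED THEN
  -- message already belongs to an "open" event: extend the event boundaries
  UPDATE SET
    merged.event_start = LEAST(merged.event_start, delta.event_start),
    merged.event_end = GREATEST(merged.event_end, delta.event_end),
    merged.event_end_date = DATE(GREATEST(merged.event_end, delta.event_end))
WHEN NOT MATCHED THEN
  -- genuinely new message: insert it as-is
  INSERT ROW
```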
156 changes: 156 additions & 0 deletions assets/bigquery/fishing-events-1-incremental.sql.j2
@@ -0,0 +1,156 @@
#standardSQL

-- Include some utility functions

--
-- Fishing Events
--
-- Aggregate position messages that have been annotated with a fishing score into fishing events
-- A fishing event is a sequence of consecutive messages that all have a fishing score of 1.0
-- messages with score=null are ignored
WITH
-- Source tables
source_research_messages AS (
SELECT *
FROM `{{ messages_table }}`
-- allow incremental as well as backfill loads
WHERE DATE(timestamp) BETWEEN '{{ start_date }}' AND '{{ end_date }}'
),

source_messages AS (
SELECT
ssvid,
seg_id,
timestamp,
EXTRACT(year FROM timestamp) as year,
lat,
lon,
speed_knots, --speed (knots) from AIS message
meters_to_prev, --meters to previous position in segment
implied_speed_knots, --meters_to_prev / hours
hours, --time since the previous position in the segment
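-- templated column: resolves to either nnet_score or night_loitering,
-- depending on which of the two parallel pipelines is running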
{{ nnet_score_night_loitering }} score
FROM
source_research_messages
WHERE
lat > -90
AND lat < 90
),

--
-- Group messages into events which are consecutive sequences of messages with the same score within the same seg_id
-- First, for each message, get the score from the previous message in the segment
--
prev_score_message AS (
SELECT
ssvid,
seg_id,
timestamp,
lat,
lon,
score,
meters_to_prev,
implied_speed_knots,
hours,
LAG(score) OVER (PARTITION BY seg_id ORDER BY timestamp) AS prev_score,
LAG(timestamp) OVER (PARTITION BY seg_id ORDER BY timestamp) AS prev_timestamp,
LAG(seg_id) OVER (PARTITION BY seg_id ORDER BY timestamp) AS prev_seg_id
FROM
source_messages),

--
-- Now get the time range from the start of a group to the end of the group
-- Do this by filtering to only the first message in each grouping and making a time range from
-- the first message in one group to the prev_timestamp of the first message in the next group
--
event_range AS (
SELECT
ssvid,
seg_id,
score,
prev_timestamp,
meters_to_prev,
timestamp AS event_start,
LEAD(prev_timestamp) OVER (PARTITION BY seg_id ORDER BY timestamp) as event_end,
lat,
lon
FROM
prev_score_message
WHERE
-- identifies first message in event
prev_score IS NULL
OR
score != prev_score
OR
-- splits fishing events with consecutive nnet fishing positions of 1 if the
-- previous AIS position is farther than 10,000 meters away from the current position
-- OR if the current position was registered more than {{ max_fishing_event_gap_hours }} hours after the previous position
meters_to_prev > 10000
OR
hours > {{ max_fishing_event_gap_hours }}
),

--
-- Filter event ranges to only those with score = 1.0 (fishing)
-- and for each fishing event get the end of the time range of the previous fishing event
--
prev_fishing_event_range AS (
SELECT
ssvid,
seg_id,
event_start,
event_end,
lat,
lon,
-- calculate time and distance to previous fishing event
-- if previous fishing event is within same seg_id
st_distance(st_geogpoint(lon,lat),
st_geogpoint(lag(lon, 1) over (partition by seg_id order by event_start),
lag(lat, 1) over (partition by seg_id order by event_start)) ) as distance_to_prev_event_m,
-- intermediate step; prev_event_end gets fixed/completed later in query
LAG(event_end) OVER (PARTITION BY seg_id ORDER BY event_start) AS prev_event_end
FROM
event_range
WHERE
score = 1.0 ),

--
-- Create ranges spanning consecutive events that are separated by a small time interval
--
fishing_event_range AS (
SELECT
ssvid,
seg_id,
distance_to_prev_event_m,
event_start,
LEAD(prev_event_end) OVER (PARTITION BY seg_id ORDER BY event_start) AS event_end
FROM
prev_fishing_event_range
WHERE
prev_event_end IS NULL
-- combine fishing events if fishing events are temporally close enough, as defined in restriction below
-- combine fishing events less than 1 hour and 2 km apart
-- this line will combine fishing events, even if there are null or non-fishing scores between events
OR (TIMESTAMP_DIFF(event_start, prev_event_end, SECOND) > 3600
OR distance_to_prev_event_m > 2000)
),

--
-- Tag all the messages with the start time of the event range that contains the message
-- limit this to just messages with score = 1.0
--
fishing_event_message AS (
SELECT
source_messages.*,
fishing_event_range.event_start,
MAX(timestamp) OVER (PARTITION BY seg_id, event_start) event_end
FROM
source_messages
JOIN
fishing_event_range USING (seg_id)
WHERE
timestamp >= event_start
AND (event_end IS NULL OR timestamp <= event_end)
AND score = 1.0 )

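-- expose the fishing event end date so the output can be partitioned/filtered by it,
-- keeping "open" events on the most recent event end date (see assets/bigquery/README.md)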
SELECT *, DATE(event_end) AS event_end_date FROM fishing_event_message