-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #102 from GlobalFishingWatch/VV-650-issue-with-por…
…t-visits-location-name VV-650 implemented v2 of port visits with pipe3
- Loading branch information
Showing
3 changed files
with
434 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
#standardSQL | ||
|
||
# Include some utility functions | ||
{% include 'util.sql.j2' %} | ||
|
||
|
||
WITH | ||
|
||
# | ||
# Spatial measures data | ||
# | ||
source_spatial_measures AS ( | ||
SELECT | ||
*, | ||
FROM `{{ spatial_measures_table }}` | ||
), | ||
|
||
# | ||
# Regions data | ||
# | ||
source_regions AS ( | ||
SELECT | ||
*, | ||
FROM `{{ regions_table }}` | ||
), | ||
|
||
# | ||
# Port visits Events | ||
# | ||
source_port_visits AS ( | ||
SELECT | ||
* | ||
FROM `{{ port_visits_table }}` | ||
WHERE confidence >= 2 | ||
AND end_timestamp <= '{{ end_date }}' | ||
), | ||
|
||
# | ||
# Vessels byyear sources | ||
# | ||
source_product_vessel_info_summary AS ( | ||
SELECT | ||
* | ||
FROM `{{ product_vessel_info_summary_table }}` | ||
), | ||
|
||
source_named_anchorages AS ( | ||
SELECT | ||
* | ||
FROM `{{ named_anchorages_table }}` | ||
), | ||
|
||
|
||
gridded_regions AS ( | ||
SELECT | ||
* EXCEPT(s2_cells), | ||
FROM | ||
source_regions | ||
CROSS JOIN UNNEST (s2_cells) AS s2_cell | ||
), | ||
|
||
-- # Get base info, calculate mean position and aggregate segment ids. | ||
mean_positions AS ( | ||
SELECT | ||
visit_id, | ||
ARRAY_AGG(DISTINCT seg_id) AS seg_ids | ||
FROM | ||
source_port_visits, | ||
UNNEST(events) | ||
GROUP BY | ||
visit_id | ||
), | ||
|
||
-- in order to determine the intermediate anchorage, we prioritize port stops over gaps and beginnings over ends | ||
port_visits_with_ranked_events AS ( | ||
SELECT | ||
source_port_visits.*, | ||
e.timestamp AS event_timestamp, | ||
e.anchorage_id AS event_anchorage_id, | ||
CASE | ||
WHEN event_type = 'PORT_STOP_BEGIN' THEN 1 | ||
WHEN event_type = 'PORT_STOP_END' THEN 2 | ||
WHEN event_type = 'PORT_GAP_BEGIN' THEN 3 | ||
WHEN event_type = 'PORT_GAP_END' THEN 4 | ||
ELSE NULL | ||
END AS event_rank | ||
FROM | ||
source_port_visits | ||
LEFT JOIN | ||
UNNEST (events) events | ||
WHERE vessel_id IN (SELECT vessel_id FROM source_product_vessel_info_summary) | ||
), | ||
|
||
-- we pick the event with the lowest ranking and in case of ties we pick the first one | ||
port_visits_filtered_lowest_ranked_event AS ( | ||
SELECT | ||
* EXCEPT(event_rank) | ||
FROM | ||
port_visits_with_ranked_events | ||
QUALIFY ROW_NUMBER() OVER(PARTITION BY visit_id ORDER BY event_rank, event_timestamp) = 1 | ||
), | ||
|
||
intermadiate_anchorage AS ( | ||
SELECT | ||
visit_id, | ||
event_anchorage_id AS anchorage_id, | ||
iso3, | ||
label, | ||
label_source, | ||
top_destination, | ||
distance_from_shore_m, | ||
at_dock, | ||
source_named_anchorages.lat AS lat, | ||
source_named_anchorages.lon AS lon, | ||
FROM | ||
port_visits_filtered_lowest_ranked_event | ||
JOIN source_named_anchorages ON (port_visits_filtered_lowest_ranked_event.event_anchorage_id = source_named_anchorages.s2id) | ||
), | ||
|
||
# | ||
# Finally, enhance the event with info related to regions and distances. | ||
# | ||
total_events as ( | ||
SELECT | ||
TO_HEX(MD5(FORMAT("%s|%s|%t|%t",'port_visit', source_port_visits.vessel_id, start_timestamp, end_timestamp))) AS event_id, | ||
'port_visit' AS event_type, | ||
source_port_visits.vessel_id as vessel_id, | ||
cast(NULL AS string) AS seg_id, | ||
start_timestamp AS event_start, | ||
end_timestamp AS event_end, | ||
intermadiate_anchorage.lat AS lat_mean, | ||
intermadiate_anchorage.lon AS lon_mean, | ||
start_lat as lat_min, | ||
end_lat AS lat_max, | ||
start_lon AS lon_min, | ||
end_lon AS lon_max, | ||
ST_GEOGPOINT(intermadiate_anchorage.lon, intermadiate_anchorage.lat ) as geo, | ||
S2_CELLIDFROMPOINT(ST_GEOGPOINT(intermadiate_anchorage.lon, intermadiate_anchorage.lat ), s2_level()) as s2_cell, | ||
convert_m_to_km( spatial_measures_start.distance_from_shore_m ) AS start_distance_from_shore_km, | ||
convert_m_to_km( spatial_measures_end.distance_from_shore_m ) AS end_distance_from_shore_km, | ||
convert_m_to_km( spatial_measures_start.distance_from_port_m ) AS start_distance_from_port_km, | ||
convert_m_to_km( spatial_measures_end.distance_from_port_m ) AS end_distance_from_port_km, | ||
TO_JSON_STRING( | ||
STRUCT( | ||
duration_hrs, | ||
source_port_visits.visit_id, | ||
confidence, | ||
STRUCT( | ||
start_anchorage_id AS anchorage_id, | ||
generate_port_id(start_anchorage.iso3, start_anchorage.label) AS id, | ||
start_anchorage.iso3 AS flag, | ||
IF (start_anchorage.label_source = 'top_destination', NULL, start_anchorage.label) AS name, | ||
start_anchorage.top_destination AS top_destination, | ||
convert_m_to_km( start_anchorage.distance_from_shore_m ) AS distance_from_shore_km, | ||
start_anchorage.at_dock, | ||
start_anchorage.lat AS lat, | ||
start_anchorage.lon AS lon | ||
) AS start_anchorage, | ||
STRUCT( | ||
end_anchorage_id AS anchorage_id, | ||
generate_port_id(end_anchorage.iso3, end_anchorage.label) AS id, | ||
end_anchorage.iso3 AS flag, | ||
IF (end_anchorage.label_source = 'top_destination', NULL, end_anchorage.label) AS name, | ||
end_anchorage.top_destination AS top_destination, | ||
convert_m_to_km( end_anchorage.distance_from_shore_m ) AS distance_from_shore_km, | ||
end_anchorage.at_dock, | ||
end_anchorage.lat AS lat, | ||
end_anchorage.lon AS lon | ||
) AS end_anchorage, | ||
STRUCT( | ||
intermadiate_anchorage.anchorage_id AS anchorage_id, | ||
generate_port_id(intermadiate_anchorage.iso3, intermadiate_anchorage.label) AS id, | ||
intermadiate_anchorage.iso3 AS flag, | ||
IF (intermadiate_anchorage.label_source = 'top_destination', NULL, intermadiate_anchorage.label) AS name, | ||
intermadiate_anchorage.top_destination AS top_destination, | ||
convert_m_to_km( intermadiate_anchorage.distance_from_shore_m ) AS distance_from_shore_km, | ||
intermadiate_anchorage.at_dock, | ||
intermadiate_anchorage.lat AS lat, | ||
intermadiate_anchorage.lon AS lon | ||
) AS intermediate_anchorage, | ||
seg_ids | ||
) | ||
) as event_info, | ||
TO_JSON_STRING([ | ||
STRUCT( | ||
vessel.vessel_id AS `id`, | ||
vessel.ssvid AS `ssvid`, | ||
vessel.shipname AS `name`, | ||
vessel.prod_shiptype as `type`, | ||
vessel.mmsi_flag as `flag` | ||
) | ||
]) as event_vessels | ||
FROM | ||
source_port_visits | ||
JOIN source_product_vessel_info_summary vessel on vessel.vessel_id = source_port_visits.vessel_id and vessel.year= EXTRACT(year from start_timestamp) | ||
JOIN source_named_anchorages AS start_anchorage ON (start_anchorage_id = start_anchorage.s2id) | ||
JOIN source_named_anchorages AS end_anchorage ON (end_anchorage_id = end_anchorage.s2id) | ||
LEFT JOIN intermadiate_anchorage ON (source_port_visits.visit_id = intermadiate_anchorage.visit_id) | ||
JOIN source_spatial_measures AS spatial_measures_start ON format_gridcode(start_lon,start_lat) = spatial_measures_start.gridcode | ||
JOIN source_spatial_measures AS spatial_measures_end ON format_gridcode(end_lon, end_lat) = spatial_measures_end.gridcode | ||
), | ||
|
||
create_event_region_matches AS ( | ||
SELECT | ||
events.event_id, | ||
id, | ||
layer, | ||
FROM | ||
total_events AS events | ||
JOIN gridded_regions r ON events.s2_cell = r.s2_cell | ||
WHERE | ||
ST_INTERSECTS(events.geo, r.geo) | ||
), | ||
|
||
event_by_layer_by_id AS ( | ||
SELECT | ||
event_id, | ||
layer, | ||
id | ||
FROM create_event_region_matches | ||
GROUP BY | ||
1, | ||
2, | ||
3 | ||
), | ||
|
||
events_by_layer AS ( | ||
SELECT | ||
event_id, CONCAT('"', layer, '":' ,TO_JSON_STRING(array_agg(id))) AS json_frament | ||
FROM | ||
event_by_layer_by_id | ||
GROUP BY | ||
event_id, | ||
layer | ||
), | ||
|
||
event_with_region AS ( | ||
SELECT | ||
event_id, | ||
parse_regions_to_struct(CONCAT( "{", STRING_AGG(json_frament, ","), "}")) AS regions | ||
FROM events_by_layer | ||
GROUP BY | ||
1 | ||
) | ||
|
||
SELECT | ||
total_events.* EXCEPT (s2_cell, geo), | ||
spatial_measures_mean.regions AS regions_mean_position | ||
FROM | ||
total_events | ||
LEFT JOIN event_with_region spatial_measures_mean USING (event_id) | ||
|
Oops, something went wrong.