Skip to content

Commit

Permalink
fix: error segments for vessel with no excursion in db not created
Browse files Browse the repository at this point in the history
  • Loading branch information
ejamet73 authored and marthevienne committed Jan 7, 2025
1 parent b7b61c8 commit 1ea3fd5
Showing 1 changed file with 80 additions and 83 deletions.
163 changes: 80 additions & 83 deletions backend/bloom/tasks/create_update_excursions_segments.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ def run():
for vessel_id in batch["vessel_id"].unique():
df_end = batch.loc[batch["vessel_id"] == vessel_id].copy()
df_end.rename(columns={"timestamp": "timestamp_end",
"heading": "heading_at_end",
"speed": "speed_at_end",
"longitude": "end_longitude",
"latitude": "end_latitude"
}, inplace=True)
"heading": "heading_at_end",
"speed": "speed_at_end",
"longitude": "end_longitude",
"latitude": "end_latitude"
}, inplace=True)
df_end.sort_values("timestamp_end", inplace=True)
df_end.reset_index(drop=True, inplace=True)
# get every end entry but the last one ; each one of them will be the start point of a segment
Expand Down Expand Up @@ -185,7 +185,6 @@ def get_distance_in_miles(x) -> float:
return distance.distance(p1, p2).miles

df["distance"] = df.apply(get_distance_in_miles, axis=1)
print(df)
else:
#df_start=df_end.copy()
vessel_last_segment = pd.DataFrame()
Expand Down Expand Up @@ -229,97 +228,95 @@ def get_distance_in_miles(x) -> float:
df = df[df["timestamp_start"] != df["timestamp_end"]].copy()
# reseting index
df.reset_index(inplace=True, drop=True)
#print(df)

if (df.shape[0] > 0):
# calculate distance
def get_distance_in_miles(x) -> float:
p1 = (x.start_latitude, x.start_longitude)
p2 = (x.end_latitude, x.end_longitude)
return distance.distance(p1, p2).miles
if (df.shape[0] > 0):
# calculate distance
def get_distance_in_miles(x) -> float:
p1 = (x.start_latitude, x.start_longitude)
p2 = (x.end_latitude, x.end_longitude)
return distance.distance(p1, p2).miles

df["distance"] = df.apply(get_distance_in_miles, axis=1)
df["distance"] = df.apply(get_distance_in_miles, axis=1)

# calculate duration in seconds
def get_duration(x) -> float:
return (x.timestamp_end - x.timestamp_start).total_seconds()
# calculate duration in seconds
def get_duration(x) -> float:
return (x.timestamp_end - x.timestamp_start).total_seconds()

df["segment_duration"] = df.apply(get_duration, axis=1)
df["segment_duration"] = df.apply(get_duration, axis=1)

# set default type as AT_SEA
df["type"] = "AT_SEA"
# set default type as AT_SEA
df["type"] = "AT_SEA"

# set type as default_ais for segment with duration > 35 min
df.loc[df["segment_duration"] >= 2100, "type"] = "DEFAULT_AIS"
# set type as default_ais for segment with duration > 35 min
df.loc[df["segment_duration"] >= 2100, "type"] = "DEFAULT_AIS"

# calculate average speed in knot
df["average_speed"] = df["distance"] / (df["segment_duration"] / 3600)
# calculate average speed in knot
df["average_speed"] = df["distance"] / (df["segment_duration"] / 3600)

# set last_vessel_segment
df["last_vessel_segment"] = 0
if len(df) >1 :
df["last_vessel_segment"].iloc[-1] = 1
else :
df["last_vessel_segment"] = 1
# set last_vessel_segment
df["last_vessel_segment"] = 0
if len(df) >1 :
df["last_vessel_segment"].iloc[-1] = 1
else :
df["last_vessel_segment"] = 1

# check if segment ends in a port (only for segment with average_speed < maximal_speed_to_check_if_in_port or with type 'DEFAULT_AIS')
def get_port(x, session):
if x.type == 'DEFAULT_AIS' or x.average_speed < maximal_speed_to_check_if_in_port:
res = port_repository.get_closest_port_in_range(session, x.end_longitude, x.end_latitude,
threshold_distance_to_port)
if res:
(port_id, distance) = res
return port_id
else:
return None
def get_port(x, session):
if x.type == 'DEFAULT_AIS' or x.average_speed < maximal_speed_to_check_if_in_port:
res = port_repository.get_closest_port_in_range(session, x.end_longitude, x.end_latitude,
threshold_distance_to_port)
if res:
(port_id, distance) = res
return port_id
else:
return None

df["port"] = df.apply(get_port, axis=1, args=(session,))

# get or create new excursion
# logic :
# if segment ends in a port while ongoing excursion is open, then we close the excursion
# else, if the ongoing excursion is open, then we use the ongoing excursion_id for the segment
# else, we create a new excursion whose id will become the ongoing excursion_id for this segment and the future ones
# additionnaly, when we create a new excursion, if the vessel is 'new' then we create an 'empty' excursion
# else, if the first segment of this new excursion is of type 'DEFAULT_AIS', we estimate the time of departure based
# on its ending position, distance traveled and a given average exit speed
df["excursion_id"] = np.NaN
for a in df.index:
if df["port"].iloc[a] is not None and df["port"].iloc[a] >= 0:
if (open_ongoing_excursion):
close_excursion(session, ongoing_excursion_id, int(df["port"].iloc[a]),
df["end_latitude"].iloc[a],
df["end_longitude"].iloc[a],
df["timestamp_end"].iloc[a]) # put the close excursion function here
df["excursion_id"].iloc[a] = ongoing_excursion_id
open_ongoing_excursion = False
nb_closed_excursion += 1
elif open_ongoing_excursion:
else:
return None
df["port"] = df.apply(get_port, axis=1, args=(session,))

# get or create new excursion
# logic :
# if segment ends in a port while ongoing excursion is open, then we close the excursion
# else, if the ongoing excursion is open, then we use the ongoing excursion_id for the segment
# else, we create a new excursion whose id will become the ongoing excursion_id for this segment and the future ones
# additionnaly, when we create a new excursion, if the vessel is 'new' then we create an 'empty' excursion
# else, if the first segment of this new excursion is of type 'DEFAULT_AIS', we estimate the time of departure based
# on its ending position, distance traveled and a given average exit speed
df["excursion_id"] = np.NaN
for a in df.index:
if df["port"].iloc[a] is not None and df["port"].iloc[a] >= 0:
if (open_ongoing_excursion):
close_excursion(session, ongoing_excursion_id, int(df["port"].iloc[a]),
df["end_latitude"].iloc[a],
df["end_longitude"].iloc[a],
df["timestamp_end"].iloc[a]) # put the close excursion function here
df["excursion_id"].iloc[a] = ongoing_excursion_id
open_ongoing_excursion = False
nb_closed_excursion += 1
elif open_ongoing_excursion:
df["excursion_id"].iloc[a] = ongoing_excursion_id
else:
if is_new_vessel:
ongoing_excursion_id = add_excursion(session, int(vessel_id),
df["timestamp_end"].iloc[a],
Point(df["end_longitude"].iloc[a],
df["end_latitude"].iloc[
a]))
is_new_vessel = False
nb_created_excursion += 1
else:
if is_new_vessel:
ongoing_excursion_id = add_excursion(session, int(vessel_id),
df["timestamp_end"].iloc[a],
Point(df["end_longitude"].iloc[a],
df["end_latitude"].iloc[
a]))
is_new_vessel = False
nb_created_excursion += 1
else:
def get_time_of_departure():
if (df['type'].iloc[a] == 'DEFAULT_AIS'):
return df['timestamp_end'].iloc[a] - timedelta(0, 3600 * df['distance'].iloc[
a] / average_exit_speed)
else:
return df["timestamp_start"].iloc[a]

ongoing_excursion_id = add_excursion(session, int(vessel_id),
get_time_of_departure()) # put the create new excursion function here
nb_created_excursion += 1
open_ongoing_excursion = True
df["excursion_id"].iloc[a] = ongoing_excursion_id
def get_time_of_departure():
if (df['type'].iloc[a] == 'DEFAULT_AIS'):
return df['timestamp_end'].iloc[a] - timedelta(0, 3600 * df['distance'].iloc[
a] / average_exit_speed)
else:
return df["timestamp_start"].iloc[a]

ongoing_excursion_id = add_excursion(session, int(vessel_id),
get_time_of_departure()) # put the create new excursion function here
nb_created_excursion += 1
open_ongoing_excursion = True
df["excursion_id"].iloc[a] = ongoing_excursion_id
# concat the result for current vessel in the result dataframe
if (df.shape[0] > 0):
result = pd.concat([result, df[df["excursion_id"] >= 0]], axis=0)
Expand Down

0 comments on commit 1ea3fd5

Please sign in to comment.