Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add areas #77

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions fvh3t/core/area.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from fvh3t.core.trajectory import Trajectory
from fvh3t.core.trajectory_layer import TrajectoryLayer

from qgis.core import (
QgsGeometry,
QgsWkbTypes,
)

from fvh3t.core.exceptions import InvalidGeometryTypeException


class Area:
"""
A wrapper class around a QgsGeometry which represents a
polygon through which trajectories can pass. The geometry
must be a polygon.
"""

def __init__(
self,
geom: QgsGeometry,
name: str,
) -> None:
if geom.type() != QgsWkbTypes.GeometryType.PolygonGeometry:
msg = "Area must be created from a polygon geometry!"
raise InvalidGeometryTypeException(msg)

self.__geom: QgsGeometry = geom
self.__name: str = name
self.__trajectory_count: int = 0
self.__average_speed: float = 0.0

def geometry(self) -> QgsGeometry:
return self.__geom

def name(self) -> str:
return self.__name

def trajectory_count(self) -> int:
return self.__trajectory_count

def average_speed(self) -> float:
return self.__average_speed

def intersects(self, traj: Trajectory) -> bool:
return self.__geom.intersects(traj.as_geometry())

def count_trajectories_from_layer(self, layer: TrajectoryLayer) -> None:
self.count_trajectories(layer.trajectories())

def count_trajectories(
self,
trajectories: tuple[Trajectory, ...],
) -> None:
speed = 0.0

for trajectory in trajectories:
if self.intersects(trajectory):
traj_speed = trajectory.average_speed()
speed += traj_speed

self.__trajectory_count += 1

if self.__trajectory_count > 0:
self.__average_speed = speed / self.__trajectory_count
147 changes: 147 additions & 0 deletions fvh3t/core/area_layer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from qgis.core import QgsFeature, QgsFeatureSource, QgsField, QgsVectorLayer, QgsWkbTypes
from qgis.PyQt.QtCore import QDateTime, QMetaType, QVariant

from fvh3t.core.area import Area
from fvh3t.core.exceptions import InvalidFeatureException, InvalidLayerException

if TYPE_CHECKING:
from fvh3t.core.trajectory_layer import TrajectoryLayer


class AreaLayer:
"""
Wrapper around a QgsVectorLayer object from which areas
can be instantiated, i.e.

1. is a polygon layer
2. has a name field
3. has features
"""

def __init__(
self,
layer: QgsVectorLayer,
id_field: str,
name_field: str,
) -> None:
self.__layer: QgsVectorLayer = layer
self.__name_field = name_field
self.__id_field = id_field

if self.is_valid():
self.__areas: tuple[Area, ...] = ()
self.create_areas()

def create_areas(self) -> None:
name_field_idx: int = self.__layer.fields().indexOf(self.__name_field)
areas: list[Area] = []

for feature in self.__layer.getFeatures():
name: str = feature[name_field_idx]
area = Area(
feature.geometry(),
name=name,
)

areas.append(area)

self.__areas = tuple(areas)

def count_trajectories_from_layer(self, layer: TrajectoryLayer) -> None:
for area in self.__areas:
area.count_trajectories_from_layer(layer)

def areas(self) -> tuple[Area, ...]:
return self.__areas

def as_polygon_layer(
self, traveler_class: str | None, start_time: QDateTime, end_time: QDateTime
) -> QgsVectorLayer | None:
polygon_layer = QgsVectorLayer("Polygon", "Polygon Layer", "memory")
polygon_layer.setCrs(self.__layer.crs())

polygon_layer.startEditing()

polygon_layer.addAttribute(QgsField("fid", QVariant.Int))
polygon_layer.addAttribute(QgsField("name", QVariant.String))
polygon_layer.addAttribute(QgsField("class", QVariant.String))
polygon_layer.addAttribute(QgsField("interval_start", QVariant.DateTime))
polygon_layer.addAttribute(QgsField("interval_end", QVariant.DateTime))
polygon_layer.addAttribute(QgsField("vehicle_count", QVariant.Int))
polygon_layer.addAttribute(QgsField("speed_avg", QVariant.Double))

fields = polygon_layer.fields()

for i, area in enumerate(self.__areas, 1):
feature = QgsFeature(fields)

feature.setAttributes(
[
i,
area.name(),
traveler_class if traveler_class else "all",
start_time,
end_time,
area.trajectory_count(),
round(area.average_speed(), 2),
]
)
feature.setGeometry(area.geometry())

if not feature.isValid():
raise InvalidFeatureException

polygon_layer.addFeature(feature)

polygon_layer.commitChanges()

return polygon_layer

def is_field_valid(self, field_name: str, *, accepted_types: list[QMetaType.Type]) -> bool:
"""
Check that a field 1) exists and 2) has an
acceptable type. Leave type list empty to
allow any type.
"""
field_id: int = self.__layer.fields().indexFromName(field_name)

if field_id == -1:
return False

if not accepted_types: # means all types are accepted
return True

field: QgsField = self.__layer.fields().field(field_id)
field_type: QMetaType.Type = field.type()

return field_type in accepted_types

def is_valid(self) -> bool:
is_layer_valid: bool = self.__layer.isValid()
if not is_layer_valid:
msg = "Layer is not valid."
raise InvalidLayerException(msg)

is_polygon_layer: bool = self.__layer.geometryType() == QgsWkbTypes.GeometryType.PolygonGeometry
if not is_polygon_layer:
msg = "Layer is not a polygon layer."
raise InvalidLayerException(msg)

has_features: bool = self.__layer.hasFeatures() == QgsFeatureSource.FeatureAvailability.FeaturesAvailable
if not has_features:
msg = "Layer has no features."
raise InvalidLayerException(msg)

if not self.is_field_valid(self.__name_field, accepted_types=[QMetaType.Type.QString]):
msg = "Name field either not found or of incorrect type."
raise InvalidLayerException(msg)

if not self.is_field_valid(self.__id_field, accepted_types=[]):
msg = "ID field not found."
raise InvalidLayerException(msg)

return True
12 changes: 7 additions & 5 deletions fvh3t/core/gate_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def create_gates(self) -> None:
def gates(self) -> tuple[Gate, ...]:
return self.__gates

def as_line_layer(self, traveler_class: str, start_time: QDateTime, end_time: QDateTime) -> QgsVectorLayer | None:
def as_line_layer(
self, traveler_class: str | None, start_time: QDateTime, end_time: QDateTime
) -> QgsVectorLayer | None:
line_layer = QgsVectorLayer("LineString", "Line Layer", "memory")
line_layer.setCrs(self.__layer.crs())

Expand All @@ -78,21 +80,21 @@ def as_line_layer(self, traveler_class: str, start_time: QDateTime, end_time: QD

fields = line_layer.fields()

for i, gate in enumerate(self.__gates):
for i, gate in enumerate(self.__gates, 1):
feature = QgsFeature(fields)

feature.setAttributes(
[
i,
gate.name(),
traveler_class,
traveler_class if traveler_class else "all",
start_time,
end_time,
gate.counts_negative(),
gate.counts_positive(),
gate.trajectory_count(),
gate.average_speed(),
gate.average_acceleration(),
round(gate.average_speed(), 2),
round(gate.average_acceleration(), 2),
]
)
feature.setGeometry(gate.geometry())
Expand Down
36 changes: 36 additions & 0 deletions fvh3t/core/qgis_layer_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from qgis.core import (
QgsCoordinateReferenceSystem,
QgsDefaultValue,
QgsFeatureRenderer,
QgsField,
QgsFieldConstraints,
Expand All @@ -14,6 +15,16 @@


class QgisLayerUtils:
@staticmethod
def set_area_style(layer: QgsVectorLayer) -> None:
doc = QDomDocument()

with open(resources_path("style", "area_style.xml")) as style_file:
doc.setContent(style_file.read())

renderer = QgsFeatureRenderer.load(doc.documentElement(), QgsReadWriteContext())
layer.setRenderer(renderer)

@staticmethod
def set_gate_style(layer: QgsVectorLayer) -> None:
doc = QDomDocument()
Expand Down Expand Up @@ -43,3 +54,28 @@ def create_gate_layer(crs: QgsCoordinateReferenceSystem) -> QgsVectorLayer:
QgisLayerUtils.set_gate_style(layer)

return layer

@staticmethod
def create_area_layer(crs: QgsCoordinateReferenceSystem) -> QgsVectorLayer:
layer = QgsVectorLayer("Polygon", "Area Layer", "memory")
layer.setCrs(crs)

with edit(layer):
layer.addAttribute(QgsField("fid", QVariant.Int))
layer.addAttribute(QgsField("name", QVariant.String))

layer.setFieldConstraint(0, QgsFieldConstraints.Constraint.ConstraintNotNull)

def_value = QgsDefaultValue(
"""with_variable('feat_id', (maximum("fid") + 1), if (@feat_id is NULL, 1, @feat_id))"""
)
layer.setDefaultValueDefinition(0, def_value)

efc = layer.editFormConfig()
efc.setReadOnly(0)

layer.setEditFormConfig(efc)

QgisLayerUtils.set_area_style(layer)

return layer
18 changes: 11 additions & 7 deletions fvh3t/core/trajectory_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ def __init__(
self.__trajectories: tuple[Trajectory, ...] = ()
self.create_trajectories(extra_filter_expression)

# TODO: should the class of traveler be handled here?

def layer(self) -> QgsVectorLayer:
return self.__layer

Expand Down Expand Up @@ -134,21 +132,27 @@ def trajectories(self) -> tuple[Trajectory, ...]:
def crs(self) -> QgsCoordinateReferenceSystem:
return self.__layer.crs()

def create_trajectories(self, filter_expression: str | None) -> None:
def create_trajectories(self, extra_filter_expression: str | None) -> None:
id_field_idx: int = self.__layer.fields().indexOf(self.__id_field)
timestamp_field_idx: int = self.__layer.fields().indexOf(self.__timestamp_field)
width_field_idx: int = self.__layer.fields().indexOf(self.__width_field)
length_field_idx: int = self.__layer.fields().indexOf(self.__length_field)
height_field_idx: int = self.__layer.fields().indexOf(self.__height_field)

id_is_string: bool = self.__layer.fields().field(id_field_idx).type() == QMetaType.Type.QString

unique_ids: set[Any] = self.__layer.uniqueValues(id_field_idx)

trajectories: list[Trajectory] = []

for identifier in unique_ids:
expression_str = f'("{self.__id_field}" = {identifier})'
if filter_expression:
expression_str += f" and ({filter_expression})"
if id_is_string:
expression_str = f"(\"{self.__id_field}\" = '{identifier}')"
else:
expression_str = f'("{self.__id_field}" = {identifier})'

if extra_filter_expression:
expression_str += f" and ({extra_filter_expression})"

expression = QgsExpression(expression_str)
request = QgsFeatureRequest(expression)
Expand Down Expand Up @@ -207,7 +211,7 @@ def as_line_layer(self) -> QgsVectorLayer | None:

fields = line_layer.fields()

for i, trajectory in enumerate(self.__trajectories):
for i, trajectory in enumerate(self.__trajectories, 1):
feature = QgsFeature(fields)

min_size_x, min_size_y, min_size_z = trajectory.minimum_size()
Expand Down
Loading