diff --git a/dgp/annotations/camera_transforms.py b/dgp/annotations/camera_transforms.py index 679efd96..e0a39d98 100644 --- a/dgp/annotations/camera_transforms.py +++ b/dgp/annotations/camera_transforms.py @@ -633,7 +633,7 @@ def transform_datum(self, cam_datum: Dict[str, Any]) -> Dict[str, Any]: # pylin rgb_mask = self.transform_mask_2d(rgb_mask) new_datum['rgb_mask'] = rgb_mask - if 'bounding_box_3d' in new_datum: + if 'bounding_box_3d' in new_datum and new_datum['bounding_box_3d'] is not None: # Note: DGP camera class does not model the full camera matrix just focal length and center # if using DGP camera class, do not use transformations that add a skew! boxes = new_datum['bounding_box_3d'] @@ -641,7 +641,7 @@ def transform_datum(self, cam_datum: Dict[str, Any]) -> Dict[str, Any]: # pylin boxes = self.transform_detections_3d(boxes, pose_correction) new_datum['bounding_box_3d'] = boxes - if 'bounding_box_2d' in new_datum: + if 'bounding_box_2d' in new_datum and new_datum['bounding_box_2d'] is not None: boxes = new_datum['bounding_box_2d'] boxes = self.transform_detections_2d(boxes, ) new_datum['bounding_box_2d'] = boxes diff --git a/dgp/annotations/transforms.py b/dgp/annotations/transforms.py index fa6b863b..3390a1b9 100644 --- a/dgp/annotations/transforms.py +++ b/dgp/annotations/transforms.py @@ -1,5 +1,8 @@ -# Copyright 2021 Toyota Research Institute. All rights reserved. +# Copyright 2021-2022 Woven Planet. All rights reserved. 
from collections import OrderedDict +from typing import Any, Dict + +import numpy as np from dgp.annotations import ONTOLOGY_REGISTRY from dgp.annotations.transform_utils import ( @@ -8,6 +11,7 @@ remap_instance_segmentation_2d_annotation, remap_semantic_segmentation_2d_annotation, ) +from dgp.utils.accumulate import points_in_cuboid class Compose: @@ -198,3 +202,53 @@ def transform_datum(self, datum): ) return datum + + +class AddLidarCuboidPoints(BaseTransform): + """Populate the num_points field for bounding_box_3d""" + def __init__(self, subsample: int = 1) -> None: + """Populate the num_points field for bounding_box_3d. Optionally downsamples the point cloud for speed. + + Parameters + ---------- + subsample: int, default: 1 + Fraction of point cloud to use for computing the number of points. i.e., subsample=10 indicates that + 1/10th of the points should be used. + """ + super().__init__() + self.subsample = subsample + + def transform_datum(self, datum: Dict[str, Any]) -> Dict[str, Any]: + """Populate the num_points field for bounding_box_3d + Parameters + ---------- + datum: Dict[str,Any] + A dgp lidar or point cloud datum. 
Must contain keys bounding_box_3d and point_cloud + + Returns + ------- + datum: Dict[str,Any] + The datum with num_points added to the cuboids + """ + if 'bounding_box_3d' not in datum: + return datum + + boxes = datum['bounding_box_3d'] + if boxes is None or len(boxes) == 0: + return datum + + assert 'point_cloud' in datum, 'datum should contain point_cloud key' + point_cloud = datum['point_cloud'] + if self.subsample > 1: + N = point_cloud.shape[0] + sample_idx = np.random.choice(N, N // self.subsample) + point_cloud = point_cloud[sample_idx].copy() + + for box in boxes: + # If a box is missing this num_points value we expect it will have a value of 0 + # so only run this for boxes that might be missing the value + if box.num_points == 0: + in_cuboid = points_in_cuboid(point_cloud, box) + box._num_points = np.sum(in_cuboid) * self.subsample + + return datum diff --git a/dgp/contribs/dgp2wicker/.dockerignore b/dgp/contribs/dgp2wicker/.dockerignore new file mode 100644 index 00000000..affdbf72 --- /dev/null +++ b/dgp/contribs/dgp2wicker/.dockerignore @@ -0,0 +1,38 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# macOS files +.DS_Store + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# IDE +.idea/ +\.vscode/ diff --git a/dgp/contribs/dgp2wicker/Dockerfile b/dgp/contribs/dgp2wicker/Dockerfile new file mode 100644 index 00000000..f4c73f91 --- /dev/null +++ b/dgp/contribs/dgp2wicker/Dockerfile @@ -0,0 +1,11 @@ +FROM dgp:latest + +RUN apt-get update && apt-get install -y --allow-downgrades --allow-change-held-packages --no-install-recommends \ + openjdk-11-jre-headless + +ARG WORKSPACE=/home/dgp2wicker +WORKDIR ${WORKSPACE} +COPY . ${WORKSPACE} +COPY sample_wickerconfig.json /root/wickerconfig.json +RUN pip install --editable . 
+ENV PYTHONPATH="${WORKSPACE}:$PYTHONPATH" diff --git a/dgp/contribs/dgp2wicker/Makefile b/dgp/contribs/dgp2wicker/Makefile new file mode 100644 index 00000000..40641c66 --- /dev/null +++ b/dgp/contribs/dgp2wicker/Makefile @@ -0,0 +1,44 @@ +# Copyright 2022 Woven Planet. All rights reserved. +PYTHON ?= python3 +PACKAGE_NAME ?= dgp2wicker +WORKSPACE ?= /home/$(PACKAGE_NAME) +DOCKER_IMAGE_NAME ?= $(PACKAGE_NAME) +DOCKER_IMAGE ?= $(DOCKER_IMAGE_NAME):latest +DOCKER_OPTS ?= \ + -it \ + --rm \ + --shm-size=62G \ + -e AWS_DEFAULT_REGION \ + -e AWS_ACCESS_KEY_ID \ + -e AWS_SECRET_ACCESS_KEY \ + -e AWS_SESSION_TOKEN\ + -e AWS_PROFILE \ + -e VAULT_ASSUMED_ROLE \ + -e WICKER_CONFIG_PATH \ + -e DISPLAY=${DISPLAY} \ + -v $(PWD):$(WORKSPACE) + +develop: + pip install --editable . + +clean: + $(PYTHON) setup.py clean && \ + rm -rf build dist && \ + find . -name "*.pyc" | xargs rm -f && \ + find . -name "__pycache__" | xargs rm -rf + find . -name "*egg-info" | xargs rm -rf + +docker-build: + docker build \ + --build-arg WORKSPACE=$(WORKSPACE) \ + -t $(DOCKER_IMAGE) . + +docker-run: + docker run \ + --name $(PACKAGE_NAME) \ + $(DOCKER_OPTS) $(DOCKER_IMAGE) $(COMMAND) + +docker-start-interactive: + docker run \ + $(DOCKER_OPTS) \ + $(DOCKER_IMAGE) bash \ No newline at end of file diff --git a/dgp/contribs/dgp2wicker/dgp2wicker/__init__.py b/dgp/contribs/dgp2wicker/dgp2wicker/__init__.py new file mode 100644 index 00000000..a5664a60 --- /dev/null +++ b/dgp/contribs/dgp2wicker/dgp2wicker/__init__.py @@ -0,0 +1,3 @@ +# Copyright 2022 Woven Planet. All rights reserved. + +__version__ = '1.0.0' \ No newline at end of file diff --git a/dgp/contribs/dgp2wicker/dgp2wicker/cli.py b/dgp/contribs/dgp2wicker/dgp2wicker/cli.py new file mode 100644 index 00000000..f01800ee --- /dev/null +++ b/dgp/contribs/dgp2wicker/dgp2wicker/cli.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +# Copyright 2022 Woven Planet NA. All rights reserved. 
+"""dgp2wicker command line interface +""" +import logging +import os +import sys +from functools import partial +from typing import Any, Dict, List + +import click +from dgp2wicker.ingest import ingest_dgp_to_wicker + +from dgp.annotations.camera_transforms import ScaleAffineTransform +from dgp.annotations.transforms import AddLidarCuboidPoints + + +class AddLidarCuboidPointsContext(AddLidarCuboidPoints): + """Add Lidar Points but applied to samples not datums""" + def __call__(self, sample: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + new_sample = [] + for datum in sample: + if datum['datum_type'] == 'point_cloud' and 'bounding_box_3d' in datum: + if datum['bounding_box_3d'] is not None: + datum = super().__call__(datum) + new_sample.append(datum) + return new_sample + + +class ScaleImages(ScaleAffineTransform): + """Scale Transform but applied to samples not datums""" + def __call__(self, sample: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + new_sample = [] + for datum in sample: + if datum['datum_type'] == 'image' and 'rgb' in datum: + datum = super().__call__(datum) + new_sample.append(datum) + return new_sample + + +@click.group() +@click.version_option() +def cli(): + logging.getLogger('dgp2wicker').setLevel(level=logging.INFO) + logging.getLogger('py4j').setLevel(level=logging.CRITICAL) + logging.getLogger('botocore').setLevel(logging.CRITICAL) + logging.getLogger('boto3').setLevel(logging.CRITICAL) + logging.getLogger('PIL').setLevel(logging.CRITICAL) + + +@cli.command(name='ingest') +@click.option("--scene-dataset-json", required=True, help="Path to DGP Dataset JSON") +@click.option("--wicker-dataset-name", required=True, default=None, help="Name of dataset in Wicker") +@click.option("--wicker-dataset-version", required=True, help="Version of dataset in Wicker") +@click.option("--datum-names", required=True, help="List of datum names") +@click.option("--requested-annotations", help="List of annotation types") 
+@click.option("--only-annotated-datums", is_flag=True, help="Apply only annotated datums") +@click.option("--max-num-scenes", required=False, default=None, type=int, help="The maximum number of scenes to process") +@click.option("--max-len", required=False, default=1000, help="The maximum number of samples per scene") +@click.option("--chunk-size", required=False, default=1000, help="The number of samples per chunk") +@click.option("--skip-camera-cuboids", is_flag=True, help="If True, skip cuboids for non lidar datums") +@click.option("--num-partitions", required=False, default=None, type=int, help="Number of scene partitions") +@click.option("--num-repartitions", required=False, default=None, type=int, help="Number of sample partitions") +@click.option("--is-pd", is_flag=True, help="If true, process the dataset with ParallelDomainScene") +@click.option("--data-uri", required=False, default=None, help="Alternate location for scene data") +@click.option("--add-lidar-points", is_flag=True, help="Add lidar point count to lidar cuboids") +@click.option("--half-size-images", is_flag=True, help="Resize image datums to half size") +def ingest( + scene_dataset_json, + wicker_dataset_name, + wicker_dataset_version, + datum_names, + requested_annotations, + only_annotated_datums, + max_num_scenes, + max_len, + chunk_size, + skip_camera_cuboids, + num_partitions, + num_repartitions, + is_pd, + data_uri, + add_lidar_points, + half_size_images, +): + datum_names = [x.strip() for x in datum_names.split(',')] + requested_annotations = [x.strip() for x in requested_annotations.split(',')] if requested_annotations else None + dataset_kwargs = { + 'datum_names': datum_names, + 'requested_annotations': requested_annotations, + 'only_annotated_datums': only_annotated_datums, + } + + pipeline = [] + if add_lidar_points: + pipeline.append(AddLidarCuboidPointsContext()) + if half_size_images: + pipeline.append(ScaleImages(s=.5)) + + results = ingest_dgp_to_wicker( + scene_dataset_json=scene_dataset_json, 
wicker_dataset_name=wicker_dataset_name, + wicker_dataset_version=wicker_dataset_version, + dataset_kwargs=dataset_kwargs, + spark_context=None, + pipeline=pipeline, + max_num_scenes=max_num_scenes, + max_len=max_len, + chunk_size=chunk_size, + skip_camera_cuboids=skip_camera_cuboids, + num_partitions=num_partitions, + num_repartitions=num_repartitions, + is_pd=is_pd, + data_uri=data_uri, + ) + + print(results) + + +if __name__ == '__main__': + cli() diff --git a/dgp/contribs/dgp2wicker/dgp2wicker/dataset.py b/dgp/contribs/dgp2wicker/dgp2wicker/dataset.py new file mode 100644 index 00000000..1e285e87 --- /dev/null +++ b/dgp/contribs/dgp2wicker/dgp2wicker/dataset.py @@ -0,0 +1,231 @@ +# Copyright 2022 Woven Planet NA. All rights reserved. +"""Dataloader for DGP SynchornizedScene Wicker datasets""" +# pylint: disable=missing-param-doc +import logging +from collections import OrderedDict, defaultdict +from typing import Any, Dict, List, Optional + +from dgp2wicker.ingest import ( + FIELD_TO_WICKER_SERIALIZER, + ILLEGAL_COMBINATIONS, + gen_wicker_key, + parse_wicker_key, +) +from dgp2wicker.serializers import OntologySerializer +from wicker.core.datasets import S3Dataset # type: ignore + +from dgp.annotations import ANNOTATION_REGISTRY # type: ignore +from dgp.annotations.ontology import Ontology # type: ignore + +logger = logging.getLogger(__name__) + + +def compute_columns( + datum_names: List[str], + datum_types: List[str], + requested_annotations: List[str], + cuboid_datum: Optional[str] = None, + with_ontology_table: bool = True, +) -> List[str]: + """Method to parse requested datums, types, and annotations into keys for fetching from wicker. + + Parameters + ---------- + datum_names: List + List of datum names to load. + + datum_types: List + List of datum types i.e, 'image', 'point_cloud', 'radar_point_cloud'. + + requested_annotations: List + List of annotation types to load i.e. 'bounding_box_3d', 'depth' etc. 
+ + cuboid_datum: str, default: None + Optional datum name to restrict loading of bounding_box_3d annotations to a single datum. + For example if we do not desire to load bounding_box_3d for both the lidar datum and every + image datum, we would set this field to 'lidar'. + + with_ontology_table: bool, default: True + Flag to add loading of ontology tables + + Returns + ------- + columns_to_load: List + A list of keys to fetch from wicker. + """ + # Compute the requested columns. + columns_to_load = ['scene_index', 'scene_uri', 'sample_index_in_scene'] + for datum_name, datum_type in zip(datum_names, datum_types): + fields = ['timestamp', 'pose', 'extrinsics', 'datum_type'] + + # Add extra information for different datum type. + if datum_type == 'image': + fields.extend(['intrinsics', 'rgb', 'distortion']) + elif datum_type in ['point_cloud', 'radar_point_cloud']: + fields.extend(['point_cloud', 'extra_channels']) + + if requested_annotations is not None: + fields.extend(requested_annotations) + + for annotation in fields: + if (datum_type, annotation) in ILLEGAL_COMBINATIONS: + #print('skip', datum_name, datum_type, annotation) + continue + + # If this is a cuboid annotation, optionally, only add it for a specific datum + if annotation == 'bounding_box_3d' and cuboid_datum is not None: + if datum_name != cuboid_datum: + #print('skip', datum_name, datum_type, annotation) + continue + columns_to_load.append(gen_wicker_key(datum_name, annotation)) + + if with_ontology_table and requested_annotations is not None: + for ann in requested_annotations: + if ann in ANNOTATION_REGISTRY: + columns_to_load.append(gen_wicker_key('ontology', ann)) + + return columns_to_load + + +class DGPS3Dataset(S3Dataset): + """ + S3Dataset for data stored in dgp synchronized scene format in wicker. This is a baseclass + inteded for use with all DGP wicker datasets. 
It handles conversion from wicker binary formats + to DGP datum and annotation objects + """ + def __init__(self, *args: Any, wicker_sample_index: Optional[List[List[int]]] = None, **kwargs: Any) -> None: + """S3Dataset for data stored in dgp synchronized scene format in wicker. This is a base class + intended for use with all DGP wicker datasets. It handles conversion from wicker binary formats + to DGP datum and annotation objects. + + Parameters + ---------- + wicker_sample_index: List[List[int]], default: None + A mapping from this dataset's index to a list of wicker indexes. If None, a mapping for all + single frames will be generated. + """ + super().__init__(*args, **kwargs) + + # All datasets will need a mapping from this dataset's index -> [ wicker indexes ] + self.wicker_sample_index: List[List[int]] = [[]] + if wicker_sample_index is None: + N = super().__len__() + self.wicker_sample_index = [[k] for k in range(N)] + else: + self.wicker_sample_index = wicker_sample_index + + self._ontology_table: Optional[Dict[str, Ontology]] = None + + @property + def ontology_table(self) -> Optional[Dict[str, Ontology]]: + """Return the ontology table if any. + + Returns + ------- + ontology_table: Dict + The ontology table or None if an ontology table has not been assigned with self._create_ontology_table. + """ + return self._ontology_table + + def __len__(self) -> int: + """ Number of samples in dataset + + Returns + ------- + length: int + The number of samples in the dataset + """ + return len(self.wicker_sample_index) + + def _create_ontology_table(self, raw_wicker_sample: Dict[str, Any]) -> Dict[str, Ontology]: + """Create ontology table based on given wicker item. + + Parameters + ---------- + raw_wicker_sample: Dict + A raw wicker sample containing ontology keys, ex: ontology____bounding_box_3d etc. + + Returns + ------- + ontology_table: Dict + A dictionary keyed by annotation name holding an ontology for that annotation. + """ + # Set the ontologies only once. 
+ # NOTE: We assume every sample has the same ontology + ontology_table = {} + ontology_keys = [key for key in raw_wicker_sample if 'ontology' in key] + for key in ontology_keys: + _, ontology_type = parse_wicker_key(key) + serializer = OntologySerializer(ontology_type) + ontology_table[ontology_type] = serializer.unserialize(raw_wicker_sample[key]) + + return ontology_table + + def _process_raw_wicker_sample(self, raw_wicker_sample: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + """Parse raw data from wicker into datums/fields. + + Parameters + ---------- + raw_wicker_sample: Dict + The raw output from wicker S3Dataset. + + Returns + ------- + sample_dict: Dict[str, Dict[str,Any]] + A dictionary keyed by datum name holding DGP SynchronizedScene like datums. + """ + # Lots of annotations require an ontology table, so we process those first + ontology_table = self._create_ontology_table(raw_wicker_sample) + + if self.ontology_table is None: + self._ontology_table = ontology_table + else: + # Make sure the ontology table has not changed. 
We expect this to be constant across a dataset + assert set(ontology_table.keys()) == set(self.ontology_table.keys()) + for field in self.ontology_table: + assert self.ontology_table[field] == ontology_table[field] + + output_dict: Dict[str, Dict[str, Any]] = defaultdict(OrderedDict) + for key, raw in raw_wicker_sample.items(): + if key in ['scene_uri', 'scene_index', 'sample_index_in_scene']: + output_dict['meta'][key] = raw + continue + if 'ontology' in key: + continue + + datum_name, field = parse_wicker_key(key) + serializer = FIELD_TO_WICKER_SERIALIZER[field]() + if hasattr(serializer, 'ontology'): + serializer.ontology = self.ontology_table[field] + + output_dict[datum_name]['datum_name'] = datum_name + output_dict[datum_name][field] = serializer.unserialize(raw) + + if 'meta' in output_dict: + output_dict['meta']['datum_name'] = 'meta' + output_dict['meta']['datum_type'] = 'meta' + + return output_dict + + def __getitem__(self, index: int) -> List[Dict[str, Dict[str, Any]]]: + """"Get the dataset item at index. + + Parameters + ---------- + index: int + The index to get. + + Returns + ------- + context: List + A context window with samples as dicts keyed by datum name. + """ + wicker_samples = self.wicker_sample_index[index] + idx = wicker_samples[0] + context = [] + for idx in wicker_samples: + raw = super().__getitem__(idx) + sample = self._process_raw_wicker_sample(raw) + context.append(sample) + + return context diff --git a/dgp/contribs/dgp2wicker/dgp2wicker/ingest.py b/dgp/contribs/dgp2wicker/dgp2wicker/ingest.py new file mode 100644 index 00000000..d5d62fe4 --- /dev/null +++ b/dgp/contribs/dgp2wicker/dgp2wicker/ingest.py @@ -0,0 +1,558 @@ +# Copyright 2022 Woven Planet. All rights reserved. 
+"""DGP to Wicker ingestion methods +""" +# pylint: disable=missing-param-doc +import logging +import os +import tempfile +import time +import traceback +from collections import OrderedDict +from copy import deepcopy +from functools import partial +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union + +import dgp2wicker.serializers as ws +import numpy as np +import pyspark +import wicker +import wicker.plugins.spark as wsp +from wicker.schema import IntField, StringField + +from dgp.datasets import ParallelDomainScene, SynchronizedScene +from dgp.proto import dataset_pb2 +from dgp.proto.dataset_pb2 import SceneDataset +from dgp.utils.cloud.s3 import sync_dir +from dgp.utils.protobuf import open_pbobject + +ILLEGAL_COMBINATIONS = set([('point_cloud', 'depth'), ('point_cloud', 'semantic_segmentation_2d'), + ('point_cloud', 'instance_segmentation_2d'), ('point_cloud', 'bounding_box_2d')]) + +WICKER_KEY_SEPARATOR = '____' + +# Map keys in SynchronizedScene output to wicker serialization methods +FIELD_TO_WICKER_SERIALIZER = { + 'datum_type': ws.StringSerializer, + 'intrinsics': ws.IntrinsicsSerializer, + 'distortion': ws.DistortionSerializer, + 'extrinsics': ws.PoseSerializer, + 'pose': ws.PoseSerializer, + 'rgb': ws.RGBSerializer, + 'timestamp': ws.LongSerializer, + 'point_cloud': ws.PointCloudSerializer, + 'extra_channels': ws.PointCloudSerializer, + 'bounding_box_2d': ws.BoundingBox2DSerializer, + 'bounding_box_3d': ws.BoundingBox3DSerializer, + 'semantic_segmentation_2d': ws.SemanticSegmentation2DSerializer, + 'instance_segmentation_2d': ws.InstanceSegmentation2DSerializer, + 'depth': ws.DepthSerializer, +} + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +def gen_wicker_key(datum_name: str, field: str) -> str: + """Generate a key from a datum name and field i.e 'rgb', 'pose' etc + + Parameters + ---------- + datum_name: str + The name of the datum + + field: str + The field of the datum + + Returns + ------- + 
key: str + The wicker key name formed from datum_name and field + """ + return f'{datum_name}{WICKER_KEY_SEPARATOR}{field}' + + +def parse_wicker_key(key: str) -> Tuple[str, str]: + """Parse a wicker dataset key into a datum and field combination + + Parameters + ---------- + key: str + The wicker key name formed from datum_name and field + + Returns + ------- + datum_name: str + The name of the datum + + field: str + The field of the datum + """ + return tuple(key.split(WICKER_KEY_SEPARATOR)) # type: ignore + + +def wicker_types_from_sample( + sample: List[List[Dict]], + ontology_table: Optional[Dict] = None, + skip_camera_cuboids: bool = True, +) -> Dict[str, Any]: + """Get the wicker keys and types from an existing dgp sample. + + Parameters + ---------- + sample: List[List[Dict]] + SynchronizedSceneDataset-style sample datum. + + ontology_table: Dict, default: None + A dictionary mapping annotation key(s) to Ontology(s). + + skip_camera_cuboids: bool, default: True + Flag to skip processing bounding_box_3d for image datums + + Returns + ------- + wicker_types: List + The Wicker schema types corresponding to the `wicker_keys`. 
+ """ + wicker_types = OrderedDict() + for datum in sample: + datum_name = datum['datum_name'] + datum_type = datum['datum_type'] + for k, v in datum.items(): + if k == 'datum_name' or (datum_type, k) in ILLEGAL_COMBINATIONS: + continue + if datum_type == 'image' and k == 'bounding_box_3d' and skip_camera_cuboids: + continue + key = gen_wicker_key(datum_name, k) + serializer = FIELD_TO_WICKER_SERIALIZER[k] + wicker_types[key] = serializer().schema(key, v) + + if ontology_table is not None: + for k, v in ontology_table.items(): + key = gen_wicker_key('ontology', k) + wicker_types[key] = ws.OntologySerializer(k).schema(key, v) + + wicker_types['scene_index'] = IntField('scene_index') + wicker_types['sample_index_in_scene'] = IntField('sample_index_in_scene') + wicker_types['scene_uri'] = StringField('scene_uri') + + return wicker_types + + +def dgp_to_wicker_sample( + sample: List[List[Dict]], + wicker_keys: List[str], + scene_index: Optional[int], + sample_index_in_scene: Optional[int], + ontology_table: Optional[Dict], + scene_uri: Optional[str], +) -> Dict: + """Convert a DGP sample to the Wicker format. + + Parameters + ---------- + sample: List[List[Dict]] + SynchronizedSceneDataset-style sample datum. + + wicker_keys: List[str] + Keys to be used in Wicker. + + scene_index: int, default: None + Index of current scene. + + sample_index_in_scene: int, default: None + Index of the sample in current scene. + + ontology_table: Dict, default: None + A dictionary mapping annotation key(s) to Ontology(s). + + scene_uri: str + Relative path to this specific scene json file. + + Returns + ------- + wicker_sample: Dict + DGP sample in the Wicker format. 
+ """ + wicker_sample = {} + for datum in sample: + datum_name = datum['datum_name'] + for k, v in datum.items(): + key = gen_wicker_key(datum_name, k) + if key not in wicker_keys: + continue + serializer = FIELD_TO_WICKER_SERIALIZER[k] + wicker_sample[key] = serializer().serialize(v) + + if ontology_table is not None: + for k, v in ontology_table.items(): + key = gen_wicker_key('ontology', k) + wicker_sample[key] = ws.OntologySerializer(k).serialize(v) + + wicker_sample['scene_index'] = scene_index + wicker_sample['sample_index_in_scene'] = sample_index_in_scene + wicker_sample['scene_uri'] = scene_uri + + return wicker_sample + + +def get_scenes(scene_dataset_json: str, data_uri: Optional[str] = None) -> List[Tuple[int, str, str]]: + """Get all the scene files from scene_dataset_json + + Parameters + ---------- + scene_dataset_json: str + Path to dataset json in s3 or local. + + data_uri: str, default: None + Optional path to location of raw data. If None, we assume the data is stored alongside scene_dataset_json. + + Returns + ------- + scenes: List[Tuple[int,str,str]] + A list of tuples (scene_index, split, scene_json_uri) for each scene in scene_dataset_json. 
+ """ + if data_uri is None: + data_uri = os.path.dirname(scene_dataset_json) + + dataset = open_pbobject(scene_dataset_json, SceneDataset) + split_id_to_name = { + dataset_pb2.TRAIN: 'train', + dataset_pb2.VAL: 'val', + dataset_pb2.TEST: 'test', + dataset_pb2.TRAIN_OVERFIT: 'train_overfit', + } + + scenes = [] + for k in dataset.scene_splits: + files = [(split_id_to_name[k], os.path.join(data_uri, f)) for f in dataset.scene_splits[k].filenames] + scenes.extend(files) + logger.info(f'found {len(files)} in split {split_id_to_name[k]}') + + logger.info(f'found {len(scenes)} in {scene_dataset_json}') + + # Add the scene index + scenes = [(k, *x) for k, x in enumerate(scenes)] + + return scenes + + +def chunk_scenes(scenes: List[Tuple[int, str, str]], + max_len: int = 200, + chunk_size: int = 100) -> List[Tuple[int, str, str, Tuple[int, int]]]: + """Split each scene into chunks of max length chunk_size samples. + + Parameters + ---------- + scenes: List[Tuple[int,str,str]] + List of (scene_index, split, scene_json_uri) tuples. + + max_len: int, default: 200 + Expected maximum length of each scene. + + chunk_size: int, default: 100 + Maximum size of each chunk. + + Returns + ------- + scenes: List[Tuple[int,str,str,Tuple[int,int]]] + A list of scenes with (scene_index, split, scene_json_uri, (chunk_start, chunk_end)) tuples + """ + new_scenes = [] + # Note: by using chunks, we download the same scene multiple times. 
+ for c in range(-(-max_len // chunk_size)):  # ceil division: keep the trailing partial chunk, and at least one chunk when chunk_size > max_len + chunk = (int(c * chunk_size), int((c + 1) * chunk_size)) + new_scenes.extend([(*x, chunk) for x in scenes]) + return new_scenes + + +def local_spark() -> pyspark.SparkContext: + """Generate a spark context for local testing of small datasets + + Returns + ------- + spark_context: A spark context + """ + spark = pyspark.sql.SparkSession.builder \ + .master('local[*]') \ + .appName("dgp2wicker") \ + .config("spark.driver.memory", "56G") \ + .config("spark.executor.memory", "56G") \ + .config("spark.dynamicAllocation.enabled", "true") \ + .config("spark.dynamicAllocation.maxExecutors", "4") \ + .config("spark.dynamicAllocation.minExecutors","1") \ + .config("spark.executor.cores", "1") \ + .config('spark.task.maxFailures', '4') \ + .config('spark.driver.maxResultSize','4G') \ + .config('spark.python.worker.memory','24G')\ + .getOrCreate() + return spark.sparkContext + + +def ingest_dgp_to_wicker( + scene_dataset_json: str, + wicker_dataset_name: str, + wicker_dataset_version: str, + dataset_kwargs: Dict, + spark_context: pyspark.SparkContext, + pipeline: Optional[List[Callable]] = None, + max_num_scenes: int = None, + max_len: int = 1000, + chunk_size: int = 1000, + skip_camera_cuboids: bool = True, + num_partitions: int = None, + num_repartitions: int = None, + is_pd: bool = False, + data_uri: str = None, +) -> Dict[str, int]: + """Ingest DGP dataset into Wicker datastore + + Parameters + ---------- + scene_dataset_json: str + Path to scenes dataset json. + + wicker_dataset_name: str + Name of the dataset used in Wicker datastore. + + wicker_dataset_version: str + Semantic version of the dataset (i.e., xxx.xxx.xxx). + + spark_context: pyspark.SparkContext, default: None + A spark context. If None, will generate one using dgp2wicker.ingest.local_spark() and default settings + + dataset_kwargs: dict + Arguments for dataloader. + + spark_context: A spark context, default: None + A spark context. 
If None, will use a local spark context. + + pipeline: List[Callable], default: None + A list of transformations to apply to every sample. + + max_num_scenes: int, default: None + An optional upper bound on the number of scenes to process. Typically used for testing. + + max_len: int, default: 1000 + Maximum expected length of a scene + + chunk_size: int, default: 1000 + Chunk size to split scenes into. If less than max_len, the same scene will be downloaded + multiple times. + + skip_camera_cuboids: bool, default: True + Optional flag to skip converting 'bounding_box_3d' for image datum types. + + num_partitions: int, default=None + Number of partitions to map scenes over. If None, defaults to number of scenes + + num_repartitions: int, default=None + Number of partitions to shuffle all samples over. If None, defaults to num_scenes*5 + + is_pd: bool, default=False + Flag to indicate if the dataset to load is a Parallel Domain dataset. If true, the scenes + will be loaded with ParallelDomainScene with use_virtual_camera_datums set to False. + + data_uri: str + Optional path to raw data location if raw data is not stored alongside scene_dataset_json. + """ + def open_scene( + scene_json_uri: str, + temp_dir: str, + dataset_kwargs: Dict[str, Any], + ) -> Union[SynchronizedScene, ParallelDomainScene]: + """Utility function to download a scene and open it + + Parameters + ---------- + scene_json_uri: str + Path to scene json. + + temp_dir: str + Path to directory to store scene if downloaded from s3. Not used if scene_json is local + + dataset_kwargs: dict + Arguments for data loader. i.e, datum_names, requested annotations etc. Must also contain + an `is_pd` key, set to True if this is a PD scene. 
+ + Returns + ------- + dataset: A DGP dataset + """ + scene_dir_uri = os.path.dirname(scene_json_uri) + scene_json = os.path.basename(scene_json_uri) + + if scene_dir_uri.startswith('s3://'): + # If the scene is in s3, fetch it + local_path = temp_dir + logger.info(f'downloading scene from {scene_dir_uri} to {local_path}') + assert not temp_dir.startswith('s3'), f'{temp_dir}' + sync_dir(scene_dir_uri, local_path) + else: + # Otherwise we expect the scene is on disk somewhere, so we just ignore the temp_dir + local_path = scene_dir_uri + logger.info(f'Using local scene from {scene_dir_uri}') + + dataset_kwargs = deepcopy(dataset_kwargs) + dataset_kwargs['scene_json'] = os.path.join(local_path, scene_json) + + is_pd = dataset_kwargs.pop('is_pd') + + if is_pd: + dataset_kwargs['use_virtual_camera_datums'] = False + dataset = ParallelDomainScene(**dataset_kwargs) + else: + dataset = SynchronizedScene(**dataset_kwargs) + + return dataset + + def process_scene( + partition: List[Tuple[int, str, str, Tuple[int, int]]], dataset_kwargs: Dict, pipeline: List[Callable], + wicker_types: List[str] + ) -> Generator[Tuple[str, Any], None, None]: + """Main task to parallelize. This takes a list of scene chunks and sequentially + downloads the scene to a temporary directory, opens the scene, applies any transformations, + and yields wicker serialized samples. + + Parameters + ---------- + partition: tuple + A list of scenes to process with this spark partition. + Each entry should be a tuple with (scene_index, split, scene_json_uri, (chunk_start, chunk_end)). + + dataset_kwargs: Dict + Arguments for data loader. See open_scene for details. + + pipeline: List[Callable] + List of transformations to apply to samples. + + wicker_types: Dict + The wicker schema keyed by wicker key; only the keys are used here. 
+ + Returns + ------- + wicker_sample: (split, sample) + Yields a wicker converted sample + """ + for scene_info in partition: + yield_count = 0 + global_scene_index, split, scene_json_uri, chunk = scene_info + chunk_start, chunk_end = chunk + scene_dir_uri = os.path.dirname(scene_json_uri) + scene_json = os.path.basename(scene_json_uri) + + st = time.time() + with tempfile.TemporaryDirectory() as temp_dir: + # TODO (chris.ochoa): check the number of samples before syncing the entire scene + try: + # Download and open the scene + dataset = open_scene(scene_json_uri, temp_dir, dataset_kwargs) + except Exception as e: + logger.error(f'Failed to open {scene_json_uri}, skipping...') + logger.error(e) + traceback.print_exc() + continue + + ontology_table = dataset.dataset_metadata.ontology_table + + for i in range(chunk_start, chunk_end): + if i >= len(dataset): + break + + try: + _, sample_index_in_scene, _ = dataset.dataset_item_index[i] + sample = dataset[i][0] # explicitly using 0 context here + + # Apply any transformations + for transform in pipeline: + sample = transform(sample) + + wicker_sample = dgp_to_wicker_sample( + sample, + wicker_keys=wicker_types.keys(), + scene_index=int(global_scene_index), + sample_index_in_scene=int(sample_index_in_scene), + ontology_table=ontology_table, + scene_uri=os.path.join(os.path.basename(scene_dir_uri), scene_json), + ) + + assert wicker_sample is not None + for k, v in wicker_sample.items(): + assert v is not None, f'{k} has invalid wicker serialized item' + + yield_count += 1 + yield (split, deepcopy(wicker_sample)) + + except Exception as e: + logger.error('failed to get sample, skipping...') + logger.error(e) + traceback.print_exc() + continue + + dt = time.time() - st + logger.info( + f'Finished {global_scene_index} {split}/{scene_dir_uri}, chunk:{chunk_start}->{chunk_end}.\ + Yielded {yield_count}, took {dt:.2f} seconds' + ) + + dataset_kwargs['is_pd'] = is_pd + + if pipeline is None: + pipeline = [] + + # Parse the 
dataset json and get the scene list. This is a list of tuple split, fully qualified scene uri + scenes = get_scenes(scene_dataset_json, data_uri=data_uri) + if max_num_scenes is not None: + scenes = scenes[:int(max_num_scenes)] + + if num_partitions is None: + num_partitions = len(scenes) + + if num_repartitions is None: + num_repartitions = 5 * len(scenes) + + # Use first sample to obtain schema. NOTE: We actually don't need the sample here for this + # TODO (chris.ochoa): generate the keys we expect for a specific combination of datums/annotations/ontologies + _, _, scene_json_uri = scenes[0] + with tempfile.TemporaryDirectory() as temp_dir: + dataset = open_scene(scene_json_uri, temp_dir, dataset_kwargs) + logger.info(f'Got dataset with {len(dataset)} samples') + sample = dataset[0][0] + # Apply any transformations + for transform in pipeline: + sample = transform(sample) + # TODO (chris.ochoa): make sure this has all the keys we care about in this ontology! + # it is possible (though never encountered) that for some reason the first scene may not have annotations + # and therefore no ontologies and conversion will fail. 
+ ontology_table = dataset.dataset_metadata.ontology_table + + # Build the schema from the sample + wicker_types = wicker_types_from_sample( + sample=sample, + ontology_table=ontology_table, + skip_camera_cuboids=skip_camera_cuboids, + ) + + wicker_dataset_schema = wicker.schema.DatasetSchema( + primary_keys=['scene_index', 'sample_index_in_scene'], fields=wicker_types.values() + ) + + # Chunk the scenes + scenes = chunk_scenes(scenes, max_len=max_len, chunk_size=chunk_size) + + # Shuffle the scenes + scenes = np.random.permutation(scenes).tolist() + + # Setup spark + if spark_context is None: + spark_context = local_spark() + + process = partial(process_scene, dataset_kwargs=dataset_kwargs, pipeline=pipeline, wicker_types=wicker_types) + rdd = spark_context.parallelize(scenes, + numSlices=num_partitions).mapPartitions(process).repartition(num_repartitions) + + return wsp.persist_wicker_dataset( + wicker_dataset_name, + wicker_dataset_version, + wicker_dataset_schema, + rdd, + ) diff --git a/dgp/contribs/dgp2wicker/dgp2wicker/serializers.py b/dgp/contribs/dgp2wicker/dgp2wicker/serializers.py new file mode 100644 index 00000000..8664c7f9 --- /dev/null +++ b/dgp/contribs/dgp2wicker/dgp2wicker/serializers.py @@ -0,0 +1,360 @@ +# Copyright 2022 Woven Planet. All rights reserved. 
WICKER_RAW_NONE_VALUE = b'\x00\x00\x00\x00'

# NOTE: all of these unwicker methods would be vastly simpler if we modify dgp annotations to support
# saving/loading into file like objects. TODO: (chris.ochoa)


def _encode_png(image: np.ndarray) -> bytes:
    """PNG-encode an image array, raising if OpenCV reports that encoding failed."""
    success, buffer = cv2.imencode(".png", image)
    if not success:
        raise ValueError('Failed to encode annotation image as PNG')
    return buffer.tobytes()


def _decode_image(raw: bytes) -> np.ndarray:
    """Decode an encoded image, keeping its stored channel count and bit depth."""
    return cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_UNCHANGED)


class WickerSerializer(ABC):
    """Interface for converting one dgp field to and from its wicker representation."""
    @abstractmethod
    def schema(self, name: str, data: Any) -> Any:
        """Returns a wicker schema object.

        Parameters
        ----------
        name: str
            The name of the key in the wicker schema.

        data: Any
            Example raw data to serialize. Some wicker types need additional information,
            for example numpy arrays need to know the shape.

        Returns
        -------
        schema: The schema object.
        """
        raise NotImplementedError

    def serialize(self, data: Any) -> Any:
        """Convert data for consumption by native wicker types.

        Parameters
        ----------
        data: Any
            The raw data to convert.

        Returns
        -------
        data: Any
            The converted data. This should be consumable by the schema object returned by self.schema.
        """
        return data

    def unserialize(self, raw: Any) -> Any:
        """Convert raw wicker data to a useable type.

        Parameters
        ----------
        raw: Any
            The raw wicker data. Input depends on self.schema. Raw will be a string for wicker StringField,
            a numpy array for NumpyField or bytes for ByteField etc.

        Returns
        -------
        output: Any
            The converted output
        """
        return raw


class _OntologySerializer(WickerSerializer):
    """Shared base for annotation serializers that need an ontology to rebuild annotations.

    The ontology is not stored with the annotation bytes, so it must be assigned through the
    ``ontology`` property before ``unserialize`` can return anything other than None.
    """
    def __init__(self, ):
        super().__init__()
        self._ontology = None

    @property
    def ontology(self) -> Optional[Ontology]:
        return self._ontology

    @ontology.setter
    def ontology(self, ontology: Optional[Ontology]):
        self._ontology = ontology


class StringSerializer(WickerSerializer):
    """Strings are stored as-is in a wicker StringField."""
    def schema(self, name: str, data: str):
        return StringField(name)


class LongSerializer(WickerSerializer):
    """Integers are stored as-is in a wicker LongField."""
    def schema(self, name: str, data: int):
        # NOTE(review): `data` is forwarded as LongField's second positional argument, unlike the
        # other serializers which pass only the name. Confirm against the wicker LongField signature.
        return LongField(name, data)


class DistortionSerializer(WickerSerializer):
    """Camera distortion parameters, stored as a JSON string."""
    def schema(self, name: str, data: Any):
        return StringField(name)

    def serialize(self, data: Dict[str, float]) -> str:
        return json.dumps(data)

    def unserialize(self, raw: str) -> Dict[str, float]:
        return json.loads(raw)


class PoseSerializer(WickerSerializer):
    """Poses, stored as their 4x4 float64 matrix."""
    def schema(self, name: str, data: Any):
        return NumpyField(name, shape=(4, 4), dtype='float64')

    def serialize(self, pose: Pose) -> np.ndarray:
        return pose.matrix.astype(np.float64)

    def unserialize(self, raw: np.ndarray) -> Pose:
        return Pose.from_matrix(raw)


class IntrinsicsSerializer(WickerSerializer):
    """Camera intrinsics, stored as a 3x3 float32 matrix."""
    def schema(self, name: str, data: Any):
        return NumpyField(name, shape=(3, 3), dtype='float32')

    def serialize(self, K: np.ndarray) -> np.ndarray:
        return K.astype(np.float32)


class RGBSerializer(WickerSerializer):
    """RGB images, stored as JPEG bytes (lossy)."""
    def schema(self, name: str, data: Any):
        return BytesField(name, is_heavy_pointer=True)

    def serialize(self, rgb: Image.Image) -> bytes:
        rgb_bytes = io.BytesIO()
        rgb.save(rgb_bytes, "JPEG")
        return rgb_bytes.getvalue()

    def unserialize(self, raw: bytes) -> Image.Image:
        return Image.open(io.BytesIO(raw))


class PointCloudSerializer(WickerSerializer):
    """Point clouds, stored as float32 arrays with a variable number of rows."""
    def schema(self, name: str, data: Any):
        # The column count is fixed from the example data; -1 leaves the row count unconstrained.
        return NumpyField(name, shape=(-1, data.shape[1]), dtype='float32', is_heavy_pointer=True)

    def serialize(self, point_cloud: np.ndarray) -> np.ndarray:
        return point_cloud.astype(np.float32)


class BoundingBox2DSerializer(_OntologySerializer):
    """2D bounding box annotation lists, stored as their serialized proto."""
    def schema(self, name: str, data: Any):
        return BytesField(name, required=False, is_heavy_pointer=True)

    def serialize(self, annotation: Optional[BoundingBox2DAnnotationList]) -> bytes:
        if annotation is None:
            return WICKER_RAW_NONE_VALUE
        return annotation.to_proto().SerializeToString()

    def unserialize(self, raw: bytes) -> Optional[BoundingBox2DAnnotationList]:
        # Boxes cannot be rebuilt without an ontology, so that case also yields None.
        if raw == WICKER_RAW_NONE_VALUE or self.ontology is None:
            return None

        _box = BoundingBox2DAnnotations()
        _box.ParseFromString(raw)

        boxlist = [
            BoundingBox2D(
                box=np.float32([ann.box.x, ann.box.y, ann.box.w, ann.box.h]),
                class_id=self.ontology.class_id_to_contiguous_id[ann.class_id],
                instance_id=ann.instance_id,
                color=self.ontology.colormap[ann.class_id],
                attributes=getattr(ann, "attributes", {}),
            ) for ann in _box.annotations
        ]

        return BoundingBox2DAnnotationList(self.ontology, boxlist)


class BoundingBox3DSerializer(_OntologySerializer):
    """3D bounding box annotation lists, stored as their serialized proto."""
    def schema(self, name: str, data: Any):
        return BytesField(name, required=False, is_heavy_pointer=True)

    def serialize(self, annotation: Optional[BoundingBox3DAnnotationList]) -> bytes:
        if annotation is None:
            return WICKER_RAW_NONE_VALUE
        return annotation.to_proto().SerializeToString()

    def unserialize(self, raw: bytes) -> Optional[BoundingBox3DAnnotationList]:
        # Boxes cannot be rebuilt without an ontology, so that case also yields None.
        if raw == WICKER_RAW_NONE_VALUE or self.ontology is None:
            return None

        _box = BoundingBox3DAnnotations()
        _box.ParseFromString(raw)

        boxlist = [
            BoundingBox3D(
                pose=Pose.load(ann.box.pose),
                sizes=np.float32([ann.box.width, ann.box.length, ann.box.height]),
                class_id=self.ontology.class_id_to_contiguous_id[ann.class_id],
                instance_id=ann.instance_id,
                color=self.ontology.colormap[ann.class_id],
                attributes=getattr(ann, "attributes", {}),
                num_points=ann.num_points,
                occlusion=ann.box.occlusion,
                truncation=ann.box.truncation
            ) for ann in _box.annotations
        ]

        return BoundingBox3DAnnotationList(self.ontology, boxlist)


class SemanticSegmentation2DSerializer(_OntologySerializer):
    """Semantic segmentation annotations, stored as a PNG of the raw segmentation image."""
    def schema(self, name: str, data: Any):
        return BytesField(name, required=False, is_heavy_pointer=True)

    def serialize(self, annotation: Optional[SemanticSegmentation2DAnnotation]) -> bytes:
        if annotation is None:
            return WICKER_RAW_NONE_VALUE

        # Save the image as PNG
        return _encode_png(annotation._segmentation_image)

    def unserialize(self, raw: bytes) -> Optional[SemanticSegmentation2DAnnotation]:
        if raw == WICKER_RAW_NONE_VALUE or self.ontology is None:
            return None

        segmentation_image = _decode_image(raw)

        # A multi-channel image carries the label in channel index 2.
        if len(segmentation_image.shape) == 3:
            segmentation_image = segmentation_image[:, :, 2].copy(order='C')

        # Pixels of value VOID_ID are not remapped to a label.
        not_ignore = segmentation_image != self.ontology.VOID_ID
        segmentation_image[not_ignore] = self.ontology.label_lookup[segmentation_image[not_ignore]]

        return SemanticSegmentation2DAnnotation(self.ontology, segmentation_image)

    def set_ontology(self, ontology: Ontology):
        self.ontology = ontology


class InstanceSegmentation2DSerializer(_OntologySerializer):
    """Panoptic annotations, stored as JSON holding the base64 PNG image and the index-to-label map."""
    def schema(self, name: str, data: Any):
        return BytesField(name, required=False, is_heavy_pointer=True)

    def serialize(self, annotation: Optional[PanopticSegmentation2DAnnotation]) -> bytes:
        if annotation is None:
            return WICKER_RAW_NONE_VALUE

        panoptic_image = _encode_png(annotation.panoptic_image)
        index_to_label = json.dumps(annotation.index_to_label).encode('utf-8')
        ann_bytes = {
            'panoptic_image': base64.b64encode(panoptic_image).decode(),
            'index_to_label': base64.b64encode(index_to_label).decode()
        }
        return json.dumps(ann_bytes).encode('utf-8')

    def unserialize(self, raw: bytes) -> Optional[PanopticSegmentation2DAnnotation]:
        if raw == WICKER_RAW_NONE_VALUE or self.ontology is None:
            return None

        raw_dict = json.loads(raw)
        panoptic_image = _decode_image(base64.b64decode(raw_dict['panoptic_image']))
        if len(panoptic_image.shape) == 3:
            # Widen before packing the channels into one label: multiplying the decoded
            # narrow-integer image by 256 / 65536 directly can overflow (and raises under
            # NumPy 2 promotion rules).
            _L = panoptic_image.astype(np.int64)
            label_map = _L[:, :, 2] + 256 * _L[:, :, 1] + 256 * 256 * _L[:, :, 0]
            panoptic_image = label_map.astype(PanopticSegmentation2DAnnotation.DEFAULT_PANOPTIC_IMAGE_DTYPE)

        index_to_label = json.loads(base64.b64decode(raw_dict['index_to_label']))

        return PanopticSegmentation2DAnnotation(self.ontology, panoptic_image, index_to_label)


class DepthSerializer(WickerSerializer):
    """Dense depth annotations, stored as a float32 array of the depth map."""
    def schema(self, name: str, data: DenseDepthAnnotation):
        return NumpyField(name, shape=data.depth.shape, dtype='float32', is_heavy_pointer=True)

    def serialize(self, depth: DenseDepthAnnotation) -> np.ndarray:
        return depth.depth.astype(np.float32)

    def unserialize(self, raw: np.ndarray) -> DenseDepthAnnotation:
        return DenseDepthAnnotation(raw)


class OntologySerializer(WickerSerializer):
    """Ontologies, stored as their serialized proto.

    Parameters
    ----------
    ontology_type: str
        Key into ONTOLOGY_REGISTRY selecting the ontology class to rebuild on unserialize.
    """
    def __init__(self, ontology_type: str):
        super().__init__()
        self.ontology_type = ontology_type

    def schema(self, name: str, data: Ontology):
        return BytesField(name)

    def serialize(self, ontology: Optional[Ontology]) -> bytes:
        # Mirror unserialize, which already understands the None sentinel.
        if ontology is None:
            return WICKER_RAW_NONE_VALUE
        return ontology.to_proto().SerializeToString()

    def unserialize(self, raw: bytes) -> Optional[Ontology]:
        if raw == WICKER_RAW_NONE_VALUE:
            return None

        ontology = OntologyV2Pb2()
        ontology.ParseFromString(raw)
        return ONTOLOGY_REGISTRY[self.ontology_type](ontology)
+ +```bash +cd dgp/contribs/dgp2wicker +make docker-build +``` + +### Example + +#### Save dataset to wicker + +```bash +$dgp2wicker ingest \ +--scene-dataset-json \ +--wicker-dataset-name test_dataset \ +--wicker-dataset-version 0.0.1 \ +--datum-names camera_01,camera_02,lidar \ +--requested-annotations bounding_box_3d,semantic_segmentation_2d \ +--only-annotated-datums +``` + +#### Read dataset from wicker + +```python +from dgp2wicker.dataset import DGPS3Dataset, compute_columns + +columns = compute_columns(datum_names = ['camera_01','camera_02','lidar',],\ + datum_types = ['image','image','point_cloud',], \ + requested_annotations=['bounding_box_3d','semantic_segmentation_2d','depth',], \ + cuboid_datum = 'lidar',) + +dataset = DGPS3Dataset(dataset_name = 'test_dataset',\ + dataset_version = '0.0.1', \ + dataset_partition_name='train', \ + columns_to_load = columns,) + +context = dataset[0] +``` + +--- + +### Supported datums/annotations + +datums: + +- [x] image +- [x] point_cloud +- [ ] radar_point_cloud +- [ ] file_datum +- [ ] agent + +annotations: + +- [x] bounding_box_2d +- [x] bounding_box_3d +- [x] depth +- [x] semantic_segmentation_2d +- [x] instance_segmentation_2d +- [ ] key_point_2d +- [ ] key_line_2d diff --git a/dgp/contribs/dgp2wicker/requirements.txt b/dgp/contribs/dgp2wicker/requirements.txt new file mode 100644 index 00000000..edbf0a82 --- /dev/null +++ b/dgp/contribs/dgp2wicker/requirements.txt @@ -0,0 +1 @@ +wicker[spark] \ No newline at end of file diff --git a/dgp/contribs/dgp2wicker/sample_wickerconfig.json b/dgp/contribs/dgp2wicker/sample_wickerconfig.json new file mode 100644 index 00000000..5617ba20 --- /dev/null +++ b/dgp/contribs/dgp2wicker/sample_wickerconfig.json @@ -0,0 +1,6 @@ +{ + "aws_s3_config": { + "s3_datasets_path": "", + "region": "" + } +} \ No newline at end of file diff --git a/dgp/contribs/dgp2wicker/setup.py b/dgp/contribs/dgp2wicker/setup.py new file mode 100644 index 00000000..60f7f3b5 --- /dev/null +++ 
class CustomInstallCommand(install):
    """Custom install command. Currently a pass-through to the stock install command."""
    def run(self):
        super().run()


class CustomDevelopCommand(develop):
    """Custom develop command. Currently a pass-through to the stock develop command."""
    def run(self):
        super().run()


# The version is read from the package itself, so dgp2wicker must be importable when this runs.
__version__ = importlib.import_module('dgp2wicker').__version__

# One requirement per line of requirements.txt, read relative to the current working directory.
with open('requirements.txt', encoding='utf-8') as requirements_file:
    requirements = requirements_file.read().splitlines()

# The readme doubles as the long description shown on the package index.
with open('readme.md', encoding='utf-8') as readme_file:
    readme = readme_file.read()

packages = find_packages(exclude=['tests'])

setup(
    name="dgp2wicker",
    version=__version__,
    description="Tools to convert TRI's DGP to L5's Wicker format.",
    long_description=readme,
    long_description_content_type='text/markdown',
    author="Chris Ochoa, Kuan Lee",
    author_email='charles.ochoa@woven-planet.global, kuan-hui.lee@woven-planet.global',
    url="https://github.tri-ad.tech/kuan-hui-lee/dgp2wicker",
    packages=packages,
    entry_points={
        'console_scripts': [
            'dgp2wicker=dgp2wicker.cli:cli',
        ],
    },
    include_package_data=True,
    setup_requires=['cython==0.29.21', 'grpcio==1.41.0', 'grpcio-tools==1.41.0'],
    install_requires=requirements,
    zip_safe=False,
    python_requires='>=3.7',
    cmdclass={
        'install': CustomInstallCommand,
        'develop': CustomDevelopCommand,
    },
)
_S3_SKIP_REASON = 'requires a wicker config whose s3_datasets_path is an s3:// uri'


def s3_is_configured():
    """Utility to check if there is a valid s3 dataset path configured.

    The config path is taken from WICKER_CONFIG_PATH, falling back to ~/wickerconfig.json.
    This is evaluated inside class-level skip decorators, i.e. at import time, so a missing,
    unreadable or malformed config must report "not configured" instead of raising. Otherwise
    the tests that do not need s3 would fail to import as well.

    Returns
    -------
    bool
        True if the configured s3_datasets_path starts with 's3://', False otherwise.
    """
    wicker_config_path = os.getenv('WICKER_CONFIG_PATH', os.path.expanduser('~/wickerconfig.json'))
    try:
        with open(wicker_config_path, 'r', encoding='utf-8') as f:
            wicker_config = json.load(f)
        s3_datasets_path = wicker_config['aws_s3_config']['s3_datasets_path']
    except (OSError, ValueError, KeyError, TypeError):
        # Missing/unreadable file, invalid JSON, or a config without the expected keys.
        return False

    return isinstance(s3_datasets_path, str) and s3_datasets_path.startswith('s3://')


class TestDDGP2Wicker(unittest.TestCase):
    """Tests for dgp to wicker conversion. The ingestion tests are skipped without a configured s3 path."""
    def setUp(self):
        """Create a local dgp dataset for testing"""
        self.dataset = SynchronizedSceneDataset(
            TEST_WICKER_DATASET_JSON,
            split='train',
            datum_names=['LIDAR', 'CAMERA_01'],
            forward_context=0,
            backward_context=0,
            requested_annotations=("bounding_box_2d", "bounding_box_3d")
        )

    def test_keys(self):
        """Sanity check the key parsing"""
        datum_key, datum_field = 'CAMERA_01', 'timestamp'
        key = gen_wicker_key(datum_key, datum_field)
        datum_key2, datum_field2 = parse_wicker_key(key)
        assert datum_key == datum_key2
        assert datum_field == datum_field2

    def test_schema(self):
        """Sanity check the schema generation"""
        sample = self.dataset[0][0]
        ontology_table = self.dataset.dataset_metadata.ontology_table
        wicker_types = wicker_types_from_sample(sample, ontology_table, skip_camera_cuboids=True)
        expected_keys = [
            'CAMERA_01____timestamp', 'CAMERA_01____rgb', 'CAMERA_01____intrinsics', 'CAMERA_01____distortion',
            'CAMERA_01____extrinsics', 'CAMERA_01____pose', 'CAMERA_01____bounding_box_2d', 'CAMERA_01____datum_type',
            'LIDAR____timestamp', 'LIDAR____extrinsics', 'LIDAR____pose', 'LIDAR____point_cloud',
            'LIDAR____extra_channels', 'LIDAR____bounding_box_3d', 'LIDAR____datum_type', 'ontology____bounding_box_2d',
            'ontology____bounding_box_3d', 'scene_index', 'sample_index_in_scene', 'scene_uri'
        ]

        assert (set(expected_keys) == set(wicker_types.keys()))

    def test_conversion(self):
        """Test serializers and conversion to wicker formats"""
        sample = self.dataset[0][0]
        ontology_table = self.dataset.dataset_metadata.ontology_table
        wicker_types = wicker_types_from_sample(sample, ontology_table, skip_camera_cuboids=True)

        sample_dict = {datum['datum_name']: datum for datum in sample}
        # This tests wicker serialization
        wicker_sample = dgp_to_wicker_sample(
            sample=sample,
            wicker_keys=list(wicker_types.keys()),
            scene_index=0,
            sample_index_in_scene=0,
            ontology_table=ontology_table,
            scene_uri='scene/scene.json'
        )
        assert set(wicker_sample.keys()) == set(wicker_types.keys())

        # Test that we can correctly unserialize all the objects
        # NOTE: this only tests the datum_name/datum_field combinations
        # for what is actually in the sample dataset. Types not available in the small
        # public test dataset should be manually tested offline.
        for key, raw in wicker_sample.items():
            # Parse the key to figure out what datum/field we have
            if key in ('scene_index', 'sample_index_in_scene', 'scene_uri'):
                continue
            datum_key, datum_field = parse_wicker_key(key)
            # Grab the correct serializer
            if datum_key == 'ontology':
                serializer = ws.OntologySerializer(datum_field)
            elif datum_field in FIELD_TO_WICKER_SERIALIZER:
                serializer = FIELD_TO_WICKER_SERIALIZER[datum_field]()
            else:
                print(f'{key} not supported')
                continue

            # Annotation serializers need the matching ontology to rebuild their objects
            if hasattr(serializer, 'ontology'):
                serializer.ontology = ontology_table[datum_field]

            unserialized = serializer.unserialize(raw)

            # comparison depends on the type. Images for example are wickerized to jpeg,
            # so there may be some slight loss of quality. Numpy arrays should just be close etc.
            if datum_field == 'rgb':
                assert isinstance(unserialized, Image)
                org_im = np.array(sample_dict[datum_key]['rgb'])
                new_im = np.array(unserialized)
                psnr = cv2.PSNR(org_im, new_im)
                assert psnr > 40
            elif datum_field in ('point_cloud', 'extra_channels', 'intrinsics'):
                org = sample_dict[datum_key][datum_field]
                assert np.allclose(org, unserialized)
            elif datum_field in ('pose', 'extrinsics'):
                org = sample_dict[datum_key][datum_field]
                assert np.allclose(org.matrix, unserialized.matrix)
            elif datum_key != 'ontology':
                org = sample_dict[datum_key][datum_field]
                assert org == unserialized

    @unittest.skipUnless(s3_is_configured(), _S3_SKIP_REASON)
    def test_ingest(self):
        """Test ingestion"""
        # The test dataset is really small, smaller than the expected partition size
        wsp.SPARK_PARTITION_SIZE = 12

        output = ingest_dgp_to_wicker(
            scene_dataset_json=TEST_WICKER_DATASET_JSON,
            wicker_dataset_name=TEST_WICKER_DATASET_NAME,
            wicker_dataset_version=TEST_WICKER_DATASET_VERSION,
            dataset_kwargs=TEST_WICKER_DATASET_KWARGS,
            spark_context=None,
            pipeline=None,
            max_num_scenes=None,
            max_len=1000,
            chunk_size=1000,
            skip_camera_cuboids=True,
            num_partitions=None,
            num_repartitions=None,
            is_pd=False,
            data_uri=None,
        )

        assert output['train'] == 6
        assert output['val'] == 6

    @unittest.skipUnless(s3_is_configured(), _S3_SKIP_REASON)
    def test_ingest_cli(self):
        """Test ingestion via the cli"""

        # The test dataset is really small, smaller than the expected partition size
        wsp.SPARK_PARTITION_SIZE = 12

        cmd = f'--scene-dataset-json {TEST_WICKER_DATASET_JSON}\n'
        cmd += f'--wicker-dataset-name {TEST_WICKER_DATASET_NAME}\n'
        cmd += f'--wicker-dataset-version {TEST_WICKER_DATASET_VERSION}\n'
        cmd += '--datum-names CAMERA_01,LIDAR\n'
        cmd += '--requested-annotations bounding_box_2d,bounding_box_3d\n'
        cmd += '--only-annotated-datums\n'
        cmd += '--half-size-images\n'
        cmd += '--add-lidar-points'

        runner = CliRunner()
        result = runner.invoke(ingest, cmd)

        assert result.exit_code == 0

    @unittest.skipUnless(s3_is_configured(), _S3_SKIP_REASON)
    def test_dataset(self):
        """Test That we can read a dataset from wicker"""
        # Make sure there is a dataset to read back
        self.test_ingest()

        columns = compute_columns(
            datum_names=[
                'CAMERA_01',
                'LIDAR',
            ],
            datum_types=[
                'image',
                'point_cloud',
            ],
            requested_annotations=['bounding_box_2d', 'bounding_box_3d'],
            cuboid_datum='LIDAR',
            with_ontology_table=True
        )

        dataset = DGPS3Dataset(
            dataset_name=TEST_WICKER_DATASET_NAME,
            dataset_version=TEST_WICKER_DATASET_VERSION,
            dataset_partition_name='train',
            columns_to_load=columns,
        )
        sample = dataset[0][0]

        expected_camera_fields = {
            'extrinsics', 'bounding_box_2d', 'pose', 'datum_name', 'datum_type', 'distortion', 'intrinsics', 'rgb',
            'timestamp'
        }
        expected_lidar_fields = {
            'pose', 'datum_name', 'datum_type', 'extra_channels', 'point_cloud', 'bounding_box_3d', 'extrinsics',
            'timestamp'
        }

        assert set(sample['CAMERA_01'].keys()) == expected_camera_fields
        assert isinstance(sample['CAMERA_01']['rgb'], Image)
        assert set(sample['LIDAR'].keys()) == expected_lidar_fields
        assert set(dataset.ontology_table.keys()) == {'bounding_box_2d', 'bounding_box_3d'}