Commit

Merge pull request #55 from CedrickArmel/feat/etl
fix: buffer size exceeds limits due to large arrays in the accumulator
CedrickArmel authored Oct 15, 2024
2 parents 691486e + 83fb100 commit 97096d5
Showing 4 changed files with 58 additions and 33 deletions.
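For orientation before the diffs: the fix changes what flows between pipeline steps. Instead of returning large calibrated tensors from each DoFn (which the downstream combine step had to accumulate in memory, hence the "buffer size exceeds limits" in the commit message), each worker now writes its serialized examples to a temporary TFRecord shard on GCS and emits only the shard's URI. A minimal sketch of that pattern, assuming a hypothetical DoFn and paths (the class and names below are illustrative, not the project's actual code):

import apache_beam as beam
import tensorflow as tf


class WriteShardFn(beam.DoFn):
    """Hypothetical DoFn showing the write-shard-then-emit-URI pattern."""

    def __init__(self, tmp_prefix: str):
        self.tmp_prefix = tmp_prefix  # e.g. "gs://bucket/pipeline_root/tmp"

    def process(self, element):
        key, big_tensor = element
        path = f"{self.tmp_prefix}/record-{key}.tfrecord"
        # Write the heavy payload to its own shard...
        with tf.io.TFRecordWriter(path) as writer:
            writer.write(tf.io.serialize_tensor(big_tensor).numpy())
        # ...and let only a short string cross the shuffle boundary.
        yield path

A final step can then gather the emitted URIs and merge the shards into one dataset, which is what save_dataset_to_tfrecords now does in ops.py below.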
4 changes: 2 additions & 2 deletions .github/workflows/etl.yml
@@ -59,10 +59,10 @@ jobs:
         run: |
           source env/bin/activate
           run_pipeline --source=${{ secrets.BUCKET }}/raw \
-            --output=gs://${{ secrets.BUCKET }}/primary/neurips-dataset-c1d1f1m1b30sum.tfrecords \
+            --output=gs://${{ secrets.BUCKET }}/primary/ \
             --mask --flat --corr --dark \
             --runner=${{ vars.RUNNER}} \
-            --worker_disk_type=compute.googleapis.com/projects/${{ secrets.PROJECT_ID }}/zones/${{ secrets.REGION }}/diskTypes/pd-ssd \
+            --worker_disk_type=compute.googleapis.com/projects/${{ secrets.PROJECT_ID }}/zones/${{ secrets.REGION }}/diskTypes/pd-standard \
             --project=${{ secrets.PROJECT_ID }} \
             --region=${{ secrets.REGION }} \
             --temp_location=gs://${{ secrets.BUCKET }}/pipeline_root/ \
22 changes: 12 additions & 10 deletions src/neuripsadc/etl/main.py
@@ -34,7 +34,7 @@
     _BeamArgumentParser,
 )
 
-from neuripsadc.etl.ops import get_raw_data_uris, save_dataset_to_tfrecords
+from neuripsadc.etl.ops import gen_name, get_raw_data_uris, save_dataset_to_tfrecords
 from neuripsadc.etl.transforms import CalibrationFn, CombineDataFn
 

@@ -53,7 +53,7 @@ def _add_argparse_args(cls, parser: _BeamArgumentParser) -> None:
"--output",
type=str,
required=True,
help="GCS URI or local path where the resulting TFRecord dataset will be stored (e.g., gs://bucket/output/ds.tfrecords).",
help="GCS folder URI or local path where the resulting TFRecord dataset will be stored (e.g., gs://bucket/output/).",
)
parser.add_argument(
"--cutinf",
@@ -106,12 +106,19 @@ def run_pipeline(argv: list | None = None, save_session: bool = True):
     parser = argparse.ArgumentParser()
     _, pipeline_args = parser.parse_known_args(argv)
     etloptions = ETLOptions(
-        pipeline_args, machine_type="n2-custom-2-32768-ext", disk_size_gb=210
+        pipeline_args, machine_type="n2-custom-8-131072-ext", disk_size_gb=500
     )
     etloptions.view_as(SetupOptions).save_main_session = save_session
     bucket = etloptions.source.split("/")[0]
     folder = "/".join(etloptions.source.split("/")[1:])
     uris = get_raw_data_uris(bucket, folder)
+    etloptions.output = etloptions.output + gen_name(etloptions)
+    signature = (
+        tf.TensorSpec(shape=None, dtype=tf.int64),
+        tf.TensorSpec(shape=None, dtype=tf.float64),
+        tf.TensorSpec(shape=None, dtype=tf.float64),
+        tf.TensorSpec(shape=None, dtype=tf.float64),
+    )
     pipeline = beam.Pipeline(options=etloptions)
     (
         pipeline
@@ -125,6 +132,7 @@ def run_pipeline(argv: list | None = None, save_session: bool = True):
                 etloptions.corr,
                 etloptions.dark,
                 etloptions.flat,
+                signature,
                 etloptions.binning,
             )
         )
@@ -134,13 +142,7 @@ def run_pipeline(argv: list | None = None, save_session: bool = True):
>> beam.Map( # noqa: W503
lambda x: save_dataset_to_tfrecords(
element=x,
uri=etloptions.output.get(),
output_signature=(
tf.TensorSpec(shape=None, dtype=tf.int64),
tf.TensorSpec(shape=None, dtype=tf.float64),
tf.TensorSpec(shape=None, dtype=tf.float64),
tf.TensorSpec(shape=None, dtype=tf.float64),
),
uri=etloptions.output,
)
)
)
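A note on the signature tuple that run_pipeline now builds and hands to CalibrationFn: per the original code comment in ops.py (removed below), tf.data.Dataset.from_generator needs an explicit output_signature for tuple elements, otherwise TF tries to concatenate the tuple's parts into a single tensor and errors out. A standalone sketch of the same four-part spec, with illustrative shapes rather than the dataset's real dimensions:

import tensorflow as tf

# (planet id, AIRS signal, FGS signal, labels), mirroring run_pipeline.
signature = (
    tf.TensorSpec(shape=None, dtype=tf.int64),
    tf.TensorSpec(shape=None, dtype=tf.float64),
    tf.TensorSpec(shape=None, dtype=tf.float64),
    tf.TensorSpec(shape=None, dtype=tf.float64),
)

def gen():
    # One element whose parts have different ranks and dtypes.
    yield (
        tf.constant(0, tf.int64),
        tf.zeros([1, 4, 8], tf.float64),
        tf.zeros([1, 4], tf.float64),
        tf.zeros([1, 6], tf.float64),
    )

ds = tf.data.Dataset.from_generator(gen, output_signature=signature)

shape=None in each TensorSpec leaves the rank unconstrained, which fits the variable-size arrays this pipeline carries.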
41 changes: 22 additions & 19 deletions src/neuripsadc/etl/ops.py
@@ -21,12 +21,15 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 
-# mypy: disable-error-code="misc, no-untyped-def, type-arg"
 from collections import defaultdict
+from datetime import datetime
 from typing import Any
 
 import tensorflow as tf
 
+# mypy: disable-error-code="misc, no-untyped-def, type-arg"
+from apache_beam.options.pipeline_options import PipelineOptions
+
 from neuripsadc.utilis import list_blobs, save_to_tfrecords


@@ -69,7 +72,7 @@ def split_key_value(path: str) -> tuple[str, str | None]:
 
 def get_raw_data_uris(
     bucket_name: str, folder: str | None = None
-) -> list[tuple[str, list[str]]]:
+) -> list[tuple[str, list[str], str]]:
     """
     Get the data URIs in the Google Cloud Storage bucket.
@@ -88,7 +91,10 @@ def get_raw_data_uris(
     ]
     non_raw_data = [item for item in key_value_pairs if item[0] != "raw"]
     grouped_data = group_by_key(non_raw_data)
-    result = [(key, value + raw_data) for key, value in grouped_data]
+    result = [
+        (key, value + raw_data, datetime.now().strftime("%Y%m%d%H%M%S"))
+        for key, value in grouped_data
+    ]
     return result


@@ -129,26 +135,23 @@ def make_example(pid: int, airs: tf.Tensor, fgs: tf.Tensor, target: tf.Tensor):
 def save_dataset_to_tfrecords(
     element: list[Any],
     uri: str,
-    output_signature: tuple[Any, Any, Any, Any] | None = None,
 ):
-    """Creates a tf.data.Dataset from a PCollection and saves it to a TFRecord.
+    """Creates a tf.data.Dataset from a PCollection of URIs and saves it to a TFRecord.
     Args:
-        element (list): _description_
+        element (list): PCollection of URIs.
         uri (str): Location to save the data. Can be a filesystem or a (GCS) bucket.
-        output_signature (tuple | None, optional): Tensorflow Output type\
-            specification. Defaults to None.
     """
-    examples = element[0]
-    dataset = tf.data.Dataset.from_generator(
-        lambda: iter(examples),
-        # The output_signature matters: without it TF tries to concatenate
-        # all the tuple's elements into a single tensor, which causes errors.
-        output_signature=output_signature,
-    )
-    dataset = dataset.map(
-        lambda id, airs, fgs, target: tf.py_function(
-            func=make_example, inp=[id, airs, fgs, target], Tout=tf.string
-        )
+    filenames = element[0]
+    dataset = tf.data.TFRecordDataset(
+        filenames=tf.data.Dataset.from_tensor_slices(filenames),
+        num_parallel_reads=tf.data.AUTOTUNE,
     )
     dataset = dataset.prefetch(tf.data.AUTOTUNE)
     save_to_tfrecords(dataset, uri)
 
 
+def gen_name(option: PipelineOptions):
+    """Generate a dataset file name from the pipeline options."""
+    code = f"c{int(option.corr)}d{int(option.dark)}f{int(option.flat)}m{int(option.mask)}b{int(option.binning)}"
+    return f"neurips-dataset-{code}.tfrecords"
24 changes: 22 additions & 2 deletions src/neuripsadc/etl/transforms.py
@@ -30,6 +30,9 @@
 import tensorflow as tf
 from astropy.stats import sigma_clip
 
+from neuripsadc.etl.ops import make_example
+from neuripsadc.utilis import save_to_tfrecords
+
 
 class CalibrationFn(beam.DoFn):
     """Performs the raw data calibration."""
@@ -42,6 +45,7 @@ def __init__(
         corr: bool,
         dark: bool,
         flat: bool,
+        signature: tuple,
         binning: int | None = None,
     ):
         """
@@ -53,6 +57,7 @@ def __init__(
             corr (bool): Whether to apply linear correction.
             dark (bool): Whether to apply dark current correction.
             flat (bool): Whether to apply flat pixels correction.
+            signature (tuple): Tuple of tf.TypeSpec for the process outputs.
         """
 
         self.CUT_INF = cut_inf
@@ -62,9 +67,10 @@ def __init__(
         self.CORR = corr
         self.DARK = dark
         self.FLAT = flat
+        self.signature = signature
 
     def process(self, element):  # type: ignore
-        id, uris = element
+        id, uris, timestamp = element
         airs_data, fgs_data, info_data = self._load_data(id, uris)
         airs_signal = self._calibrate_airs_data(airs_data, info_data)
         airs_signal = tf.convert_to_tensor(airs_signal.reshape(1, *airs_signal.shape))
@@ -74,7 +80,21 @@ def process(self, element):  # type: ignore
             np.array([np.nan]) if info_data["labels"] is None else info_data["labels"]
         )
         labels = tf.convert_to_tensor(labels.reshape(1, *labels.shape))
-        return [(int(id), airs_signal, fgs_signal, labels)]
+        record = tf.data.Dataset.from_generator(
+            lambda: iter([(int(id), airs_signal, fgs_signal, labels)]),
+            output_signature=self.signature,
+        )
+        record = record.map(
+            lambda id, airs, fgs, target: tf.py_function(
+                func=make_example, inp=[id, airs, fgs, target], Tout=tf.string
+            )
+        )
+        bucket = uris[0].split("/")[2]
+        tmp_output_path = (
+            f"gs://{bucket}/pipeline_root/neurips-etl/{timestamp}/record-{id}.tfrecord"
+        )
+        save_to_tfrecords(record, tmp_output_path)
+        return [tmp_output_path]

     def _calibrate_airs_data(
         self, data: dict[str, pd.DataFrame], info: dict[str, pd.DataFrame]
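For clarity on the temporary shard path that process now builds: uris[0].split("/")[2] picks out the bucket name, which assumes the first URI has the gs://bucket/... form. An illustrative trace with hypothetical values:

# "gs://my-bucket/raw/obs-42/file.parquet".split("/")
#   -> ["gs:", "", "my-bucket", "raw", "obs-42", "file.parquet"]
uri = "gs://my-bucket/raw/obs-42/file.parquet"  # hypothetical input URI
bucket = uri.split("/")[2]                      # "my-bucket"
timestamp = "20241015120000"  # from datetime.now().strftime("%Y%m%d%H%M%S")
path = f"gs://{bucket}/pipeline_root/neurips-etl/{timestamp}/record-42.tfrecord"

Each observation thus lands in its own shard under pipeline_root, keyed by the run timestamp added in get_raw_data_uris, and only these short paths travel to the final merge step.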
