Skip to content

Commit

Permalink
Merge pull request #47 from CedrickArmel/feat/etl
Browse files Browse the repository at this point in the history
fix: pipeline options and wait until finish
  • Loading branch information
CedrickArmel authored Oct 13, 2024
2 parents f2e66a5 + 159f15d commit 29d79c7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/etl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
--output=gs://${{ secrets.BUCKET }}/primary/neurips-dataset-c1d1f1m1b30sum.tfrecords \
--mask --flat --corr --dark \
--runner=${{ vars.RUNNER}} \
--worker_disk_type=compute.googleapis.com/projects/${{ secrets.PROJECT_ID }}/zones/${{ secrets.REGION }}/diskTypes/DISK_TYPE
--worker_disk_type=compute.googleapis.com/projects/${{ secrets.PROJECT_ID }}/zones/${{ secrets.REGION }}/diskTypes/DISK_TYPE \
--project=${{ secrets.PROJECT_ID }} \
--region=${{ secrets.REGION }} \
--temp_location=gs://${{ secrets.BUCKET }}/pipeline_root/ \
Expand Down
61 changes: 31 additions & 30 deletions src/neuripsadc/etl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,39 +111,40 @@ def run_pipeline(argv: list | None = None, save_session: bool = True):
etloptions.view_as(SetupOptions).save_main_session = save_session
bucket = etloptions.source.split("/")[0]
folder = "/".join(etloptions.source.split("/")[1:])
with beam.Pipeline(options=etloptions) as pipeline:
uris = get_raw_data_uris(bucket, folder)
_ = (
pipeline
| "Create uris collection" >> beam.Create(uris) # noqa: W503
| "Data calibration" # noqa: W503
>> beam.ParDo( # noqa: W503
CalibrationFn(
etloptions.cutinf,
etloptions.cutsup,
etloptions.mask,
etloptions.corr,
etloptions.dark,
etloptions.flat,
etloptions.binning,
)
uris = get_raw_data_uris(bucket, folder)
pipeline = beam.Pipeline(options=etloptions)
(
pipeline
| "Create uris collection" >> beam.Create(uris) # noqa: W503
| "Data calibration" # noqa: W503
>> beam.ParDo( # noqa: W503
CalibrationFn(
etloptions.cutinf,
etloptions.cutsup,
etloptions.mask,
etloptions.corr,
etloptions.dark,
etloptions.flat,
etloptions.binning,
)
| "Collection merging" # noqa: W503
>> beam.CombineGlobally(CombineDataFn()) # noqa: W503
| "Data saving" # noqa: W503
>> beam.Map( # noqa: W503
lambda x: save_dataset_to_tfrecords(
element=x,
uri=etloptions.output.get(),
output_signature=(
tf.TensorSpec(shape=None, dtype=tf.int64),
tf.TensorSpec(shape=None, dtype=tf.float64),
tf.TensorSpec(shape=None, dtype=tf.float64),
tf.TensorSpec(shape=None, dtype=tf.float64),
),
)
)
| "Collection merging" # noqa: W503
>> beam.CombineGlobally(CombineDataFn()) # noqa: W503
| "Data saving" # noqa: W503
>> beam.Map( # noqa: W503
lambda x: save_dataset_to_tfrecords(
element=x,
uri=etloptions.output.get(),
output_signature=(
tf.TensorSpec(shape=None, dtype=tf.int64),
tf.TensorSpec(shape=None, dtype=tf.float64),
tf.TensorSpec(shape=None, dtype=tf.float64),
tf.TensorSpec(shape=None, dtype=tf.float64),
),
)
)
)
_ = pipeline.run().wait_until_finish(duration=10)


if __name__ == "__main__":
Expand Down

0 comments on commit 29d79c7

Please sign in to comment.