diff --git a/sdks/java/io/csv/build.gradle b/sdks/java/io/csv/build.gradle
index 2be8f59d1f39..92c66ff01406 100644
--- a/sdks/java/io/csv/build.gradle
+++ b/sdks/java/io/csv/build.gradle
@@ -28,6 +28,7 @@ dependencies {
   implementation project(path: ":sdks:java:core", configuration: "shadow")
   implementation library.java.commons_csv
   implementation library.java.vendored_guava_32_1_2_jre
+  implementation library.java.joda_time
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
   testImplementation library.java.junit
   testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
new file mode 100644
index 000000000000..ad7d05912faa
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * {@link CsvIOParseError} is a data class to store errors from CSV record processing. It is {@link
+ * org.apache.beam.sdk.schemas.Schema} mapped for compatibility with writing to Beam Schema-aware
+ * I/O connectors.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class CsvIOParseError {
+
+  /** Instantiates a {@link Builder} for assembling a {@link CsvIOParseError}. */
+  static Builder builder() {
+    return new AutoValue_CsvIOParseError.Builder();
+  }
+
+  /** The caught {@link Exception#getMessage()}. */
+  public abstract String getMessage();
+
+  /**
+   * The CSV record associated with the caught {@link Exception}. Annotated {@link Nullable} as not
+   * all processing errors are associated with a CSV record.
+   */
+  public abstract @Nullable String getCsvRecord();
+
+  /**
+   * The filename associated with the caught {@link Exception}. Annotated {@link Nullable} as not
+   * all processing errors are associated with a file.
+   */
+  public abstract @Nullable String getFilename();
+
+  /** The date and time when the {@link Exception} occurred. */
+  public abstract Instant getObservedTimestamp();
+
+  /** The caught {@link Exception#getStackTrace()}. */
+  public abstract String getStackTrace();
+
+  /**
+   * Builder for {@link CsvIOParseError}. {@link #setCsvRecord} and {@link #setFilename} may be
+   * omitted because their getters are {@link Nullable}.
+   */
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    /** Sets the value returned by {@link CsvIOParseError#getMessage()}. */
+    abstract Builder setMessage(String message);
+
+    /** Sets the value returned by {@link CsvIOParseError#getCsvRecord()}. */
+    abstract Builder setCsvRecord(String csvRecord);
+
+    /** Sets the value returned by {@link CsvIOParseError#getFilename()}. */
+    abstract Builder setFilename(String filename);
+
+    /** Sets the value returned by {@link CsvIOParseError#getObservedTimestamp()}. */
+    abstract Builder setObservedTimestamp(Instant observedTimestamp);
+
+    /** Sets the value returned by {@link CsvIOParseError#getStackTrace()}. */
+    abstract Builder setStackTrace(String stackTrace);
+
+    /** Builds the {@link CsvIOParseError} from the values set on this builder. */
+    public abstract CsvIOParseError build();
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java
new file mode 100644
index 000000000000..8e746c006050
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Tests for {@link CsvIOParseError}. */
+public class CsvIOParseErrorTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private static final SchemaProvider SCHEMA_PROVIDER = new DefaultSchema.DefaultSchemaProvider();
+
+  /** A {@link PCollection} created from errors contains exactly those errors. */
+  @Test
+  public void usableInSingleOutput() {
+    List<CsvIOParseError> want =
+        Arrays.asList(
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .build(),
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .setFilename("filename")
+                .setCsvRecord("csv record")
+                .build());
+
+    PCollection<CsvIOParseError> errors = pipeline.apply(Create.of(want));
+    PAssert.that(errors).containsInAnyOrder(want);
+
+    pipeline.run();
+  }
+
+  /** Errors remain retrievable by tag when combined with another output in a tuple. */
+  @Test
+  public void usableInMultiOutput() {
+    List<CsvIOParseError> want =
+        Arrays.asList(
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .build(),
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .setFilename("filename")
+                .setCsvRecord("csv record")
+                .build());
+
+    TupleTag<CsvIOParseError> errorTag = new TupleTag<CsvIOParseError>() {};
+    TupleTag<String> anotherTag = new TupleTag<String>() {};
+
+    PCollection<CsvIOParseError> errors = pipeline.apply("createWant", Create.of(want));
+    PCollection<String> anotherPCol = pipeline.apply("createAnother", Create.of("a", "b", "c"));
+    PCollectionTuple pct = PCollectionTuple.of(errorTag, errors).and(anotherTag, anotherPCol);
+    PAssert.that(pct.get(errorTag)).containsInAnyOrder(want);
+
+    pipeline.run();
+  }
+
+  /** The default schema provider yields a non-null {@link Schema} for the class. */
+  @Test
+  public void canDeriveSchema() {
+    TypeDescriptor<CsvIOParseError> type = TypeDescriptor.of(CsvIOParseError.class);
+    Schema schema = SCHEMA_PROVIDER.schemaFor(type);
+    assertNotNull(schema);
+    // NOTE(review): no transforms are applied above; confirm this run() is still required.
+    pipeline.run();
+  }
+}