Merge pull request #6 from aonischuk/feature/CDAP-15835-config-validations

CDAP-15835. Config validations
albertshau authored Oct 3, 2019
2 parents 294a474 + 6140c2a commit a029bbf
Showing 5 changed files with 399 additions and 107 deletions.
22 changes: 13 additions & 9 deletions pom.xml
@@ -21,19 +21,19 @@
<groupId>io.cdap.plugin</groupId>
<artifactId>cobol-to-avro-transform</artifactId>
<packaging>jar</packaging>
<version>1.2.0-SNAPSHOT</version>
<version>1.3.0-SNAPSHOT</version>
<name>Cobol to Avro Transform</name>

<properties>
<avro.version>1.7.7</avro.version>
<cdap.version>6.0.0-SNAPSHOT</cdap.version>
<cdap.version>6.1.0-SNAPSHOT</cdap.version>
<guava.version>18.0</guava.version>
<janino.version>3.0.7</janino.version>
<legstar.avro.version>0.4.2</legstar.avro.version>
<logback.version>1.2.3</logback.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<data.stream.parent>system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.stream.parent>
<data.pipeline.parent>system:cdap-data-pipeline[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.pipeline.parent>
<data.stream.parent>system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.stream.parent>
<data.pipeline.parent>system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.pipeline.parent>
</properties>

<dependencies>
@@ -91,6 +91,12 @@
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>hydrator-test</artifactId>
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-formats</artifactId>
@@ -125,8 +131,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
@@ -137,9 +143,7 @@
<configuration>
<instructions>
<_exportcontents>
io.cdap.plugin.*;
com.google.common.*;
com.legstar.*
io.cdap.plugin.cobol.*;
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
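The new hydrator-test dependency pulls in CDAP's plugin test utilities, which is what makes the validation added in this PR unit-testable. A minimal sketch of such a test, assuming the MockFailureCollector class that ships with hydrator-test; the CobolRecordConverterConfig constructor shape here is a guess, since that class sits in one of the changed files not shown on this page:

import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import org.junit.Assert;
import org.junit.Test;

public class CobolRecordConverterConfigTest {

  @Test
  public void testBadCopybookIsCollectedNotThrown() {
    // MockFailureCollector records failures instead of failing fast,
    // so a test can assert on every collected problem at once.
    MockFailureCollector collector = new MockFailureCollector("cobolConverter");
    // Hypothetical constructor: (copybook, contentFieldName).
    CobolRecordConverterConfig config = new CobolRecordConverterConfig("not a copybook", "body");
    try {
      config.getOutputSchemaAndValidate(collector, null);
    } catch (Exception e) {
      // Validation may also surface the collected failures as an exception.
    }
    Assert.assertFalse(collector.getValidationFailures().isEmpty());
  }
}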
106 changes: 21 additions & 85 deletions src/main/java/io/cdap/plugin/cobol/CobolRecordConverter.java
@@ -17,31 +17,24 @@
package io.cdap.plugin.cobol;

import com.legstar.avro.cob2avro.io.AbstractZosDatumReader;
import com.legstar.cob2xsd.Cob2XsdConfig;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.plugin.EndpointPluginContext;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.common.AvroConverter;
import io.cdap.plugin.common.StreamByteSource;
import io.cdap.plugin.common.StreamCharSource;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.annotation.Nullable;

/**
* {@link Transform} plugin to convert COBOL data file into StructuredRecords.
@@ -52,31 +45,37 @@
public class CobolRecordConverter extends Transform<StructuredRecord, StructuredRecord> {
private static final Logger LOG = LoggerFactory.getLogger(CobolRecordConverter.class);

private final Config config;
private final CobolRecordConverterConfig config;

public CobolRecordConverter(Config config) {
public CobolRecordConverter(CobolRecordConverterConfig config) {
this.config = config;
}

private CopybookReader copybookReader;
private Schema avroSchema;
private io.cdap.cdap.api.data.schema.Schema schema;
private Schema schema;

@Override
public void initialize(TransformContext context) throws Exception {
super.initialize(context);
Properties properties = new Properties();
properties.setProperty(Cob2XsdConfig.CODE_FORMAT, config.getCodeFormat());
StreamCharSource streamCharSource
= new StreamCharSource(new ByteArrayInputStream(config.copybook.getBytes(StandardCharsets.UTF_8)));
copybookReader = new CopybookReader(streamCharSource, properties);
this.avroSchema = copybookReader.getSchema();
this.schema = AvroConverter.fromAvroSchema(avroSchema);
this.copybookReader = config.getCopybookReader();
this.schema = config.getOutputSchemaAndValidate(copybookReader);
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);

FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
io.cdap.cdap.api.data.schema.Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
Schema outputSchema = config.getOutputSchemaAndValidate(failureCollector, inputSchema);
failureCollector.getOrThrowException();

pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
}

@Override
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
byte[] body = input.get(config.fieldName);
byte[] body = input.get(config.getContentFieldName());
StreamByteSource source = new StreamByteSource(new ByteArrayInputStream(body), body.length);
try (AbstractZosDatumReader<GenericRecord> reader = copybookReader.createRecordReader(source, config.getCharset(),
config.hasRDW())) {
@@ -86,67 +85,4 @@ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
}
}
}

class GetSchemaRequest {
public String copybook;
@Nullable
public String codeFormat;

private String getCodeFormat() {
return codeFormat == null ? Cob2XsdConfig.CodeFormat.FIXED_FORMAT.name() : codeFormat;
}
}

/**
* Endpoint method to get the output schema given copybook.
*
* @param request {@link GetSchemaRequest} containing information about the cobol copybook.
* @param pluginContext context to create plugins
* @return schema of fields
* @throws IOException if there are any errors converting schema
*/
@javax.ws.rs.Path("outputSchema")
public io.cdap.cdap.api.data.schema.Schema getSchema(GetSchemaRequest request,
EndpointPluginContext pluginContext) throws IOException {
Properties properties = new Properties();
properties.setProperty(Cob2XsdConfig.CODE_FORMAT, request.getCodeFormat());
StreamCharSource streamCharSource
= new StreamCharSource(new ByteArrayInputStream(request.copybook.getBytes(StandardCharsets.UTF_8)));
CopybookReader reader = new CopybookReader(streamCharSource, properties);
Schema avroSchema = reader.getSchema();
return AvroConverter.fromAvroSchema(avroSchema);
}

public static final class Config extends PluginConfig {
@Description("COBOL Copybook")
@Macro
private String copybook;

@Description("CodeFormat in the Copybook")
@Nullable
private String codeFormat;

@Description("Charset used to read the data. Default Charset is 'IBM01140'.")
@Nullable
private String charset;

@Description("Records start with Record Descriptor Word")
@Nullable
private Boolean rdw;

@Description("Name of the field containing COBOL records")
private String fieldName;

public String getCodeFormat() {
return codeFormat == null ? Cob2XsdConfig.CodeFormat.FIXED_FORMAT.name() : codeFormat;
}

public String getCharset() {
return charset == null ? "IBM01140" : charset;
}

public boolean hasRDW() {
return rdw == null ? true : rdw;
}
}
}
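The substance of CDAP-15835 shows in the new configurePipeline above: validation problems are accumulated in a FailureCollector and surfaced together by getOrThrowException(), instead of aborting on the first bad property. The collecting side lives in CobolRecordConverterConfig, one of the changed files not displayed on this page, so the following is only a sketch of the usual CDAP 6.1 shape. addFailure, withConfigProperty, and getOrThrowException are the real FailureCollector API; the property names are taken from the old Config class removed above:

// Sketch only -- the actual method body lives in CobolRecordConverterConfig.
public Schema getOutputSchemaAndValidate(FailureCollector collector, @Nullable Schema inputSchema) {
  if (inputSchema != null && inputSchema.getField(getContentFieldName()) == null) {
    // Each failure names the offending config property so the UI can highlight it.
    collector.addFailure(
        String.format("Field '%s' does not exist in the input schema.", getContentFieldName()),
        "Set it to the field that carries the raw COBOL record bytes.")
      .withConfigProperty("fieldName");
  }
  try {
    // Reuse the reader-based overload that initialize() also calls.
    return getOutputSchemaAndValidate(getCopybookReader());
  } catch (IOException e) {
    collector.addFailure("Copybook could not be parsed: " + e.getMessage(),
        "Verify the copybook contents and its code format.")
      .withConfigProperty("copybook");
    // No output schema can be derived; raise everything collected so far.
    throw collector.getOrThrowException();
  }
}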