Skip to content

Commit

Permalink
[BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfi…
Browse files Browse the repository at this point in the history
…gs (#17051)

* [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs

* [BEAM-14048] Fix checkstyle

* [BEAM-14048] Fix warnings

* [BEAM-14048] Fix warnings

* [BEAM-14048] Fix warning

* [BEAM-14048] Fix warning

* [BEAM-14048] Remove unused dependencies

* [BEAM-14048] Add needed dependencies

* [BEAM-14048] Fix spotless

* [BEAM-14048] Fix typo

* [BEAM-14048] Use fori instead of stream

* [BEAM-14048] Suppress warning

* [BEAM-14048] Add used undeclared artifacts

* [BEAM-14048] Change dependencies to test

* [BEAM-14048] Refactoring

* [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects

* [BEAM-14048] Suppress warning

* Update maven repo

* Update build.gradle

* [BEAM-14048] Use ServiceNow CDAP dependency from Maven central

* [BEAM-14048] Set macroFields

Co-authored-by: Alex Kosolapov <[email protected]>
Co-authored-by: Elizaveta Lomteva <[email protected]>
Co-authored-by: Elizaveta Lomteva <[email protected]>
  • Loading branch information
4 people authored May 5, 2022
1 parent 0a01fbe commit e7d3e8c
Show file tree
Hide file tree
Showing 11 changed files with 527 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ class BeamModulePlugin implements Plugin<Project> {
def aws_java_sdk_version = "1.12.135"
def aws_java_sdk2_version = "2.17.127"
def cassandra_driver_version = "3.10.2"
def cdap_version = "6.5.1"
def checkerframework_version = "3.10.0"
def classgraph_version = "4.8.104"
def errorprone_version = "2.10.0"
Expand Down Expand Up @@ -536,6 +537,10 @@ class BeamModulePlugin implements Plugin<Project> {
bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
cdap_api : "io.cdap.cdap:cdap-api:$cdap_version",
cdap_common : "io.cdap.cdap:cdap-common:$cdap_version",
cdap_etl_api : "io.cdap.cdap:cdap-etl-api:$cdap_version",
cdap_plugin_service_now : "io.cdap.plugin:servicenow-plugins:1.1.0",
checker_qual : "org.checkerframework:checker-qual:$checkerframework_version",
classgraph : "io.github.classgraph:classgraph:$classgraph_version",
commons_codec : "commons-codec:commons-codec:1.15",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ExpressionConverter\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*BeamZetaSqlCatalog\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*BufferingPullSubscriberTest\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*cdap.*PluginConfigInstantiationUtils\.java" />

<!-- gRPC/protobuf exceptions -->
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/cdap/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# See the OWNERS docs at https://s.apache.org/beam-owners
52 changes: 52 additions & 0 deletions sdks/java/io/cdap/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id 'java'
id 'org.apache.beam.module'
}

applyJavaNature(
exportJavadoc: false,
automaticModuleName: 'org.apache.beam.sdk.io.cdap',
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: CDAP :: Java"
ext.summary = """Apache Beam SDK provides a simple, Java-based
interface for integration with CDAP plugins."""

/** Define the list of runners which execute a precommit test.
* Some runners are run from separate projects, see the preCommit task below
* for details.
*/

dependencies {
implementation library.java.guava
implementation library.java.cdap_api
implementation library.java.cdap_common
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.slf4j_api
implementation project(path: ":sdks:java:core", configuration: "shadow")
testImplementation library.java.cdap_plugin_service_now
testImplementation library.java.cdap_etl_api
testImplementation library.java.vendored_guava_26_0_jre
testImplementation library.java.junit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cdap.cdap.api.plugin.PluginConfig;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class for building {@link PluginConfig} object of the specific class {@param <T>}. */
public class ConfigWrapper<T extends PluginConfig> {

private static final Logger LOG = LoggerFactory.getLogger(ConfigWrapper.class);

@Nullable private Map<String, Object> paramsMap = null;
private final Class<T> configClass;

public ConfigWrapper(Class<T> configClass) {
this.configClass = configClass;
}

public ConfigWrapper<T> fromJsonString(String jsonString) throws IOException {
TypeReference<HashMap<String, Object>> typeRef =
new TypeReference<HashMap<String, Object>>() {};
try {
paramsMap = new ObjectMapper().readValue(jsonString, typeRef);
} catch (IOException e) {
LOG.error("Can not read json string to params map", e);
throw e;
}
return this;
}

public ConfigWrapper<T> fromJsonFile(File jsonFile) throws IOException {
TypeReference<HashMap<String, Object>> typeRef =
new TypeReference<HashMap<String, Object>>() {};
try {
paramsMap = new ObjectMapper().readValue(jsonFile, typeRef);
} catch (IOException e) {
LOG.error("Can not read json file to params map", e);
throw e;
}
return this;
}

public ConfigWrapper<T> withParams(Map<String, Object> paramsMap) {
this.paramsMap = new HashMap<>(paramsMap);
return this;
}

public ConfigWrapper<T> setParam(String paramName, Object param) {
getParamsMap().put(paramName, param);
return this;
}

public @Nullable T build() {
return PluginConfigInstantiationUtils.getPluginConfig(getParamsMap(), configClass);
}

private @Nonnull Map<String, Object> getParamsMap() {
if (paramsMap == null) {
paramsMap = new HashMap<>();
}
return paramsMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cdap;

import com.google.common.reflect.TypeToken;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.common.lang.InstantiatorFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class for getting any filled {@link PluginConfig} configuration object. */
@SuppressWarnings({"assignment.type.incompatible", "UnstableApiUsage", "return.type.incompatible"})
public class PluginConfigInstantiationUtils {

private static final Logger LOG = LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
private static final String MACRO_FIELDS_FIELD_NAME = "macroFields";

/**
* Method for instantiating {@link PluginConfig} object of specific class {@param configClass}.
* After instantiating, it will go over all {@link Field}s with the {@link Name} annotation and
* set the appropriate parameter values from the {@param params} map for them.
*
* @param params map of config fields, where key is the name of the field, value must be String or
* boxed primitive
* @return Config object for given map of arguments and configuration class
*/
static @Nullable <T extends PluginConfig> T getPluginConfig(
Map<String, Object> params, Class<T> configClass) {
// Validate configClass
if (configClass == null) {
throw new IllegalArgumentException("Config class must be not null!");
}
List<Field> allFields = new ArrayList<>();
Class<?> currClass = configClass;
while (currClass != null && !currClass.equals(Object.class)) {
allFields.addAll(
Arrays.stream(currClass.getDeclaredFields())
.filter(f -> !Modifier.isStatic(f.getModifiers()))
.collect(Collectors.toList()));
currClass = currClass.getSuperclass();
}
InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);

T config = instantiatorFactory.get(TypeToken.of(configClass)).create();

if (config != null) {
for (Field field : allFields) {
field.setAccessible(true);

Class<?> fieldType = field.getType();

Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
Object fieldValue =
declaredAnnotation != null ? params.get(declaredAnnotation.value()) : null;

if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
try {
field.set(config, fieldValue);
} catch (IllegalAccessException e) {
LOG.error("Can not set a field with value {}", fieldValue);
}
} else if (field.getName().equals(MACRO_FIELDS_FIELD_NAME)) {
try {
field.set(config, Collections.emptySet());
} catch (IllegalAccessException e) {
LOG.error("Can not set macro fields");
}
}
}
}
return config;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Transforms for reading and writing from CDAP. */
@Experimental(Kind.SOURCE_SINK)
package org.apache.beam.sdk.io.cdap;

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Loading

0 comments on commit e7d3e8c

Please sign in to comment.