Skip to content

Commit

Permalink
add chunjun ChunjunSinkFactory for UpdateMode types filter at runtime…
Browse files Browse the repository at this point in the history
… by JdbcDialect class annotation datavane/tis#139
  • Loading branch information
baisui1981 committed Sep 16, 2022
1 parent 596aa58 commit 8230ab1
Show file tree
Hide file tree
Showing 31 changed files with 324 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ public DefaultDescriptor() {
this.registerSelectOptions(KEY_FIELD_NAME_SPARK_CONN, () -> ParamsConfig.getItems(ISparkConnGetter.PLUGIN_NAME));
this.registerSelectOptions(KEY_FIELD_NAME_HIVE_CONN, () -> ParamsConfig.getItems(IHiveConnGetter.PLUGIN_NAME));
}

@Override
public boolean isSupportIncr() {
return true;
}
@Override
public PluginVender getVender() {
return PluginVender.TIS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public DefaultDescriptor() {
}
@Override
public boolean isSupportIncr() {
return true;
return false;
}
@Override
public EndType getEndType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,23 @@

package com.qlangtech.plugins.incr.flink.chunjun.sink;

import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.dialect.SupportUpdateMode;
import com.dtstack.chunjun.sink.WriteMode;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.extension.impl.SuFormProperties;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.IncrSelectedTabExtend;
import com.qlangtech.tis.plugin.incr.TISSinkFactory;
import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory;
import com.qlangtech.tis.plugins.incr.flink.connector.UpdateMode;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author: 百岁([email protected]
Expand All @@ -43,6 +52,24 @@ public UpdateMode getIncrMode() {
return Objects.requireNonNull(incrMode, "incrMode can not be null");
}

/**
* 由于每种 JdbcDialect 支持的写入类型是不同的所以需要在在运行时 更新下拉列表需要进行过滤
*
* @param descs
* @return
* @see JdbcDialect
* @see SupportUpdateMode
*/
public static List<UpdateMode.BasicDescriptor> filter(List<UpdateMode.BasicDescriptor> descs) {

SuFormProperties.SuFormGetterContext context = SuFormProperties.subFormGetterProcessThreadLocal.get();
Objects.requireNonNull(context, "context can not be null");
//String dataXName = context.param.getDataXName();
ChunjunSinkFactory sinkFactory = (ChunjunSinkFactory) TISSinkFactory.getIncrSinKFactory(context.param.getPluginContext());
Set<WriteMode> writeModes = sinkFactory.supportSinkWriteMode();
return descs.stream().filter((d) -> writeModes.contains(d.writeMode)).collect(Collectors.toList());
}

@Override
public boolean isSource() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package com.qlangtech.plugins.incr.flink.chunjun.sink;

import com.dtstack.chunjun.connector.jdbc.TableCols;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
import com.qlangtech.plugins.incr.flink.chunjun.common.ColMetaUtils;
import com.qlangtech.tis.plugin.ds.ColMeta;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf;
import com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.dialect.SupportUpdateMode;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
import com.dtstack.chunjun.constants.ConfigConstant;
import com.dtstack.chunjun.sink.DtOutputFormatSinkFunction;
import com.dtstack.chunjun.sink.SinkFactory;
import com.dtstack.chunjun.sink.WriteMode;
import com.dtstack.chunjun.util.TableUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.qlangtech.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.datax.IDataxProcessor;
Expand Down Expand Up @@ -61,6 +64,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;
Expand Down Expand Up @@ -357,8 +361,40 @@ protected DataStreamSink<RowData> createOutput(
return createResult;
}

/**
* 找到chunjun Sink 支持的write方式
*
* @return
*/
public final Set<WriteMode> supportSinkWriteMode() {
Class<? extends JdbcDialect> dialectClass = this.getJdbcDialectClass();
SupportUpdateMode supportMode = dialectClass.getAnnotation(SupportUpdateMode.class);
Objects.requireNonNull(supportMode, "dialectClass:" + dialectClass.getClass().getName()
+ " can not find annotation " + SupportUpdateMode.class);
Set<WriteMode> result = Sets.newHashSet(supportMode.modes());
result.add(WriteMode.INSERT);
return result;
}

protected abstract Class<? extends JdbcDialect> getJdbcDialectClass();

protected final JdbcDialect createJdbcDialect(SyncConf syncConf) {
try {

protected abstract JdbcDialect createJdbcDialect(SyncConf syncConf);
Class<? extends JdbcDialect> clazz = getJdbcDialectClass();
Constructor<?>[] constructors = clazz.getConstructors();
for (Constructor<?> c : constructors) {

if (c.getParameterCount() == 1 && c.getParameterTypes()[0] == SyncConf.class) {
return (JdbcDialect) c.newInstance(syncConf);
}
}

return clazz.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}


protected abstract JdbcOutputFormat createChunjunOutputFormat(DataSourceFactory dsFactory);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugins.incr.flink.connector;

import com.dtstack.chunjun.sink.WriteMode;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.extension.Describable;
import com.qlangtech.tis.extension.Descriptor;
import com.qlangtech.tis.extension.IPropertyType;
import com.qlangtech.tis.extension.PluginFormProperties;
import org.apache.commons.lang.StringUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
Expand All @@ -38,18 +41,36 @@ public void set(Map<String, Object> params) {
params.put("writeMode", getMode());
}

protected abstract String getMode();
protected final String getMode() {
return getWriteMode().getMode();
}

@Override
public Descriptor<UpdateMode> getDescriptor() {
public BasicDescriptor getDescriptor() {
Descriptor<UpdateMode> desc = TIS.get().getDescriptor(this.getClass());
if (!(desc instanceof BasicDescriptor)) {
throw new IllegalStateException("desc must be type of " + BasicDescriptor.class.getName());
}
return desc;
return (BasicDescriptor) desc;
}

protected WriteMode getWriteMode() {
return getDescriptor().writeMode;
}

protected static abstract class BasicDescriptor extends Descriptor<UpdateMode> {
public static abstract class BasicDescriptor extends Descriptor<UpdateMode> {

public final WriteMode writeMode;

public BasicDescriptor(WriteMode writeMode) {
super();
this.writeMode = Objects.requireNonNull(writeMode);
}

public final String getDisplayName() {
return StringUtils.capitalize(writeMode.getMode());
}

public final PluginFormProperties getPluginFormPropertyTypes(Optional<IPropertyType.SubFormFilter> subFormFilter) {
return super.getPluginFormPropertyTypes(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,14 @@
**/
public class InsertType extends UpdateMode {

private static final WriteMode INSERT = WriteMode.INSERT;

@Override
protected String getMode() {
return INSERT.getMode();
}
// private static final WriteMode INSERT = WriteMode.INSERT;

@TISExtension
public static final class DftDescriptor extends BasicUpdate.BasicDescriptor {
public String getDisplayName() {
return StringUtils.capitalize(INSERT.getMode());
public DftDescriptor() {
super(WriteMode.INSERT);
}


}
}
Original file line number Diff line number Diff line change
@@ -1,44 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugins.incr.flink.connector.impl;

import com.dtstack.chunjun.sink.WriteMode;
import com.qlangtech.tis.extension.TISExtension;
import org.apache.commons.lang.StringUtils;

/**
* @author: 百岁([email protected]
* @create: 2022-07-18 10:09
**/
public class ReplaceType extends BasicUpdate {

// private static final String REPLACE = "replace";

@Override
protected String getMode() {
return WriteMode.REPLACE.getMode();
}
// @Override
// protected String getMode() {
// return WriteMode.REPLACE.getMode();
// }

@TISExtension
public static final class DftDescriptor extends BasicDescriptor {
public String getDisplayName() {
return StringUtils.capitalize(WriteMode.REPLACE.getMode());
public DftDescriptor() {
super(WriteMode.REPLACE);
}

// public String getDisplayName() {
// return StringUtils.capitalize(WriteMode.REPLACE.getMode());
// }
}
}
Loading

0 comments on commit 8230ab1

Please sign in to comment.