Skip to content

Commit

Permalink
add chunjun ChunjunSinkFactory for UpdateMode types filter at runtime…
Browse files Browse the repository at this point in the history
… by JdbcDialect class annotation #139
  • Loading branch information
baisui1981 committed Sep 16, 2022
1 parent 17cdaf4 commit c55ead5
Show file tree
Hide file tree
Showing 26 changed files with 511 additions and 107 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
**/target
**/metastore_db
**/log.dir*
**/src/main/webapp/WEB-INF/lib
**/src/main/webapp/WEB-INF/classes
Expand All @@ -16,4 +17,4 @@
*.versionsBackup
*.swp
**/*.log
.flattened-pom.xml
.flattened-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,30 @@ public interface IRCController {
*/
void stopInstance(TargetResName indexName);

SupportTriggerSavePointResult supportTriggerSavePoint(TargetResName collection);

public class SupportTriggerSavePointResult {
public final boolean support;
String unSupportReason;

public SupportTriggerSavePointResult(boolean support) {
this.support = support;
}

public String getUnSupportReason() {
return unSupportReason;
}

public void setUnSupportReason(String unSupportReason) {
this.unSupportReason = unSupportReason;
}
}

/**
* 创建一个Savepoint
*
* @param collection
*/

void triggerSavePoint(TargetResName collection);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.coredefine.module.action.impl;

import com.qlangtech.tis.config.k8s.ReplicasSpec;
import com.qlangtech.tis.coredefine.module.action.IFlinkIncrJobStatus;
import com.qlangtech.tis.coredefine.module.action.IRCController;
import com.qlangtech.tis.coredefine.module.action.TargetResName;
import com.qlangtech.tis.plugin.incr.WatchPodLog;
Expand All @@ -35,6 +34,11 @@ public void deploy(TargetResName collection, ReplicasSpec incrSpec, long timesta
throw new UnsupportedOperationException();
}

@Override
public SupportTriggerSavePointResult supportTriggerSavePoint(TargetResName collection) {
throw new UnsupportedOperationException();
}

@Override
public void triggerSavePoint(TargetResName collection) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class CoreAction extends BasicModule {
private static final Pattern PATTERN_IP = Pattern.compile("^((\\d+?).(\\d+?).(\\d+?).(\\d+?)):(\\d+)_solr$");

// private final String CLIENT_ZK_PATH = "/terminator/dump-controller/";
// public static final XMLResponseParser RESPONSE_PARSER = new XMLResponseParser();
// public static final XMLResponseParser RESPONSE_PARSER = new XMLResponseParser();

private static final int MAX_SHARDS_PER_NODE = 16;

Expand Down Expand Up @@ -146,8 +146,16 @@ public void doGetTaskInfo(Context context) throws Exception {
@Func(value = PermissionConstant.PERMISSION_INCR_PROCESS_MANAGE)
public void doCreateNewSavepoint(Context context) throws Exception {
IRCController incrSync = getRCController();
TargetResName resName = new TargetResName(this.getCollectionName());

IRCController.SupportTriggerSavePointResult supportResult = null;
if (!(supportResult = incrSync.supportTriggerSavePoint(resName)).support) {
this.addErrorMessage(context, supportResult.unSupportReason);
return;
}
incrSync.triggerSavePoint(resName);


incrSync.triggerSavePoint(new TargetResName(this.getCollectionName()));
IndexIncrStatus incrStatus = getIndexIncrStatus(this, false);
this.setBizResult(context, incrStatus);
}
Expand Down Expand Up @@ -1155,7 +1163,10 @@ public void doIncrStop(Context context) throws Exception {

TISK8sDelegate k8sDelegate = TISK8sDelegate.getK8SDelegate(this.getCollectionName());
// 删除增量实例
k8sDelegate.stopIncrProcess();
k8sDelegate.stopIncrProcess(this, context);
if (this.hasErrors(context)) {
return;
}
waittingiIntendedStatus(context, IFlinkIncrJobStatus.State.STOPED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.qlangtech.tis.coredefine.module.action;

import com.alibaba.citrus.turbine.Context;
import com.google.common.collect.Maps;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.config.k8s.ReplicasSpec;
Expand All @@ -27,11 +28,11 @@
import com.qlangtech.tis.plugin.IPluginStore;
import com.qlangtech.tis.plugin.incr.IncrStreamFactory;
import com.qlangtech.tis.plugin.incr.WatchPodLog;
import com.qlangtech.tis.runtime.module.misc.IMessageHandler;
import com.qlangtech.tis.trigger.jst.ILogListener;
import com.qlangtech.tis.util.HeteroEnum;
import com.qlangtech.tis.util.PluginItems;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -177,7 +178,14 @@ public void visit(FlinkJobDeploymentDetails details) {
/**
* 停止增量实例
*/
public void stopIncrProcess() {
public void stopIncrProcess(IMessageHandler handler, Context context) {

IRCController.SupportTriggerSavePointResult vresult
= incrSync.supportTriggerSavePoint(this.indexName);
if (!vresult.support) {
handler.addErrorMessage(context, vresult.getUnSupportReason());
return;
}

try {
this.incrSync.stopInstance(this.indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,12 @@ public final Map<String, Object> getExtractProps() {
return eprops;
}

/**
* 如果返回null则说明不支持增量同步功能
*
* @return
*/
protected abstract boolean isSupportIncr();
// /**
// * 如果返回null则说明不支持增量同步功能
// *
// * @return
// */
// protected abstract boolean isSupportIncr();


// protected IDataXPluginMeta.EndType getEndType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ public final Map<String, Object> getExtractProps() {
return eprops;
}

/**
* 如果返回null则说明不支持增量同步功能
*
* @return
*/
protected abstract boolean isSupportIncr();
// /**
// * 如果返回null则说明不支持增量同步功能
// *
// * @return
// */
// protected abstract boolean isSupportIncr();

// /**
// * 如果返回null则说明不支持增量同步功能
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,6 @@ public Void process(Class<?> targetClass, Void v) {
}

if (dftVal != null && StringUtils.startsWith(String.valueOf(dftVal), IMessageHandler.TSEARCH_PACKAGE)) {

// UploadPluginMeta meta = UploadPluginMeta.parse(dftVal);
// boolean unCache = meta.getBoolean(UploadPluginMeta.KEY_UNCACHE);
//
// Callable<String> valGetter = () -> (String) GroovyShellEvaluate.scriptEval(dftVal);
props.put(PluginExtraProps.KEY_DFTVAL_PROP, GroovyShellEvaluate.scriptEval(String.valueOf(dftVal)));
}

Expand Down Expand Up @@ -559,19 +554,6 @@ private static JSONArray resolveEnumProp(Descriptor descriptor, PluginExtraProps
// }
JSONArray enums = new JSONArray();
if (anEnum != null && anEnum instanceof String) {
// 使用了如下这种配置方式,需要使用groovy进行解析
// "enum": "com.qlangtech.tis.plugin.ds.ReflectSchemaFieldType.all()"
// 需要转化成以下这种格式:
// "enum": [
// {
// "label": "是",
// "val": true
// },
// {
// "label": "否",
// "val": false
// }
// ]
try {
GroovyShellEvaluate.descriptorThreadLocal.set(descriptor);
fieldExtraProps.getProps().put(KEY_ENUM_PROP, GroovyShellEvaluate.scriptEval((String) anEnum, (opts) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.qlangtech.tis.extension.Describable;
import com.qlangtech.tis.extension.Descriptor;
import com.qlangtech.tis.extension.IPropertyType;
import com.qlangtech.tis.extension.util.GroovyShellEvaluate;
import com.qlangtech.tis.extension.util.PluginExtraProps;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
Expand All @@ -39,6 +40,7 @@
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

/**
* @author 百岁([email protected]
Expand Down Expand Up @@ -78,7 +80,7 @@ public <T> T convert(Class<T> type, Object value) {

private Boolean inputRequired;

public PluginExtraProps.Props extraProp ;
public PluginExtraProps.Props extraProp;

PropertyType(Field f, Class clazz, Type type, String displayName, FormField formField) {
this.f = f;
Expand Down Expand Up @@ -253,11 +255,34 @@ public Descriptor getItemTypeDescriptorOrDie() {
return d;
}

private Function<List<? extends Descriptor>, List<? extends Descriptor>> subDescFilter;

/**
* Returns all the descriptors that produce types assignable to the property type.
*/
public List<? extends Descriptor> getApplicableDescriptors() {
return TIS.get().getDescriptorList(clazz);


if (subDescFilter == null) {
String subDescEnumFilter = this.getExtraProps().getString(PluginExtraProps.KEY_ENUM_FILTER);
if (StringUtils.isNotEmpty(subDescEnumFilter)) {
String className = this.clazz.getSimpleName() + "_SubFilter";
String pkg = this.clazz.getPackage().getName();
String script = " package " + pkg + " ;\n"
+ "import java.util.function.Function;\n"
+ "import java.util.List;\n"
+ "import com.qlangtech.tis.extension.Descriptor;\n"
+ "class " + className + " implements Function<List<? extends Descriptor>,List<? extends Descriptor>> { \n"
+ " @Override \n"
+ " public List<? extends Descriptor> apply(List<? extends Descriptor> desc) {" + subDescEnumFilter + " }" + "}";

subDescFilter = GroovyShellEvaluate.createParamizerScript(this.clazz, className, script);
} else {
subDescFilter = (descs) -> descs;
}
}

return subDescFilter.apply(TIS.get().getDescriptorList(clazz));
}

/**
Expand Down
Loading

0 comments on commit c55ead5

Please sign in to comment.