Skip to content

Commit

Permalink
date or datetime col has the possiblity to make as one of the primary…
Browse files Browse the repository at this point in the history
… keys, to make the pararm nullable as the constructor of DataType datavane/tis#176
  • Loading branch information
baisui1981 committed Dec 17, 2022
1 parent 8f73001 commit c40c554
Show file tree
Hide file tree
Showing 47 changed files with 1,806 additions and 969 deletions.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<module>tis-plugin-test/tis-testcontainer-mysql</module>
<module>tis-plugin-test/tis-testcontainer-postgresql</module>
<module>tis-plugin-test/tis-testcontainer-oracle</module>
<module>tis-plugin-test/tis-testcontainer-starrocks</module>

<!-- <module>tis-solr-plugin</module>
<module>tis-local-dump-build</module>-->
Expand Down Expand Up @@ -167,6 +168,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.qlangtech.tis.plugins</groupId>
<artifactId>tis-testcontainer-starrocks</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.qlangtech.tis.plugins</groupId>
<artifactId>tis-scala-compiler</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions tis-datax/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<module>tis-datax-oracle-plugin</module>
<module>tis-datax-doris-starrocks-plugin</module>
<module>tis-ds-tidb-plugin</module>
<module>tis-hive-plugin</module>
<module>tis-hive-flat-table-builder-plugin</module>
<module>tis-ds-mysql-plugin</module>
<module>tis-ds-mysql-v5-plugin</module>
Expand All @@ -70,6 +71,7 @@
<module>tis-datax-hudi-common</module>
<module>tis-datax-local-embedded-executor</module>


</modules>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ private void processSession(ISessionVisit sessionVisit) {

}

@Override
public void visitFirstConnection(IConnProcessor connProcessor) {
throw new UnsupportedOperationException();
}

@Override
public void refectTableInDB(List<String> tabs, Connection conn) throws SQLException {
throw new UnsupportedOperationException();
}

interface ISessionVisit {
void visit(Session session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import com.qlangtech.tis.web.start.TisAppLaunch;
import com.qlangtech.tis.web.start.TisSubModule;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -58,8 +61,6 @@
* @create: 2021-06-06 19:48
**/
public abstract class BasicDataSourceFactory extends DataSourceFactory implements JdbcUrlBuilder {
//


private static final Logger logger = LoggerFactory.getLogger(BasicDataSourceFactory.class);

Expand Down Expand Up @@ -181,34 +182,7 @@ private final void visitConnection(final IConnProcessor connProcessor, final boo
}
}


@Override
public final List<String> getTablesInDB() {
try {
final List<String> tabs = new ArrayList<>();

this.visitFirstConnection((conn) -> {
refectTableInDB(tabs, conn);
});

// final DBConfig dbConfig = getDbConfig();
// dbConfig.vistDbName((config, ip, databaseName) -> {
// visitConnection(config, ip, databaseName, (conn) -> {
// refectTableInDB(tabs, conn);
// });
// return true;
// });
return tabs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Connection getConnection(String jdbcUrl) throws SQLException {
return super.getConnection(jdbcUrl);
}

public void refectTableInDB(List<String> tabs, Connection conn) throws SQLException {
Statement statement = null;
ResultSet resultSet = null;
Expand All @@ -229,17 +203,41 @@ public void refectTableInDB(List<String> tabs, Connection conn) throws SQLExcept
}
}

@Override
public List<String> getTablesInDB() {
try {
final List<String> tabs = new ArrayList<>();

this.visitFirstConnection((conn) -> {
refectTableInDB(tabs, conn);
});
return tabs;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Connection getConnection(String jdbcUrl) throws SQLException {
return super.getConnection(jdbcUrl);
}


protected String getRefectTablesSql() {
return "show tables";
}

public final DBConfig getDbConfig() {
final DBConfig dbConfig = new DBConfig(this);
dbConfig.setName(this.getDbName());
dbConfig.setDbEnum(DBConfigParser.parseDBEnum(getDbName(), this.nodeDesc));
dbConfig.setDbEnum(DBConfigParser.parseDBEnum(getDbName(), getNodeDesc()));
return dbConfig;
}

protected String getNodeDesc() {
return this.nodeDesc;
}


public List<String> getJdbcUrls() {
final DBConfig dbLinkMetaData = this.getDbConfig();
Expand Down Expand Up @@ -306,15 +304,17 @@ public abstract static class BasicRdbmsDataSourceFactoryDescriptor

static {
try {
ClientConfig clientConfig = new ClientConfig("http://localhost:8080/next");
ClientConfig clientConfig = new ClientConfig(
"http://127.0.0.1:" + TisAppLaunch.getPort(TisSubModule.ZEPPELIN) + "/next");
zeppelinClient = new ZeppelinClient(clientConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public String createOrGetNotebook(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) throws Exception {
public String createOrGetNotebook(
IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) throws Exception {

BasicDataSourceFactory dsFactory = postFormVals.newInstance(this, msgHandler);
String idVal = dsFactory.identityValue();
Expand All @@ -328,7 +328,13 @@ public String createOrGetNotebook(IControlMsgHandler msgHandler, Context context

File interpreterCfg = new File(com.qlangtech.tis.TIS.pluginCfgRoot, "interpreter/" + idVal);
if (!interpreterCfg.exists()) {

List<String> jdbcUrls = dsFactory.getJdbcUrls();
if (CollectionUtils.isEmpty(jdbcUrls)) {
throw new IllegalStateException("dataSource:" + idVal + " relevant jdbcUrl can not be empty");
}
FileUtils.write(interpreterCfg, zeppelinClient.createJDBCInterpreter(idVal), TisUTF8.get(), false);

}

notebookId = zeppelinClient.createNoteWithParagraph("/tis/" + idVal, ZeppelinClient.getInterpreterName(idVal));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,9 @@ public boolean validateLoadProps(IFieldErrorHandler msgHandler, Context context,
}


protected abstract String getRowDelimiterKey() ;

protected abstract String getColSeparatorKey() ;
protected abstract String getRowDelimiterKey();

protected abstract String getColSeparatorKey();


// @Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public class DataXElasticsearchWriter extends DataxWriter implements IDataxConte


public ElasticEndpoint getToken() {
ElasticEndpoint aliyunToken = ParamsConfig.getItem(endpoint, ElasticEndpoint.KEY_DISPLAY_NAME);
ElasticEndpoint aliyunToken = ParamsConfig.getItem(endpoint, ElasticEndpoint.KEY_ELASTIC_SEARCH_DISPLAY_NAME);
return aliyunToken;
}

Expand Down Expand Up @@ -434,7 +434,7 @@ public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
public static class DefaultDescriptor extends BaseDataxWriterDescriptor {
public DefaultDescriptor() {
super();
registerSelectOptions(FIELD_ENDPOINT, () -> ParamsConfig.getItems(ElasticEndpoint.KEY_DISPLAY_NAME));
registerSelectOptions(FIELD_ENDPOINT, () -> ParamsConfig.getItems(ElasticEndpoint.KEY_ELASTIC_SEARCH_DISPLAY_NAME));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,26 @@
import com.alibaba.datax.plugin.writer.elasticsearchwriter.ESInitialization;
import com.qlangtech.tis.config.ParamsConfig;
import com.qlangtech.tis.extension.Descriptor;
import com.qlangtech.tis.extension.INotebookable;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.manage.common.TisUTF8;
import com.qlangtech.tis.plugin.AuthToken;
import com.qlangtech.tis.plugin.HttpEndpoint;
import com.qlangtech.tis.plugin.aliyun.AccessKey;
import com.qlangtech.tis.plugin.aliyun.NoneToken;
import com.qlangtech.tis.plugin.aliyun.UsernamePassword;
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
import com.qlangtech.tis.web.start.TisAppLaunch;
import com.qlangtech.tis.web.start.TisSubModule;
import io.searchbox.client.JestResult;
import io.searchbox.cluster.Health;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.ZeppelinClient;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -42,7 +52,6 @@
**/
public class ElasticEndpoint extends HttpEndpoint {

public static final String KEY_DISPLAY_NAME = "elasticToken";

public final ESInitialization createESInitialization() {
UsernamePassword auth = this.accept(new AuthToken.Visitor<UsernamePassword>() {
Expand Down Expand Up @@ -74,7 +83,86 @@ public static List<? extends Descriptor> filter(List<? extends Descriptor> descs
}

@TISExtension()
public static class DefaultDescriptor extends Descriptor<ParamsConfig> {
public static class DefaultDescriptor extends Descriptor<ParamsConfig> implements INotebookable {

// copy from org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter
public static final String ELASTICSEARCH_HOST = "elasticsearch.host";
public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name";
public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size";
public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username";
public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password";
static final ZeppelinClient zeppelinClient;

static {
try {
ClientConfig clientConfig = new ClientConfig(
"http://127.0.0.1:" + TisAppLaunch.getPort(TisSubModule.ZEPPELIN) + "/next");
zeppelinClient = new ZeppelinClient(clientConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* see org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter
*
* @param msgHandler
* @param context
* @param postFormVals
* @return
* @throws Exception
*/
@Override
public String createOrGetNotebook(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) throws Exception {

ParamsConfig cfg = postFormVals.newInstance(this, msgHandler);
ElasticEndpoint endpoint = (ElasticEndpoint) cfg;
String idVal = cfg.identityValue();
String notebookId = null;
File notebookDir = this.getConfigFile().getFile().getParentFile();
File notebookToken = new File(notebookDir, "notebook_" + idVal);
if (notebookToken.exists()) {
notebookId = FileUtils.readFileToString(notebookToken, TisUTF8.get());
return notebookId;
}

File interpreterCfg = new File(com.qlangtech.tis.TIS.pluginCfgRoot, "interpreter/" + idVal);
if (!interpreterCfg.exists()) {

URL endpointURL = new URL(endpoint.getEndpoint());
zeppelinClient.deleteInterpreter(idVal);
FileUtils.write(interpreterCfg
, zeppelinClient.createInterpreter(idVal, InterpreterGroup.ELASTIC_GROUP, (props) -> {
props.add(ELASTICSEARCH_HOST, endpointURL.getHost());
props.add(ELASTICSEARCH_PORT, String.valueOf(endpointURL.getPort()));
props.add(ELASTICSEARCH_CLIENT_TYPE, "http");
endpoint.accept(new AuthToken.Visitor<Void>() {
@Override
public Void visit(NoneToken noneToken) {
return null;
}
@Override
public Void visit(AccessKey accessKey) {
return null;
}
@Override
public Void visit(UsernamePassword token) {
props.add(ELASTICSEARCH_BASIC_AUTH_USERNAME, token.userName);
props.add(ELASTICSEARCH_BASIC_AUTH_PASSWORD, token.password);
return null;
}
});

}), TisUTF8.get(), false);

}

notebookId = zeppelinClient.createNoteWithParagraph("/tis/" + idVal, ZeppelinClient.getInterpreterName(idVal));
FileUtils.write(notebookToken, notebookId, TisUTF8.get(), false);
return notebookId;
}

@Override
protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) {
Expand All @@ -100,7 +188,7 @@ protected boolean verify(IControlMsgHandler msgHandler, Context context, PostFor

@Override
public String getDisplayName() {
return KEY_DISPLAY_NAME;
return KEY_ELASTIC_SEARCH_DISPLAY_NAME;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import org.apache.commons.lang.StringUtils;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -149,6 +150,15 @@ public List<ColumnMetaData> getTableMetadata(EntityName table) {
throw new UnsupportedOperationException();
}

@Override
public void visitFirstConnection(IConnProcessor connProcessor) {
throw new UnsupportedOperationException();
}

@Override
public void refectTableInDB(List<String> tabs, Connection conn) throws SQLException {
throw new UnsupportedOperationException();
}

private MongoClient createMongoClient() {
MongoClient mongoClient = null;
Expand Down
Loading

0 comments on commit c40c554

Please sign in to comment.