Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clickhouse tables query #1331

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,31 @@ public class ClickHouseMetaData extends DefaultMetaService implements MetaData {
= "SELECT TRIGGER_NAME FROM INFORMATION_SCHEMA.TRIGGERS where TRIGGER_SCHEMA = '%s';";
private static String SELECT_TABLE_COLUMNS = "select * from `system`.columns where table ='%s' and database='%s';";
private static String VIEW_SQL
= "SELECT create_table_query from system.`tables` WHERE `database`='%s' and name='%s'";
= "SELECT create_table_query from system.`tables` WHERE `database`='%s' and name='%s' and engine='View'";
private List<String> systemDatabases = Arrays.asList("information_schema", "system");
public static final String FUNCTION_SQL = "SELECT name,create_query as ddl from system.functions where origin='SQLUserDefined'";
private static String FUNCTION_SQL = "SELECT name,create_query as ddl from system.functions where name='%s'";
private static String FUNCTIONS_SQL = "SELECT name from system.functions where origin='%s'";

public static String format(String tableName) {
return "`" + tableName + "`";
}
@Override
public List<Table> tables(Connection connection, String databaseName, String schemaName, String tableName) {
return SQLExecutor.getInstance().tables(connection, databaseName,
schemaName, tableName,
new String[]{"TABLE", "SYSTEM TABLE",
"REMOTE TABLE","DICTIONARY"});
}

@Override
public List<Function> functions(Connection connection, String databaseName, String schemaName) {
return SQLExecutor.getInstance().execute(connection, FUNCTION_SQL, resultSet -> {
String sql ;
if (systemDatabases.contains(schemaName)) {
sql=String.format(FUNCTIONS_SQL,"System");
}else {
sql = String.format(FUNCTIONS_SQL, "SQLUserDefined");
}
return SQLExecutor.getInstance().execute(connection, sql, resultSet -> {
List<Function> functions = new ArrayList<>();
while (resultSet.next()) {
Function function = new Function();
Expand Down Expand Up @@ -81,11 +95,10 @@ public List<Database> databases(Connection connection) {
@Override
public String tableDDL(Connection connection, @NotEmpty String databaseName, String schemaName,
@NotEmpty String tableName) {
String sql = "SHOW CREATE TABLE " + format(databaseName) + "."
+ format(tableName);
String sql =String.format("show create table %s.%s", schemaName, tableName);
return SQLExecutor.getInstance().execute(connection, sql, resultSet -> {
if (resultSet.next()) {
return resultSet.getString("Create Table");
return resultSet.getString("statement");
}
return null;
});
Expand All @@ -94,7 +107,7 @@ public String tableDDL(Connection connection, @NotEmpty String databaseName, Str
@Override
public Function function(Connection connection, @NotEmpty String databaseName, String schemaName,
String functionName) {
return SQLExecutor.getInstance().execute(connection, FUNCTION_SQL, resultSet -> {
return SQLExecutor.getInstance().execute(connection, String.format(FUNCTION_SQL, functionName), resultSet -> {
Function function = new Function();
function.setDatabaseName(databaseName);
function.setSchemaName(schemaName);
Expand Down Expand Up @@ -218,7 +231,7 @@ private void setColumnSize(TableColumn column, String columnType) {

@Override
public Table view(Connection connection, String databaseName, String schemaName, String viewName) {
String sql = String.format(VIEW_SQL, databaseName, viewName);
String sql = String.format(VIEW_SQL, schemaName, viewName);
return SQLExecutor.getInstance().execute(connection, sql, resultSet -> {
Table table = new Table();
table.setDatabaseName(databaseName);
Expand Down Expand Up @@ -299,7 +312,6 @@ public TableMeta getTableMeta(String databaseName, String schemaName, String tab
@Override
public String getMetaDataName(String... names) {
return Arrays.stream(names)
.skip(1) // 跳过第一个名称
.filter(StringUtils::isNotBlank)
.map(name -> "`" + name + "`")
.collect(Collectors.joining("."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ public List<Schema> schemas(Connection connection, String databaseName) {
});
return SortUtils.sortSchema(schemas, systemSchemas);
}

@Override
public List<Table> tables(Connection connection, String databaseName, String schemaName, String tableName) {
return SQLExecutor.getInstance().tables(connection, StringUtils.isEmpty(databaseName) ? null : databaseName,
StringUtils.isEmpty(schemaName) ? null : schemaName, tableName,
new String[]{"TABLE", "SYSTEM TABLE","PARTITIONED TABLE"});
}

private static final String SELECT_TABLE_INDEX = "SELECT tmp.INDISPRIMARY AS Index_primary, tmp.TABLE_SCHEM, tmp.TABLE_NAME, tmp.NON_UNIQUE, tmp.INDEX_QUALIFIER, tmp.INDEX_NAME AS Key_name, tmp.indisclustered, tmp.ORDINAL_POSITION AS Seq_in_index, TRIM ( BOTH '\"' FROM pg_get_indexdef ( tmp.CI_OID, tmp.ORDINAL_POSITION, FALSE ) ) AS Column_name,CASE tmp.AM_NAME WHEN 'btree' THEN CASE tmp.I_INDOPTION [ tmp.ORDINAL_POSITION - 1 ] & 1 :: SMALLINT WHEN 1 THEN 'D' ELSE'A' END ELSE NULL END AS Collation, tmp.CARDINALITY, tmp.PAGES, tmp.FILTER_CONDITION , tmp.AM_NAME AS Index_method, tmp.DESCRIPTION AS Index_comment FROM ( SELECT n.nspname AS TABLE_SCHEM, ct.relname AS TABLE_NAME, NOT i.indisunique AS NON_UNIQUE, NULL AS INDEX_QUALIFIER, ci.relname AS INDEX_NAME,i.INDISPRIMARY , i.indisclustered , ( information_schema._pg_expandarray ( i.indkey ) ).n AS ORDINAL_POSITION, ci.reltuples AS CARDINALITY, ci.relpages AS PAGES, pg_get_expr ( i.indpred, i.indrelid ) AS FILTER_CONDITION, ci.OID AS CI_OID, i.indoption AS I_INDOPTION, am.amname AS AM_NAME , d.description FROM pg_class ct JOIN pg_namespace n ON ( ct.relnamespace = n.OID ) JOIN pg_index i ON ( ct.OID = i.indrelid ) JOIN pg_class ci ON ( ci.OID = i.indexrelid ) JOIN pg_am am ON ( ci.relam = am.OID ) left outer join pg_description d on i.indexrelid = d.objoid WHERE n.nspname = '%s' AND ct.relname = '%s' ) AS tmp ;";
private static String ROUTINES_SQL = "SELECT p.proname, p.prokind, pg_catalog.pg_get_functiondef(p.oid) as \"code\" FROM pg_catalog.pg_proc p where p.prokind = '%s' and p.proname='%s'";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,29 +420,24 @@ public ListResult<SimpleTable> queryTables(TablePageQueryParam param) {

private long addDBCache(Long dataSourceId, String databaseName, String schemaName, long version) {
String key = getTableKey(dataSourceId, databaseName, schemaName);

Connection connection = Chat2DBContext.getConnection();
long n = 0;
try (ResultSet resultSet = connection.getMetaData().getTables(databaseName, schemaName, null,
new String[]{"TABLE", "SYSTEM TABLE"})) {
List<TableCacheDO> cacheDOS = new ArrayList<>();
while (resultSet.next()) {
TableCacheDO tableCacheDO = new TableCacheDO();
tableCacheDO.setDatabaseName(databaseName);
tableCacheDO.setSchemaName(schemaName);
tableCacheDO.setTableName(resultSet.getString("TABLE_NAME"));
tableCacheDO.setExtendInfo(resultSet.getString("REMARKS"));
tableCacheDO.setDataSourceId(dataSourceId);
tableCacheDO.setVersion(version);
tableCacheDO.setKey(key);
cacheDOS.add(tableCacheDO);
if (cacheDOS.size() >= 500) {
getTableCacheMapper().batchInsert(cacheDOS);
cacheDOS = new ArrayList<>();
}
n++;
}
if (!CollectionUtils.isEmpty(cacheDOS)) {
MetaData metaData = Chat2DBContext.getMetaData();
List<Table> tables = metaData.tables(connection, databaseName, schemaName, null);
if (tables.isEmpty()) {
return 0;
}
List<TableCacheDO> cacheDOS = tables.stream().map(table -> {
TableCacheDO tableCacheDO = new TableCacheDO();
tableCacheDO.setDatabaseName(databaseName);
tableCacheDO.setSchemaName(schemaName);
tableCacheDO.setTableName(table.getName());
tableCacheDO.setExtendInfo(table.getComment());
tableCacheDO.setDataSourceId(dataSourceId);
tableCacheDO.setVersion(version);
tableCacheDO.setKey(key);
return tableCacheDO;
}).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(cacheDOS)) {
getTableCacheMapper().batchInsert(cacheDOS);
}
LambdaQueryWrapper<TableCacheDO> q = new LambdaQueryWrapper();
Expand All @@ -455,10 +450,7 @@ private long addDBCache(Long dataSourceId, String databaseName, String schemaNam
q.eq(TableCacheDO::getSchemaName, schemaName);
}
getTableCacheMapper().delete(q);
} catch (SQLException e) {
throw new RuntimeException(e);
}
return n;
return tables.size();
}

private Long getLock(Long dataSourceId, String databaseName, String schemaName, TableCacheVersionDO versionDO) {
Expand Down