diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 5b58ff2af56b3c..52075ae555b9a2 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -190,6 +190,7 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
           _table(tdesc.mcTable.table),
           _access_key(tdesc.mcTable.access_key),
           _secret_key(tdesc.mcTable.secret_key),
+          _partition_spec(tdesc.mcTable.partition_spec),
           _public_access(tdesc.mcTable.public_access) {}
 
 MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 18d9bb62f3d178..cba1737c3e69a2 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -235,6 +235,7 @@ class MaxComputeTableDescriptor : public TableDescriptor {
     const std::string table() const { return _table; }
     const std::string access_key() const { return _access_key; }
     const std::string secret_key() const { return _secret_key; }
+    const std::string partition_spec() const { return _partition_spec; }
     const std::string public_access() const { return _public_access; }
 
 private:
@@ -243,6 +244,7 @@ class MaxComputeTableDescriptor : public TableDescriptor {
     std::string _table;
     std::string _access_key;
     std::string _secret_key;
+    std::string _partition_spec;
     std::string _public_access;
 };
 
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index 9edd8bfc514b4d..34db6a1df4d484 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -64,6 +64,7 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
             {"access_key", _table_desc->access_key()},
             {"secret_key", _table_desc->secret_key()},
             {"project", _table_desc->project()},
+            {"partition_spec", _table_desc->partition_spec()},
             {"table", _table_desc->table()},
             {"public_access", _table_desc->public_access()},
             {"start_offset", std::to_string(_range.start_offset)},
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 8f9b903afdc716..5f4125ec4ed234 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -23,6 +23,7 @@
 
 import com.aliyun.odps.Column;
 import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.data.ArrowRecordReader;
 import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.type.TypeInfo;
@@ -31,6 +32,7 @@
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
@@ -40,7 +42,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * MaxComputeJ JniScanner. BE will read data from the scanner object.
@@ -49,16 +53,19 @@ public class MaxComputeJniScanner extends JniScanner {
     private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class);
     private static final String REGION = "region";
     private static final String PROJECT = "project";
+    private static final String PARTITION_SPEC = "partition_spec";
     private static final String TABLE = "table";
     private static final String ACCESS_KEY = "access_key";
     private static final String SECRET_KEY = "secret_key";
     private static final String START_OFFSET = "start_offset";
     private static final String SPLIT_SIZE = "split_size";
     private static final String PUBLIC_ACCESS = "public_access";
-    private static final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
+    private final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
     private final String region;
     private final String project;
     private final String table;
+    private PartitionSpec partitionSpec;
+    private Set<String> partitionColumns;
     private final MaxComputeTableScan curTableScan;
     private MaxComputeColumnValue columnValue;
     private long remainBatchRows = 0;
@@ -76,7 +83,10 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
         table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
         tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
         curTableScan = tableScans.get(tableUniqKey());
-
+        String partitionSpec = params.get(PARTITION_SPEC);
+        if (StringUtils.isNotEmpty(partitionSpec)) {
+            this.partitionSpec = new PartitionSpec(partitionSpec);
+        }
         String[] requiredFields = params.get("required_fields").split(",");
         String[] types = params.get("columns_types").split("#");
         ColumnType[] columnTypes = new ColumnType[types.length];
@@ -124,6 +134,7 @@ protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields
         }
         // reorder columns
         List<Column> columnList = curTableScan.getSchema().getColumns();
+        columnList.addAll(curTableScan.getSchema().getPartitionColumns());
         Map<String, Integer> columnRank = new HashMap<>();
         for (int i = 0; i < columnList.size(); i++) {
             columnRank.put(columnList.get(i).getName(), i);
@@ -139,13 +150,23 @@ public void open() throws IOException {
             return;
         }
         try {
-            TableTunnel.DownloadSession session = curTableScan.getSession();
+            TableTunnel.DownloadSession session;
+            if (partitionSpec != null) {
+                session = curTableScan.openDownLoadSession(partitionSpec);
+            } else {
+                session = curTableScan.openDownLoadSession();
+            }
             long start = startOffset == -1L ? 0 : startOffset;
             long recordCount = session.getRecordCount();
             totalRows = splitSize > 0 ?
                     Math.min(splitSize, recordCount) : recordCount;
             arrowAllocator = new RootAllocator(Long.MAX_VALUE);
-            curReader = session.openArrowRecordReader(start, totalRows, readColumns, arrowAllocator);
+            partitionColumns = session.getSchema().getPartitionColumns().stream()
+                    .map(Column::getName)
+                    .collect(Collectors.toSet());
+            List<Column> maxComputeColumns = new ArrayList<>(readColumns);
+            maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName()));
+            curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator);
         } catch (Exception e) {
             close();
             throw new IOException(e);
@@ -252,6 +273,16 @@ private int readVectors(int expectedRows) throws IOException {
                     appendData(readColumnsToId.get(column.getName()), columnValue);
                 }
             }
+            if (partitionSpec != null) {
+                for (String partitionColumn : partitionColumns) {
+                    String partitionValue = partitionSpec.get(partitionColumn);
+                    Integer readColumnId = readColumnsToId.get(partitionColumn);
+                    if (readColumnId != null && partitionValue != null) {
+                        MaxComputePartitionValue value = new MaxComputePartitionValue(partitionValue);
+                        appendData(readColumnId, value);
+                    }
+                }
+            }
             curReadRows += batchRows;
         } finally {
             batch.close();
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
new file mode 100644
index 00000000000000..cb76447e589622
--- /dev/null
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * MaxCompute partition column value in a vector column
+ */
+public class MaxComputePartitionValue implements ColumnValue {
+    private String partitionValue;
+
+    public MaxComputePartitionValue(String partitionValue) {
+        reset(partitionValue);
+    }
+
+    public void reset(String partitionValue) {
+        this.partitionValue = partitionValue;
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return false;
+    }
+
+    @Override
+    public boolean isNull() {
+        return false;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte getByte() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short getShort() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getInt() {
+        return Integer.parseInt(partitionValue);
+    }
+
+    @Override
+    public float getFloat() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLong() {
+        return Long.parseLong(partitionValue);
+    }
+
+    @Override
+    public double getDouble() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        return BigInteger.valueOf(getLong());
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return BigDecimal.valueOf(getDouble());
+    }
+
+    @Override
+    public String getString() {
+        return partitionValue;
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalDate getDate() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return partitionValue.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
index da67196a3a2f57..c0fa40dae46ae3 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
@@ -18,6 +18,7 @@
 package org.apache.doris.maxcompute;
 
 import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,8 +36,8 @@ public class MaxComputeTableScan {
     private final TableTunnel tunnel;
     private final String project;
     private final String table;
-    private volatile TableTunnel.DownloadSession tableSession;
     private volatile long readRows = 0;
+    private long totalRows = 0;
 
     public MaxComputeTableScan(String region, String project, String table, String accessKey, String secretKey, boolean
             enablePublicAccess) {
@@ -59,13 +60,24 @@ public TableSchema getSchema() {
         return odps.tables().get(table).getSchema();
     }
 
-    public synchronized TableTunnel.DownloadSession getSession() throws IOException {
-        if (tableSession == null) {
-            try {
-                tableSession = tunnel.createDownloadSession(project, table);
-            } catch (TunnelException e) {
-                throw new IOException(e);
-            }
+    public TableTunnel.DownloadSession openDownLoadSession() throws IOException {
+        TableTunnel.DownloadSession tableSession;
+        try {
+            tableSession = tunnel.getDownloadSession(project, table, null);
+            totalRows = tableSession.getRecordCount();
+        } catch (TunnelException e) {
+            throw new IOException(e);
+        }
+        return tableSession;
+    }
+
+    public TableTunnel.DownloadSession openDownLoadSession(PartitionSpec partitionSpec) throws IOException {
+        TableTunnel.DownloadSession tableSession;
+        try {
+            tableSession = tunnel.getDownloadSession(project, table, partitionSpec, null);
+            totalRows = tableSession.getRecordCount();
+        } catch (TunnelException e) {
+            throw new IOException(e);
         }
         return tableSession;
     }
@@ -76,6 +88,6 @@ public synchronized void increaseReadRows(long rows) {
     }
 
     public boolean endOfScan() {
-        return readRows >= tableSession.getRecordCount();
+        return readRows >= totalRows;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 012693bccd6146..3c2f3bada03574 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.catalog.external;
 
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.MapType;
@@ -39,9 +44,16 @@
 import com.aliyun.odps.type.TypeInfo;
 import com.aliyun.odps.type.VarcharTypeInfo;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
 
 /**
  * MaxCompute external table.
@@ -49,6 +61,8 @@ public class MaxComputeExternalTable extends ExternalTable {
 
     private Table odpsTable;
+    private Set<String> partitionKeys;
+    private String partitionSpec;
 
     public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
@@ -72,9 +86,74 @@ public List<Column> initSchema() {
             result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
                     true, field.getComment(), true, -1));
         }
+        List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
+        partitionKeys = new HashSet<>();
+        for (com.aliyun.odps.Column partColumn : partitionColumns) {
+            result.add(new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
+                    true, partColumn.getComment(), true, -1));
+            partitionKeys.add(partColumn.getName());
+        }
         return result;
     }
 
+    public Optional<String> getPartitionSpec(List<Expr> conjuncts) {
+        if (!partitionKeys.isEmpty()) {
+            if (conjuncts.isEmpty()) {
+                throw new IllegalArgumentException("Max Compute partition table needs a partition predicate.");
+            }
+            // recreate partitionSpec when the conjuncts change.
+            List<String> partitionConjuncts = parsePartitionConjuncts(conjuncts, partitionKeys);
+            StringJoiner partitionSpec = new StringJoiner(",");
+            partitionConjuncts.forEach(partitionSpec::add);
+            this.partitionSpec = partitionSpec.toString();
+            return Optional.of(this.partitionSpec);
+        }
+        return Optional.empty();
+    }
+
+    private static List<String> parsePartitionConjuncts(List<Expr> conjuncts, Set<String> partitionKeys) {
+        List<String> partitionConjuncts = new ArrayList<>();
+        Set<Predicate> predicates = Sets.newHashSet();
+        for (Expr conjunct : conjuncts) {
+            // collect the partition-related predicates
+            conjunct.collect(BinaryPredicate.class, predicates);
+            conjunct.collect(InPredicate.class, predicates);
+        }
+        Map<String, Predicate> slotToConjuncts = new HashMap<>();
+        for (Predicate predicate : predicates) {
+            List<SlotRef> slotRefs = new ArrayList<>();
+            if (predicate instanceof BinaryPredicate) {
+                if (((BinaryPredicate) predicate).getOp() != BinaryPredicate.Operator.EQ) {
+                    // MaxCompute only supports the EQ operator: pt='pt-value'
+                    continue;
+                }
+                // a BinaryPredicate has one left slotRef; the partition value is not a slotRef
+                predicate.collect(SlotRef.class, slotRefs);
+                slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
+            } else if (predicate instanceof InPredicate) {
+                predicate.collect(SlotRef.class, slotRefs);
+                slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
+            }
+        }
+        for (String partitionKey : partitionKeys) {
+            Predicate partitionPredicate = slotToConjuncts.get(partitionKey);
+            if (partitionPredicate == null) {
+                continue;
+            }
+            if (partitionPredicate instanceof InPredicate) {
+                List<Expr> inList = ((InPredicate) partitionPredicate).getListChildren();
+                for (Expr expr : inList) {
+                    String partitionConjunct = partitionKey + "=" + expr.toSql();
+                    partitionConjuncts.add(partitionConjunct.replace("`", ""));
+                }
+            } else {
+                String partitionConjunct = partitionPredicate.toSql();
+                partitionConjuncts.add(partitionConjunct.replace("`", ""));
+            }
+        }
+        return partitionConjuncts;
+    }
+
     private Type mcTypeToDorisType(TypeInfo typeInfo) {
         OdpsType odpsType = typeInfo.getOdpsType();
         switch (odpsType) {
@@ -166,6 +245,7 @@ public TTableDescriptor toThrift() {
         tMcTable.setRegion(mcCatalog.getRegion());
         tMcTable.setAccessKey(mcCatalog.getAccessKey());
         tMcTable.setSecretKey(mcCatalog.getSecretKey());
+        tMcTable.setPartitionSpec(this.partitionSpec);
         tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
         // use mc project as dbName
         tMcTable.setProject(dbName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index 5c5e4ded0c11f1..0cd99678baded3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -24,6 +24,7 @@
 
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class MaxComputeExternalCatalog extends ExternalCatalog {
     private Odps odps;
@@ -93,15 +95,21 @@ protected void initLocalObjectsImpl() {
         odps.setDefaultProject(defaultProject);
     }
 
-    public long getTotalRows(String project, String table) throws TunnelException {
+    public long getTotalRows(String project, String table, Optional<String> partitionSpec) throws TunnelException {
         makeSureInitialized();
         TableTunnel tunnel = new TableTunnel(odps);
         String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
         if (enablePublicAccess) {
             tunnelUrl = tunnelUrl.replace("-inc", "");
         }
+        TableTunnel.DownloadSession downloadSession;
         tunnel.setEndpoint(tunnelUrl);
-        return tunnel.createDownloadSession(project, table).getRecordCount();
+        if (!partitionSpec.isPresent()) {
+            downloadSession = tunnel.getDownloadSession(project, table, null);
+        } else {
+            downloadSession = tunnel.getDownloadSession(project, table, new PartitionSpec(partitionSpec.get()), null);
+        }
+        return downloadSession.getRecordCount();
     }
 
     public Odps getClient() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 1030a67a30ad70..d7f8d599a61555 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -37,6 +37,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class MaxComputeScanNode extends FileQueryScanNode {
 
@@ -96,7 +97,8 @@ protected List<Split> getSplits() throws UserException {
         }
         try {
             List<Pair<Long, Long>> sliceRange = new ArrayList<>();
-            long totalRows = catalog.getTotalRows(table.getDbName(), table.getName());
+            Optional<String> partitionSpec = table.getPartitionSpec(conjuncts);
+            long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec);
             long fileNum = odpsTable.getFileNum();
             long start = 0;
             long splitSize = (long) Math.ceil((double) totalRows / fileNum);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index a5ae774bd4ce6e..5176720071321d 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -331,6 +331,7 @@ struct TMCTable {
     4: optional string access_key
     5: optional string secret_key
     6: optional string public_access
+    7: optional string partition_spec
 }
 
 // "Union" of all table types.
diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 81792a08a14376..ea3cab9355c7be 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -168,6 +168,10 @@ extEsPort = 9200
 extEsUser = "*******"
 extEsPassword = "***********"
 
+enableMaxComputeTest=false
+aliYunAk="***********"
+aliYunSk="***********"
+
 s3Endpoint = "cos.ap-hongkong.myqcloud.com"
 s3BucketName = "doris-build-hk-1308700295"
 s3Region = "ap-hongkong"
diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
new file mode 100644
index 00000000000000..5fc7ade4894964
--- /dev/null
+++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
@@ -0,0 +1,24 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+8639377
+
+-- !q2 --
+1	2	2000-08-15	2000-08-16	t	3	4	5	6	7	8	9	10	11	12	13	14	15	16	17	18	19	20	21	22.22	23.23
+
+-- !q3 --
+false	2	44	423432
+true	77	8920	182239402452
+
+-- !q4 --
+6223	maxam	2020-09-21
+9601	qewtoll	2020-09-21
+
+-- !q5 --
+1633	siwtow	2021-08-21
+
+-- !q6 --
+9601	qewtoll	2020-09-21
+
+-- !q7 --
+6223	maxam	2020-09-21
+9601	qewtoll	2020-09-21
diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
new file mode 100644
index 00000000000000..6b050e277a8927
--- /dev/null
+++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remote,external_remote_maxcompute") { + String enabled = context.config.otherConfigs.get("enableMaxComputeTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String ak = context.config.otherConfigs.get("aliYunAk") + String sk = context.config.otherConfigs.get("aliYunSk"); + String mc_db = "jz_datalake" + String mc_catalog_name = "test_external_mc_catalog" + + sql """drop catalog if exists ${mc_catalog_name};""" + sql """ + create catalog if not exists ${mc_catalog_name} properties ( + "type" = "max_compute", + "mc.region" = "cn-beijing", + "mc.default.project" = "${mc_db}", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.public_access" = "true" + ); + """ + + // query data test + def q01 = { + qt_q1 """ select count(*) from store_sales """ + } + // data type test + def q02 = { + qt_q2 """ select * from web_site where web_site_id=2 order by web_site_id """ // test char,date,varchar,double,decimal + qt_q3 """ select * from int_types order by mc_boolean limit 2 """ // test bool,tinyint,int,bigint + } + // test partition table filter + def q03 = { + qt_q4 """ select * from mc_parts where dt = '2020-09-21' """ + qt_q5 """ select * from mc_parts where dt = '2021-08-21' """ + qt_q6 """ select * from mc_parts where dt = '2020-09-21' and mc_bigint > 6223 """ + qt_q7 """ select * from mc_parts where dt = '2020-09-21' or mc_bigint > 0 """ + } + sql """ switch `${mc_catalog_name}`; """ + sql """ use `${mc_db}`; """ + q01() + q02() + q03() + } +}