Skip to content

Commit

Permalink
[fix](multi-catalog) fix maxcompute partition filter and session creation (#24911)
Browse files Browse the repository at this point in the history

add maxcompute partition support
fix maxcompute partition filter
modify maxcompute session create method
  • Loading branch information
wsjz authored Oct 17, 2023
1 parent ce18f11 commit 18c2a13
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 16 deletions.
1 change: 1 addition & 0 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
_table(tdesc.mcTable.table),
_access_key(tdesc.mcTable.access_key),
_secret_key(tdesc.mcTable.secret_key),
_partition_spec(tdesc.mcTable.partition_spec),
_public_access(tdesc.mcTable.public_access) {}

MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class MaxComputeTableDescriptor : public TableDescriptor {
const std::string table() const { return _table; }
const std::string access_key() const { return _access_key; }
const std::string secret_key() const { return _secret_key; }
const std::string partition_spec() const { return _partition_spec; }
const std::string public_access() const { return _public_access; }

private:
Expand All @@ -243,6 +244,7 @@ class MaxComputeTableDescriptor : public TableDescriptor {
std::string _table;
std::string _access_key;
std::string _secret_key;
std::string _partition_spec;
std::string _public_access;
};

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/max_compute_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
{"access_key", _table_desc->access_key()},
{"secret_key", _table_desc->secret_key()},
{"project", _table_desc->project()},
{"partition_spec", _table_desc->partition_spec()},
{"table", _table_desc->table()},
{"public_access", _table_desc->public_access()},
{"start_offset", std::to_string(_range.start_offset)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.type.TypeInfo;
Expand All @@ -31,6 +32,7 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
Expand All @@ -40,7 +42,9 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* MaxComputeJ JniScanner. BE will read data from the scanner object.
Expand All @@ -49,16 +53,19 @@ public class MaxComputeJniScanner extends JniScanner {
private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class);
private static final String REGION = "region";
private static final String PROJECT = "project";
private static final String PARTITION_SPEC = "partition_spec";
private static final String TABLE = "table";
private static final String ACCESS_KEY = "access_key";
private static final String SECRET_KEY = "secret_key";
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
private static final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
private final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
private final String region;
private final String project;
private final String table;
private PartitionSpec partitionSpec;
private Set<String> partitionColumns;
private final MaxComputeTableScan curTableScan;
private MaxComputeColumnValue columnValue;
private long remainBatchRows = 0;
Expand All @@ -76,7 +83,10 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
curTableScan = tableScans.get(tableUniqKey());

String partitionSpec = params.get(PARTITION_SPEC);
if (StringUtils.isNotEmpty(partitionSpec)) {
this.partitionSpec = new PartitionSpec(partitionSpec);
}
String[] requiredFields = params.get("required_fields").split(",");
String[] types = params.get("columns_types").split("#");
ColumnType[] columnTypes = new ColumnType[types.length];
Expand Down Expand Up @@ -124,6 +134,7 @@ protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields
}
// reorder columns
List<Column> columnList = curTableScan.getSchema().getColumns();
columnList.addAll(curTableScan.getSchema().getPartitionColumns());
Map<String, Integer> columnRank = new HashMap<>();
for (int i = 0; i < columnList.size(); i++) {
columnRank.put(columnList.get(i).getName(), i);
Expand All @@ -139,13 +150,23 @@ public void open() throws IOException {
return;
}
try {
TableTunnel.DownloadSession session = curTableScan.getSession();
TableTunnel.DownloadSession session;
if (partitionSpec != null) {
session = curTableScan.openDownLoadSession(partitionSpec);
} else {
session = curTableScan.openDownLoadSession();
}
long start = startOffset == -1L ? 0 : startOffset;
long recordCount = session.getRecordCount();
totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : recordCount;

arrowAllocator = new RootAllocator(Long.MAX_VALUE);
curReader = session.openArrowRecordReader(start, totalRows, readColumns, arrowAllocator);
partitionColumns = session.getSchema().getPartitionColumns().stream()
.map(Column::getName)
.collect(Collectors.toSet());
List<Column> maxComputeColumns = new ArrayList<>(readColumns);
maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName()));
curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator);
} catch (Exception e) {
close();
throw new IOException(e);
Expand Down Expand Up @@ -252,6 +273,16 @@ private int readVectors(int expectedRows) throws IOException {
appendData(readColumnsToId.get(column.getName()), columnValue);
}
}
if (partitionSpec != null) {
for (String partitionColumn : partitionColumns) {
String partitionValue = partitionSpec.get(partitionColumn);
Integer readColumnId = readColumnsToId.get(partitionColumn);
if (readColumnId != null && partitionValue != null) {
MaxComputePartitionValue value = new MaxComputePartitionValue(partitionValue);
appendData(readColumnId, value);
}
}
}
curReadRows += batchRows;
} finally {
batch.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.maxcompute;

import org.apache.doris.common.jni.vec.ColumnValue;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;

/**
* MaxCompute Column value in vector column
*/
/**
 * A {@link ColumnValue} backed by a MaxCompute partition-spec value.
 *
 * <p>Partition column values arrive as raw strings from the partition spec
 * (e.g. {@code "pt=20231017"}), so every typed accessor parses the stored
 * string on demand. Accessors for types that a MaxCompute partition column
 * cannot take (float, date, complex types, ...) throw
 * {@link UnsupportedOperationException}.
 *
 * <p>NOTE(review): instances are reused via {@link #reset(String)}; this class
 * is not thread-safe, which matches the single-threaded scanner usage.
 */
public class MaxComputePartitionValue implements ColumnValue {
    // Raw partition value as it appears in the PartitionSpec; parsed lazily
    // by the typed getters below.
    private String partitionValue;

    public MaxComputePartitionValue(String partitionValue) {
        reset(partitionValue);
    }

    /** Rebinds this wrapper to a new raw partition value (allows reuse). */
    public void reset(String partitionValue) {
        this.partitionValue = partitionValue;
    }

    @Override
    public boolean canGetStringAsBytes() {
        return false;
    }

    @Override
    public boolean isNull() {
        // Partition spec entries always carry a concrete value.
        return false;
    }

    @Override
    public boolean getBoolean() {
        throw new UnsupportedOperationException();
    }

    @Override
    public byte getByte() {
        throw new UnsupportedOperationException();
    }

    @Override
    public short getShort() {
        throw new UnsupportedOperationException();
    }

    @Override
    public int getInt() {
        return Integer.parseInt(partitionValue);
    }

    @Override
    public float getFloat() {
        throw new UnsupportedOperationException();
    }

    @Override
    public long getLong() {
        return Long.parseLong(partitionValue);
    }

    @Override
    public double getDouble() {
        throw new UnsupportedOperationException();
    }

    @Override
    public BigInteger getBigInteger() {
        // Parse the string directly instead of going through getLong() so
        // values outside the long range are still representable.
        return new BigInteger(partitionValue);
    }

    @Override
    public BigDecimal getDecimal() {
        // Bug fix: the previous BigDecimal.valueOf(getDouble()) could never
        // succeed because getDouble() always throws. Parse the raw string with
        // the exact String constructor (no double round-trip precision loss).
        return new BigDecimal(partitionValue);
    }

    @Override
    public String getString() {
        return partitionValue;
    }

    @Override
    public byte[] getStringAsBytes() {
        throw new UnsupportedOperationException();
    }

    @Override
    public LocalDate getDate() {
        throw new UnsupportedOperationException();
    }

    @Override
    public LocalDateTime getDateTime() {
        throw new UnsupportedOperationException();
    }

    @Override
    public byte[] getBytes() {
        // Explicit UTF-8 keeps the encoding platform-independent.
        return partitionValue.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void unpackArray(List<ColumnValue> values) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
        throw new UnsupportedOperationException();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.maxcompute;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
Expand All @@ -35,8 +36,8 @@ public class MaxComputeTableScan {
private final TableTunnel tunnel;
private final String project;
private final String table;
private volatile TableTunnel.DownloadSession tableSession;
private volatile long readRows = 0;
private long totalRows = 0;

public MaxComputeTableScan(String region, String project, String table,
String accessKey, String secretKey, boolean enablePublicAccess) {
Expand All @@ -59,13 +60,24 @@ public TableSchema getSchema() {
return odps.tables().get(table).getSchema();
}

public synchronized TableTunnel.DownloadSession getSession() throws IOException {
if (tableSession == null) {
try {
tableSession = tunnel.createDownloadSession(project, table);
} catch (TunnelException e) {
throw new IOException(e);
}
/**
 * Opens a tunnel download session covering the whole (non-partitioned) table.
 *
 * <p>Also records the session's total row count in {@code totalRows} so that
 * {@code endOfScan()} can detect completion without holding a session
 * reference. Tunnel-layer failures are surfaced as {@link IOException}.
 *
 * @return a fresh download session for this project/table
 * @throws IOException if the tunnel service rejects the request
 */
public TableTunnel.DownloadSession openDownLoadSession() throws IOException {
    try {
        TableTunnel.DownloadSession session = tunnel.getDownloadSession(project, table, null);
        totalRows = session.getRecordCount();
        return session;
    } catch (TunnelException e) {
        throw new IOException(e);
    }
}

/**
 * Opens a tunnel download session scoped to a single partition.
 *
 * <p>Presumably the returned session only reads rows inside
 * {@code partitionSpec} — confirm against the tunnel SDK docs. As with the
 * no-arg overload, the session's record count is cached in {@code totalRows}
 * for {@code endOfScan()} bookkeeping.
 *
 * @param partitionSpec the partition to download from
 * @return a fresh download session restricted to that partition
 * @throws IOException if the tunnel service rejects the request
 */
public TableTunnel.DownloadSession openDownLoadSession(PartitionSpec partitionSpec) throws IOException {
    try {
        TableTunnel.DownloadSession session =
                tunnel.getDownloadSession(project, table, partitionSpec, null);
        totalRows = session.getRecordCount();
        return session;
    } catch (TunnelException e) {
        throw new IOException(e);
    }
}
Expand All @@ -76,6 +88,6 @@ public synchronized void increaseReadRows(long rows) {
}

public boolean endOfScan() {
return readRows >= tableSession.getRecordCount();
return readRows >= totalRows;
}
}
Loading

0 comments on commit 18c2a13

Please sign in to comment.