Merge branch 'master' into dev.parquet_vec
yinzhijian authored May 12, 2022
2 parents 80f1e0d + d7705ac commit 8712bc9
Showing 8 changed files with 108 additions and 46 deletions.
7 changes: 7 additions & 0 deletions fe/check/checkstyle/checkstyle.xml
@@ -32,6 +32,11 @@ under the License.
<property name="fileNamePattern" value="module\-info\.java$"/>
</module>

<module name="SuppressionFilter">
<property name="file" value="fe/check/checkstyle/suppressions.xml"/>
<property name="optional" value="true"/>
</module>

<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
@@ -171,6 +176,7 @@ under the License.
</module>
<module name="InvalidJavadocPosition"/>
<module name="JavadocMethod">
<property name="id" value="ProductionScope"/>
<property name="accessModifiers" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
@@ -179,6 +185,7 @@
</module>

<module name="JavadocParagraph"/>
<module name="JavadocStyle"/>
<module name="JavadocTagContinuationIndentation"/>
<module name="MissingJavadocMethod">
<property name="scope" value="public"/>
30 changes: 30 additions & 0 deletions fe/check/checkstyle/suppressions.xml
@@ -0,0 +1,30 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<!DOCTYPE suppressions PUBLIC
"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
"https://checkstyle.org/dtds/suppressions_1_2.dtd">

<suppressions>
<!-- Excludes test files from having Javadocs for classes and methods -->
<suppress files="[\\/]jmockit[\\/]" checks=".*" />
<suppress files="[\\/]test[\\/]" checks="MissingJavadocMethod" />
<suppress files="[\\/]test[\\/]" checks="MissingJavadocType" />
</suppressions>
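As a side note (not part of the commit), the files patterns above are regular expressions searched anywhere within a file's path, so [\\/]test[\\/] matches both Unix- and Windows-style separators. A minimal sketch with hypothetical paths:

    import java.util.regex.Pattern;

    public class SuppressionPatternDemo {
        public static void main(String[] args) {
            // Same regex as the <suppress files="..."> attributes above.
            Pattern testDirs = Pattern.compile("[\\\\/]test[\\\\/]");
            // find() searches anywhere in the path, which is roughly how the
            // SuppressionFilter decides whether a file is covered by a suppression.
            System.out.println(testDirs.matcher("fe/fe-core/src/test/java/Foo.java").find());      // true
            System.out.println(testDirs.matcher("fe\\fe-core\\src\\test\\java\\Foo.java").find()); // true
            System.out.println(testDirs.matcher("fe/fe-core/src/main/java/Foo.java").find());      // false
        }
    }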
8 changes: 6 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -1064,8 +1064,12 @@ private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
// make column name case match with real column name
String columnName = importColumnDesc.getColumnName();
Column tblColumn = tbl.getColumn(columnName);
String realColName = tblColumn == null ? columnName : tblColumn.getName();
String realColName;
if (tblColumn == null || importColumnDesc.getExpr() == null) {
realColName = columnName;
} else {
realColName = tblColumn.getName();
}
if (importColumnDesc.getExpr() != null) {
Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
exprsByName.put(realColName, expr);
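A small illustration (not part of the commit; the values are hypothetical): with a table column spelled "A" and a load statement that writes it as "a" without a mapping expression, the old code normalized the name to the table's spelling, while the new code keeps the name as written; the case-insensitive maps introduced further down make either spelling resolvable.

    public class RealColNameDemo {
        public static void main(String[] args) {
            String columnName = "a";       // name as written in the load statement
            String tableColumnName = "A";  // spelling returned by Column.getName()
            boolean hasExpr = false;       // importColumnDesc.getExpr() == null

            // Old behavior: prefer the table's spelling whenever the column exists.
            String oldRealColName = tableColumnName;
            // New behavior: only switch to the table's spelling when a mapping expression exists.
            String newRealColName = hasExpr ? tableColumnName : columnName;

            System.out.println(oldRealColName + " vs " + newRealColName); // A vs a
        }
    }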
@@ -22,6 +22,7 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
@@ -41,6 +42,7 @@
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
@@ -121,53 +123,69 @@ public void beginTxn(long batchId) throws UserException, TException, TimeoutExce
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr();
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
TStreamLoadPutRequest request = null;
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) {
TransactionEntry txnEntry = txnExecutor.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
TStreamLoadPutRequest request = null;
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
String authCodeUuid = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
db.getId(), txnId).getAuthCode();
request = new TStreamLoadPutRequest()
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
.setColumns(targetColumn);
txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
txnEntry.setLabel(label);
txnExecutor.setTxnId(txnId);
} catch (DuplicatedRequestException e) {
LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
txnEntry.setLabel(label);
txnExecutor.setTxnId(txnId);
} catch (DuplicatedRequestException e) {
LOG.warn("duplicate request for sync channel. channel: {}, request id: {}, txn: {}, table: {}",
id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
txnExecutor.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
// this happens when channel re-consume same batch, we should just pass through it without begin a new txn
LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}", id, label, targetTable, batchId);
return;
} catch (AnalysisException | BeginTransactionException e) {
LOG.warn("encounter an error when beginning txn in channel {}, table: {}", id, targetTable);
throw e;
} catch (UserException e) {
LOG.warn("encounter an error when creating plan in channel {}, table: {}", id, targetTable);
throw e;
}
try {
// async exec begin transaction
long txnId = txnExecutor.getTxnId();
if (txnId != -1L) {
this.txnExecutor.beginTransaction(request);
LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
txnExecutor.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
// this happens when channel re-consume same batch,
// we should just pass through it without begin a new txn
LOG.warn ("Label already used in channel {}, label: {}, table: {}, batch: {}",
id, label, targetTable, batchId);
return;
} catch (AnalysisException | BeginTransactionException e) {
LOG.warn ("encounter an error when beginning txn in channel {}, table: {}",
id, targetTable);
throw e;
} catch (UserException e) {
LOG.warn ("encounter an error when creating plan in channel {}, table: {}",
id, targetTable);
throw e;
}
} catch (TException e) {
LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
throw e;
} catch (TimeoutException | InterruptedException | ExecutionException e) {
LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
try {
// async exec begin transaction
long txnId = txnExecutor.getTxnId();
if (txnId != -1L) {
this.txnExecutor.beginTransaction(request);
LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}",
id, targetTable, label, txnExecutor.getTxnId());
}
} catch (TException e) {
LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}",
id, targetTable, txnExecutor.getTxnId(), e.getMessage());
throw e;
} catch (TimeoutException | InterruptedException | ExecutionException e) {
LOG.warn("Error occur while waiting begin txn response in channel {}, table: {}, txn: {}, msg:{}",
id, targetTable, txnExecutor.getTxnId(), e.getMessage());
throw e;
}
} else {
String failMsg = "current running txns on db " + db.getId() + " is "
+ databaseTransactionMgr.getRunningTxnNums() + ", larger than limit " + Config.max_running_txn_num_per_db;
LOG.warn(failMsg);
throw new BeginTransactionException(failMsg);
}
}
}
@@ -248,8 +248,8 @@ private void initParams(ParamCreateContext context)
*/
private void initColumns(ParamCreateContext context) throws UserException {
context.srcTupleDescriptor = analyzer.getDescTbl().createTupleDescriptor();
context.slotDescByName = Maps.newHashMap();
context.exprMap = Maps.newHashMap();
context.slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

// for load job, column exprs is got from file group
// for query, there is no column exprs, they will be got from table's schema in "Load.initColumns"
@@ -67,8 +67,11 @@ public class StreamLoadScanNode extends LoadScanNode {
private TupleDescriptor srcTupleDesc;
private TBrokerScanRange brokerScanRange;

private Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
private Map<String, Expr> exprsByName = Maps.newHashMap();
// If use case sensitive map, for example,
// the column name 「A」 in the table and the mapping '(a) set (A = a)' in load sql,
// Slotdescbyname stores「a」, later will use 「a」to get table's 「A」 column info, will throw exception.
private final Map<String, SlotDescriptor> slotDescByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
private final Map<String, Expr> exprsByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

// used to construct for streaming loading
public StreamLoadScanNode(
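For context (not part of the commit), the comment above describes why a case-sensitive map breaks the lookup. A minimal sketch of the difference between the previous HashMap and the new TreeMap with String.CASE_INSENSITIVE_ORDER:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class CaseInsensitiveMapDemo {
        public static void main(String[] args) {
            Map<String, String> hashMap = new HashMap<>();
            Map<String, String> treeMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

            // The load statement registers the slot under "a" ...
            hashMap.put("a", "slot for column a");
            treeMap.put("a", "slot for column a");

            // ... but the table schema is later consulted with "A".
            System.out.println(hashMap.get("A")); // null, which later surfaces as an exception
            System.out.println(treeMap.get("A")); // found, the lookup succeeds
        }
    }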
@@ -195,7 +195,7 @@ protected Set<Long> unprotectedGetTxnIdsByLabel(String label) {
return labelToTxnIds.get(label);
}

protected int getRunningTxnNums() {
public int getRunningTxnNums() {
return runningTxnNums;
}

2 changes: 1 addition & 1 deletion fe/pom.xml
@@ -151,11 +151,11 @@ under the License.
</dependencies>
<configuration>
<configLocation>check/checkstyle/checkstyle.xml</configLocation>
<suppressionsLocation>check/checkstyle/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
<excludes>**/jmockit/**/*</excludes>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>