Skip to content

Commit

Permalink
[fix](Nereids) fix new coordinator compute a wrong scanRangeNum (#43850)
Browse files Browse the repository at this point in the history
fix new coordinator compute a wrong scanRangeNum, introduced by #41730

This bug will show a wrong progress in s3 load:
```
Progress: 0.00%(73/2147483647)
```
  • Loading branch information
924060929 authored Nov 13, 2024
1 parent 6c2c36d commit 5a4f7c6
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
Expand All @@ -40,12 +44,18 @@
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryGlobals;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Suppliers;
Expand Down Expand Up @@ -418,9 +428,57 @@ private TNetworkAddress computeDirectConnectCoordinator() {

private int getScanRangeNum() {
int scanRangeNum = 0;
for (ScanNode scanNode : scanNodes) {
scanRangeNum += scanNode.getScanRangeNum();
for (PipelineDistributedPlan distributedPlan : distributedPlans) {
for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) {
ScanSource scanSource = instanceJob.getScanSource();
if (scanSource instanceof BucketScanSource) {
BucketScanSource bucketScanSource = (BucketScanSource) scanSource;
for (Map<ScanNode, ScanRanges> kv : bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
for (ScanRanges scanRanges : kv.values()) {
for (TScanRangeParams param : scanRanges.params) {
scanRangeNum += computeScanRangeNumByScanRange(param);
}
}
}
} else {
DefaultScanSource defaultScanSource = (DefaultScanSource) scanSource;
for (ScanRanges scanRanges : defaultScanSource.scanNodeToScanRanges.values()) {
for (TScanRangeParams param : scanRanges.params) {
scanRangeNum += computeScanRangeNumByScanRange(param);
}
}
}
}
}
return scanRangeNum;
}

private int computeScanRangeNumByScanRange(TScanRangeParams param) {
int scanRangeNum = 0;
TScanRange scanRange = param.getScanRange();
if (scanRange == null) {
return scanRangeNum;
}
TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
if (brokerScanRange != null) {
scanRangeNum += brokerScanRange.getRanges().size();
}
TExternalScanRange externalScanRange = scanRange.getExtScanRange();
if (externalScanRange != null) {
TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
if (fileScanRange != null) {
if (fileScanRange.isSetRanges()) {
scanRangeNum += fileScanRange.getRanges().size();
} else if (fileScanRange.isSetSplitSource()) {
scanRangeNum += fileScanRange.getSplitSource().getNumSplits();
}
}
}
TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
if (paloScanRange != null) {
scanRangeNum += 1;
}
// TODO: more ranges?
return scanRangeNum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ public PlanChecker optimize() {
return this;
}

public NereidsPlanner plan(String sql) {
StatementContext statementContext = new StatementContext(connectContext, new OriginStatement(sql, 0));
connectContext.setStatementContext(statementContext);
NereidsPlanner planner = new NereidsPlanner(statementContext);
LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
LogicalPlanAdapter parsedPlanAdaptor = new LogicalPlanAdapter(parsedPlan, statementContext);
statementContext.setParsedStatement(parsedPlanAdaptor);
planner.planWithLock(parsedPlanAdaptor);
return planner;
}

public PlanChecker dpHypOptimize() {
double now = System.currentTimeMillis();
cascadesContext.getStatementContext().setDpHyp(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.utframe.TestWithFeService;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.UUID;

public class NereidsCoordinatorTest extends TestWithFeService {
@BeforeAll
public void init() throws Exception {
FeConstants.runningUnitTest = true;

createDatabase("test");
useDatabase("test");

createTable("create table tbl(id int) distributed by hash(id) buckets 10 properties('replication_num' = '1');");
}

@Test
public void testNereidsCoordinatorScanRangeNum() throws IOException {
NereidsPlanner planner = plan("select * from test.tbl");
NereidsCoordinator coordinator = (NereidsCoordinator) EnvFactory.getInstance()
.createCoordinator(connectContext, null, planner, null);
int scanRangeNum = coordinator.getScanRangeNum();
Assertions.assertEquals(10, scanRangeNum);
}

@Test
public void testNereidsCoordinatorScanRangeNum2() throws IOException {
NereidsPlanner planner = plan("select * from information_schema.columns");
NereidsCoordinator coordinator = (NereidsCoordinator) EnvFactory.getInstance()
.createCoordinator(connectContext, null, planner, null);
int scanRangeNum = coordinator.getScanRangeNum();
Assertions.assertEquals(0, scanRangeNum);
}

private NereidsPlanner plan(String sql) throws IOException {
ConnectContext connectContext = createDefaultCtx();
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
connectContext.setThreadLocalInfo();

UUID uuid = UUID.randomUUID();
connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
NereidsPlanner planner = PlanChecker.from(connectContext).plan(sql);
return planner;
}
}

0 comments on commit 5a4f7c6

Please sign in to comment.