From 2ede4d7de19235a727a73d8ade7b8478e38264bc Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 19 Aug 2024 11:15:01 +0800 Subject: [PATCH] [fix](join) Disable scan sharing for NAAJ (#39480) --- .../java/org/apache/doris/qe/Coordinator.java | 20 ++++++++++--------- .../org/apache/doris/qe/CoordinatorTest.java | 20 +++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 4ee46d3bec3c38..881cb7d096a042 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1876,10 +1876,11 @@ protected void computeFragmentHosts() throws Exception { if ((isColocateFragment(fragment, fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) { - computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params); + computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params, + fragment.hasNullAwareLeftAntiJoin()); } else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) { bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), - parallelExecInstanceNum, params); + parallelExecInstanceNum, params, fragment.hasNullAwareLeftAntiJoin()); } else { // case A for (Entry>> entry : fragmentExecParamsMap.get( @@ -1904,7 +1905,8 @@ protected void computeFragmentHosts() throws Exception { int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle(); + && context.getSessionVariable().isForceToLocalShuffle() + && !fragment.hasNullAwareLeftAntiJoin(); boolean ignoreStorageDataDistribution = forceToLocalShuffle || (node.isPresent() && node.get().ignoreStorageDataDistribution(context, addressToBackendID.size()) && useNereids); @@ -2072,9 +2074,9 @@ private List findOrInsert(Map> } private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, - int parallelExecInstanceNum, FragmentExecParams params) { + int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap, - fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin); } private Map getReplicaNumPerHostForOlapTable() { @@ -2689,16 +2691,16 @@ private void computeScanRangeAssignmentByBucket( } private void computeInstanceParam(PlanFragmentId fragmentId, - int parallelExecInstanceNum, FragmentExecParams params) { + int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap, - fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin); } } private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params, Map fragmentIdBucketSeqToScanRangeMap, Map> curFragmentIdToSeqToAddressMap, - Map> fragmentIdToScanNodeIds) { + Map> fragmentIdToScanNodeIds, boolean hasNullAwareLeftAntiJoin) { Map bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId); BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); Set scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); @@ -2732,7 +2734,7 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc * 2. Use Nereids planner. */ boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle(); + && context.getSessionVariable().isForceToLocalShuffle() && !hasNullAwareLeftAntiJoin; boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, addressToBackendID.size())) diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index e249b3d87d21d2..b6632a39db71d6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -123,7 +123,7 @@ public void testComputeColocateJoinInstanceParam() { Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); // check whether one instance have 3 tablet to scan @@ -134,15 +134,15 @@ public void testComputeColocateJoinInstanceParam() { } params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params, false); Assert.assertEquals(2, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); } @@ -324,7 +324,7 @@ public void testColocateJoinAssignment() { PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, new DataPartition(TPartitionType.UNPARTITIONED)); FragmentExecParams params = new FragmentExecParams(fragment); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false); StringBuilder sb = new StringBuilder(); params.appendTo(sb); Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); @@ -452,19 +452,19 @@ public void testComputeBucketShuffleJoinInstanceParam() { Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params, false); Assert.assertEquals(2, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); } @@ -506,7 +506,7 @@ public void testBucketShuffleAssignment() { new DataPartition(TPartitionType.UNPARTITIONED)); FragmentExecParams params = new FragmentExecParams(fragment); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); StringBuilder sb = new StringBuilder(); params.appendTo(sb);