Skip to content

Commit

Permalink
Merge branch 'branch-2.0' into change_SpinLock_to_mutex_2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Nov 12, 2024
2 parents f5790ae + 81f43a1 commit b2afe0f
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 40 deletions.
4 changes: 4 additions & 0 deletions fe/fe-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ under the License.
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
<build>
<finalName>doris-fe-common</finalName>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,15 @@ public class Config extends ConfigBase {
@ConfField(mutable = false)
public static boolean backup_job_compressed_serialization = false;

/**
* An internal config, to indicate whether to enable the restore snapshot rpc compression.
*
* The ccr syncer depends on this config to decide whether to compress the meta and job
* info of the restore snapshot request.
*/
@ConfField(mutable = false)
public static boolean enable_restore_snapshot_rpc_compression = true;

/**
* Control the max num of tablets per backup job involved.
*/
Expand Down
48 changes: 48 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/GZIPUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Utility methods for GZIP (RFC 1952) compression and decompression of byte arrays.
 *
 * <p>Used by the snapshot RPCs to optionally compress/decompress the backup meta
 * and job info payloads exchanged with the ccr syncer.
 */
public class GZIPUtils {
    /**
     * Returns true if {@code data} begins with the GZIP magic header.
     *
     * @param data bytes to probe; may be null or shorter than two bytes
     * @return true if the bytes look like the start of a GZIP member
     */
    public static boolean isGZIPCompressed(byte[] data) {
        // From RFC 1952, section 2.3.1: every GZIP member starts with the fixed
        // identification bytes ID1 = 0x1F (31) and ID2 = 0x8B (139).
        // (Note: 8 is CM, the "deflate" compression-method byte that FOLLOWS the
        // magic header — it is not part of the header itself.)
        // Null-safe: thrift-deserialized payloads may legitimately be null.
        return data != null && data.length >= 2 && data[0] == (byte) 0x1F && data[1] == (byte) 0x8B;
    }

    /**
     * GZIP-compresses the given bytes.
     *
     * @param data the raw bytes to compress; must not be null
     * @return a complete GZIP member containing the compressed data
     * @throws IOException if the underlying deflater stream fails
     */
    public static byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream bytesStream = new ByteArrayOutputStream();
        // Closing the GZIPOutputStream is required to flush the GZIP trailer.
        try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) {
            gzipStream.write(data);
        }
        return bytesStream.toByteArray();
    }

    /**
     * Decompresses a GZIP member back into the original bytes.
     *
     * @param data a complete GZIP member; must not be null
     * @return the decompressed bytes
     * @throws IOException if the data is not valid GZIP or is truncated
     */
    public static byte[] decompress(byte[] data) throws IOException {
        ByteArrayInputStream bytesStream = new ByteArrayInputStream(data);
        try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) {
            // Plain stdlib drain loop: JDK 8 compatible and avoids pulling the
            // commons-io IOUtils dependency into fe-common for a trivial copy.
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int read;
            while ((read = gzipStream.read(buffer)) != -1) {
                result.write(buffer, 0, read);
            }
            return result.toByteArray();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1575,8 +1575,7 @@ private void waitingAllSnapshotsFinished() {
return;
}

LOG.info("waiting {} replicas to make snapshot: [{}]. {}",
unfinishedSignatureToId.size(), unfinishedSignatureToId, this);
LOG.info("waiting {} replicas to make snapshot. {}", unfinishedSignatureToId.size(), this);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,11 @@ private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
* The top plan fragment will only summarize the status of the exported result set and return it to fe.
*/
private void pushDownResultFileSink(Analyzer analyzer) {
if (fragments.size() < 1) {
return;
}
if (!(fragments.get(0).getSink() instanceof ResultFileSink)) {
return;
}
if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) {
return;
}
if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) {
if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()
|| ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
|| fragments.size() < 1
|| !(fragments.get(0).getPlanRoot() instanceof ExchangeNode)
|| !(fragments.get(0).getSink() instanceof ResultFileSink)) {
return;
}
PlanFragment topPlanFragment = fragments.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
Expand Down Expand Up @@ -208,6 +209,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -2765,8 +2767,9 @@ public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TExcep

// getSnapshotImpl
private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
throws UserException {
// Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type
throws UserException, IOException {
// Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name,
// snapshot_type
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
Expand Down Expand Up @@ -2811,10 +2814,22 @@ private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String c
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else {
result.setMeta(snapshot.getMeta());
result.setJobInfo(snapshot.getJobInfo());
byte[] meta = snapshot.getMeta();
byte[] jobInfo = snapshot.getJobInfo();

LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}",
label, snapshot.getMeta().length, snapshot.getJobInfo().length);
label, meta.length, jobInfo.length);
if (request.isEnableCompress()) {
meta = GZIPUtils.compress(meta);
jobInfo = GZIPUtils.compress(jobInfo);
result.setCompressed(true);
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta "
+ "size {}, compressed job info size {}", label, meta.length, jobInfo.length);
}
}
result.setMeta(meta);
result.setJobInfo(jobInfo);
}

return result;
Expand Down Expand Up @@ -2928,8 +2943,27 @@ private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest reque
restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs);
}
}
RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, request.getMeta(),
request.getJobInfo());

byte[] meta = request.getMeta();
byte[] jobInfo = request.getJobInfo();
if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) {
if (LOG.isDebugEnabled()) {
LOG.debug("decompress meta and job info, compressed meta size {}, compressed job info size {}",
meta.length, jobInfo.length);
}
try {
meta = GZIPUtils.decompress(meta);
jobInfo = GZIPUtils.decompress(jobInfo);
} catch (Exception e) {
LOG.warn("decompress meta and job info failed", e);
throw new UserException("decompress meta and job info failed", e);
}
} else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) {
throw new UserException("The request is compressed, but the config "
+ "`enable_restore_snapshot_rpc_compression` is not enabled.");
}

RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo);
restoreStmt.setIsBeingSynced();
LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ public String toSQL(boolean roundByParentheses) {
}

public ColumnStatistic toColumnStatistic() {
// For non-empty table, return UNKNOWN if we can't collect ndv value.
// Because inaccurate ndv is very misleading.
if (count > 0 && ndv == 0 && count != nullCount) {
return ColumnStatistic.UNKNOWN;
}
try {
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder();
columnStatisticBuilder.setCount(count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
// it will trigger load function again without cache an empty value.
return null;
}
if (columnStatistic.isPresent()) {
// For non-empty table, return UNKNOWN if we can't collect ndv value.
// Because inaccurate ndv is very misleading.
ColumnStatistic stats = columnStatistic.get();
if (stats.count > 0 && stats.ndv == 0 && stats.count != stats.numNulls) {
columnStatistic = Optional.of(ColumnStatistic.UNKNOWN);
}
}
return columnStatistic;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,8 @@ private void doPreHeat() {
long tblId = statsId.tblId;
long idxId = statsId.idxId;
String colId = statsId.colId;
final StatisticsCacheKey k =
new StatisticsCacheKey(tblId, idxId, colId);
final StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colId);
ColumnStatistic c = ColumnStatistic.fromResultRow(r);
if (c.count > 0 && c.ndv == 0 && c.count != c.numNulls) {
c = ColumnStatistic.UNKNOWN;
}
putCache(k, c);
} catch (Throwable t) {
LOG.warn("Error when preheating stats cache. reason: [{}]. Row:[{}]", t.getMessage(), r);
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1077,13 +1077,15 @@ struct TGetSnapshotRequest {
7: optional string label_name
8: optional string snapshot_name
9: optional TSnapshotType snapshot_type
10: optional bool enable_compress;
}

struct TGetSnapshotResult {
1: optional Status.TStatus status
2: optional binary meta
3: optional binary job_info
4: optional Types.TNetworkAddress master_address
5: optional bool compressed;
}

struct TTableRef {
Expand All @@ -1107,6 +1109,7 @@ struct TRestoreSnapshotRequest {
13: optional bool clean_tables
14: optional bool clean_partitions
15: optional bool atomic_restore
16: optional bool compressed;
}

struct TRestoreSnapshotResult {
Expand Down
4 changes: 1 addition & 3 deletions regression-test/suites/statistics/analyze_stats.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2706,9 +2706,7 @@ PARTITION `p599` VALUES IN (599)
alter_result = sql """show column stats alter_test(id)"""
assertEquals(1, alter_result.size())
alter_result = sql """show column cached stats alter_test(id)"""
assertEquals(0, alter_result.size())
alter_result = sql """show column cached stats alter_test(id)"""
assertEquals(0, alter_result.size())
assertEquals(1, alter_result.size())
sql """alter table alter_test modify column id set stats ('row_count'='100', 'ndv'='0', 'num_nulls'='100', 'data_size'='2.69975443E8', 'min_value'='1', 'max_value'='2');"""
alter_result = sql """show column stats alter_test(id)"""
assertEquals(1, alter_result.size())
Expand Down

0 comments on commit b2afe0f

Please sign in to comment.