Commit 407e224

fix bug

morningman committed May 27, 2022
1 parent cfe640c commit 407e224
Showing 9 changed files with 132 additions and 79 deletions.
36 changes: 28 additions & 8 deletions be/src/service/internal_service.cpp
@@ -111,7 +111,8 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
-    st = _exec_plan_fragment(request->request(), compact);
+    PFragmentRequestVersion version = request->has_version() ? request->version() : PFragmentRequestVersion::VERSION_1;
+    st = _exec_plan_fragment(request->request(), version, compact);
     if (!st.ok()) {
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
     }
@@ -201,14 +202,33 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
     }
 }
 
-Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, bool compact) {
-    TExecPlanFragmentParams t_request;
-    {
-        const uint8_t* buf = (const uint8_t*)ser_request.data();
-        uint32_t len = ser_request.size();
-        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
+Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, PFragmentRequestVersion version, bool compact) {
+    if (version == PFragmentRequestVersion::VERSION_1) {
+        // VERSION_1 should be removed in v1.2
+        TExecPlanFragmentParams t_request;
+        {
+            const uint8_t* buf = (const uint8_t*)ser_request.data();
+            uint32_t len = ser_request.size();
+            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
+        }
+        return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+    } else if (version == PFragmentRequestVersion::VERSION_2) {
+        TExecPlanFragmentParamsList t_request;
+        {
+            const uint8_t* buf = (const uint8_t*)ser_request.data();
+            uint32_t len = ser_request.size();
+            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request));
+        }
+
+        LOG(INFO) << "cmy version 2 t_request.paramsList size: " << t_request.paramsList.size();
+        for (const TExecPlanFragmentParams& params : t_request.paramsList) {
+            RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+        }
+        sleep(1);
+        return Status::OK();
+    } else {
+        return Status::InternalError("invalid version");
     }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
 }
 
 void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
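With this change the BE dispatches on the request's PFragmentRequestVersion: a VERSION_1 payload is a single Thrift-serialized TExecPlanFragmentParams, while a VERSION_2 payload is a TExecPlanFragmentParamsList whose entries are executed one by one. The minimal Java round trip below illustrates that wire contract; the class and its names are illustrative only and not part of this commit, but the Thrift calls match what the FE uses and what deserialize_thrift_msg() expects on the BE.

// Hypothetical round trip (not part of this commit) showing the VERSION_2
// payload format: a compact-protocol Thrift blob of TExecPlanFragmentParamsList.
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

public class FragmentParamsRoundTrip {
    public static void main(String[] args) throws TException {
        TExecPlanFragmentParamsList batch = new TExecPlanFragmentParamsList();
        batch.addToParamsList(new TExecPlanFragmentParams());
        batch.addToParamsList(new TExecPlanFragmentParams());

        // FE side: serialize with the compact protocol (compact=true in the rpc).
        byte[] bytes = new TSerializer(new TCompactProtocol.Factory()).serialize(batch);

        // The BE does the inverse in C++ via deserialize_thrift_msg().
        TExecPlanFragmentParamsList decoded = new TExecPlanFragmentParamsList();
        new TDeserializer(new TCompactProtocol.Factory()).deserialize(decoded, bytes);
        System.out.println("fragments in batch: " + decoded.getParamsList().size());
    }
}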
2 changes: 1 addition & 1 deletion be/src/service/internal_service.h
@@ -121,7 +121,7 @@ class PInternalServiceImpl : public PBackendService {
                     PHandShakeResponse* response, google::protobuf::Closure* done) override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request, bool compact);
+    Status _exec_plan_fragment(const std::string& s_request, PFragmentRequestVersion version, bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
 
9 changes: 0 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1046,15 +1046,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean enable_local_replica_selection_fallback = false;
 
-
-    /**
-     * The timeout of executing async remote fragment.
-     * In normal case, the async remote fragment will be executed in a short time. If system are under high load
-     * condition,try to set this timeout longer.
-     */
-    @ConfField(mutable = true)
-    public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec
-
     /**
      * The number of query retries.
      * A query may retry if we encounter RPC exception and no result has been sent to user.
115 changes: 69 additions & 46 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -70,6 +70,7 @@
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
@@ -548,14 +549,12 @@ private void sendFragment() throws TException, RpcException, UserException {
         for (PlanFragment fragment : fragments) {
             FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
 
-            // set up exec states
+            // 1. set up exec states
             int instanceNum = params.instanceExecParams.size();
             Preconditions.checkState(instanceNum > 0);
             List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
-            List<Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>>> futures =
-                    Lists.newArrayList();
 
-            // update memory limit for colocate join
+            // 2. update memory limit for colocate join
             if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
                 int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
                 long newMemory = memoryLimit / rate;
@@ -574,7 +573,9 @@ private void sendFragment() throws TException, RpcException, UserException {
                 needCheckBackendState = true;
             }
 
+            // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
             int instanceId = 0;
+            Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
             for (TExecPlanFragmentParams tParam : tParams) {
                 BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++,
                         profileFragmentId, tParam, this.addressToBackendID);
@@ -592,18 +593,30 @@ private void sendFragment() throws TException, RpcException, UserException {
                                 fragment.getFragmentId().asInt(), jobId);
                     }
                 }
-                futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));
-
                 backendIdx++;
+                BackendExecStates states = beToExecStates.get(execState.backend.getId());
+                if (states == null) {
+                    states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress);
+                    beToExecStates.putIfAbsent(execState.backend.getId(), states);
+                }
+                states.addState(execState);
             }
+
+            // 4. send fragments
+            List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures =
+                    Lists.newArrayList();
+            for (BackendExecStates states : beToExecStates.values()) {
+                futures.add(Pair.create(states, states.execRemoteFragmentsAsync()));
+            }
 
-            for (Pair<BackendExecState, Future<InternalService.PExecPlanFragmentResult>> pair : futures) {
+            // 5. wait send fragment rpc finish
+            for (Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>> pair : futures) {
                 TStatusCode code;
                 String errMsg = null;
                 Exception exception = null;
                 try {
-                    InternalService.PExecPlanFragmentResult result = pair.second.get(Config.remote_fragment_exec_timeout_ms,
-                            TimeUnit.MILLISECONDS);
+                    InternalService.PExecPlanFragmentResult result =
+                            pair.second.get(queryOptions.query_timeout, TimeUnit.SECONDS);
                     code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                     if (!result.getStatus().getErrorMsgsList().isEmpty()) {
                         errMsg = result.getStatus().getErrorMsgsList().get(0);
@@ -612,7 +625,7 @@ private void sendFragment() throws TException, RpcException, UserException {
                     LOG.warn("catch a execute exception", e);
                     exception = e;
                     code = TStatusCode.THRIFT_RPC_ERROR;
-                    BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress);
+                    BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr);
                 } catch (InterruptedException e) {
                     LOG.warn("catch a interrupt exception", e);
                     exception = e;
@@ -621,7 +634,6 @@ private void sendFragment() throws TException, RpcException, UserException {
                     LOG.warn("catch a timeout exception", e);
                     exception = e;
                     code = TStatusCode.TIMEOUT;
-                    BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress);
                 }
 
                 if (code != TStatusCode.OK) {
@@ -630,28 +642,26 @@ private void sendFragment() throws TException, RpcException, UserException {
                     }
 
                     if (errMsg == null) {
-                        errMsg = "exec rpc error. backend id: " + pair.first.backend.getId();
+                        errMsg = "exec rpc error. backend id: " + pair.first.beId;
                     }
                     queryStatus.setStatus(errMsg);
-                    LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}",
-                            errMsg, code, fragment.getFragmentId(),
-                            pair.first.address.hostname, pair.first.address.port);
+                    LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}", errMsg,
+                            code, fragment.getFragmentId(), pair.first.brpcAddr);
                     cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
                     switch (code) {
                         case TIMEOUT:
-                            throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: "
-                                    + pair.first.backend.getId() + " fragment: "
-                                    + DebugUtil.printId(pair.first.rpcParams.params.fragment_instance_id));
+                            throw new RpcException(pair.first.brpcAddr.hostname,
+                                    "send fragment timeout. backend id: " + pair.first.beId);
                         case THRIFT_RPC_ERROR:
-                            SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
-                            throw new RpcException(pair.first.backend.getHost(), "rpc failed");
+                            SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
+                            throw new RpcException(pair.first.brpcAddr.hostname, "rpc failed");
                         default:
                             throw new UserException(errMsg);
                     }
                 }
 
                 // succeed to send the plan fragment, update the "alreadySentBackendIds"
-                alreadySentBackendIds.add(pair.first.backend.getId());
+                alreadySentBackendIds.add(pair.first.beId);
             }
 
             profileFragmentId += 1;
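Steps 3 to 5 above replace the old one-RPC-per-fragment-instance fan-out: exec states are first grouped by backend id, each group is shipped in a single batched RPC, and the wait loop then blocks for up to queryOptions.query_timeout seconds instead of the fixed remote_fragment_exec_timeout_ms config removed from Config.java. A stripped-down sketch of the grouping step, with a stand-in type rather than the real BackendExecState:

// Illustrative only: stand-in types, not the real Coordinator classes.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByBackendSketch {
    static class ExecState {
        final long backendId;
        ExecState(long backendId) { this.backendId = backendId; }
    }

    // One map entry per backend; each entry later becomes exactly one RPC.
    static Map<Long, List<ExecState>> groupByBackend(List<ExecState> states) {
        Map<Long, List<ExecState>> grouped = new HashMap<>();
        for (ExecState s : states) {
            // Same effect as the get()/putIfAbsent() pair in sendFragment().
            grouped.computeIfAbsent(s.backendId, id -> new ArrayList<>()).add(s);
        }
        return grouped;
    }
}

For a plan with many instances per backend this cuts the RPC count from one per instance to one per BE, which is the stated point of the comment in step 3.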
Expand Down Expand Up @@ -1918,6 +1928,7 @@ public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFr
FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
this.address = fi.host;
this.backend = idToBackend.get(addressToBackendID.get(address));
this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());

String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")";
this.profile = new RuntimeProfile(name);
Expand Down Expand Up @@ -2016,15 +2027,40 @@ public boolean isBackendStateHealthy() {
return true;
}

public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
try {
brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
} catch (Exception e) {
throw new TException(e.getMessage());
}
this.initiated = true;
public FragmentInstanceInfo buildFragmentInstanceInfo() {
return new QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId())
.fragmentId(String.valueOf(fragmentId)).address(this.address).build();
}

private TUniqueId fragmentInstanceId() {
return this.rpcParams.params.getFragmentInstanceId();
}
}

/**
* A set of BackendExecState for same Backend
*/
public class BackendExecStates {
long beId;
TNetworkAddress brpcAddr;
List<BackendExecState> states = Lists.newArrayList();

public BackendExecStates(long beId, TNetworkAddress brpcAddr) {
this.beId = beId;
this.brpcAddr = brpcAddr;
}

public void addState(BackendExecState state) {
this.states.add(state);
}

public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync() throws TException {
try {
return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
for (BackendExecState state : states) {
paramsList.addToParamsList(state.rpcParams);
}
return BackendServiceProxy.getInstance().execPlanFragmentsAsync(brpcAddr, paramsList);
} catch (RpcException e) {
// DO NOT throw exception here, return a complete future with error code,
// so that the following logic will cancel the fragment.
@@ -2046,13 +2082,10 @@ public boolean isDone() {
 
                     @Override
                     public InternalService.PExecPlanFragmentResult get() {
-                        InternalService.PExecPlanFragmentResult result = InternalService.PExecPlanFragmentResult
-                                .newBuilder()
-                                .setStatus(Types.PStatus.newBuilder()
-                                        .addErrorMsgs(e.getMessage())
-                                        .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue())
-                                        .build())
-                                .build();
+                        InternalService.PExecPlanFragmentResult result =
+                                InternalService.PExecPlanFragmentResult.newBuilder().setStatus(
+                                        Types.PStatus.newBuilder().addErrorMsgs(e.getMessage()).setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build())
+                                .build();
                         return result;
                     }
 
@@ -2063,16 +2096,6 @@ public InternalService.PExecPlanFragmentResult get(long timeout, TimeUnit unit)
                 };
             }
         }
-
-        public FragmentInstanceInfo buildFragmentInstanceInfo() {
-            return new QueryStatisticsItem.FragmentInstanceInfo.Builder()
-                    .instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address)
-                    .build();
-        }
-
-        private TUniqueId fragmentInstanceId() {
-            return this.rpcParams.params.getFragmentInstanceId();
-        }
     }
 
     // execution parameters for a single fragment,
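Note the error-handling pattern in execRemoteFragmentsAsync() above: when the RPC cannot even be issued, the method does not throw; it returns an already-completed Future whose result carries THRIFT_RPC_ERROR, so the step-5 wait loop handles local and remote failures through the same cancel/blacklist path. A stripped-down sketch of such a pre-completed future (illustrative type; the commit's code wraps an InternalService.PExecPlanFragmentResult rather than a String):

// Sketch of a Future that is born complete and never blocks.
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class CompletedErrorFuture implements Future<String> {
    private final String errorResult;

    CompletedErrorFuture(String errorResult) { this.errorResult = errorResult; }

    @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
    @Override public boolean isCancelled() { return false; }
    @Override public boolean isDone() { return true; }           // already complete
    @Override public String get() { return errorResult; }        // never blocks
    @Override public String get(long timeout, TimeUnit unit) { return errorResult; }
}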
@@ -30,6 +30,7 @@
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -85,8 +86,10 @@ public void beginTransaction(TStreamLoadPutRequest request) throws UserException
         txnEntry.setBackend(backend);
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
         try {
-            Future<InternalService.PExecPlanFragmentResult> future = BackendServiceProxy.getInstance().execPlanFragmentAsync(
-                    address, tRequest);
+            TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+            paramsList.addToParamsList(tRequest);
+            Future<InternalService.PExecPlanFragmentResult> future =
+                    BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList);
             InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -20,13 +20,14 @@
 import org.apache.doris.common.Config;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFoldConstantParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -92,17 +93,20 @@ private BackendServiceClient getProxy(TNetworkAddress address) {
         }
     }
 
-    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
-            TNetworkAddress address, TExecPlanFragmentParams tRequest)
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(
+            TNetworkAddress address, TExecPlanFragmentParamsList paramsList)
             throws TException, RpcException {
-        InternalService.PExecPlanFragmentRequest.Builder builder = InternalService.PExecPlanFragmentRequest.newBuilder();
+        InternalService.PExecPlanFragmentRequest.Builder builder =
+                InternalService.PExecPlanFragmentRequest.newBuilder();
         if (Config.use_compact_thrift_rpc) {
-            builder.setRequest(ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(tRequest)));
+            builder.setRequest(
+                    ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList)));
             builder.setCompact(true);
         } else {
-            builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build();
+            builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(paramsList))).build();
             builder.setCompact(false);
         }
+        builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2);
 
         final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
         try {
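The proxy now tags every outgoing request with VERSION_2 and serializes the whole TExecPlanFragmentParamsList with either the compact or the default binary Thrift protocol, depending on Config.use_compact_thrift_rpc; the compact flag travels with the request so the BE can pick the matching protocol when deserializing. A small hypothetical helper showing the same protocol switch with real Thrift library calls:

// Hypothetical helper mirroring the protocol switch above; TSerializer's
// no-arg constructor defaults to the binary protocol.
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

final class ThriftCodecSketch {
    static byte[] serialize(TBase<?, ?> msg, boolean compact) throws TException {
        TSerializer serializer = compact
                ? new TSerializer(new TCompactProtocol.Factory())
                : new TSerializer();
        return serializer.serialize(msg);
    }
}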